from flask import render_template, request, session, jsonify, flash, make_response, redirect, url_for, abort, send_file, render_template_string from flask_login import login_required, current_user from app.main import bp from app.extensions import db, dict_row, dict_instance, extract_exc from app.models.users import tbl_users from app.models.cv_data import * from app.auth import check_admin, check_capture from sqlalchemy import insert, delete, update, select from sqlalchemy import and_, or_, not_, func import json, requests, re, os from .forms import RecordForm, QualificationTypeForm, QualificationForm, QualificationTypeRemovalForm, QualificationRemovalForm, RecordQualificationForm, RecordQualificationRemovalForm from .forms import RoleDepartmentForm, RoleForm, RoleDepartmentRemovalForm, RoleRemovalForm, \ UploadForm, UploadRemovalForm, FilterForm from .forms import UserForm, UserRemovalForm import re import datetime as datetime_class from datetime import datetime, time, timezone, timedelta, date from app import Config from app.extensions import dict_instance, dict_row import io def end_user_debugging(s_additional:str = "", bl_html:bool = True): """just for initial testing""" d_exc = extract_exc(s_additional) s_dt = datetime.now().astimezone(timezone(timedelta(hours=2))).strftime("%Y-%m-%d %H:%M:%S") s_logging = f"" if (isinstance(d_exc, dict) and bl_html) else f"date-time: {s_dt}\nindex-landing page\n" + str(d_exc["s_except"]) return f"\n\n\nIssue\n\n\n

Issue details

\n{s_logging}\n\n" # end of end_user_debugging function @bp.route("/", methods=["GET", "POST"]) @login_required def index(): """landing page""" try: # filtering choices and current data filter_form = FilterForm() # populate some select choices l_role_departments = list(map(dict_row, db.session.query(tbl_role_departments.id, tbl_role_departments.v_department_name).order_by(tbl_role_departments.v_department_name).all())) for ix, d in enumerate(l_role_departments): l_role_departments[ix] = (d["id"], d["v_department_name"]) filter_form.sel_role_department.choices = [(0, "---")] + l_role_departments # populate roles if department was selected i_department_id = request.form.get("sel_role_department", 0, type=int) if i_department_id>0: q_roles = db.session.query(tbl_roles.id, tbl_roles.v_role_name).filter(tbl_roles.i_department_id==i_department_id) if q_roles.count()>0: q_roles = q_roles.order_by(tbl_roles.v_role_name) l_roles = list(map(dict_row, q_roles.all())) for ix, d in enumerate(l_roles): l_roles[ix] = (d["id"], d["v_role_name"]) filter_form.sel_role.choices = [(0, "---")] + l_roles # end of checking result count of query # end of checking if department submitted l_all_languages = list(map(dict_row, db.session.query(tbl_all_languages.v_language_abbreviation, tbl_all_languages.v_language_name).filter(tbl_all_languages.bl_south_african==True).order_by(tbl_all_languages.v_language_name).all())) for ix, d in enumerate(l_all_languages): l_all_languages[ix] = (d["v_language_abbreviation"], d["v_language_name"]) filter_form.sel_language.choices = [("", "---")] + l_all_languages l_qualification_types = list(map(dict_row, db.session.query(tbl_qualification_types.id, tbl_qualification_types.v_qualification_type).order_by(tbl_qualification_types.v_qualification_type).all())) for ix, d in enumerate(l_qualification_types): l_qualification_types[ix] = (d["id"], d["v_qualification_type"]) filter_form.sel_qualification_type_1.choices = [(0, "---")] + l_qualification_types 
filter_form.sel_qualification_type_2.choices = [(0, "---")] + l_qualification_types i_type_1 = request.form.get("sel_qualification_type_1", 0, type=int) i_type_2 = request.form.get("sel_qualification_type_2", 0, type=int) q_qualifications = db.session.query(tbl_qualifications.id, tbl_qualifications.i_qualification_type, tbl_qualifications.v_qualification_name).order_by(tbl_qualifications.v_qualification_name) l_qualifications_1 = q_qualifications.filter(tbl_qualifications.i_qualification_type==i_type_1).order_by(tbl_qualifications.v_qualification_name).all() if i_type_1>0 else [] l_qualifications_2 = q_qualifications.filter(tbl_qualifications.i_qualification_type==i_type_2).order_by(tbl_qualifications.v_qualification_name).all() if i_type_2>0 else [] l_qualifications_1 = list(map(dict_row, l_qualifications_1)) l_qualifications_2 = list(map(dict_row, l_qualifications_2)) for ix, d in enumerate(l_qualifications_1): l_qualifications_1[ix] = (d["id"], d["v_qualification_name"]) for ix, d in enumerate(l_qualifications_2): l_qualifications_2[ix] = (d["id"], d["v_qualification_name"]) filter_form.sel_qualification_1.choices = [(0, "---")] + l_qualifications_1 filter_form.sel_qualification_2.choices = [(0, "---")] + l_qualifications_2 # initiate query that will possibly get filtered q_records = db.session.query(tbl_records.id, tbl_records.v_name_1, tbl_records.v_surname, tbl_records.si_years_experience, tbl_records.v_contact_number, tbl_records.v_email) if filter_form.validate_on_submit(): # possible filter values bl_case_sensitive, i_role_department_id, i_role_id, s_name, s_surname, s_id_number = (filter_form.chk_case_sensitive.data, filter_form.sel_role_department.data, filter_form.sel_role.data, str(filter_form.txt_name.data), str(filter_form.txt_surname.data), str(filter_form.txt_id_number.data)) s_sap_k_level, s_language = (filter_form.sel_sap_k_level.data, filter_form.sel_language.data) i_qualification_type_1, i_qualification_type_2, i_qualification_1, 
i_qualification_2 = (filter_form.sel_qualification_type_1.data, filter_form.sel_qualification_type_2.data, filter_form.sel_qualification_1.data, filter_form.sel_qualification_2.data) # apply filters if i_role_department_id>0: q_records = q_records.filter(tbl_records.i_department_id==i_role_department_id) if i_role_id>0: q_records = q_records.filter(tbl_records.i_role_id==i_role_id) if s_name!="": s_name = f"%{s_name}%" if bl_case_sensitive: sq_names = select(tbl_records.id).filter(or_(tbl_records.v_name_1.like(s_name), tbl_records.v_name_2.like(s_name), tbl_records.v_name_3.like(s_name))) else: sq_names = select(tbl_records.id).filter(or_(lower(tbl_records.v_name_1).like(func.lower(s_name)), func.lower(tbl_records.v_name_2).like(func.lower(s_name)), func.lower(tbl_records.v_name_3).like(func.lower(s_name)))) # end of checking if case sensitivity applies q_records = q_records.filter(tbl_records.id.in_(sq_names)) # end of checking if need to check for name substrings if s_surname!="": s_surname = f"%{s_surname}%" if bl_case_sensitive: q_records = q_records.filter(tbl_records.v_surname.like(s_surname)) else: q_records = q_records.filter(func.lower(tbl_records.v_surname).like(func.lower(s_surname))) # end of checking if case sensitivity applies # end of checking for surname filter if s_id_number!="": s_id_number = f"%{s_id_number}%" if bl_case_sensitive: q_records = q_records.filter(tbl_records.v_id_number.like(s_id_number)) else: q_records = q_records.filter(func.lower(tbl_records.v_id_number).like(func.lower(s_id_number))) # end of checking if case sensitivity applies # end of checking for ID number filter if s_sap_k_level!="": q_records = q_records.filter(tbl_records.v_sap_k_level==s_sap_k_level) if s_language!="": sq_languages = select(tbl_languages.i_record_id.distinct()).filter(tbl_languages.v_language_abbreviation==s_language) q_records = q_records.filter(tbl_records.id.in_(sq)) # end of checking if need to use language-matching subquery if 
i_qualification_type_1>0: if i_qualification_1>0: sq_qualification_1 = select(tbl_record_qualifications.i_record_id.distinct()).filter(tbl_record_qualifications.i_qualification_id==i_qualification_1) q_records = q_records.filter(tbl_records.id.in_(sq_qualification_1)) else: sq_qualification_1 = select(tbl_record_qualifications.i_record_id.distinct()).join(tbl_qualifications, tbl_qualifications.id==tbl_record_qualifications.i_qualification_id, isouter=False).filter(tbl_qualifications.i_qualification_type==i_qualification_type_1) q_records = q_records.filter(tbl_records.id.in_(sq_qualification_1)) # end of checking if need to filter qualification itself or just type # end of checking for qualification type 1 if i_qualification_type_2>0: if i_qualification_2>0: sq_qualification_2 = select(tbl_record_qualifications.i_record_id.distinct()).filter(tbl_record_qualifications.i_qualification_id==i_qualification_2) q_records = q_records.filter(tbl_records.id.in_(sq_qualification_2)) else: sq_qualification_2 = select(tbl_record_qualifications.i_record_id.distinct()).join(tbl_qualifications, tbl_qualifications.id==tbl_record_qualifications.i_qualification_id, isouter=False).filter(tbl_qualifications.i_qualification_type==i_qualification_type_2) q_records = q_records.filter(tbl_records.id.in_(sq_qualification_2)) # end of checking if need to filter qualification itself or just type # end of checking for qualification type 2 # end of checking for filter form submission i_page = request.form.get("hid_page", 1, type=int) q_records = q_records.order_by(tbl_records.v_surname, tbl_records.v_name_1) pagination = q_records.paginate(page=i_page, per_page=20) l_records = list(map(dict_row, pagination.items)) # l_records = list(map(dict_row, q_records.all())) for ix, record in enumerate(l_records): i_record = record["id"] l_record_qualifications = list(map(dict_row, db.session.query(tbl_record_qualifications.id, tbl_qualification_types.v_qualification_type, 
tbl_qualifications.v_qualification_name, tbl_record_qualifications.d_acquired).join(tbl_qualifications, tbl_record_qualifications.i_qualification_id==tbl_qualifications.id, isouter=False).join(tbl_qualification_types, tbl_qualifications.i_qualification_type==tbl_qualification_types.id, isouter=False).filter(tbl_record_qualifications.i_record_id==i_record).order_by(tbl_record_qualifications.d_acquired, tbl_qualifications.v_qualification_name).all())) l_records[ix]["record_qualifications"] = l_record_qualifications # end of looping through records to retrieve additional information s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html" s_url = url_for("main.index")#, _external=True) return render_template("index.html", js=True, base=s_base, url=s_url, records=l_records, filter_form=filter_form, paging=pagination) except Exception as exc: return render_template_string(end_user_debugging("index/landing page")) # end of surrounding try-except # end of index view function for route / @bp.route("/icons", methods=["GET"]) def icons(): """icons preview - purely testing""" return render_template("icons.html") # end of icons view function for route / @bp.route("/capture_record//", methods=["GET", "POST"]) @bp.route("/capture_record/", defaults={"i_record": 0}, methods=["GET", "POST"]) @login_required @check_capture def capture_record(i_record:int = 0): """add/update C.V. 
record""" try: l_all_languages = list(map(dict_row, db.session.query(tbl_all_languages.v_language_abbreviation, tbl_all_languages.v_language_name).filter(tbl_all_languages.bl_south_african==True).order_by(tbl_all_languages.v_language_name).all())) l_departments = list(map(dict_row, db.session.query(tbl_role_departments.id, tbl_role_departments.v_department_name).order_by(tbl_role_departments.v_department_name).all())) l_roles = [] i_department_id, i_role_id =(0, 0) form = RecordForm() remove_qualification_form = RecordQualificationRemovalForm() qualification_form = RecordQualificationForm() i_qualification_type_id = request.form.get("sel_qualification_type", default=0, type=int) l_qualification_types = list(map(dict_row, db.session.query(tbl_qualification_types.id, tbl_qualification_types.v_qualification_type).order_by(tbl_qualification_types.v_qualification_type).all())) l_qualifications = [] if len(l_qualification_types)>0: i_qualification_type_id = int(l_qualification_types[0]["id"]) if i_qualification_type_id<1 else i_qualification_type_id l_qualifications = list(map(dict_row, db.session.query(tbl_qualifications.id, tbl_qualifications.v_qualification_name).filter(tbl_qualifications.i_qualification_type==i_qualification_type_id).order_by(tbl_qualifications.v_qualification_name).all())) if len(l_qualifications)>0: l_choices = list(map((lambda x : (x["id"], x["v_qualification_name"])), l_qualifications)) qualification_form.sel_qualification.choices = l_choices # end of checking if qualifications were retrieved # end of checking if there are qualification types if form.validate_on_submit() and request.form.get("btn_save", "")=="Save": s_action = "updated" o_record = None s_name_1, s_name_2, s_name_3, s_surname = (form.txt_name_1.data, form.txt_name_2.data, form.txt_name_3.data, form.txt_surname.data) s_id_number = form.txt_id_number.data s_gender = form.sel_gender.data i_years_experience = form.txt_years_experience.data s_sap_k_level = form.sel_sap_k_level.data 
s_contact_number = form.txt_contact_number.data s_email = form.txt_email.data i_department_id = request.form.get("sel_department_id", 0, type=int) i_role_id = request.form.get("sel_role_id", 0, type=int) l_languages = request.form.getlist("hid_langs") l_language_levels = request.form.getlist("hid_lang_levels") if i_record > 0: o_record = db.session.get(tbl_records, i_record) o_record.v_name_1, o_record.v_name_2, o_record.v_name_3, o_record.v_surname = (s_name_1, s_name_2, s_name_3, s_surname) o_record.v_id_number, o_record.c_gender, o_record.si_years_experience, o_record.v_sap_k_level = (s_id_number, s_gender, i_years_experience, s_sap_k_level) o_record.v_contact_number, o_record.v_email = (s_contact_number, s_email) o_record.i_department_id, o_record.i_role_id = (i_department_id, i_role_id) db.session.add(o_record) db.session.commit() else: s_action = "inserted" # save initial record o_record = tbl_records(v_name_1=s_name_1, v_name_2=s_name_2, v_name_3=s_name_3, v_surname=s_surname, v_id_number=s_id_number, c_gender=s_gender, si_years_experience=i_years_experience, v_sap_k_level=s_sap_k_level, v_contact_number=s_contact_number, v_email=s_email, i_department_id=i_department_id, i_role_id=i_role_id) db.session.add(o_record) db.session.commit() i_record = int(o_record.id) if o_record.id is not None else 0 # end of checking if new or existing record # save languages linked to parent record if i_record > 0: if len(l_languages)==len(l_language_levels): # first remove any prior language records that do not match currently submitted languages q_delete = delete(tbl_languages).where(tbl_languages.i_record_id==i_record).where(tbl_languages.v_language_abbreviation.not_in(l_languages)) db.session.execute(q_delete) # insert/update language records for ix, s_lang in enumerate(l_languages): s_language_name = "" q_language = db.session.query(tbl_all_languages.v_language_name).filter(tbl_all_languages.v_language_abbreviation==s_lang) if q_language.count() > 0: s_language_name = 
q_language.first()[0] # end of checking for matching language name record del q_language q_language = None if db.session.query(tbl_languages).filter(tbl_languages.i_record_id==i_record, tbl_languages.v_language_abbreviation==s_lang).count()<1: q_language = insert(tbl_languages).values({"i_record_id": i_record, "v_language_abbreviation": s_lang, "v_language_name": s_language_name, "si_level": int(l_language_levels[ix]), "si_ranking": ix+1}) else: q_language = update(tbl_languages).where(tbl_languages.i_record_id==i_record,tbl_languages.v_language_abbreviation==s_lang).values({"v_language_name": s_language_name, "si_level": int(l_language_levels[ix]), "si_ranking": ix+1}) # end of checking if insert or update db.session.execute(q_language) db.session.commit() # end of looping through languages # end of checking language list lengths flash(f"Record successfully {s_action}", "success") # end of checking if initial record submission took place elif qualification_form.validate_on_submit() and i_record>0 and request.form.get("btn_save_qualification", "")=="Save": i_record_qualification_id, i_qualification_id, d_acquired = (int(qualification_form.hid_record_qualification_id.data), int(qualification_form.sel_qualification.data), qualification_form.d_acquired.data) if i_record_qualification_id>0: o_record_qualification = db.session.get(tbl_record_qualifications, i_record_qualification_id) if o_record_qualification is not None: o_record_qualification.i_qualification_id = i_qualification_id o_record_qualification.d_acquired = d_acquired db.session.add(o_record_qualification) db.session.commit() flash("Qualification record updated") # end of checking if prior record retrieved else: o_record_qualification = tbl_record_qualifications(i_record_id=i_record, i_qualification_id=i_qualification_id, d_acquired=d_acquired) db.session.add(o_record_qualification) db.session.commit() flash("Qualification record recorded") # end of checking if existing or new qualification record elif 
remove_qualification_form.validate_on_submit() and request.form.get("hid_remove_qualification_id", None) is not None: i_qualification_removal_id = request.form.get("hid_remove_qualification_id", type=int) o_record_qualification = db.session.get(tbl_record_qualifications, i_qualification_removal_id) if o_record_qualification is not None: db.session.delete(o_record_qualification) db.session.commit() flash("Qualification removed") # end of making sure retrieved record # end of checking for form submission form.hid_record_id.data = i_record remove_qualification_form.hid_remove_qualification_id.data = 0 # populate record info if existing record if i_record>0: o_record = db.session.get(tbl_records, i_record) if o_record is not None: form.hid_record_id.data = i_record if o_record.i_department_id>0: i_department_id = o_record.i_department_id i_role_id = o_record.i_role_id l_roles = list(map(dict_row, db.session.query(tbl_roles.id, tbl_roles.v_role_name).filter(tbl_roles.i_department_id==i_department_id).order_by(tbl_roles.v_role_name).all())) # end of checking which department roles should populate form.txt_name_1.data, form.txt_name_2.data, form.txt_name_3.data, form.txt_surname.data = (o_record.v_name_1, o_record.v_name_2, o_record.v_name_3, o_record.v_surname) form.txt_id_number.data = o_record.v_id_number form.sel_gender.data = o_record.c_gender form.txt_years_experience.data = o_record.si_years_experience form.sel_sap_k_level.data = o_record.v_sap_k_level form.txt_contact_number.data = o_record.v_contact_number form.txt_email.data = o_record.v_email else: i_record = 0 # end of making sure valid record # end of checking if existing record qualification_form.hid_record_qualification_id.data, qualification_form.sel_qualification.data, qualification_form.d_acquired.data = (0, None, None) # form.txt_name_1.data, form.txt_name_2.data, form.txt_name_3.data, form.txt_surname.data = ("", "", "", "") # form.txt_id_number.data, form.sel_gender.data, 
form.txt_years_experience.data, form.sel_sap_k_level.data, form.txt_contact_number.data, form.txt_email.data = ("", "m", "0", "", "", "") l_languages = db.session.query(tbl_languages.v_language_abbreviation, tbl_languages.v_language_name, tbl_languages.si_level).filter(tbl_languages.i_record_id==i_record).order_by(tbl_languages.si_ranking).all() l_languages = list(map(dict_row, l_languages)) l_record_qualifications = list(map(dict_row, db.session.query(tbl_record_qualifications.id, tbl_qualification_types.v_qualification_type, tbl_qualifications.v_qualification_name, tbl_record_qualifications.d_acquired).join(tbl_qualifications, tbl_record_qualifications.i_qualification_id==tbl_qualifications.id, isouter=False).join(tbl_qualification_types, tbl_qualifications.i_qualification_type==tbl_qualification_types.id, isouter=False).filter(tbl_record_qualifications.i_record_id==i_record).order_by(tbl_record_qualifications.d_acquired, tbl_qualifications.v_qualification_name).all())) s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html" s_url = url_for("main.capture_record")#, _external=True) return render_template("capture_record.html", js=True, base=s_base, url=s_url, form=form, record_id=i_record, languages=l_languages, all_languages=l_all_languages, qualification_types=l_qualification_types, qualification_form=qualification_form, record_qualifications=l_record_qualifications, departments=l_departments, roles=l_roles, department_id=i_department_id, role_id=i_role_id, remove_qualification_form=remove_qualification_form) except Exception as exc: return render_template_string(end_user_debugging("capture_record")) # end of surrounding try-except # end of capture_record view function for route /capture_record @bp.route("/download_upload///", methods=["GET"]) @bp.route("/download_upload//", defaults={"bl_download": True}, methods=["GET"]) @login_required def download_upload(i_upload:int = 0, bl_download:bool = True): """open upload within browser or activate download 
thereof""" try: bl_download = True if str(bl_download)=="True" else False bl_download = bool(bl_download) o_out = None o_upload = db.session.get(tbl_uploads, i_upload) if o_upload is not None: o_out = io.BytesIO(o_upload.b_file) s_mimetype = o_upload.v_mime_type s_filename = o_upload.v_filename # resp = send_file(o_out, mimetype=s_mimetype, as_attachment=bl_download, download_name=s_filename) resp = make_response(o_out) resp.headers.set("Content-Disposition", f"attachment; filename=\"{s_filename}\"" if bl_download else f"inline; filename=\"{s_filename}\"") resp.headers.set("Content-Type", s_mimetype) return resp else: return make_response("No upload retrieved", 500) # end of None check for record except Exception as exc: return render_template_string(end_user_debugging("download_upload")) # end of surrounding try-except # end of download_upload view function for route /download_upload//[] def qualification_type_choices(): """return possible qualification type choices""" l_choices = [] l_qualification_types = list(map(dict_row, db.session.query(tbl_qualification_types.id, tbl_qualification_types.v_qualification_type).order_by(tbl_qualification_types.v_qualification_type).all())) l_type_choices = list(map((lambda x : (x["id"], x["v_qualification_type"])), l_qualification_types)) if len(l_type_choices)>0: l_choices = l_type_choices # end of checking if type choices exist return (l_qualification_types, l_choices) # end of qualification_type_choices function @bp.route("/qualifications/", methods=["GET", "POST"]) @login_required @check_admin def qualifications(): """manage qualification types and qualifications to make use of in rest of site""" try: i_type_filter = 0 type_form = QualificationTypeForm() form = QualificationForm() l_qualification_types, form.sel_qualification_type.choices = qualification_type_choices() remove_type_form = QualificationTypeRemovalForm() remove_form = QualificationRemovalForm() if type_form.validate_on_submit() and 
request.form.get("btn_save_type", "")=="Save": i_type_id, s_type = (int(type_form.hid_qualification_type_id.data), type_form.txt_qualification_type.data) if i_type_id > 0: o_type = db.session.get(tbl_qualification_types, i_type_id) if o_type is not None: o_type.v_qualification_type = s_type db.session.add(o_type) db.session.commit() flash("Type updated", "success") # end of checking if record existed else: if db.session.query(tbl_qualification_types.id).filter(tbl_qualification_types.v_qualification_type==s_type).count()<1: db.session.add(tbl_qualification_types(v_qualification_type=s_type)) db.session.commit() flash("Type inserted", "success") # end of making sure new type is not a duplicate # end of checking if new or existing type record elif remove_type_form.validate_on_submit() and request.form.get("hid_remove_qualification_type_id", None) is not None: i_type_id = int(remove_type_form.hid_remove_qualification_type_id.data) o_type = db.session.get(tbl_qualification_types, i_type_id) if o_type is not None: db.session.delete(o_type) db.session.commit() flash("Type removed", "success") # end of checking if record existed elif form.validate_on_submit() and request.form.get("btn_save_qualification", "")=="Save": i_qualification_id, i_qualification_type, s_qualification_name, s_description = (int(form.hid_qualification_id.data), int(form.sel_qualification_type.data), str(form.txt_qualification_name.data), str(form.txt_description.data)) if i_qualification_id>0: o_qualification = db.session.get(tbl_qualifications, i_qualification_id) if o_qualification is not None: if db.session.query(tbl_qualifications).filter(tbl_qualifications.id!=i_qualification_id, tbl_qualifications.v_qualification_name==s_qualification_name, tbl_qualifications.i_qualification_type==i_qualification_type).count()<1: o_qualification.i_qualification_type = i_qualification_type o_qualification.v_qualification_name = s_qualification_name o_qualification.v_description = s_description 
db.session.add(o_qualification) db.session.commit() flash("Qualification updated") i_type_filter = i_qualification_type else: flash("There is already a qualification under same type category with same name") i_type_filter = i_qualification_type # end of checking for alternative duplicate # end of retrieval check else: if db.session.query(tbl_qualifications).filter(tbl_qualifications.v_qualification_name==s_qualification_name, tbl_qualifications.i_qualification_type==i_qualification_type).count()<1: o_qualification = tbl_qualifications(i_qualification_type=i_qualification_type, v_qualification_name=s_qualification_name, v_description=s_description) db.session.add(o_qualification) db.session.commit() flash("Qualification added") i_type_filter = i_qualification_type else: flash("There is an existing qualification with same name under same type category") # end of making sure would not count as duplicate # end of checking if new or existing qualification record elif remove_form.validate_on_submit() and request.form.get("hid_remove_qualification_id", None) is not None: i_qualification_id = int(remove_form.hid_remove_qualification_id.data) o_qualification = db.session.get(tbl_qualifications, i_qualification_id) if o_qualification is not None: db.session.delete(o_qualification) db.session.commit() flash("Qualification removed", "success") # end of checking if record existed # end of checking for form submissions type_form.hid_qualification_type_id.data, type_form.txt_qualification_type.data = (0, "") remove_type_form.hid_remove_qualification_type_id.data = 0 # re-populate in case changes have occurred l_qualification_types, form.sel_qualification_type.choices = qualification_type_choices() s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html" s_url = url_for("main.qualifications")#, _external=True) return render_template("qualifications.html", js=True, base=s_base, url=s_url, type_form=type_form, form=form, qualification_types=l_qualification_types, 
remove_type_form=remove_type_form, type_filter=i_type_filter, remove_form=remove_form) except Exception as exc: return render_template_string(end_user_debugging("qualifications")) # end of surrounding try-except # end of qualifications view function for route /qualifications @bp.route("/qualifications_list//", methods=["GET"]) @login_required def qualifications_list(i_type_id): """return list of qualification entries that match type filter""" try: l_out = [] q_qualifications = db.session.query(tbl_qualifications.id, tbl_qualifications.v_qualification_name, tbl_qualifications.v_description, tbl_qualification_types.v_qualification_type).join(tbl_qualification_types, tbl_qualifications.i_qualification_type==tbl_qualification_types.id, isouter=True) q_qualifications = q_qualifications.filter(tbl_qualifications.i_qualification_type==i_type_id) if i_type_id>0 else q_qualifications if q_qualifications.count()>0: q_qualifications = q_qualifications.order_by(tbl_qualifications.v_qualification_name) l_out = list(map(dict_row, q_qualifications.all())) if q_qualifications.count()>0 else [] # end of checking result count of query return make_response(jsonify(l_out), 200) except Exception as exc: return render_template_string(end_user_debugging("qualifications_list")) # end of surrounding try-except # end of qualifications_list view function @bp.route("/qualification_details//", methods=["GET"]) @login_required def qualification_details(i_qualification_id): """retrieve qualification details""" try: d_out = {} o_qualification = db.session.get(tbl_qualifications, i_qualification_id) if o_qualification is not None: d_out = dict_instance(o_qualification) # end of checking if record retrieved return make_response(jsonify(d_out), 200) except Exception as exc: return render_template_string(end_user_debugging("qualification_details")) # end of surrounding try-except # end of qualification_details view function def role_department_choices(): """return list of possible|existing role 
departments""" l_choices = [] l_departments = list(map(dict_row, db.session.query(tbl_role_departments.id, tbl_role_departments.v_department_name).order_by(tbl_role_departments.v_department_name).all())) l_department_choices = list(map((lambda x : (x["id"], x["v_department_name"])), l_departments)) if len(l_department_choices)>0: l_choices = l_department_choices # end of checking if there were records return (l_departments, l_choices) # end of role_department_choices function @bp.route("/roles/", methods=["GET", "POST"]) @login_required @check_admin def roles(): """manage roles available for assignment to records""" try: i_department_filter = 0 department_form = RoleDepartmentForm() form = RoleForm() # need to populate this first to allow form validation l_departments, form.sel_department.choices = role_department_choices() remove_department_form = RoleDepartmentRemovalForm() remove_form = RoleRemovalForm() if department_form.validate_on_submit() and request.form.get("btn_save_department", "")=="Save": i_department_id, s_department_name = (int(department_form.hid_department_id.data), department_form.txt_department_name.data) if i_department_id > 0: o_department = db.session.get(tbl_role_departments, i_department_id) if o_department is not None: o_department.v_department_name = s_department_name db.session.add(o_department) db.session.commit() flash("Department updated", "success") # end of checking if record existed else: if db.session.query(tbl_role_departments.id).filter(tbl_role_departments.v_department_name==s_department_name).count()<1: db.session.add(tbl_role_departments(v_department_name=s_department_name)) db.session.commit() flash("Department inserted", "success") # end of making sure new department is not a duplicate # end of checking if new or existing department record elif form.validate_on_submit() and request.form.get("btn_save_role", "")=="Save": i_role_id, i_department_id, s_role_name, s_description = (int(form.hid_role_id.data), 
int(form.sel_department.data), str(form.txt_role_name.data), str(form.txt_description.data)) if i_role_id>0: o_role = db.session.get(tbl_roles, i_role_id) if o_role is not None: if db.session.query(tbl_roles).filter(tbl_roles.id!=i_role_id, tbl_roles.v_role_name==s_role_name, tbl_roles.i_department_id==i_department_id).count()<1: o_role.i_department_id = i_department_id o_role.v_role_name = s_role_name o_role.v_description = s_description db.session.add(o_role) db.session.commit() flash("Role updated") i_department_filter = i_department_id else: flash("There is already a role under same department with same name") i_department_filter = i_department_id # end of checking for alternative duplicate # end of retrieval check else: if db.session.query(tbl_roles).filter(tbl_roles.v_role_name==s_role_name, tbl_roles.i_department_id==i_department_id).count()<1: o_role = tbl_roles(i_department_id=i_department_id, v_role_name=s_role_name, v_description=s_description) db.session.add(o_role) db.session.commit() flash("Role added") i_department_filter = i_department_id else: flash("There is an existing role with same name under same department") # end of making sure would not count as duplicate # end of checking if new or existing role record elif remove_department_form.validate_on_submit(): i_department_id = int(remove_department_form.hid_remove_department_id.data) o_department = db.session.get(tbl_role_departments, i_department_id) if o_department is not None: db.session.delete(o_department) db.session.commit() flash("Department removed", "success") # end of checking if record existed elif remove_form.validate_on_submit(): i_role_id = int(remove_form.hid_remove_role_id.data) o_role = db.session.get(tbl_roles, i_role_id) if o_role is not None: db.session.delete(o_role) db.session.commit() flash("Role removed", "success") # end of checking if record existed # end of checking for form submission department_form.hid_department_id.data, department_form.txt_department_name.data = (0, 
"") remove_form.hid_remove_role_id.data, remove_department_form.hid_remove_department_id.data = ("0", "0") # re-populate in case changes have occurred l_departments, form.sel_department.choices = role_department_choices() s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html" s_url = url_for("main.roles")#, _external=True) return render_template("roles.html", js=True, base=s_base, url=s_url, departments=l_departments, department_filter=i_department_filter, department_form=department_form, form=form, remove_department_form=remove_department_form, remove_form=remove_form) except Exception as exc: return render_template_string(end_user_debugging("roles")) # end of surrounding try-except # end of roles view function for route /positions @bp.route("/roles_list//", methods=["GET"]) @login_required def roles_list(i_department_id): """return list of role entries that match department filter""" try: l_out = [] try: q_roles = db.session.query(tbl_roles.id, tbl_roles.v_role_name, tbl_roles.v_description, tbl_role_departments.v_department_name).join(tbl_role_departments, tbl_roles.i_department_id==tbl_role_departments.id, isouter=True) q_roles = q_roles.filter(tbl_roles.i_department_id==i_department_id) if i_department_id>0 else q_roles if q_roles.count()>0: q_roles = q_roles.order_by(tbl_roles.v_role_name) l_out = list(map(dict_row, q_roles.all())) # end of checking result count of query except Exception as exc: print(extract_exc("list roles?")) # end of try-except return make_response(jsonify(l_out), 200) except Exception as exc: return render_template_string(end_user_debugging("roles_list")) # end of surrounding try-except # end of roles_list view function for route /roles_list// @bp.route("/role_details//", methods=["GET"]) @login_required def role_details(i_role_id): """retrieve role details""" try: d_out = {} o_role = db.session.get(tbl_roles, i_role_id) if o_role is not None: d_out = dict_instance(o_role) # end of checking if record retrieved return 
make_response(jsonify(d_out), 200) except Exception as exc: return render_template_string(end_user_debugging("role_details")) # end of surrounding try-except # end of role_details view function for route /role_details// @bp.route("/uploads//", methods=["GET", "POST"]) @login_required @check_capture def uploads(i_record:int = 0): """capture uploads linked to CV record""" try: # https://blog.miguelgrinberg.com/post/handling-file-uploads-with-flask o_record = db.session.get(tbl_records, i_record) if o_record is None: return redirect(url_for("main.index")) # end of just making sure valid record remove_upload_form = UploadRemovalForm() upload_form = UploadForm() s_names = o_record.v_surname + ", " + " ".join([o_record.v_name_1, o_record.v_name_2, o_record.v_name_3]) l_qualifications = list(map(dict_row, db.session.query(tbl_record_qualifications.id, tbl_qualifications.v_qualification_name, tbl_record_qualifications.d_acquired).join(tbl_qualifications, tbl_record_qualifications.i_qualification_id==tbl_qualifications.id, isouter=False).filter(tbl_record_qualifications.i_record_id==i_record).order_by(tbl_qualifications.v_qualification_name).all())) upload_form.sel_match.choices.extend(list(map((lambda x : (x["id"], x["v_qualification_name"])), l_qualifications))) if upload_form.validate_on_submit() and request.form.get("btn_upload", None) is not None: i_upload_id, i_upload_type, i_matching_id, s_description = (int(upload_form.hid_upload_id.data), int(upload_form.sel_upload_type.data), int(upload_form.sel_match.data), str(upload_form.txt_description.data)) fil_upload = request.files.get("fil_upload_document") if fil_upload is not None and fil_upload.filename!="": s_ext = os.path.splitext(fil_upload.filename)[1] if s_ext not in [".jpg", ".png", ".pdf", ".doc", ".docx"]: abort(400) # end of double-checking file extension # extract data record values s_filename= fil_upload.filename s_mime_type = fil_upload.mimetype try: if i_upload_id>0: q_update = 
update(tbl_uploads).where(id==i_upload_id) q_update = q_update.values({"i_record_id": i_record, "i_matching_id": i_matching_id, "si_upload_type": i_upload_type, "v_description": s_description, "v_filename": s_filename, "v_mime_type": s_mime_type, "b_file": fil_upload.read()}) db.session.execute(q_update) flash("Document record updated") else: o_upload = tbl_uploads(i_record_id=i_record, i_matching_id=i_matching_id, si_upload_type=i_upload_type, v_description=s_description, v_filename=s_filename, v_mime_type=s_mime_type, b_file=fil_upload.read()) db.session.add(o_upload) db.session.commit() flash("Document uploaded") except Exception as exc: print(str(exc)) # end of try-except around saving data record else: flash("Invalid file upload") # end of making sure file was uploaded elif remove_upload_form.validate_on_submit(): i_upload_id = int(remove_upload_form.hid_remove_upload_id.data) o_upload = db.session.get(tbl_uploads, i_upload_id) if o_upload is not None: db.session.delete(o_upload) db.session.commit() flash("Uploaded document removed") # end of checking if record was retrieved # end of checking for form submission remove_upload_form.hid_remove_upload_id.data = 0 upload_form.hid_upload_id.data, upload_form.sel_upload_type.data, upload_form.sel_match.data, upload_form.txt_description.data = (0, 0, 0, "") l_uploads = list(map(dict_row, db.session.query(tbl_uploads.id, tbl_uploads.si_upload_type, tbl_uploads.v_description, tbl_uploads.v_filename, tbl_qualifications.v_qualification_name).join(tbl_qualifications, tbl_qualifications.id==tbl_uploads.i_matching_id, isouter=True).filter(tbl_uploads.i_record_id==i_record).order_by(tbl_uploads.v_filename).all())) for ix, d in enumerate(l_uploads): d["si_upload_type"] = {0: "Original CV", 1: "Alteram CV", 2: "Certificate/Diploma/Degree", 3: "I.D. 
Document or Passport"}[d["si_upload_type"]]; d["v_qualification_name"] = d["v_qualification_name"] if d["v_qualification_name"] is not None else "" s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html" s_url = url_for("main.uploads", i_record=i_record)#, _external=True) return render_template("uploads.html", js=True, base=s_base, url=s_url, record_id=i_record, names=s_names, record_qualifications=l_qualifications, remove_upload_form=remove_upload_form, upload_form=upload_form, document_uploads=l_uploads) except Exception as exc: return render_template_string(end_user_debugging("uploads")) # end of surrounding try-except # end of uploads view function for route /uploads// @bp.route("/record_details//", methods=["GET"]) @login_required def record_details(i_record_id): """view all details for a record""" try: d_out = {} q_details = db.session.query(tbl_records.id, tbl_records.v_surname, tbl_records.v_name_1, tbl_records.v_name_2, tbl_records.v_name_3, tbl_records.v_id_number, tbl_records.c_gender, tbl_records.si_years_experience, tbl_records.v_sap_k_level, tbl_records.v_contact_number, tbl_records.v_email, tbl_role_departments.v_department_name, tbl_roles.v_role_name).join(tbl_role_departments, tbl_role_departments.id==tbl_records.i_department_id, isouter=True).join(tbl_roles, tbl_roles.id==tbl_records.i_role_id, isouter=True).filter(tbl_records.id==i_record_id) if q_details.count()>0: d_out["record"] = dict_row(q_details.first()) d_out["languages"] = list(map(dict_row, db.session.query(tbl_languages.v_language_abbreviation, tbl_languages.v_language_name, tbl_languages.si_level).filter(tbl_languages.i_record_id==i_record_id).order_by(tbl_languages.si_ranking).all())) for ix, d in enumerate(d_out["languages"]): d_out["languages"][ix]["level"] = ["basic", "intermediate", "proficient"][d["si_level"]] d_out["qualifications"] = list(map(dict_row, db.session.query(tbl_record_qualifications.d_acquired, tbl_qualifications.v_qualification_name, 
tbl_qualification_types.v_qualification_type).join(tbl_qualifications, tbl_qualifications.id==tbl_record_qualifications.i_qualification_id, isouter=True).join(tbl_qualification_types, tbl_qualification_types.id==tbl_qualifications.i_qualification_type, isouter=False).filter(tbl_record_qualifications.i_record_id==i_record_id).all())) for ix, d in enumerate(d_out["qualifications"]): if isinstance(d["d_acquired"], datetime_class.date): d["d_acquired"] = d["d_acquired"].strftime("%Y-%m-%d") # end of looping through qualifications d_out["uploads"] = list(map(dict_row, db.session.query(tbl_uploads.id, tbl_uploads.si_upload_type, tbl_uploads.v_filename, tbl_uploads.v_description, tbl_qualifications.v_qualification_name).join(tbl_qualifications, tbl_qualifications.id==tbl_uploads.i_matching_id, isouter=True).filter(tbl_uploads.i_record_id==i_record_id).all())) for ix, d in enumerate(d_out["uploads"]): d["si_upload_type"] = {0: "Original CV", 1: "Alteram CV", 2: "Certificate/Diploma/Degree", 3: "I.D. 
Document or Passport"}[d["si_upload_type"]] # end of initial query length check return make_response(jsonify(d_out), 200) except Exception as exc: s_logging = end_user_debugging("record_details", False) return make_response(s_logging, 200) # end of surrounding try-except # end of record_details view function for route /record_details// @bp.route("/users/", methods=["GET", "POST"]) @login_required @check_admin def users(): """manage access|authentication IDs for admin, capture and view""" try: form = UserForm() removal_form = UserRemovalForm() if form.validate_on_submit() and request.form.get("btn_save", None)=="Save": i_user_id, s_user_id, s_password, s_password_confirm, bl_admin, bl_capture = (int(form.hid_user_id.data), str(form.txt_user_id.data), str(form.txt_password.data), str(form.txt_password_confirm.data), bool(form.chk_admin.data), bool(form.chk_capture.data)) if i_user_id>0: o_user = db.session.get(tbl_users, i_user_id) if o_user is not None: if db.session.query(tbl_users.id).filter(tbl_users.id!=i_user_id, tbl_users.v_user_id==s_user_id).count()>0: flash("Another User with same User ID exists") else: o_user.v_user_id = s_user_id o_user.bl_admin = bl_admin o_user.bl_capture = bl_capture s_password_change = "" if len(s_password)>3 and s_password==s_password_confirm: o_user.set_password(s_password) s_password_change = " (with password changed)" # end of checking if password must be changed db.session.add(o_user) db.session.commit() flash(f"User record updated{s_password_change}") # end of checking for another user record with same user ID # end of checking for record retrieval else: if len(s_password)>3 and s_password==s_password_confirm: if db.session.query(tbl_users.id).filter(tbl_users.v_user_id==s_user_id).count()>0: flash("Another User with same User ID already exists") else: o_user = tbl_users(v_user_id=s_user_id, v_password=s_password, bl_admin=bl_admin, bl_capture=bl_capture) db.session.add(o_user) db.session.commit() i_user_id = int(o_user.id) if 
o_user.id is not None else 0 if o_user.id>0: flash("User record inserted") # end of checking for record insertion # end of checking for matching record with same User ID else: flash("Password values must be longer than 3 characters, and must match") # end of making sure passwords match # end of checking if existing user elif removal_form.validate_on_submit(): i_user_id = int(removal_form.hid_remove_user_id.data) o_user = db.session.get(tbl_users, i_user_id) if o_user is not None: db.session.delete(o_user) db.session.commit() flash("User record removed") # end of checking for record retrieval # end of checking for form submission form.hid_user_id.data, form.txt_user_id.data, form.txt_password.data, form.txt_password_confirm.data, form.chk_admin.data, form.chk_capture.data = (0, "", "", "", False, False) removal_form.hid_remove_user_id.data = 0 l_users = list(map(dict_row, db.session.query(tbl_users.id, tbl_users.v_user_id, tbl_users.bl_admin, tbl_users.bl_capture).filter(tbl_users.v_user_id!="admin").order_by(tbl_users.v_user_id).all())) s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html" s_url = url_for("main.users")#, _external=True) return render_template("users.html", js=True, base=s_base, url=s_url, users=l_users, form=form, removal_form=removal_form) except Exception as exc: return render_template_string(end_user_debugging("users")) # end of surrounding try-except # end of users view function for route /users @bp.route("/user_details//", methods=["GET"]) @login_required @check_admin def user_details(i_user_id:int = 0): """retrieve user record details""" try: d_out = {} o_user = db.session.get(tbl_users, i_user_id) if o_user is not None: d_out = dict_instance(o_user) del d_out["v_password"] # end of checking for record retrieval return make_response(jsonify(d_out), 200) except Exception as exc: return render_template_string(end_user_debugging("user_details")) # end of surrounding try-except # end of user_details view function for route /user_details// 
@bp.route("/css_sample", methods=["GET"])
def css_sample():
    """Render the CSS sample page (testing aid only)."""
    # pick the bootstrap flavour of the base template when the config asks for it
    if Config.BOOTSTRAP:
        s_template_base = "base_bs.html"
    else:
        s_template_base = "base.html"
    # end of choosing base template
    s_landing_url = url_for("main.index")
    return render_template("css_sample.html", js=True, base=s_template_base, url=s_landing_url)
# end of css_sample view function for route /css_sample


s_todo = """ double-check both try-except across the board, along with then logging exceptions """