from flask import render_template, request, session, jsonify, flash, make_response, redirect, url_for, abort, send_file, render_template_string
from flask_login import login_required, current_user
from app.main import bp
from app.extensions import db, dict_row, dict_instance, extract_exc
from app.models.users import tbl_users
from app.models.cv_data import *
from app.auth import check_admin, check_capture
from sqlalchemy import insert, delete, update, select
from sqlalchemy import and_, or_, not_, func
import json, requests, re, os
from .forms import RecordForm, QualificationTypeForm, QualificationForm, QualificationTypeRemovalForm, QualificationRemovalForm, RecordQualificationForm, RecordQualificationRemovalForm
from .forms import RoleDepartmentForm, RoleForm, RoleDepartmentRemovalForm, RoleRemovalForm, \
UploadForm, UploadRemovalForm, FilterForm
from .forms import UserForm, UserRemovalForm
import re
import datetime as datetime_class
from datetime import datetime, time, timezone, timedelta, date
from app import Config
from app.extensions import dict_instance, dict_row
import io
def end_user_debugging(s_additional:str = "", bl_html:bool = True):
"""just for initial testing"""
d_exc = extract_exc(s_additional)
s_dt = datetime.now().astimezone(timezone(timedelta(hours=2))).strftime("%Y-%m-%d %H:%M:%S")
s_logging = f"
- date-time: {s_dt}
\n- line no.: {d_exc['i_lineno']}
\n- filename: {d_exc['s_filename']}
\n- except: {d_exc['s_except']}
\n- additional: {d_exc['s_additional']}
\n
" if (isinstance(d_exc, dict) and bl_html) else f"date-time: {s_dt}\nindex-landing page\n" + str(d_exc["s_except"])
return f"\n\n\nIssue\n\n\nIssue details
\n{s_logging}\n\n"
# end of end_user_debugging function
@bp.route("/", methods=["GET", "POST"])
@login_required
def index():
"""landing page"""
try:
# filtering choices and current data
filter_form = FilterForm()
# populate some select choices
l_role_departments = list(map(dict_row, db.session.query(tbl_role_departments.id, tbl_role_departments.v_department_name).order_by(tbl_role_departments.v_department_name).all()))
for ix, d in enumerate(l_role_departments): l_role_departments[ix] = (d["id"], d["v_department_name"])
filter_form.sel_role_department.choices = [(0, "---")] + l_role_departments
# populate roles if department was selected
i_department_id = request.form.get("sel_role_department", 0, type=int)
if i_department_id>0:
q_roles = db.session.query(tbl_roles.id, tbl_roles.v_role_name).filter(tbl_roles.i_department_id==i_department_id)
if q_roles.count()>0:
q_roles = q_roles.order_by(tbl_roles.v_role_name)
l_roles = list(map(dict_row, q_roles.all()))
for ix, d in enumerate(l_roles): l_roles[ix] = (d["id"], d["v_role_name"])
filter_form.sel_role.choices = [(0, "---")] + l_roles
# end of checking result count of query
# end of checking if department submitted
l_all_languages = list(map(dict_row, db.session.query(tbl_all_languages.v_language_abbreviation, tbl_all_languages.v_language_name).filter(tbl_all_languages.bl_south_african==True).order_by(tbl_all_languages.v_language_name).all()))
for ix, d in enumerate(l_all_languages): l_all_languages[ix] = (d["v_language_abbreviation"], d["v_language_name"])
filter_form.sel_language.choices = [("", "---")] + l_all_languages
l_qualification_types = list(map(dict_row, db.session.query(tbl_qualification_types.id, tbl_qualification_types.v_qualification_type).order_by(tbl_qualification_types.v_qualification_type).all()))
for ix, d in enumerate(l_qualification_types): l_qualification_types[ix] = (d["id"], d["v_qualification_type"])
filter_form.sel_qualification_type_1.choices = [(0, "---")] + l_qualification_types
filter_form.sel_qualification_type_2.choices = [(0, "---")] + l_qualification_types
i_type_1 = request.form.get("sel_qualification_type_1", 0, type=int)
i_type_2 = request.form.get("sel_qualification_type_2", 0, type=int)
q_qualifications = db.session.query(tbl_qualifications.id, tbl_qualifications.i_qualification_type, tbl_qualifications.v_qualification_name).order_by(tbl_qualifications.v_qualification_name)
l_qualifications_1 = q_qualifications.filter(tbl_qualifications.i_qualification_type==i_type_1).order_by(tbl_qualifications.v_qualification_name).all() if i_type_1>0 else []
l_qualifications_2 = q_qualifications.filter(tbl_qualifications.i_qualification_type==i_type_2).order_by(tbl_qualifications.v_qualification_name).all() if i_type_2>0 else []
l_qualifications_1 = list(map(dict_row, l_qualifications_1))
l_qualifications_2 = list(map(dict_row, l_qualifications_2))
for ix, d in enumerate(l_qualifications_1): l_qualifications_1[ix] = (d["id"], d["v_qualification_name"])
for ix, d in enumerate(l_qualifications_2): l_qualifications_2[ix] = (d["id"], d["v_qualification_name"])
filter_form.sel_qualification_1.choices = [(0, "---")] + l_qualifications_1
filter_form.sel_qualification_2.choices = [(0, "---")] + l_qualifications_2
# initiate query that will possibly get filtered
q_records = db.session.query(tbl_records.id, tbl_records.v_name_1, tbl_records.v_surname, tbl_records.si_years_experience, tbl_records.v_contact_number, tbl_records.v_email)
if filter_form.validate_on_submit():
# possible filter values
bl_case_sensitive, i_role_department_id, i_role_id, s_name, s_surname, s_id_number = (filter_form.chk_case_sensitive.data, filter_form.sel_role_department.data, filter_form.sel_role.data, str(filter_form.txt_name.data), str(filter_form.txt_surname.data), str(filter_form.txt_id_number.data))
s_sap_k_level, s_language = (filter_form.sel_sap_k_level.data, filter_form.sel_language.data)
i_qualification_type_1, i_qualification_type_2, i_qualification_1, i_qualification_2 = (filter_form.sel_qualification_type_1.data, filter_form.sel_qualification_type_2.data, filter_form.sel_qualification_1.data, filter_form.sel_qualification_2.data)
# apply filters
if i_role_department_id>0: q_records = q_records.filter(tbl_records.i_department_id==i_role_department_id)
if i_role_id>0: q_records = q_records.filter(tbl_records.i_role_id==i_role_id)
if s_name!="":
s_name = f"%{s_name}%"
if bl_case_sensitive:
sq_names = select(tbl_records.id).filter(or_(tbl_records.v_name_1.like(s_name), tbl_records.v_name_2.like(s_name), tbl_records.v_name_3.like(s_name)))
else:
sq_names = select(tbl_records.id).filter(or_(lower(tbl_records.v_name_1).like(func.lower(s_name)), func.lower(tbl_records.v_name_2).like(func.lower(s_name)), func.lower(tbl_records.v_name_3).like(func.lower(s_name))))
# end of checking if case sensitivity applies
q_records = q_records.filter(tbl_records.id.in_(sq_names))
# end of checking if need to check for name substrings
if s_surname!="":
s_surname = f"%{s_surname}%"
if bl_case_sensitive:
q_records = q_records.filter(tbl_records.v_surname.like(s_surname))
else:
q_records = q_records.filter(func.lower(tbl_records.v_surname).like(func.lower(s_surname)))
# end of checking if case sensitivity applies
# end of checking for surname filter
if s_id_number!="":
s_id_number = f"%{s_id_number}%"
if bl_case_sensitive:
q_records = q_records.filter(tbl_records.v_id_number.like(s_id_number))
else:
q_records = q_records.filter(func.lower(tbl_records.v_id_number).like(func.lower(s_id_number)))
# end of checking if case sensitivity applies
# end of checking for ID number filter
if s_sap_k_level!="": q_records = q_records.filter(tbl_records.v_sap_k_level==s_sap_k_level)
if s_language!="":
sq_languages = select(tbl_languages.i_record_id.distinct()).filter(tbl_languages.v_language_abbreviation==s_language)
q_records = q_records.filter(tbl_records.id.in_(sq))
# end of checking if need to use language-matching subquery
if i_qualification_type_1>0:
if i_qualification_1>0:
sq_qualification_1 = select(tbl_record_qualifications.i_record_id.distinct()).filter(tbl_record_qualifications.i_qualification_id==i_qualification_1)
q_records = q_records.filter(tbl_records.id.in_(sq_qualification_1))
else:
sq_qualification_1 = select(tbl_record_qualifications.i_record_id.distinct()).join(tbl_qualifications, tbl_qualifications.id==tbl_record_qualifications.i_qualification_id, isouter=False).filter(tbl_qualifications.i_qualification_type==i_qualification_type_1)
q_records = q_records.filter(tbl_records.id.in_(sq_qualification_1))
# end of checking if need to filter qualification itself or just type
# end of checking for qualification type 1
if i_qualification_type_2>0:
if i_qualification_2>0:
sq_qualification_2 = select(tbl_record_qualifications.i_record_id.distinct()).filter(tbl_record_qualifications.i_qualification_id==i_qualification_2)
q_records = q_records.filter(tbl_records.id.in_(sq_qualification_2))
else:
sq_qualification_2 = select(tbl_record_qualifications.i_record_id.distinct()).join(tbl_qualifications, tbl_qualifications.id==tbl_record_qualifications.i_qualification_id, isouter=False).filter(tbl_qualifications.i_qualification_type==i_qualification_type_2)
q_records = q_records.filter(tbl_records.id.in_(sq_qualification_2))
# end of checking if need to filter qualification itself or just type
# end of checking for qualification type 2
# end of checking for filter form submission
i_page = request.form.get("hid_page", 1, type=int)
q_records = q_records.order_by(tbl_records.v_surname, tbl_records.v_name_1)
pagination = q_records.paginate(page=i_page, per_page=20)
l_records = list(map(dict_row, pagination.items))
# l_records = list(map(dict_row, q_records.all()))
for ix, record in enumerate(l_records):
i_record = record["id"]
l_record_qualifications = list(map(dict_row, db.session.query(tbl_record_qualifications.id, tbl_qualification_types.v_qualification_type, tbl_qualifications.v_qualification_name, tbl_record_qualifications.d_acquired).join(tbl_qualifications, tbl_record_qualifications.i_qualification_id==tbl_qualifications.id, isouter=False).join(tbl_qualification_types, tbl_qualifications.i_qualification_type==tbl_qualification_types.id, isouter=False).filter(tbl_record_qualifications.i_record_id==i_record).order_by(tbl_record_qualifications.d_acquired, tbl_qualifications.v_qualification_name).all()))
l_records[ix]["record_qualifications"] = l_record_qualifications
# end of looping through records to retrieve additional information
s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html"
s_url = url_for("main.index")#, _external=True)
return render_template("index.html", js=True, base=s_base, url=s_url, records=l_records, filter_form=filter_form, paging=pagination)
except Exception as exc:
return render_template_string(end_user_debugging("index/landing page"))
# end of surrounding try-except
# end of index view function for route /
@bp.route("/icons", methods=["GET"])
def icons():
"""icons preview - purely testing"""
return render_template("icons.html")
# end of icons view function for route /
@bp.route("/capture_record//", methods=["GET", "POST"])
@bp.route("/capture_record/", defaults={"i_record": 0}, methods=["GET", "POST"])
@login_required
@check_capture
def capture_record(i_record:int = 0):
"""add/update C.V. record"""
try:
l_all_languages = list(map(dict_row, db.session.query(tbl_all_languages.v_language_abbreviation, tbl_all_languages.v_language_name).filter(tbl_all_languages.bl_south_african==True).order_by(tbl_all_languages.v_language_name).all()))
l_departments = list(map(dict_row, db.session.query(tbl_role_departments.id, tbl_role_departments.v_department_name).order_by(tbl_role_departments.v_department_name).all()))
l_roles = []
i_department_id, i_role_id =(0, 0)
form = RecordForm()
remove_qualification_form = RecordQualificationRemovalForm()
qualification_form = RecordQualificationForm()
i_qualification_type_id = request.form.get("sel_qualification_type", default=0, type=int)
l_qualification_types = list(map(dict_row, db.session.query(tbl_qualification_types.id, tbl_qualification_types.v_qualification_type).order_by(tbl_qualification_types.v_qualification_type).all()))
l_qualifications = []
if len(l_qualification_types)>0:
i_qualification_type_id = int(l_qualification_types[0]["id"]) if i_qualification_type_id<1 else i_qualification_type_id
l_qualifications = list(map(dict_row, db.session.query(tbl_qualifications.id, tbl_qualifications.v_qualification_name).filter(tbl_qualifications.i_qualification_type==i_qualification_type_id).order_by(tbl_qualifications.v_qualification_name).all()))
if len(l_qualifications)>0:
l_choices = list(map((lambda x : (x["id"], x["v_qualification_name"])), l_qualifications))
qualification_form.sel_qualification.choices = l_choices
# end of checking if qualifications were retrieved
# end of checking if there are qualification types
if form.validate_on_submit() and request.form.get("btn_save", "")=="Save":
s_action = "updated"
o_record = None
s_name_1, s_name_2, s_name_3, s_surname = (form.txt_name_1.data, form.txt_name_2.data, form.txt_name_3.data, form.txt_surname.data)
s_id_number = form.txt_id_number.data
s_gender = form.sel_gender.data
i_years_experience = form.txt_years_experience.data
s_sap_k_level = form.sel_sap_k_level.data
s_contact_number = form.txt_contact_number.data
s_email = form.txt_email.data
i_department_id = request.form.get("sel_department_id", 0, type=int)
i_role_id = request.form.get("sel_role_id", 0, type=int)
l_languages = request.form.getlist("hid_langs")
l_language_levels = request.form.getlist("hid_lang_levels")
if i_record > 0:
o_record = db.session.get(tbl_records, i_record)
o_record.v_name_1, o_record.v_name_2, o_record.v_name_3, o_record.v_surname = (s_name_1, s_name_2, s_name_3, s_surname)
o_record.v_id_number, o_record.c_gender, o_record.si_years_experience, o_record.v_sap_k_level = (s_id_number, s_gender, i_years_experience, s_sap_k_level)
o_record.v_contact_number, o_record.v_email = (s_contact_number, s_email)
o_record.i_department_id, o_record.i_role_id = (i_department_id, i_role_id)
db.session.add(o_record)
db.session.commit()
else:
s_action = "inserted"
# save initial record
o_record = tbl_records(v_name_1=s_name_1, v_name_2=s_name_2, v_name_3=s_name_3, v_surname=s_surname, v_id_number=s_id_number, c_gender=s_gender, si_years_experience=i_years_experience, v_sap_k_level=s_sap_k_level, v_contact_number=s_contact_number, v_email=s_email, i_department_id=i_department_id, i_role_id=i_role_id)
db.session.add(o_record)
db.session.commit()
i_record = int(o_record.id) if o_record.id is not None else 0
# end of checking if new or existing record
# save languages linked to parent record
if i_record > 0:
if len(l_languages)==len(l_language_levels):
# first remove any prior language records that do not match currently submitted languages
q_delete = delete(tbl_languages).where(tbl_languages.i_record_id==i_record).where(tbl_languages.v_language_abbreviation.not_in(l_languages))
db.session.execute(q_delete)
# insert/update language records
for ix, s_lang in enumerate(l_languages):
s_language_name = ""
q_language = db.session.query(tbl_all_languages.v_language_name).filter(tbl_all_languages.v_language_abbreviation==s_lang)
if q_language.count() > 0:
s_language_name = q_language.first()[0]
# end of checking for matching language name record
del q_language
q_language = None
if db.session.query(tbl_languages).filter(tbl_languages.i_record_id==i_record, tbl_languages.v_language_abbreviation==s_lang).count()<1:
q_language = insert(tbl_languages).values({"i_record_id": i_record, "v_language_abbreviation": s_lang, "v_language_name": s_language_name, "si_level": int(l_language_levels[ix]), "si_ranking": ix+1})
else:
q_language = update(tbl_languages).where(tbl_languages.i_record_id==i_record,tbl_languages.v_language_abbreviation==s_lang).values({"v_language_name": s_language_name, "si_level": int(l_language_levels[ix]), "si_ranking": ix+1})
# end of checking if insert or update
db.session.execute(q_language)
db.session.commit()
# end of looping through languages
# end of checking language list lengths
flash(f"Record successfully {s_action}", "success")
# end of checking if initial record submission took place
elif qualification_form.validate_on_submit() and i_record>0 and request.form.get("btn_save_qualification", "")=="Save":
i_record_qualification_id, i_qualification_id, d_acquired = (int(qualification_form.hid_record_qualification_id.data), int(qualification_form.sel_qualification.data), qualification_form.d_acquired.data)
if i_record_qualification_id>0:
o_record_qualification = db.session.get(tbl_record_qualifications, i_record_qualification_id)
if o_record_qualification is not None:
o_record_qualification.i_qualification_id = i_qualification_id
o_record_qualification.d_acquired = d_acquired
db.session.add(o_record_qualification)
db.session.commit()
flash("Qualification record updated")
# end of checking if prior record retrieved
else:
o_record_qualification = tbl_record_qualifications(i_record_id=i_record, i_qualification_id=i_qualification_id, d_acquired=d_acquired)
db.session.add(o_record_qualification)
db.session.commit()
flash("Qualification record recorded")
# end of checking if existing or new qualification record
elif remove_qualification_form.validate_on_submit() and request.form.get("hid_remove_qualification_id", None) is not None:
i_qualification_removal_id = request.form.get("hid_remove_qualification_id", type=int)
o_record_qualification = db.session.get(tbl_record_qualifications, i_qualification_removal_id)
if o_record_qualification is not None:
db.session.delete(o_record_qualification)
db.session.commit()
flash("Qualification removed")
# end of making sure retrieved record
# end of checking for form submission
form.hid_record_id.data = i_record
remove_qualification_form.hid_remove_qualification_id.data = 0
# populate record info if existing record
if i_record>0:
o_record = db.session.get(tbl_records, i_record)
if o_record is not None:
form.hid_record_id.data = i_record
if o_record.i_department_id>0:
i_department_id = o_record.i_department_id
i_role_id = o_record.i_role_id
l_roles = list(map(dict_row, db.session.query(tbl_roles.id, tbl_roles.v_role_name).filter(tbl_roles.i_department_id==i_department_id).order_by(tbl_roles.v_role_name).all()))
# end of checking which department roles should populate
form.txt_name_1.data, form.txt_name_2.data, form.txt_name_3.data, form.txt_surname.data = (o_record.v_name_1, o_record.v_name_2, o_record.v_name_3, o_record.v_surname)
form.txt_id_number.data = o_record.v_id_number
form.sel_gender.data = o_record.c_gender
form.txt_years_experience.data = o_record.si_years_experience
form.sel_sap_k_level.data = o_record.v_sap_k_level
form.txt_contact_number.data = o_record.v_contact_number
form.txt_email.data = o_record.v_email
else:
i_record = 0
# end of making sure valid record
# end of checking if existing record
qualification_form.hid_record_qualification_id.data, qualification_form.sel_qualification.data, qualification_form.d_acquired.data = (0, None, None)
# form.txt_name_1.data, form.txt_name_2.data, form.txt_name_3.data, form.txt_surname.data = ("", "", "", "")
# form.txt_id_number.data, form.sel_gender.data, form.txt_years_experience.data, form.sel_sap_k_level.data, form.txt_contact_number.data, form.txt_email.data = ("", "m", "0", "", "", "")
l_languages = db.session.query(tbl_languages.v_language_abbreviation, tbl_languages.v_language_name, tbl_languages.si_level).filter(tbl_languages.i_record_id==i_record).order_by(tbl_languages.si_ranking).all()
l_languages = list(map(dict_row, l_languages))
l_record_qualifications = list(map(dict_row, db.session.query(tbl_record_qualifications.id, tbl_qualification_types.v_qualification_type, tbl_qualifications.v_qualification_name, tbl_record_qualifications.d_acquired).join(tbl_qualifications, tbl_record_qualifications.i_qualification_id==tbl_qualifications.id, isouter=False).join(tbl_qualification_types, tbl_qualifications.i_qualification_type==tbl_qualification_types.id, isouter=False).filter(tbl_record_qualifications.i_record_id==i_record).order_by(tbl_record_qualifications.d_acquired, tbl_qualifications.v_qualification_name).all()))
s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html"
s_url = url_for("main.capture_record")#, _external=True)
return render_template("capture_record.html", js=True, base=s_base, url=s_url, form=form, record_id=i_record, languages=l_languages, all_languages=l_all_languages, qualification_types=l_qualification_types, qualification_form=qualification_form, record_qualifications=l_record_qualifications, departments=l_departments, roles=l_roles, department_id=i_department_id, role_id=i_role_id, remove_qualification_form=remove_qualification_form)
except Exception as exc:
return render_template_string(end_user_debugging("capture_record"))
# end of surrounding try-except
# end of capture_record view function for route /capture_record
@bp.route("/download_upload///", methods=["GET"])
@bp.route("/download_upload//", defaults={"bl_download": True}, methods=["GET"])
@login_required
def download_upload(i_upload:int = 0, bl_download:bool = True):
"""open upload within browser or activate download thereof"""
try:
bl_download = True if str(bl_download)=="True" else False
bl_download = bool(bl_download)
o_out = None
o_upload = db.session.get(tbl_uploads, i_upload)
if o_upload is not None:
o_out = io.BytesIO(o_upload.b_file)
s_mimetype = o_upload.v_mime_type
s_filename = o_upload.v_filename
# resp = send_file(o_out, mimetype=s_mimetype, as_attachment=bl_download, download_name=s_filename)
resp = make_response(o_out)
resp.headers.set("Content-Disposition", f"attachment; filename=\"{s_filename}\"" if bl_download else f"inline; filename=\"{s_filename}\"")
resp.headers.set("Content-Type", s_mimetype)
return resp
else:
return make_response("No upload retrieved", 500)
# end of None check for record
except Exception as exc:
return render_template_string(end_user_debugging("download_upload"))
# end of surrounding try-except
# end of download_upload view function for route /download_upload//[]
def qualification_type_choices():
"""return possible qualification type choices"""
l_choices = []
l_qualification_types = list(map(dict_row, db.session.query(tbl_qualification_types.id, tbl_qualification_types.v_qualification_type).order_by(tbl_qualification_types.v_qualification_type).all()))
l_type_choices = list(map((lambda x : (x["id"], x["v_qualification_type"])), l_qualification_types))
if len(l_type_choices)>0:
l_choices = l_type_choices
# end of checking if type choices exist
return (l_qualification_types, l_choices)
# end of qualification_type_choices function
@bp.route("/qualifications/", methods=["GET", "POST"])
@login_required
@check_admin
def qualifications():
"""manage qualification types and qualifications to make use of in rest of site"""
try:
i_type_filter = 0
type_form = QualificationTypeForm()
form = QualificationForm()
l_qualification_types, form.sel_qualification_type.choices = qualification_type_choices()
remove_type_form = QualificationTypeRemovalForm()
remove_form = QualificationRemovalForm()
if type_form.validate_on_submit() and request.form.get("btn_save_type", "")=="Save":
i_type_id, s_type = (int(type_form.hid_qualification_type_id.data), type_form.txt_qualification_type.data)
if i_type_id > 0:
o_type = db.session.get(tbl_qualification_types, i_type_id)
if o_type is not None:
o_type.v_qualification_type = s_type
db.session.add(o_type)
db.session.commit()
flash("Type updated", "success")
# end of checking if record existed
else:
if db.session.query(tbl_qualification_types.id).filter(tbl_qualification_types.v_qualification_type==s_type).count()<1:
db.session.add(tbl_qualification_types(v_qualification_type=s_type))
db.session.commit()
flash("Type inserted", "success")
# end of making sure new type is not a duplicate
# end of checking if new or existing type record
elif remove_type_form.validate_on_submit() and request.form.get("hid_remove_qualification_type_id", None) is not None:
i_type_id = int(remove_type_form.hid_remove_qualification_type_id.data)
o_type = db.session.get(tbl_qualification_types, i_type_id)
if o_type is not None:
db.session.delete(o_type)
db.session.commit()
flash("Type removed", "success")
# end of checking if record existed
elif form.validate_on_submit() and request.form.get("btn_save_qualification", "")=="Save":
i_qualification_id, i_qualification_type, s_qualification_name, s_description = (int(form.hid_qualification_id.data), int(form.sel_qualification_type.data), str(form.txt_qualification_name.data), str(form.txt_description.data))
if i_qualification_id>0:
o_qualification = db.session.get(tbl_qualifications, i_qualification_id)
if o_qualification is not None:
if db.session.query(tbl_qualifications).filter(tbl_qualifications.id!=i_qualification_id, tbl_qualifications.v_qualification_name==s_qualification_name, tbl_qualifications.i_qualification_type==i_qualification_type).count()<1:
o_qualification.i_qualification_type = i_qualification_type
o_qualification.v_qualification_name = s_qualification_name
o_qualification.v_description = s_description
db.session.add(o_qualification)
db.session.commit()
flash("Qualification updated")
i_type_filter = i_qualification_type
else:
flash("There is already a qualification under same type category with same name")
i_type_filter = i_qualification_type
# end of checking for alternative duplicate
# end of retrieval check
else:
if db.session.query(tbl_qualifications).filter(tbl_qualifications.v_qualification_name==s_qualification_name, tbl_qualifications.i_qualification_type==i_qualification_type).count()<1:
o_qualification = tbl_qualifications(i_qualification_type=i_qualification_type, v_qualification_name=s_qualification_name, v_description=s_description)
db.session.add(o_qualification)
db.session.commit()
flash("Qualification added")
i_type_filter = i_qualification_type
else:
flash("There is an existing qualification with same name under same type category")
# end of making sure would not count as duplicate
# end of checking if new or existing qualification record
elif remove_form.validate_on_submit() and request.form.get("hid_remove_qualification_id", None) is not None:
i_qualification_id = int(remove_form.hid_remove_qualification_id.data)
o_qualification = db.session.get(tbl_qualifications, i_qualification_id)
if o_qualification is not None:
db.session.delete(o_qualification)
db.session.commit()
flash("Qualification removed", "success")
# end of checking if record existed
# end of checking for form submissions
type_form.hid_qualification_type_id.data, type_form.txt_qualification_type.data = (0, "")
remove_type_form.hid_remove_qualification_type_id.data = 0
# re-populate in case changes have occurred
l_qualification_types, form.sel_qualification_type.choices = qualification_type_choices()
s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html"
s_url = url_for("main.qualifications")#, _external=True)
return render_template("qualifications.html", js=True, base=s_base, url=s_url, type_form=type_form, form=form, qualification_types=l_qualification_types, remove_type_form=remove_type_form, type_filter=i_type_filter, remove_form=remove_form)
except Exception as exc:
return render_template_string(end_user_debugging("qualifications"))
# end of surrounding try-except
# end of qualifications view function for route /qualifications
@bp.route("/qualifications_list//", methods=["GET"])
@login_required
def qualifications_list(i_type_id):
"""return list of qualification entries that match type filter"""
try:
l_out = []
q_qualifications = db.session.query(tbl_qualifications.id, tbl_qualifications.v_qualification_name, tbl_qualifications.v_description, tbl_qualification_types.v_qualification_type).join(tbl_qualification_types, tbl_qualifications.i_qualification_type==tbl_qualification_types.id, isouter=True)
q_qualifications = q_qualifications.filter(tbl_qualifications.i_qualification_type==i_type_id) if i_type_id>0 else q_qualifications
if q_qualifications.count()>0:
q_qualifications = q_qualifications.order_by(tbl_qualifications.v_qualification_name)
l_out = list(map(dict_row, q_qualifications.all())) if q_qualifications.count()>0 else []
# end of checking result count of query
return make_response(jsonify(l_out), 200)
except Exception as exc:
return render_template_string(end_user_debugging("qualifications_list"))
# end of surrounding try-except
# end of qualifications_list view function
@bp.route("/qualification_details//", methods=["GET"])
@login_required
def qualification_details(i_qualification_id):
"""retrieve qualification details"""
try:
d_out = {}
o_qualification = db.session.get(tbl_qualifications, i_qualification_id)
if o_qualification is not None:
d_out = dict_instance(o_qualification)
# end of checking if record retrieved
return make_response(jsonify(d_out), 200)
except Exception as exc:
return render_template_string(end_user_debugging("qualification_details"))
# end of surrounding try-except
# end of qualification_details view function
def role_department_choices():
"""return list of possible|existing role departments"""
l_choices = []
l_departments = list(map(dict_row, db.session.query(tbl_role_departments.id, tbl_role_departments.v_department_name).order_by(tbl_role_departments.v_department_name).all()))
l_department_choices = list(map((lambda x : (x["id"], x["v_department_name"])), l_departments))
if len(l_department_choices)>0:
l_choices = l_department_choices
# end of checking if there were records
return (l_departments, l_choices)
# end of role_department_choices function
@bp.route("/roles/", methods=["GET", "POST"])
@login_required
@check_admin
def roles():
"""manage roles available for assignment to records"""
try:
i_department_filter = 0
department_form = RoleDepartmentForm()
form = RoleForm()
# need to populate this first to allow form validation
l_departments, form.sel_department.choices = role_department_choices()
remove_department_form = RoleDepartmentRemovalForm()
remove_form = RoleRemovalForm()
if department_form.validate_on_submit() and request.form.get("btn_save_department", "")=="Save":
i_department_id, s_department_name = (int(department_form.hid_department_id.data), department_form.txt_department_name.data)
if i_department_id > 0:
o_department = db.session.get(tbl_role_departments, i_department_id)
if o_department is not None:
o_department.v_department_name = s_department_name
db.session.add(o_department)
db.session.commit()
flash("Department updated", "success")
# end of checking if record existed
else:
if db.session.query(tbl_role_departments.id).filter(tbl_role_departments.v_department_name==s_department_name).count()<1:
db.session.add(tbl_role_departments(v_department_name=s_department_name))
db.session.commit()
flash("Department inserted", "success")
# end of making sure new department is not a duplicate
# end of checking if new or existing department record
elif form.validate_on_submit() and request.form.get("btn_save_role", "")=="Save":
i_role_id, i_department_id, s_role_name, s_description = (int(form.hid_role_id.data), int(form.sel_department.data), str(form.txt_role_name.data), str(form.txt_description.data))
if i_role_id>0:
o_role = db.session.get(tbl_roles, i_role_id)
if o_role is not None:
if db.session.query(tbl_roles).filter(tbl_roles.id!=i_role_id, tbl_roles.v_role_name==s_role_name, tbl_roles.i_department_id==i_department_id).count()<1:
o_role.i_department_id = i_department_id
o_role.v_role_name = s_role_name
o_role.v_description = s_description
db.session.add(o_role)
db.session.commit()
flash("Role updated")
i_department_filter = i_department_id
else:
flash("There is already a role under same department with same name")
i_department_filter = i_department_id
# end of checking for alternative duplicate
# end of retrieval check
else:
if db.session.query(tbl_roles).filter(tbl_roles.v_role_name==s_role_name, tbl_roles.i_department_id==i_department_id).count()<1:
o_role = tbl_roles(i_department_id=i_department_id, v_role_name=s_role_name, v_description=s_description)
db.session.add(o_role)
db.session.commit()
flash("Role added")
i_department_filter = i_department_id
else:
flash("There is an existing role with same name under same department")
# end of making sure would not count as duplicate
# end of checking if new or existing role record
elif remove_department_form.validate_on_submit():
i_department_id = int(remove_department_form.hid_remove_department_id.data)
o_department = db.session.get(tbl_role_departments, i_department_id)
if o_department is not None:
db.session.delete(o_department)
db.session.commit()
flash("Department removed", "success")
# end of checking if record existed
elif remove_form.validate_on_submit():
i_role_id = int(remove_form.hid_remove_role_id.data)
o_role = db.session.get(tbl_roles, i_role_id)
if o_role is not None:
db.session.delete(o_role)
db.session.commit()
flash("Role removed", "success")
# end of checking if record existed
# end of checking for form submission
department_form.hid_department_id.data, department_form.txt_department_name.data = (0, "")
remove_form.hid_remove_role_id.data, remove_department_form.hid_remove_department_id.data = ("0", "0")
# re-populate in case changes have occurred
l_departments, form.sel_department.choices = role_department_choices()
s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html"
s_url = url_for("main.roles")#, _external=True)
return render_template("roles.html", js=True, base=s_base, url=s_url, departments=l_departments, department_filter=i_department_filter, department_form=department_form, form=form, remove_department_form=remove_department_form, remove_form=remove_form)
except Exception as exc:
return render_template_string(end_user_debugging("roles"))
# end of surrounding try-except
# end of roles view function for route /positions
@bp.route("/roles_list//", methods=["GET"])
@login_required
def roles_list(i_department_id):
"""return list of role entries that match department filter"""
try:
l_out = []
try:
q_roles = db.session.query(tbl_roles.id, tbl_roles.v_role_name, tbl_roles.v_description, tbl_role_departments.v_department_name).join(tbl_role_departments, tbl_roles.i_department_id==tbl_role_departments.id, isouter=True)
q_roles = q_roles.filter(tbl_roles.i_department_id==i_department_id) if i_department_id>0 else q_roles
if q_roles.count()>0:
q_roles = q_roles.order_by(tbl_roles.v_role_name)
l_out = list(map(dict_row, q_roles.all()))
# end of checking result count of query
except Exception as exc:
print(extract_exc("list roles?"))
# end of try-except
return make_response(jsonify(l_out), 200)
except Exception as exc:
return render_template_string(end_user_debugging("roles_list"))
# end of surrounding try-except
# end of roles_list view function for route /roles_list//
@bp.route("/role_details//", methods=["GET"])
@login_required
def role_details(i_role_id):
"""retrieve role details"""
try:
d_out = {}
o_role = db.session.get(tbl_roles, i_role_id)
if o_role is not None:
d_out = dict_instance(o_role)
# end of checking if record retrieved
return make_response(jsonify(d_out), 200)
except Exception as exc:
return render_template_string(end_user_debugging("role_details"))
# end of surrounding try-except
# end of role_details view function for route /role_details//
@bp.route("/uploads//", methods=["GET", "POST"])
@login_required
@check_capture
def uploads(i_record:int = 0):
"""capture uploads linked to CV record"""
try:
# https://blog.miguelgrinberg.com/post/handling-file-uploads-with-flask
o_record = db.session.get(tbl_records, i_record)
if o_record is None:
return redirect(url_for("main.index"))
# end of just making sure valid record
remove_upload_form = UploadRemovalForm()
upload_form = UploadForm()
s_names = o_record.v_surname + ", " + " ".join([o_record.v_name_1, o_record.v_name_2, o_record.v_name_3])
l_qualifications = list(map(dict_row, db.session.query(tbl_record_qualifications.id, tbl_qualifications.v_qualification_name, tbl_record_qualifications.d_acquired).join(tbl_qualifications, tbl_record_qualifications.i_qualification_id==tbl_qualifications.id, isouter=False).filter(tbl_record_qualifications.i_record_id==i_record).order_by(tbl_qualifications.v_qualification_name).all()))
upload_form.sel_match.choices.extend(list(map((lambda x : (x["id"], x["v_qualification_name"])), l_qualifications)))
if upload_form.validate_on_submit() and request.form.get("btn_upload", None) is not None:
i_upload_id, i_upload_type, i_matching_id, s_description = (int(upload_form.hid_upload_id.data), int(upload_form.sel_upload_type.data), int(upload_form.sel_match.data), str(upload_form.txt_description.data))
fil_upload = request.files.get("fil_upload_document")
if fil_upload is not None and fil_upload.filename!="":
s_ext = os.path.splitext(fil_upload.filename)[1]
if s_ext not in [".jpg", ".png", ".pdf", ".doc", ".docx"]:
abort(400)
# end of double-checking file extension
# extract data record values
s_filename= fil_upload.filename
s_mime_type = fil_upload.mimetype
try:
if i_upload_id>0:
q_update = update(tbl_uploads).where(id==i_upload_id)
q_update = q_update.values({"i_record_id": i_record, "i_matching_id": i_matching_id, "si_upload_type": i_upload_type, "v_description": s_description, "v_filename": s_filename, "v_mime_type": s_mime_type, "b_file": fil_upload.read()})
db.session.execute(q_update)
flash("Document record updated")
else:
o_upload = tbl_uploads(i_record_id=i_record, i_matching_id=i_matching_id, si_upload_type=i_upload_type, v_description=s_description, v_filename=s_filename, v_mime_type=s_mime_type, b_file=fil_upload.read())
db.session.add(o_upload)
db.session.commit()
flash("Document uploaded")
except Exception as exc:
print(str(exc))
# end of try-except around saving data record
else:
flash("Invalid file upload")
# end of making sure file was uploaded
elif remove_upload_form.validate_on_submit():
i_upload_id = int(remove_upload_form.hid_remove_upload_id.data)
o_upload = db.session.get(tbl_uploads, i_upload_id)
if o_upload is not None:
db.session.delete(o_upload)
db.session.commit()
flash("Uploaded document removed")
# end of checking if record was retrieved
# end of checking for form submission
remove_upload_form.hid_remove_upload_id.data = 0
upload_form.hid_upload_id.data, upload_form.sel_upload_type.data, upload_form.sel_match.data, upload_form.txt_description.data = (0, 0, 0, "")
l_uploads = list(map(dict_row, db.session.query(tbl_uploads.id, tbl_uploads.si_upload_type, tbl_uploads.v_description, tbl_uploads.v_filename, tbl_qualifications.v_qualification_name).join(tbl_qualifications, tbl_qualifications.id==tbl_uploads.i_matching_id, isouter=True).filter(tbl_uploads.i_record_id==i_record).order_by(tbl_uploads.v_filename).all()))
for ix, d in enumerate(l_uploads): d["si_upload_type"] = {0: "Original CV", 1: "Alteram CV", 2: "Certificate/Diploma/Degree", 3: "I.D. Document or Passport"}[d["si_upload_type"]]; d["v_qualification_name"] = d["v_qualification_name"] if d["v_qualification_name"] is not None else ""
s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html"
s_url = url_for("main.uploads", i_record=i_record)#, _external=True)
return render_template("uploads.html", js=True, base=s_base, url=s_url, record_id=i_record, names=s_names, record_qualifications=l_qualifications, remove_upload_form=remove_upload_form, upload_form=upload_form, document_uploads=l_uploads)
except Exception as exc:
return render_template_string(end_user_debugging("uploads"))
# end of surrounding try-except
# end of uploads view function for route /uploads//
@bp.route("/record_details//", methods=["GET"])
@login_required
def record_details(i_record_id):
"""view all details for a record"""
try:
d_out = {}
q_details = db.session.query(tbl_records.id, tbl_records.v_surname, tbl_records.v_name_1, tbl_records.v_name_2, tbl_records.v_name_3, tbl_records.v_id_number, tbl_records.c_gender, tbl_records.si_years_experience, tbl_records.v_sap_k_level, tbl_records.v_contact_number, tbl_records.v_email, tbl_role_departments.v_department_name, tbl_roles.v_role_name).join(tbl_role_departments, tbl_role_departments.id==tbl_records.i_department_id, isouter=True).join(tbl_roles, tbl_roles.id==tbl_records.i_role_id, isouter=True).filter(tbl_records.id==i_record_id)
if q_details.count()>0:
d_out["record"] = dict_row(q_details.first())
d_out["languages"] = list(map(dict_row, db.session.query(tbl_languages.v_language_abbreviation, tbl_languages.v_language_name, tbl_languages.si_level).filter(tbl_languages.i_record_id==i_record_id).order_by(tbl_languages.si_ranking).all()))
for ix, d in enumerate(d_out["languages"]): d_out["languages"][ix]["level"] = ["basic", "intermediate", "proficient"][d["si_level"]]
d_out["qualifications"] = list(map(dict_row, db.session.query(tbl_record_qualifications.d_acquired, tbl_qualifications.v_qualification_name, tbl_qualification_types.v_qualification_type).join(tbl_qualifications, tbl_qualifications.id==tbl_record_qualifications.i_qualification_id, isouter=True).join(tbl_qualification_types, tbl_qualification_types.id==tbl_qualifications.i_qualification_type, isouter=False).filter(tbl_record_qualifications.i_record_id==i_record_id).all()))
for ix, d in enumerate(d_out["qualifications"]):
if isinstance(d["d_acquired"], datetime_class.date): d["d_acquired"] = d["d_acquired"].strftime("%Y-%m-%d")
# end of looping through qualifications
d_out["uploads"] = list(map(dict_row, db.session.query(tbl_uploads.id, tbl_uploads.si_upload_type, tbl_uploads.v_filename, tbl_uploads.v_description, tbl_qualifications.v_qualification_name).join(tbl_qualifications, tbl_qualifications.id==tbl_uploads.i_matching_id, isouter=True).filter(tbl_uploads.i_record_id==i_record_id).all()))
for ix, d in enumerate(d_out["uploads"]): d["si_upload_type"] = {0: "Original CV", 1: "Alteram CV", 2: "Certificate/Diploma/Degree", 3: "I.D. Document or Passport"}[d["si_upload_type"]]
# end of initial query length check
return make_response(jsonify(d_out), 200)
except Exception as exc:
s_logging = end_user_debugging("record_details", False)
return make_response(s_logging, 200)
# end of surrounding try-except
# end of record_details view function for route /record_details//
@bp.route("/users/", methods=["GET", "POST"])
@login_required
@check_admin
def users():
"""manage access|authentication IDs for admin, capture and view"""
try:
form = UserForm()
removal_form = UserRemovalForm()
if form.validate_on_submit() and request.form.get("btn_save", None)=="Save":
i_user_id, s_user_id, s_password, s_password_confirm, bl_admin, bl_capture = (int(form.hid_user_id.data), str(form.txt_user_id.data), str(form.txt_password.data), str(form.txt_password_confirm.data), bool(form.chk_admin.data), bool(form.chk_capture.data))
if i_user_id>0:
o_user = db.session.get(tbl_users, i_user_id)
if o_user is not None:
if db.session.query(tbl_users.id).filter(tbl_users.id!=i_user_id, tbl_users.v_user_id==s_user_id).count()>0:
flash("Another User with same User ID exists")
else:
o_user.v_user_id = s_user_id
o_user.bl_admin = bl_admin
o_user.bl_capture = bl_capture
s_password_change = ""
if len(s_password)>3 and s_password==s_password_confirm:
o_user.set_password(s_password)
s_password_change = " (with password changed)"
# end of checking if password must be changed
db.session.add(o_user)
db.session.commit()
flash(f"User record updated{s_password_change}")
# end of checking for another user record with same user ID
# end of checking for record retrieval
else:
if len(s_password)>3 and s_password==s_password_confirm:
if db.session.query(tbl_users.id).filter(tbl_users.v_user_id==s_user_id).count()>0:
flash("Another User with same User ID already exists")
else:
o_user = tbl_users(v_user_id=s_user_id, v_password=s_password, bl_admin=bl_admin, bl_capture=bl_capture)
db.session.add(o_user)
db.session.commit()
i_user_id = int(o_user.id) if o_user.id is not None else 0
if o_user.id>0:
flash("User record inserted")
# end of checking for record insertion
# end of checking for matching record with same User ID
else:
flash("Password values must be longer than 3 characters, and must match")
# end of making sure passwords match
# end of checking if existing user
elif removal_form.validate_on_submit():
i_user_id = int(removal_form.hid_remove_user_id.data)
o_user = db.session.get(tbl_users, i_user_id)
if o_user is not None:
db.session.delete(o_user)
db.session.commit()
flash("User record removed")
# end of checking for record retrieval
# end of checking for form submission
form.hid_user_id.data, form.txt_user_id.data, form.txt_password.data, form.txt_password_confirm.data, form.chk_admin.data, form.chk_capture.data = (0, "", "", "", False, False)
removal_form.hid_remove_user_id.data = 0
l_users = list(map(dict_row, db.session.query(tbl_users.id, tbl_users.v_user_id, tbl_users.bl_admin, tbl_users.bl_capture).filter(tbl_users.v_user_id!="admin").order_by(tbl_users.v_user_id).all()))
s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html"
s_url = url_for("main.users")#, _external=True)
return render_template("users.html", js=True, base=s_base, url=s_url, users=l_users, form=form, removal_form=removal_form)
except Exception as exc:
return render_template_string(end_user_debugging("users"))
# end of surrounding try-except
# end of users view function for route /users
@bp.route("/user_details//", methods=["GET"])
@login_required
@check_admin
def user_details(i_user_id:int = 0):
"""retrieve user record details"""
try:
d_out = {}
o_user = db.session.get(tbl_users, i_user_id)
if o_user is not None:
d_out = dict_instance(o_user)
del d_out["v_password"]
# end of checking for record retrieval
return make_response(jsonify(d_out), 200)
except Exception as exc:
return render_template_string(end_user_debugging("user_details"))
# end of surrounding try-except
# end of user_details view function for route /user_details//
@bp.route("/css_sample", methods=["GET"])
def css_sample():
"""CSS sample page - purely testing"""
s_base = "base_bs.html" if Config.BOOTSTRAP else "base.html"
s_url = url_for("main.index")#, _external=True)
return render_template("css_sample.html", js=True, base=s_base, url=s_url)
# end of css_sample view function for route /css_sample
s_todo = """
double-check both try-except across the board, along with then logging exceptions
"""