import sys import json import random import base64 from flask import Flask, redirect, url_for, request, session, make_response, jsonify, send_file from flask import render_template from flask_sqlalchemy import SQLAlchemy from sqlalchemy import Integer, String, Column, Float from datetime import datetime import uuid from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import select, join from sqlalchemy.orm import DeclarativeBase #from flask_wtf.csrf import CSRFProtect import os import csv from zipfile import ZipFile import hashlib # activate environment: cd C:\...\...\....\...\Code\SLAEForms Testing\.venv\Scripts\ # then this: activate #SETUP-------------------------------------------------- #Set up sqlalchemy class Base(DeclarativeBase): pass db = SQLAlchemy(model_class=Base) #create the app app = Flask(__name__) # configure the database, give it a path (it will be in the instances folder) app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///database.db" app.config["PERMANENT_SESSION_LIFETIME"] = 10800 #3 Stunden, 10800 sekunden app.config['MAX_CONTENT_LENGTH'] = 22 * 1000 * 1000 # try and fix video upload not working db.init_app(app) #set the secret key (TODO change this for final deployment) app.secret_key = b"29fe9e8edd407c5491d4f1c05632d9fa33e26ed8734a3f5e080ebac3772a555a" UPLOAD_FOLDER = 'uploads' EXPORT_FOLDER = 'exports' PASSWORD = 'd5aff9fc14d1f20f4ccddaa8b4f2c1765228b74ed0b1dfb868bf1064e0d655e2' CONFIGFILE = 'userstudy1.json' # CONFIGFILE = 'test.json' # CONFIGFILE = 'default.json' #csrf = CSRFProtect(app) #enable CSRF protection globally #------------------------------------------------------------------------ # Setting up DB Models # This table is always created, tracks all users class User(db.Model): user_id = db.Column("user_id",db.UUID(as_uuid=True), primary_key=True, nullable=False) device_id = db.Column("device_id",db.UUID(as_uuid=True), nullable=False) question_order = db.Column("question_order",db.String(60)) date_created = db.Column("date_created",db.DateTime, default=datetime.today()) # todo test if this default works form_completed = db.Column("form_completed",db.Boolean, default=False) def __repr__(self) -> str: return "" % self.user_id # create the table (existing tables are not overwritten) try: with app.app_context(): db.create_all() except SQLAlchemyError as e: print("Error occurred during database creation:", str(e)) #----------------------------------------------------------------------------- #open, parse and execute json file #open the json file with the config configfile = open(CONFIGFILE, encoding='utf-8') #todo replace with other name #convert it to dict config = json.load(configfile) configfile.close() db_tables = {} # contains all dynamically created tables, key = table/classname #TODO insert code to create all tables def create_json_tables(): #print(config) for block_key, block_content in config.items(): if "database_table" in block_content: if not (block_content["database_table"]["table_name"] in db_tables): print("New table: \n {table}".format(table=block_content["database_table"]["table_name"])) db_tables[block_content["database_table"]["table_name"]]=create_model_class(block_content["database_table"]) print("created table: {table}".format(table=db_tables[block_content["database_table"]["table_name"]])) print("tables in db_tables: \n {db_tables}".format(db_tables=db_tables)) def create_model_class(schema): class_name = schema["table_name"].capitalize() #print("creating table class: {class_name}".format(class_name=class_name)) # Define class attributes dynamically attributes = {"__tablename__": schema["table_name"]} # id as key and date as standard fields attributes["id"] = Column("id",db.UUID(as_uuid=True), primary_key=True, nullable=False) attributes["user_id"] = Column("user_id",db.UUID(as_uuid=True), nullable=False) attributes["date_created"] = db.Column("date_created",db.DateTime) attributes["stimulus_name"] = db.Column("stimulus_name",db.String(30)) for column_name, column_info in schema["fields"].items(): if column_info["type"] == "integer": column_type = Integer elif column_info["type"] == "string": column_type = String(int(column_info["size"])) if column_info["type"] == "float": column_type = Float attributes[column_name] = Column(column_name,column_type, nullable=column_info["nullable"]) #print("attributes of the table: ",attributes) # Create the model class return type(class_name, (db.Model,), attributes) create_json_tables() # create the table (existing tables are not overwritten) try: print("try to create tables") with app.app_context(): db.create_all() print("successfully created all tables") except SQLAlchemyError as e: print("Error occurred during database creation:", str(e)) #------------------------------------------------------------------------------ #actual page logic with start, form and send #@app.route("/popuptest", methods=["GET"]) def popuptest(): return render_template( "popuptest.html" ) @app.route("/start", methods=["GET", "POST"]) def startpage(): session.permanent = False if not "slaeform_device_id" in session: # If this device was not seen, remember it. new_device_id = uuid.uuid4() session["slaeform_device_id"] = new_device_id session["agreed_to_tos"] = False if request.method == "POST": #right now if a user that has an active session goes to the startpage again and accepts tos #it will just start another session and discard the previous session #config is the dict with the info #get the block names block_names = config.keys() session["block_names"] = list(block_names) session["block_order"] = {} # only for templates, for each block the list of keys for stimuli session["current_block_index"] = 0 session["current_stimulus_index"] = 0 session["current_block_name"] = session["block_names"][session["current_block_index"]] session["number_of_blocks"] = len(session["block_names"]) current_block = config[session["current_block_name"]] session["number_of_stimuli"] = 0 # if the block has stimuli, get how many if "stimuli" in current_block: if current_block["stimuli"]["type"] == "single_video" or current_block["stimuli"]["type"] == "empty": session["number_of_stimuli"] = len(list(current_block["stimuli"]["list"])) elif current_block["stimuli"]["type"] == "double_video": session["number_of_stimuli"] = len(list(current_block["stimuli"]["list_1"])) print("number of blocks: ",len(session["block_names"])) print("Startpage post") print(session["block_names"]) for name in block_names: print("block: ",name) if config[name]["type"] == "TaskTemplate" and ("stimuli" in config[name]): match config[name]["stimuli"]["type"]: case "single_video": order = list(config[name]["stimuli"]["list"]) # order = list of simuli keys print("order: ",order) if "order" in config[name]["stimuli"]: if config[name]["stimuli"]["order"] == "random": random.shuffle(order) #in random order session["block_order"][name] = order case "double_video": order = [] # order = list of stimuli keys list_1 = list(config[name]["stimuli"]["list_1"]) list_2 = list(config[name]["stimuli"]["list_2"]) for i in range(len(list(config[name]["stimuli"]["list_1"]))): order.append((list_1[i], list_2[i])) print("order: ",order) #TODO random is not implemented here session["block_order"][name] = order case "empty": order = list(config[name]["stimuli"]["list"]) # order = list of simuli keys print("order: ",order) session["block_order"][name] = order if "stimuli" in current_block: #get the name of the current stimulus session["current_stimulus_name"] = session["block_order"][session["current_block_name"]][session["current_stimulus_index"]] #save the new user to the database and the session session["agreed_to_tos"] = True new_user_id = uuid.uuid4() session["slaeform_user_id"] = new_user_id device_id = session["slaeform_device_id"] user_id = new_user_id question_order = str(session["block_order"]) date = datetime.today() new_user = User(user_id=user_id, device_id=device_id,question_order=question_order,date_created = date,form_completed=False) #,question_order=question_order db.session.add(new_user) db.session.commit() print("block order: {order}".format(order=session["block_order"])) try: db.session.add(new_user) db.session.commit() except: return "There was a problem while adding the user to the Database" return redirect("/form") return render_template( "startpage.html" ) @app.route("/endpage") def endpage(): print("Form is done, sent to endpage") return render_template("endpage.html") @app.route("/datenschutz") def datenschutz(): return render_template("datenschutz.html") @app.route("/impressum") def impressum(): return render_template("impressum.html") @app.route("/studytest") def studytest(): return render_template("studytest.html") @app.route("/form") def form(): #user is not yet registered and should not be here if not "slaeform_user_id" in session: return redirect("/start") #TODO replace this later with actual startpage current_block = config[session["current_block_name"]] print("form") print("current_block_name: {current_block_name}".format(current_block_name=session["current_block_name"])) print("current_block_index: {current_block_order}".format(current_block_order=session["current_block_index"])) print("current_stimulus: {current_stimulus}".format(current_stimulus=session["current_stimulus_index"])) #print("current Blockname: {blockname}, current block index: {blockindex}, current stim index: {stimulusindex}".format(blockname=session["current_block_name"], # blockindex=session["current_block_index"], # stimulusindex=session["current_stimulus_index"] ) ) infovideo = None if "infovideo" in current_block: infovideo = current_block["infovideo"] # erster Fall: SinglePage if current_block["type"] == "SinglePage": return render_template(current_block["template"]) #zweiter Fall, empty TaskTemplate if current_block["type"] == "TaskTemplate" and current_block["stimuli"]["type"] == "empty": current_block_order = session["block_order"][session["current_block_name"]] current_block_stimuli = current_block["stimuli"] current_stimulus = current_block_order[session["current_stimulus_index"]] stimulus_type=current_block["stimuli"]["type"] return render_template( "standard_template.html", stimuli=current_block_stimuli, stimulus_type=stimulus_type, current_stimulus=current_stimulus, questions=current_block["questions"], infovideo=infovideo ) # ansonsten, templates: current_block_order = session["block_order"][session["current_block_name"]] current_block_stimuli = current_block["stimuli"] current_stimulus = current_block_order[session["current_stimulus_index"]] stimulus_type=current_block["stimuli"]["type"] stimulus_configuration = current_block["stimuli"]["configuration"] # dict with the config if current_block["type"] == "TaskTemplate": print("case: TaskTemplate") match stimulus_type: case "single_video": stimulus_configuration["video_url"] = config[session["current_block_name"]]["stimuli"]["list"][current_stimulus] print("-------------videourl: ", stimulus_configuration["video_url"]) case "double_video": stimulus_configuration["video_url1"] = config[session["current_block_name"]]["stimuli"]["list_1"][current_stimulus[0]] stimulus_configuration["video_url2"] = config[session["current_block_name"]]["stimuli"]["list_2"][current_stimulus[1]] return render_template( "standard_template.html", stimuli=current_block_stimuli, stimulus_type=stimulus_type, current_stimulus=current_stimulus, stimulus_configuration=stimulus_configuration, questions=current_block["questions"], infovideo=infovideo ) return "Error, none of the Blocks triggered" @app.route("/send", methods=["POST"]) def sendpage(): print("send") # Do I need to write to a table at all? # I can figure it out by checking if the current block has a database field, that is best if not ("database_table" in config[session["current_block_name"]]): #it has no database field, so nothing to receive # so just move on print("no database table") update_session() return redirect("/form") # now to if it has a database field # find out which table we need to write to table_name = config[session["current_block_name"]]["database_table"]["table_name"] print("Form posted: {rqform}".format(rqform=request.form)) print("Writing to table: {table_name}".format(table_name=table_name)) session_user_id = session["slaeform_user_id"] new_id = uuid.uuid4() date = datetime.today() stimulus_name = str(session["current_stimulus_name"]) new_entry = db_tables[table_name](id=new_id,user_id = session_user_id,date_created = date,stimulus_name=stimulus_name) # handle possible Video that was send if 'recordedVideo' in request.files: video = request.files['recordedVideo'] formatted_date = date.strftime("%Y.%m.%d %H-%M-%S") print("date: ", date) video_name = str(session_user_id) + "_" + session["current_block_name"] + "_" + str(session["current_stimulus_name"]) + "_" + str(formatted_date) + ".webm" path = os.path.join(UPLOAD_FOLDER, video_name) print("path: ",path) os.makedirs(UPLOAD_FOLDER, exist_ok=True) video.save(path) if 'recordedVideo' in request.files: if hasattr(new_entry, "video_upload"): setattr(new_entry, "video_upload", video_name) # TODO maybe find a prettier solution, this handeles multiple choice now, so the fact that there can be # multiple keys that are the same in the form data, but I want to bring them together to 1 key value pair form_data = {} for key in request.form: values = request.form.getlist(key) # If there's more than one value for the key, join them with commas if len(values) > 1: form_data[key] = ','.join(map(str, values)) # Join multiple values into a single comma-separated string else: form_data[key] = values[0] # If only one value, store it directly for key, value in form_data.items(): print("hasattr key: ", key) if hasattr(new_entry, key): print("key exists: ", key) setattr(new_entry, key, value) print("setattr value: ", value) print("entry: ", new_entry) try: db.session.add(new_entry) db.session.commit() except Exception as e: print("Error occurred: {e}".format(e=str(e))) return "There was a problem while adding the response to the Database" if (session["current_block_index"] == session["number_of_blocks"]-1) and (session["current_stimulus_index"] >= session["number_of_stimuli"]-1): #update the database entry, the form is completed user = db.session.query(User).filter_by(user_id=session["slaeform_user_id"]).one() user.form_completed = True print("updated db entry for form_completed") # This user is done, so we can remove it from the session session.pop("slaeform_user_id") try: db.session.commit() except Exception as e: print("Error occurred: {e}".format(e=str(e))) return "There was a problem while updating the user" return redirect("/endpage") # Now move to the next stimulus or block update_session() print("now redirect and reload the page") return redirect("/form") def update_session(): print("update session") if "stimuli" in config[session["current_block_name"]]: # if there are stimuli in this block if session["current_stimulus_index"] < session["number_of_stimuli"]-1: print("there are still stimuli left") # if there are still stimuli left, keep going through them session["current_stimulus_index"] += 1 # set the name of the current stimulus session["current_stimulus_name"] = session["block_order"][session["current_block_name"]][session["current_stimulus_index"]] else: print("here are no stimuli left") session["number_of_stimuli"] = 0 # if there are no stimuli left.. if(session["current_block_index"] < session["number_of_blocks"]-1): # go to next block if possible session["current_block_index"] += 1 session["current_block_name"] = session["block_names"][session["current_block_index"]] session["current_stimulus_index"] = 0 if "stimuli" in config[session["current_block_name"]]: session["number_of_stimuli"] = len(session["block_order"][session["current_block_name"]]) else: # if there arent any stimuli, go to the next block session["number_of_stimuli"] = 0 session["current_stimulus_index"] = 0 if(session["current_block_index"] < session["number_of_blocks"]-1): session["current_block_index"] += 1 session["current_block_name"] = session["block_names"][session["current_block_index"]] #set values for the new block current_block = config[session["current_block_name"]] if "stimuli" in current_block: # set the name of the current stimulus session["current_stimulus_name"] = session["block_order"][session["current_block_name"]][session["current_stimulus_index"]] # if the block has stimuli, get how many if current_block["stimuli"]["type"] == "single_video" or current_block["stimuli"]["type"] == "empty": session["number_of_stimuli"] = len(list(current_block["stimuli"]["list"])) elif current_block["stimuli"]["type"] == "double_video": session["number_of_stimuli"] = len(list(current_block["stimuli"]["list_1"])) print("---Session updated-----------------------------------------------") print("current_block_index / number_of_blocks: {current_block_index} / {number_of_blocks}".format(current_block_index=session["current_block_index"],number_of_blocks=session["number_of_blocks"])) print("current_block_name: ", session["current_block_name"]) print("current_stimulus_index: ", session["current_stimulus_index"]) print("Current number_of_stimuli: ", session["number_of_stimuli"]) #@app.route("/update") def update(): print("Current Session: ",session) try: user = db.session.query(User).filter_by(user_id=session["slaeform_user_id"]).one() user.form_completed = True db.session.commit() except Exception as e: print("Error occurred: {e}".format(e=str(e))) return "There was a problem while updating the user" return "db entry updated!" # Database stuff------------------------------------------------------------------------------ def zipdir(path, ziph): # ziph is zipfile handle for root, dirs, files in os.walk(path): for file in files: ziph.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.join(path, '..'))) #create all csvs def create_all_csvs(): meta = db.metadata tables = meta.tables.keys() for table_name in tables: create_csv(meta.tables[table_name], table_name) # export CSV @app.route("/export_all_tables") def export_all_tables(): if not session.get("logged_in"): return redirect("/login") create_all_csvs() with ZipFile('zip_exports/all_tables.zip', 'w') as zipf: #no compression, need to add zipfile.ZIP_DEFLATED for compression zipdir('exports/', zipf) return send_file("zip_exports/all_tables.zip", as_attachment=False, download_name="all_tables.zip") # export Database @app.route("/export_db") def export_db(): if not session.get("logged_in"): return redirect("/login") with ZipFile('zip_exports/db.zip', 'w') as zipf: #no compression, need to add zipfile.ZIP_DEFLATED for compression zipdir('instance/', zipf) return send_file("zip_exports/db.zip", as_attachment=False, download_name="db.zip") # export CSV @app.route("/export_all_videos") def export_all_videos(): if not session.get("logged_in"): return redirect("/login") with ZipFile('zip_exports/all_videos.zip', 'w') as zipf: #no compression, need to add zipfile.ZIP_DEFLATED for compression zipdir('uploads/', zipf) return send_file("zip_exports/all_videos.zip", as_attachment=False, download_name="all_tables.zip") def create_csv(table, filename): filename = filename + ".csv" # Query all data from the table data = db.session.query(table).all() # Get the column names from the model column_names = [column.name for column in table.columns] # Open a CSV file and write data path = os.path.join(EXPORT_FOLDER, filename) with open(path, 'w', newline='') as file: writer = csv.writer(file) writer.writerow(column_names) # Write header for row in data: writer.writerow([getattr(row, column) for column in column_names]) # export CSV @app.route("/export_csv/") def export_csv(table_name): meta = db.metadata try: create_csv(meta.tables[table_name], table_name) except Exception as e: return f'Error occurred: {str(e)}', 500 return redirect("/") # the contents of all tables @app.route("/table_contents") def table_contents(): if not session.get("logged_in"): return redirect("/login") meta = db.metadata #meta.reflect(db.engine) # Uncomment this to also get the hidden tables, but this crashes rn tables = meta.tables.keys() table_contents = {} for table_name in tables: table = meta.tables[table_name] columns = table.columns.keys() #print(table) rows = db.session.query(table) try: rows = rows.all() except Exception as e: print("Error occurred: {e}".format(e=str(e))) print("Values:\n Table = {table}\n Tabletype = {tabletype}\n".format(table=table,tabletype=type(table))) table_contents[table_name] = { 'columns': columns, 'rows': rows } return render_template( "table_contents.html", table_contents=table_contents, ) @app.route('/show_tables') def show_tables(): if not session.get("logged_in"): return redirect("/login") meta = db.metadata meta.reflect(db.engine) tables = meta.tables return render_template('show_tables.html', tables=tables) @app.route("/manage_uploads") def manage_uploads(): if not session.get("logged_in"): return redirect("/login") videodir = "uploads/" videolist = os.listdir(videodir) num_videos = len(videolist) return render_template("manage_uploads.html", videolist=videolist, num_videos=num_videos) @app.route("/deleteuploads", methods=["POST"]) def deleteuploads(): if not session.get("logged_in"): return redirect("/login") print("deleting all videos") videodir = "uploads/" for video in os.listdir(videodir): os.remove(os.path.join(videodir, video)) print("videos deleted") return redirect("/all_links") @app.route("/task3") def task3(): return render_template("task3.html") # Root page ----------------------------- @app.route("/login", methods=["GET","POST"]) def login(): if request.method == "POST": pwhash = hashlib.sha256(request.form["password"].encode('utf-8')).hexdigest() if pwhash == PASSWORD: session["logged_in"] = True return redirect(url_for("all_links")) return render_template("login.html") @app.route("/logout") def logout(): session["logged_in"] = False return redirect("/") def has_no_empty_params(rule): defaults = rule.defaults if rule.defaults is not None else () arguments = rule.arguments if rule.arguments is not None else () return len(defaults) >= len(arguments) @app.route("/") def root(): return redirect("/start") @app.route("/all_links") def all_links(): links = [] for rule in app.url_map.iter_rules(): # Filter out rules we can't navigate to in a browser # and rules that require parameters if "GET" in rule.methods and has_no_empty_params(rule): url = url_for(rule.endpoint, **(rule.defaults or {})) links.append((url, rule.endpoint)) return render_template("all_links.html", links=links) # delete all tables as last link -------------------------- # Route to delete all entries #@app.route('/delete_json_tables', methods=['GET']) def delete_json_tables(): try: meta = db.metadata meta.reflect(db.engine) for table in reversed(meta.sorted_tables): # Iterate through tables in reverse order to handle foreign key constraints if table.name != "user" and table.name != "response": print("Deleting Table: {name}".format(name=table.name)) db.session.execute(table.delete()) db.session.commit() return 'All entries deleted successfully' except Exception as e: # Rollback changes if any error occurs db.session.rollback() return f'Error occurred: {str(e)}', 500 finally: # Close the session db.session.close() # Route to delete all entries @app.route('/delete_all_entries', methods=['GET']) def delete_all_entries(): if not session.get("logged_in"): return redirect("/login") # here I could also use a "drop_all()", that works jsut like create all from the creation part # this together with the reflect could drop actually all tables try: meta = db.metadata for table in reversed(meta.sorted_tables): # Iterate through tables in reverse order to handle foreign key constraints db.session.execute(table.delete()) db.session.commit() return 'All entries deleted successfully' except Exception as e: # Rollback changes if any error occurs db.session.rollback() return f'Error occurred: {str(e)}', 500 finally: # Close the session db.session.close() def create_app(): return app if __name__ == '__main__': app.run()