import sys import json import random import base64 from flask import Flask, redirect, url_for, request, session, make_response, jsonify, send_from_directory from flask import render_template from flask_sqlalchemy import SQLAlchemy from sqlalchemy import Integer, String, Column from datetime import datetime import uuid from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import inspect from sqlalchemy.orm import DeclarativeBase random_order = True # activate environment: cd C:\Users\Jan\Google Drive\Master Stuff\Code\SLAEForms Testing\.venv\Scripts\ # then this: activate #SETUP-------------------------------------------------- #Set up sqlalchemy class Base(DeclarativeBase): pass db = SQLAlchemy(model_class=Base) #create the app app = Flask(__name__) # configure the database, give it a path (it will be in the instances folder) app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///database.db" db.init_app(app) #set the secret key (TODO change this for final deployment) app.secret_key = b"29fe9e8edd407c5491d4f1c05632d9fa33e26ed8734a3f5e080ebac3772a555a" #---------testing JSON Stuff #open the json file with the config testconfigfile = open("singleformconfig.json") #convert it to dict testconfig = json.load(testconfigfile) testconfigfile.close() # get the questions: Questions is a list that contains the keys of the dictionary questions = list(testconfig) #------------------------------------------------------------------------ # Setting up DB Models, response -> should be removed eventually and replaced by json files and automatic generation ---------------------------- # create the model for the response table class Response(db.Model): id = db.Column("id",db.UUID(as_uuid=True), primary_key=True, nullable=False) user_id = db.Column("user_id",db.UUID(as_uuid=True), nullable=False) question_title = db.Column("question_title",db.String(30)) likert_result = db.Column("likert_result",db.Integer, nullable=False) notes = db.Column("notes",db.String(200)) date_created = db.Column("date_created",db.DateTime) def __repr__(self) -> str: return "" % self.id # This table is always created class User(db.Model): user_id = db.Column("user_id",db.UUID(as_uuid=True), primary_key=True, nullable=False) device_id = db.Column("device_id",db.UUID(as_uuid=True), nullable=False) question_order = db.Column("question_order",db.String(60)) date_created = db.Column("date_created",db.DateTime, default=datetime.today()) # todo test if this default works def __repr__(self) -> str: return "" % self.user_id # create the table (existing tables are not overwritten) try: with app.app_context(): db.create_all() except SQLAlchemyError as e: print("Error occurred during database creation:", str(e)) # -------Testing the actual final form from json------------------------------------------------ #open the json file with the config configfile = open("default.json") #todo replace with other name #convert it to dict config = json.load(configfile) configfile.close() db_tables = {} # contains all dynamically created tables, key = table/classname #TODO insert code to create all tables def create_json_tables(): print(config) for block_key, block_content in config.items(): if "database_table" in block_content: print("debug") if not (block_content["database_table"]["table_name"] in db_tables): print("New table: \n {table}".format(table=block_content["database_table"]["table_name"])) db_tables[block_content["database_table"]["table_name"]]=create_model_class(block_content["database_table"]) print("created table: {table}".format(table=db_tables[block_content["database_table"]["table_name"]])) print("tables in db_tables: \n {db_tables}".format(db_tables=db_tables)) def create_model_class(schema): class_name = schema["table_name"].capitalize() print("creating table class: {class_name}".format(class_name=class_name)) # Define class attributes dynamically attributes = {"__tablename__": schema["table_name"]} # id as key and date as standard fields attributes["id"] = Column("id",db.UUID(as_uuid=True), primary_key=True, nullable=False) attributes["user_id"] = Column("user_id",db.UUID(as_uuid=True), primary_key=True, nullable=False) attributes["date_created"] = db.Column("date_created",db.DateTime) for column_name, column_info in schema["fields"].items(): if column_info["type"] == "integer": column_type = Integer elif column_info["type"] == "string": column_type = String(int(column_info["size"])) attributes[column_name] = Column(column_name,column_type, nullable=column_info["nullable"]) print("attributes of the table: ",attributes) # Create the model class return type(class_name, (db.Model,), attributes) create_json_tables() # create the table (existing tables are not overwritten) try: print("try to create tables") with app.app_context(): db.create_all() print("successfully created all tables") except SQLAlchemyError as e: print("Error occurred during database creation:", str(e)) @app.route("/teststart", methods=["GET", "POST"]) def teststartpage(): if not "slaeform_device_id" in session: # If this device was not seen, remember it. new_device_id = uuid.uuid4() session["slaeform_device_id"] = new_device_id session["agreed_to_tos"] = False if request.method == "POST": #right now if a user that has an active session goes to the startpage again and accepts tos #it will just start another session and discard the previous session #config is the dict with the info #get the block names block_names = config.keys() session["block_names"] = list(block_names) session["block_order"] = {} # only for templates, for each block the list of keys for stimuli session["current_block_index"] = 0 session["current_stimulus_index"] = 0 session["current_block_name"] = session["block_names"][session["current_block_index"]] session["number_of_blocks"] = len(session["block_names"]) current_block = config[session["current_block_name"]] # if the block has stimuli, get how many if "stimuli" in current_block: session["number_of_stimuli"] = len(list(current_block["stimuli"]["list"])) print("number of blocks: ",len(session["block_names"])) print("Startpage post") print(session["block_names"]) for name in block_names: if config[name]["type"] == "TaskTemplate": match config[name]["stimuli"]["type"]: case "single_video": order = list(config[name]["stimuli"]["list"]) # order = list of simuli keys print("order: ",order) if config[name]["stimuli"]["order"] == "random": random.shuffle(order) #in random order session["block_order"][name] = order #save the new user to the database and the session session["agreed_to_tos"] = True new_user_id = uuid.uuid4() session["slaeform_user_id"] = new_user_id device_id = session["slaeform_device_id"] user_id = new_user_id question_order = str(session["block_order"]) date = datetime.today() new_user = User(user_id=user_id, device_id=device_id,question_order=question_order,date_created = date) #,question_order=question_order db.session.add(new_user) db.session.commit() print("block order: {order}".format(order=session["block_order"])) try: db.session.add(new_user) db.session.commit() except: return "There was a problem while adding the user to the Database" return redirect("/jsonform") return render_template( "teststartpage.html" ) @app.route("/jsonform") def jsonform(): #user is not yet registered and should not be here if not "slaeform_user_id" in session: return redirect(url_for(teststartpage)) #TODO replace this later with actual startpage current_block = config[session["current_block_name"]] print("jsonform") print("current_block_name: {current_block_name}".format(current_block_name=session["current_block_name"])) print("current_block_index: {current_block_order}".format(current_block_order=session["current_block_index"])) print("current_stimulus: {current_stimulus}".format(current_stimulus=session["current_stimulus_index"])) #print("current Blockname: {blockname}, current block index: {blockindex}, current stim index: {stimulusindex}".format(blockname=session["current_block_name"], # blockindex=session["current_block_index"], # stimulusindex=session["current_stimulus_index"] ) ) # erster Fall: SinglePage if current_block["type"] == "SinglePage": return render_template(current_block["template"]) # ansonsten, templates: current_block_order = session["block_order"][session["current_block_name"]] current_block_stimuli = current_block["stimuli"] current_stimulus = current_block_order[session["current_stimulus_index"]] stimulus_type=current_block["stimuli"]["type"] stimulus_configuration = current_block["stimuli"]["configuration"] # dict with the config if current_block["type"] == "TaskTemplate": print("case: TaskTemplate") match stimulus_type: case "single_video": stimulus_configuration["video_url"] = config[session["current_block_name"]]["stimuli"]["list"][current_stimulus] print("-------------videourl: ", stimulus_configuration["video_url"]) return render_template( "standard_template.html", stimuli=current_block_stimuli, stimulus_type=stimulus_type, current_stimulus=current_stimulus, stimulus_configuration=stimulus_configuration, questions=current_block["questions"] ) return "Error, none of the Blocks triggered" @app.route("/send_json", methods=["POST"]) def sendpage_json(): print("send_json") # Do I need to write to a table at all? # I can figure it out by checking if the current block has a database field, that is best if not ("database_table" in config[session["current_block_name"]]): #it has no database field, so nothing to receive # so just move on print("no database table") update_session() return redirect("/jsonform") # now to if it has a database field # find out which table we need to write to table_name = config[session["current_block_name"]]["database_table"]["table_name"] print("Form posted: {rqform}".format(rqform=request.form)) print("Writing to table: {table_name}".format(table_name=table_name)) session_user_id = session["slaeform_user_id"] new_id = uuid.uuid4() date = datetime.today() new_entry = db_tables[table_name](id=new_id,user_id = session_user_id,date_created = date) for key, value in request.form.items(): print("hasattr key: ", key) if hasattr(new_entry, key): print("key exists: ", key) setattr(new_entry, key, value) print("setattr value: ", value) print("entry: ", new_entry) try: db.session.add(new_entry) db.session.commit() except Exception as e: print("Error occurred: {e}".format(e=str(e))) return "There was a problem while adding the response to the Database" # Now move to the next stimulus or block update_session() return redirect("/jsonform") def update_session(): if "stimuli" in config[session["current_block_name"]]: # if there are stimuli in this block if session["current_stimulus_index"] < session["number_of_stimuli"]-1: # if there are still stimuli left, keep going through them session["current_stimulus_index"] += 1 else: # if there are no stimuli left.. if(session["current_block_index"] < session["number_of_blocks"]-1): # go to next block if possible session["current_block_index"] += 1 session["current_block_name"] = session["block_names"][session["current_block_index"]] session["number_of_stimuli"] = len(list(config[session["current_block_name"]]["list"])) session["current_stimulus_index"] = 0 else: # if there arent any stimuli, go to the next block session["number_of_stimuli"] = 0 if(session["current_block_index"] < session["number_of_blocks"]-1): session["current_block_index"] += 1 session["current_block_name"] = session["block_names"][session["current_block_index"]] print("---Session updated---") print("current_block_index / number_of_blocks: {current_block_index} / {number_of_blocks}".format(current_block_index=session["current_block_index"],number_of_blocks=session["number_of_blocks"])) print("current_block_name: ", session["current_block_name"]) print("current_stimulus_index: ", session["current_stimulus_index"]) print("Current number_of_stimuli: ", session["number_of_stimuli"]) # Actual main code for Form etc -------------------------------------------------------------- @app.route("/video") def videopage(): return render_template( #"videorecorder3.html" "myvideotemplate.html" ) @app.route("/send_video", methods=["POST"]) def send_video(): data_url = request.json['dataUrl'] data = data_url.split(',')[1] with open('video.webm', 'wb') as f: f.write(base64.b64decode(data)) return jsonify({'message': 'Video saved successfully'}) @app.route("/send", methods=["POST"]) def sendpage(): session_user_id = session["slaeform_user_id"] likert_score = request.form["likertscale"] text_input = request.form["feedback"] question_title = session["current_question"] new_id = uuid.uuid4() date = datetime.today() print("new idea: {new_id} ".format(new_id=new_id)) new_response = Response(id=new_id,user_id = session_user_id, question_title = question_title,likert_result = likert_score,notes = text_input, date_created = date) try: db.session.add(new_response) db.session.commit() return redirect("/form") except: return "There was a problem while adding the response to the Database" #TODO this is bugged rn, it will skip to the next question when refreshed. #Move the removal of the first element to the send function to fix this @app.route("/form", methods=["GET", "POST"]) def formpage(): #user is not yet registered and should not be here if not "slaeform_user_id" in session: return redirect("/start") #TODO fill in code that determins at which question the user is print("form starts, the sessionorder rn:") print(session["question_order"]) if not session["question_order"]: print("---------------question order is empty------------") return redirect("/data") print("pop the first element") current_question = session["question_order"].pop(0) session["current_question"] = current_question print(current_question) print("the new sessionorder rn:") print(session["question_order"]) return render_template( "layout2.html", config=testconfig, current_question = current_question, videotype=testconfig[current_question]["type"], video_url= testconfig[current_question]["video1"], blocks = testconfig[current_question]["blocks"] ) @app.route("/start", methods=["GET", "POST"]) def startpage(): if not "slaeform_device_id" in session: # If this device was not seen, remember it. new_device_id = uuid.uuid4() session["slaeform_device_id"] = new_device_id session["agreed_to_tos"] = False if request.method == "POST": #right now if a user that has an active session goes to the startpage again and accepts tos #it will just start another session and discard the previous session # get a random question order if random_order: order = questions random.shuffle(order) else: order = questions session["question_order"] = order #save the new user to the database and the session session["agreed_to_tos"] = True new_user_id = uuid.uuid4() session["slaeform_user_id"] = new_user_id device_id = session["slaeform_device_id"] user_id = new_user_id question_order = str(order) date = datetime.today() new_user = User(user_id=user_id, device_id=device_id,question_order=question_order,date_created = date) #,question_order=question_order db.session.add(new_user) db.session.commit() return redirect("/form") """ try: db.session.add(new_user) db.session.commit() return redirect("/form") except: return "There was a problem while adding the user to the Database" """ return render_template( "startpage.html" ) # Database stuff------------------------------------------------------------------------------ # the contents of all tables @app.route("/table_contents") def table_contents(): meta = db.metadata #meta.reflect(db.engine) # Uncomment this to also get the hidden tables, but this crashes rn tables = meta.tables.keys() table_contents = {} #print(tables) for table_name in tables: table = meta.tables[table_name] columns = table.columns.keys() #print(table) rows = db.session.query(table) try: rows = rows.all() except Exception as e: print("Error occurred: {e}".format(e=str(e))) print("Values:\n Table = {table}\n Tabletype = {tabletype}\n".format(table=table,tabletype=type(table))) table_contents[table_name] = { 'columns': columns, 'rows': rows } return render_template( "table_contents.html", table_contents=table_contents, ) @app.route('/show_tables') def show_tables(): meta = db.metadata meta.reflect(db.engine) tables = meta.tables return render_template('show_tables.html', tables=tables) # Root page ----------------------------- def has_no_empty_params(rule): defaults = rule.defaults if rule.defaults is not None else () arguments = rule.arguments if rule.arguments is not None else () return len(defaults) >= len(arguments) @app.route("/") def all_links(): links = [] for rule in app.url_map.iter_rules(): # Filter out rules we can't navigate to in a browser # and rules that require parameters if "GET" in rule.methods and has_no_empty_params(rule): url = url_for(rule.endpoint, **(rule.defaults or {})) links.append((url, rule.endpoint)) return render_template("all_links.html", links=links) # delete all tables as last link -------------------------- # Route to delete all entries @app.route('/delete_json_tables', methods=['GET']) def delete_json_tables(): try: meta = db.metadata meta.reflect(db.engine) for table in reversed(meta.sorted_tables): # Iterate through tables in reverse order to handle foreign key constraints if table.name != "user" and table.name != "response": print("Deleting Table: {name}".format(name=table.name)) db.session.execute(table.delete()) db.session.commit() return 'All entries deleted successfully' except Exception as e: # Rollback changes if any error occurs db.session.rollback() return f'Error occurred: {str(e)}', 500 finally: # Close the session db.session.close() # Route to delete all entries @app.route('/delete_all_entries', methods=['GET']) def delete_all_entries(): # here I could also use a "drop_all()", that works jsut like create all from the creation part # this together with the reflect could drop actually all tables try: meta = db.metadata for table in reversed(meta.sorted_tables): # Iterate through tables in reverse order to handle foreign key constraints db.session.execute(table.delete()) db.session.commit() return 'All entries deleted successfully' except Exception as e: # Rollback changes if any error occurs db.session.rollback() return f'Error occurred: {str(e)}', 500 finally: # Close the session db.session.close() if __name__ == '__main__': app.run(debug=True)