383 lines
12 KiB
Python
383 lines
12 KiB
Python
import base64
import json
import os
import random
import sys
import uuid
from datetime import datetime

from flask import Flask, redirect, url_for, request, session, make_response, jsonify, send_from_directory
from flask import render_template
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Integer, String, Column
from sqlalchemy import inspect
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import DeclarativeBase
|
|
|
|
# When True, the start page shuffles the question order for each new user.
random_order = True

# activate environment: cd C:\Users\Jan\Google Drive\Master Stuff\Code\SLAEForms Testing\.venv\Scripts\
# then this: activate

# SETUP --------------------------------------------------


# Set up SQLAlchemy.
class Base(DeclarativeBase):
    """Declarative base class passed to Flask-SQLAlchemy as the model class."""


db = SQLAlchemy(model_class=Base)

# Create the app.
app = Flask(__name__)

# Configure the database, give it a path (it will be in the instance folder).
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///database.db"
db.init_app(app)

# Secret key used to sign the session cookie.
# The key is read from the SLAEFORM_SECRET_KEY environment variable so that a
# deployment does not have to ship a secret that is committed to source
# control. The previous hard-coded value is kept only as a development
# fallback (TODO: remove the fallback for final deployment).
_DEV_SECRET_KEY = b"29fe9e8edd407c5491d4f1c05632d9fa33e26ed8734a3f5e080ebac3772a555a"
app.secret_key = os.environ.get("SLAEFORM_SECRET_KEY", "").encode("utf-8") or _DEV_SECRET_KEY
|
|
|
|
|
|
|
|
|
|
# --------- testing
# Load the JSON form configuration files and convert them to dicts.
# `with` guarantees the handles are closed even if json.load raises, and the
# explicit UTF-8 encoding keeps non-ASCII text intact regardless of the
# platform's default encoding.
with open("singleformconfig.json", encoding="utf-8") as configfile:
    config = json.load(configfile)
with open("pairwiseformconfig.json", encoding="utf-8") as configfile2:
    config2 = json.load(configfile2)

# Questions is a list that contains the keys of the config dictionary.
questions = list(config)

# JSON TEST -------------------------------------------
# NOTE(review): test.json is opened and closed without being read. The open
# is kept so start-up behaviour (failing when the file is missing) and the
# `configtest` name stay unchanged; drop it once the experiment below is
# either enabled or abandoned.
with open("test.json", encoding="utf-8") as configtest:
    pass
# config = json.load(configtest)  # convert json to dict
# blocks = list(config)  # get the block names, aka a list of all keys
|
|
#blocks = list(config) # get the block names, aka a list of all keys
|
|
|
|
|
|
|
|
# -------- temporary code for the dynamically generated table
# Schema description consumed by create_model_class: "table_name" holds the
# table name, every other key describes one column.
database_schema = {
    "table_name": "userstest",
    "id": {"type": "integer", "nullable": False},
    "username": {"type": "string", "nullable": False, "size": 20},
    "age": {"type": "integer", "nullable": True},
}

# Capitalized form of the table name.
tablename = str.capitalize(database_schema["table_name"])
|
|
|
|
# Test function to create tables from dicts.
def create_model_class(schema):
    """Build a model class (subclass of ``db.Model``) from a schema dict.

    Args:
        schema: Mapping with a ``"table_name"`` entry; every other entry maps
            a column name to a dict with ``"type"`` (``"integer"`` or
            ``"string"``), ``"nullable"`` and, for strings, an optional
            ``"size"``.

    Returns:
        A new class named after the capitalized table name. It always has a
        UUID primary-key column ``uid`` in addition to the schema's columns.

    Raises:
        ValueError: If a column declares a type other than ``"integer"`` or
            ``"string"``.
    """
    # Class attributes are collected here and handed to type() below.
    attributes = {
        "__tablename__": schema["table_name"],
        "uid": Column(db.UUID(as_uuid=True), primary_key=True, nullable=False),
    }

    for column_name, column_info in schema.items():
        if column_name == "table_name":
            continue

        column_kind = column_info["type"]
        if column_kind == "integer":
            column_type = Integer
        elif column_kind == "string":
            # A missing size yields an unbounded String column.
            column_type = String(column_info.get("size"))
        else:
            # Previously an unknown type silently reused the preceding
            # column's type (or failed with an unbound-variable error).
            raise ValueError(
                f"Unsupported column type {column_kind!r} for column {column_name!r}"
            )

        attributes[column_name] = Column(column_type, nullable=column_info["nullable"])

    # Create the model class.
    return type(schema["table_name"].capitalize(), (db.Model,), attributes)
|
|
|
|
print("creating the userstest table")
# Model class generated from database_schema; used by the /customsendtest route.
customtable = create_model_class(database_schema)

# Create the tables; a database error is reported on stdout instead of
# aborting the import.
try:
    with app.app_context():
        print("try to create tables")
        db.create_all()
except SQLAlchemyError as creation_error:
    print("Error occurred during database creation:", str(creation_error))
|
|
|
|
@app.route("/customsendtest/", methods=["POST"])
def customsendtest():
    """Store the submitted form fields as a new row of the generated table.

    Only form fields whose name matches a column of ``customtable`` are
    copied; the primary key ``uid`` is always a freshly generated UUID.

    Returns:
        A plain-text status message.
    """
    # Create a new instance of the dynamically generated model.
    new_user = customtable()

    # Restrict assignment to real table columns. Checking hasattr() alone let
    # a request overwrite arbitrary attributes of the model instance.
    column_names = set(customtable.__table__.columns.keys())
    for key, value in request.form.items():
        if key in column_names:
            setattr(new_user, key, value)

    # Assigned last so a submitted "uid" field can never override the key.
    new_user.uid = uuid.uuid4()

    # Add the new row to the database session and commit.
    try:
        db.session.add(new_user)
        db.session.commit()
    except SQLAlchemyError as e:
        # Leave the session usable for the rest of the request.
        db.session.rollback()
        print("Error occurred during database commit:", str(e))
        return 'Data not submitted successfully!'
    return 'Data submitted successfully!'
|
|
|
|
|
|
|
|
@app.route("/custom")
def custom():
    """Run table creation, then render the "templatetest1.html" template.

    A database error during table creation is printed and does not prevent
    the template from being rendered.
    """
    try:
        with app.app_context():
            db.create_all()
    except SQLAlchemyError as creation_error:
        print("Error occurred during database creation:", str(creation_error))

    return render_template("templatetest1.html")
|
|
|
@app.route("/customsend/<inputid>", methods=["POST"])
def customsend(inputid=None):
    """Store the submitted likert score and feedback as a Response row.

    Args:
        inputid: Value of the ``<inputid>`` URL segment. Flask passes it as a
            keyword argument, so the view must accept it (it previously did
            not, which made every request fail with a TypeError). It is not
            used yet.

    Returns:
        A redirect to ``/form`` on success, a redirect to ``/start`` when the
        session has no user id, or a plain-text error message when the
        database commit fails.
    """
    # Same guard as the form page: unregistered visitors start over.
    if "slaeform_user_id" not in session:
        return redirect("/start")

    new_id = uuid.uuid4()
    print("new idea: {new_id} ".format(new_id=new_id))
    new_response = Response(
        id=new_id,
        user_id=session["slaeform_user_id"],
        question_title=session["current_question"],
        likert_result=request.form["likertscale"],
        notes=request.form["feedback"],
        date_created=datetime.today(),
    )

    try:
        db.session.add(new_response)
        db.session.commit()
    except SQLAlchemyError:
        # Only database errors are expected here; roll back so the session
        # stays usable.
        db.session.rollback()
        return "There was a problem while adding the response to the Database"
    return redirect("/form")
|
|
|
|
|
|
# End testing-------------------------------------
|
|
|
|
#------------------------------------------------------------------------
|
|
# Setting up DB Models -> should be removed eventually and replaced by json files and automatic generation ----------------------------
|
|
|
|
# Model for the response table.
class Response(db.Model):
    """Row holding one submitted likert score with optional notes."""

    # Primary key (UUID).
    id = db.Column(db.UUID(as_uuid=True), primary_key=True, nullable=False)
    # UUID of the user the response belongs to.
    user_id = db.Column(db.UUID(as_uuid=True), nullable=False)
    # Title of the question, at most 30 characters.
    question_title = db.Column(db.String(30))
    # Likert score; required.
    likert_result = db.Column(db.Integer, nullable=False)
    # Free-text notes, at most 200 characters.
    notes = db.Column(db.String(200))
    # Creation timestamp of the row.
    date_created = db.Column(db.DateTime)

    def __repr__(self) -> str:
        """Return a debug representation containing the primary key."""
        return "<Response %r>" % (self.id,)
|
|
|
|
# This table is always created.
class User(db.Model):
    """Row describing one registered user and the question order they got."""

    # Primary key (UUID).
    user_id = db.Column(db.UUID(as_uuid=True), primary_key=True, nullable=False)
    # UUID of the device the user registered from.
    device_id = db.Column(db.UUID(as_uuid=True), nullable=False)
    # String form of the question order, at most 60 characters.
    question_order = db.Column(db.String(60))
    # Pass the callable, not its result: `datetime.today()` was evaluated once
    # at import time, so every row without an explicit date got the server
    # start time instead of its own creation time.
    date_created = db.Column(db.DateTime, default=datetime.today)

    def __repr__(self) -> str:
        """Return a debug representation containing the primary key."""
        return "<User %r>" % self.user_id
|
|
|
|
# Create the tables (existing tables are not overwritten). A database error
# is printed instead of aborting the import.
try:
    with app.app_context():
        db.create_all()
except SQLAlchemyError as creation_error:
    print("Error occurred during database creation:", str(creation_error))
|
|
|
|
|
|
|
|
# Actual main code for Form etc -----------------------------------------------------
|
|
|
|
@app.route("/video", methods=["GET", "POST"])
def videopage():
    """Render the "myvideotemplate.html" template."""
    # Alternative template that is currently disabled: "videorecorder3.html"
    template_name = "myvideotemplate.html"
    return render_template(template_name)
|
|
|
|
@app.route("/send_video", methods=["POST"])
def send_video():
    """Decode the base64 payload of a posted data URL and save it as video.webm.

    Expects a JSON body of the form ``{"dataUrl": "<header>,<base64 data>"}``.

    Returns:
        A JSON success message, or a JSON error message with status 400 when
        the body is missing, malformed or not valid base64.
    """
    payload = request.get_json(silent=True)
    data_url = payload.get("dataUrl") if isinstance(payload, dict) else None
    if not isinstance(data_url, str) or "," not in data_url:
        return jsonify({'message': 'Missing or malformed dataUrl'}), 400

    # The encoded data is the part after the first comma of the data URL.
    encoded = data_url.split(',')[1]

    # Decode before opening the file: opening in 'wb' mode truncates it, so a
    # bad payload must not wipe a previously saved video.
    try:
        video_bytes = base64.b64decode(encoded)
    except ValueError:  # binascii.Error is a ValueError subclass
        return jsonify({'message': 'dataUrl does not contain valid base64 data'}), 400

    with open('video.webm', 'wb') as f:
        f.write(video_bytes)
    return jsonify({'message': 'Video saved successfully'})
|
|
|
|
|
|
|
|
@app.route("/send", methods=["POST"])
def sendpage():
    """Store the submitted likert score and feedback as a Response row.

    Returns:
        A redirect to ``/form`` on success, a redirect to ``/start`` when the
        session has no user id, or a plain-text error message when the
        database commit fails.
    """
    # Same guard as the form page: unregistered visitors start over instead
    # of triggering a KeyError.
    if "slaeform_user_id" not in session:
        return redirect("/start")

    new_id = uuid.uuid4()
    print("new idea: {new_id} ".format(new_id=new_id))
    new_response = Response(
        id=new_id,
        user_id=session["slaeform_user_id"],
        question_title=session["current_question"],
        likert_result=request.form["likertscale"],
        notes=request.form["feedback"],
        date_created=datetime.today(),
    )

    try:
        db.session.add(new_response)
        db.session.commit()
    except SQLAlchemyError:
        # Catch database errors only (a bare except also swallowed
        # programming errors) and roll back so the session stays usable.
        db.session.rollback()
        return "There was a problem while adding the response to the Database"
    return redirect("/form")
|
|
|
|
|
|
|
|
@app.route("/form", methods=["GET", "POST"])  # /<username> should not even be needed right?
def formpage():
    """Render the next pending question, or redirect when none is left.

    Visitors without a user id in the session are sent to ``/start``; when
    the session's question order is empty the visitor is sent to ``/data``.
    Otherwise the first entry is removed from the order, remembered as the
    current question and rendered with "layout2.html".
    """
    # User is not yet registered and should not be here.
    if "slaeform_user_id" not in session:
        return redirect("/start")

    # TODO fill in code that determines at which question the user is
    print("form starts, the sessionorder rn:")
    print(session["question_order"])
    remaining = session["question_order"]
    if not remaining:
        print("---------------question order is empty------------")
        return redirect("/data")

    print("pop the first element")
    current_question = remaining.pop(0)
    session["current_question"] = current_question
    print(current_question)
    print("the new sessionorder rn:")
    print(session["question_order"])
    # The list was changed in place; assigning it back stores the shortened
    # order in the session explicitly.
    session["question_order"] = remaining
    print("has this changed sth?:")
    print(session["question_order"])

    question_config = config[current_question]
    return render_template(
        "layout2.html",
        config=config,
        current_question=current_question,
        videotype=question_config["type"],
        video_url=question_config["video1"],
        blocks=question_config["blocks"],
    )
|
|
|
|
|
|
@app.route("/start", methods=["GET", "POST"])
def startpage():
    """Show the start page (GET) or register a new user (POST).

    On POST a new user with a fresh UUID and a question order is stored in
    the database and in the session, then the visitor is redirected to
    ``/form``.

    Returns:
        The rendered start page, a redirect to ``/form``, or an error message
        with status 500 when the user could not be saved.
    """
    if "slaeform_device_id" not in session:
        # If this device was not seen, remember it.
        session["slaeform_device_id"] = uuid.uuid4()
        session["agreed_to_tos"] = False

    if request.method != "POST":
        return render_template("startpage.html")

    # Right now, if a user that has an active session goes to the start page
    # again and accepts the TOS, another session is started and the previous
    # one is discarded.

    # Work on a copy: shuffling `questions` itself reordered the shared
    # module-level list for every other request as well.
    order = list(questions)
    if random_order:
        random.shuffle(order)

    new_user_id = uuid.uuid4()
    new_user = User(
        user_id=new_user_id,
        device_id=session["slaeform_device_id"],
        question_order=str(order),
        date_created=datetime.today(),
    )

    try:
        db.session.add(new_user)
        db.session.commit()
    except SQLAlchemyError:
        db.session.rollback()
        return "There was a problem while adding the user to the Database", 500

    # Populate the session only after the user row was persisted, so a failed
    # commit cannot leave a session pointing at a user that does not exist.
    session["question_order"] = order
    session["agreed_to_tos"] = True
    session["slaeform_user_id"] = new_user_id
    return redirect("/form")
|
|
|
|
|
|
# Database stuff------------------------------------
|
|
|
|
# The contents of all tables.
@app.route("/table_contents")
def table_contents():
    """Render "table_contents.html" with the columns and rows of every table.

    The template receives ``table_contents``: a dict mapping each table name
    in ``db.metadata`` to ``{'columns': [...], 'rows': [...]}``.
    """
    contents = {}
    for name, table in db.metadata.tables.items():
        contents[name] = {
            'columns': table.columns.keys(),
            'rows': db.session.query(table).all(),
        }

    return render_template("table_contents.html", table_contents=contents)
|
|
|
|
# Route to delete all entries.
# NOTE(review): this destructive action is reachable with a plain GET request.
@app.route('/delete_all_entries', methods=['GET'])
def delete_all_entries():
    """Delete the rows of every table known to ``db.metadata``.

    Returns:
        A success message, or an error message with status 500 after rolling
        back when any exception occurs. The session is closed in both cases.
    """
    try:
        # Walk the tables in reverse dependency order to handle foreign key
        # constraints.
        for tbl in reversed(db.metadata.sorted_tables):
            db.session.execute(tbl.delete())
        db.session.commit()
        return 'All entries deleted successfully'
    except Exception as exc:
        # Undo the partial deletes if any error occurs.
        db.session.rollback()
        return f'Error occurred: {str(exc)}', 500
    finally:
        # Close the session.
        db.session.close()
|
|
|
|
@app.route('/show_tables')
def show_tables():
    """Render "show_tables.html" with the table mapping of ``db.metadata``."""
    return render_template('show_tables.html', tables=db.metadata.tables)
|
|
|
|
# Root page -----------------------------
|
|
|
|
def has_no_empty_params(rule):
    """Return True when *rule* has at least as many defaults as arguments.

    ``rule.defaults`` and ``rule.arguments`` may each be None, which counts
    as empty.
    """
    defaults = rule.defaults
    if defaults is None:
        defaults = ()

    arguments = rule.arguments
    if arguments is None:
        arguments = ()

    return not len(defaults) < len(arguments)
|
|
|
|
@app.route("/")
def all_links():
    """Render "all_links.html" with a (url, endpoint) pair per browsable route.

    Rules that do not accept GET, and rules with more arguments than
    defaults, are left out because they cannot be opened directly in a
    browser.
    """
    links = [
        (url_for(rule.endpoint, **(rule.defaults or {})), rule.endpoint)
        for rule in app.url_map.iter_rules()
        if "GET" in rule.methods and has_no_empty_params(rule)
    ]
    return render_template("all_links.html", links=links)
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Create database tables
|
|
db.create_all() |