From f8f45523554fd279a94fae3774a3a6a19820f274 Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Tue, 21 Oct 2025 18:01:34 +0200 Subject: [PATCH] start new data flow --- rowers/dataflow.py | 149 +++++++++++++++++++++++++++++++++++ rowers/dataprep.py | 12 --- rowers/views/workoutviews.py | 1 + 3 files changed, 150 insertions(+), 12 deletions(-) create mode 100644 rowers/dataflow.py diff --git a/rowers/dataflow.py b/rowers/dataflow.py new file mode 100644 index 00000000..e0486c03 --- /dev/null +++ b/rowers/dataflow.py @@ -0,0 +1,149 @@ +from rowers.celery import app +from rowers.utils import myqueue +import zipfile +import os +from rowingdata import get_file_type +import django_rq + +queue = django_rq.get_queue('default') +queuelow = django_rq.get_queue('low') +queuehigh = django_rq.get_queue('default') + +from django.conf import settings +from rowers.forms import DocumentsForm, TeamUploadOptionsForm +from rowers.models import TeamInviteForm, Workout +from uuid import uuid4 + +def generate_job_id(): + return str(uuid4()) + +def valid_uploadoptions(uploadoptions): + secret = uploadoptions.get('secret', '') + if secret != settings.UPLOAD_SERVICE_SECRET: + return False + + fstr = uploadoptions.get('file', None) + if fstr is None: + return False + + # check if file can be found + if not os.path.isfile(fstr): + return False + + form = DocumentsForm(uploadoptions) + optionsform = TeamUploadOptionsForm(uploadoptions) + rowerform = TeamInviteForm(uploadoptions) + rowerform.fields.pop('email') # we don't need email here + return form.is_valid() and optionsform.is_valid() and rowerform.is_valid() + +def is_zipfile(file_path): + fileformat = get_file_type(file_path) + return fileformat[0] == 'zip' + +def is_invalid_file(file_path): + fileformat = get_file_type(file_path) + if fileformat == "imageformat": + return False, "Image files are not supported for upload." + if fileformat == "json": + return False, "JSON files are not supported for upload." + if fileformat == "c2log": + return False, "Concept2 log files are not supported for upload." + if fileformat == "nostrokes": + return False, "No stroke data found in the file." + if fileformat == "kml": + return False, "KML files are not supported for upload." + if fileformat == "notgzip": + return False, "The gzip file appears to be corrupted." + if fileformat == "rowprolog": + return False, "RowPro logbook summary files are not supported for upload." + if fileformat == "gpx": + return False, "GPX files are not supported for upload." + if fileformat == "unknown": + extension = os.path.splitext(f2)[1] + filename = os.path.splitext(f2)[0] + if extension == '.gz': + filename = os.path.splitext(filename)[0] + extension2 = os.path.splitext(filename)[1]+extension + extension = extension2 + f4 = filename+'a'+extension + copyfile(f2, f4) + _ = myqueue(queuehigh, + handle_sendemail_unrecognized, + f4, + r.user.email) + return False, "The file format is not recognized or supported." + return True, "" + + +def upload_handler(filename, uploadoptions): + if not valid_uploadoptions(uploadoptions): + return { + "status": "error", + "job_id": None, + "message": "Invalid upload options or file not found." + } + is_valid, message = is_invalid_file(filename) + if not is_valid: + return { + "status": "error", + "job_id": None, + "message": message + } + if is_zipfile(file): + parent_job_id = generate_job_id() + _ = myqueue.enqueue( + queuehigh, + unzip_and_process, + filename, + uploadoptions, + parent_job_id) + return { + "status": "processing", + "job_id": parent_job_id, + "message": "Your zip file is being processed. You will be notified when it is complete." + } + job_id = generate_job_id() + _ = myqueue.enqueue( + queuehigh, + process_single_file, + filename, + uploadoptions, + job_id) + return { + "status": "processing", + "job_id": job_id, + "message": "Your file is being processed. You will be notified when it is complete." + } + +@app.task +def unzip_and_process(zip_filepath, uploadoptions, parent_job_id): + with zipfile.ZipFile(zip_filepath, 'r') as zip_ref: + for id, filename in enumerate(zip_ref.namelist()): + datafile = zip_ref.extract(filename, path='media/') + if id > 0: + uploadoptions['title'] = uploadoptions['title'] + " Part {id+1}".format(id=id) + uploadoptions['file'] = datafile + job_id = generate_job_id() + _ = myqueue.enqueue( + queuehigh, + process_single_file, + datafile, + uploadoptions, + job_id) + + return { + "status": "completed", + "job_id": parent_job_id, + "message": "All files from the zip have been processed." + } + + +@app.task +def process_single_file(file_path, uploadoptions, job_id): + # placeholder + return True + + # process data to create df + + # + diff --git a/rowers/dataprep.py b/rowers/dataprep.py index fb58aee3..5081c2f7 100644 --- a/rowers/dataprep.py +++ b/rowers/dataprep.py @@ -1656,18 +1656,6 @@ def new_workout_from_file(r, f2, if fileformat == 'unknown': # pragma: no cover message = "We couldn't recognize the file type" - extension = os.path.splitext(f2)[1] - filename = os.path.splitext(f2)[0] - if extension == '.gz': - filename = os.path.splitext(filename)[0] - extension2 = os.path.splitext(filename)[1]+extension - extension = extension2 - f4 = filename+'a'+extension - copyfile(f2, f4) - _ = myqueue(queuehigh, - handle_sendemail_unrecognized, - f4, - r.user.email) return (0, message, f2) diff --git a/rowers/views/workoutviews.py b/rowers/views/workoutviews.py index 312c0742..a2259104 100644 --- a/rowers/views/workoutviews.py +++ b/rowers/views/workoutviews.py @@ -23,6 +23,7 @@ def default(o): # pragma: no cover return int(o) raise TypeError +from rowers.dataflow import upload_handler def get_video_id(url): """Returns Video_ID extracting from the given url of Youtube