def getrower(user):
    """Return the ``Rower`` that belongs to ``user``, creating it if missing.

    Args:
        user: a ``User`` instance, a user primary key, or ``None``.

    Returns:
        The matching ``Rower``, or ``None`` when ``user`` is ``None`` or
        anonymous.

    Raises:
        User.DoesNotExist: when ``user`` is a primary key matching no user.
    """
    if user is None:
        return None

    try:
        anonymous = user.is_anonymous
    except AttributeError:  # pragma: no cover
        # ``user`` has no ``is_anonymous`` attribute, so it is treated as a
        # primary key.  Keep the fetched instance instead of discarding it:
        # the ``Rower(user=...)`` constructor below is handed this value and
        # presumably needs a real ``User`` object rather than a bare id.
        user = User.objects.get(id=user)
        anonymous = user.is_anonymous

    if anonymous:
        return None

    try:
        r = Rower.objects.get(user=user)
    except Rower.DoesNotExist:  # pragma: no cover
        r = Rower(user=user)
        r.save()

    return r
def get_rower_from_uploadoptions(uploadoptions):
    """Resolve the rower an upload belongs to.

    The options are validated through ``TeamInviteForm``.  The rower is taken
    from the form's ``user`` field when present; otherwise the ``useremail``
    option is matched against ``User.email`` and then against each rower's
    ``emailalternatives``.

    Args:
        uploadoptions: mapping of upload options.

    Returns:
        A ``Rower``, or ``None`` when the form is invalid or nobody matches.
    """
    rowerform = TeamInviteForm(uploadoptions)
    if not rowerform.is_valid():
        return None

    try:
        u = rowerform.cleaned_data['user']
    except KeyError:
        pass
    else:
        return getrower(u)

    # No user on the form: fall back to the e-mail address.  An absent or
    # empty address now yields None (previously the result variable was left
    # unbound when 'useremail' was missing, raising UnboundLocalError).
    email = uploadoptions.get('useremail')
    if not email:
        return None

    u = User.objects.filter(email=email).first()
    if u is not None:
        return getrower(u)

    for rwr in Rower.objects.all():
        alternatives = rwr.emailalternatives
        if alternatives is not None and email in alternatives:
            return rwr

    return None


def check_and_fix_samplerate(row, file_path):
    """Resample the workout when its mean sample interval is below 1 second.

    Args:
        row: row-data object exposing a pandas ``df`` with a
            ``'TimeStamp (sec)'`` column.
        file_path: path of the CSV backing ``row``.

    Returns:
        ``(row, file_path)``; ``row`` is a new object built from the resampled
        frame when resampling happened, otherwise the one passed in.
    """
    dtavg = row.df['TimeStamp (sec)'].diff().mean()
    # A NaN mean (fewer than two samples) compares False and is left alone.
    if dtavg < 1:
        newdf = df_resample(row.df)
        try:
            os.remove(file_path)
        except OSError:
            # Best effort: the file is rewritten just below anyway.
            pass
        # NOTE(review): ``rrdata`` is not among the imports visible in this
        # diff -- confirm it is in module scope.
        row = rrdata(df=newdf)
        row.write_csv(file_path, gzip=True)
    return row, file_path


def is_water_rowing(df):
    """Tell whether the frame carries varying, non-zero latitude data.

    Args:
        df: pandas DataFrame, optionally with a ``' latitude'`` column.

    Returns:
        ``True`` when the non-NaN latitudes have a non-zero mean and a
        positive standard deviation; ``False`` otherwise, including when the
        column is missing or holds only NaN.
    """
    try:
        lat = df[' latitude'].dropna()
    except KeyError:
        return False
    if lat.empty:
        # mean/std of nothing are NaN, and ``NaN != 0`` is True -- an all-NaN
        # column must not be classified as GPS data.
        return False
    return bool(lat.mean() != 0 and lat.std() > 0)


def remove_negative_power_peaks(row):
    """Replace non-positive power dips with the preceding sample.

    Local minima of ``' Power (watts)'`` whose value is zero or below are
    overwritten with the value of the sample just before them.  Two passes
    are made, so a dip uncovered by the first pass is also removed.

    Args:
        row: row-data object exposing a pandas ``df``; its power column is
            rewritten in place.

    Returns:
        The same ``row`` object.
    """
    column = ' Power (watts)'
    # Work on a plain array and assign the column back in one step; the former
    # chained ``df[col][idx] = ...`` assignment is not a reliable way to write
    # into a DataFrame.
    power = row.df[column].to_numpy(copy=True)
    for _ in range(2):
        # Peaks of the negated signal with height >= 0 are minima <= 0.
        # find_peaks never reports the first sample, so ``dips - 1`` is valid.
        dips, _props = find_peaks(-power, height=0)
        power[dips] = power[dips - 1]
    row.df[column] = power

    return row
def do_smooth(row, f2):
    """Smooth the boat speed and write the result back as pace.

    Pace is converted to velocity, filtered with a third-order Savitzky-Golay
    filter whose window is derived from the mean sample interval, and
    converted back to pace.  The unsmoothed velocity is kept once in an
    ``'originalvelo'`` column.  Remaining NaN values in the frame become 0.

    Args:
        row: row-data object exposing a pandas ``df`` and ``write_csv``.
        f2: path handed to ``row.write_csv``.

    Returns:
        The same ``row`` object with the smoothed pace.
    """
    pace_col = ' Stroke500mPace (sec/500m)'
    pace = row.df[pace_col].values
    velo = 500. / pace

    f = row.df['TimeStamp (sec)'].diff().mean()
    if f != 0 and not np.isnan(f):
        windowsize = 2 * (int(10. / (f))) + 1
    else:  # pragma: no cover
        windowsize = 1
    if 'originalvelo' not in row.df:
        row.df['originalvelo'] = velo

    if windowsize > 3 and windowsize < len(velo):
        velo2 = savgol_filter(velo, windowsize, 3)
    else:  # pragma: no cover
        velo2 = velo

    velo3 = pd.Series(velo2, dtype='float')
    velo3 = velo3.replace([-np.inf, np.inf], np.nan)
    velo3 = velo3.ffill()

    pace2 = 500. / abs(velo3)

    # Assign by position: ``pace2`` carries a fresh 0..n-1 index, which would
    # misalign (and yield NaN) if the frame's own index differs from that.
    row.df[pace_col] = pace2.to_numpy()

    row.df = row.df.fillna(0)

    row.write_csv(f2, gzip=True)
    # NOTE(review): presumably write_csv(gzip=True) writes a separate
    # compressed file and this removes the uncompressed leftover -- confirm.
    try:
        os.remove(f2)
    except OSError:
        pass

    return row


def update_workout_attributes(w, row, file_path, uploadoptions,
                              startdatetime='',
                              timezone='', forceunit='lbs'):
    """Fill a workout from the upload options and the parsed data, then save.

    Args:
        w: the ``Workout`` to update; ``w.user`` supplies weight category and
            adaptive class.
        row: row-data object exposing ``df``, ``allstats()`` and
            ``dragfactor``.
        file_path: value stored as the workout's ``csvfilename``.
        uploadoptions: mapping of upload options; missing keys fall back to
            the defaults used below.
        startdatetime: optional start override passed to
            ``get_startdate_time_zone``.
        timezone: optional time zone override passed to
            ``get_startdate_time_zone``.
        forceunit: force unit stored on the workout.

    Returns:
        The saved workout.
    """
    startdatetime, startdate, starttime, timezone_str, partofday = get_startdate_time_zone(
        w.user, row, startdatetime=startdatetime, timezone=timezone
    )

    boattype = uploadoptions.get('boattype', '1x')
    rpe = uploadoptions.get('rpe', 0)
    notes = uploadoptions.get('notes', '')
    inboard = uploadoptions.get('inboard', 0.88)
    oarlength = uploadoptions.get('oarlength', 2.89)
    useImpeller = uploadoptions.get('useImpeller', False)
    seatnumber = uploadoptions.get('seatNumber', 1)
    boatname = uploadoptions.get('boatName', '')
    portStarboard = uploadoptions.get('portStarboard', 1)
    empowerside = 'starboard' if portStarboard == 1 else 'port'

    # Only a real Strava id marks the workout as coming from Strava.  The
    # previous second lookup defaulted to '' and compared it with 0, so every
    # upload without an id was tagged 'strava' (and therefore hidden).
    workoutsource = 'unknown'
    stravaid = uploadoptions.get('stravaid', 0)
    if stravaid not in (0, '0', '', None):
        workoutsource = 'strava'

    workouttype = uploadoptions.get('workouttype', 'rower')
    title = uploadoptions.get('title', '')
    if not title:
        title = 'Workout'
        if partofday is not None:
            title = '{partofday} {workouttype}'.format(
                partofday=partofday,
                workouttype=workouttype,
            )
    averagehr = row.df[' HRCur (bpm)'].mean()
    maxhr = row.df[' HRCur (bpm)'].max()

    totaldist = uploadoptions.get('distance', 0)
    if totaldist == 0:
        totaldist = row.df['cum_dist'].max()

    totaltime = uploadoptions.get('duration', 0)
    if totaltime == 0:
        totaltime = row.df['TimeStamp (sec)'].max() - row.df['TimeStamp (sec)'].min()
        try:
            totaltime = totaltime + row.df.loc[:, ' ElapsedTime (sec)'].iloc[0]
        except KeyError:
            pass

    if np.isnan(totaltime):
        totaltime = 0

    # Use the supplied summary when there is one; it used to be left unbound
    # in that case, failing at ``w.summary = summary``.
    summary = uploadoptions.get('summary', '')
    if not summary:
        summary = row.allstats()

    if uploadoptions.get('makeprivate', False) or workoutsource == 'strava':
        privacy = 'hidden'
    else:
        privacy = 'visible'

    # replace NaN values by numbers
    totaldist = np.nan_to_num(totaldist)
    maxhr = np.nan_to_num(maxhr)
    averagehr = np.nan_to_num(averagehr)

    dragfactor = 0
    if workouttype in otetypes:
        dragfactor = row.dragfactor

    # float() keeps timedelta happy when totaltime is a numpy scalar.
    delta = datetime.timedelta(seconds=float(totaltime))

    try:
        workoutenddatetime = startdatetime + delta
    except (AttributeError, TypeError):
        # The start is not a datetime yet: parse it and retry with the parsed
        # value (the retry used to reuse the unparsed one).
        # NOTE(review): ``pendulum`` is not among the imports visible in this
        # diff -- confirm it is in module scope.
        startdatetime = pendulum.parse(str(startdatetime))
        workoutenddatetime = startdatetime + delta

    # check for duplicate start times and duration
    duplicate = checkduplicates(
        w.user, startdate, startdatetime, workoutenddatetime)

    # test title length
    if title is not None and len(title) > 140:  # pragma: no cover
        title = title[0:140]

    timezone_str = str(startdatetime.tzinfo)

    duration = totaltime_sec_to_string(totaltime)

    w.name = title
    w.date = startdate
    w.workouttype = workouttype
    w.boattype = boattype
    w.dragfactor = dragfactor
    w.duration = duration
    w.distance = totaldist
    w.weightcategory = w.user.weightcategory
    w.adaptiveclass = w.user.adaptiveclass
    w.starttime = starttime
    w.duplicate = duplicate
    w.workoutsource = workoutsource
    w.rankingpiece = False
    w.forceunit = forceunit
    w.rpe = rpe
    w.csvfilename = file_path
    w.notes = notes
    w.summary = summary
    w.maxhr = maxhr
    w.averagehr = averagehr
    w.startdatetime = startdatetime
    w.inboard = inboard
    w.oarlength = oarlength
    w.seatnumber = seatnumber
    w.boatname = boatname
    w.empowerside = empowerside
    w.timezone = timezone_str
    w.privacy = privacy
    w.impeller = useImpeller
    w.save()

    return w


def send_upload_confirmation_email(rower, workout):
    """Mail an upload confirmation with a link to the rower's landing page.

    Nothing is sent when the rower has notifications switched off or the
    address is flagged as bounced.

    Args:
        rower: the ``Rower`` who uploaded.
        workout: the ``Workout`` that was created or updated.
    """
    if not rower.getemailnotifications or rower.emailbounced:
        return
    # pragma: no cover
    link = settings.SITE_URL + reverse(
        rower.defaultlandingpage,
        kwargs={
            'id': encoder.encode_hex(workout.id),
        }
    )
    _ = send_confirm(rower.user, workout.name, link, '')


def _usable_wps(value):
    """Return True for a finite, non-zero number."""
    try:
        return value is not None and math.isfinite(value) and value != 0
    except TypeError:
        return False


def _next_running_wps(count, previous, sample, fallback):
    """Blend ``sample`` into the running average ``previous`` of ``count`` items.

    Falls back to ``previous`` and then to ``fallback`` when the blended value
    is not a finite, non-zero number.
    """
    try:
        candidate = (count * previous + sample) / (count + 1.0)
    except TypeError:
        candidate = previous
    if _usable_wps(candidate):
        return candidate
    if _usable_wps(previous):
        return previous
    return fallback


def update_running_wps(r, w, row):
    """Update the rower's running work-per-stroke averages with this workout.

    For a non-duplicate workout, the mean of ``row.df['driveenergy']`` is
    blended into ``r.running_wps_erg`` (types in ``otetypes``, fallback 600)
    or ``r.running_wps`` (types in ``otwtypes``, fallback 400), weighted by
    the number of the rower's non-duplicate workouts of that family started
    in the last 42 days.  The rower is saved after each update.

    Args:
        r: the ``Rower`` to update.
        w: the workout just processed.
        row: row-data object exposing ``df``.
    """
    if w.duplicate:
        return

    since = tz.now() - datetime.timedelta(days=42)

    if w.workouttype in otetypes:
        cntr = Workout.objects.filter(user=r, workouttype__in=otetypes,
                                      startdatetime__gt=since,
                                      duplicate=False).count()
        r.running_wps_erg = _next_running_wps(
            cntr, r.running_wps_erg, row.df['driveenergy'].mean(), 600.)
        r.save()

    if w.workouttype in otwtypes:
        cntr = Workout.objects.filter(user=r, workouttype__in=otwtypes,
                                      startdatetime__gt=since,
                                      duplicate=False).count()
        # Blend into the on-water average itself; this branch used to start
        # from the erg average.
        r.running_wps = _next_running_wps(
            cntr, r.running_wps, row.df['driveenergy'].mean(), 400.)
        r.save()
def _upload_error(job_id, message):
    """Build the error payload returned when processing a file fails."""
    return {
        "status": "error",
        "job_id": job_id,
        "message": message
    }


@app.task
def process_single_file(file_path, uploadoptions, job_id, debug=False, **kwargs):
    """Turn one uploaded data file into a saved workout.

    The file is copied to a unique name under ``media/``, parsed, cleaned
    (sample rate, negative power dips, smoothing, optional erg power
    recalculation) and attached to a new or existing ``Workout``.  Follow-up
    work is then started: stroke-data storage, confirmation mail,
    breakthrough and marker checks, work-per-stroke updates, TRIMP
    calculation, an optional static plot and syncing to connected services.

    Args:
        file_path: path of the uploaded file.
        uploadoptions: mapping of upload options; updated in place with
            values read from the file (workout type, title, notes).
        job_id: identifier echoed back in error payloads.
        debug: unused in this function.
        **kwargs: unused in this function.

    Returns:
        ``True`` on success, otherwise a dict with ``status``, ``job_id`` and
        ``message`` keys.
    """
    # copy file to a unique name in media folder
    f2 = file_path
    try:
        nn, ext = os.path.splitext(f2)
        if ext == '.gz':
            nn, ext2 = os.path.splitext(nn)
            ext = ext2 + ext
        f1 = uuid4().hex[:10]+'-'+strftime('%Y%m%d-%H%M%S')+ext
        f2 = 'media/'+f1
        copyfile(file_path, f2)
    except FileNotFoundError:
        return _upload_error(job_id, "File not found during processing.")

    # determine the user
    r = get_rower_from_uploadoptions(uploadoptions)
    if r is None:
        os.remove(f2)
        return _upload_error(
            job_id, "Rower not found for the provided upload options.")

    try:
        fileformat = get_file_type(f2)
    except Exception as e:
        os.remove(f2)
        return _upload_error(
            job_id,
            "Error determining file format: {error}".format(error=str(e)))

    # Get workout type, title and notes from fit files
    # NOTE(review): the get_*_from_fit helpers and handle_nonpainsled are not
    # among the imports visible in this diff -- confirm they are in scope.
    if fileformat == 'fit':
        workouttype = get_workouttype_from_fit(f2)
        uploadoptions['workouttype'] = workouttype
        new_title = get_title_from_fit(f2)
        if new_title:
            uploadoptions['title'] = new_title
        new_notes = get_notes_from_fit(f2)
        if new_notes:
            uploadoptions['notes'] = new_notes

    # handle non-Painsled
    if fileformat != 'csv':
        # NOTE(review): summary, oarlength, inboard and impeller are returned
        # here but never used afterwards -- confirm whether they should be
        # carried into uploadoptions.
        f2, summary, oarlength, inboard, fileformat, impeller = handle_nonpainsled(
            f2,
            fileformat,
        )
        if not f2:
            return _upload_error(job_id, "Error processing non-Painsled file.")

    # create raw row data object
    powerperc = 100 * np.array([r.pw_ut2,
                                r.pw_ut1,
                                r.pw_at,
                                r.pw_tr, r.pw_an]) / r.ftp

    rr = rrower(hrmax=r.max, hrut2=r.ut2,
                hrut1=r.ut1, hrat=r.at,
                hrtr=r.tr, hran=r.an, ftp=r.ftp,
                powerperc=powerperc, powerzones=r.powerzones)
    row = rdata(f2, rower=rr)

    # The failure sentinel must be tested before touching ``row.df``: the
    # number 0 has no ``df`` attribute.
    if row == 0:
        os.remove(f2)
        return _upload_error(job_id, "Error creating row data from the file.")

    if row.df.empty:
        os.remove(f2)
        return _upload_error(
            job_id, "No valid data found in the uploaded file.")

    # check and fix sample rate
    row, f2 = check_and_fix_samplerate(row, f2)

    # change rower type to water if GPS data is present
    if is_water_rowing(row.df):
        uploadoptions['workouttype'] = 'water'

    # remove negative power peaks
    row = remove_negative_power_peaks(row)

    # optional auto smoothing
    row = do_smooth(row, f2)

    # recalculate power data; a missing workout type no longer raises KeyError
    if uploadoptions.get('workouttype', 'rower') in otetypes:
        try:
            if r.erg_recalculatepower:
                row.erg_recalculatepower()
                row.write_csv(f2, gzip=True)
        except Exception:
            # Best effort: keep the unrecalculated data.
            pass

    # NOTE(review): an existing workout is looked up by id only; confirm that
    # callers guarantee it belongs to ``r``.
    workoutid = uploadoptions.get('workoutid', None)
    w = None
    if workoutid is not None:
        try:
            w = Workout.objects.get(id=workoutid)
        except Workout.DoesNotExist:
            w = None
    if w is None:
        w = Workout(user=r, duration='00:00:00')
        w.save()

    # set workout attributes from uploadoptions and calculated values
    w = update_workout_attributes(w, row, f2, uploadoptions)

    # add teams
    if w.privacy == 'visible':
        w.team.add(*Team.objects.filter(rower=r))

    # put stroke data in file store through "dataplep"
    # The conversion call was misspelled (``form_pandas``) and hidden by a
    # bare except, so it could never succeed.  The pandas-based ``row`` is
    # kept for the later steps, which is what effectively happened before.
    # NOTE(review): ``pl`` and ``rrdata_pl`` are not among the imports visible
    # in this diff -- confirm they are in scope.
    strokedata = row.df
    try:
        strokedata = rrdata_pl(df=pl.from_pandas(row.df)).df
    except Exception:
        # Best effort: fall back to the pandas frame.
        pass

    _ = dataplep(strokedata, id=w.id, bands=True,
                 barchart=True, otwpower=True, empower=True, inboard=w.inboard)

    # send confirmation email
    send_upload_confirmation_email(r, w)

    # check for breakthroughs
    isbreakthrough, ishard = checkbreakthrough(w, r)
    _ = check_marker(w)
    _ = update_wps(r, otwtypes)
    _ = update_wps(r, otetypes)

    # update running_wps
    update_running_wps(r, w, row)

    # calculate TRIMP
    if w.workouttype in otwtypes:
        wps_avg = r.median_wps
    elif w.workouttype in otetypes:
        wps_avg = r.median_wps_erg
    else:
        wps_avg = 0

    _ = myqueue(queuehigh, handle_calctrimp, w.id, f2,
                r.ftp, r.sex, r.hrftp, r.max, r.rest, wps_avg)

    # make plots
    # ``plottype`` used to be undefined when 'make_plot' was set.
    # NOTE(review): assumes the requested chart type arrives under the
    # 'plottype' option -- confirm against the upload form.
    plottype = None
    if uploadoptions.get('make_plot', False):
        plottype = uploadoptions.get('plottype')
    if not plottype and r.staticchartonupload != 'None':  # pragma: no cover
        plottype = r.staticchartonupload
    if plottype and plottype != 'None':
        res, jobid = uploads.make_plot(r, w, f1, f2, plottype, w.name)

    # sync workouts to connected services
    uploads.do_sync(w, uploadoptions, quick=True)

    return True
Drag and drop files here
++
+ You can select one static plot to be generated immediately for + this workout. You can select to export to major fitness + platforms automatically. + If you check "make private", this workout will not be visible to your followers and will not show up in your teams' workouts list. With the Landing Page option, you can select to which (workout-related) page you will be + taken after a successful upload. +
+ ++ If you don't have a workout file but have written down the splits, + you can create a workout file yourself from this template +
+ + +Select Files with the File button or drag them onto the marked area
+ +