"""Upload pipeline: validate, unzip, parse and post-process rowing data files.

Entry point is :func:`upload_handler`; heavy lifting happens in the queued
tasks :func:`unzip_and_process` and :func:`process_single_file`.
"""
from rowers.celery import app
from rowers.utils import myqueue
import zipfile
import os
from rowingdata import get_file_type
from rowingdata import rowingdata as rrdata
import django_rq
from shutil import copyfile
from time import strftime
import numpy as np
from scipy.signal import find_peaks, savgol_filter
import pandas as pd
import datetime
import math

os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"

from YamJam import yamjam

CFG = yamjam()['rowsandallapp']

try:
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", CFG['settings_name'])
except KeyError:  # pragma: no cover
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "rowsandall_app.settings")

from django.core.wsgi import get_wsgi_application

application = get_wsgi_application()

queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('default')

from django.conf import settings
from django.urls import reverse
from django.utils import timezone as tz
from rowers.forms import DocumentsForm, TeamUploadOptionsForm
from rowers.models import (
    TeamInviteForm, Workout, User, Rower, Team, VirtualRace,
    IndoorVirtualRaceResult, VirtualRaceResult)
from rowers.opaque import encoder
from rowers import uploads
from rowingdata import rower as rrower
from rowers.dataroutines import (
    rdata, get_startdate_time_zone, df_resample, checkduplicates, dataplep,
    get_workouttype_from_fit, get_title_from_fit, get_notes_from_fit,
)
from rowers.mytypes import otetypes, otwtypes
from rowers.utils import totaltime_sec_to_string
from rowers.dataprep import check_marker, checkbreakthrough, update_wps, handle_nonpainsled
from rowers.emails import send_confirm
from rowers.tasks import handle_sendemail_unrecognized, handle_sendemail_breakthrough, handle_sendemail_hard, handle_calctrimp
from uuid import uuid4


def getrower(user):
    """Return the Rower record for *user*, creating one if missing.

    Accepts a User instance or (legacy path) a raw user id; returns None for
    anonymous/missing users.
    """
    try:
        if user is None or user.is_anonymous:  # pragma: no cover
            return None
    except AttributeError:  # pragma: no cover
        # *user* was a raw id, not a User instance.
        if User.objects.get(id=user).is_anonymous:
            return None
    try:
        r = Rower.objects.get(user=user)
    except Rower.DoesNotExist:  # pragma: no cover
        r = Rower(user=user)
        r.save()
    return r


def generate_job_id():
    """Return a fresh random job identifier."""
    return str(uuid4())


def valid_uploadoptions(uploadoptions):
    """Validate *uploadoptions* against the upload forms.

    Returns (is_valid, message); *message* concatenates the form errors.
    """
    fstr = uploadoptions.get('file', None)
    if fstr is None:  # pragma: no cover
        return False, "Missing file in upload options."
    # check if file can be found
    if isinstance(fstr, str):
        if not os.path.isfile(fstr):  # pragma: no cover
            return False, f"File not found: {fstr}"
    form = DocumentsForm(uploadoptions)
    optionsform = TeamUploadOptionsForm(uploadoptions)
    rowerform = TeamInviteForm(uploadoptions)
    rowerform.fields.pop('email')  # we don't need email here
    return (
        form.is_valid() and optionsform.is_valid() and rowerform.is_valid(),
        "{form_errors}, {optionsform_errors}, {rowerform_errors}".format(
            form_errors=form.errors,
            optionsform_errors=optionsform.errors,
            rowerform_errors=rowerform.errors,
        ))


def is_zipfile(file_path):
    """Return True when rowingdata identifies *file_path* as a zip archive."""
    fileformat = get_file_type(file_path)
    # BUG fix: original compared fileformat[0] == 'zip' (first character of
    # the format string against a 3-char string, always False).  Everywhere
    # else in this module get_file_type's result is compared as a whole
    # string, so do the same here.
    return fileformat == 'zip'


def is_invalid_file(file_path, rower=None):
    """Check whether *file_path* has a supported format.

    Despite the name, returns (True, "") for VALID files and
    (False, reason) for unsupported ones — callers rely on this.

    *rower* is optional (new, backward-compatible): when given, an
    "unrecognized format" notification email can be queued.
    """
    fileformat = get_file_type(file_path)
    if fileformat == "imageformat":
        return False, "Image files are not supported for upload."
    if fileformat == "json":
        return False, "JSON files are not supported for upload."
    if fileformat == "c2log":  # pragma: no cover
        return False, "Concept2 log files are not supported for upload."
    if fileformat == "nostrokes":  # pragma: no cover
        return False, "No stroke data found in the file."
    if fileformat == "kml":
        return False, "KML files are not supported for upload."
    if fileformat == "notgzip":  # pragma: no cover
        return False, "The gzip file appears to be corrupted."
    if fileformat == "rowprolog":  # pragma: no cover
        return False, "RowPro logbook summary files are not supported for upload."
    if fileformat == "gpx":
        return False, "GPX files are not supported for upload."
    if fileformat == "unknown":  # pragma: no cover
        # Keep a renamed copy of the unrecognized file for inspection.
        # BUG fix: original referenced undefined names `f2` and `r` here
        # (NameError at runtime); use file_path and the optional rower.
        extension = os.path.splitext(file_path)[1]
        filename = os.path.splitext(file_path)[0]
        if extension == '.gz':
            # BUG fix: original stripped the inner extension from `filename`
            # before reading it, so the double extension was lost.
            extension = os.path.splitext(filename)[1] + extension  # e.g. '.tcx.gz'
            filename = os.path.splitext(filename)[0]
        f4 = filename + 'a' + extension
        copyfile(file_path, f4)
        if rower is not None:
            _ = myqueue(queuelow, handle_sendemail_unrecognized, f4, rower.user.email)
        return False, "The file format is not recognized or supported."
    return True, ""


def upload_handler(uploadoptions, filename, createworkout=False, debug=False, **kwargs):
    """Validate an upload and queue it for processing.

    Returns a status dict with keys "status", "job_id" and "message".
    Zip archives are fanned out via unzip_and_process; single files go
    straight to process_single_file.
    """
    valid, message = valid_uploadoptions(uploadoptions)
    if not valid:  # pragma: no cover
        return {
            "status": "error",
            "job_id": None,
            "message": message
        }
    is_valid, message = is_invalid_file(filename)
    if not is_valid:  # pragma: no cover
        os.remove(filename)
        return {
            "status": "error",
            "job_id": None,
            "message": message
        }
    if is_zipfile(filename):
        parent_job_id = generate_job_id()
        _ = myqueue(
            queuehigh, unzip_and_process, filename, uploadoptions, parent_job_id)
        return {
            "status": "processing",
            "job_id": parent_job_id,
            "message": "Your zip file is being processed. You will be notified when it is complete."
        }
    job_id = generate_job_id()
    if 'id' not in uploadoptions and createworkout:
        # Pre-create an empty workout so the caller gets a stable id.
        w = Workout(
            user=get_rower_from_uploadoptions(uploadoptions),
            duration='00:00:00'
        )
        w.save()
        uploadoptions['id'] = w.id
    if 'id' in uploadoptions:
        # Use the opaque workout id as the job id so callers can poll it.
        job_id = encoder.encode_hex(uploadoptions['id'])
    _ = myqueue(
        queuehigh, process_single_file, filename, uploadoptions, job_id)
    return {
        "status": "processing",
        "job_id": job_id,
        "message": "Your file is being processed. You will be notified when it is complete."
    }


@app.task
def unzip_and_process(zip_filepath, uploadoptions, parent_job_id, debug=False, **kwargs):
    """Extract every member of a zip archive and queue each for processing."""
    basetitle = uploadoptions.get('title', '')
    with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
        for idx, membername in enumerate(zip_ref.namelist()):
            datafile = zip_ref.extract(membername, path='media/')
            if idx > 0:
                # BUG fix: original appended to the already-suffixed title,
                # producing "X Part 1 Part 2 ..."; build each suffix from the
                # base title instead.
                uploadoptions['title'] = basetitle + " Part {id}".format(id=idx)
            # NOTE(review): the same dict is mutated between queue calls;
            # assumes myqueue serializes arguments immediately — confirm.
            uploadoptions['file'] = datafile
            job_id = generate_job_id()
            _ = myqueue(
                queuehigh, process_single_file, datafile, uploadoptions, job_id)
    return {
        "status": "completed",
        "job_id": parent_job_id,
        "message": "All files from the zip have been processed."
    }


def get_rower_from_uploadoptions(uploadoptions):
    """Resolve the Rower targeted by *uploadoptions*.

    Looks up the form's 'user' field first, then falls back to 'useremail'
    (including Rower.emailalternatives).  Returns None when no rower can be
    determined.
    """
    rowerform = TeamInviteForm(uploadoptions)
    if not rowerform.is_valid():  # pragma: no cover
        return None
    try:
        u = rowerform.cleaned_data['user']
        r = getrower(u)
    except KeyError:
        # BUG fix: original left `r` unbound (UnboundLocalError at the
        # return) when 'useremail' was absent as well.
        r = None
        if 'useremail' in uploadoptions:
            us = User.objects.filter(email=uploadoptions['useremail'])
            if len(us):
                u = us[0]
                r = getrower(u)
            else:  # pragma: no cover
                r = None
                for rwr in Rower.objects.all():
                    if rwr.emailalternatives is not None:
                        if uploadoptions['useremail'] in rwr.emailalternatives:
                            r = rwr
                            break
    return r


def check_and_fix_samplerate(row, file_path):
    """Resample sub-second stroke data to 1 Hz and rewrite the CSV.

    Returns the (possibly rebuilt) row object and the file path.
    """
    dtavg = row.df['TimeStamp (sec)'].diff().mean()
    if dtavg < 1:
        newdf = df_resample(row.df)
        try:
            os.remove(file_path)
        except Exception:
            pass
        row = rrdata(df=newdf)
        row.write_csv(file_path, gzip=True)
    return row, file_path


def is_water_rowing(df):
    """Return True when the dataframe carries real GPS latitude data."""
    try:
        lat = df[' latitude']
    except KeyError:
        return False
    # A constant-zero latitude column means "no GPS fix", not on-water rowing.
    # (Original fell off the end and returned None here; bool keeps the same
    # truthiness with an explicit value.)
    return bool(lat.mean() != 0 and lat.std() != 0)


def remove_negative_power_peaks(row):
    """Replace isolated negative power spikes with the preceding sample.

    Two passes, since removing one spike can expose a neighbouring one.
    find_peaks never reports index 0, so neg_peaks - 1 stays in range.
    """
    for _pass in range(2):
        x = -row.df[' Power (watts)'].values
        neg_peaks, _ = find_peaks(x, height=0)  # height is the threshold value
        # NOTE(review): chained assignment; assumes a RangeIndex so that
        # integer labels equal positions — confirm before switching to .iloc.
        row.df[' Power (watts)'][neg_peaks] = row.df[' Power (watts)'][neg_peaks - 1]
    return row


def do_smooth(row, f2):
    """Savitzky-Golay smooth the pace trace and rewrite the CSV."""
    pace = row.df[' Stroke500mPace (sec/500m)'].values
    velo = 500. / pace
    f = row.df['TimeStamp (sec)'].diff().mean()
    if f != 0 and not np.isnan(f):
        # Odd window covering roughly 20 seconds of samples.
        windowsize = 2 * (int(10. / (f))) + 1
    else:  # pragma: no cover
        windowsize = 1
    if 'originalvelo' not in row.df:
        row.df['originalvelo'] = velo
    if windowsize > 3 and windowsize < len(velo):
        velo2 = savgol_filter(velo, windowsize, 3)
    else:  # pragma: no cover
        velo2 = velo
    velo3 = pd.Series(velo2, dtype='float')
    velo3 = velo3.replace([-np.inf, np.inf], np.nan)
    velo3 = velo3.fillna(method='ffill')
    pace2 = 500. / abs(velo3)
    row.df[' Stroke500mPace (sec/500m)'] = pace2
    row.df = row.df.fillna(0)
    row.write_csv(f2, gzip=True)
    # NOTE(review): removing f2 right after writing it looks wrong unless
    # write_csv(gzip=True) writes to f2 + '.gz' — confirm against rowingdata.
    try:
        os.remove(f2)
    except Exception:
        pass
    return row


def update_workout_attributes(w, row, file_path, uploadoptions,
                              startdatetime='', timezone='', forceunit='lbs'):
    """Populate and save workout *w* from parsed data and upload options.

    Also handles optional virtual-race registration submission.  Returns the
    saved workout.
    """
    startdatetime, startdate, starttime, timezone_str, partofday = get_startdate_time_zone(
        w.user, row,
        startdatetime=startdatetime,
        timezone=timezone
    )
    boattype = uploadoptions.get('boattype', '1x')
    workoutsource = uploadoptions.get('workoutsource', 'unknown')
    stravaid = uploadoptions.get('stravaid', 0)
    rpe = uploadoptions.get('rpe', 0)
    notes = uploadoptions.get('notes', '')
    inboard = uploadoptions.get('inboard', 0.88)
    oarlength = uploadoptions.get('oarlength', 2.89)
    useImpeller = uploadoptions.get('useImpeller', False)
    seatnumber = uploadoptions.get('seatNumber', 1)
    boatname = uploadoptions.get('boatName', '')
    portStarboard = uploadoptions.get('portStarboard', 1)
    empowerside = 'port'
    raceid = uploadoptions.get('raceid', 0)
    registrationid = uploadoptions.get('submitrace', 0)
    if portStarboard == 1:
        empowerside = 'starboard'
    if stravaid != 0:  # pragma: no cover
        workoutsource = 'strava'
        w.uploadedtostrava = stravaid
    workouttype = uploadoptions.get('workouttype', 'rower')
    title = uploadoptions.get('title', '')
    if title is None or title == '':
        title = 'Workout'
        if partofday is not None:
            title = '{partofday} {workouttype}'.format(
                partofday=partofday,
                workouttype=workouttype,
            )
    averagehr = row.df[' HRCur (bpm)'].mean()
    maxhr = row.df[' HRCur (bpm)'].max()
    totaldist = uploadoptions.get('distance', 0)
    if totaldist == 0:
        totaldist = row.df['cum_dist'].max()
    totaltime = uploadoptions.get('duration', 0)
    if totaltime == 0:
        totaltime = row.df['TimeStamp (sec)'].max() - row.df['TimeStamp (sec)'].min()
        try:
            # Account for elapsed time before the first recorded stroke.
            totaltime = totaltime + row.df.loc[:, ' ElapsedTime (sec)'].iloc[0]
        except KeyError:  # pragma: no cover
            pass
    if np.isnan(totaltime):  # pragma: no cover
        totaltime = 0
    if uploadoptions.get('summary', '') == '':
        summary = row.allstats()
    else:
        summary = uploadoptions.get('summary', '')
    if uploadoptions.get('makeprivate', False):  # pragma: no cover
        privacy = 'hidden'
    elif workoutsource != 'strava':
        privacy = 'visible'
    else:  # pragma: no cover
        privacy = 'hidden'
    # checking for nan values
    totaldist = np.nan_to_num(totaldist)
    maxhr = np.nan_to_num(maxhr)
    averagehr = np.nan_to_num(averagehr)
    dragfactor = 0
    if workouttype in otetypes:
        dragfactor = row.dragfactor
    delta = datetime.timedelta(seconds=totaltime)
    try:
        workoutenddatetime = startdatetime + delta
    except (AttributeError, TypeError):  # pragma: no cover
        # BUG fix: original referenced un-imported `pendulum` and then added
        # delta to the unparsed value again.  Parse with the stdlib and reuse
        # the parsed datetime (later code reads startdatetime.tzinfo too).
        startdatetime = datetime.datetime.fromisoformat(str(startdatetime))
        workoutenddatetime = startdatetime + delta
    # check for duplicate start times and duration
    duplicate = checkduplicates(
        w.user, startdate, startdatetime, workoutenddatetime)
    # test title length
    if title is not None and len(title) > 140:  # pragma: no cover
        title = title[0:140]
    timezone_str = str(startdatetime.tzinfo)
    duration = totaltime_sec_to_string(totaltime)
    w.name = title
    w.date = startdate
    w.workouttype = workouttype
    w.boattype = boattype
    w.dragfactor = dragfactor
    w.duration = duration
    w.distance = totaldist
    w.weightcategory = w.user.weightcategory
    w.adaptiveclass = w.user.adaptiveclass
    w.starttime = starttime
    w.duplicate = duplicate
    w.workoutsource = workoutsource
    w.rankingpiece = False
    w.forceunit = forceunit
    w.rpe = rpe
    w.csvfilename = file_path
    w.notes = notes
    w.summary = summary
    w.maxhr = maxhr
    w.averagehr = averagehr
    w.startdatetime = startdatetime
    w.inboard = inboard
    w.oarlength = oarlength
    w.seatnumber = seatnumber
    w.boatname = boatname
    w.empowerside = empowerside
    w.timezone = timezone_str
    w.privacy = privacy
    w.impeller = useImpeller
    w.save()
    # check for registrationid
    if registrationid != 0:  # pragma: no cover
        races = VirtualRace.objects.filter(
            registration_closure__gt=tz.now(),
            id=raceid,
        )
        registrations = IndoorVirtualRaceResult.objects.filter(
            race__in=races,
            id=registrationid,
            userid=w.user.id
        )
        registrations2 = VirtualRaceResult.objects.filter(
            race__in=races,
            id=registrationid,
            userid=w.user.id)
        if registrationid in [reg.id for reg in registrations]:
            # indoor race
            registrations = registrations.filter(id=registrationid)
            if registrations:
                race = registrations[0].race
                if race.sessiontype == 'indoorrace':
                    result, comments, errors, jobid = add_workout_indoorrace(
                        [w], race, w.user, recordid=registrations[0].id
                    )
                elif race.sessiontype in ['fastest_time', 'fastest_distance']:
                    result, comments, errors, jobid = add_workout_fastestrace(
                        [w], race, w.user, recordid=registrations[0].id
                    )
        if registrationid in [reg.id for reg in registrations2]:
            # BUG fix: original filtered into `registration` but then tested
            # and indexed the unfiltered `registrations` (indoor queryset).
            registrations2 = registrations2.filter(id=registrationid)
            if registrations2:
                race = registrations2[0].race
                if race.sessiontype == 'race':
                    result, comments, errors, jobid = add_workout_race(
                        [w], race, w.user, recordid=registrations2[0].id
                    )
                elif race.sessiontype in ['fastest_time', 'fastest_distance']:
                    result, comments, errors, jobid = add_workout_fastestrace(
                        [w], race, w.user, recordid=registrations2[0].id
                    )
    return w


def send_upload_confirmation_email(rower, workout):
    """Email the rower a link to the freshly uploaded workout (if opted in)."""
    if rower.getemailnotifications and not rower.emailbounced:  # pragma: no cover
        link = settings.SITE_URL + reverse(
            rower.defaultlandingpage,
            kwargs={
                'id': encoder.encode_hex(workout.id),
            }
        )
        _ = send_confirm(rower.user, workout.name, link, '')


def update_running_wps(r, w, row):
    """Fold this workout's drive energy into the rower's running averages.

    Maintains separate 42-day running work-per-stroke averages for erg
    (running_wps_erg) and on-the-water (running_wps) workouts.
    """
    if not w.duplicate and w.workouttype in otetypes:
        cntr = Workout.objects.filter(
            user=r, workouttype__in=otetypes,
            startdatetime__gt=tz.now() - tz.timedelta(days=42),
            duplicate=False).count()
        new_value = (cntr * r.running_wps_erg + row.df['driveenergy'].mean()) / (cntr + 1.0)
        # if new_value is not zero or infinite or -inf, r.running_wps can be set to value
        if not (math.isnan(new_value) or math.isinf(new_value) or new_value == 0):  # pragma: no cover
            r.running_wps_erg = new_value
        elif not (math.isnan(r.running_wps_erg) or math.isinf(r.running_wps_erg) or r.running_wps_erg == 0):
            pass
        else:  # pragma: no cover
            r.running_wps_erg = 600.
        r.save()
    if not w.duplicate and w.workouttype in otwtypes:
        cntr = Workout.objects.filter(
            user=r, workouttype__in=otwtypes,
            startdatetime__gt=tz.now() - tz.timedelta(days=42),
            duplicate=False).count()
        try:
            # BUG fix: original averaged against running_wps_erg in the
            # on-the-water branch; the except-fallback below shows the
            # intended field is running_wps.
            new_value = (cntr * r.running_wps + row.df['driveenergy'].mean()) / (cntr + 1.0)
        except TypeError:  # pragma: no cover
            new_value = r.running_wps
        if not (math.isnan(new_value) or math.isinf(new_value) or new_value == 0):
            r.running_wps = new_value
        elif not (math.isnan(r.running_wps) or math.isinf(r.running_wps) or r.running_wps == 0):
            pass
        else:  # pragma: no cover
            r.running_wps = 400.
        r.save()


@app.task
def process_single_file(file_path, uploadoptions, job_id, debug=False, **kwargs):
    """Parse one uploaded file, create/update the workout, and fan out follow-ups.

    Returns a status dict on error, or (True, csv_path) on success.
    """
    # copy file to a unique name in media folder
    f2 = file_path
    try:
        nn, ext = os.path.splitext(f2)
        if ext == '.gz':
            # Keep the double extension, e.g. '.csv.gz'.
            nn, ext2 = os.path.splitext(nn)
            ext = ext2 + ext
        f1 = uuid4().hex[:10] + '-' + strftime('%Y%m%d-%H%M%S') + ext
        f2 = 'media/' + f1
        copyfile(file_path, f2)
    except FileNotFoundError:  # pragma: no cover
        return {
            "status": "error",
            "job_id": job_id,
            "message": "File not found during processing."
        }
    # determine the user
    r = get_rower_from_uploadoptions(uploadoptions)
    if r is None:  # pragma: no cover
        os.remove(f2)
        return {
            "status": "error",
            "job_id": job_id,
            "message": "Rower not found for the provided upload options."
        }
    try:
        fileformat = get_file_type(f2)
    except Exception as e:  # pragma: no cover
        os.remove(f2)
        return {
            "status": "error",
            "job_id": job_id,
            "message": "Error determining file format: {error}".format(error=str(e))
        }
    # Get workout type / title / notes from fit files
    if "fit" in fileformat:
        workouttype = get_workouttype_from_fit(f2)
        uploadoptions['workouttype'] = workouttype
        new_title = get_title_from_fit(f2)
        if new_title:  # pragma: no cover
            uploadoptions['title'] = new_title
        new_notes = get_notes_from_fit(f2)
        if new_notes:  # pragma: no cover
            uploadoptions['notes'] = new_notes
    # handle non-Painsled
    if fileformat != 'csv':
        f2, summary, oarlength, inboard, fileformat, impeller = handle_nonpainsled(
            f2, fileformat,
        )
        uploadoptions['summary'] = summary
        uploadoptions['oarlength'] = oarlength
        uploadoptions['inboard'] = inboard
        uploadoptions['useImpeller'] = impeller
        # BUG fix: .get avoids a KeyError when 'workouttype' is absent.
        # NOTE(review): 'strave' is never a workouttype value — probable typo
        # for a workoutsource=='strava' check; left as-is to preserve behavior.
        if uploadoptions.get('workouttype') != 'strave':
            uploadoptions['workoutsource'] = fileformat
        if not f2:  # pragma: no cover
            return {
                "status": "error",
                "job_id": job_id,
                "message": "Error processing non-Painsled file."
            }
    # create raw row data object with the rower's HR and power zones
    powerperc = 100 * np.array([r.pw_ut2, r.pw_ut1, r.pw_at, r.pw_tr, r.pw_an]) / r.ftp
    rr = rrower(hrmax=r.max, hrut2=r.ut2, hrut1=r.ut1, hrat=r.at, hrtr=r.tr,
                hran=r.an, ftp=r.ftp, powerperc=powerperc, powerzones=r.powerzones)
    row = rdata(f2, rower=rr)
    # BUG fix: the row == 0 sentinel check must come before touching row.df
    # (an int has no .df attribute).
    if row == 0:  # pragma: no cover
        os.remove(f2)
        return {
            "status": "error",
            "job_id": job_id,
            "message": "Error creating row data from the file."
        }
    if row.df.empty:  # pragma: no cover
        os.remove(f2)
        return {
            "status": "error",
            "job_id": job_id,
            "message": "No valid data found in the uploaded file."
        }
    # check and fix sample rate
    row, f2 = check_and_fix_samplerate(row, f2)
    # change rower type to water if GPS data is present
    if is_water_rowing(row.df):
        uploadoptions['workouttype'] = 'water'
    # remove negative power peaks
    row = remove_negative_power_peaks(row)
    # optional auto smoothing
    row = do_smooth(row, f2)
    # recalculate power data for erg workouts
    if uploadoptions.get('workouttype', 'rower') in otetypes:
        try:
            if r.erg_recalculatepower:
                row.erg_recalculatepower()
                row.write_csv(f2, gzip=True)
        except Exception:
            pass
    workoutid = uploadoptions.get('id', None)
    if workoutid is not None:  # pragma: no cover
        try:
            w = Workout.objects.get(id=workoutid)
        except Workout.DoesNotExist:
            w = Workout(user=r, duration='00:00:00')
            w.save()
    else:
        w = Workout(user=r, duration='00:00:00')
        w.save()
    # set workout attributes from uploadoptions and calculated values
    w = update_workout_attributes(w, row, f2, uploadoptions)
    # add teams
    if w.privacy == 'visible':
        ts = Team.objects.filter(rower=r)
        for t in ts:  # pragma: no cover
            w.team.add(t)
    # NOTE(review): rrdata_pl / pl are not defined anywhere in this module,
    # so this block is a guaranteed no-op (NameError swallowed) — looks like
    # a leftover polars experiment; confirm and remove.
    try:
        row = rrdata_pl(df=pl.form_pandas(row.df))
    except Exception:
        pass
    # put stroke data in file store through "dataplep"
    _ = dataplep(row.df, id=w.id, bands=True, barchart=True,
                 otwpower=True, empower=True, inboard=w.inboard)
    # send confirmation email
    send_upload_confirmation_email(r, w)
    # check for breakthroughs (side effects inside checkbreakthrough)
    isbreakthrough, ishard = checkbreakthrough(w, r)
    _ = check_marker(w)
    _ = update_wps(r, otwtypes)
    _ = update_wps(r, otetypes)
    # update running_wps
    update_running_wps(r, w, row)
    # calculate TRIMP
    if w.workouttype in otwtypes:
        wps_avg = r.median_wps
    elif w.workouttype in otetypes:
        wps_avg = r.median_wps_erg
    else:  # pragma: no cover
        wps_avg = 0
    _ = myqueue(queuehigh, handle_calctrimp, w.id, f2, r.ftp, r.sex,
                r.hrftp, r.max, r.rest, wps_avg)
    # make plots
    if uploadoptions.get('makeplot', False):  # pragma: no cover
        plottype = uploadoptions.get('plottype', 'timeplot')
        res, jobid = uploads.make_plot(r, w, f1, f2, plottype, w.name)
    elif r.staticchartonupload != 'None':  # pragma: no cover
        plottype = r.staticchartonupload
        res, jobid = uploads.make_plot(r, w, f1, f2, plottype, w.name)
    # sync workouts to connected services
    uploads.do_sync(w, uploadoptions, quick=True)
    return True, f2