From 46b4b29cdba38506b0ec6aa0491a695a3ddc8436 Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Thu, 23 Oct 2025 13:10:25 +0200 Subject: [PATCH] ds --- rowers/c2stuff.py | 1 - rowers/dataflow.py | 711 ++++++++++++++ rowers/integrations/c2.py | 1 - rowers/integrations/intervals.py | 47 +- rowers/integrations/polar.py | 15 +- rowers/integrations/rp3.py | 1 - rowers/nkimportutils.py | 24 +- rowers/rojabo_stuff.py | 1 - rowers/tests/testdata/testdata.tcx.gz | Bin 3989 -> 3989 bytes rowers/upload_tasks.py | 1237 +++++++++++++++++++++++++ 10 files changed, 1983 insertions(+), 55 deletions(-) create mode 100644 rowers/dataflow.py create mode 100644 rowers/upload_tasks.py diff --git a/rowers/c2stuff.py b/rowers/c2stuff.py index 18f43903..91975d74 100644 --- a/rowers/c2stuff.py +++ b/rowers/c2stuff.py @@ -25,7 +25,6 @@ import numpy as np from rowsandall_app.settings import ( C2_CLIENT_ID, C2_REDIRECT_URI, C2_CLIENT_SECRET, - UPLOAD_SERVICE_URL, UPLOAD_SERVICE_SECRET ) from rowers.tasks import ( diff --git a/rowers/dataflow.py b/rowers/dataflow.py new file mode 100644 index 00000000..542c543e --- /dev/null +++ b/rowers/dataflow.py @@ -0,0 +1,711 @@ +from rowers.celery import app +from rowers.utils import myqueue +import zipfile +import os +from rowingdata import get_file_type +from rowingdata import rowingdata as rrdata +import django_rq +from shutil import copyfile +from time import strftime +import numpy as np +from scipy.signal import find_peaks, savgol_filter +import pandas as pd +import datetime +import math + +os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" +from YamJam import yamjam +CFG = yamjam()['rowsandallapp'] + +try: + os.environ.setdefault("DJANGO_SETTINGS_MODULE",CFG['settings_name']) +except KeyError: # pragma: no cover + os.environ.setdefault("DJANGO_SETTINGS_MODULE","rowsandall_app.settings") + +from django.core.wsgi import get_wsgi_application +application = get_wsgi_application() + +queue = django_rq.get_queue('default') +queuelow = django_rq.get_queue('low') +queuehigh = django_rq.get_queue('default') + +from django.conf import settings +from django.urls import reverse +from django.utils import timezone as tz + +from rowers.forms import DocumentsForm, TeamUploadOptionsForm +from rowers.models import ( + TeamInviteForm, Workout, User, Rower, Team, + VirtualRace, IndoorVirtualRaceResult, VirtualRaceResult) +from rowers.opaque import encoder +from rowers import uploads + +from rowingdata import rower as rrower + +from rowers.dataroutines import ( + rdata, get_startdate_time_zone, df_resample, checkduplicates, dataplep, + get_workouttype_from_fit, + get_title_from_fit, + get_notes_from_fit, +) +from rowers.mytypes import otetypes, otwtypes +from rowers.utils import totaltime_sec_to_string +from rowers.dataprep import check_marker, checkbreakthrough, update_wps, handle_nonpainsled +from rowers.emails import send_confirm +from rowers.tasks import handle_sendemail_unrecognized, handle_sendemail_breakthrough, handle_sendemail_hard, handle_calctrimp + +from uuid import uuid4 + +def getrower(user): + try: + if user is None or user.is_anonymous: # pragma: no cover + return None + except AttributeError: # pragma: no cover + if User.objects.get(id=user).is_anonymous: + return None + try: + r = Rower.objects.get(user=user) + except Rower.DoesNotExist: # pragma: no cover: + r = Rower(user=user) + r.save() + + return r + + +def generate_job_id(): + return str(uuid4()) + +def valid_uploadoptions(uploadoptions): + fstr = uploadoptions.get('file', None) + if fstr is None: # pragma: no cover + return False, "Missing file in upload options." + + # check if file can be found + if isinstance(fstr, str): + if not os.path.isfile(fstr): # pragma: no cover + return False, f"File not found: {fstr}" + + + form = DocumentsForm(uploadoptions) + optionsform = TeamUploadOptionsForm(uploadoptions) + rowerform = TeamInviteForm(uploadoptions) + rowerform.fields.pop('email') # we don't need email here + return ( + form.is_valid() and optionsform.is_valid() and rowerform.is_valid(), + "{form_errors}, {optionsform_errors}, {rowerform_errors}".format( + form_errors=form.errors, + optionsform_errors=optionsform.errors, + rowerform_errors=rowerform.errors, + )) + +def is_zipfile(file_path): + fileformat = get_file_type(file_path) + return fileformat[0] == 'zip' + +def is_invalid_file(file_path): + fileformat = get_file_type(file_path) + if fileformat == "imageformat": + return False, "Image files are not supported for upload." + if fileformat == "json": + return False, "JSON files are not supported for upload." + if fileformat == "c2log": # pragma: no cover + return False, "Concept2 log files are not supported for upload." + if fileformat == "nostrokes": # pragma: no cover + return False, "No stroke data found in the file." + if fileformat == "kml": + return False, "KML files are not supported for upload." + if fileformat == "notgzip": # pragma: no cover + return False, "The gzip file appears to be corrupted." + if fileformat == "rowprolog": # pragma: no cover + return False, "RowPro logbook summary files are not supported for upload." + if fileformat == "gpx": + return False, "GPX files are not supported for upload." + if fileformat == "unknown": # pragma: no cover + extension = os.path.splitext(f2)[1] + filename = os.path.splitext(f2)[0] + if extension == '.gz': + filename = os.path.splitext(filename)[0] + extension2 = os.path.splitext(filename)[1]+extension + extension = extension2 + f4 = filename+'a'+extension + copyfile(f2, f4) + _ = myqueue(queuelow, + handle_sendemail_unrecognized, + f4, + r.user.email) + return False, "The file format is not recognized or supported." + return True, "" + + +def upload_handler(uploadoptions, filename): + valid, message = valid_uploadoptions(uploadoptions) + if not valid: # pragma: no cover + return { + "status": "error", + "job_id": None, + "message": message + } + is_valid, message = is_invalid_file(filename) + if not is_valid: # pragma: no cover + os.remove(filename) + return { + "status": "error", + "job_id": None, + "message": message + } + if is_zipfile(filename): + parent_job_id = generate_job_id() + _ = myqueue( + queuehigh, + unzip_and_process, + filename, + uploadoptions, + parent_job_id) + return { + "status": "processing", + "job_id": parent_job_id, + "message": "Your zip file is being processed. You will be notified when it is complete." + } + job_id = generate_job_id() + _ = myqueue( + queuehigh, + process_single_file, + filename, + uploadoptions, + job_id) + return { + "status": "processing", + "job_id": job_id, + "message": "Your file is being processed. You will be notified when it is complete." + } + +@app.task +def unzip_and_process(zip_filepath, uploadoptions, parent_job_id, debug=False, **kwargs): + with zipfile.ZipFile(zip_filepath, 'r') as zip_ref: + for id, filename in enumerate(zip_ref.namelist()): + datafile = zip_ref.extract(filename, path='media/') + if id > 0: + uploadoptions['title'] = uploadoptions['title'] + " Part {id}".format(id=id) + uploadoptions['file'] = datafile + job_id = generate_job_id() + _ = myqueue( + queuehigh, + process_single_file, + datafile, + uploadoptions, + job_id) + + return { + "status": "completed", + "job_id": parent_job_id, + "message": "All files from the zip have been processed." + } + +def get_rower_from_uploadoptions(uploadoptions): + rowerform = TeamInviteForm(uploadoptions) + if not rowerform.is_valid(): # pragma: no cover + return None + try: + u = rowerform.cleaned_data['user'] + r = getrower(u) + except KeyError: + if 'useremail' in uploadoptions: + us = User.objects.filter(email=uploadoptions['useremail']) + if len(us): + u = us[0] + r = getrower(u) + else: # pragma: no cover + r = None + for rwr in Rower.objects.all(): + if rwr.emailalternatives is not None: + if uploadoptions['useremail'] in rwr.emailalternatives: + r = rwr + break + return r + +def check_and_fix_samplerate(row, file_path): + # implement sample rate check and fix here + dtavg = row.df['TimeStamp (sec)'].diff().mean() + if dtavg < 1: + newdf = df_resample(row.df) + try: + os.remove(file_path) + except Exception: + pass + row = rrdata(df=newdf) + row.write_csv(file_path, gzip=True) + return row, file_path + +def is_water_rowing(df): + try: + lat = df[' latitude'] + if lat.mean() != 0 and lat.std() != 0: + return True + except KeyError: + return False + +def remove_negative_power_peaks(row): + x = row.df[' Power (watts)'].values + x = x * - 1 + neg_peaks, _ = find_peaks(x, height=0) # hieght is the threshold value + + row.df[' Power (watts)'][neg_peaks] = row.df[' Power (watts)'][neg_peaks-1] + x = row.df[' Power (watts)'].values + x = x * - 1 + neg_peaks, _ = find_peaks(x, height=0) # hieght is the threshold value + + row.df[' Power (watts)'][neg_peaks] = row.df[' Power (watts)'][neg_peaks-1] + + return row + +def do_smooth(row, f2): + # implement smoothing here if needed + pace = row.df[' Stroke500mPace (sec/500m)'].values + velo = 500. / pace + + f = row.df['TimeStamp (sec)'].diff().mean() + if f != 0 and not np.isnan(f): + windowsize = 2 * (int(10. / (f))) + 1 + else: # pragma: no cover + windowsize = 1 + if 'originalvelo' not in row.df: + row.df['originalvelo'] = velo + + if windowsize > 3 and windowsize < len(velo): + velo2 = savgol_filter(velo, windowsize, 3) + else: # pragma: no cover + velo2 = velo + + velo3 = pd.Series(velo2, dtype='float') + velo3 = velo3.replace([-np.inf, np.inf], np.nan) + velo3 = velo3.fillna(method='ffill') + + pace2 = 500. / abs(velo3) + + row.df[' Stroke500mPace (sec/500m)'] = pace2 + + row.df = row.df.fillna(0) + + row.write_csv(f2, gzip=True) + try: + os.remove(f2) + except: + pass + + return row + +def update_workout_attributes(w, row, file_path, uploadoptions, + startdatetime='', + timezone='', forceunit='lbs'): + + # calculate + startdatetime, startdate, starttime, timezone_str, partofday = get_startdate_time_zone( + w.user, row, startdatetime=startdatetime, timezone=timezone + ) + + boattype = uploadoptions.get('boattype', '1x') + workoutsource = uploadoptions.get('workoutsource', 'unknown') + stravaid = uploadoptions.get('stravaid', 0) + rpe = uploadoptions.get('rpe', 0) + notes = uploadoptions.get('notes', '') + inboard = uploadoptions.get('inboard', 0.88) + oarlength = uploadoptions.get('oarlength', 2.89) + useImpeller = uploadoptions.get('useImpeller', False) + seatnumber = uploadoptions.get('seatNumber', 1) + boatname = uploadoptions.get('boatName','') + portStarboard = uploadoptions.get('portStarboard', 1) + empowerside = 'port' + raceid = uploadoptions.get('raceid', 0) + registrationid = uploadoptions.get('submitrace', 0) + + if portStarboard == 1: + empowerside = 'starboard' + + stravaid = uploadoptions.get('stravaid',0) + if stravaid != 0: # pragma: no cover + workoutsource = 'strava' + w.uploadedtostrava = stravaid + + workouttype = uploadoptions.get('workouttype', 'rower') + title = uploadoptions.get('title', '') + if title is None or title == '': + title = 'Workout' + if partofday is not None: + title = '{partofday} {workouttype}'.format( + partofday=partofday, + workouttype=workouttype, + ) + averagehr = row.df[' HRCur (bpm)'].mean() + maxhr = row.df[' HRCur (bpm)'].max() + + totaldist = uploadoptions.get('distance', 0) + if totaldist == 0: + totaldist = row.df['cum_dist'].max() + + totaltime = uploadoptions.get('duration', 0) + if totaltime == 0: + totaltime = row.df['TimeStamp (sec)'].max() - row.df['TimeStamp (sec)'].min() + try: + totaltime = totaltime + row.df.loc[:, ' ElapsedTime (sec)'].iloc[0] + except KeyError: # pragma: no cover + pass + + if np.isnan(totaltime): # pragma: no cover + totaltime = 0 + + if uploadoptions.get('summary', '') == '': + summary = row.allstats() + else: + summary = uploadoptions.get('summary', '') + + if uploadoptions.get('makeprivate', False): # pragma: no cover + privacy = 'hidden' + elif workoutsource != 'strava': + privacy = 'visible' + else: # pragma: no cover + privacy = 'hidden' + + # checking for in values + totaldist = np.nan_to_num(totaldist) + maxhr = np.nan_to_num(maxhr) + averagehr = np.nan_to_num(averagehr) + + dragfactor = 0 + if workouttype in otetypes: + dragfactor = row.dragfactor + + delta = datetime.timedelta(seconds=totaltime) + + try: + workoutenddatetime = startdatetime+delta + except AttributeError as e: # pragma: no cover + workoutstartdatetime = pendulum.parse(str(startdatetime)) + workoutenddatetime = startdatetime+delta + + + # check for duplicate start times and duration + duplicate = checkduplicates( + w.user, startdate, startdatetime, workoutenddatetime) + if duplicate: # pragma: no cover + rankingpiece = False + + # test title length + if title is not None and len(title) > 140: # pragma: no cover + title = title[0:140] + + timezone_str = str(startdatetime.tzinfo) + + + duration = totaltime_sec_to_string(totaltime) + + # implement workout attribute updates here + w.name = title + w.date = startdate + w.workouttype = workouttype + w.boattype = boattype + w.dragfactor = dragfactor + w.duration = duration + w.distance = totaldist + w.weightcategory = w.user.weightcategory + w.adaptiveclass = w.user.adaptiveclass + w.starttime = starttime + w.duplicate = duplicate + w.workoutsource = workoutsource + w.rankingpiece = False + w.forceunit = forceunit + w.rpe = rpe + w.csvfilename = file_path + w.notes = notes + w.summary = summary + w.maxhr = maxhr + w.averagehr = averagehr + w.startdatetime = startdatetime + w.inboard = inboard + w.oarlength = oarlength + w.seatnumber = seatnumber + w.boatname = boatname + w.empowerside = empowerside + w.timezone = timezone_str + w.privacy = privacy + w.impeller = useImpeller + w.save() + + # check for registrationid + if registrationid != 0: # pragma: no cover + races = VirtualRace.objects.filter( + registration_closure__gt=tz.now(), + id=raceid, + ) + registrations = IndoorVirtualRaceResult.objects.filter( + race__in=races, + id=registrationid, + userid=w.user.id + ) + registrations2 = VirtualRaceResult.objects.filter( + race__in=races, + id=registrationid, + userid=w.user.id) + + if registrationid in [r.id for r in registrations]: + # indoor race + registrations = registrations.filter(id=registrationid) + if registrations: + race = registrations[0].race + if race.sessiontype == 'indoorrace': + result, comments, errors, jobid = add_workout_indoorrace( + [w], race, w.user, recordid=registrations[0].id + ) + elif race.sessiontype in ['fastest_time', 'fastest_distance']: + result, comments, errors, jobid = add_workout_fastestrace( + [w], race, w.user, recordid=registrations[0].id + ) + + if registrationid in [r.id for r in registrations2]: + registration = registrations2.filter(id=registrationid) + if registrations: + race = registrations[0].race + if race.sessiontype == 'race': + result, comments, errors, jobid = add_workout_race( + [w], race, w.user, recordid=registrations2[0].id + ) + elif race.sessiontype in ['fastest_time', 'fastest_distance']: + result, comments, errors, jobid = add_workout_fastestrace( + [w], race, w.user, recordid=registrations2[0].id + ) + + return w + +def send_upload_confirmation_email(rower, workout): + # implement email sending here + if rower.getemailnotifications and not rower.emailbounced: # pragma: no cover + link = settings.SITE_URL+reverse( + rower.defaultlandingpage, + kwargs={ + 'id': encoder.encode_hex(workout.id), + } + ) + _ = send_confirm(rower.user, workout.name, link, '') + + +def update_running_wps(r, w, row): + # implement wps update here + if not w.duplicate and w.workouttype in otetypes: + cntr = Workout.objects.filter(user=r, workouttype__in=otetypes, + startdatetime__gt=tz.now()-tz.timedelta(days=42), + duplicate=False).count() + new_value = (cntr*r.running_wps_erg + row.df['driveenergy'].mean())/(cntr+1.0) + # if new_value is not zero or infinite or -inf, r.running_wps can be set to value + if not (math.isnan(new_value) or math.isinf(new_value) or new_value == 0): # pragma: no cover + r.running_wps_erg = new_value + elif not (math.isnan(r.running_wps_erg) or math.isinf(r.running_wps_erg) or r.running_wps_erg == 0): + pass + else: # pragma: no cover + r.running_wps_erg = 600. + r.save() + + if not w.duplicate and w.workouttype in otwtypes: + cntr = Workout.objects.filter(user=r, workouttype__in=otwtypes, + startdatetime__gt=tz.now()-tz.timedelta(days=42), + duplicate=False).count() + try: + new_value = (cntr*r.running_wps_erg + row.df['driveenergy'].mean())/(cntr+1.0) + except TypeError: # pragma: no cover + new_value = r.running_wps + if not (math.isnan(new_value) or math.isinf(new_value) or new_value == 0): + r.running_wps = new_value + elif not (math.isnan(r.running_wps) or math.isinf(r.running_wps) or r.running_wps == 0): + pass + else: # pragma: no cover + r.running_wps = 400. + r.save() + +@app.task +def process_single_file(file_path, uploadoptions, job_id, debug=False, **kwargs): + # copy file to a unique name in media folder + f2 = file_path + try: + nn, ext = os.path.splitext(f2) + if ext == '.gz': + nn, ext2 = os.path.splitext(nn) + ext = ext2 + ext + f1 = uuid4().hex[:10]+'-'+strftime('%Y%m%d-%H%M%S')+ext + f2 = 'media/'+f1 + copyfile(file_path, f2) + except FileNotFoundError: # pragma: no cover + return { + "status": "error", + "job_id": job_id, + "message": "File not found during processing." + } + + # determine the user + r = get_rower_from_uploadoptions(uploadoptions) + if r is None: # pragma: no cover + os.remove(f2) + return { + "status": "error", + "job_id": job_id, + "message": "Rower not found for the provided upload options." + } + + try: + fileformat = get_file_type(f2) + except Exception as e: # pragma: no cover + os.remove(f2) + return { + "status": "error", + "job_id": job_id, + "message": "Error determining file format: {error}".format(error=str(e)) + } + + # Get fileformat from fit & tcx + if "fit" in fileformat: + workouttype = get_workouttype_from_fit(f2) + uploadoptions['workouttype'] = workouttype + new_title = get_title_from_fit(f2) + if new_title: # pragma: no cover + uploadoptions['title'] = new_title + new_notes = get_notes_from_fit(f2) + if new_notes: # pragma: no cover + uploadoptions['notes'] = new_notes + + + # handle non-Painsled + if fileformat != 'csv': + f2, summary, oarlength, inboard, fileformat, impeller = handle_nonpainsled( + f2, + fileformat, + ) + uploadoptions['summary'] = summary + uploadoptions['oarlength'] = oarlength + uploadoptions['inboard'] = inboard + uploadoptions['useImpeller'] = impeller + if uploadoptions['workouttype'] != 'strave': + uploadoptions['workoutsource'] = fileformat + if not f2: # pragma: no cover + return { + "status": "error", + "job_id": job_id, + "message": "Error processing non-Painsled file." + } + + # create raw row data object + powerperc = 100 * np.array([r.pw_ut2, + r.pw_ut1, + r.pw_at, + r.pw_tr, r.pw_an]) / r.ftp + + rr = rrower(hrmax=r.max, hrut2=r.ut2, + hrut1=r.ut1, hrat=r.at, + hrtr=r.tr, hran=r.an, ftp=r.ftp, + powerperc=powerperc, powerzones=r.powerzones) + row = rdata(f2, rower=rr) + + if row.df.empty: # pragma: no cover + os.remove(f2) + return { + "status": "error", + "job_id": job_id, + "message": "No valid data found in the uploaded file." + } + + if row == 0: # pragma: no cover + os.remove(f2) + return { + "status": "error", + "job_id": job_id, + "message": "Error creating row data from the file." + } + + # check and fix sample rate + row, f2 = check_and_fix_samplerate(row, f2) + + # change rower type to water if GPS data is present + if is_water_rowing(row.df): + uploadoptions['workouttype'] = 'water' + + # remove negative power peaks + row = remove_negative_power_peaks(row) + + # optional auto smoothing + row = do_smooth(row, f2) + + # recalculate power data + if uploadoptions['workouttype'] in otetypes: + try: + if r.erg_recalculatepower: + row.erg_recalculatepower() + row.write_csv(f2, gzip=True) + except Exception as e: + pass + + workoutid = uploadoptions.get('id', None) + if workoutid is not None: # pragma: no cover + try: + w = Workout.objects.get(id=workoutid) + except Workout.DoesNotExist: + w = Workout(user=r, duration='00:00:00') + w.save() + else: + w = Workout(user=r, duration='00:00:00') + w.save() + + # set workout attributes from uploadoptions and calculated values + w = update_workout_attributes(w, row, f2, uploadoptions) + + + # add teams + if w.privacy == 'visible': + ts = Team.objects.filter(rower=r + ) + for t in ts: # pragma: no cover + w.team.add(t) + + # put stroke data in file store through "dataplep" + try: + row = rrdata_pl(df=pl.form_pandas(row.df)) + except: + pass + + _ = dataplep(row.df, id=w.id, bands=True, + barchart=True, otwpower=True, empower=True, inboard=w.inboard) + + # send confirmation email + send_upload_confirmation_email(r, w) + + # check for breakthroughs + isbreakthrough, ishard = checkbreakthrough(w, r) + _ = check_marker(w) + _ = update_wps(r, otwtypes) + _ = update_wps(r, otetypes) + + # update running_wps + update_running_wps(r, w, row) + + # calculate TRIMP + if w.workouttype in otwtypes: + wps_avg = r.median_wps + elif w.workouttype in otetypes: + wps_avg = r.median_wps_erg + else: # pragma: no cover + wps_avg = 0 + + _ = myqueue(queuehigh, handle_calctrimp, w.id, f2, + r.ftp, r.sex, r.hrftp, r.max, r.rest, wps_avg) + + # make plots + if uploadoptions.get('makeplot', False): # pragma: no cover + plottype = uploadoptions.get('plottype', 'timeplot') + res, jobid = uploads.make_plot(r, w, f1, f2, plottype, w.name) + elif r.staticchartonupload != 'None': # pragma: no cover + plottype = r.staticchartonupload + res, jobid = uploads.make_plot(r, w, f1, f2, plottype, w.name) + + # sync workouts to connected services + uploads.do_sync(w, uploadoptions, quick=True) + + + return True, f2 + + + diff --git a/rowers/integrations/c2.py b/rowers/integrations/c2.py index c2d21332..1f042c4b 100644 --- a/rowers/integrations/c2.py +++ b/rowers/integrations/c2.py @@ -15,7 +15,6 @@ import pytz from rowsandall_app.settings import ( C2_CLIENT_ID, C2_REDIRECT_URI, C2_CLIENT_SECRET, - UPLOAD_SERVICE_URL, UPLOAD_SERVICE_SECRET ) from rowers.tasks import ( diff --git a/rowers/integrations/intervals.py b/rowers/integrations/intervals.py index 6ba6d46a..10e66d31 100644 --- a/rowers/integrations/intervals.py +++ b/rowers/integrations/intervals.py @@ -26,7 +26,6 @@ from rowers.opaque import encoder from rowsandall_app.settings import ( INTERVALS_CLIENT_ID, INTERVALS_REDIRECT_URI, INTERVALS_CLIENT_SECRET, SITE_URL, - UPLOAD_SERVICE_SECRET, UPLOAD_SERVICE_URL ) import django_rq @@ -57,6 +56,7 @@ intervals_token_url = 'https://intervals.icu/api/oauth/token' webhookverification = 'JA9Vt6RNH10' class IntervalsIntegration(SyncIntegration): + def __init__(self, *args, **kwargs): super(IntervalsIntegration, self).__init__(*args, **kwargs) self.oauth_data = { @@ -315,6 +315,7 @@ class IntervalsIntegration(SyncIntegration): return workouts def update_workout(self, id, *args, **kwargs) -> int: + from rowers.dataflow import upload_handler try: _ = self.open() except NoTokenError: @@ -419,7 +420,6 @@ class IntervalsIntegration(SyncIntegration): uploadoptions = { - 'secret': UPLOAD_SERVICE_SECRET, 'user': self.rower.user.id, 'boattype': '1x', 'workouttype': w.workouttype, @@ -427,8 +427,8 @@ class IntervalsIntegration(SyncIntegration): 'intervalsid': id, 'id': w.id, } - url = UPLOAD_SERVICE_URL - response = requests.post(url, data=uploadoptions) + + response = upload_handler(uploadoptions, temp_filename) except FileNotFoundError: return 0 except Exception as e: @@ -443,6 +443,7 @@ class IntervalsIntegration(SyncIntegration): return 1 def get_workout(self, id, *args, **kwargs) -> int: + from rowers.dataflow import upload_handler try: _ = self.open() except NoTokenError: @@ -542,8 +543,17 @@ class IntervalsIntegration(SyncIntegration): except: return 0 + w = Workout( + user=r, + name=title, + workoutsource='intervals.icu', + workouttype=workouttype, + duration=duration, + distance=distance, + intervalsid=id, + ) + uploadoptions = { - 'secret': UPLOAD_SERVICE_SECRET, 'user': r.user.id, 'boattype': '1x', 'workouttype': workouttype, @@ -555,30 +565,25 @@ class IntervalsIntegration(SyncIntegration): 'offline': False, } - url = UPLOAD_SERVICE_URL - handle_request_post(url, uploadoptions) + response = upload_handler(uploadoptions, fit_filename) try: pair_id = data['paired_event_id'] pss = PlannedSession.objects.filter(intervals_icu_id=pair_id, rower=r) - ws = Workout.objects.filter(uploadedtointervals=id) - for w in ws: - w.sub_type = subtype - w.save() + + w.sub_type = subtype + w.save() if is_commute: - for w in ws: - w.is_commute = True - w.sub_type = "Commute" - w.save() + w.is_commute = True + w.sub_type = "Commute" + w.save() if is_race: - for w in ws: - w.is_race = True - w.save() + w.is_race = True + w.save() if pss.count() > 0: for ps in pss: - for w in ws: - w.plannedsession = ps - w.save() + w.plannedsession = ps + w.save() except KeyError: pass except PlannedSession.DoesNotExist: diff --git a/rowers/integrations/polar.py b/rowers/integrations/polar.py index 9bc40472..2ac1aba1 100644 --- a/rowers/integrations/polar.py +++ b/rowers/integrations/polar.py @@ -103,6 +103,8 @@ class PolarIntegration(SyncIntegration): return 1 def get_polar_workouts(self, user): + from rowers.dataflow import upload_handler + r = Rower.objects.get(user=user) exercise_list = [] @@ -191,17 +193,10 @@ class PolarIntegration(SyncIntegration): 'title': '', } - url = settings.UPLOAD_SERVICE_URL + response = upload_handler(uploadoptions, filename) + if response['status'] != 'processing': + return 0 - dologging('polar.log', uploadoptions) - dologging('polar.log', url) - - _ = myqueue( - queuehigh, - handle_request_post, - url, - uploadoptions - ) dologging('polar.log', response.status_code) if response.status_code != 200: # pragma: no cover diff --git a/rowers/integrations/rp3.py b/rowers/integrations/rp3.py index bdbae35e..764b422b 100644 --- a/rowers/integrations/rp3.py +++ b/rowers/integrations/rp3.py @@ -4,7 +4,6 @@ from rowers.models import User, Rower, Workout, TombStone from rowers.tasks import handle_rp3_async_workout from rowsandall_app.settings import ( RP3_CLIENT_ID, RP3_CLIENT_KEY, RP3_REDIRECT_URI, RP3_CLIENT_SECRET, - UPLOAD_SERVICE_URL, UPLOAD_SERVICE_SECRET ) from rowers.utils import myqueue, NoTokenError, dologging, uniqify diff --git a/rowers/nkimportutils.py b/rowers/nkimportutils.py index 1778fe38..cf8eafa9 100644 --- a/rowers/nkimportutils.py +++ b/rowers/nkimportutils.py @@ -5,7 +5,6 @@ from datetime import timedelta from uuid import uuid4 import traceback -from rowsandall_app.settings import UPLOAD_SERVICE_SECRET, UPLOAD_SERVICE_URL from rowsandall_app.settings import NK_API_LOCATION from rowers.utils import dologging @@ -106,7 +105,6 @@ def add_workout_from_data(userid, nkid, data, strokedata, source='nk', splitdata boattype = "1x" uploadoptions = { - 'secret': UPLOAD_SERVICE_SECRET, 'user': userid, 'file': csvfilename, 'title': title, @@ -128,26 +126,12 @@ def add_workout_from_data(userid, nkid, data, strokedata, source='nk', splitdata dologging('nklog.log',json.dumps(uploadoptions)) dologging('metrics.log','NK ID {nkid}'.format(nkid=nkid)) - session = requests.session() - newHeaders = {'Content-type': 'application/json', 'Accept': 'text/plain'} - session.headers.update(newHeaders) + response = upload_handler(uploadoptions, csvfilename) + if response['status'] != 'processing': + return 0, response['message'] - response = session.post(UPLOAD_SERVICE_URL, json=uploadoptions) - - if response.status_code != 200: # pragma: no cover - return 0, response.text - try: - workoutid = response.json()['id'] - except KeyError: # pragma: no cover - workoutid = 0 - - # dologging('nklog.log','Workout ID {id}'.format(id=workoutid)) - - # evt update workout summary - - # return - return workoutid, "" + return 1, "" def get_nk_intervalstats(workoutdata, strokedata): diff --git a/rowers/rojabo_stuff.py b/rowers/rojabo_stuff.py index ffce6958..c91a021a 100644 --- a/rowers/rojabo_stuff.py +++ b/rowers/rojabo_stuff.py @@ -8,7 +8,6 @@ from datetime import timedelta from rowsandall_app.settings import ( ROJABO_CLIENT_ID, ROJABO_REDIRECT_URI, ROJABO_CLIENT_SECRET, SITE_URL, ROJABO_OAUTH_LOCATION, - UPLOAD_SERVICE_URL, UPLOAD_SERVICE_SECRET, ) import gzip import rowers.mytypes as mytypes diff --git a/rowers/tests/testdata/testdata.tcx.gz b/rowers/tests/testdata/testdata.tcx.gz index 1e672f9baf0efdf0a5d722ae1827a6988d9c83ba..bd458c41a800306c8332f71681815733e78008c8 100644 GIT binary patch delta 16 XcmbO#KUJPxzMF$%8`rOm?0x(IDy0QE delta 16 XcmbO#KUJPxzMF%?N#)x{_C9_9C; 0: + pss = PlannedSession.objects.filter(rower=rower,intervals_icu_id=paired_event_id) + if pss.count() > 0: + for ps in pss: + for w in ws: + w.plannedsession = ps + w.save() + except KeyError: + pass + except Workout.DoesNotExist: + pass + except PlannedSession.DoesNotExist: + pass + + return w.id + +def splitstdata(lijst): # pragma: no cover + t = [] + latlong = [] + while len(lijst) >= 2: + t.append(lijst[0]) + latlong.append(lijst[1]) + lijst = lijst[2:] + + return [np.array(t), np.array(latlong)] + + +@app.task +def handle_sporttracks_workout_from_data(user, importid, source, + workoutsource, debug=False, **kwargs): # pragma: no cover + + r = user.rower + authorizationstring = str('Bearer ' + r.sporttrackstoken) + headers = {'Authorization': authorizationstring, + 'user-agent': 'sanderroosendaal', + 'Content-Type': 'application/json'} + url = "https://api.sporttracks.mobi/api/v2/fitnessActivities/" + \ + str(importid) + s = requests.get(url, headers=headers) + + data = s.json() + + strokedata = pd.DataFrame.from_dict({ + key: pd.Series(value, dtype='object') for key, value in data.items() + }) + + try: + workouttype = data['type'] + except KeyError: # pragma: no cover + workouttype = 'other' + + if workouttype not in [x[0] for x in Workout.workouttypes]: + workouttype = 'other' + try: + comments = data['comments'] + except: + comments = '' + + r = Rower.objects.get(user=user) + rowdatetime = iso8601.parse_date(data['start_time']) + starttimeunix = arrow.get(rowdatetime).timestamp() + + try: + title = data['name'] + except: # pragma: no cover + title = "Imported data" + + try: + res = splitstdata(data['distance']) + distance = res[1] + times_distance = res[0] + except KeyError: # pragma: no cover + try: + res = splitstdata(data['heartrate']) + times_distance = res[0] + distance = 0*times_distance + except KeyError: + return (0, "No distance or heart rate data in the workout") + + try: + locs = data['location'] + + res = splitstdata(locs) + times_location = res[0] + latlong = res[1] + latcoord = [] + loncoord = [] + + for coord in latlong: + lat = coord[0] + lon = coord[1] + latcoord.append(lat) + loncoord.append(lon) + except: + times_location = times_distance + latcoord = np.zeros(len(times_distance)) + loncoord = np.zeros(len(times_distance)) + if workouttype in mytypes.otwtypes: # pragma: no cover + workouttype = 'rower' + + try: + res = splitstdata(data['cadence']) + times_spm = res[0] + spm = res[1] + except KeyError: # pragma: no cover + times_spm = times_distance + spm = 0*times_distance + + try: + res = splitstdata(data['heartrate']) + hr = res[1] + times_hr = res[0] + except KeyError: + times_hr = times_distance + hr = 0*times_distance + + # create data series and remove duplicates + distseries = pd.Series(distance, index=times_distance) + distseries = distseries.groupby(distseries.index).first() + latseries = pd.Series(latcoord, index=times_location) + latseries = latseries.groupby(latseries.index).first() + lonseries = pd.Series(loncoord, index=times_location) + lonseries = lonseries.groupby(lonseries.index).first() + spmseries = pd.Series(spm, index=times_spm) + spmseries = spmseries.groupby(spmseries.index).first() + hrseries = pd.Series(hr, index=times_hr) + hrseries = hrseries.groupby(hrseries.index).first() + + # Create dicts and big dataframe + d = { + ' Horizontal (meters)': distseries, + ' latitude': latseries, + ' longitude': lonseries, + ' Cadence (stokes/min)': spmseries, + ' HRCur (bpm)': hrseries, + } + + df = pd.DataFrame(d) + + df = df.groupby(level=0).last() + + cum_time = df.index.values + df[' ElapsedTime (sec)'] = cum_time + + velo = df[' Horizontal (meters)'].diff()/df[' ElapsedTime (sec)'].diff() + + df[' Power (watts)'] = 0.0*velo + + nr_rows = len(velo.values) + + df[' DriveLength (meters)'] = np.zeros(nr_rows) + df[' StrokeDistance (meters)'] = np.zeros(nr_rows) + df[' DriveTime (ms)'] = np.zeros(nr_rows) + df[' StrokeRecoveryTime (ms)'] = np.zeros(nr_rows) + df[' AverageDriveForce (lbs)'] = np.zeros(nr_rows) + df[' PeakDriveForce (lbs)'] = np.zeros(nr_rows) + df[' lapIdx'] = np.zeros(nr_rows) + + unixtime = cum_time+starttimeunix + unixtime[0] = starttimeunix + + df['TimeStamp (sec)'] = unixtime + + dt = np.diff(cum_time).mean() + wsize = round(5./dt) + + velo2 = ewmovingaverage(velo, wsize) + + df[' Stroke500mPace (sec/500m)'] = 500./velo2 + + df = df.fillna(0) + + df.sort_values(by='TimeStamp (sec)', ascending=True) + + + csvfilename = 'media/{code}_{importid}.csv'.format( + importid=importid, + code=uuid4().hex[:16] + ) + + res = df.to_csv(csvfilename+'.gz', index_label='index', + compression='gzip') + + w = Workout( + user=r, + duration=totaltime_sec_to_string(cum_time[-1]), + uploadedtosporttracks=importid, + ) + w.save() + + uploadoptions = { + 'user': user.id, + 'file': csvfilename+'.gz', + 'title': '', + 'workouttype': workouttype, + 'boattype': '1x', + 'sporttracksid': importid, + 'id': w.id, + 'title':title, + } + + response = upload_handler(uploadoptions, csvfilename+'.gz') + if response['status'] != 'processing': + return 0 + + return 1 + +@app.task +def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempts, debug=False, **kwargs): + graphql_url = "https://rp3rowing-app.com/graphql" + + timezone = kwargs.get('timezone', 'UTC') + + headers = {'Authorization': 'Bearer ' + rp3token} + + get_download_link = """{ + download(workout_id: """ + str(rp3id) + """, type:csv){ + id + status + link + } +}""" + + have_link = False + download_url = '' + counter = 0 + + waittime = 3 + while not have_link: + try: + response = requests.post( + url=graphql_url, + headers=headers, + json={'query': get_download_link} + ) + dologging('rp3_import.log',response.status_code) + + if response.status_code != 200: # pragma: no cover + have_link = True + + workout_download_details = pd.json_normalize( + response.json()['data']['download']) + dologging('rp3_import.log', response.json()) + except Exception as e: # pragma: no cover + return 0 + + if workout_download_details.iat[0, 1] == 'ready': + download_url = workout_download_details.iat[0, 2] + have_link = True + + dologging('rp3_import.log', download_url) + + counter += 1 + + dologging('rp3_import.log', counter) + + if counter > max_attempts: # pragma: no cover + have_link = True + + time.sleep(waittime) + + if download_url == '': # pragma: no cover + return 0 + + filename = 'media/RP3Import_'+str(rp3id)+'.csv' + + res = requests.get(download_url, headers=headers) + dologging('rp3_import.log','tasks.py '+str(rp3id)) + dologging('rp3_import.log',startdatetime) + + if not startdatetime: # pragma: no cover + startdatetime = str(timezone.now()) + + try: + startdatetime = str(startdatetime) + except: # pragma: no cover + pass + + if res.status_code != 200: # pragma: no cover + return 0 + + with open(filename, 'wb') as f: + # dologging('rp3_import.log',res.text) + dologging('rp3_import.log', 'Rp3 ID = {id}'.format(id=rp3id)) + f.write(res.content) + + w = Workout( + user=User.objects.get(id=userid).rower, + duration='00:00:01', + uploadedtosporttracks=int(rp3id) + ) + w.save() + + uploadoptions = { + 'user': userid, + 'file': filename, + 'workouttype': 'rower', + 'boattype': 'rp3', + 'rp3id': int(rp3id), + 'startdatetime': startdatetime, + 'timezone': timezone, + } + + response = upload_handler(uploadoptions, filename) + if response['status'] != 'processing': + return 0 + + return 1 + +@app.task +def handle_c2_getworkout(userid, c2token, c2id, defaulttimezone, debug=False, **kwargs): + authorizationstring = str('Bearer ' + c2token) + headers = {'Authorization': authorizationstring, + 'user-agent': 'sanderroosendaal', + 'Content-Type': 'application/json'} + url = "https://log.concept2.com/api/users/me/results/"+str(c2id) + s = requests.get(url, headers=headers) + + if s.status_code != 200: # pragma: no cover + return 0 + + data = s.json()['data'] + alldata = {c2id: data} + + return handle_c2_async_workout(alldata, userid, c2token, c2id, 0, defaulttimezone) + +# Concept2 logbook sends over split data for each interval +# We use it here to generate a custom summary +# Some users complained about small differences +def summaryfromsplitdata(splitdata, data, filename, sep='|', workouttype='rower'): + workouttype = workouttype.lower() + + totaldist = data['distance'] + totaltime = data['time']/10. + try: + spm = data['stroke_rate'] + except KeyError: + spm = 0 + try: + resttime = data['rest_time']/10. + except KeyError: # pragma: no cover + resttime = 0 + try: + restdistance = data['rest_distance'] + except KeyError: # pragma: no cover + restdistance = 0 + try: + avghr = data['heart_rate']['average'] + except KeyError: # pragma: no cover + avghr = 0 + try: + maxhr = data['heart_rate']['max'] + except KeyError: # pragma: no cover + maxhr = 0 + + try: + avgpace = 500.*totaltime/totaldist + except (ZeroDivisionError, OverflowError): # pragma: no cover + avgpace = 0. + + try: + restpace = 500.*resttime/restdistance + except (ZeroDivisionError, OverflowError): # pragma: no cover + restpace = 0. + + try: + velo = totaldist/totaltime + avgpower = 2.8*velo**(3.0) + except (ZeroDivisionError, OverflowError): # pragma: no cover + velo = 0 + avgpower = 0 + if workouttype in ['bike', 'bikeerg']: # pragma: no cover + velo = velo/2. + avgpower = 2.8*velo**(3.0) + velo = velo*2 + + try: + restvelo = restdistance/resttime + except (ZeroDivisionError, OverflowError): # pragma: no cover + restvelo = 0 + + restpower = 2.8*restvelo**(3.0) + if workouttype in ['bike', 'bikeerg']: # pragma: no cover + restvelo = restvelo/2. + restpower = 2.8*restvelo**(3.0) + restvelo = restvelo*2 + + try: + avgdps = totaldist/data['stroke_count'] + except (ZeroDivisionError, OverflowError, KeyError): + avgdps = 0 + + from rowingdata import summarystring, workstring, interval_string + + sums = summarystring(totaldist, totaltime, avgpace, spm, avghr, maxhr, + avgdps, avgpower, readFile=filename, + separator=sep) + + sums += workstring(totaldist, totaltime, avgpace, spm, avghr, maxhr, + avgdps, avgpower, separator=sep, symbol='W') + + sums += workstring(restdistance, resttime, restpace, 0, 0, 0, 0, restpower, + separator=sep, + symbol='R') + + sums += '\nWorkout Details\n' + sums += '#-{sep}SDist{sep}-Split-{sep}-SPace-{sep}-Pwr-{sep}SPM-{sep}AvgHR{sep}MaxHR{sep}DPS-\n'.format( + sep=sep + ) + + intervalnr = 0 + sa = [] + results = [] + + try: + timebased = data['workout_type'] in [ + 'FixedTimeSplits', 'FixedTimeInterval'] + except KeyError: # pragma: no cover + timebased = False + + for interval in splitdata: + try: + idist = interval['distance'] + except KeyError: # pragma: no cover + idist = 0 + + try: + itime = interval['time']/10. + except KeyError: # pragma: no cover + itime = 0 + try: + ipace = 500.*itime/idist + except (ZeroDivisionError, OverflowError): # pragma: no cover + ipace = 180. + + try: + ispm = interval['stroke_rate'] + except KeyError: # pragma: no cover + ispm = 0 + try: + irest_time = interval['rest_time']/10. + except KeyError: # pragma: no cover + irest_time = 0 + try: + iavghr = interval['heart_rate']['average'] + except KeyError: # pragma: no cover + iavghr = 0 + try: + imaxhr = interval['heart_rate']['average'] + except KeyError: # pragma: no cover + imaxhr = 0 + + # create interval values + iarr = [idist, 'meters', 'work'] + resarr = [itime] + if timebased: # pragma: no cover + iarr = [itime, 'seconds', 'work'] + resarr = [idist] + + if irest_time > 0: + iarr += [irest_time, 'seconds', 'rest'] + try: + resarr += [interval['rest_distance']] + except KeyError: + resarr += [np.nan] + + sa += iarr + results += resarr + + if itime != 0: + ivelo = idist/itime + ipower = 2.8*ivelo**(3.0) + if workouttype in ['bike', 'bikeerg']: # pragma: no cover + ipower = 2.8*(ivelo/2.)**(3.0) + else: # pragma: no cover + ivelo = 0 + ipower = 0 + + sums += interval_string(intervalnr, idist, itime, ipace, ispm, + iavghr, imaxhr, 0, ipower, separator=sep) + intervalnr += 1 + + return sums, sa, results + +@app.task +def handle_c2_async_workout(alldata, userid, c2token, c2id, delaysec, + defaulttimezone, debug=False, **kwargs): + time.sleep(delaysec) + dologging('c2_import.log',str(c2id)+' for userid '+str(userid)) + data = alldata[c2id] + splitdata = None + + distance = data['distance'] + try: # pragma: no cover + rest_distance = data['rest_distance'] + # rest_time = data['rest_time']/10. + except KeyError: + rest_distance = 0 + # rest_time = 0 + distance = distance+rest_distance + c2id = data['id'] + dologging('c2_import.log',data['type']) + if data['type'] in ['rower','dynamic','slides']: + workouttype = 'rower' + boattype = data['type'] + if data['type'] == 'rower': + boattype = 'static' + else: + workouttype = data['type'] + boattype = 'static' + # verified = data['verified'] + + # weightclass = data['weight_class'] + + try: + has_strokedata = data['stroke_data'] + except KeyError: # pragma: no cover + has_strokedata = True + + s = 'User {userid}, C2 ID {c2id}'.format(userid=userid, c2id=c2id) + dologging('c2_import.log', s) + dologging('c2_import.log', json.dumps(data)) + + try: + title = data['name'] + except KeyError: + title = "" + try: + t = data['comments'].split('\n', 1)[0] + title += t[:40] + except: # pragma: no cover + title = '' + + # Create CSV file name and save data to CSV file + csvfilename = 'media/{code}_{c2id}.csv.gz'.format( + code=uuid4().hex[:16], c2id=c2id) + + startdatetime, starttime, workoutdate, duration, starttimeunix, timezone = utils.get_startdatetime_from_c2data( + data + ) + + s = 'Time zone {timezone}, startdatetime {startdatetime}, duration {duration}'.format( + timezone=timezone, startdatetime=startdatetime, + duration=duration) + dologging('c2_import.log', s) + + authorizationstring = str('Bearer ' + c2token) + headers = {'Authorization': authorizationstring, + 'user-agent': 'sanderroosendaal', + 'Content-Type': 'application/json'} + url = "https://log.concept2.com/api/users/me/results/"+str(c2id)+"/strokes" + try: + s = requests.get(url, headers=headers) + except ConnectionError: # pragma: no cover + return 0 + + if s.status_code != 200: # pragma: no cover + dologging('c2_import.log', 'No Stroke Data. Status Code {code}'.format( + code=s.status_code)) + dologging('c2_import.log', s.text) + has_strokedata = False + + if not has_strokedata: # pragma: no cover + df = df_from_summary(data) + else: + # dologging('debuglog.log',json.dumps(s.json())) + try: + strokedata = pd.DataFrame.from_dict(s.json()['data']) + except AttributeError: # pragma: no cover + dologging('c2_import.log', 'No stroke data in stroke data') + return 0 + + try: + res = make_cumvalues(0.1*strokedata['t']) + cum_time = res[0] + lapidx = res[1] + except KeyError: # pragma: no cover + dologging('c2_import.log', 'No time values in stroke data') + return 0 + + unixtime = cum_time+starttimeunix + # unixtime[0] = starttimeunix + seconds = 0.1*strokedata.loc[:, 't'] + + nr_rows = len(unixtime) + + try: # pragma: no cover + latcoord = strokedata.loc[:, 'lat'] + loncoord = strokedata.loc[:, 'lon'] + except: + latcoord = np.zeros(nr_rows) + loncoord = np.zeros(nr_rows) + + try: + strokelength = strokedata.loc[:,'strokelength'] + except: # pragma: no cover + strokelength = np.zeros(nr_rows) + + dist2 = 0.1*strokedata.loc[:, 'd'] + cumdist, intervals = make_cumvalues(dist2) + + try: + spm = strokedata.loc[:, 'spm'] + except KeyError: # pragma: no cover + spm = 0*dist2 + + try: + hr = strokedata.loc[:, 'hr'] + except KeyError: # pragma: no cover + hr = 0*spm + + pace = strokedata.loc[:, 'p']/10. + pace = np.clip(pace, 0, 1e4) + pace = pace.replace(0, 300) + + velo = 500./pace + power = 2.8*velo**3 + if workouttype == 'bike': # pragma: no cover + velo = 1000./pace + + dologging('c2_import.log', 'Unix Time Stamp {s}'.format(s=unixtime[0])) + # dologging('debuglog.log',json.dumps(s.json())) + + df = pd.DataFrame({'TimeStamp (sec)': unixtime, + ' Horizontal (meters)': dist2, + ' Cadence (stokes/min)': spm, + ' HRCur (bpm)': hr, + ' longitude': loncoord, + ' latitude': latcoord, + ' Stroke500mPace (sec/500m)': pace, + ' Power (watts)': power, + ' DragFactor': np.zeros(nr_rows), + ' DriveLength (meters)': np.zeros(nr_rows), + ' StrokeDistance (meters)': strokelength, + ' DriveTime (ms)': np.zeros(nr_rows), + ' StrokeRecoveryTime (ms)': np.zeros(nr_rows), + ' AverageDriveForce (lbs)': np.zeros(nr_rows), + ' PeakDriveForce (lbs)': np.zeros(nr_rows), + ' lapIdx': lapidx, + ' WorkoutState': 4, + ' ElapsedTime (sec)': seconds, + 'cum_dist': cumdist + }) + + df.sort_values(by='TimeStamp (sec)', ascending=True) + + _ = df.to_csv(csvfilename, index_label='index', compression='gzip') + + + w = Workout( + user=User.objects.get(id=userid).rower, + duration=duration, + distance=distance, + uploadedtoc2=c2id, + ) + w.save() + + uploadoptions = { + 'user': userid, + 'file': csvfilename, + 'title': title, + 'workouttype': workouttype, + 'boattype': boattype, + 'c2id': c2id, + 'startdatetime': startdatetime.isoformat(), + 'timezone': str(timezone) + } + + response = upload_handler(uploadoptions, csvfilename) + if response['status'] != 'processing': + return 0 + + dologging('c2_import.log','workout id {id}'.format(id=w.id)) + + record = create_or_update_syncrecord(w.user, w, c2id=c2id) + + + + # summary + if 'workout' in data: + if 'splits' in data['workout']: # pragma: no cover + splitdata = data['workout']['splits'] + elif 'intervals' in data['workout']: # pragma: no cover + splitdata = data['workout']['intervals'] + else: # pragma: no cover + splitdata = False + else: + splitdata = False + + if splitdata: # pragma: no cover + summary, sa, results = summaryfromsplitdata( + splitdata, data, csvfilename, workouttype=workouttype) + + w.summary = summary + w.save() + + from rowingdata.trainingparser import getlist + if sa: + values = getlist(sa) + units = getlist(sa, sel='unit') + types = getlist(sa, sel='type') + + rowdata = rdata(csvfile=csvfilename) + if rowdata: + rowdata.updateintervaldata(values, units, types, results) + + rowdata.write_csv(csvfilename, gzip=True) + update_strokedata(w.id, rowdata.df) + + return 1 + + +@app.task +def handle_split_workout_by_intervals(id, debug=False, **kwargs): + row = Workout.objects.get(id=id) + r = row.user + rowdata = rdata(csvfile=row.csvfilename) + if rowdata == 0: + messages.error(request,"No Data file found for this workout") + return HttpResponseRedirect(url) + + try: + new_rowdata = rowdata.split_by_intervals() + except KeyError: + new_rowdata = rowdata + return 0 + + interval_i = 1 + for data in new_rowdata: + filename = 'media/{code}.csv'.format( + code = uuid4().hex[:16] + ) + + data.write_csv(filename) + + uploadoptions = { + 'user': r.user.id, + 'title': '{title} - interval {i}'.format(title=row.name, i=interval_i), + 'file': filename, + 'boattype': row.boattype, + 'workouttype': row.workouttype, + } + + response = upload_handler(uploadoptions, filename) + + interval_i = interval_i + 1 + + return 1 + +@app.task +def fetch_strava_workout(stravatoken, oauth_data, stravaid, csvfilename, userid, debug=False, **kwargs): + authorizationstring = str('Bearer '+stravatoken) + headers = {'Authorization': authorizationstring, + 'user-agent': 'sanderroosendaal', + 'Content-Type': 'application/json', + 'resolution': 'medium', } + url = "https://www.strava.com/api/v3/activities/"+str(stravaid) + response = requests.get(url, headers=headers) + if response.status_code != 200: # pragma: no cover + dologging('stravalog.log', 'handle_get_strava_file response code {code}\n'.format( + code=response.status_code)) + try: + dologging('stravalog.log','Response json {json}\n'.format(json=response.json())) + except: + pass + + return 0 + + try: + workoutsummary = requests.get(url, headers=headers).json() + except: # pragma: no cover + return 0 + + spm = get_strava_stream(None, 'cadence', stravaid, + authorizationstring=authorizationstring) + hr = get_strava_stream(None, 'heartrate', stravaid, + authorizationstring=authorizationstring) + t = get_strava_stream(None, 'time', stravaid, + authorizationstring=authorizationstring) + velo = get_strava_stream(None, 'velocity_smooth', + stravaid, authorizationstring=authorizationstring) + d = get_strava_stream(None, 'distance', stravaid, + authorizationstring=authorizationstring) + coords = get_strava_stream( + None, 'latlng', stravaid, authorizationstring=authorizationstring) + power = get_strava_stream(None, 'watts', stravaid, + authorizationstring=authorizationstring) + + + if t is not None: + nr_rows = len(t) + else: # pragma: no cover + try: + duration = int(workoutsummary['elapsed_time']) + except KeyError: + duration = 0 + t = pd.Series(range(duration+1)) + + nr_rows = len(t) + + if nr_rows == 0: # pragma: no cover + return 0 + + if d is None: # pragma: no cover + d = 0*t + + if spm is None: # pragma: no cover + spm = np.zeros(nr_rows) + + if power is None: # pragma: no cover + power = np.zeros(nr_rows) + + if hr is None: # pragma: no cover + hr = np.zeros(nr_rows) + + if velo is None: # pragma: no cover + velo = np.zeros(nr_rows) + + try: + dt = np.diff(t).mean() + wsize = round(5./dt) + + velo2 = ewmovingaverage(velo, wsize) + except ValueError: # pragma: no cover + velo2 = velo + + if coords is not None: + try: + lat = coords[:, 0] + lon = coords[:, 1] + except IndexError: # pragma: no cover + lat = np.zeros(len(t)) + lon = np.zeros(len(t)) + else: # pragma: no cover + lat = np.zeros(len(t)) + lon = np.zeros(len(t)) + + try: + strokelength = velo*60./(spm) + strokelength[np.isinf(strokelength)] = 0.0 + except ValueError: + strokelength = np.zeros(len(t)) + + pace = 500./(1.0*velo2) + pace[np.isinf(pace)] = 0.0 + + try: + strokedata = pl.DataFrame({'t': 10*t, + 'd': 10*d, + 'p': 10*pace, + 'spm': spm, + 'hr': hr, + 'lat': lat, + 'lon': lon, + 'power': power, + 'strokelength': strokelength, + }) + except ValueError: # pragma: no cover + return 0 + except ShapeError: + return 0 + + try: + workouttype = mytypes.stravamappinginv[workoutsummary['type']] + except KeyError: # pragma: no cover + workouttype = 'other' + + if workouttype.lower() == 'rowing': # pragma: no cover + workouttype = 'rower' + + try: + if 'summary_polyline' in workoutsummary['map'] and workouttype == 'rower': # pragma: no cover + workouttype = 'water' + except (KeyError,TypeError): # pragma: no cover + pass + + try: + rowdatetime = iso8601.parse_date(workoutsummary['date_utc']) + except KeyError: + try: + rowdatetime = iso8601.parse_date(workoutsummary['start_date']) + except KeyError: + rowdatetime = iso8601.parse_date(workoutsummary['date']) + except ParseError: # pragma: no cover + rowdatetime = iso8601.parse_date(workoutsummary['date']) + + try: + title = workoutsummary['name'] + except KeyError: # pragma: no cover + title = "" + try: + t = workoutsummary['comments'].split('\n', 1)[0] + title += t[:20] + except: + title = '' + + starttimeunix = arrow.get(rowdatetime).timestamp() + + res = make_cumvalues_array(0.1*strokedata['t'].to_numpy()) + cum_time = pl.Series(res[0]) + lapidx = pl.Series(res[1]) + + unixtime = cum_time+starttimeunix + seconds = 0.1*strokedata['t'] + + nr_rows = len(unixtime) + + try: + latcoord = strokedata['lat'] + loncoord = strokedata['lon'] + if latcoord.std() == 0 and loncoord.std() == 0 and workouttype == 'water': # pragma: no cover + workouttype = 'rower' + except: # pragma: no cover + latcoord = np.zeros(nr_rows) + loncoord = np.zeros(nr_rows) + if workouttype == 'water': + workouttype = 'rower' + + try: + strokelength = strokedata['strokelength'] + except: # pragma: no cover + strokelength = np.zeros(nr_rows) + + dist2 = 0.1*strokedata['d'] + + try: + spm = strokedata['spm'] + except (KeyError, ColumnNotFoundError): # pragma: no cover + spm = 0*dist2 + + try: + hr = strokedata['hr'] + except (KeyError, ColumnNotFoundError): # pragma: no cover + hr = 0*spm + pace = strokedata['p']/10. + pace = np.clip(pace, 0, 1e4) + pace = pl.Series(pace).replace(0, 300) + + velo = 500./pace + + try: + power = strokedata['power'] + except KeyError: # pragma: no cover + power = 2.8*velo**3 + + # if power.std() == 0 and power.mean() == 0: + # power = 2.8*velo**3 + + # save csv + # Create data frame with all necessary data to write to csv + df = pl.DataFrame({'TimeStamp (sec)': unixtime, + ' Horizontal (meters)': dist2, + ' Cadence (stokes/min)': spm, + ' HRCur (bpm)': hr, + ' longitude': loncoord, + ' latitude': latcoord, + ' Stroke500mPace (sec/500m)': pace, + ' Power (watts)': power, + ' DragFactor': np.zeros(nr_rows), + ' DriveLength (meters)': np.zeros(nr_rows), + ' StrokeDistance (meters)': strokelength, + ' DriveTime (ms)': np.zeros(nr_rows), + ' StrokeRecoveryTime (ms)': np.zeros(nr_rows), + ' AverageDriveForce (lbs)': np.zeros(nr_rows), + ' PeakDriveForce (lbs)': np.zeros(nr_rows), + ' lapIdx': lapidx, + ' ElapsedTime (sec)': seconds, + 'cum_dist': dist2, + }) + + df.sort('TimeStamp (sec)') + + row = rowingdata.rowingdata_pl(df=df) + try: + row.write_csv(csvfilename, compressed=False) + except ComputeError: + dologging('stravalog.log','polars not working') + row = rowingdata.rowingdata(df=df.to_pandas()) + row.write_csv(csvfilename) + + # summary = row.allstats() + # maxdist = df['cum_dist'].max() + duration = row.duration + + uploadoptions = { + 'user': userid, + 'file': csvfilename, + 'title': title, + 'workouttype': workouttype, + 'boattype': '1x', + 'stravaid': stravaid, + } + + response = upload_handler(uploadoptions, csvfilename) + if response['status'] != 'processing': + return 0 + + + dologging('strava_webhooks.log','fetch_strava_workout posted file with strava id {stravaid} user id {userid}\n'.format( + stravaid=stravaid, userid=userid)) + + return 1