From 385bd52a94f68ebada7a89957757b0014604166b Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Wed, 22 Oct 2025 18:03:13 +0200 Subject: [PATCH] moving to upload_tasks, stopped just before c2 --- rowers/integrations/rp3.py | 2 +- rowers/integrations/sporttracks.py | 3 +- rowers/tasks.py | 282 --------------------------- rowers/upload_tasks.py | 300 +++++++++++++++++++++++++++++ rowers/views/statements.py | 3 +- 5 files changed, 305 insertions(+), 285 deletions(-) diff --git a/rowers/integrations/rp3.py b/rowers/integrations/rp3.py index bdbae35e..e6cd4789 100644 --- a/rowers/integrations/rp3.py +++ b/rowers/integrations/rp3.py @@ -1,7 +1,7 @@ from .integrations import SyncIntegration, NoTokenError, create_or_update_syncrecord, get_known_ids from rowers.models import User, Rower, Workout, TombStone -from rowers.tasks import handle_rp3_async_workout +from rowers.upload_tasks import handle_rp3_async_workout from rowsandall_app.settings import ( RP3_CLIENT_ID, RP3_CLIENT_KEY, RP3_REDIRECT_URI, RP3_CLIENT_SECRET, UPLOAD_SERVICE_URL, UPLOAD_SERVICE_SECRET diff --git a/rowers/integrations/sporttracks.py b/rowers/integrations/sporttracks.py index d004b4f4..c7d9e943 100644 --- a/rowers/integrations/sporttracks.py +++ b/rowers/integrations/sporttracks.py @@ -3,7 +3,8 @@ from rowers.models import User, Rower, Workout, TombStone from rowingdata import rowingdata -from rowers.tasks import handle_sporttracks_sync, handle_sporttracks_workout_from_data +from rowers.tasks import handle_sporttracks_sync +from rowers.upload_tasks import handle_sporttracks_workout_from_data from rowers.rower_rules import is_workout_user import rowers.mytypes as mytypes from rowsandall_app.settings import ( diff --git a/rowers/tasks.py b/rowers/tasks.py index c10325ee..5cf1dc1c 100644 --- a/rowers/tasks.py +++ b/rowers/tasks.py @@ -750,188 +750,7 @@ def handle_c2_sync(workoutid, url, headers, data, debug=False, **kwargs): return 1 -def splitstdata(lijst): # pragma: no cover - t = [] - latlong = [] - while len(lijst) >= 2: - t.append(lijst[0]) - latlong.append(lijst[1]) - lijst = lijst[2:] - return [np.array(t), np.array(latlong)] - -@app.task -def handle_sporttracks_workout_from_data(user, importid, source, - workoutsource, debug=False, **kwargs): # pragma: no cover - - r = user.rower - authorizationstring = str('Bearer ' + r.sporttrackstoken) - headers = {'Authorization': authorizationstring, - 'user-agent': 'sanderroosendaal', - 'Content-Type': 'application/json'} - url = "https://api.sporttracks.mobi/api/v2/fitnessActivities/" + \ - str(importid) - s = requests.get(url, headers=headers) - - data = s.json() - - strokedata = pd.DataFrame.from_dict({ - key: pd.Series(value, dtype='object') for key, value in data.items() - }) - - try: - workouttype = data['type'] - except KeyError: # pragma: no cover - workouttype = 'other' - - if workouttype not in [x[0] for x in Workout.workouttypes]: - workouttype = 'other' - try: - comments = data['comments'] - except: - comments = '' - - r = Rower.objects.get(user=user) - rowdatetime = iso8601.parse_date(data['start_time']) - starttimeunix = arrow.get(rowdatetime).timestamp() - - try: - title = data['name'] - except: # pragma: no cover - title = "Imported data" - - try: - res = splitstdata(data['distance']) - distance = res[1] - times_distance = res[0] - except KeyError: # pragma: no cover - try: - res = splitstdata(data['heartrate']) - times_distance = res[0] - distance = 0*times_distance - except KeyError: - return (0, "No distance or heart rate data in the workout") - - try: - locs = data['location'] - - res = splitstdata(locs) - times_location = res[0] - latlong = res[1] - latcoord = [] - loncoord = [] - - for coord in latlong: - lat = coord[0] - lon = coord[1] - latcoord.append(lat) - loncoord.append(lon) - except: - times_location = times_distance - latcoord = np.zeros(len(times_distance)) - loncoord = np.zeros(len(times_distance)) - if workouttype in mytypes.otwtypes: # pragma: no cover - workouttype = 'rower' - - try: - res = splitstdata(data['cadence']) - times_spm = res[0] - spm = res[1] - except KeyError: # pragma: no cover - times_spm = times_distance - spm = 0*times_distance - - try: - res = splitstdata(data['heartrate']) - hr = res[1] - times_hr = res[0] - except KeyError: - times_hr = times_distance - hr = 0*times_distance - - # create data series and remove duplicates - distseries = pd.Series(distance, index=times_distance) - distseries = distseries.groupby(distseries.index).first() - latseries = pd.Series(latcoord, index=times_location) - latseries = latseries.groupby(latseries.index).first() - lonseries = pd.Series(loncoord, index=times_location) - lonseries = lonseries.groupby(lonseries.index).first() - spmseries = pd.Series(spm, index=times_spm) - spmseries = spmseries.groupby(spmseries.index).first() - hrseries = pd.Series(hr, index=times_hr) - hrseries = hrseries.groupby(hrseries.index).first() - - # Create dicts and big dataframe - d = { - ' Horizontal (meters)': distseries, - ' latitude': latseries, - ' longitude': lonseries, - ' Cadence (stokes/min)': spmseries, - ' HRCur (bpm)': hrseries, - } - - df = pd.DataFrame(d) - - df = df.groupby(level=0).last() - - cum_time = df.index.values - df[' ElapsedTime (sec)'] = cum_time - - velo = df[' Horizontal (meters)'].diff()/df[' ElapsedTime (sec)'].diff() - - df[' Power (watts)'] = 0.0*velo - - nr_rows = len(velo.values) - - df[' DriveLength (meters)'] = np.zeros(nr_rows) - df[' StrokeDistance (meters)'] = np.zeros(nr_rows) - df[' DriveTime (ms)'] = np.zeros(nr_rows) - df[' StrokeRecoveryTime (ms)'] = np.zeros(nr_rows) - df[' AverageDriveForce (lbs)'] = np.zeros(nr_rows) - df[' PeakDriveForce (lbs)'] = np.zeros(nr_rows) - df[' lapIdx'] = np.zeros(nr_rows) - - unixtime = cum_time+starttimeunix - unixtime[0] = starttimeunix - - df['TimeStamp (sec)'] = unixtime - - dt = np.diff(cum_time).mean() - wsize = round(5./dt) - - velo2 = ewmovingaverage(velo, wsize) - - df[' Stroke500mPace (sec/500m)'] = 500./velo2 - - df = df.fillna(0) - - df.sort_values(by='TimeStamp (sec)', ascending=True) - - - csvfilename = 'media/{code}_{importid}.csv'.format( - importid=importid, - code=uuid4().hex[:16] - ) - - res = df.to_csv(csvfilename+'.gz', index_label='index', - compression='gzip') - - uploadoptions = { - 'secret': UPLOAD_SERVICE_SECRET, - 'user': user.id, - 'file': csvfilename+'.gz', - 'title': '', - 'workouttype': workouttype, - 'boattype': '1x', - 'sporttracksid': importid, - 'title':title, - } - session = requests.session() - newHeaders = {'Content-type': 'application/json', 'Accept': 'text/plain'} - session.headers.update(newHeaders) - _ = session.post(UPLOAD_SERVICE_URL, json=uploadoptions) - - return 1 @app.task @@ -3468,107 +3287,6 @@ def handle_update_wps(rid, types, ids, mode, debug=False, **kwargs): return wps_median -@app.task -def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempts, debug=False, **kwargs): - - timezone = kwargs.get('timezone', 'UTC') - - headers = {'Authorization': 'Bearer ' + rp3token} - - get_download_link = """{ - download(workout_id: """ + str(rp3id) + """, type:csv){ - id - status - link - } -}""" - - have_link = False - download_url = '' - counter = 0 - - waittime = 3 - while not have_link: - try: - response = requests.post( - url=graphql_url, - headers=headers, - json={'query': get_download_link} - ) - dologging('rp3_import.log',response.status_code) - - if response.status_code != 200: # pragma: no cover - have_link = True - - workout_download_details = pd.json_normalize( - response.json()['data']['download']) - dologging('rp3_import.log', response.json()) - except: # pragma: no cover - return 0 - - if workout_download_details.iat[0, 1] == 'ready': - download_url = workout_download_details.iat[0, 2] - have_link = True - - dologging('rp3_import.log', download_url) - - counter += 1 - - dologging('rp3_import.log', counter) - - if counter > max_attempts: # pragma: no cover - have_link = True - - time.sleep(waittime) - - if download_url == '': # pragma: no cover - return 0 - - filename = 'media/RP3Import_'+str(rp3id)+'.csv' - - res = requests.get(download_url, headers=headers) - dologging('rp3_import.log','tasks.py '+str(rp3id)) - dologging('rp3_import.log',startdatetime) - - if not startdatetime: # pragma: no cover - startdatetime = str(timezone.now()) - - try: - startdatetime = str(startdatetime) - except: # pragma: no cover - pass - - if res.status_code != 200: # pragma: no cover - return 0 - - with open(filename, 'wb') as f: - # dologging('rp3_import.log',res.text) - dologging('rp3_import.log', 'Rp3 ID = {id}'.format(id=rp3id)) - f.write(res.content) - - uploadoptions = { - 'secret': UPLOAD_SERVICE_SECRET, - 'user': userid, - 'file': filename, - 'workouttype': 'rower', - 'boattype': 'rp3', - 'rp3id': int(rp3id), - 'startdatetime': startdatetime, - 'timezone': timezone, - } - - session = requests.session() - newHeaders = {'Content-type': 'application/json', 'Accept': 'text/plain'} - session.headers.update(newHeaders) - - response = session.post(UPLOAD_SERVICE_URL, json=uploadoptions) - - if response.status_code != 200: # pragma: no cover - return 0 - - workoutid = response.json()['id'] - - return workoutid @app.task diff --git a/rowers/upload_tasks.py b/rowers/upload_tasks.py index d565b476..f9e4ecb3 100644 --- a/rowers/upload_tasks.py +++ b/rowers/upload_tasks.py @@ -1,10 +1,15 @@ import os +import time from uuid import uuid4 import shutil import requests from rowingdata import FITParser as FP from rowingdata.otherparsers import FitSummaryData import rowingdata +import pandas as pd +import iso8601 +import arrow +import numpy as np os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" from YamJam import yamjam @@ -32,10 +37,13 @@ from rowers.mytypes import intervalsmappinginv from rowers.dataroutines import ( totaltime_sec_to_string, ) +from rowers.utils import ewmovingaverage, dologging from rowers.celery import app from celery import shared_task +from django.utils import timezone + SITE_URL = CFG['site_url'] SITE_URL_DEV = CFG['site_url'] PROGRESS_CACHE_SECRET = CFG['progress_cache_secret'] @@ -229,3 +237,295 @@ def handle_intervals_getworkout(rower, intervalstoken, workoutid, debug=False, * return w.id +def splitstdata(lijst): # pragma: no cover + t = [] + latlong = [] + while len(lijst) >= 2: + t.append(lijst[0]) + latlong.append(lijst[1]) + lijst = lijst[2:] + + return [np.array(t), np.array(latlong)] + + +@app.task +def handle_sporttracks_workout_from_data(user, importid, source, + workoutsource, debug=False, **kwargs): # pragma: no cover + + r = user.rower + authorizationstring = str('Bearer ' + r.sporttrackstoken) + headers = {'Authorization': authorizationstring, + 'user-agent': 'sanderroosendaal', + 'Content-Type': 'application/json'} + url = "https://api.sporttracks.mobi/api/v2/fitnessActivities/" + \ + str(importid) + s = requests.get(url, headers=headers) + + data = s.json() + + strokedata = pd.DataFrame.from_dict({ + key: pd.Series(value, dtype='object') for key, value in data.items() + }) + + try: + workouttype = data['type'] + except KeyError: # pragma: no cover + workouttype = 'other' + + if workouttype not in [x[0] for x in Workout.workouttypes]: + workouttype = 'other' + try: + comments = data['comments'] + except: + comments = '' + + r = Rower.objects.get(user=user) + rowdatetime = iso8601.parse_date(data['start_time']) + starttimeunix = arrow.get(rowdatetime).timestamp() + + try: + title = data['name'] + except: # pragma: no cover + title = "Imported data" + + try: + res = splitstdata(data['distance']) + distance = res[1] + times_distance = res[0] + except KeyError: # pragma: no cover + try: + res = splitstdata(data['heartrate']) + times_distance = res[0] + distance = 0*times_distance + except KeyError: + return (0, "No distance or heart rate data in the workout") + + try: + locs = data['location'] + + res = splitstdata(locs) + times_location = res[0] + latlong = res[1] + latcoord = [] + loncoord = [] + + for coord in latlong: + lat = coord[0] + lon = coord[1] + latcoord.append(lat) + loncoord.append(lon) + except: + times_location = times_distance + latcoord = np.zeros(len(times_distance)) + loncoord = np.zeros(len(times_distance)) + if workouttype in mytypes.otwtypes: # pragma: no cover + workouttype = 'rower' + + try: + res = splitstdata(data['cadence']) + times_spm = res[0] + spm = res[1] + except KeyError: # pragma: no cover + times_spm = times_distance + spm = 0*times_distance + + try: + res = splitstdata(data['heartrate']) + hr = res[1] + times_hr = res[0] + except KeyError: + times_hr = times_distance + hr = 0*times_distance + + # create data series and remove duplicates + distseries = pd.Series(distance, index=times_distance) + distseries = distseries.groupby(distseries.index).first() + latseries = pd.Series(latcoord, index=times_location) + latseries = latseries.groupby(latseries.index).first() + lonseries = pd.Series(loncoord, index=times_location) + lonseries = lonseries.groupby(lonseries.index).first() + spmseries = pd.Series(spm, index=times_spm) + spmseries = spmseries.groupby(spmseries.index).first() + hrseries = pd.Series(hr, index=times_hr) + hrseries = hrseries.groupby(hrseries.index).first() + + # Create dicts and big dataframe + d = { + ' Horizontal (meters)': distseries, + ' latitude': latseries, + ' longitude': lonseries, + ' Cadence (stokes/min)': spmseries, + ' HRCur (bpm)': hrseries, + } + + df = pd.DataFrame(d) + + df = df.groupby(level=0).last() + + cum_time = df.index.values + df[' ElapsedTime (sec)'] = cum_time + + velo = df[' Horizontal (meters)'].diff()/df[' ElapsedTime (sec)'].diff() + + df[' Power (watts)'] = 0.0*velo + + nr_rows = len(velo.values) + + df[' DriveLength (meters)'] = np.zeros(nr_rows) + df[' StrokeDistance (meters)'] = np.zeros(nr_rows) + df[' DriveTime (ms)'] = np.zeros(nr_rows) + df[' StrokeRecoveryTime (ms)'] = np.zeros(nr_rows) + df[' AverageDriveForce (lbs)'] = np.zeros(nr_rows) + df[' PeakDriveForce (lbs)'] = np.zeros(nr_rows) + df[' lapIdx'] = np.zeros(nr_rows) + + unixtime = cum_time+starttimeunix + unixtime[0] = starttimeunix + + df['TimeStamp (sec)'] = unixtime + + dt = np.diff(cum_time).mean() + wsize = round(5./dt) + + velo2 = ewmovingaverage(velo, wsize) + + df[' Stroke500mPace (sec/500m)'] = 500./velo2 + + df = df.fillna(0) + + df.sort_values(by='TimeStamp (sec)', ascending=True) + + + csvfilename = 'media/{code}_{importid}.csv'.format( + importid=importid, + code=uuid4().hex[:16] + ) + + res = df.to_csv(csvfilename+'.gz', index_label='index', + compression='gzip') + + w = Workout( + user=r, + duration=totaltime_sec_to_string(cum_time[-1]), + uploadedtosporttracks=importid, + ) + w.save() + + uploadoptions = { + 'user': user.id, + 'file': csvfilename+'.gz', + 'title': '', + 'workouttype': workouttype, + 'boattype': '1x', + 'sporttracksid': importid, + 'id': w.id, + 'title':title, + } + + response = upload_handler(uploadoptions, csvfilename+'.gz') + if response['status'] != 'processing': + return 0 + + return 1 + +@app.task +def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempts, debug=False, **kwargs): + + timezone = kwargs.get('timezone', 'UTC') + + headers = {'Authorization': 'Bearer ' + rp3token} + + get_download_link = """{ + download(workout_id: """ + str(rp3id) + """, type:csv){ + id + status + link + } +}""" + + have_link = False + download_url = '' + counter = 0 + + waittime = 3 + while not have_link: + try: + response = requests.post( + url=graphql_url, + headers=headers, + json={'query': get_download_link} + ) + dologging('rp3_import.log',response.status_code) + + if response.status_code != 200: # pragma: no cover + have_link = True + + workout_download_details = pd.json_normalize( + response.json()['data']['download']) + dologging('rp3_import.log', response.json()) + except: # pragma: no cover + return 0 + + if workout_download_details.iat[0, 1] == 'ready': + download_url = workout_download_details.iat[0, 2] + have_link = True + + dologging('rp3_import.log', download_url) + + counter += 1 + + dologging('rp3_import.log', counter) + + if counter > max_attempts: # pragma: no cover + have_link = True + + time.sleep(waittime) + + if download_url == '': # pragma: no cover + return 0 + + filename = 'media/RP3Import_'+str(rp3id)+'.csv' + + res = requests.get(download_url, headers=headers) + dologging('rp3_import.log','tasks.py '+str(rp3id)) + dologging('rp3_import.log',startdatetime) + + if not startdatetime: # pragma: no cover + startdatetime = str(timezone.now()) + + try: + startdatetime = str(startdatetime) + except: # pragma: no cover + pass + + if res.status_code != 200: # pragma: no cover + return 0 + + with open(filename, 'wb') as f: + # dologging('rp3_import.log',res.text) + dologging('rp3_import.log', 'Rp3 ID = {id}'.format(id=rp3id)) + f.write(res.content) + + w = Workout( + user=User.objects.get(id=userid), + duration=totaltime_sec_to_string(cum_time[-1]), + uploadedtosporttracks=importid, + ) + w.save() + + uploadoptions = { + 'secret': UPLOAD_SERVICE_SECRET, + 'user': userid, + 'file': filename, + 'workouttype': 'rower', + 'boattype': 'rp3', + 'rp3id': int(rp3id), + 'startdatetime': startdatetime, + 'timezone': timezone, + } + + response = upload_handler(uploadoptions, filename) + if response['status'] != 'processing': + return 0 + + return 1 diff --git a/rowers/views/statements.py b/rowers/views/statements.py index 74a67371..bdcd567c 100644 --- a/rowers/views/statements.py +++ b/rowers/views/statements.py @@ -261,7 +261,6 @@ from rowers.tasks import ( handle_sendemail_unrecognized, handle_sendemailnewcomment, handle_request_post, handle_sendemailsummary, - handle_rp3_async_workout, handle_send_template_email, handle_send_disqualification_email, handle_send_withdraw_email, @@ -287,6 +286,8 @@ from rowers.tasks import ( from rowers.upload_tasks import ( handle_assignworkouts, handle_post_workout_api, + handle_rp3_async_workout, + handle_sporttracks_workout_from_data ) from scipy.signal import savgol_filter