Private
Public Access
1
0

all tasks.py done

This commit is contained in:
2025-10-22 20:21:13 +02:00
parent 385bd52a94
commit ceced92022
13 changed files with 738 additions and 750 deletions

View File

@@ -10,6 +10,11 @@ import pandas as pd
import iso8601
import arrow
import numpy as np
import json
from polars.exceptions import (
ColumnNotFoundError, ComputeError, ShapeError
)
import polars as pl
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
from YamJam import yamjam
@@ -36,13 +41,21 @@ from rowers.nkimportutils import (
from rowers.mytypes import intervalsmappinginv
from rowers.dataroutines import (
totaltime_sec_to_string,
update_strokedata,
)
from rowers.utils import ewmovingaverage, dologging
from rowers.models import User
import rowers.utils as utils
from rowers.models import create_or_update_syncrecord
from rowers.utils import get_strava_stream
import rowers.mytypes as mytypes
from rowers.celery import app
from celery import shared_task
from django.utils import timezone
from rowingdata import make_cumvalues, make_cumvalues_array
from rowingdata import rowingdata as rdata
SITE_URL = CFG['site_url']
SITE_URL_DEV = CFG['site_url']
@@ -65,7 +78,6 @@ from rowers.dataflow import upload_handler
def handle_assignworkouts(workouts, rowers, remove_workout, debug=False, **kwargs):
for workout in workouts:
uploadoptions = {
'secret': UPLOAD_SERVICE_SECRET,
'title': workout.name,
'boattype': workout.boattype,
'workouttype': workout.workouttype,
@@ -430,6 +442,7 @@ def handle_sporttracks_workout_from_data(user, importid, source,
@app.task
def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempts, debug=False, **kwargs):
graphql_url = "https://rp3rowing-app.com/graphql"
timezone = kwargs.get('timezone', 'UTC')
@@ -463,7 +476,7 @@ def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempt
workout_download_details = pd.json_normalize(
response.json()['data']['download'])
dologging('rp3_import.log', response.json())
except: # pragma: no cover
except Exception as e: # pragma: no cover
return 0
if workout_download_details.iat[0, 1] == 'ready':
@@ -507,14 +520,13 @@ def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempt
f.write(res.content)
w = Workout(
user=User.objects.get(id=userid),
duration=totaltime_sec_to_string(cum_time[-1]),
uploadedtosporttracks=importid,
user=User.objects.get(id=userid).rower,
duration='00:00:01',
uploadedtosporttracks=int(rp3id)
)
w.save()
uploadoptions = {
'secret': UPLOAD_SERVICE_SECRET,
'user': userid,
'file': filename,
'workouttype': 'rower',
@@ -529,3 +541,697 @@ def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempt
return 0
return 1
@app.task
def handle_c2_getworkout(userid, c2token, c2id, defaulttimezone, debug=False, **kwargs):
authorizationstring = str('Bearer ' + c2token)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://log.concept2.com/api/users/me/results/"+str(c2id)
s = requests.get(url, headers=headers)
if s.status_code != 200: # pragma: no cover
return 0
data = s.json()['data']
alldata = {c2id: data}
return handle_c2_async_workout(alldata, userid, c2token, c2id, 0, defaulttimezone)
# Concept2 logbook sends over split data for each interval
# We use it here to generate a custom summary
# Some users complained about small differences
def summaryfromsplitdata(splitdata, data, filename, sep='|', workouttype='rower'):
workouttype = workouttype.lower()
totaldist = data['distance']
totaltime = data['time']/10.
try:
spm = data['stroke_rate']
except KeyError:
spm = 0
try:
resttime = data['rest_time']/10.
except KeyError: # pragma: no cover
resttime = 0
try:
restdistance = data['rest_distance']
except KeyError: # pragma: no cover
restdistance = 0
try:
avghr = data['heart_rate']['average']
except KeyError: # pragma: no cover
avghr = 0
try:
maxhr = data['heart_rate']['max']
except KeyError: # pragma: no cover
maxhr = 0
try:
avgpace = 500.*totaltime/totaldist
except (ZeroDivisionError, OverflowError): # pragma: no cover
avgpace = 0.
try:
restpace = 500.*resttime/restdistance
except (ZeroDivisionError, OverflowError): # pragma: no cover
restpace = 0.
try:
velo = totaldist/totaltime
avgpower = 2.8*velo**(3.0)
except (ZeroDivisionError, OverflowError): # pragma: no cover
velo = 0
avgpower = 0
if workouttype in ['bike', 'bikeerg']: # pragma: no cover
velo = velo/2.
avgpower = 2.8*velo**(3.0)
velo = velo*2
try:
restvelo = restdistance/resttime
except (ZeroDivisionError, OverflowError): # pragma: no cover
restvelo = 0
restpower = 2.8*restvelo**(3.0)
if workouttype in ['bike', 'bikeerg']: # pragma: no cover
restvelo = restvelo/2.
restpower = 2.8*restvelo**(3.0)
restvelo = restvelo*2
try:
avgdps = totaldist/data['stroke_count']
except (ZeroDivisionError, OverflowError, KeyError):
avgdps = 0
from rowingdata import summarystring, workstring, interval_string
sums = summarystring(totaldist, totaltime, avgpace, spm, avghr, maxhr,
avgdps, avgpower, readFile=filename,
separator=sep)
sums += workstring(totaldist, totaltime, avgpace, spm, avghr, maxhr,
avgdps, avgpower, separator=sep, symbol='W')
sums += workstring(restdistance, resttime, restpace, 0, 0, 0, 0, restpower,
separator=sep,
symbol='R')
sums += '\nWorkout Details\n'
sums += '#-{sep}SDist{sep}-Split-{sep}-SPace-{sep}-Pwr-{sep}SPM-{sep}AvgHR{sep}MaxHR{sep}DPS-\n'.format(
sep=sep
)
intervalnr = 0
sa = []
results = []
try:
timebased = data['workout_type'] in [
'FixedTimeSplits', 'FixedTimeInterval']
except KeyError: # pragma: no cover
timebased = False
for interval in splitdata:
try:
idist = interval['distance']
except KeyError: # pragma: no cover
idist = 0
try:
itime = interval['time']/10.
except KeyError: # pragma: no cover
itime = 0
try:
ipace = 500.*itime/idist
except (ZeroDivisionError, OverflowError): # pragma: no cover
ipace = 180.
try:
ispm = interval['stroke_rate']
except KeyError: # pragma: no cover
ispm = 0
try:
irest_time = interval['rest_time']/10.
except KeyError: # pragma: no cover
irest_time = 0
try:
iavghr = interval['heart_rate']['average']
except KeyError: # pragma: no cover
iavghr = 0
try:
imaxhr = interval['heart_rate']['average']
except KeyError: # pragma: no cover
imaxhr = 0
# create interval values
iarr = [idist, 'meters', 'work']
resarr = [itime]
if timebased: # pragma: no cover
iarr = [itime, 'seconds', 'work']
resarr = [idist]
if irest_time > 0:
iarr += [irest_time, 'seconds', 'rest']
try:
resarr += [interval['rest_distance']]
except KeyError:
resarr += [np.nan]
sa += iarr
results += resarr
if itime != 0:
ivelo = idist/itime
ipower = 2.8*ivelo**(3.0)
if workouttype in ['bike', 'bikeerg']: # pragma: no cover
ipower = 2.8*(ivelo/2.)**(3.0)
else: # pragma: no cover
ivelo = 0
ipower = 0
sums += interval_string(intervalnr, idist, itime, ipace, ispm,
iavghr, imaxhr, 0, ipower, separator=sep)
intervalnr += 1
return sums, sa, results
@app.task
def handle_c2_async_workout(alldata, userid, c2token, c2id, delaysec,
defaulttimezone, debug=False, **kwargs):
time.sleep(delaysec)
dologging('c2_import.log',str(c2id)+' for userid '+str(userid))
data = alldata[c2id]
splitdata = None
distance = data['distance']
try: # pragma: no cover
rest_distance = data['rest_distance']
# rest_time = data['rest_time']/10.
except KeyError:
rest_distance = 0
# rest_time = 0
distance = distance+rest_distance
c2id = data['id']
dologging('c2_import.log',data['type'])
if data['type'] in ['rower','dynamic','slides']:
workouttype = 'rower'
boattype = data['type']
if data['type'] == 'rower':
boattype = 'static'
else:
workouttype = data['type']
boattype = 'static'
# verified = data['verified']
# weightclass = data['weight_class']
try:
has_strokedata = data['stroke_data']
except KeyError: # pragma: no cover
has_strokedata = True
s = 'User {userid}, C2 ID {c2id}'.format(userid=userid, c2id=c2id)
dologging('c2_import.log', s)
dologging('c2_import.log', json.dumps(data))
try:
title = data['name']
except KeyError:
title = ""
try:
t = data['comments'].split('\n', 1)[0]
title += t[:40]
except: # pragma: no cover
title = ''
# Create CSV file name and save data to CSV file
csvfilename = 'media/{code}_{c2id}.csv.gz'.format(
code=uuid4().hex[:16], c2id=c2id)
startdatetime, starttime, workoutdate, duration, starttimeunix, timezone = utils.get_startdatetime_from_c2data(
data
)
s = 'Time zone {timezone}, startdatetime {startdatetime}, duration {duration}'.format(
timezone=timezone, startdatetime=startdatetime,
duration=duration)
dologging('c2_import.log', s)
authorizationstring = str('Bearer ' + c2token)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://log.concept2.com/api/users/me/results/"+str(c2id)+"/strokes"
try:
s = requests.get(url, headers=headers)
except ConnectionError: # pragma: no cover
return 0
if s.status_code != 200: # pragma: no cover
dologging('c2_import.log', 'No Stroke Data. Status Code {code}'.format(
code=s.status_code))
dologging('c2_import.log', s.text)
has_strokedata = False
if not has_strokedata: # pragma: no cover
df = df_from_summary(data)
else:
# dologging('debuglog.log',json.dumps(s.json()))
try:
strokedata = pd.DataFrame.from_dict(s.json()['data'])
except AttributeError: # pragma: no cover
dologging('c2_import.log', 'No stroke data in stroke data')
return 0
try:
res = make_cumvalues(0.1*strokedata['t'])
cum_time = res[0]
lapidx = res[1]
except KeyError: # pragma: no cover
dologging('c2_import.log', 'No time values in stroke data')
return 0
unixtime = cum_time+starttimeunix
# unixtime[0] = starttimeunix
seconds = 0.1*strokedata.loc[:, 't']
nr_rows = len(unixtime)
try: # pragma: no cover
latcoord = strokedata.loc[:, 'lat']
loncoord = strokedata.loc[:, 'lon']
except:
latcoord = np.zeros(nr_rows)
loncoord = np.zeros(nr_rows)
try:
strokelength = strokedata.loc[:,'strokelength']
except: # pragma: no cover
strokelength = np.zeros(nr_rows)
dist2 = 0.1*strokedata.loc[:, 'd']
cumdist, intervals = make_cumvalues(dist2)
try:
spm = strokedata.loc[:, 'spm']
except KeyError: # pragma: no cover
spm = 0*dist2
try:
hr = strokedata.loc[:, 'hr']
except KeyError: # pragma: no cover
hr = 0*spm
pace = strokedata.loc[:, 'p']/10.
pace = np.clip(pace, 0, 1e4)
pace = pace.replace(0, 300)
velo = 500./pace
power = 2.8*velo**3
if workouttype == 'bike': # pragma: no cover
velo = 1000./pace
dologging('c2_import.log', 'Unix Time Stamp {s}'.format(s=unixtime[0]))
# dologging('debuglog.log',json.dumps(s.json()))
df = pd.DataFrame({'TimeStamp (sec)': unixtime,
' Horizontal (meters)': dist2,
' Cadence (stokes/min)': spm,
' HRCur (bpm)': hr,
' longitude': loncoord,
' latitude': latcoord,
' Stroke500mPace (sec/500m)': pace,
' Power (watts)': power,
' DragFactor': np.zeros(nr_rows),
' DriveLength (meters)': np.zeros(nr_rows),
' StrokeDistance (meters)': strokelength,
' DriveTime (ms)': np.zeros(nr_rows),
' StrokeRecoveryTime (ms)': np.zeros(nr_rows),
' AverageDriveForce (lbs)': np.zeros(nr_rows),
' PeakDriveForce (lbs)': np.zeros(nr_rows),
' lapIdx': lapidx,
' WorkoutState': 4,
' ElapsedTime (sec)': seconds,
'cum_dist': cumdist
})
df.sort_values(by='TimeStamp (sec)', ascending=True)
_ = df.to_csv(csvfilename, index_label='index', compression='gzip')
w = Workout(
user=User.objects.get(id=userid).rower,
duration=duration,
distance=distance,
uploadedtoc2=c2id,
)
w.save()
uploadoptions = {
'user': userid,
'file': csvfilename,
'title': title,
'workouttype': workouttype,
'boattype': boattype,
'c2id': c2id,
'startdatetime': startdatetime.isoformat(),
'timezone': str(timezone)
}
response = upload_handler(uploadoptions, csvfilename)
if response['status'] != 'processing':
return 0
dologging('c2_import.log','workout id {id}'.format(id=w.id))
record = create_or_update_syncrecord(w.user, w, c2id=c2id)
# summary
if 'workout' in data:
if 'splits' in data['workout']: # pragma: no cover
splitdata = data['workout']['splits']
elif 'intervals' in data['workout']: # pragma: no cover
splitdata = data['workout']['intervals']
else: # pragma: no cover
splitdata = False
else:
splitdata = False
if splitdata: # pragma: no cover
summary, sa, results = summaryfromsplitdata(
splitdata, data, csvfilename, workouttype=workouttype)
w.summary = summary
w.save()
from rowingdata.trainingparser import getlist
if sa:
values = getlist(sa)
units = getlist(sa, sel='unit')
types = getlist(sa, sel='type')
rowdata = rdata(csvfile=csvfilename)
if rowdata:
rowdata.updateintervaldata(values, units, types, results)
rowdata.write_csv(csvfilename, gzip=True)
update_strokedata(w.id, rowdata.df)
return 1
@app.task
def handle_split_workout_by_intervals(id, debug=False, **kwargs):
row = Workout.objects.get(id=id)
r = row.user
rowdata = rdata(csvfile=row.csvfilename)
if rowdata == 0:
messages.error(request,"No Data file found for this workout")
return HttpResponseRedirect(url)
try:
new_rowdata = rowdata.split_by_intervals()
except KeyError:
new_rowdata = rowdata
return 0
interval_i = 1
for data in new_rowdata:
filename = 'media/{code}.csv'.format(
code = uuid4().hex[:16]
)
data.write_csv(filename)
uploadoptions = {
'user': r.user.id,
'title': '{title} - interval {i}'.format(title=row.name, i=interval_i),
'file': filename,
'boattype': row.boattype,
'workouttype': row.workouttype,
}
response = upload_handler(uploadoptions, filename)
interval_i = interval_i + 1
return 1
@app.task
def fetch_strava_workout(stravatoken, oauth_data, stravaid, csvfilename, userid, debug=False, **kwargs):
authorizationstring = str('Bearer '+stravatoken)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json',
'resolution': 'medium', }
url = "https://www.strava.com/api/v3/activities/"+str(stravaid)
response = requests.get(url, headers=headers)
if response.status_code != 200: # pragma: no cover
dologging('stravalog.log', 'handle_get_strava_file response code {code}\n'.format(
code=response.status_code))
try:
dologging('stravalog.log','Response json {json}\n'.format(json=response.json()))
except:
pass
return 0
try:
workoutsummary = requests.get(url, headers=headers).json()
except: # pragma: no cover
return 0
spm = get_strava_stream(None, 'cadence', stravaid,
authorizationstring=authorizationstring)
hr = get_strava_stream(None, 'heartrate', stravaid,
authorizationstring=authorizationstring)
t = get_strava_stream(None, 'time', stravaid,
authorizationstring=authorizationstring)
velo = get_strava_stream(None, 'velocity_smooth',
stravaid, authorizationstring=authorizationstring)
d = get_strava_stream(None, 'distance', stravaid,
authorizationstring=authorizationstring)
coords = get_strava_stream(
None, 'latlng', stravaid, authorizationstring=authorizationstring)
power = get_strava_stream(None, 'watts', stravaid,
authorizationstring=authorizationstring)
if t is not None:
nr_rows = len(t)
else: # pragma: no cover
try:
duration = int(workoutsummary['elapsed_time'])
except KeyError:
duration = 0
t = pd.Series(range(duration+1))
nr_rows = len(t)
if nr_rows == 0: # pragma: no cover
return 0
if d is None: # pragma: no cover
d = 0*t
if spm is None: # pragma: no cover
spm = np.zeros(nr_rows)
if power is None: # pragma: no cover
power = np.zeros(nr_rows)
if hr is None: # pragma: no cover
hr = np.zeros(nr_rows)
if velo is None: # pragma: no cover
velo = np.zeros(nr_rows)
try:
dt = np.diff(t).mean()
wsize = round(5./dt)
velo2 = ewmovingaverage(velo, wsize)
except ValueError: # pragma: no cover
velo2 = velo
if coords is not None:
try:
lat = coords[:, 0]
lon = coords[:, 1]
except IndexError: # pragma: no cover
lat = np.zeros(len(t))
lon = np.zeros(len(t))
else: # pragma: no cover
lat = np.zeros(len(t))
lon = np.zeros(len(t))
try:
strokelength = velo*60./(spm)
strokelength[np.isinf(strokelength)] = 0.0
except ValueError:
strokelength = np.zeros(len(t))
pace = 500./(1.0*velo2)
pace[np.isinf(pace)] = 0.0
try:
strokedata = pl.DataFrame({'t': 10*t,
'd': 10*d,
'p': 10*pace,
'spm': spm,
'hr': hr,
'lat': lat,
'lon': lon,
'power': power,
'strokelength': strokelength,
})
except ValueError: # pragma: no cover
return 0
except ShapeError:
return 0
try:
workouttype = mytypes.stravamappinginv[workoutsummary['type']]
except KeyError: # pragma: no cover
workouttype = 'other'
if workouttype.lower() == 'rowing': # pragma: no cover
workouttype = 'rower'
try:
if 'summary_polyline' in workoutsummary['map'] and workouttype == 'rower': # pragma: no cover
workouttype = 'water'
except (KeyError,TypeError): # pragma: no cover
pass
try:
rowdatetime = iso8601.parse_date(workoutsummary['date_utc'])
except KeyError:
try:
rowdatetime = iso8601.parse_date(workoutsummary['start_date'])
except KeyError:
rowdatetime = iso8601.parse_date(workoutsummary['date'])
except ParseError: # pragma: no cover
rowdatetime = iso8601.parse_date(workoutsummary['date'])
try:
title = workoutsummary['name']
except KeyError: # pragma: no cover
title = ""
try:
t = workoutsummary['comments'].split('\n', 1)[0]
title += t[:20]
except:
title = ''
starttimeunix = arrow.get(rowdatetime).timestamp()
res = make_cumvalues_array(0.1*strokedata['t'].to_numpy())
cum_time = pl.Series(res[0])
lapidx = pl.Series(res[1])
unixtime = cum_time+starttimeunix
seconds = 0.1*strokedata['t']
nr_rows = len(unixtime)
try:
latcoord = strokedata['lat']
loncoord = strokedata['lon']
if latcoord.std() == 0 and loncoord.std() == 0 and workouttype == 'water': # pragma: no cover
workouttype = 'rower'
except: # pragma: no cover
latcoord = np.zeros(nr_rows)
loncoord = np.zeros(nr_rows)
if workouttype == 'water':
workouttype = 'rower'
try:
strokelength = strokedata['strokelength']
except: # pragma: no cover
strokelength = np.zeros(nr_rows)
dist2 = 0.1*strokedata['d']
try:
spm = strokedata['spm']
except (KeyError, ColumnNotFoundError): # pragma: no cover
spm = 0*dist2
try:
hr = strokedata['hr']
except (KeyError, ColumnNotFoundError): # pragma: no cover
hr = 0*spm
pace = strokedata['p']/10.
pace = np.clip(pace, 0, 1e4)
pace = pl.Series(pace).replace(0, 300)
velo = 500./pace
try:
power = strokedata['power']
except KeyError: # pragma: no cover
power = 2.8*velo**3
# if power.std() == 0 and power.mean() == 0:
# power = 2.8*velo**3
# save csv
# Create data frame with all necessary data to write to csv
df = pl.DataFrame({'TimeStamp (sec)': unixtime,
' Horizontal (meters)': dist2,
' Cadence (stokes/min)': spm,
' HRCur (bpm)': hr,
' longitude': loncoord,
' latitude': latcoord,
' Stroke500mPace (sec/500m)': pace,
' Power (watts)': power,
' DragFactor': np.zeros(nr_rows),
' DriveLength (meters)': np.zeros(nr_rows),
' StrokeDistance (meters)': strokelength,
' DriveTime (ms)': np.zeros(nr_rows),
' StrokeRecoveryTime (ms)': np.zeros(nr_rows),
' AverageDriveForce (lbs)': np.zeros(nr_rows),
' PeakDriveForce (lbs)': np.zeros(nr_rows),
' lapIdx': lapidx,
' ElapsedTime (sec)': seconds,
'cum_dist': dist2,
})
df.sort('TimeStamp (sec)')
row = rowingdata.rowingdata_pl(df=df)
try:
row.write_csv(csvfilename, compressed=False)
except ComputeError:
dologging('stravalog.log','polars not working')
row = rowingdata.rowingdata(df=df.to_pandas())
row.write_csv(csvfilename)
# summary = row.allstats()
# maxdist = df['cum_dist'].max()
duration = row.duration
uploadoptions = {
'user': userid,
'file': csvfilename,
'title': title,
'workouttype': workouttype,
'boattype': '1x',
'stravaid': stravaid,
}
response = upload_handler(uploadoptions, csvfilename)
if response['status'] != 'processing':
return 0
dologging('strava_webhooks.log','fetch_strava_workout posted file with strava id {stravaid} user id {userid}\n'.format(
stravaid=stravaid, userid=userid))
return 1