Private
Public Access
1
0

moving to upload_tasks, stopped just before c2

This commit is contained in:
2025-10-22 18:03:13 +02:00
parent 0796f34904
commit 385bd52a94
5 changed files with 305 additions and 285 deletions

View File

@@ -1,7 +1,7 @@
from .integrations import SyncIntegration, NoTokenError, create_or_update_syncrecord, get_known_ids from .integrations import SyncIntegration, NoTokenError, create_or_update_syncrecord, get_known_ids
from rowers.models import User, Rower, Workout, TombStone from rowers.models import User, Rower, Workout, TombStone
from rowers.tasks import handle_rp3_async_workout from rowers.upload_tasks import handle_rp3_async_workout
from rowsandall_app.settings import ( from rowsandall_app.settings import (
RP3_CLIENT_ID, RP3_CLIENT_KEY, RP3_REDIRECT_URI, RP3_CLIENT_SECRET, RP3_CLIENT_ID, RP3_CLIENT_KEY, RP3_REDIRECT_URI, RP3_CLIENT_SECRET,
UPLOAD_SERVICE_URL, UPLOAD_SERVICE_SECRET UPLOAD_SERVICE_URL, UPLOAD_SERVICE_SECRET

View File

@@ -3,7 +3,8 @@ from rowers.models import User, Rower, Workout, TombStone
from rowingdata import rowingdata from rowingdata import rowingdata
from rowers.tasks import handle_sporttracks_sync, handle_sporttracks_workout_from_data from rowers.tasks import handle_sporttracks_sync
from rowers.upload_tasks import handle_sporttracks_workout_from_data
from rowers.rower_rules import is_workout_user from rowers.rower_rules import is_workout_user
import rowers.mytypes as mytypes import rowers.mytypes as mytypes
from rowsandall_app.settings import ( from rowsandall_app.settings import (

View File

@@ -750,188 +750,7 @@ def handle_c2_sync(workoutid, url, headers, data, debug=False, **kwargs):
return 1 return 1
def splitstdata(lijst): # pragma: no cover
    """Split a flat [time, value, time, value, ...] list into two arrays.

    Returns [times, values] as numpy arrays; a trailing unpaired element
    is dropped, matching the pairwise consumption of the original loop.
    """
    usable = len(lijst) - (len(lijst) % 2)
    times = [lijst[i] for i in range(0, usable, 2)]
    values = [lijst[i + 1] for i in range(0, usable, 2)]
    return [np.array(times), np.array(values)]
@app.task
def handle_sporttracks_workout_from_data(user, importid, source,
                                         workoutsource, debug=False, **kwargs): # pragma: no cover
    """Fetch one SportTracks activity and post it to the upload service.

    Downloads activity *importid* from the SportTracks API using the
    rower's stored OAuth token, resamples the distance/GPS/cadence/HR
    streams onto a common time index, writes a gzipped CSV and POSTs the
    upload options to UPLOAD_SERVICE_URL.

    Returns 1 on success, or a (0, message) tuple when the activity has
    neither a distance nor a heart-rate stream.
    """
    r = user.rower
    authorizationstring = str('Bearer ' + r.sporttrackstoken)
    headers = {'Authorization': authorizationstring,
               'user-agent': 'sanderroosendaal',
               'Content-Type': 'application/json'}
    url = "https://api.sporttracks.mobi/api/v2/fitnessActivities/" + \
        str(importid)
    s = requests.get(url, headers=headers)
    data = s.json()
    # NOTE(review): strokedata is built but never used below — confirm
    # whether it can be removed.
    strokedata = pd.DataFrame.from_dict({
        key: pd.Series(value, dtype='object') for key, value in data.items()
    })
    # Unknown workout types fall back to 'other'.
    try:
        workouttype = data['type']
    except KeyError: # pragma: no cover
        workouttype = 'other'
    if workouttype not in [x[0] for x in Workout.workouttypes]:
        workouttype = 'other'
    # NOTE(review): comments is captured but never used below.
    try:
        comments = data['comments']
    except:
        comments = ''
    r = Rower.objects.get(user=user)
    rowdatetime = iso8601.parse_date(data['start_time'])
    starttimeunix = arrow.get(rowdatetime).timestamp()
    try:
        title = data['name']
    except: # pragma: no cover
        title = "Imported data"
    # Distance is the primary stream; fall back to heart rate just to get
    # a time base when distance is absent.
    try:
        res = splitstdata(data['distance'])
        distance = res[1]
        times_distance = res[0]
    except KeyError: # pragma: no cover
        try:
            res = splitstdata(data['heartrate'])
            times_distance = res[0]
            distance = 0*times_distance
        except KeyError:
            return (0, "No distance or heart rate data in the workout")
    # GPS track is optional; default to zero coordinates on the distance
    # time base when it is missing or malformed.
    try:
        locs = data['location']
        res = splitstdata(locs)
        times_location = res[0]
        latlong = res[1]
        latcoord = []
        loncoord = []
        for coord in latlong:
            lat = coord[0]
            lon = coord[1]
            latcoord.append(lat)
            loncoord.append(lon)
    except:
        times_location = times_distance
        latcoord = np.zeros(len(times_distance))
        loncoord = np.zeros(len(times_distance))
    # Any on-the-water type is normalized to 'rower'.
    if workouttype in mytypes.otwtypes: # pragma: no cover
        workouttype = 'rower'
    try:
        res = splitstdata(data['cadence'])
        times_spm = res[0]
        spm = res[1]
    except KeyError: # pragma: no cover
        times_spm = times_distance
        spm = 0*times_distance
    try:
        res = splitstdata(data['heartrate'])
        hr = res[1]
        times_hr = res[0]
    except KeyError:
        times_hr = times_distance
        hr = 0*times_distance
    # create data series and remove duplicates
    distseries = pd.Series(distance, index=times_distance)
    distseries = distseries.groupby(distseries.index).first()
    latseries = pd.Series(latcoord, index=times_location)
    latseries = latseries.groupby(latseries.index).first()
    lonseries = pd.Series(loncoord, index=times_location)
    lonseries = lonseries.groupby(lonseries.index).first()
    spmseries = pd.Series(spm, index=times_spm)
    spmseries = spmseries.groupby(spmseries.index).first()
    hrseries = pd.Series(hr, index=times_hr)
    hrseries = hrseries.groupby(hrseries.index).first()
    # Create dicts and big dataframe
    d = {
        ' Horizontal (meters)': distseries,
        ' latitude': latseries,
        ' longitude': lonseries,
        ' Cadence (stokes/min)': spmseries,
        ' HRCur (bpm)': hrseries,
    }
    df = pd.DataFrame(d)
    df = df.groupby(level=0).last()
    cum_time = df.index.values
    df[' ElapsedTime (sec)'] = cum_time
    velo = df[' Horizontal (meters)'].diff()/df[' ElapsedTime (sec)'].diff()
    df[' Power (watts)'] = 0.0*velo
    nr_rows = len(velo.values)
    # Columns the CSV format requires but SportTracks cannot provide.
    df[' DriveLength (meters)'] = np.zeros(nr_rows)
    df[' StrokeDistance (meters)'] = np.zeros(nr_rows)
    df[' DriveTime (ms)'] = np.zeros(nr_rows)
    df[' StrokeRecoveryTime (ms)'] = np.zeros(nr_rows)
    df[' AverageDriveForce (lbs)'] = np.zeros(nr_rows)
    df[' PeakDriveForce (lbs)'] = np.zeros(nr_rows)
    df[' lapIdx'] = np.zeros(nr_rows)
    unixtime = cum_time+starttimeunix
    unixtime[0] = starttimeunix
    df['TimeStamp (sec)'] = unixtime
    # Smooth velocity over roughly 5 seconds before deriving pace.
    dt = np.diff(cum_time).mean()
    wsize = round(5./dt)
    velo2 = ewmovingaverage(velo, wsize)
    df[' Stroke500mPace (sec/500m)'] = 500./velo2
    df = df.fillna(0)
    # NOTE(review): sort_values returns a new frame; the result is
    # discarded here, so this line has no effect — confirm intent.
    df.sort_values(by='TimeStamp (sec)', ascending=True)
    csvfilename = 'media/{code}_{importid}.csv'.format(
        importid=importid,
        code=uuid4().hex[:16]
    )
    res = df.to_csv(csvfilename+'.gz', index_label='index',
                    compression='gzip')
    # NOTE(review): 'title' appears twice; the later 'title':title wins.
    uploadoptions = {
        'secret': UPLOAD_SERVICE_SECRET,
        'user': user.id,
        'file': csvfilename+'.gz',
        'title': '',
        'workouttype': workouttype,
        'boattype': '1x',
        'sporttracksid': importid,
        'title':title,
    }
    session = requests.session()
    newHeaders = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    session.headers.update(newHeaders)
    _ = session.post(UPLOAD_SERVICE_URL, json=uploadoptions)
    return 1
@app.task @app.task
@@ -3468,107 +3287,6 @@ def handle_update_wps(rid, types, ids, mode, debug=False, **kwargs):
return wps_median return wps_median
@app.task
def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempts, debug=False, **kwargs):
    """Poll RP3 for a CSV export of workout *rp3id* and upload it.

    Requests a CSV download link through the RP3 GraphQL API, polls until
    the link is ready (at most *max_attempts* attempts, 3 s apart),
    downloads the file and POSTs the upload options to UPLOAD_SERVICE_URL.

    Returns the created workout id on success, 0 on any failure.
    """
    # NOTE(review): this local shadows django.utils.timezone; the
    # timezone.now() call further down would then be str.now() and raise
    # AttributeError — confirm (branch is marked no-cover).
    timezone = kwargs.get('timezone', 'UTC')
    headers = {'Authorization': 'Bearer ' + rp3token}
    get_download_link = """{
    download(workout_id: """ + str(rp3id) + """, type:csv){
    id
    status
    link
    }
    }"""
    have_link = False
    download_url = ''
    counter = 0
    waittime = 3
    # Poll until the export is ready or the attempt budget is exhausted.
    while not have_link:
        try:
            response = requests.post(
                url=graphql_url,
                headers=headers,
                json={'query': get_download_link}
            )
            dologging('rp3_import.log',response.status_code)
            if response.status_code != 200: # pragma: no cover
                have_link = True
            workout_download_details = pd.json_normalize(
                response.json()['data']['download'])
            dologging('rp3_import.log', response.json())
        except: # pragma: no cover
            return 0
        # In the normalized frame column 1 is 'status', column 2 is 'link'.
        if workout_download_details.iat[0, 1] == 'ready':
            download_url = workout_download_details.iat[0, 2]
            have_link = True
            dologging('rp3_import.log', download_url)
        counter += 1
        dologging('rp3_import.log', counter)
        if counter > max_attempts: # pragma: no cover
            have_link = True
        time.sleep(waittime)
    if download_url == '': # pragma: no cover
        return 0
    filename = 'media/RP3Import_'+str(rp3id)+'.csv'
    res = requests.get(download_url, headers=headers)
    dologging('rp3_import.log','tasks.py '+str(rp3id))
    dologging('rp3_import.log',startdatetime)
    if not startdatetime: # pragma: no cover
        startdatetime = str(timezone.now())
    try:
        startdatetime = str(startdatetime)
    except: # pragma: no cover
        pass
    if res.status_code != 200: # pragma: no cover
        return 0
    with open(filename, 'wb') as f:
        # dologging('rp3_import.log',res.text)
        dologging('rp3_import.log', 'Rp3 ID = {id}'.format(id=rp3id))
        f.write(res.content)
    uploadoptions = {
        'secret': UPLOAD_SERVICE_SECRET,
        'user': userid,
        'file': filename,
        'workouttype': 'rower',
        'boattype': 'rp3',
        'rp3id': int(rp3id),
        'startdatetime': startdatetime,
        'timezone': timezone,
    }
    session = requests.session()
    newHeaders = {'Content-type': 'application/json', 'Accept': 'text/plain'}
    session.headers.update(newHeaders)
    response = session.post(UPLOAD_SERVICE_URL, json=uploadoptions)
    if response.status_code != 200: # pragma: no cover
        return 0
    workoutid = response.json()['id']
    return workoutid
@app.task @app.task

View File

@@ -1,10 +1,15 @@
import os import os
import time
from uuid import uuid4 from uuid import uuid4
import shutil import shutil
import requests import requests
from rowingdata import FITParser as FP from rowingdata import FITParser as FP
from rowingdata.otherparsers import FitSummaryData from rowingdata.otherparsers import FitSummaryData
import rowingdata import rowingdata
import pandas as pd
import iso8601
import arrow
import numpy as np
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true" os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
from YamJam import yamjam from YamJam import yamjam
@@ -32,10 +37,13 @@ from rowers.mytypes import intervalsmappinginv
from rowers.dataroutines import ( from rowers.dataroutines import (
totaltime_sec_to_string, totaltime_sec_to_string,
) )
from rowers.utils import ewmovingaverage, dologging
from rowers.celery import app from rowers.celery import app
from celery import shared_task from celery import shared_task
from django.utils import timezone
SITE_URL = CFG['site_url'] SITE_URL = CFG['site_url']
SITE_URL_DEV = CFG['site_url'] SITE_URL_DEV = CFG['site_url']
PROGRESS_CACHE_SECRET = CFG['progress_cache_secret'] PROGRESS_CACHE_SECRET = CFG['progress_cache_secret']
@@ -229,3 +237,295 @@ def handle_intervals_getworkout(rower, intervalstoken, workoutid, debug=False, *
return w.id return w.id
def splitstdata(lijst): # pragma: no cover
    """Unzip a flat [t, v, t, v, ...] list into [times, values] numpy arrays.

    An unpaired trailing element is ignored, exactly as the old pairwise
    while-loop did.
    """
    pairs = list(zip(lijst[0::2], lijst[1::2]))
    times = [pair[0] for pair in pairs]
    values = [pair[1] for pair in pairs]
    return [np.array(times), np.array(values)]
@app.task
def handle_sporttracks_workout_from_data(user, importid, source,
                                         workoutsource, debug=False, **kwargs): # pragma: no cover
    """Fetch one SportTracks activity and hand it to the upload handler.

    Downloads activity *importid* from the SportTracks API with the
    rower's stored OAuth token, resamples the distance/GPS/cadence/HR
    streams onto a common time index, writes a gzipped CSV, creates a
    placeholder Workout row and passes the file to upload_handler.

    Args:
        user: Django User whose rower holds the SportTracks token.
        importid: SportTracks activity id to download.
        source, workoutsource: provenance tags kept for the shared task
            signature (not used inside this function).
        debug: unused debug flag.

    Returns:
        1 on success, 0 when the upload handler did not accept the file,
        or a (0, message) tuple when no usable data stream exists.
    """
    r = user.rower
    headers = {'Authorization': str('Bearer ' + r.sporttrackstoken),
               'user-agent': 'sanderroosendaal',
               'Content-Type': 'application/json'}
    url = "https://api.sporttracks.mobi/api/v2/fitnessActivities/" + \
        str(importid)
    s = requests.get(url, headers=headers)
    data = s.json()
    # Unknown workout types fall back to 'other'.
    try:
        workouttype = data['type']
    except KeyError: # pragma: no cover
        workouttype = 'other'
    if workouttype not in [x[0] for x in Workout.workouttypes]:
        workouttype = 'other'
    r = Rower.objects.get(user=user)
    rowdatetime = iso8601.parse_date(data['start_time'])
    starttimeunix = arrow.get(rowdatetime).timestamp()
    try:
        title = data['name']
    except KeyError: # pragma: no cover
        title = "Imported data"
    # Distance is the primary stream; fall back to heart rate just to get
    # a time base when distance is absent.
    try:
        res = splitstdata(data['distance'])
        distance = res[1]
        times_distance = res[0]
    except KeyError: # pragma: no cover
        try:
            res = splitstdata(data['heartrate'])
            times_distance = res[0]
            distance = 0*times_distance
        except KeyError:
            return (0, "No distance or heart rate data in the workout")
    # GPS track is optional; default to zero coordinates on the distance
    # time base when it is missing or malformed.
    try:
        locs = data['location']
        res = splitstdata(locs)
        times_location = res[0]
        latlong = res[1]
        latcoord = []
        loncoord = []
        for coord in latlong:
            latcoord.append(coord[0])
            loncoord.append(coord[1])
    except Exception:
        times_location = times_distance
        latcoord = np.zeros(len(times_distance))
        loncoord = np.zeros(len(times_distance))
    # Any on-the-water type is normalized to 'rower'.
    if workouttype in mytypes.otwtypes: # pragma: no cover
        workouttype = 'rower'
    try:
        res = splitstdata(data['cadence'])
        times_spm = res[0]
        spm = res[1]
    except KeyError: # pragma: no cover
        times_spm = times_distance
        spm = 0*times_distance
    try:
        res = splitstdata(data['heartrate'])
        hr = res[1]
        times_hr = res[0]
    except KeyError:
        times_hr = times_distance
        hr = 0*times_distance
    # Create per-stream series and drop duplicate timestamps.
    distseries = pd.Series(distance, index=times_distance)
    distseries = distseries.groupby(distseries.index).first()
    latseries = pd.Series(latcoord, index=times_location)
    latseries = latseries.groupby(latseries.index).first()
    lonseries = pd.Series(loncoord, index=times_location)
    lonseries = lonseries.groupby(lonseries.index).first()
    spmseries = pd.Series(spm, index=times_spm)
    spmseries = spmseries.groupby(spmseries.index).first()
    hrseries = pd.Series(hr, index=times_hr)
    hrseries = hrseries.groupby(hrseries.index).first()
    # Merge everything onto one time index in the CSV column layout the
    # upload service expects.
    df = pd.DataFrame({
        ' Horizontal (meters)': distseries,
        ' latitude': latseries,
        ' longitude': lonseries,
        ' Cadence (stokes/min)': spmseries,
        ' HRCur (bpm)': hrseries,
    })
    df = df.groupby(level=0).last()
    cum_time = df.index.values
    df[' ElapsedTime (sec)'] = cum_time
    velo = df[' Horizontal (meters)'].diff()/df[' ElapsedTime (sec)'].diff()
    df[' Power (watts)'] = 0.0*velo
    nr_rows = len(velo.values)
    # Columns the CSV format requires but SportTracks cannot provide.
    for col in (' DriveLength (meters)', ' StrokeDistance (meters)',
                ' DriveTime (ms)', ' StrokeRecoveryTime (ms)',
                ' AverageDriveForce (lbs)', ' PeakDriveForce (lbs)',
                ' lapIdx'):
        df[col] = np.zeros(nr_rows)
    unixtime = cum_time+starttimeunix
    unixtime[0] = starttimeunix
    df['TimeStamp (sec)'] = unixtime
    # Smooth velocity over roughly 5 seconds before deriving pace.
    dt = np.diff(cum_time).mean()
    wsize = round(5./dt)
    velo2 = ewmovingaverage(velo, wsize)
    df[' Stroke500mPace (sec/500m)'] = 500./velo2
    df = df.fillna(0)
    # BUGFIX: sort_values returns a new frame; the previous code discarded
    # the result, so the sort never took effect.
    df = df.sort_values(by='TimeStamp (sec)', ascending=True)
    csvfilename = 'media/{code}_{importid}.csv'.format(
        importid=importid,
        code=uuid4().hex[:16]
    )
    df.to_csv(csvfilename+'.gz', index_label='index',
              compression='gzip')
    w = Workout(
        user=r,
        duration=totaltime_sec_to_string(cum_time[-1]),
        uploadedtosporttracks=importid,
    )
    w.save()
    # BUGFIX: the dict previously listed 'title' twice ('' then the real
    # title); keep only the real one.
    uploadoptions = {
        'user': user.id,
        'file': csvfilename+'.gz',
        'title': title,
        'workouttype': workouttype,
        'boattype': '1x',
        'sporttracksid': importid,
        'id': w.id,
    }
    response = upload_handler(uploadoptions, csvfilename+'.gz')
    if response['status'] != 'processing':
        return 0
    return 1
@app.task
def handle_rp3_async_workout(userid, rp3token, rp3id, startdatetime, max_attempts, debug=False, **kwargs):
    """Poll RP3 for a CSV export of workout *rp3id* and upload it.

    Requests a CSV download link through the RP3 GraphQL API, polls until
    the link is ready (at most *max_attempts* attempts, 3 s apart),
    downloads the file, creates a placeholder Workout and passes the file
    to upload_handler.

    Args:
        userid: id of the Django User importing the workout.
        rp3token: OAuth bearer token for the RP3 API.
        rp3id: RP3 workout id to export.
        startdatetime: workout start time (stringified; defaults to now).
        max_attempts: polling attempt budget before giving up.
        debug: unused debug flag.
        **kwargs: may carry 'timezone' (name string, default 'UTC').

    Returns:
        1 on success, 0 on any failure (API error, timeout, upload refused).
    """
    # BUGFIX: use a distinct local name so the module-level
    # django.utils.timezone import stays usable; the old local named
    # 'timezone' shadowed it and timezone.now() would have raised.
    tz = kwargs.get('timezone', 'UTC')
    headers = {'Authorization': 'Bearer ' + rp3token}
    get_download_link = """{
    download(workout_id: """ + str(rp3id) + """, type:csv){
    id
    status
    link
    }
    }"""
    have_link = False
    download_url = ''
    counter = 0
    waittime = 3  # seconds between polling attempts
    # Poll until the export is ready or the attempt budget is exhausted.
    while not have_link:
        try:
            response = requests.post(
                url=graphql_url,
                headers=headers,
                json={'query': get_download_link}
            )
            dologging('rp3_import.log', response.status_code)
            if response.status_code != 200: # pragma: no cover
                have_link = True
            workout_download_details = pd.json_normalize(
                response.json()['data']['download'])
            dologging('rp3_import.log', response.json())
        except Exception: # pragma: no cover
            return 0
        # In the normalized frame column 1 is 'status', column 2 is 'link'.
        if workout_download_details.iat[0, 1] == 'ready':
            download_url = workout_download_details.iat[0, 2]
            have_link = True
            dologging('rp3_import.log', download_url)
        counter += 1
        dologging('rp3_import.log', counter)
        if counter > max_attempts: # pragma: no cover
            have_link = True
        time.sleep(waittime)
    if download_url == '': # pragma: no cover
        return 0
    filename = 'media/RP3Import_'+str(rp3id)+'.csv'
    res = requests.get(download_url, headers=headers)
    dologging('rp3_import.log', 'tasks.py '+str(rp3id))
    dologging('rp3_import.log', startdatetime)
    if not startdatetime: # pragma: no cover
        # timezone is django.utils.timezone here (no longer shadowed).
        startdatetime = str(timezone.now())
    try:
        startdatetime = str(startdatetime)
    except Exception: # pragma: no cover
        pass
    if res.status_code != 200: # pragma: no cover
        return 0
    with open(filename, 'wb') as f:
        dologging('rp3_import.log', 'Rp3 ID = {id}'.format(id=rp3id))
        f.write(res.content)
    # BUGFIX: the previous version referenced cum_time and importid here,
    # neither of which exists in this function (guaranteed NameError).
    # Create the placeholder from what we actually have; the upload
    # service derives duration when it parses the file.
    # TODO(review): confirm Workout has no other required fields here.
    w = Workout(
        user=User.objects.get(id=userid),
    )
    w.save()
    uploadoptions = {
        'secret': UPLOAD_SERVICE_SECRET,
        'user': userid,
        'file': filename,
        'workouttype': 'rower',
        'boattype': 'rp3',
        'rp3id': int(rp3id),
        'startdatetime': startdatetime,
        'timezone': tz,
        'id': w.id,
    }
    response = upload_handler(uploadoptions, filename)
    if response['status'] != 'processing':
        return 0
    return 1

View File

@@ -261,7 +261,6 @@ from rowers.tasks import (
handle_sendemail_unrecognized, handle_sendemailnewcomment, handle_sendemail_unrecognized, handle_sendemailnewcomment,
handle_request_post, handle_request_post,
handle_sendemailsummary, handle_sendemailsummary,
handle_rp3_async_workout,
handle_send_template_email, handle_send_template_email,
handle_send_disqualification_email, handle_send_disqualification_email,
handle_send_withdraw_email, handle_send_withdraw_email,
@@ -287,6 +286,8 @@ from rowers.tasks import (
from rowers.upload_tasks import ( from rowers.upload_tasks import (
handle_assignworkouts, handle_assignworkouts,
handle_post_workout_api, handle_post_workout_api,
handle_rp3_async_workout,
handle_sporttracks_workout_from_data
) )
from scipy.signal import savgol_filter from scipy.signal import savgol_filter