Private
Public Access
1
0

some testing, some pandas to polars in tasks.py

This commit is contained in:
2024-04-17 13:35:02 +02:00
parent 4e05799e35
commit af62267996
9 changed files with 35 additions and 507 deletions

View File

@@ -89,10 +89,9 @@ from rowers.dataroutines import *
from rowers.tasks import (
handle_sendemail_newftp,
handle_sendemail_unrecognized, handle_setcp,
handle_sendemail_unrecognized,
handle_getagegrouprecords, handle_update_wps,
handle_request_post, handle_calctrimp,
handle_updatecp, handle_updateergcp,
handle_sendemail_breakthrough,
handle_sendemail_hard,
)

View File

@@ -2705,7 +2705,7 @@ def create_c2_stroke_data_db(
else:
power = 0
df = pd.DataFrame({
df = pl.DataFrame({
'TimeStamp (sec)': unixtime,
' Horizontal (meters)': d,
' Cadence (stokes/min)': spm,
@@ -2726,9 +2726,10 @@ def create_c2_stroke_data_db(
'cum_dist': d
})
df[' ElapsedTime (sec)'] = df['TimeStamp (sec)']
df = df.with_columns((pl.col("TimeStamp (sec)")).alias(" ElapsedTime (sec)"))
_ = df.to_csv(csvfilename, index_label='index', compression='gzip')
row = rrdata_pl(df=df)
row.writecsv(csvfilename, compression=True)
data = dataplep(df, id=workoutid, bands=False, debug=debug)

View File

@@ -2,7 +2,7 @@ from .integrations import SyncIntegration, NoTokenError, create_or_update_syncre
from rowers.models import User, Rower, Workout, TombStone
from django.db.utils import IntegrityError
from rowingdata import rowingdata
from rowingdata import rowingdata, rowingdata_pl
import numpy as np
import datetime
import json

View File

@@ -61,7 +61,7 @@ from rowers.models import (
Workout, User, Rower, WorkoutForm, RowerForm,
GraphImage, GeoPolygon, GeoCourse, GeoPoint,
)
from rowers.tasks import handle_setcp
from rowingdata import rower as rrower
from rowingdata import main as rmain
from rowingdata import cumcpdata, histodata
@@ -1380,7 +1380,7 @@ def interactive_chart(id=0, promember=0, intervaldata={}):
columns = ['time', 'pace', 'hr', 'fpace', 'ftime', 'spm']
datadf = dataprep.getsmallrowdata_pl(columns, ids=[id])
if datadf.is_empty():
if datadf.is_empty():
return "", "No Valid Data Available"
datadf = datadf.fill_nan(None).drop_nulls()
@@ -2373,223 +2373,6 @@ def get_zones_report_pl(rower, startdate, enddate, trainingzones='hr', date_agg=
return chart_data
def _zones_report_date_label(day, date_agg):
    # One bar per ISO week, labelled by the week's Monday ('%y/%m/%d'),
    # or one bar per calendar month ('%Y/%m'). Matches both labelling
    # sites of the original implementation (workout rows and padding rows).
    if date_agg == 'week':
        return (day - datetime.timedelta(days=day.weekday())).strftime('%y/%m/%d')
    return day.strftime('%Y/%m')  # pragma: no cover


def _zones_report_queries(rower, trainingzones, iswater):
    """Return an ordered list of (pandas-query, zone-label) pairs.

    Zones run low to high: below UT2, UT2, UT1, AT, TR, AN. For
    on-the-water workouts the power thresholds are scaled by the rower's
    OTW slack percentage — except the lowest (<UT2) zone, which the
    historical code never scaled; that quirk is preserved here.
    """
    if trainingzones == 'power':
        field = 'power'
        labels = rower.powerzones
        bounds = [rower.pw_ut2, rower.pw_ut1, rower.pw_at, rower.pw_tr, rower.pw_an]
        scaled = ([b * rower.otwslack / 100. for b in bounds] if iswater else bounds)
    else:
        field = 'hr'
        labels = rower.hrzones
        bounds = [rower.ut2, rower.ut1, rower.at, rower.tr, rower.an]
        scaled = bounds
    # Lowest zone: strictly below UT2; query uses the *unscaled* bound.
    pairs = [('{f} < {b}'.format(f=field, b=bounds[0]),
              '<{ut2}'.format(ut2=labels[1]))]
    # Interior zones: [ut2, ut1), [ut1, at), [at, tr), [tr, an).
    for i in range(4):
        pairs.append(('{lo} <= {f} < {hi}'.format(lo=scaled[i], f=field,
                                                  hi=scaled[i + 1]),
                      labels[i + 1]))
    # Top zone: at or above AN.
    pairs.append(('{f} >= {b}'.format(f=field, b=scaled[4]), labels[5]))
    return pairs


def get_zones_report(rower, startdate, enddate, trainingzones='hr', date_agg='week',
                     yaxis='time'):
    """Aggregate a rower's time-in-training-zone between two dates.

    :param rower: rower model instance supplying hr/power zone boundaries
    :param startdate: first day of the report (inclusive)
    :param enddate: last day of the report (inclusive)
    :param trainingzones: 'hr' to bucket by heart rate, 'power' by power
    :param date_agg: 'week' (default) or anything else for monthly buckets
    :param yaxis: unused; kept for interface compatibility with callers
    :returns: dict with parallel lists 'date', 'date_sorting', 'minutes',
        'zones', 'hours' — one entry per (workout, zone) plus one
        zero-padding entry per day in the range.
    """
    dates = []
    dates_sorting = []
    minutes = []
    hours = []
    zones = []
    # Make the end date inclusive when filtering workouts.
    enddate = enddate + datetime.timedelta(days=1)
    workouts = Workout.objects.filter(
        user=rower,
        startdatetime__gte=startdate,
        startdatetime__lte=enddate,
        duplicate=False,
    ).order_by("-startdatetime")
    ids = [w.id for w in workouts]
    columns = ['workoutid', 'hr', 'power', 'time']
    df = dataprep.getsmallrowdata_db(columns, ids=ids)
    try:
        # Per-sample duration, clamped to [0, 20000] (presumably ms —
        # the /(60*1e3) below converts totals to minutes; confirm).
        df['deltat'] = df['time'].diff().clip(lower=0).clip(upper=20*1e3)
    except KeyError:  # pragma: no cover
        pass
    df = dataprep.clean_df_stats(df, workstrokesonly=False,
                                 ignoreadvanced=True, ignorehr=False)
    hrzones = rower.hrzones
    powerzones = rower.powerzones
    for w in workouts:
        datelabel = _zones_report_date_label(w.date, date_agg)
        iswater = w.workouttype in mytypes.otwtypes
        qryw = 'workoutid == {workoutid}'.format(workoutid=w.id)
        for qry, zonelabel in _zones_report_queries(rower, trainingzones, iswater):
            timeinzone = df.query(qry).query(qryw)['deltat'].sum()/(60*1e3)
            dates.append(datelabel)
            dates_sorting.append(datelabel)
            minutes.append(timeinzone)
            hours.append(timeinzone/60.)
            zones.append(zonelabel)
    # Pad every day of the requested range with a zero entry so periods
    # without workouts still appear on the chart.
    try:
        d = utc.localize(startdate)
    except (ValueError, AttributeError):  # pragma: no cover
        d = startdate
    try:
        enddate = utc.localize(enddate)
    except (ValueError, AttributeError):  # pragma: no cover
        pass
    while d <= enddate:
        datelabel = _zones_report_date_label(d, date_agg)
        dates.append(datelabel)
        dates_sorting.append(datelabel)
        minutes.append(0)
        hours.append(0)
        # Historical quirk preserved: padding rows carry the raw zone-2
        # label, not the '<UT2' label used for real workout rows.
        if trainingzones == 'hr':
            zones.append(hrzones[1])
        else:
            zones.append(powerzones[1])
        d += datetime.timedelta(days=1)
    # this should be renamed with rower zones
    return {
        'date': dates,
        'date_sorting': dates_sorting,
        'minutes': minutes,
        'zones': zones,
        'hours': hours,
    }
def interactive_zoneschart2(rower, data, startdate, enddate, trainingzones='hr', date_agg='week',
yaxis='time'):
if startdate >= enddate: # pragma: no cover
@@ -2621,133 +2404,3 @@ def interactive_zoneschart2(rower, data, startdate, enddate, trainingzones='hr',
return script, div
def interactive_zoneschart(rower, data, startdate, enddate, trainingzones='hr', date_agg='week',
                           yaxis='time'):
    """Render time-in-zone data as a stacked bokeh bar chart.

    :param rower: rower supplying hr/power zone boundaries for colors/labels
    :param data: dict of parallel lists ('date', 'date_sorting', 'minutes',
        'zones', 'hours') — presumably the output of get_zones_report; verify
    :param startdate: report start (swapped with enddate if reversed)
    :param enddate: report end
    :param trainingzones: 'hr' or 'power' — selects which zone labels to use
    :param date_agg: 'week' or monthly aggregation, used for the x-axis label
    :param yaxis: 'time' plots hours; 'percentage' plots each zone's share of
        the date's total and overlays total hours on a secondary axis
    :returns: (script, div) bokeh embed components, or ('', message) on no data

    NOTE(review): indentation reconstructed from a diff view — confirm the
    nesting of the yaxis == 'percentage' branches against the original file.
    """
    # Tolerate swapped date arguments.
    if startdate >= enddate:  # pragma: no cover
        st = startdate
        startdate = enddate
        enddate = st
    hrzones = rower.hrzones
    powerzones = rower.powerzones
    # Fixed color per zone, lowest intensity (green) to highest (red).
    color_map = {
        '<{ut2}'.format(ut2=hrzones[1]): 'green',
        hrzones[1]: 'lime',
        hrzones[2]: 'yellow',
        hrzones[3]: 'blue',
        hrzones[4]: 'purple',
        hrzones[5]: 'red',
    }
    if trainingzones == 'power':
        color_map = {
            '<{ut2}'.format(ut2=powerzones[1]): 'green',
            powerzones[1]: 'lime',
            powerzones[2]: 'yellow',
            powerzones[3]: 'blue',
            powerzones[4]: 'purple',
            powerzones[5]: 'red',
        }
    # Stacking order of the zones within each bar, low to high.
    zones_order = [
        '<{ut2}'.format(ut2=hrzones[1]),
        hrzones[1],
        hrzones[2],
        hrzones[3],
        hrzones[4],
        hrzones[5]
    ]
    if trainingzones == 'power':
        zones_order = [
            '<{ut2}'.format(ut2=powerzones[1]),
            powerzones[1],
            powerzones[2],
            powerzones[3],
            powerzones[4],
            powerzones[5]
        ]
    df = pd.DataFrame(data)
    # Untouched copy kept for the secondary 'hours' axis below.
    df2 = pd.DataFrame(data)
    df.drop('minutes', inplace=True, axis='columns')
    df.sort_values('date_sorting', inplace=True)
    df.drop('date_sorting', inplace=True, axis='columns')
    df['totaltime'] = 0
    if df.empty:  # pragma: no cover
        return '', 'No Data Found'
    if yaxis == 'percentage':
        # Express each zone's hours as a percentage of that date's total.
        dates = list(set(df['date'].values))
        for date in dates:
            qry = 'date == "{d}"'.format(d=date)
            totaltime = df.query(qry)['hours'].sum()
            mask = df['date'] == date
            df.loc[mask, 'totaltime'] = totaltime
        df['percentage'] = 100.*df['hours']/df['totaltime']
        df.drop('hours', inplace=True, axis='columns')
        df.drop('totaltime', inplace=True, axis='columns')
    hv.extension('bokeh')
    # Rotate x labels once there are too many bars to fit horizontally.
    xrotation = 0
    nrdates = len(list(set(df['date'].values)))
    if nrdates > 10:
        xrotation = 45
    bars = hv.Bars(df, kdims=['date', 'zones']).aggregate(
        function=np.sum).redim.values(zones=zones_order)
    bars.opts(
        opts.Bars(cmap=color_map, show_legend=True, stacked=True,
                  tools=['tap', 'hover'], width=550, padding=(0, (0, .1)),
                  legend_position='bottom',
                  xrotation=xrotation,
                  show_frame=False)
    )
    p = hv.render(bars)
    p.title.text = 'Activity {d1} to {d2} for {r}'.format(
        d1=startdate.strftime("%Y-%m-%d"),
        d2=enddate.strftime("%Y-%m-%d"),
        r=str(rower),
    )
    if date_agg == 'week':
        p.xaxis.axis_label = 'Week'
    else:  # pragma: no cover
        p.xaxis.axis_label = 'Month'
    if yaxis == 'percentage':
        p.yaxis.axis_label = 'Percentage'
    p.width = 550
    p.height = 350
    p.toolbar_location = 'right'
    p.y_range.start = 0
    #p.sizing_mode = 'stretch_both'
    if yaxis == 'percentage':
        # Overlay total hours per date as a line on a secondary y axis.
        tidy_df = df2.groupby(['date']).sum()
        source2 = ColumnDataSource(tidy_df)
        y2rangemax = tidy_df.loc[:, 'hours'].max()*1.1
        p.extra_y_ranges["yax2"] = Range1d(start=0, end=y2rangemax)
        p.line('date', 'hours', source=source2,
               y_range_name="yax2", color="black", width=5)
        p.circle('date', 'hours', source=source2, y_range_name="yax2", color="black", size=10)
        # p.circle('date', 'hours', source=source2, y_range_name="yax2", color="black", size=10,
        #          legend_label='Hours')
        p.add_layout(LinearAxis(y_range_name="yax2",
                                axis_label='Hours'), 'right')
    script, div = components(p)
    return script, div

View File

@@ -56,7 +56,7 @@ from scipy.signal import savgol_filter
from scipy.interpolate import griddata
import rowingdata
from rowingdata import make_cumvalues
from rowingdata import make_cumvalues, make_cumvalues_array
from uuid import uuid4
from rowingdata import rowingdata as rdata
@@ -115,6 +115,9 @@ tpapilocation = TP_API_LOCATION
from requests_oauthlib import OAuth1, OAuth1Session
import pandas as pd
import polars as pl
from polars.exceptions import ColumnNotFoundError
from django_rq import job
from django.utils import timezone
@@ -2583,53 +2586,6 @@ def handle_otwsetpower(self, f1, boattype, boatclass, coastalbrand, weightvalue,
return 1
@app.task
def handle_updateergcp(rower_id, workoutfilenames, debug=False, **kwargs):
    """Rebuild the ergometer critical-power curve for a rower.

    Loads every workout file in *workoutfilenames* — trying the bare
    name first, then the '.csv' and '.gz' variants — aggregates their
    cumulative CP data and writes the result to the 'ergcpdata' table.
    Returns 1 on completion.
    """
    loaded = []
    for basename in workoutfilenames:
        parsed = 0
        # First readable candidate wins; 0 marks "nothing readable".
        for candidate in (basename, basename + '.csv', basename + '.gz'):
            try:
                parsed = rdata(csvfile=candidate)
            except IOError:  # pragma: no cover
                continue
            break
        if parsed != 0:
            loaded.append(parsed)
    cpdata = rowingdata.cumcpdata(loaded)
    # Normalise column names so the lookups below are case-insensitive.
    cpdata.columns = cpdata.columns.str.lower()
    updatecpdata_sql(rower_id, cpdata['delta'], cpdata['cp'],
                     table='ergcpdata', distance=cpdata['distance'],
                     debug=debug)
    return 1
@app.task
def handle_updatecp(rower_id, workoutids, debug=False, table='cpdata', **kwargs):
    """Recompute the critical-power curve for *rower_id* from the given workouts.

    Returns 0 when no stroke data is found, otherwise writes the CP values
    to *table* and returns 1.
    """
    strokes = getsmallrowdata_db(['power', 'workoutid', 'time'],
                                 ids=workoutids, debug=debug)
    if strokes.empty:  # pragma: no cover
        return 0
    # Pad the time horizon by 5%; /1000 presumably converts ms to s — confirm.
    horizon = 1.05 * strokes['time'].max() / 1000.
    logarr = datautils.getlogarr(horizon)
    delta, cpvalue, avgpower = datautils.getcp(strokes.groupby(['workoutid']),
                                               logarr)
    updatecpdata_sql(rower_id, delta, cpvalue, debug=debug, table=table)
    return 1
@app.task
def handle_makeplot(f1, f2, t, hrdata, plotnr, imagename,
@@ -3179,43 +3135,6 @@ def handle_sendemail_invite_reject(email, name, teamname, managername,
return 1
@app.task
def handle_setcp(strokesdf, filename, workoutid, debug=False, **kwargs):
    """Compute critical-power values from a stroke dataframe and persist them.

    :param strokesdf: pandas DataFrame of strokes; must contain 'time' and
        'power' columns (and 'workoutid' for grouping) to produce output
    :param filename: parquet output path; any stale file is removed first
    :param workoutid: id stored alongside the computed CP values
    :param debug: unused here; accepted for task-interface consistency
    :returns: 0 if the 'time' column is missing, otherwise 1
    """
    # Remove any stale result file from a previous run.
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass
    if not strokesdf.empty:
        try:
            totaltime = strokesdf['time'].max()
        except KeyError:  # pragma: no cover
            return 0
        try:
            powermean = strokesdf['power'].mean()
        except KeyError:  # pragma: no cover
            powermean = 0
        if powermean != 0:
            thesecs = totaltime
            # Pad the time horizon by 5% so the log grid covers the workout.
            maxt = 1.05 * thesecs
            if maxt > 0:
                logarr = datautils.getlogarr(maxt)
                dfgrouped = strokesdf.groupby(['workoutid'])
                delta, cpvalues, avgpower = datautils.getcp(dfgrouped, logarr)
                df = pd.DataFrame({
                    'delta': delta,
                    'cp': cpvalues,
                    'id': workoutid,
                })
                df.to_parquet(filename, engine='fastparquet',
                              compression='GZIP')
            # NOTE(review): indentation reconstructed from a diff view —
            # confirm this return also covers the maxt <= 0 path.
            return 1
    return 1  # pragma: no cover
@app.task
def handle_sendemail_invite_accept(email, name, teamname, managername,
@@ -3647,7 +3566,7 @@ def handle_c2_async_workout(alldata, userid, c2token, c2id, delaysec,
loncoord = np.zeros(nr_rows)
try:
strokelength = strokedata.loc[:, 'strokelength']
strokelength = strokedata.loc[:,'strokelength']
except: # pragma: no cover
strokelength = np.zeros(nr_rows)
@@ -3901,7 +3820,7 @@ def fetch_strava_workout(stravatoken, oauth_data, stravaid, csvfilename, userid,
pace[np.isinf(pace)] = 0.0
try:
strokedata = pd.DataFrame({'t': 10*t,
strokedata = pl.DataFrame({'t': 10*t,
'd': 10*d,
'p': 10*pace,
'spm': spm,
@@ -3947,18 +3866,18 @@ def fetch_strava_workout(stravatoken, oauth_data, stravaid, csvfilename, userid,
starttimeunix = arrow.get(rowdatetime).timestamp()
res = make_cumvalues(0.1*strokedata['t'])
cum_time = res[0]
lapidx = res[1]
res = make_cumvalues_array(0.1*strokedata['t'].to_numpy())
cum_time = pl.Series(res[0])
lapidx = pl.Series(res[1])
unixtime = cum_time+starttimeunix
seconds = 0.1*strokedata.loc[:, 't']
seconds = 0.1*strokedata['t']
nr_rows = len(unixtime)
try:
latcoord = strokedata.loc[:, 'lat']
loncoord = strokedata.loc[:, 'lon']
latcoord = strokedata['lat']
loncoord = strokedata['lon']
if latcoord.std() == 0 and loncoord.std() == 0 and workouttype == 'water': # pragma: no cover
workouttype = 'rower'
except: # pragma: no cover
@@ -3968,29 +3887,29 @@ def fetch_strava_workout(stravatoken, oauth_data, stravaid, csvfilename, userid,
workouttype = 'rower'
try:
strokelength = strokedata.loc[:, 'strokelength']
strokelength = strokedata['strokelength']
except: # pragma: no cover
strokelength = np.zeros(nr_rows)
dist2 = 0.1*strokedata.loc[:, 'd']
dist2 = 0.1*strokedata['d']
try:
spm = strokedata.loc[:, 'spm']
except KeyError: # pragma: no cover
spm = strokedata['spm']
except (KeyError, ColumnNotFoundError): # pragma: no cover
spm = 0*dist2
try:
hr = strokedata.loc[:, 'hr']
except KeyError: # pragma: no cover
hr = strokedata['hr']
except (KeyError, ColumnNotFoundError): # pragma: no cover
hr = 0*spm
pace = strokedata.loc[:, 'p']/10.
pace = strokedata['p']/10.
pace = np.clip(pace, 0, 1e4)
pace = pace.replace(0, 300)
pace = pl.Series(pace).replace(0, 300)
velo = 500./pace
try:
power = strokedata.loc[:, 'power']
power = strokedata['power']
except KeyError: # pragma: no cover
power = 2.8*velo**3
@@ -3999,7 +3918,7 @@ def fetch_strava_workout(stravatoken, oauth_data, stravaid, csvfilename, userid,
# save csv
# Create data frame with all necessary data to write to csv
df = pd.DataFrame({'TimeStamp (sec)': unixtime,
df = pl.DataFrame({'TimeStamp (sec)': unixtime,
' Horizontal (meters)': dist2,
' Cadence (stokes/min)': spm,
' HRCur (bpm)': hr,
@@ -4019,10 +3938,10 @@ def fetch_strava_workout(stravatoken, oauth_data, stravaid, csvfilename, userid,
'cum_dist': dist2,
})
df.sort_values(by='TimeStamp (sec)', ascending=True)
df.sort('TimeStamp (sec)')
row = rowingdata.rowingdata(df=df)
row.write_csv(csvfilename, gzip=False)
row = rowingdata.rowingdata_pl(df=df)
row.write_csv(csvfilename, compressed=False)
# summary = row.allstats()
# maxdist = df['cum_dist'].max()

View File

@@ -484,51 +484,7 @@ class AsyncTaskTests(TestCase):
res = tasks.handle_c2_import_stroke_data(c2token,c2id,workoutid,starttimeunix,csvfilename)
self.assertEqual(res,1)
    @patch('rowers.tasks.grpc',side_effect=mocked_grpc)
    @patch('rowers.tasks.send_template_email',side_effect=mocked_send_template_email)
    def test_handle_otwsetpower(self,mocked_send_template_email,mocked_grpc):
        """handle_otwsetpower returns 1 for a valid on-the-water CSV fixture."""
        f1 = get_random_file(filename='rowers/tests/testdata/sprintervals.csv')['filename']
        boattype = '1x'
        boatclass = 'water'
        coastalbrand = 'other'
        weightvalue = 80.
        first_name = self.u.first_name
        last_name = self.u.last_name
        email = self.u.email
        workoutid = self.wwater.id
        # NOTE(review): 'job' is assigned but never used — presumably kept for
        # fakerequest() side effects; confirm before removing.
        job = fakerequest()
        res = tasks.handle_otwsetpower(f1,boattype,boatclass,coastalbrand,
                                       weightvalue,first_name,last_name,email,workoutid,
                                       jobkey='23')
        self.assertEqual(res,1)
@patch('rowers.dataprep.create_engine')
def test_handle_updateergcp(self,mocked_sqlalchemy):
f1 = get_random_file()['filename']
res = tasks.handle_updateergcp(1,[f1])
self.assertEqual(res,1)
@patch('rowers.dataprep.getsmallrowdata_db')
def test_handle_updatecp(self,mocked_getsmallrowdata_db_updatecp):
rower_id = 1
workoutids = [1]
res = tasks.handle_updatecp(rower_id,workoutids)
self.assertEqual(res,1)
@patch('rowers.dataprep.getsmallrowdata_db')
def test_handle_setcp(self,mocked_getsmallrowdata_db_setcp):
strokesdf = pd.read_csv('rowers/tests/testdata/uhfull.csv')
filename = 'rowers/tests/testdata/temp/pq.gz'
workoutids = 1
res = tasks.handle_setcp(strokesdf,filename,1)
self.assertEqual(res,1)
try:
os.remove(filename)
except FileNotFoundError:
pass
@patch('rowers.dataprep.getsmallrowdata_db')
def test_handle_update_wps(self,mocked_getsmallrowdata_db_wps):

View File

@@ -703,7 +703,7 @@ class InteractivePlotTests(TestCase):
self.assertFalse(len(div)==0)
@patch('rowers.dataprep.create_engine')
@patch('rowers.dataprep.getsmallrowdata_db', side_effect=mocked_getsmallrowdata_db)
@patch('rowers.dataprep.getsmallrowdata_pl', side_effect=mocked_getsmallrowdata_pl)
def test_interactive_chart(self, mocked_sqlalchemy,
mocked_getsmallrowdata_db):
workout = Workout.objects.filter(user=self.r,workouttype__in=mytypes.rowtypes)[0]

Binary file not shown.

View File

@@ -252,7 +252,7 @@ from rowers.tasks import (
handle_sendemailfile,
handle_sendemailkml,
handle_sendemailnewresponse, handle_updatedps,
handle_updatecp, long_test_task, long_test_task2,
long_test_task, long_test_task2,
handle_zip_file, handle_getagegrouprecords,
handle_update_empower,
handle_sendemailics,