Private
Public Access
1
0
Files
rowsandall/rowers/interactiveplots.py

2420 lines
72 KiB
Python

from rowers.metrics import axes, axlabels, yaxminima, yaxmaxima, get_yaxminima, get_yaxmaxima
from rowers.dataprep import nicepaceformat, niceformat, strfdelta
from rowers.datautils import p0, rpetotss
from rowers.metrics import rowingmetrics, metricsdicts
from scipy.spatial import ConvexHull, Delaunay
from scipy.stats import linregress, percentileofscore
from pytz import timezone as tz, utc
from rowers.models import course_spline, VirtualRaceResult, InStrokeAnalysis, ForceCurveAnalysis
from bokeh.palettes import Category20c, Category10
from bokeh.layouts import layout
from bokeh.resources import CDN, INLINE
from rowers.dataprep import timedeltaconv, rscore_approx
from pandas.core.groupby.groupby import DataError
import rowers.datautils as datautils
from rowers.utils import lbstoN
import rowers.c2stuff as c2stuff
import rowers.metrics as metrics
import rowers.dataprep as dataprep
from rowers.dataprep import rdata
from rowers.dataroutines import remove_nulls_pl
import rowers.utils as utils
import polars as pl
import pytz
from rowers.rower_rules import ispromember
from polars.exceptions import ColumnNotFoundError, ComputeError
from scipy.interpolate import griddata
from scipy.signal import savgol_filter
from scipy import optimize
from django.utils.timezone import activate
from django.utils.timezone import get_current_timezone
from holoviews import opts
import holoviews as hv
import pandas as pd
import numpy as np
import math
import datetime
from rowers import mytypes
from rowers.courses import (
course_coord_center, course_coord_maxmin,
polygon_coord_center, course_coord_crewnerd_navigation,
)
from django.conf import settings
from collections import OrderedDict
from bokeh.core.properties import value
from rowers.opaque import encoder
from bokeh.models import (
OpenURL, TapTool, CrosshairTool, Span, Label, SaveTool,
PanTool, BoxZoomTool, WheelZoomTool, ResetTool,)
from bokeh.models.glyphs import ImageURL
from bokeh.transform import cumsum
from bokeh.models import (
LinearAxis, LogAxis, Range1d, DatetimeTickFormatter, HoverTool, Axis, PrintfTickFormatter
)
from bokeh.layouts import column as layoutcolumn
from bokeh.layouts import row as layoutrow
from bokeh.embed import components
import colorsys
from rowers.models import (
Workout, User, Rower, WorkoutForm, RowerForm,
GraphImage, GeoPolygon, GeoCourse, GeoPoint,
)
from rowingdata import rower as rrower
from rowingdata import main as rmain
from rowingdata import cumcpdata, histodata
from rowingdata import rowingdata as rrdata
from math import pi, log2
from django.utils import timezone
from rowingdata import make_cumvalues
from bokeh.palettes import Dark2_8 as palette
from bokeh.palettes import Set1_4 as palette2
from bokeh.models.glyphs import MultiLine
import itertools
from bokeh.plotting import figure, ColumnDataSource, curdoc
from bokeh.models import CustomJS, Slider, TextInput, BoxAnnotation, Band
import arrow
from rowers.utils import myqueue, totaltime_sec_to_string
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('low')
import requests
from rowers.serializers import *
activate(settings.TIME_ZONE)
thetimezone = get_current_timezone()
def get_chart(end_point, chart_data, debug=False):
if debug:
print(chart_data)
url = settings.ROWSANDALL_CHARTS_URL+end_point
headers = {'authorization':"Bearer {token}".format(token=settings.ROWSANDALL_CHARTS_TOKEN)}
try:
response = requests.post(url, json=chart_data, headers=headers)
except Exception as err:
if debug:
print("Chart Server Error")
print(err)
script = ''
div = 'Chart Server Error'
return script, div
if debug:
print("Status Code",response.status_code)
if response.status_code == 200:
script = response.json()['script']
div = response.json()['div']
else:
script = ''
try:
div = response.reason
except AttributeError:
div = 'The chart server errored'
#if not debug:
# script = jsmin(script)
return script, div
# Example for 3D
def filmdeaths():
data = pd.read_csv("~/Downloads/filmdeathcounts.csv")
chart_data = data.to_dict("records")
chart_data_dict = {"data": chart_data}
script, div = get_chart("/filmdeaths", chart_data_dict)
return script, div
# Example for BokehJS
def sleep():
data = {
'work': 8,
'eat': 2,
'commute': 2,
'sport': 0,
'tv': 1,
'sleep': 8,
}
script, div = get_chart("/sleep", data)
return script, div
def workoutname(id):
try:
w = Workout.objects.get(id=id)
except Workout.DoesNotExist:
return ''
return str(w)
def all_goldmedalstandards(workouts, startdate, enddate):
dates = []
testpowers = []
testduration = []
ids = []
for w in workouts:
goldmedalstandard, goldmedalseconds = dataprep.workout_goldmedalstandard(
w)
if goldmedalseconds > 60:
dates.append(arrow.get(w.date).datetime)
testpowers.append(goldmedalstandard)
testduration.append(goldmedalseconds)
ids.append(w.id)
return dates, testpowers, testduration, ids
def tailwind(bearing, vwind, winddir):
""" Calculates head-on head/tailwind in direction of rowing
positive numbers are tail wind
"""
b = np.radians(bearing)
w = np.radians(winddir)
vtail = -vwind*np.cos(w-b)
return vtail
def interactive_hr_piechart(df, rower, title, totalseconds=0):
if df.is_empty():
return "", "Not enough data to make a chart"
try:
df = df.sort("hr")
except ColumnNotFoundError:
return "", "Not enough data to make a chart"
df = df.with_columns((pl.col("deltat")*pl.col("hr")).alias("deltat"))
sumtimehr = df['deltat'].sum()
if sumtimehr == 0:
return "", "No HR data"
if totalseconds == 0:
totalseconds = sumtimehr
hrzones = rower.hrzones
qrydata = df.filter(pl.col("hr") < rower.ut2)
frac_lut2 = totalseconds*qrydata['deltat'].sum()/sumtimehr
qrydata = df.lazy().filter(pl.col("hr") >= rower.ut2).filter(pl.col("hr") < rower.ut1)
frac_ut2 = totalseconds*qrydata.collect()['deltat'].sum()/sumtimehr
qrydata = df.lazy().filter(pl.col("hr") >= rower.ut1).filter(pl.col("hr") < rower.at)
frac_ut1 = totalseconds*qrydata.collect()['deltat'].sum()/sumtimehr
qrydata = df.lazy().filter(pl.col("hr") >= rower.at).filter(pl.col("hr") < rower.tr)
frac_at = totalseconds*qrydata.collect()['deltat'].sum()/sumtimehr
qrydata = df.lazy().filter(pl.col("hr") >= rower.tr).filter(pl.col("hr") < rower.an)
frac_tr = totalseconds*qrydata.collect()['deltat'].sum()/sumtimehr
qrydata = df.filter(pl.col("hr") >= rower.an)
frac_an = totalseconds*qrydata['deltat'].sum()/sumtimehr
datadict = {
'<{ut2}'.format(ut2=hrzones[1]): frac_lut2,
'{ut2}'.format(ut2=hrzones[1]): frac_ut2,
'{ut1}'.format(ut1=hrzones[2]): frac_ut1,
'{at}'.format(at=hrzones[3]): frac_at,
'{tr}'.format(tr=hrzones[4]): frac_tr,
'{an}'.format(an=hrzones[5]): frac_an,
}
colors = ['gray', 'yellow', 'lime', 'blue', 'purple', 'red']
data = pd.Series(datadict).reset_index(
name='value').rename(columns={'index': 'zone'})
data['angle'] = data['value']/data['value'].sum() * 2*pi
data['color'] = colors
data['zone'] = [
'<{ut2}'.format(ut2=hrzones[1]),
'{ut2}'.format(ut2=hrzones[1]),
'{ut1}'.format(ut1=hrzones[2]),
'{at}'.format(at=hrzones[3]),
'{tr}'.format(tr=hrzones[4]),
'{an}'.format(an=hrzones[5])
]
data['totaltime'] = pd.Series([pretty_timedelta(v) for v in data['value']])
data_dict = data.to_dict("records")
chart_data = {
'data': data_dict,
'title': "HR "+ title
}
script, div = get_chart("/hrpie", chart_data)
return script, div
def pretty_timedelta(secs):
hours, remainder = divmod(secs, 3600)
minutes, seconds = divmod(remainder, 60)
return '{}:{:02}:{:02}'.format(int(hours), int(minutes), int(seconds))
def mapcolors(x):
try:
return mytypes.color_map[x]
except KeyError: # pragma: no cover
return mytypes.colors[-1]
def interactive_workouttype_piechart(workouts):
if len(workouts) == 0:
return "", "Not enough workouts to make a chart"
datadict = {}
for w in workouts:
try:
# label = mytypes.workouttypes_ordered[w.workouttype]
label = w.workouttype
except KeyError: # pragma: no cover
label = w.workouttype
try:
datadict[label] += 60*(60*w.duration.hour +
w.duration.minute)+w.duration.second
except KeyError:
datadict[label] = 60*(60*w.duration.hour +
w.duration.minute)+w.duration.second
data = pd.Series(datadict).reset_index(
name='value').rename(columns={'index': 'type'})
data['angle'] = data['value']/data['value'].sum() * 2*pi
data = pd.DataFrame(data)
data['color'] = data['type'].apply(lambda x: mapcolors(x))
data['totaltime'] = data['value'].apply(lambda x: pretty_timedelta(x))
try:
data['type'] = data['type'].apply(
lambda x: mytypes.workouttypes_ordered[x])
except KeyError: # pragma: no cover
pass
data_dict = data.to_dict("records")
chart_data = {
"data": data_dict,
"title": "Types"
}
script, div = get_chart("/workouttypepie", chart_data, debug=False)
return script, div
def interactive_boxchart(datadf, fieldname, extratitle='',
spmmin=0, spmmax=0, workmin=0, workmax=0):
if datadf.is_empty(): # pragma: no cover
return '', 'It looks like there are no data matching your filter'
columns = datadf.columns
if fieldname not in columns: # pragma: no cover
return '', 'It looks like there are no data matching your filter'
if 'date' not in columns: # pragma: no cover
return '', 'Not enough data'
datadf = datadf.with_columns((pl.col("date").dt.strftime("%Y-%m-%d")).alias("date"))
datadf = datadf.with_columns((pl.col(fieldname)).alias("value"))
data_dict = datadf.to_dicts()
boxplot_data = {
"metric": metricsdicts[fieldname]["verbose_name"],
"data": data_dict
}
script, div = get_chart("/boxplot", boxplot_data, debug=False)
return script, div
def interactive_planchart(data, startdate, enddate):
# data = data.melt(id_vars=['startdate'], value_vars=['executed', 'planned'])
data = data.with_columns((pl.col("startdate").dt.strftime("%Y-%m-%d")).alias("startdate"))
data_dict = data.to_dicts()
chart_data = {
'data': data_dict,
}
script, div = get_chart("/plan", chart_data, debug=False)
return script, div
def interactive_activitychart2(workouts, startdate, enddate, stack='type',
yaxis='duration'):
startdate = datetime.datetime(
year=startdate.year, month=startdate.month, day=startdate.day)
enddate = datetime.datetime(
year=enddate.year, month=enddate.month, day=enddate.day)
totaldays = (enddate-startdate).days
data_dicts = []
aantal = 1
for w in workouts:
rr = w.user
rowersinitials = rr.user.first_name[0:aantal]+rr.user.last_name[0:aantal]
dd = w.date.strftime('%Y-%m-%d')
du = w.duration.hour*60+w.duration.minute
trimp = w.trimp
rscore = w.rscore
distance = w.distance
if rscore == 0: # pragma: no cover
rscore = w.hrtss
link = "{siteurl}/rowers/workout/{code}/".format(
siteurl=settings.SITE_URL,
code=encoder.encode_hex(w.id)
)
data_dicts.append({
'date': dd,
'duration': du,
'distance': distance,
'trimp': trimp,
'rscore': rscore,
'type': w.workouttype,
'link': link,
'rower': rowersinitials
})
if totaldays < 30:
datebin = "day"
elif totaldays < 50:
datebin = "week"
else:
datebin = "month"
stacknames = {
'TRIMP': 'trimp',
'distance': 'distance',
'time': 'duration',
'rScore': 'rscore',
'duration': 'duration',
}
chart_data = {
'data': data_dicts,
'title': 'Activity {d1} to {d2}'.format(
d1=startdate.strftime("%Y-%m-%d"),
d2=enddate.strftime("%Y-%m-%d"),
),
'datebin': datebin,
'colorby': stack,
'stackby': stacknames[yaxis],
'doreduce': True,
'dosort': True,
'colors': mytypes.color_map,
}
script, div = get_chart("/activity_bar", chart_data, debug=False)
return script, div
def interactive_forcecurve(theworkouts):
ids = [int(w.id) for w in theworkouts]
boattype = theworkouts[0].boattype
columns = ['catch', 'slip', 'wash', 'finish', 'averageforce',
'peakforceangle', 'peakforce', 'spm', 'distance',
'workoutstate', 'driveenergy', 'cumdist', 'workoutid']
columns = columns + [name for name, d in metrics.rowingmetrics]
rowdata = dataprep.read_data(columns, ids=ids,
workstrokesonly=False)
if rowdata.is_empty():
return "", "No Valid Data Available"
rowdata = dataprep.remove_nulls_pl(rowdata)
data_dict = rowdata.to_dicts()
thresholdforce = 100. if 'x' in boattype else 200.
chart_data = {
'title': theworkouts[0].name,
'data': data_dict,
'thresholdforce': thresholdforce,
}
script, div = get_chart("/forcecurve", chart_data, debug=False)
return script, div
def weightfromrecord(row,metricchoice):
vv = row[metricchoice][0]
if vv > 0:
return vv
if metricchoice == 'rscore': # pragma: no cover
return rscore_approx(row)
return 0 # pragma: no cover
def getfatigues(
df,
fatigues, fitnesses, dates, testpower, testduration,
impulses,
startdate, enddate, user, metricchoice, kfatigue, kfitness):
fatigue = 0
fitness = 0
impulses = []
for f in fatigues:
impulses.append(0)
lambda_a = 2/(kfatigue+1)
lambda_c = 2/(kfitness+1)
nrdays = (enddate-startdate).days
for i in range(nrdays+1):
date = startdate+datetime.timedelta(days=i)
datekey = date.strftime('%Y-%m-%d')
weight = 0
try:
df2 = df.filter(pl.col("date") == date.date())
if type(df2) == pl.Series: # pragma: no cover
weight += weightfromrecord(df2,metricchoice)
else:
for row in df2.iter_slices(n_rows=1):
weight += weightfromrecord(row,metricchoice)
except KeyError:
pass
impulses.append(weight)
fatigue = (1-lambda_a)*fatigue+weight*lambda_a
fitness = (1-lambda_c)*fitness+weight*lambda_c
fatigues.append(fatigue)
fitnesses.append(fitness)
dates.append(arrow.get(date).datetime)
testpower.append(np.nan)
testduration.append(np.nan)
return fatigues, fitnesses, dates, testpower, testduration, impulses
def goldmedalscorechart(user, startdate=None, enddate=None):
# to avoid data mess later on
startdate = arrow.get(startdate).datetime.replace(
hour=0, minute=0, second=0, microsecond=0)
enddate = enddate+datetime.timedelta(days=1)
enddate = arrow.get(enddate).datetime.replace(
hour=0, minute=0, second=0, microsecond=0)
# marker workouts
workouts = Workout.objects.filter(user=user.rower, date__gte=startdate,
date__lte=enddate,
workouttype__in=mytypes.rowtypes,
duplicate=False).order_by('date')
markerworkouts = workouts.filter(rankingpiece=True)
outids = [w.id for w in markerworkouts]
dates = [arrow.get(w.date).datetime for w in markerworkouts]
testpower = [
w.goldmedalstandard if w.rankingpiece else np.nan for w in markerworkouts]
testduration = [
w.goldmedalseconds if w.rankingpiece else 0 for w in markerworkouts]
df = pl.DataFrame({
'id': outids,
'date': dates,
'testpower': testpower,
'testduration': testduration,
})
df = df.sort('date')
df = df.drop_nulls()
dates = df['date']
testpower = df['testpower']
ids = df['id']
outids = ids.unique()
# all workouts
alldates, alltestpower, allduration, allids = all_goldmedalstandards(
workouts, startdate, enddate)
nrdays = (enddate-startdate).days
td = []
markerscore = []
score = []
markerduration = []
duration = []
workoutid = []
for i in range(len(dates)):
id = ids[i]
w = Workout.objects.get(id=id)
# td.append(arrow.get(dd).datetime)
td.append(arrow.get(w.date).datetime)
markerscore.append(testpower[i])
markerduration.append(testduration[i])
score.append(testpower[i])
duration.append(testduration[i])
workoutid.append(id)
for i in range(len(alldates)):
td.append(arrow.get(alldates[i]).datetime)
markerscore.append(np.nan)
score.append(alltestpower[i])
markerduration.append(np.nan)
duration.append(allduration[i])
workoutid.append(allids[i])
for i in range(nrdays+1):
td.append(arrow.get(startdate+datetime.timedelta(days=i)).datetime)
markerscore.append(np.nan)
score.append(np.nan)
markerduration.append(np.nan)
duration.append(np.nan)
workoutid.append(0)
df = pl.DataFrame({
'markerscore': markerscore,
'markerduration': markerduration,
'score': score,
'duration': duration,
'date': td,
'id': workoutid,
})
df = df.with_columns((pl.col("id").map_elements(lambda x: settings.SITE_URL +
'/rowers/workout/{id}/'.format(id=encoder.encode_hex(x)))).alias("url"))
df = df.with_columns((pl.col("id").map_elements(lambda x: workoutname(x))).alias("workout"))
df = df.sort('date')
# find index values where score is max
dfmax = df.group_by("date", maintain_order=True).max()
dfmax = dfmax.fill_nan(0)
dfmax = dfmax.with_columns((pl.col("date").dt.strftime("%Y-%m-%d")).alias("date"))
dfmax = dfmax.with_columns((pl.col("duration").map_elements(lambda x: totaltime_sec_to_string(x, shorten=True))).alias("duration"))
dfmax = dfmax.with_columns((pl.col("markerduration").map_elements(lambda x: totaltime_sec_to_string(x, shorten=True))).alias("markerduration"))
data_dicts = dfmax.to_dicts()
chart_data = {
'data': data_dicts
}
script, div = get_chart("/markerworkouts", chart_data)
return script, div, outids
def performance_chart(user, startdate=None, enddate=None, kfitness=42, kfatigue=7,
metricchoice='trimp', doform=False, dofatigue=False,
showtests=False):
TOOLS = 'save,pan,box_zoom,wheel_zoom,reset,tap,hover,crosshair'
TOOLS2 = 'box_zoom,hover'
# to avoid data mess later on
startdate = arrow.get(startdate).datetime.replace(
hour=0, minute=0, second=0, microsecond=0)
enddate = enddate+datetime.timedelta(days=1)
enddate = arrow.get(enddate).datetime.replace(
hour=0, minute=0, second=0, microsecond=0)
modelchoice = 'coggan'
p0 = 0
k1 = 1
k2 = 1
dates = []
testpower = []
fatigues = []
fitnesses = []
testduration = []
impulses = []
outids = []
workouts = Workout.objects.filter(user=user.rower, date__gte=startdate-datetime.timedelta(days=90),
date__lte=enddate,
duplicate=False).order_by('date')
# make fast dict for dates / workouts
records = []
for w in workouts:
dd = {
'date':w.date,
'trimp':w.trimp,
'rscore':w.rscore,
'hrtss':w.hrtss,
'duration':w.duration,
'id':w.id,
'rpe':w.rpe,
}
records.append(dd)
df = pl.from_records(records)
if df.is_empty(): # pragma: no cover
return ['', 'No Data', 0, 0, 0, outids]
markerworkouts = Workout.objects.filter(
user=user.rower, date__gte=startdate-datetime.timedelta(days=90),
date__lte=enddate,
duplicate=False,
rankingpiece=True, workouttype__in=mytypes.rowtypes).order_by('date')
outids = [w.id for w in markerworkouts]
dates = [arrow.get(w.date).datetime for w in workouts]
testpower = [
w.goldmedalstandard if w.rankingpiece else np.nan for w in workouts]
impulses = [np.nan for w in workouts]
testduration = [
w.goldmedalseconds if w.rankingpiece else 0 for w in workouts]
fitnesses = [np.nan for w in workouts]
fatigues = [np.nan for w in workouts]
fatigues, fitnesses, dates, testpower, testduration, impulses = getfatigues(
df,
fatigues,
fitnesses,
dates,
testpower, testduration,
impulses,
startdate -datetime.timedelta(days=90),
enddate,
user, metricchoice,
kfatigue, kfitness
)
df = pl.DataFrame({
'date': dates,
'testpower': testpower,
'testduration': testduration,
'fatigue': fatigues,
'fitness': fitnesses,
'impulse': impulses,
})
endfitness = fitnesses[-2]
endfatigue = fatigues[-2]
endform = endfitness-endfatigue
if modelchoice == 'banister': # pragma: no cover
df = df.with_columns((pl.col("fatigue")*k2))
df = df.with_columns((p0+pl.col("fitness")*k1))
df = df.with_columns((pl.col("fitness")-pl.col("fatigue")).alias("form"))
df = df.sort("date")
df = df.group_by('date').max()
startdate = startdate.replace(tzinfo=None)
startdate = pytz.utc.localize(startdate)
df = df.filter(pl.col("date") > startdate)
try:
df2 = pl.DataFrame({
"testpower" :df['testpower'],
"testduration":df['testduration'].apply(
lambda x: totaltime_sec_to_string(x, shorten=True)),
"fitness":df['fitness'],
"fatigue":df['fatigue'],
"form":df['form'],
"impulse":df['impulse'],
"date": df['date'].dt.strftime('%Y-%m-%d'),
})
except ComputeError:
df2 = pl.DataFrame({
"testpower" :df['testpower'],
"fitness":df['fitness'],
"fatigue":df['fatigue'],
"form":df['form'],
"impulse":df['impulse'],
"date": df['date'].dt.strftime('%Y-%m-%d'),
})
df2 = df2.with_columns((pl.lit("--")).alias("testduration"))
df2 = df2.fill_nan(0)
data_dict = df2.to_dicts()
chart_data = {
'data': data_dict,
'title': 'Performance Manager '+user.first_name,
'plotform' : doform,
'plotfatigue': dofatigue,
}
script, div = get_chart("/performance", chart_data)
return [script, div, endfitness, endfatigue, endform, outids]
def interactive_histoall(theworkouts, histoparam, includereststrokes,
spmmin=0, spmmax=55,
extratitle='',
workmin=0, workmax=1500):
TOOLS = 'save,pan,box_zoom,wheel_zoom,reset,tap,hover,crosshair'
ids = [int(w.id) for w in theworkouts]
columns = [histoparam, 'spm', 'driveenergy', 'distance', 'workoutstate', 'workoutid']
workstrokesonly = not includereststrokes
rowdata = dataprep.read_data(
columns, ids=ids, doclean=True, workstrokesonly=workstrokesonly)
rowdata = rowdata.fill_nan(None).drop_nulls()
if rowdata.is_empty():
return "", "No Valid Data Available"
try:
histopwr = rowdata[histoparam].to_numpy()
except KeyError:
return "", "No data"
if len(histopwr) == 0: # pragma: no cover
return "", "No valid data available"
# throw out nans
histopwr = histopwr[~np.isinf(histopwr)]
if histoparam == 'catch': # pragma: no cover
histopwr = histopwr[histopwr < yaxminima[histoparam]]
histopwr = histopwr[histopwr > yaxmaxima[histoparam]]
else:
histopwr = histopwr[histopwr > yaxminima[histoparam]]
histopwr = histopwr[histopwr < yaxmaxima[histoparam]]
data_dict = {"data": histopwr.tolist(),
"metric": metricsdicts[histoparam]["verbose_name"]}
script, div = get_chart("/histogram", data_dict, debug=False)
return script, div
def course_map(course):
course_dict = GeoCourseSerializer(course).data
script, div = get_chart("/map", course_dict)
return script, div
def leaflet_chart(lat, lon, name="", raceresult=0):
try:
if lat.empty or lon.empty: # pragma: no cover
return [0, "invalid coordinate data"]
except AttributeError:
if not len(lat) or not len(lon): # pragma: no cover
return [0, "invalid coordinate data"]
# Throw out 0,0
df = pd.DataFrame({
'lat': lat,
'lon': lon
})
df = df.replace(0, np.nan)
df = df.loc[(df != 0).any(axis=1)]
df.fillna(method='bfill', axis=0, inplace=True)
df.fillna(method='ffill', axis=0, inplace=True)
lat = df['lat']
lon = df['lon']
if lat.empty or lon.empty: # pragma: no cover
return [0, "invalid coordinate data"]
latmean = lat.mean()
lonmean = lon.mean()
latbegin = lat[lat.index[0]]
longbegin = lon[lon.index[0]]
latend = lat[lat.index[-1]]
longend = lon[lon.index[-1]]
coordinates = zip(lat, lon)
data = {
'coordinates': [{'latitude': c[0], 'longitude': c[1]} for c in list(coordinates)],
'latmean': latmean,
'lonmean': lonmean,
'latbegin': latbegin,
'latend': latend,
'longbegin': longbegin,
'longend': longend,
}
if raceresult != 0:
record = VirtualRaceResult.objects.get(id=raceresult)
course = record.course
course_dict = GeoCourseSerializer(course).data
data['course'] = course_dict
coordinates = zip(lat, lon)
script, div = get_chart("/workoutmap", data)
return script, div
def leaflet_chart_compare(course, workoutids, labeldict={}, startenddict={}):
data = []
for id in workoutids:
if id != 0 and id is not None:
try:
w = Workout.objects.get(id=id)
rowdata = rdata(w.csvfilename)
time = rowdata.df['TimeStamp (sec)']
df = pl.DataFrame({
'workoutid': id,
'lat': rowdata.df[' latitude'],
'lon': rowdata.df[' longitude'],
'time': time-time[0],
})
data.append(df)
except (Workout.DoesNotExist, KeyError): # pragma: no cover
pass
try:
df = pl.concat(data, rechunk=True)
except ValueError: # pragma: no cover
df = pl.DataFrame()
latmean, lonmean, coordinates = course_coord_center(course)
course_dict = GeoCourseSerializer(course).data
# Throw out 0,0
df = df.with_columns(
(pl.col("lat")+pl.col("lon")).alias("latlon")
)
df =df.filter(pl.col("latlon")!=0,)
df = df.fill_nan(None)
df = df.select(pl.all()).interpolate()
try:
lat = df['lat']
lon = df['lon']
except KeyError: # pragma: no cover
return [0, "invalid coordinate data"]
if lat.is_empty() or lon.is_empty(): # pragma: no cover
return [0, "invalid coordinate data"]
colors = itertools.cycle(palette)
try:
items = itertools.izip(workoutids, colors)
except AttributeError:
items = zip(workoutids, colors)
trajectories = []
for id, color in items:
group = df.filter(pl.col("workoutid") == int(id))
try:
startsecond, endsecond = startenddict[id]
except KeyError: # pragma: no cover
startsecond = 0
endsecond = 0
try:
label = labeldict[id]
except KeyError: # pragma: no cover
label = str(id)
group = group.sort("time")
if endsecond > 0:
group = group.with_columns((pl.col("time")-startsecond))
group = group.filter(pl.col("time")>0)
group = group.filter(pl.col("time")<endsecond-startsecond)
lat = group['lat'].fill_nan(None).drop_nulls().to_list()
lon = group['lon'].fill_nan(None).drop_nulls().to_list()
coordinates = zip(lat, lon)
trajectory_dict = {
'coordinates' :[{'latitude':c[0], 'longitude': c[1]} for c in list(coordinates)],
'color': color,
'label': label,
}
trajectories.append(trajectory_dict)
mapdata = {
'course': course_dict,
'latmean': latmean,
'lonmean': lonmean,
'trajectories': trajectories,
}
script, div = get_chart("/mapcompare", mapdata, debug=False)
return script, div
def interactive_otwcpchart(powerdf, promember=0, rowername="", r=None,
cpfit='data',
title='', type='water',
wcpower=[], wcdurations=[], cpoverlay=False):
powerdf2 = powerdf.filter((pl.col("Delta") > 0) & (pl.col("CP") > 0))
# plot tools
if (promember == 1): # pragma: no cover
TOOLS = 'save,pan,box_zoom,wheel_zoom,reset,tap,hover,crosshair'
else:
TOOLS = 'pan,box_zoom,wheel_zoom,reset,tap,hover,crosshair'
x_axis_type = 'log'
deltas = powerdf2['Delta'].apply(lambda x: timedeltaconv(x))
powerdf2 = powerdf2.with_columns(
ftime = deltas.apply(lambda x: strfdelta(x)),
Deltaminutes = pl.col("Delta")/60.
)
# there is no Paul's law for OTW
thesecs = powerdf2['Delta']
theavpower = powerdf2['CP']
p1, fitt, fitpower, ratio = datautils.cpfit(powerdf2)
if cpfit == 'automatic' and r is not None:
if type == 'water':
p1 = [r.p0, r.p1, r.p2, r.p3]
ratio = r.cpratio
elif type == 'erg': # pragma: no cover
p1 = [r.ep0, r.ep1, r.ep2, r.ep3]
ratio = r.ecpratio
def fitfunc(pars, x):
return abs(pars[0])/(1+(x/abs(pars[2]))) + abs(pars[1])/(1+(x/abs(pars[3])))
fitpower = fitfunc(p1, fitt)
message = ""
# if len(fitpower[fitpower<0]) > 0:
# message = "CP model fit didn't give correct results"
deltas = fitt.apply(lambda x: timedeltaconv(x))
ftime = niceformat(deltas)
# add world class
wcpower = pd.Series(wcpower, dtype='float')
wcdurations = pd.Series(wcdurations, dtype='float')
# fitting WC data to three parameter CP model
if len(wcdurations) >= 4: # pragma: no cover
def fitfunc(pars, x):
return pars[0] / (1+(x/pars[2])) + pars[1]/(1+(x/pars[3]))
def errfunc(pars, x, y):
return fitfunc(pars, x)-y
p1wc, success = optimize.leastsq(errfunc, p0[:],
args=(wcdurations, wcpower))
else:
p1wc = None
if p1wc is not None and cpoverlay: # pragma: no cover
fitpowerwc = fitfunc(p1wc, fitt)
fitpowerexcellent = 0.7*fitfunc(p1wc, fitt)
fitpowergood = 0.6*fitfunc(p1wc, fitt)
fitpowerfair = 0.5*fitfunc(p1wc, fitt)
fitpoweraverage = 0.4*fitfunc(p1wc, fitt)
else:
fitpowerwc = 0*fitpower
fitpowerexcellent = 0*fitpower
fitpowergood = 0*fitpower
fitpowerfair = 0*fitpower
fitpoweraverage = 0*fitpower
fit_data = pl.DataFrame(dict(
CP=fitpower,
CPmax=ratio*fitpower,
duration=fitt/60.,
ftime=ftime,
# workout = workouts,
fitpowerwc=fitpowerwc,
fitpowerexcellent=fitpowerexcellent,
fitpowergood=fitpowergood,
fitpowerfair=fitpowerfair,
fitpoweraverage=fitpoweraverage,
# url = urls,
))
if not title:
title = "Critical Power for "+rowername
chart_dict = {
'data': powerdf2.to_dicts(),
'fitdata': fit_data.to_dicts(),
'title': title,
}
script, div = get_chart("/cp", chart_dict)
return [script, div, p1, ratio, message]
def interactive_agegroup_plot(df, distance=2000, duration=None,
sex='male', weightcategory='hwt'):
if df.empty:
return '', ''
age = df['age']
power = df['power']
name = df['name']
season = df['season']
if duration: # pragma: no cover
duration2 = int(duration/60.)
plottitle = sex+' '+weightcategory+' %s min' % duration2
else:
plottitle = sex+' '+weightcategory+' %s m' % distance
# poly_coefficients = np.polyfit(age,power,6)
age2 = np.linspace(11, 95)
# poly_vals = np.polyval(poly_coefficients,age2)
# poly_vals = 0.5*(np.abs(poly_vals)+poly_vals)
def fitfunc(pars, x):
return np.abs(pars[0])*(1-x/max(120, pars[1])) \
- np.abs(pars[2])*np.exp(-x/np.abs(pars[3]))+np.abs(pars[4])*(np.sin(np.pi*x/max(50, pars[5])))
def errfunc(pars, x, y):
return fitfunc(pars, x)-y
p0age = [700, 120, 700, 10, 100, 100]
p1, success = optimize.leastsq(errfunc, p0age[:],
args=(age, power))
expo_vals = fitfunc(p1, age2)
expo_vals = 0.5*(np.abs(expo_vals)+expo_vals)
source = ColumnDataSource(
data=dict(
age=age,
power=power,
season=season,
name=name,
)
)
sourcefit = ColumnDataSource(
data=dict(
age2=age2,
expo_vals=expo_vals,
)
)
TOOLS = 'save,pan,box_zoom,wheel_zoom,reset,tap,hover,crosshair'
plot = figure(tools=TOOLS, width=900)
#plot.sizing_mode = 'stretch_both'
plot.circle('age', 'power', source=source, fill_color='red', size=15,
legend_label='World Record')
plot.line('age2', 'expo_vals', source=sourcefit)
plot.xaxis.axis_label = "Age"
plot.yaxis.axis_label = "Concept2 power"
plot.title.text = plottitle
hover = plot.select(dict(type=HoverTool))
hover.tooltips = OrderedDict([
('Name ', '@name'),
('Season ', '@season'),
])
hover.mode = 'mouse'
script, div = components(plot)
return script, div
def forcecurve_multi_interactive_chart(selected): # pragma: no cover
ids = [analysis.id for analysis in selected]
workoutids = [analysis.workout.id for analysis in selected]
selected_dict = [ForceCurveAnalysisSerializer(analysis).data for analysis in selected]
columns = ['catch', 'slip', 'wash', 'finish', 'averageforce',
'peakforceangle', 'peakforce', 'spm', 'distance',
'workoutstate', 'workoutid', 'driveenergy', 'cumdist']
columns = columns + [name for name, d in metrics.rowingmetrics]
rowdata = dataprep.read_data(columns, ids=workoutids,
workstrokesonly=False)
rowdata = remove_nulls_pl(rowdata)
rowdata = rowdata.fill_nan(None).drop_nulls()
if rowdata.is_empty():
return "", "No Valid Data Available", "", ""
data_dict = rowdata.to_dicts()
thresholdforces = []
for analysis in selected:
boattype = analysis.workout.boattype
thresholdforce = 100. if 'x' in boattype else 200.
thresholdforces.append({'id': analysis.workout.id, 'thresholdforce': thresholdforce})
chart_data = {
'title': '',
'data': data_dict,
'thresholdforces': thresholdforces,
'forcecurve_analyses': selected_dict,
}
script, div = get_chart("/forcecurve_compare", chart_data, debug=False)
return script, div
def instroke_multi_interactive_chart(selected, *args, **kwargs): # pragma: no cover
df2 = []
ids = [analysis.id for analysis in selected]
metrics = list(set([analysis.metric for analysis in selected]))
maximum_values = {}
workouts = []
for metric in metrics:
maximum_values[metric] = 0
cntr = 1
for analysis in selected:
#start_second, end_second, spm_min, spm_max, name
activeminutesmin = int(analysis.start_second/60.)
activeminutesmax = int(analysis.end_second/60.)
rowdata = rrdata(csvfile=analysis.workout.csvfilename)
data = rowdata.get_instroke_data(
analysis.metric,
spm_min=analysis.spm_min,
spm_max=analysis.spm_max,
activeminutesmin=activeminutesmin,
activeminutesmax=activeminutesmax,
)
mean_vals = data.mean()
if analysis.metric == 'boat accelerator curve':
mean_vals[0] = (mean_vals[1]+ mean_vals[len(mean_vals)-1])/2.
if len(metrics) > 1:
if mean_vals.max() > maximum_values[analysis.metric]:
maximum_values[analysis.metric] = mean_vals.max()
xvals = np.arange(len(mean_vals))
data2 = pl.DataFrame({
'x': pl.Series(xvals),
'y': pl.Series(mean_vals),
})
data2 = data2.with_columns((pl.lit(cntr)).alias("id"))
df2.append(data2)
legendlabel = '{name} - {metric} - {workout}'.format(
name = analysis.name,
metric = analysis.metric,
date = analysis.date,
workout = str(analysis.workout)
)
workouts.append({'id': cntr, 'label': legendlabel})
cntr = cntr + 1
ytitle = metrics[0]
if len(metrics) > 1:
cntr = 1
for analysis in selected:
df2[cntr-1] = df2[cntr-1].with_columns(
(pl.col("y")/ maximum_values[analysis.metric])
)
ytitle = 'Scaled'
cntr = cntr+1
df2 = pl.concat(df2)
data_dict = df2.to_dicts()
chart_data = {
'title': '',
'data': data_dict,
'ytitle': ytitle,
'workouts': workouts,
}
script, div = get_chart("/instroke_compare", chart_data)
return script, div
def instroke_interactive_chart(df,metric, workout, spm_min, spm_max,
activeminutesmin, activeminutesmax,
individual_curves,
name='',notes=''): # pragma: no cover
if df.empty:
return "", "No data in selection"
df_pos = (df+abs(df))/2.
df_min = -(-df+abs(-df))/2.
mean_vals = df.median().replace(0, np.nan)
q75 = df_pos.quantile(q = 0.75).replace(0, np.nan)
q25 = df_pos.quantile(q=0.25).replace(0, np.nan)
q75min = df_min.quantile(q=0.75).replace(0, np.nan)
q25min = df_min.quantile(q=0.25).replace(0, np.nan)
mean_vals = mean_vals.interpolate()
xvals = np.arange(len(mean_vals))
df_plot = pl.DataFrame({
'x':xvals,
'median':mean_vals,
'high':q75,
'low':q75min,
'high 2':q25min,
'low 2': q25,
})
df_plot = df_plot.with_columns(
pl.coalesce(["high", "high 2"]).alias("high")
)
df_plot = df_plot.with_columns(
pl.coalesce("low", "low 2").alias("low")
)
df_plot = df_plot.drop(["high 2", "low 2"])
df_plot = df_plot.drop_nulls()
ytitle = "Y"
if metric == 'boat accelerator curve':
ytitle = "Boat acceleration (m/s^2)"
elif metric == 'instroke boat speed':
ytitle = "Boat Speed (m/s)"
vavg = mean_vals.median()
elif metric == 'oar angle velocity curve':
ytitle = "Oar Angular Velocity (degree/s)"
elif metric == 'seat curve':
ytitle = "Seat Speed (m/s)"
lines_dict = df.to_dict("records")
data_dict = df_plot.to_dicts()
chart_data = {
'lines': lines_dict,
'data': data_dict,
'ytitle': ytitle,
'title': str(workout) + ' - ' + metric,
'individual_curves': individual_curves,
'spmmin': spm_min,
'spmmax': spm_max,
'timemin' :'{activeminutesmin}'.format(
activeminutesmin=datetime.timedelta(seconds=60*activeminutesmin),
),
'timemax': '{activeminutesmax}'.format(
activeminutesmax=datetime.timedelta(seconds=60*activeminutesmax)
),
'analysis_name': name,
}
script, div = get_chart("/instroke", chart_data, debug=False)
return script, div
def interactive_chart(id=0, promember=0, intervaldata={}):
# Add hover to this comma-separated string and see what changes
if (promember == 1):
TOOLS = 'save,pan,box_zoom,wheel_zoom,reset,tap,hover,crosshair'
else:
TOOLS = 'pan,box_zoom,wheel_zoom,reset,tap,hover,crosshair'
columns = ['time', 'pace', 'hr', 'fpace', 'ftime', 'spm']
datadf = dataprep.read_data(columns, ids=[id], workstrokesonly=False)
if datadf.is_empty():
return "", "No Valid Data Available"
datadf = datadf.fill_nan(None).drop_nulls()
row = Workout.objects.get(id=id)
if datadf.is_empty():
return "", "No Valid Data Available"
try:
_ = datadf['spm']
except KeyError: # pragma: no cover
datadf = datadf.with_columns((pl.lit(0)).alias("spm"))
try:
_ = datadf['pace']
except KeyError: # pragma: no cover
datadf = datadf.with_columns((pl.lit(0)).alias("pace"))
data_dict = datadf.to_dicts()
metrics_list = [{'name': name, 'rowingmetrics':d } for name, d in metrics.rowingmetrics]
intervals = []
# add shaded bar chart areas
if intervaldata != {}:
intervaldf = pd.DataFrame(intervaldata)
intervaldf['itime'] = intervaldf['itime']*1.e3
intervaldf['time'] = intervaldf['itime'].cumsum()
intervaldf['time'] = intervaldf['time'].shift(1)
intervaldf.loc[0, 'time'] = 0
intervaldf['time_r'] = intervaldf['time'] + intervaldf['itime']
intervaldf['value'] = 100
mask = intervaldf['itype'] == 3
intervaldf.loc[mask, 'value'] = 0
intervaldf['bottom'] = 0
intervals = intervaldf.to_dict("records")
chart_data = {
'title': row.name,
'x': "time",
'y1': "pace",
'y2': "spm",
'data': data_dict,
'metrics': metrics_list,
'intervals': intervals,
}
script, div = get_chart("/interactive", chart_data)
return script, div
def interactive_chart_video(videodata):
try:
spm = videodata['spm']
except KeyError: # pragma: no cover
return "", "No SPM data"
time = range(len(spm))
data = zip(time, spm)
data2 = []
for t, s in data:
data2.append( {'x': t, 'y': s})
markerpoint = {
'x': time[0],
'y': spm[0],
'r': 10,
}
chart_data = {
'data': data2,
'markerpoint': markerpoint,
}
script, div = get_chart("/videochart", chart_data)
return [script, div]
def interactive_multiflex(datadf, xparam, yparam, groupby, extratitle='',
ploterrorbars=False,
title=None, binsize=1, colorlegend=[],
spmmin=0, spmmax=0, workmin=0, workmax=0):
if datadf.empty: # pragma: no cover
return ['', '<p>No non-zero data in selection</p>']
if xparam == 'workoutid': # pragma: no cover
xparamname = 'Workout'
else:
xparamname = axlabels[xparam]
if yparam == 'workoutid': # pragma: no cover
yparamname = 'Workout'
else:
yparamname = axlabels[yparam]
if groupby == 'workoutid': # pragma: no cover
groupname = 'Workout'
elif groupby == 'date': # pragma: no cover
groupname = 'Date'
else:
groupname = axlabels[groupby]
if title is None:
title = '{y} vs {x} grouped by {gr}'.format(
x=xparamname,
y=yparamname,
gr=groupname,
)
if extratitle is not None:
title = title+' '+extratitle
if xparam == 'cumdist': # pragma: no cover
res = make_cumvalues(datadf[xparam])
datadf[xparam] = res[0]
if xparam == 'distance': # pragma: no cover
xaxmax = datadf[xparam].max()
xaxmin = datadf[xparam].min()
elif xparam == 'time': # pragma: no cover
tseconds = datadf.loc[:, 'time']
xaxmax = tseconds.max()
xaxmin = 0
elif xparam == 'workoutid': # pragma: no cover
xaxmax = datadf[xparam].max()-5
xaxmin = datadf[xparam].min()+5
else:
xaxmax = yaxmaxima[xparam]
xaxmin = yaxminima[xparam]
if yparam == 'distance': # pragma: no cover
yaxmax = datadf[yparam].max()
yaxmin = datadf[yparam].min()
elif yparam == 'time': # pragma: no cover
tseconds = datadf.loc[:, 'time']
yaxmax = tseconds.max()
yaxmin = 0
elif yparam == 'workoutid': # pragma: no cover
yaxmax = datadf[yparam].max()-5
yaxmin = datadf[yparam].min()+5
else:
yaxmax = yaxmaxima[yparam]
yaxmin = yaxminima[yparam]
data_dict = datadf.to_dict("records")
metrics_list = [{'name': name, 'rowingmetrics':d } for name, d in metrics.rowingmetrics]
chart_data = {
'title': title,
'x': xparam,
'y': yparam,
'data': data_dict,
'metrics': metrics_list,
'errorbars':ploterrorbars,
'groupname': groupname,
}
script, div = get_chart("/trendflex", chart_data,debug=False)
return script, div
def interactive_cum_flex_chart2(theworkouts, promember=0,
xparam='spm',
yparam1='power',
yparam2='spm',
workstrokesonly=False,
extratitle='',
trendline=False):
ids = [int(w.id) for w in theworkouts]
columns = [name for name, d in metrics.rowingmetrics]
columns_basic = [name for name, d in metrics.rowingmetrics if d['group'] == 'basic']
columns = columns + ['spm', 'driveenergy', 'distance' ,'workoutstate']
columns_basic = columns_basic + ['spm', 'driveenergy', 'distance', 'workoutstate']
datadf = pd.DataFrame()
if promember:
datadf = dataprep.read_data(columns, ids=ids, doclean=True,
workstrokesonly=workstrokesonly, for_chart=True)
else:
datadf = dataprep.read_data(columns_basic, ids=ids, doclean=True,
workstrokesonly=workstrokesonly, for_chart=True)
try:
_ = datadf[yparam2]
except (KeyError, ColumnNotFoundError): # pragma: no cover
yparam2 = 'None'
try:
_ = datadf[yparam1]
except (KeyError, ColumnNotFoundError):
yparam1 = 'None'
datadf = dataprep.remove_nulls_pl(datadf)
# test if we have drive energy
try: # pragma: no cover
_ = datadf['driveenergy'].mean()
except (KeyError, ColumnNotFoundError): # pragma: no cover
datadf = datadf.with_columns((pl.lit(500)).alias("driveenergy"))
# test if we have power
try: # pragma: no cover
_ = datadf['power'].mean()
except (KeyError, ColumnNotFoundError): # pragma: no cover
datadf = datadf.with_columns((pl.lit(50)).alias("power"))
yparamname1 = axlabels[yparam1]
if yparam2 != 'None':
yparamname2 = axlabels[yparam2]
# check if dataframe not empty
if datadf.is_empty(): # pragma: no cover
return ['', '<p>No non-zero data in selection</p>', '', '']
try:
datadf = datadf.with_columns(pl.col(xparam).alias("x1"))
except (KeyError, ColumnNotFoundError): # pragma: no cover
try:
datadf = datadf.with_columns(pl.col("distance").alias("x1"))
except (KeyError, ColumnNotFoundError):
try:
datadf = datadf.with_columns(pl.col('time').alias("x1"))
except (KeyError, ColumnNotFoundError): # pragma: no cover
return ['', '<p>No non-zero data in selection</p>', '', '']
try:
datadf = datadf.with_columns(pl.col(yparam1).alias("y1"))
except (KeyError, ColumnNotFoundError):
try:
datadf = datadf.with_columns(pl.col('pace').alias("y1"))
except (KeyError, ColumnNotFoundError): # pragma: no cover
return ['', '<p>No non-zero data in selection</p>', '', '']
if yparam2 != 'None':
try:
datadf = datadf.with_columns(pl.col(yparam2).alias("y2"))
except (KeyError, ColumnNotFoundError): # pragma: no cover
datadf = datadf.with_columns(pl.col("y1").alias("y2"))
else: # pragma: no cover
datadf = datadf.with_columns(pl.col("y1").alias("y2"))
datadf = datadf.with_columns(xname = pl.lit(axlabels[xparam]))
datadf = datadf.with_columns(yname1 = pl.lit(axlabels[yparam1]))
if yparam2 != 'None':
datadf = datadf.with_columns(yname2 = pl.lit(axlabels[yparam2]))
else: # pragma: no cover
datadf = datadf.with_columns(yname2 = pl.lit(axlabels[yparam1]))
def func(x, a, b):
return a*x+b
x1 = datadf['x1']
y1 = datadf['y1']
try:
popt, pcov = optimize.curve_fit(func, x1, y1)
ytrend = func(x1, popt[0], popt[1])
datadf= datadf.with_columns(ytrend = ytrend)
except TypeError:
datadf = datadf.with_columns(ytrend = y1)
data_dict = datadf.to_dicts()
metrics_list = [{'name': name, 'rowingmetrics':d } for name, d in metrics.rowingmetrics]
chart_data = {
'title': extratitle,
'x': xparam,
'y1': yparam1,
'y2': yparam2,
'data': data_dict,
'metrics': metrics_list,
'trendline': trendline,
}
script, div = get_chart("/dots", chart_data, debug=False)
return script, div
def interactive_flexchart_stacked(id, r, xparam='time',
yparam1='pace',
yparam2='power',
yparam3='hr',
yparam4='spm',
mode='erg'):
columns = [name for name, d in metrics.rowingmetrics]
columns_basic = [name for name, d in metrics.rowingmetrics if d['group'] == 'basic']
columns = columns + ['spm', 'driveenergy', 'distance','workoutid','workoutstate']
columns_basic = columns_basic + ['spm', 'driveenergy', 'distance','workoutid','workoutstate']
rowdata = pd.DataFrame()
row = Workout.objects.get(id=id)
if ispromember(r.user):
rowdata = dataprep.read_data(columns, ids=[id], doclean=True,
workstrokesonly=False, for_chart=True)
else:
rowdata = dataprep.read_data(columns_basic, ids=[id], doclean=True,
workstrokesonly=False, for_chart=True)
rowdata = dataprep.remove_nulls_pl(rowdata)
if r.usersmooth > 1: # pragma: no cover
for column in columns:
try:
if metricsdicts[column]['maysmooth']:
nrsteps = int(log2(r.usersmooth))
for i in range(nrsteps):
column_ew = utils.ewmovingaverage(
rowdata[column], 5)
rowdata = rowdata.with_columns(
column = column_ew
)
except KeyError:
pass
if len(rowdata) < 2:
if ispromember(r.user):
rowdata = dataprep.read_data(columns, ids=[id],
doclean=False,
workstrokesonly=False,
for_chart=True)
else:
rowdata = dataprep.read_data(columns_basic, ids=[id],
doclean=False,
workstrokesonly=False,
for_chart=True)
rowdata = dataprep.remove_nulls_pl(rowdata)
if rowdata.is_empty():
return "", "No valid data"
try:
tseconds = rowdata['time']
except (KeyError, ColumnNotFoundError): # pragma: no cover
return '', 'No time data - cannot make flex plot'
try:
rowdata = rowdata.with_columns(x1=pl.col(xparam))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns(x1=pl.lit(0))
try:
rowdata = rowdata.with_columns(y1=pl.col(yparam1))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns(y1=pl.col("time"))
rowdata = rowdata.with_columns((pl.col("y1")).alias(yparam1))
try:
rowdata = rowdata.with_columns(y2=pl.col(yparam2))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns(y2=pl.col("time"))
rowdata = rowdata.with_columns((pl.col("y2")).alias(yparam2))
try:
rowdata = rowdata.with_columns(y1=pl.col(yparam3))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns(y3=pl.col("time"))
rowdata = rowdata.with_columns((pl.col("y3")).alias(yparam3))
try:
rowdata = rowdata.with_columns(y4=pl.col(yparam1))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns(y4=pl.col("time"))
rowdata = rowdata.with_columns((pl.col("y4")).alias(yparam4))
# replace nans
rowdata = rowdata.fill_nan(0)
data_dict = rowdata.to_dicts()
metrics_list = [{'name': name, 'rowingmetrics':d } for name, d in metrics.rowingmetrics]
chart_data = {
'title': row.name,
'x': xparam,
'y1': yparam1,
'y2': yparam2,
'y3': yparam3,
'y4': yparam4,
'data': data_dict,
'metrics': metrics_list,
}
script, div = get_chart("/stacked", chart_data, debug=True)
return script, div
def interactive_flex_chart2(id, r, promember=0,
xparam='time',
yparam1='pace',
yparam2='hr',
plottype='line',
workstrokesonly=False,
trendline=False,
mode='rower'):
columns = [name for name, d in metrics.rowingmetrics]
columns_basic = [name for name, d in metrics.rowingmetrics if d['group'] == 'basic']
columns = columns + ['spm', 'driveenergy', 'distance','workoutstate']
columns_basic = columns_basic + ['spm', 'driveenergy', 'distance','workoutstate']
if promember:
rowdata = dataprep.read_data(columns, ids=[id], doclean=True,
workstrokesonly=workstrokesonly, for_chart=True)
else:
rowdata = dataprep.read_data(columns_basic, ids=[id], doclean=True,
workstrokesonly=workstrokesonly, for_chart=True)
if r.usersmooth > 1: # pragma: no cover
for column in columns:
try:
if metricsdicts[column]['maysmooth']:
nrsteps = int(log2(r.usersmooth))
for i in range(nrsteps):
rowdata = rowdata.with_columns(
column = utils.ewmovingaverage(
rowdata[column], 5))
except KeyError:
pass
if len(rowdata) < 2:
if promember:
rowdata = dataprep.read_data(columns, ids=[id],
doclean=False,
workstrokesonly=False, for_chart=True)
else:
rowdata = dataprep.read_data(columns_basic, ids=[id], doclean=False,
workstrokesonly=False, for_chart=True)
workstrokesonly = False
try:
_ = rowdata[yparam2]
except (KeyError, TypeError, ColumnNotFoundError): # pragma: no cover
yparam2 = 'None'
try:
_ = rowdata[yparam1]
except (TypeError, KeyError, ColumnNotFoundError): # pragma: no cover
yparam1 = 'None'
# test if we have drive energy
try:
_ = rowdata['driveenergy'].mean()
except (KeyError, TypeError, ColumnNotFoundError):
rowdata = rowdata.with_columns(driveenergy=pl.lit(500))
# test if we have power
try:
_ = rowdata['power'].mean()
except (KeyError, TypeError, ColumnNotFoundError):
rowdata = rowdata.with_columns(power=pl.lit(50))
# replace nans
rowdata = dataprep.remove_nulls_pl(rowdata)
row = Workout.objects.get(id=id)
if rowdata.is_empty():
return "", "No valid data", workstrokesonly
try:
tseconds = rowdata['time']
except (KeyError, ColumnNotFoundError): # pragma: no cover
return '', 'No time data - cannot make flex plot', workstrokesonly
try:
rowdata = rowdata.with_columns(x1 = pl.col(xparam))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns(x1 = pl.col("time"))
try:
rowdata = rowdata.with_columns(y1 = pl.col(yparam1))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns(y1 = pl.col("time"))
rowdata = rowdata.with_columns(yparam1 = pl.col("y1"))
if yparam2 != 'None':
try:
rowdata = rowdata.with_columns(y2 = pl.col(yparam2))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns(y2 = pl.col("time"))
rowdata = rowdata.with_columns(yparam2 = pl.col("y2"))
else: # pragma: no cover
rowdata = rowdata.with_columns(y2=pl.col("y1"))
# average values
if xparam != 'time':
try:
x1mean = rowdata['x1'].mean()
except TypeError: # pragma: no cover
x1mean = 0
else: # pragma: no cover
x1mean = 0
y1mean = rowdata['y1'].mean()
y2mean = rowdata['y2'].mean()
try:
rowdata = rowdata.with_columns((pl.lit(axlabels[xparam])).alias("xname"))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns((pl.lit(xparam)).alias("xname"))
try:
rowdata = rowdata.with_columns((pl.lit(axlabels[yparam1])).alias("yname1"))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns((pl.lit(yparam1)).alias("yname1"))
if yparam2 != 'None':
try:
rowdata = rowdata.with_columns((pl.lit(axlabels[yparam2])).alias("yname2"))
except (KeyError, ColumnNotFoundError): # pragma: no cover
rowdata = rowdata.with_columns((pl.lit(yparam2)).alias("yname2"))
else: # pragma: no cover
rowdata = rowdata.with_columns((pl.col("yname1")).alias("yname2"))
def func(x, a, b):
return a*x+b
x1 = rowdata['x1']
y1 = rowdata['y1']
try:
popt, pcov = optimize.curve_fit(func, x1, y1)
ytrend = func(x1, popt[0], popt[1])
rowdata = rowdata.with_columns(ytrend=ytrend)
except TypeError: # pragma: no cover
rowdata = rowdata.with_columns(ytrend=pl.col("y1"))
#rowdata = rowdata.replace([np.inf, -np.inf], np.nan)
rowdata = rowdata.fill_nan(None).drop_nulls()
data_dict = rowdata.to_dicts()
metrics_list = [{'name': name, 'rowingmetrics':d } for name, d in metrics.rowingmetrics]
chart_data = {
'title': row.name,
'x': xparam,
'y1': yparam1,
'y2': yparam2,
'data': data_dict,
'metrics': metrics_list,
'trendline': trendline,
'plottype': plottype,
}
script, div = get_chart("/flex2", chart_data, debug=False)
return script, div, workstrokesonly
def thumbnails_set(r, id, favorites):
charts = []
columns = [f.xparam for f in favorites]
columns += [f.yparam1 for f in favorites]
columns += [f.yparam2 for f in favorites]
columns += ['time']
rowdata = dataprep.getsmallrowdata_pd(columns, ids=[id], doclean=True)
try:
rowdata.dropna(axis=1, how='all', inplace=True)
except TypeError: # pragma: no cover
return [
{'script': "",
'div': "",
'notes': ""
}]
if rowdata.empty:
try:
rowdata = dataprep.getsmallrowdata_pd(columns, ids=[id], doclean=False,
workstrokesonly=False)
except: # pragma: no cover
return [
{'script': "",
'div': "",
'notes': ""
}]
if rowdata.empty:
return [
{'script': "",
'div': "",
'notes': ""
}]
lengte = len(rowdata)
maxlength = 50
if lengte > maxlength:
try:
bins = np.linspace(rowdata['time'].min(),
rowdata['time'].max(), maxlength)
groups = rowdata.groupby(np.digitize(rowdata['time'], bins))
rowdata = groups.mean()
except (KeyError, TypeError): # pragma: no cover
pass
plotnr = 1
for f in favorites:
script, div = thumbnail_flex_chart(
rowdata,
id=id,
xparam=f.xparam,
yparam1=f.yparam1,
yparam2=f.yparam2,
plottype=f.plottype,
plotnr=plotnr
)
plotnr = plotnr+1
charts.append({
'script': script,
'div': div,
'notes': f.notes})
return charts
def thumbnail_flex_chart(rowdata, id=0, promember=0,
xparam='time',
yparam1='pace',
yparam2='hr',
plottype='line',
plotnr=1,
workstrokesonly=False):
try:
_ = rowdata[yparam2]
except KeyError:
yparam2 = 'None'
try:
_ = rowdata[yparam1]
except KeyError:
yparam1 = 'None'
try:
tseconds = rowdata.loc[:, 'time']
except KeyError: # pragma: no cover
return '', 'No time data - cannot make flex plot'
try:
rowdata['x1'] = rowdata.loc[:, xparam]
except KeyError: # pragma: no cover
rowdata['x1'] = 0*rowdata.loc[:, 'time']
try:
rowdata['y1'] = rowdata.loc[:, yparam1]
except KeyError: # pragma: no cover
rowdata['y1'] = 0*rowdata.loc[:, 'time']
if yparam2 != 'None':
try:
rowdata['y2'] = rowdata.loc[:, yparam2]
except KeyError: # pragma: no cover
rowdata['y2'] = 0*rowdata.loc[:, 'time']
else:
rowdata['y2'] = rowdata['y1']
data_dict = rowdata.to_dict("records")
metrics_list = [{'name': name, 'rowingmetrics':d } for name, d in metrics.rowingmetrics]
chart_data = {
'title': '',
'x': xparam,
'y1': yparam1,
'y2': yparam2,
'data': data_dict,
'metrics': metrics_list,
'plottype': plottype,
'plotnr': plotnr,
}
script, div = get_chart("/miniflex", chart_data)
return script, div
def interactive_multiple_compare_chart(ids, xparam, yparam, plottype='line',
promember=0, workstrokesonly=True,
labeldict=None, startenddict={}):
columns = [xparam,yparam]
columns_basic = [xparam,yparam]
add_columns = [
'ftime', 'distance', 'fpace',
'spm','cumdist',
'time', 'pace', 'workoutstate',
'workoutid'
]
columns = columns + add_columns
columns_basic = columns_basic + add_columns
compute = False
doclean = False
if workstrokesonly:
compute = True
doclean = True
driveenergy = False
if 'driveenergy' in columns:
driveenergy = True
datadf = pl.DataFrame()
if promember:
datadf = dataprep.read_data(columns, ids=ids, doclean=doclean,
compute=compute, driveenergy=driveenergy,
workstrokesonly=workstrokesonly, for_chart=True,
startenddict=startenddict)
else:
datadf = dataprep.read_data(columns_basic, ids=ids, doclean=doclean,
compute=compute, driveenergy = driveenergy,
workstrokesonly=workstrokesonly, for_chart=True,
startenddict=startenddict)
datadf = dataprep.remove_nulls_pl(datadf)
# check if dataframe not empty
if datadf.is_empty(): # pragma: no cover
return ['<p>No non-zero data in selection</p>', '']
datadf = datadf.with_columns(pl.col("workoutid").cast(pl.UInt32).keep_name())
# filter for start end dict
nrworkouts = len(ids)
try:
tseconds = datadf['time']
except (KeyError, ColumnNotFoundError): # pragma: no cover
try:
tseconds = datadf[xparam]
except:
return ['<p>A chart data error occurred</p>', '']
if (xparam == 'time'):
datadf = datadf.with_columns((pl.col(xparam)-datadf[0,xparam]).alias(xparam))
data_dict = datadf.to_dicts()
metrics_list = [{'name': name, 'rowingmetrics':{k: v for k, v in d.items() if k != 'numtype_pl'} } for name, d in metrics.rowingmetrics]
try:
workoutsdict = [{'id': id, 'label': labeldict[id]} for id in ids]
except KeyError:
return ['<p>Chart error</p>','']
chart_data = {
'title': '',
'x': xparam,
'y': yparam,
'data': data_dict,
'metrics': metrics_list,
'plottype': plottype,
'workouts': workoutsdict,
}
script, div = get_chart("/compare", chart_data, debug=False)
return script, div
def get_zones_report_pl(rower, startdate, enddate, trainingzones='hr', date_agg='week',
yaxis='time'):
data = []
enddate = enddate + datetime.timedelta(days=1)
workouts = Workout.objects.filter(
user=rower,
startdatetime__gte=startdate,
startdatetime__lte=enddate,
duplicate=False,
).order_by("-startdatetime")
ids = [w.id for w in workouts]
columns = ['workoutid', 'hr', 'power', 'time']
df = dataprep.read_data(columns, ids=ids, workstrokesonly=False, doclean=False)
df = dataprep.remove_nulls_pl(df)
try:
df = df.with_columns((pl.col("time").diff().clip(0, 20*1.e3)).alias("deltat")).lazy()
except ColumnNotFoundError:
pass
hrzones = rower.hrzones
powerzones = rower.powerzones
for w in workouts:
iswater = w.workouttype in mytypes.otwtypes
pw_ut2 = rower.pw_ut2
pw_ut1 = rower.pw_ut1
pw_at = rower.pw_at
pw_tr = rower.pw_tr
pw_an = rower.pw_an
if iswater:
pw_ut2 = pw_ut2*(100.-rower.otwslack)/100.
pw_ut1 = pw_ut1*(100.-rower.otwslack)/100.
pw_at = pw_at*(100.-rower.otwslack)/100.
pw_tr = pw_tr*(100.-rower.otwslack)/100.
pw_an = pw_an*(100.-rower.otwslack)/100.
# 1
time_ut2 = df.filter(
pl.col("workoutid") == w.id,
pl.col("hr") < rower.ut2,
)
time_pw_ut2 = df.filter(
pl.col("workoutid") == w.id,
pl.col("power") < pw_ut2,
)
# 2
time_ut1 = df.filter(
pl.col("workoutid") == w.id,
pl.col("hr") >= rower.ut2,
pl.col("hr") < rower.ut1,
)
time_pw_ut1 = df.filter(
pl.col("workoutid") == w.id,
pl.col("power") >= pw_ut2,
pl.col("power") < pw_ut1,
)
#3
time_at = df.filter(
pl.col("workoutid") == w.id,
pl.col("hr") >= rower.ut1,
pl.col("hr") < rower.at,
)
time_pw_at = df.filter(
pl.col("workoutid") == w.id,
pl.col("power") >= pw_ut1,
pl.col("power") < pw_at,
)
#4
time_tr = df.filter(
pl.col("workoutid") == w.id,
pl.col("hr") >= rower.at,
pl.col("hr") < rower.tr,
)
time_pw_tr = df.filter(
pl.col("workoutid") == w.id,
pl.col("power") >= pw_at,
pl.col("power") < pw_tr,
)
#5
time_an = df.filter(
pl.col("workoutid") == w.id,
pl.col("hr") >= rower.tr,
pl.col("hr") < rower.an,
)
time_pw_an = df.filter(
pl.col("workoutid") == w.id,
pl.col("power") >= pw_tr,
pl.col("power") < pw_an,
)
time_max = df.filter(
pl.col("workoutid") == w.id,
pl.col("hr") >= rower.an,
)
time_pw_max = df.filter(
pl.col("workoutid") == w.id,
pl.col("power") >= pw_an,
)
time_in_ut2 = time_ut2.collect()['deltat'].sum()/(60*1.e3)
time_in_ut2_pw = time_pw_ut2.collect()['deltat'].sum()/(60*1.e3)
time_in_ut1 = time_ut1.collect()['deltat'].sum()/(60*1.e3)
time_in_ut1_pw = time_pw_ut1.collect()['deltat'].sum()/(60*1.e3)
time_in_at = time_at.collect()['deltat'].sum()/(60*1.e3)
time_in_at_pw = time_pw_at.collect()['deltat'].sum()/(60*1.e3)
time_in_tr = time_tr.collect()['deltat'].sum()/(60*1.e3)
time_in_tr_pw = time_pw_tr.collect()['deltat'].sum()/(60*1.e3)
time_in_an = time_an.collect()['deltat'].sum()/(60*1.e3)
time_in_an_pw = time_pw_an.collect()['deltat'].sum()/(60*1.e3)
time_in_max = time_max.collect()['deltat'].sum()/(60*1.e3)
time_in_max_pw = time_pw_max.collect()['deltat'].sum()/(60*1.e3)
data.append({
'date': w.date.strftime("%Y-%m-%d"),
'id': w.id,
'zonename': '<{ut2}'.format(ut2=hrzones[1]),
'time_in_zone': time_in_ut2,
'pw_zonename': '<{ut2}'.format(ut2=powerzones[1]),
'pw_time_in_zone': time_in_ut2_pw,
})
data.append({
'date': w.date.strftime("%Y-%m-%d"),
'id': w.id,
'zonename': hrzones[1],
'time_in_zone': time_in_ut1,
'pw_zonename': powerzones[1],
'pw_time_in_zone': time_in_ut1_pw,
})
data.append({
'date': w.date.strftime("%Y-%m-%d"),
'id': w.id,
'zonename': hrzones[2],
'time_in_zone': time_in_at,
'pw_zonename': powerzones[2],
'pw_time_in_zone': time_in_at_pw,
})
data.append({
'date': w.date.strftime("%Y-%m-%d"),
'id': w.id,
'zonename': hrzones[3],
'time_in_zone': time_in_tr,
'pw_zonename': powerzones[3],
'pw_time_in_zone': time_in_tr_pw,
})
data.append({
'date': w.date.strftime("%Y-%m-%d"),
'id': w.id,
'zonename': hrzones[4],
'time_in_zone': time_in_an,
'pw_zonename': powerzones[4],
'pw_time_in_zone': time_in_an_pw,
})
data.append({
'date': w.date.strftime("%Y-%m-%d"),
'id': w.id,
'zonename': hrzones[5],
'time_in_zone': time_in_max,
'pw_zonename': powerzones[5],
'pw_time_in_zone': time_in_max_pw,
})
chart_data = {
'data': data,
'hrzones': hrzones,
'powerzones': powerzones,
}
return chart_data
def interactive_zoneschart2(rower, data, startdate, enddate, trainingzones='hr', date_agg='week',
yaxis='time'):
if startdate >= enddate: # pragma: no cover
st = startdate
startdate = enddate
enddate = st
hrzones = data['hrzones']
powerzones = data['powerzones']
data['yaxis'] = yaxis
data['title'] = 'Activity {d1} to {d2} for {r}'.format(
d1=startdate.strftime("%Y-%m-%d"),
d2=enddate.strftime("%Y-%m-%d"),
r=str(rower),
)
data['stackBy'] = 'time_in_zone'
data['colorBy'] = 'zonename'
if trainingzones == 'power':
data['stackBy'] = 'pw_time_in_zone'
data['colorBy'] = 'pw_zonename'
data['doReduce'] = True
data['datebin'] = date_agg
data['colors'] = ['green', 'lime', 'yellow', 'blue', 'purple', 'red']
script, div = get_chart("/zones", data, debug=False)
return script, div