1281 lines
39 KiB
Python
1281 lines
39 KiB
Python
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
from __future__ import unicode_literals
|
|
from datetime import timedelta
|
|
from django.utils import timezone
|
|
|
|
import math
|
|
import numpy as np
|
|
import pandas as pd
|
|
import colorsys
|
|
from django.conf import settings
|
|
import collections
|
|
|
|
import uuid
|
|
import datetime
|
|
|
|
import json
|
|
import time
|
|
from fitparse import FitFile
|
|
from django.conf import settings
|
|
|
|
from django.http import HttpResponse
|
|
|
|
import requests
|
|
|
|
from django.http import HttpResponse
|
|
|
|
import humanize
|
|
from pytz.exceptions import UnknownTimeZoneError
|
|
import pytz
|
|
import iso8601
|
|
from iso8601 import ParseError
|
|
import arrow
|
|
|
|
# Conversion factor from pounds-force to Newton.
lbstoN = 4.44822

# (view name, label) choices for the page shown after opening a workout.
landingpages = (
    ('workout_edit_view', 'Edit View'),
    ('workout_workflow_view', 'Workflow View'),
    ('workout_stats_view', 'Stats View'),
    ('workout_data_view', 'Data Explore View'),
    ('workout_summary_edit_view', 'Intervals Editor'),
)

# (template, label) choices for the middle column of the workflow view.
workflowmiddlepanel = (
    ('panel_statcharts.html', 'Static Charts'),
    ('flexthumbnails.html', 'Flex Charts'),
    ('panel_summary.html', 'Summary'),
    ('panel_map.html', 'Map'),
    ('panel_comments.html', 'Basic Info and Links'),
    ('panel_notes.html', 'Workout Notes'),
    ('panel_shortcomment.html', 'Comment Link'),
    ('panel_middlesocial.html', 'Social Media Share Buttons'),
)

# Default selection of middle-column templates.
defaultmiddle = [
    'panel_middlesocial.html',
    'panel_statcharts.html',
    'flexthumbnails.html',
    'panel_summary.html',
    'panel_map.html',
]

# (template, label) choices for the left column of the workflow view.
workflowleftpanel = (
    ('panel_navigationheader.html', 'Navigation Header'),
    ('panel_editbuttons.html', 'Edit Workout Button'),
    ('panel_delete.html', 'Delete Workout Button'),
    ('panel_export.html', 'Export Workout Button'),
    ('panel_social.html', 'Social Media Share Buttons'),
    ('panel_advancededit.html', 'Advanced Workout Edit Button'),
    ('panel_editintervals.html', 'Edit Intervals Button'),
    ('panel_stats.html', 'Workout Statistics Button'),
    ('panel_flexchart.html', 'Flex Chart'),
    ('panel_staticchart.html', 'Create Static Charts Buttons'),
    ('panel_uploadimage.html', 'Attach Image'),
    ('panel_geekyheader.html', 'Geeky Header'),
    ('panel_editwind.html', 'Edit Wind Data'),
    ('panel_editstream.html', 'Edit Stream Data'),
    ('panel_otwpower.html', 'Run OTW Power Calculations'),
    ('panel_mapview.html', 'Map'),
    ('panel_ranking.html', 'Ranking View'),
)

# Default selection of left-column templates.
defaultleft = [
    'panel_navigationheader.html',
    'panel_editbuttons.html',
    'panel_advancededit.html',
    'panel_editintervals.html',
    'panel_stats.html',
    'panel_staticchart.html',
    'panel_uploadimage.html',
]
|
|
|
|
# Coxswain phrases shown to the user while a long-running job is processed.
coxes_calls = [
    'Sit Ready!',
    "Let's relax the shoulders, and give me a power ten to the finish!",
    "Almost there. Give me ten strokes on the legs!",
    "Let it run!",
    "Don't rush the slides!",
    "Quick hands.",
    "You are clearing the puddles.",
    "Let's push through now. Get me that open water.",
    "We're going for the line now. Power ten on the next.",
]

# Explanatory lines that accompany a coxswain phrase.
info_calls = [
    "Please give us a minute to count all those strokes, you've been working hard!",
    "Please give us a minute to count all your strokes.",
]
|
|
|
|
import random
|
|
|
|
def dologging(filename,s):
    """Append a timestamped line to a log file.

    A newline, the local time formatted as ``Mon-DD-YYYY HH:MM:SS``, a
    single space and the message ``s`` are appended to ``filename``.
    """
    stamp = time.strftime('%b-%d-%Y %H:%M:%S', time.localtime())
    with open(filename,'a') as logfile:
        # written piece by piece, in the same order as before
        logfile.writelines(('\n', stamp, ' ', s))
|
|
|
|
def to_pace(pace):
    """Format a pace given in seconds as ``MM:SS.t``.

    Minutes and seconds are zero-padded to two characters; the fraction of
    a second is truncated (not rounded) to tenths.
    """
    whole_minutes, remainder = divmod(pace, 60)
    whole_seconds, fraction = divmod(remainder, 1)
    return '{m:0>2}:{s:0>2}.{t:0>1}'.format(
        m=int(whole_minutes),
        s=int(whole_seconds),
        t=int(fraction*10),
    )
|
|
|
|
def get_call():
    """Return an HTML snippet with a random coxswain call and info line.

    One entry is drawn from ``coxes_calls`` and then one from
    ``info_calls``; both are substituted into a ``successmessage`` div.
    """
    cox_line = random.choice(coxes_calls)
    info_line = random.choice(info_calls)

    # NOTE(review): the whitespace inside this literal is reproduced as it
    # appears in the available listing - confirm against the repository.
    template = """<div id="id_sitready" class="successmessage">
<p>
%s (%s)
</p>
</div>
"""
    return template % (cox_line, info_line)
|
|
|
|
def absolute(request):
    """Build a dict of absolute URLs derived from ``request``.

    Keys: ``ABSOLUTE_ROOT`` and ``ABSOLUTE_ROOT_URL`` (the absolute URI of
    ``/`` with surrounding slashes stripped) and ``PATH`` (the absolute URI
    of the request itself).
    """
    build = request.build_absolute_uri
    urls = {}
    urls['ABSOLUTE_ROOT'] = build('/')[:-1].strip("/")
    urls['ABSOLUTE_ROOT_URL'] = build('/').strip("/")
    urls['PATH'] = build()
    return urls
|
|
|
|
def trcolors(r1,g1,b1,r2,g2,b2):
    """Describe the transition between two RGB colours in HSV space.

    Inputs are 0-255 channel values of the start colour (``r1,g1,b1``) and
    the end colour (``r2,g2,b2``).  Returns the 6-tuple
    ``(hue_start, hue_delta, sat_start, sat_delta, val_start, val_delta)``
    with the hue expressed in degrees - the layout used by ``palettes``.
    """
    h1, s1, v1 = colorsys.rgb_to_hsv(r1/255., g1/255., b1/255.)
    h2, s2, v2 = colorsys.rgb_to_hsv(r2/255., g2/255., b2/255.)

    hue_start = 360*h1
    hue_delta = 360*(h2-h1)
    return hue_start, hue_delta, s1, (s2-s1), v1, (v2-v1)
|
|
|
|
# Colour palettes keyed by name.  Each value is a 6-tuple
# (hue_start, hue_delta, sat_start, sat_delta, val_start, val_delta) with
# the hue in degrees; range_to_color_hex() evaluates start + delta*x.
#
# 'cyan_green' used to be listed twice; the hand-written tuple
# (192,-50,.08,.65,.98,-.34) was silently overwritten by the trcolors()
# entry further down.  Only the effective value is kept, at the key's
# original position, so the resulting dict is unchanged.
palettes = {
    'monochrome_blue':(207,-4,0.06,0.89,1.0,-0.38),
    'gold_sunset':(47,-31,.26,-0.12,0.94,-0.5),
    'blue_red':(207,-200,.85,0,.74,-.24),
    'blue_green':(207,-120,.85,0,.75,.25),
    'cyan_green':trcolors(246,239,247,1,108,89),
    'cyan_purple':trcolors(237,248,251,136,65,157),
    'green_blue':trcolors(240,249,232,8,104,172),
    'orange_red':trcolors(254,240,217,179,0,0),
    'cyan_blue':trcolors(241,238,246,4,90,141),
    'cyan_magenta':trcolors(241,238,246,152,0,67),
    'beige_magenta':trcolors(254,235,226,122,1,119),
    'yellow_green':trcolors(255,255,204,0,104,55),
    'yellow_blue':trcolors(255,255,205,37,52,148),
    'autumn':trcolors(255,255,212,153,52,4),
    'yellow_red':trcolors(255,255,178,189,0,39),
}
|
|
|
|
# Distances (in meters) used for ranking pieces.
rankingdistances = [100, 500, 1000, 2000, 5000, 6000, 10000, 21097, 42195, 100000]

# Durations used for ranking pieces.
# NOTE(review): 1h15 is listed before 1h - order kept exactly as it was.
rankingdurations = [
    datetime.time(minute=1),
    datetime.time(minute=4),
    datetime.time(minute=30),
    datetime.time(hour=1, minute=15),
    datetime.time(hour=1),
]
|
|
|
|
def range_to_color_hex(groupcols,palette='monochrome_blue'):
    """Map numbers onto ``#rrggbb`` colour strings along a palette.

    For every ``x`` in ``groupcols`` the HSV colour
    ``(h0 + dh*x, s0 + ds*x, v0 + dv*x)`` of the chosen palette is
    converted to RGB and formatted as hex.  Unknown palette names fall
    back to ``'monochrome_blue'``.
    """
    h0, dh, s0, ds, v0, dv = palettes.get(palette, palettes['monochrome_blue'])

    colors = []
    for x in groupcols:
        r, g, b = colorsys.hsv_to_rgb((h0+dh*x)/360., s0+ds*x, v0+dv*x)
        colors.append("#%02x%02x%02x" % (int(255.*r), int(255.*g), int(255.*b)))

    return colors
|
|
|
|
def str2bool(v): # pragma: no cover
    """Return True when ``v`` is 'yes', 'true', 't' or '1' (any case)."""
    truthy = ("yes", "true", "t", "1")
    lowered = v.lower()
    return lowered in truthy
|
|
|
|
def uniqify(seq, idfun=None):
    """Return the items of ``seq`` without duplicates, preserving order.

    ``idfun`` maps an item to the (hashable) marker that decides equality;
    by default the item itself is the marker.  The first item carrying a
    given marker is the one kept.
    """
    if idfun is None:
        idfun = lambda item: item

    markers = set()
    unique_items = []
    for item in seq:
        marker = idfun(item)
        if marker not in markers:
            markers.add(marker)
            unique_items.append(item)
    return unique_items
|
|
|
|
def serialize_list(value,token=','): # pragma: no cover
    """Join the string form of every element of ``value`` with ``token``.

    ``value`` must be a list, tuple or numpy array (checked with assert).
    """
    assert isinstance(value, (list, tuple, np.ndarray))
    pieces = map(str, value)
    return token.join(pieces)
|
|
|
|
def deserialize_list(value,token=','): # pragma: no cover
    """Split a ``token``-separated string into a list of strings.

    Lists and numpy arrays are passed through untouched.
    """
    if isinstance(value, (list, np.ndarray)):
        return value
    return value.split(token)
|
|
|
|
def geo_distance(lat1,lon1,lat2,lon2):
    """Approximate distance and initial bearing between two points.

    The points are given in degrees as (lat1, lon1) and (lat2, lon2).
    Returns ``[distance, bearing]``: the haversine distance on a sphere of
    radius 6373.0 (kilometres) and the bearing in degrees in [0, 360).

    This is a slight underestimate but close enough for our purposes; we
    never move more than 10 meters between trackpoints.  The bearing
    calculation fails if one of the points is a pole.
    """
    # radius of our earth in km --> should be moved to settings if
    # rowing takes off on other planets
    earth_radius = 6373.0

    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    lam1 = math.radians(lon1)
    lam2 = math.radians(lon2)

    dlon = lam2 - lam1
    dlat = phi2 - phi1

    # haversine formula
    a = math.sin(dlat / 2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlon / 2)**2
    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    distance = earth_radius * central_angle

    # initial true course, normalised to [0, 2*pi)
    course = math.atan2(
        math.sin(lam2-lam1)*math.cos(phi2),
        math.cos(phi1)*math.sin(phi2)-math.sin(phi1)*math.cos(phi2)*math.cos(lam2-lam1),
    )
    course = course % (2*math.pi)

    return [distance, math.degrees(course)]
|
|
|
|
|
|
|
|
def isbreakthrough(delta,cpvalues,p0,p1,p2,p3,ratio):
    """Compare measured CP values against a four-parameter power model.

    ``delta`` and ``cpvalues`` are pandas Series (``.values`` is used on
    both).  The model power ``|p0|/(1+delta/|p2|) + |p1|/(1+delta/|p3|)``
    is scaled by ``ratio`` (strict threshold) and by
    ``1 + 0.25*(ratio-1)`` (soft threshold).

    Returns ``(above_strict, btdf, above_soft)``: whether any value is
    above the strict threshold, a DataFrame (``delta``, ``cpvalues``,
    ``pwr``) of the rows above it sorted by ``delta``, and whether any
    value is above the soft threshold.
    """
    model = abs(p0)/(1+(delta/abs(p2))) + abs(p1)/(1+(delta/abs(p3)))

    soft_threshold = model*(1+0.25*(ratio-1))
    strict_threshold = (model*ratio).astype(int)

    delta = delta.values.astype(int)
    cpvalues = cpvalues.values.astype(int)

    above_strict = cpvalues>strict_threshold
    n_strict = np.sum(above_strict)
    n_soft = np.sum(cpvalues>soft_threshold)

    btdf = pd.DataFrame(
        {
            'delta':delta[above_strict],
            'cpvalues':cpvalues[above_strict],
            'pwr':strict_threshold[above_strict],
        }
    )
    btdf.sort_values('delta',axis=0,inplace=True)

    return n_strict>=1,btdf,n_soft>=1
|
|
|
|
|
|
def myqueue(queue,function,*args,**kwargs):
    """Dispatch ``function`` as a background job and return the job object.

    * ``settings.TESTING``: nothing is dispatched; a ``MockJob`` with
      ``result == 1`` and ``id == 1`` is returned.
    * ``settings.CELERY``: ``function.delay`` is called with ``debug=True``.
    * otherwise: ``queue.enqueue`` is called with a fresh uuid4 passed as
      ``job_id`` and ``jobkey``, ``timeout=3600`` and, when
      ``settings.DEBUG`` is set, ``debug=True``.
    """
    class MockJob:
        def __init__(self,*args, **kwargs):
            self.result = 1
            self.id = 1

        def revoke(self): # pragma: no cover
            return 1

    if settings.TESTING:
        return MockJob()

    if settings.CELERY: # pragma: no cover
        kwargs['debug'] = True
        return function.delay(*args,**kwargs)

    # plain queue # pragma: no cover
    if settings.DEBUG:
        kwargs['debug'] = True

    job_id = str(uuid.uuid4())
    kwargs['job_id'] = job_id
    kwargs['jobkey'] = job_id
    kwargs['timeout'] = 3600

    return queue.enqueue(function,*args,**kwargs) # pragma: no cover
|
|
|
|
|
|
from datetime import date
|
|
|
|
def calculate_age(born,today=None):
    """Return the age in whole years on ``today``.

    ``today`` defaults to ``timezone.now()``.  Returns None when ``born``
    is falsy or when either argument lacks year/month/day attributes.
    """
    if not today:
        today = timezone.now()

    if not born:
        return None

    try:
        birthday_passed = (today.month, today.day) >= (born.month, born.day)
        years = today.year - born.year
    except AttributeError:
        return None

    return years if birthday_passed else years - 1
|
|
|
|
def my_dict_from_instance(instance,model):
    """Collect the editable, non-empty fields of a model instance.

    Returns a dict with ``'id'`` mapped to ``instance.id`` and, for every
    field in ``instance._meta.fields`` that is editable and has a truthy
    value, ``field name -> (verbose name, value)``.  When the instance has
    a ``get_<field>_display`` method its result is used as the value.

    ``model`` is accepted for backward compatibility and is not used.
    """
    thedict = {'id': instance.id}

    for f in instance._meta.fields:
        fname = f.name

        # only a missing attribute is expected here; the former bare
        # ``except:`` also swallowed unrelated errors
        try:
            verbosename = f.verbose_name
        except AttributeError: # pragma: no cover
            verbosename = f.name

        get_choice = 'get_'+fname+'_display'
        if hasattr(instance, get_choice):
            value = getattr(instance, get_choice)()
        else:
            value = getattr(instance, fname, None)

        # falsy values (None, 0, '', ...) are left out on purpose
        if f.editable and value:
            thedict[fname] = (verbosename,value)

    return thedict
|
|
|
|
def wavg(group, avg_name, weight_name):
    """Weighted average of ``group[avg_name]`` using ``group[weight_name]``.

    http://stackoverflow.com/questions/10951341/pandas-dataframe-aggregate-function-using-multiple-columns

    Returns 0 when the value column is missing.  When the weight column is
    missing, or the weights sum to zero, the plain mean is returned.
    Customize this if your business case should return otherwise.
    """
    try:
        d = group[avg_name]
    except KeyError:
        return 0

    try:
        w = group[weight_name]
    except KeyError:
        return d.mean()

    total_weight = w.sum()
    # numpy/pandas division by zero yields nan/inf instead of raising
    # ZeroDivisionError, so the zero-weight case is tested explicitly
    if total_weight == 0:
        return d.mean()

    try:
        return (d * w).sum() / total_weight
    except ZeroDivisionError: # pragma: no cover
        return d.mean()
|
|
|
|
def totaltime_sec_to_string(totaltime,shorten=False):
    """Format a duration in seconds as a clock string.

    Returns ``''`` for NaN.  Otherwise ``HH:MM:SS.t``; with ``shorten``
    the tenths are dropped and the result is ``H:MM:SS`` or, when the
    hours are zero, ``M:SS``.  Components are truncated and capped at
    23 hours, 59 minutes, 59 seconds and 9 tenths.
    """
    if np.isnan(totaltime):
        return ''

    # each component is computed from what is left after the (capped)
    # larger components, so over-long durations saturate at 23:59:59.9
    hours = min(int(totaltime / 3600.), 23)
    minutes = min(int((totaltime - 3600. * hours) / 60.), 59)
    seconds = min(int(totaltime - 3600. * hours - 60. * minutes), 59)
    tenths = min(int(10 * (totaltime - 3600. * hours - 60. * minutes - seconds)), 9)

    if not shorten:
        return "%02d:%02d:%02d.%d" % (hours, minutes, seconds, tenths)
    if hours != 0: # pragma: no cover
        return "%d:%02d:%02d" % (hours, minutes, seconds)
    return "%d:%02d" % (minutes, seconds)
|
|
|
|
|
|
def iscoach(m,r): # pragma: no cover
    """Return True when ``m`` is contained in ``r.coaches``."""
    return m in r.coaches
|
|
|
|
# Exponentially weighted moving average
# Used for data smoothing of the jagged data obtained by Strava
# See bitbucket issue 72
def ewmovingaverage(interval,window_size):
    """Smooth ``interval`` with a forward/backward EWMA.

    An exponentially weighted mean with span ``window_size`` is computed
    over the data in forward and in reversed order; the element-wise mean
    of both passes is returned as a numpy array.  When pandas raises
    ``ValueError`` the input is returned unchanged.
    """
    try:
        frame = pd.DataFrame({'v':interval})
        forward_window = frame.ewm(span=window_size)
        backward_window = frame[::-1].ewm(span=window_size)

        forward = forward_window.mean().loc[:,'v']
        backward = backward_window.mean().loc[:,'v']

        # flip the backward pass into forward order before averaging
        both = np.vstack((forward, backward[::-1]))
        smoothed = np.mean(both, axis=0)
    except ValueError: # pragma: no cover
        smoothed = interval

    return smoothed
|
|
|
|
# Exceptions
# Custom error class - to raise a NoTokenError
class NoTokenError(Exception):
    """Exception carrying a payload in ``value``; ``str()`` is its repr."""

    def __init__(self,value):
        self.value = value

    def __str__(self): # pragma: no cover
        return '%r' % (self.value,)
|
|
|
|
class ProcessorCustomerError(Exception): # pragma: no cover
    """Exception carrying a payload in ``value``; ``str()`` is its repr."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return '%r' % (self.value,)
|
|
|
|
# Custom exception handler, returns a 401 HTTP message
# with exception details in the json data
def custom_exception_handler(exc,message):
    """Build a 401 ``HttpResponse`` describing ``exc``.

    The response body is ``message``.  A JSON document of the form
    ``{"errors": [{"code": str(exc), "detail": message}]}`` is attached to
    the response object as its ``json`` attribute.
    """
    error_entry = {
        "code": str(exc),
        "detail": message,
    }
    payload = {"errors": [error_entry]}

    res = HttpResponse(message)
    res.status_code = 401
    res.json = json.dumps(payload)

    return res
|
|
|
|
def get_strava_stream(r,metric,stravaid,series_type='time',fetchresolution='high',authorizationstring=''):
    """Fetch one data stream of a Strava activity as a numpy array.

    ``r`` supplies the token (``r.stravatoken``); when ``r`` is None the
    caller-provided ``authorizationstring`` is sent instead.  The metric
    name ``'power'`` is requested as ``'watts'``.

    Returns ``np.array`` of the stream whose ``type`` equals the requested
    metric, or None when the response cannot be parsed or holds no such
    stream.  Errors raised by ``requests.get`` itself are not caught.
    """
    if r is not None:
        authorizationstring = str('Bearer ' + r.stravatoken)
    headers = {'Authorization': authorizationstring,
               'user-agent': 'sanderroosendaal',
               'Content-Type': 'application/json',
               'resolution': 'medium',}

    if metric == 'power': # pragma: no cover
        metric = 'watts'

    url = "https://www.strava.com/api/v3/activities/{stravaid}/streams/{metric}?resolution={fetchresolution}&series_type={series_type}".format(
        stravaid=stravaid,
        fetchresolution=fetchresolution,
        series_type=series_type,
        metric=metric
    )

    s = requests.get(url,headers=headers)

    # A former debug dump to 'data.txt' was guarded by metric == 'power',
    # which can never be true after the rename above; it has been removed.

    # Best effort: an undecodable body or an unexpected payload shape
    # (e.g. an error dict instead of a list of streams) yields None.
    try:
        for data in s.json():
            if data['type'] == metric:
                return np.array(data['data'])
    except Exception: # pragma: no cover
        return None

    return None # pragma: no cover
|
|
|
|
def allmonths(startdate,enddate):
    """Yield ``startdate`` and then the first day of each following month.

    Iteration stops before the first date that is not earlier than
    ``enddate``.
    """
    current = startdate
    while current<enddate:
        yield current
        # month 12 rolls over into January of the next year
        year_carry, month_index = divmod(current.month, 12)
        current = datetime.date(current.year+year_carry, month_index+1, 1)
|
|
|
|
def allsundays(startdate,enddate):
    """Yield every Sunday from ``startdate`` up to and including ``enddate``."""
    one_week = datetime.timedelta(days=7)
    # weekday() is 6 on Sundays, so this jumps to the first Sunday
    sunday = startdate + datetime.timedelta(days = 6 - startdate.weekday())
    while sunday<=enddate:
        yield sunday
        sunday += one_week
|
|
|
|
def steps_read_fit(filename,name='',sport='Custom'):
    """Ask the FIT conversion service to turn a FIT file into workout JSON.

    Posts ``{'filename': filename}`` to ``settings.WORKOUTS_FIT_URL +
    "/tojson"`` with a bearer token.  Returns the ``'workout'`` entry of
    the JSON reply, or None on a non-200 status or a missing key.
    ``name`` and ``sport`` are not used by this function.
    """
    token_header = 'Bearer '+settings.WORKOUTS_FIT_TOKEN
    endpoint = settings.WORKOUTS_FIT_URL+"/tojson"

    response = requests.post(
        url=endpoint,
        headers={'Authorization':token_header},
        json={'filename':filename},
    )
    if response.status_code != 200: # pragma: no cover
        return None

    payload = response.json()
    try:
        return payload['workout']
    except KeyError: # pragma: no cover
        return None
|
|
|
|
def steps_write_fit(steps):
    """Ask the FIT conversion service to write ``steps`` to a FIT file.

    Posts ``steps`` as JSON to ``settings.WORKOUTS_FIT_URL + "/tofit"``
    with a bearer token.  Returns the ``'filename'`` entry of the JSON
    reply, or None on a non-200 status or a missing key.
    """
    token_header = 'Bearer '+settings.WORKOUTS_FIT_TOKEN
    endpoint = settings.WORKOUTS_FIT_URL+"/tofit"

    response = requests.post(
        url=endpoint,
        headers={'Authorization':token_header},
        json=steps,
    )
    if response.status_code != 200: # pragma: no cover
        return None

    payload = response.json()
    try:
        return payload['filename']
    except KeyError: # pragma: no cover
        return None
|
|
|
|
def _power_code_to_watts(code, ftp):
    """Decode a power target into watts; None when it is unset or invalid.

    Encoding (the same one step_to_string renders): 1-9 is a power zone
    (approximated as 60% of FTP, as before), up to 1000 is a percentage of
    FTP, above 1000 is absolute watts offset by 1000.
    """
    if code <= 0:
        return None
    if code < 10:
        return ftp*0.6
    if code <= 1000:
        return code*ftp/100.
    return code-1000


def step_to_time_dist(step,avgspeed = 3.2,ftp=200,ftspm=25,ftv=3.7):
    """Estimate duration, distance and rScore of one workout step.

    ``step`` is a dict with ``durationType``/``durationValue`` (time in
    milliseconds, distance in centimetres) and optionally ``targetType``
    with ``targetValue`` / ``targetValueLow`` / ``targetValueHigh``.
    Speed targets are in mm/s, cadence targets in strokes per minute.

    ``avgspeed`` (m/s) is the fallback boat speed, ``ftp`` the functional
    threshold power, ``ftspm`` the stroke rate and ``ftv`` the speed (m/s)
    associated with threshold effort.

    Returns ``(seconds, distance_in_meters, rscore)``; ``(0, 0, 0)`` for a
    zero duration value or an unknown duration type.

    Fixes relative to the previous implementation:
      * speed targets are converted from mm/s consistently (the range case
        of a timed step and the single-value case of a distance step were
        off by a factor 1000);
      * power ranges are decoded like single power values and drive the
        estimated speed through the cube root of the power ratio;
      * an unset or boundary power/cadence target no longer raises
        UnboundLocalError - the step falls back to the defaults.
    """
    durationtype = step.get('durationType',0)
    value = step.get('durationValue',0)

    if value == 0: # pragma: no cover
        return 0,0,0

    if durationtype in ['PowerLessThan','PowerGreaterThan','HrLessThan','HrGreaterThan']: # pragma: no cover
        # open-ended step: assume ten minutes at average speed
        seconds = 600
        distance = seconds*avgspeed
        rscoreperhour = 100.*(avgspeed/ftv)**(3.0)
        return seconds,distance,rscoreperhour*seconds/3600.

    if durationtype not in ('Time','Distance'):
        return 0,0,0

    targettype = step.get('targetType',0)
    target = step.get('targetValue',0)
    low = step.get('targetValueLow',0)
    high = step.get('targetValueHigh',0)
    has_range = low != 0 and high != 0

    # defaults: average speed and 60 rScore points per hour
    speed = avgspeed
    rscoreperhour = 60.

    if targettype == 'Speed':
        if target != 0:
            speed = target/1000.
        elif has_range:
            speed = (low+high)/2000.
        rscoreperhour = 100.*(speed/ftv)**(3.0)
    elif targettype == 'Power':
        watts = None
        if target != 0:
            watts = _power_code_to_watts(target, ftp)
        if has_range:
            # as before, a range takes precedence over a single value
            low_watts = _power_code_to_watts(low, ftp)
            high_watts = _power_code_to_watts(high, ftp)
            if low_watts is not None and high_watts is not None:
                watts = (low_watts+high_watts)/2.
        if watts is not None:
            # power scales with the cube of the speed
            speed = ftv*(float(watts)/ftp)**(1./3.)
            rscoreperhour = 100.*(float(watts)/ftp)
    elif targettype == 'Cadence':
        spm = None
        if target != 0:
            spm = target
        if has_range:
            spm = (low+high)/2.
        if spm is not None:
            # a cadence target affects the score only, not the speed
            rscoreperhour = 100.*(float(spm)/ftspm)

    if durationtype == 'Time':
        seconds = value/1000.
        distance = speed*seconds
    else:
        distance = value/100.
        seconds = distance/speed

    rscore = rscoreperhour*seconds/3600.

    return seconds,distance,rscore
|
|
|
|
def get_step_type(step): # pragma: no cover
    """Return 'WorkoutRepeatStep' for repeat durations, else 'WorkoutStep'."""
    repeat_durations = ['RepeatUntilStepsCmplt','RepeatUntilHrLessThan','RepeatUntilHrGreaterThan']
    if step['durationType'] in repeat_durations:
        return 'WorkoutRepeatStep'
    return 'WorkoutStep'
|
|
|
|
def peel(l):
    """Take the first (possibly nested) step off a list of step dicts.

    Returns ``(first, rest)``.  An empty list gives ``(None, None)`` and a
    single-element list ``(element, None)``.  A first element of type
    'Step' is returned as is.  Any other first element is treated as a
    repeat step: following steps are peeled recursively until one with
    ``stepID == first['repeatID']`` has been taken; they are stored, in
    reversed order, under ``first['steps']``.
    """
    if len(l)==0: # pragma: no cover
        return None,None
    if len(l)==1:
        return l[0],None

    head, tail = l[0], l[1:]

    if head['type'] == 'Step': # pragma: no cover
        return head, tail

    # repeatstep
    collected = []
    last_id = -1
    while last_id != head['repeatID']:
        child, tail = peel(tail)
        collected.append(child)
        last_id = child['stepID']
    head['steps'] = collected[::-1]

    return head,tail
|
|
|
|
|
|
def ps_dict_order_dict(d,short=False):
    """Return descriptors for the steps in ``d['steps']``, last step first.

    Each descriptor holds the ``type``, ``stepID``, ``repeatID`` and
    ``repeatValue`` reported by ``step_to_string`` plus the original step
    under ``'dict'``.

    The former call to ``step_to_time_dist`` was dropped: its results were
    never used.
    """
    sdicts = []
    for step in d['steps']:
        _, steptype, stepID, repeatID, repeatValue = step_to_string(step,short=short)
        sdicts.append({
            'type':steptype,
            'stepID': stepID,
            'repeatID': repeatID,
            'repeatValue': repeatValue,
            'dict': step,
        })

    sdicts.reverse()

    return sdicts
|
|
|
|
def ps_dict_order(d,short=False,rower=None,html=True):
    """Turn a planned-session dict into an ordered, indented step list.

    Every step in ``d['steps']`` is described with ``step_to_string`` and
    sized with ``step_to_time_dist`` (using ``rower.ftp`` when a rower is
    given, else 200).  The steps are then walked from last to first so
    that each repeat step is met before the steps it repeats: repeated
    steps are indented and their meters, seconds and rScore are multiplied
    by the repeat count.

    Returns ``(steps, totalmeters, totalseconds, totalrscore)`` where
    ``steps`` is a list of step descriptor dicts in workout order.

    Fix: a second, duplicated loop used to recompute every step with the
    default ftp and overwrite the rower-specific results; it was removed.
    """
    steps = d['steps']

    ftp = 200
    if rower is not None:
        ftp = rower.ftp
        # ftv = rower.ftv
        # ftspm = rower.ftspm

    sdict = collections.OrderedDict({})
    for step in steps:
        sstring, steptype, stepID, repeatID, repeatValue = step_to_string(step,short=short)
        seconds, meters, rscore = step_to_time_dist(step,ftp=ftp)

        sdict[stepID] = {
            'string':sstring,
            'type':steptype,
            'stepID': stepID,
            'repeatID': repeatID,
            'repeatValue': repeatValue,
            'seconds': seconds,
            'meters': meters,
            'rscore': rscore,
        }

    sdict2 = collections.OrderedDict(reversed(list(sdict.items())))

    # NOTE(review): the indent literals are reconstructed from the slice
    # lengths used below (18 characters for HTML = three '&nbsp;', 3 for
    # plain text) - confirm against the repository.
    if html:
        indent = '&nbsp;&nbsp;&nbsp;'
    else:
        indent = '   '

    sdict3 = []
    hold = []        # repeat steps waiting for their first child step
    multiplier = []  # repeat counts of the held repeat steps
    holduntil = []   # stepID at which each held repeat step is released
    spaces = ''
    totalmeters = 0
    totalseconds = 0
    totalrscore = 0
    factor = 1

    for key, item in sdict2.items():
        if item['type'] == 'RepeatStep':
            hold.append(item)
            holduntil.append(item['repeatID'])
            multiplier.append(item['repeatValue'])
            factor *= item['repeatValue']
            spaces += indent
        if item['type'] == 'Step':
            item['string'] = spaces+item['string']
            sdict3.append(item)
            totalmeters += factor*item['meters']
            totalseconds += factor*item['seconds']
            totalrscore += factor*item['rscore']
            if len(holduntil)>0 and item['stepID'] <= holduntil[-1]:
                if item['stepID'] == holduntil[-1]:
                    # first step of the repeat block reached: emit the
                    # repeat header and leave the block
                    sdict3.append(hold.pop())
                    factor /= multiplier.pop()
                    if html:
                        spaces = spaces[:-18]
                    else:
                        spaces = spaces[:-3]
                    holduntil.pop()
                else: # pragma: no cover
                    # the release point was skipped: re-order the last
                    # two emitted steps around the repeat header
                    prevstep = sdict3.pop()
                    prevstep['string'] = prevstep['string'][18:]
                    prevprevstep = sdict3.pop()
                    prevprevstep['string'] = spaces+prevprevstep['string']
                    sdict3.append(prevprevstep)
                    curstep = hold.pop()
                    sdict3.append(curstep)
                    factor /= multiplier.pop()
                    sdict3.append(prevstep)
                    holduntil.pop()
                    if html:
                        spaces = spaces[:-18]
                    else:
                        spaces = spaces[:-3]

    sdict = list(reversed(sdict3))

    return sdict,totalmeters,totalseconds,totalrscore
|
|
|
|
def step_to_string(step,short=False):
    """Describe one workout step as human-readable text.

    ``step`` is a dict with at least ``durationType``, ``durationValue``,
    ``stepId`` and ``wkt_step_name``; ``targetType``, the ``targetValue*``
    keys, ``description`` and ``intensity`` are optional.

    Returns ``(text, type, stepId, repeatID, repeatValue)`` where ``type``
    is ``'Step'`` or ``'RepeatStep'``.  A non-repeat step with a zero
    duration value yields ``('', 'Step', -1, -1, 1)``.  With ``short`` a
    compact wording is used and the notes are left out.

    Fixes relative to the previous implementation:
      * the short wording for 'PowerLessThan' was computed but never
        assigned;
      * 'HrGreaterThan' short wording said 'R>' and divided the absolute
        heart rate by 100 instead of subtracting the offset of 100;
      * 'PowerGreaterThan' short wording did not subtract the offset of
        1000 from an absolute power;
      * typo 'untl' in the 'RepeatUntilHrGreaterThan' short wording.
    """
    steptype = 'Step'
    repeatID = -1
    repeatValue = 1

    duration = ''
    unit = ''
    target = ''
    repeat = ''

    repeat_durations = ['RepeatUntilStepsCmplt','RepeatUntilHrLessThan','RepeatUntilHrGreaterThan']

    durationtype = step['durationType']
    if step['durationValue'] == 0 and durationtype not in repeat_durations: # pragma: no cover
        return '',steptype, -1, -1,1

    # ---- duration -------------------------------------------------------
    if durationtype == 'Time':
        unit = 'min'
        value = step['durationValue']
        if value/1000. >= 3600: # pragma: no cover
            unit = 'h'
        duration = '{v}'.format(v=str(timedelta(seconds=value/1000.)))
    elif durationtype == 'Distance':
        unit = 'm'
        duration = int(step['durationValue']/100.)
    elif durationtype == 'HrLessThan': # pragma: no cover
        value = step['durationValue']
        # values above 100 are absolute heart rates offset by 100
        if value <= 100:
            duration = 'until heart rate lower than {v}% of max'.format(v=value)
            if short:
                duration = 'until HR<{v}% of max'.format(v=value)
        else:
            duration = 'until heart rate lower than {v}'.format(v=value-100)
            if short:
                duration = 'until HR<{v}'.format(v=value-100)
    elif durationtype == 'HrGreaterThan': # pragma: no cover
        value = step['durationValue']
        if value <= 100:
            duration = 'until heart rate greater than {v}% of max'.format(v=value)
            if short:
                duration = 'until HR>{v}% of max'.format(v=value)
        else:
            duration = 'until heart rate greater than {v}'.format(v=value-100)
            if short:
                duration = 'until HR>{v}'.format(v=value-100)
    elif durationtype == 'PowerLessThan': # pragma: no cover
        value = step['durationValue']
        targetvalue = step.get('targetValue',0)
        # NOTE(review): the branch tests durationValue but the text shows
        # targetValue - kept as is, confirm which one is intended.
        if value <= 1000:
            duration = 'Repeat until Power is less than {targetvalue} % of FTP'.format(
                targetvalue=targetvalue
            )
            if short:
                duration = 'until < {targetvalue}% of FTP'.format(targetvalue=targetvalue)
        else:
            duration = 'Repeat until Power is less than {targetvalue} Watt'.format(
                targetvalue=targetvalue-1000
            )
            if short:
                duration = 'until < {targetvalue} W'.format(targetvalue=targetvalue-1000)
    elif durationtype == 'PowerGreaterThan': # pragma: no cover
        value = step['durationValue']
        targetvalue = step.get('targetValue',0)
        if value <= 1000:
            duration = 'Repeat until Power is greater than {targetvalue} % of FTP'.format(
                targetvalue=targetvalue
            )
            if short:
                duration = 'until > {targetvalue}% of FTP'.format(targetvalue=targetvalue)
        else:
            duration = 'Repeat until Power is greater than {targetvalue} Watt'.format(
                targetvalue=targetvalue-1000
            )
            if short:
                duration = 'until > {targetvalue} W'.format(targetvalue=targetvalue-1000)
    elif durationtype == 'RepeatUntilStepsCmplt': # pragma: no cover
        steptype = 'RepeatStep'
        duration = ': {v}x'.format(v=step['targetValue'])
        repeatID = step['durationValue']
        repeatValue = step.get('targetValue',0)
    elif durationtype == 'RepeatUntilHrGreaterThan': # pragma: no cover
        steptype = 'RepeatStep'
        targetvalue = step.get('targetValue',0)
        if targetvalue <= 100:
            duration = 'Repeat until Heart Rate is greater than {targetvalue} % of max'.format(
                targetvalue=targetvalue
            )
            if short:
                duration = ': until HR>{targetvalue} % of max'.format(targetvalue=targetvalue)
        else:
            duration = 'Repeat until Heart Rate is greater than {targetvalue}:'.format(
                targetvalue=targetvalue-100.
            )
            if short:
                duration = ': until HR>{targetvalue}'.format(targetvalue=targetvalue-100)
        repeatID = step['durationValue']
    elif durationtype == 'RepeatUntilHrLessThan': # pragma: no cover
        steptype = 'RepeatStep'
        targetvalue = step.get('targetValue',0)
        if targetvalue <= 100:
            duration = 'Repeat until Heart Rate is less than {targetvalue} % of max'.format(
                targetvalue=targetvalue
            )
            if short:
                duration = ': until HR<{targetvalue}% of max'.format(targetvalue=targetvalue)
        else:
            duration = 'Repeat until Heart Rate is less than {targetvalue}:'.format(
                targetvalue=targetvalue-100.
            )
            if short:
                duration = ': until HR<{targetvalue}'.format(targetvalue=targetvalue-100)
        repeatID = step['durationValue']

    # ---- target ---------------------------------------------------------
    targettype = step.get('targetType')
    value = step.get('targetValue',0)
    valuelow = step.get('targetValueLow',0)
    valuehigh = step.get('targetValueHigh',0)

    if targettype == 'HeartRate': # pragma: no cover
        if value < 10 and value>0:
            target = '@ HR zone {v}'.format(v=value)
        elif valuelow < 100:
            target = '@ HR {l} - {h} % of max'.format(l=valuelow, h=valuehigh)
        else:
            target = '@ HR {l} - {h}'.format(l=valuelow-100, h=valuehigh-100)
    elif targettype == 'Power': # pragma: no cover
        if value < 10 and value>0:
            target = '@ Power zone {v}'.format(v=value)
        elif value > 10 and value < 1000:
            target = '@ {v}% of FTP'.format(v=value)
        elif value > 1000:
            target = '@ {v} W'.format(v=value-1000)
        elif valuelow > 0 and valuehigh > 0:
            if valuelow < 1000:
                target = '@ {l} - {h} % of FTP'.format(l=valuelow, h=valuehigh)
            else:
                target = '@ {l} - {h} W'.format(l=valuelow-1000, h=valuehigh-1000)
    elif targettype == 'Speed': # pragma: no cover
        # speed targets are stored in mm/s
        if value != 0:
            pacestring = to_pace(500./(value/1000.))
            target = '@ {v} m/s {p}, per 500m'.format(
                v=value/1000.,
                p=pacestring)
            if short:
                target = '@ {p}'.format(p=pacestring)
        elif valuelow != 0 and valuehigh != 0:
            pacestringlow = to_pace(500./(valuelow/1000.))
            pacestringhigh = to_pace(500./(valuehigh/1000.))
            target = '@ {l:1.2f} and {h:1.2f} m/s, ({ph} to {pl} per 500m)'.format(
                l=valuelow/1000.,
                h=valuehigh/1000.,
                pl=pacestringlow,
                ph=pacestringhigh,
            )
            if short:
                target = '@ {pl} - {ph}'.format(
                    pl=pacestringlow,
                    ph=pacestringhigh,
                )
    elif targettype == 'Cadence': # pragma: no cover
        if value != 0:
            target = '@ {v} SPM'.format(v=value)
        elif valuelow != 0 and valuehigh != 0:
            target = '@ {l} - {h} SPM'.format(l=valuelow, h=valuehigh)

    # ---- bookkeeping ----------------------------------------------------
    nr = step['stepId']
    # the name is not part of the text; the lookup is kept so a step
    # without 'wkt_step_name' still raises KeyError as before
    step['wkt_step_name']

    notes = ''
    try:
        if len(step['description']): # pragma: no cover
            notes = ' - '+step['description']
    except KeyError:
        notes = ''

    intensity = step.get('intensity', 0)

    # ---- assemble -------------------------------------------------------
    template = '{duration} {unit} {target} {repeat}'
    if intensity in ['Warmup','Cooldown','Rest']:
        template = '{intensity} ' + template
    if not short:
        template += ' {notes}'

    s = template.format(
        intensity=intensity,
        duration=duration,
        unit=unit,
        target=target,
        repeat=repeat,
        notes=notes,
    )

    if steptype == 'RepeatStep':
        s = 'Repeat {duration}'.format(duration=duration)

    return s,steptype, nr, repeatID, repeatValue
|
|
|
|
def strfdelta(tdelta):  # pragma: no cover
    """Format a time difference as ``MM:SS.t`` (minutes, seconds, tenths).

    Args:
        tdelta: either an object exposing ``seconds`` and ``microseconds``
            (such as ``datetime.timedelta``), or an object exposing
            ``view`` (such as ``numpy.timedelta64``).  On the numpy path the
            raw 64-bit count is read and treated as nanoseconds --
            NOTE(review): confirm callers only pass nanosecond-resolution
            values.

    Returns:
        A string such as ``"01:30.5"``.  Minutes are not wrapped at 60, and
        on the ``timedelta`` path a ``days`` component is not included.
    """
    try:
        minutes, seconds = divmod(tdelta.seconds, 60)
        tenths = int(tdelta.microseconds // 100000)
    except AttributeError:
        # Stay in integer arithmetic: dividing by float constants produced
        # float minutes/seconds, which were rendered as "1.0:30.0.5".
        nanoseconds = int(tdelta.view(np.int64))
        minutes, remainder = divmod(nanoseconds, 60 * 10**9)
        seconds, remainder = divmod(remainder, 10**9)
        tenths = remainder // 10**8

    res = "{minutes:0>2}:{seconds:0>2}.{tenths:0>1}".format(
        minutes=minutes,
        seconds=seconds,
        tenths=tenths,
    )

    return res
|
|
|
|
def request_is_ajax(request):
    """Report whether *request* is flagged as an XMLHttpRequest.

    Looks up ``HTTP_X_REQUESTED_WITH`` in ``request.META`` and returns
    ``True`` only when it equals ``'XMLHttpRequest'`` exactly; a missing
    entry yields ``False``.
    """
    requested_with = request.META.get('HTTP_X_REQUESTED_WITH')
    return requested_with == 'XMLHttpRequest'
|
|
|
|
def _interval_value_and_unit(value, unit):
    """Return ``(amount, suffix)`` for one interval entry.

    ``'min'`` values are converted to seconds; ``'meters'`` and ``'seconds'``
    map to the suffixes ``'m'`` and ``'sec'``.  Any other unit string is
    passed through unchanged instead of leaving the suffix undefined.
    """
    if unit == 'min':
        return int(value) * 60, 'sec'
    suffix = {'meters': 'm', 'seconds': 'sec'}.get(unit, unit)
    return int(value), suffix


def intervals_to_string(vals, units, typ):
    """Build a compact interval string from parallel value/unit/type lists.

    Args:
        vals: interval sizes; each entry is converted with ``int()``.
        units: unit per interval (``'min'``, ``'meters'`` or ``'seconds'``;
            other strings are used verbatim as the suffix).
        typ: type per interval; ``'rest'`` marks a rest, anything else is
            treated as work.

    Returns:
        Work intervals joined by ``+`` with rests appended as ``/<rest>``,
        e.g. ``'500m/60sec+500m/60sec'``.  A rest that does not follow a work
        interval is written as ``0min/<rest>``.  Returns ``''`` when any
        argument is ``None``, when the lengths differ, or when the lists are
        empty.
    """
    if vals is None or units is None or typ is None:  # pragma: no cover
        return ''
    if len(vals) != len(units) or len(vals) != len(typ):  # pragma: no cover
        return ''

    parts = []
    after_rest = True  # the sequence starts as if a rest had just ended
    for value, unit, kind in zip(vals, units, typ):
        amount, suffix = _interval_value_and_unit(value, unit)
        is_rest = kind == 'rest'
        if is_rest and after_rest:
            # a rest without preceding work gets an explicit zero-length work part
            prefix = '+0min/'
        elif is_rest:
            prefix = '/'
        else:
            prefix = '+'
        parts.append('{prefix}{val}{unit}'.format(
            prefix=prefix, val=amount, unit=suffix))
        after_rest = is_rest

    s = ''.join(parts)
    # startswith() is safe on the empty string, unlike indexing s[0]
    if s.startswith('+'):
        s = s[1:]

    return s
|
|
|
|
def get_timezone_from_c2data(data):
    """Resolve the pytz timezone named by ``data['timezone']``.

    Falls back to ``pytz.utc`` when the key is absent or when pytz does not
    recognise the zone name.
    """
    try:
        return pytz.timezone(data['timezone'])
    except (UnknownTimeZoneError, KeyError):
        return pytz.utc
|
|
|
|
|
|
def get_startdatetime_from_c2data(data):
    """Derive start time information from a workout data dictionary.

    Args:
        data: mapping read for the keys ``'timezone'`` (optional),
            ``'date_utc'`` (preferred) or ``'date'``, and ``'time'``.
            ``data['time']`` is divided by 10 before use --
            NOTE(review): presumably it is expressed in tenths of a second;
            confirm against the data source.

    Returns:
        A tuple ``(startdatetime, starttime, workoutdate, duration,
        starttimeunix, timezone)`` where ``startdatetime`` is the parsed
        timestamp minus the total time, converted to ``timezone``;
        ``starttime`` (``HH:MM:SS``) and ``workoutdate`` (``YYYY-MM-DD``)
        are its string forms; ``duration`` is the result of
        ``totaltime_sec_to_string``; and ``starttimeunix`` is the start as a
        Unix timestamp.

    Raises:
        KeyError: if neither date key, or ``'time'``, is present.
        iso8601.ParseError: if the fallback ``'date'`` value cannot be parsed.
    """
    timezone = get_timezone_from_c2data(data)
    try:
        startdatetime = iso8601.parse_date(data['date_utc'])
    except (KeyError, TypeError, ValueError, ParseError):  # pragma: no cover
        # Only a missing or unparseable 'date_utc' triggers the fallback; a
        # bare except would also swallow KeyboardInterrupt/SystemExit.
        startdatetime = iso8601.parse_date(data['date'])

    totaltime = data['time']/10.
    duration = totaltime_sec_to_string(totaltime)
    # the parsed timestamp minus the total time gives the start of the workout
    starttimeunix = arrow.get(startdatetime).timestamp()-totaltime
    startdatetime = arrow.get(starttimeunix).astimezone(timezone)

    # startdatetime is already expressed in `timezone`; format it directly
    workoutdate = startdatetime.strftime('%Y-%m-%d')
    starttime = startdatetime.strftime('%H:%M:%S')

    return startdatetime,starttime,workoutdate,duration,starttimeunix,timezone
|