Private
Public Access
1
0
Files
rowsandall/rowers/utils.py
2025-04-13 17:28:51 +02:00

1522 lines
47 KiB
Python

from datetime import date
import random
from datetime import timedelta
from django.utils import timezone
import math
import numpy as np
import pandas as pd
import polars as pl
from polars.exceptions import ColumnNotFoundError
import colorsys
from django.conf import settings
import collections
import uuid
import datetime
import json
import time
from fitparse import FitFile
from django.http import HttpResponse
import requests
import humanize
from pytz.exceptions import UnknownTimeZoneError
import pytz
import iso8601
from iso8601 import ParseError
import arrow
from django.conf import settings
lbstoN = 4.44822
intensitymap = {
0: "Active",
1: "Rest",
2: "Warmup",
3: "Cooldown",
4: "Recovery",
5: "Interval",
6: "Other",
}
landingpages = (
('workout_view', 'Workout View'),
('workout_edit_view', 'Edit View'),
('workout_workflow_view', 'Workflow View'),
('workout_stats_view', 'Stats View'),
('workout_data_view', 'Data Explore View'),
('workout_summary_edit_view', 'Intervals Editor'),
('workout_flexchart_stacked_view', 'Workout Stacked Chart'),
('workout_flexchart3_view', 'Workout Flex Chart')
)
landingpages2 = (
('workout_view', 'Workout View'),
('workout_edit_view', 'Edit View'),
('workout_workflow_view', 'Workflow View'),
('workout_stats_view', 'Stats View'),
('workout_data_view', 'Data Explore View'),
('workout_summary_edit_view', 'Intervals Editor'),
('workout_flexchart_stacked_view', 'Workout Stacked Chart'),
('workout_flexchart3_view', 'Workout Flex Chart'),
('workout_delete', 'Remove Workout')
)
workout_name_element = (
('date', 'Date'),
('name', 'Name'),
('distance', 'Distance'),
('ownerfirst', 'Rower first name'),
('ownerlast', 'Rower last name'),
('duration', 'Duration'),
('boattype','Boat type'),
('workouttype', 'Workout type'),
('seatnumber','Seat number'),
('boatname', 'Boat name'),
('empowerside', 'Empower side'),
('inboard', 'Inboard'),
('oarlength', 'Oar length'),
)
workflowmiddlepanel = (
('panel_statcharts.html', 'Static Charts'),
('flexthumbnails.html', 'Flex Charts'),
('panel_summary.html', 'Summary'),
('panel_map.html', 'Map'),
('panel_comments.html', 'Basic Info and Links'),
('panel_empower.html', 'Empower Oarlock Info'),
('panel_notes.html', 'Workout Notes'),
('panel_shortcomment.html', 'Comment Link'),
('panel_middlesocial.html', 'Social Media Share Buttons'),
)
defaultmiddle = ['panel_middlesocial.html',
'panel_statcharts.html',
'flexthumbnails.html',
'panel_summary.html',
'panel_map.html']
workflowleftpanel = (
('panel_navigationheader.html', 'Navigation Header'),
('panel_editbuttons.html', 'Edit Workout Button'),
('panel_delete.html', 'Delete Workout Button'),
('panel_export.html', 'Export Workout Button'),
('panel_social.html', 'Social Media Share Buttons'),
('panel_advancededit.html', 'Advanced Workout Edit Button'),
('panel_editintervals.html', 'Edit Intervals Button'),
('panel_stats.html', 'Workout Statistics Button'),
('panel_flexchart.html', 'Flex Chart'),
('panel_staticchart.html', 'Create Static Charts Buttons'),
('panel_uploadimage.html', 'Attach Image'),
('panel_geekyheader.html', 'Geeky Header'),
('panel_editwind.html', 'Edit Wind Data'),
('panel_editstream.html', 'Edit Stream Data'),
('panel_otwpower.html', 'Run OTW Power Calculations'),
('panel_mapview.html', 'Map'),
('panel_ranking.html', 'Ranking View'),
)
defaultleft = [
'panel_navigationheader.html',
'panel_editbuttons.html',
'panel_advancededit.html',
'panel_editintervals.html',
'panel_stats.html',
'panel_staticchart.html',
'panel_uploadimage.html',
]
coxes_calls = [
'Sit Ready!',
"Let's relax the shoulders, and give me a power ten to the finish!",
"Almost there. Give me ten strokes on the legs!",
"Let it run!",
"Don't rush the slides!",
"Quick hands.",
"You are clearing the puddles.",
"Let's push through now. Get me that open water.",
"We're going for the line now. Power ten on the next.",
]
info_calls = [
"Please give us a minute to count all those strokes, you've been working hard!",
"Please give us a minute to count all your strokes."
]
def dologging(filename, s):
do_logging = settings.MYLOGGING.get(filename,True)
if not do_logging:
return
tstamp = time.localtime()
timestamp = time.strftime('%b-%d-%Y %H:%M:%S', tstamp)
with open(filename, 'a') as f:
f.write('\n')
f.write(timestamp)
f.write(' ')
try:
f.write(s)
except TypeError:
f.write(str(s))
def to_pace(pace): # pragma: no cover
minutes, seconds = divmod(pace, 60)
seconds, rest = divmod(seconds, 1)
tenths = int(rest*10)
s = '{m:0>2}:{s:0>2}.{t:0>1}'.format(
m=int(minutes),
s=int(seconds),
t=int(tenths)
)
return s
def get_call():
call1 = random.choice(coxes_calls)
call2 = random.choice(info_calls)
call = """<div id="id_sitready" class="successmessage"><p>%s (%s)</p></div>""" % (call1, call2)
return call
def absolute(request):
urls = {
'ABSOLUTE_ROOT': request.build_absolute_uri('/')[:-1].strip("/"),
'ABSOLUTE_ROOT_URL': request.build_absolute_uri('/').strip("/"),
'PATH': request.build_absolute_uri(),
}
return urls
def trcolors(r1, g1, b1, r2, g2, b2):
r1 = r1/255.
r2 = r2/255.
g1 = g1/255.
g2 = g2/255.
b2 = b2/255.
b1 = b1/255.
h1, s1, v1 = colorsys.rgb_to_hsv(r1, g1, b1)
h2, s2, v2 = colorsys.rgb_to_hsv(r2, g2, b2)
return 360*h1, 360*(h2-h1), s1, (s2-s1), v1, (v2-v1)
palettes = {
'monochrome_blue': (207, -4, 0.06, 0.89, 1.0, -0.38),
'gold_sunset': (47, -31, .26, -0.12, 0.94, -0.5),
'blue_red': (207, -200, .85, 0, .74, -.24),
'blue_green': (207, -120, .85, 0, .75, .25),
'cyan_purple': trcolors(237, 248, 251, 136, 65, 157),
'green_blue': trcolors(240, 249, 232, 8, 104, 172),
'orange_red': trcolors(254, 240, 217, 179, 0, 0),
'cyan_blue': trcolors(241, 238, 246, 4, 90, 141),
'cyan_green': trcolors(246, 239, 247, 1, 108, 89),
'cyan_magenta': trcolors(241, 238, 246, 152, 0, 67),
'beige_magenta': trcolors(254, 235, 226, 122, 1, 119),
'yellow_green': trcolors(255, 255, 204, 0, 104, 55),
'yellow_blue': trcolors(255, 255, 205, 37, 52, 148),
'autumn': trcolors(255, 255, 212, 153, 52, 4),
'yellow_red': trcolors(255, 255, 178, 189, 0, 39)
}
rankingdistances = [100, 500, 1000, 2000,
5000, 6000, 10000, 21097, 42195, 100000]
rankingdurations = []
rankingdurations.append(datetime.time(minute=1))
rankingdurations.append(datetime.time(minute=4))
rankingdurations.append(datetime.time(minute=30))
rankingdurations.append(datetime.time(hour=1, minute=15))
rankingdurations.append(datetime.time(hour=1))
def range_to_color_hex(groupcols, palette='monochrome_blue'):
try:
plt = palettes[palette]
except KeyError: # pragma: no cover
plt = palettes['monochrome_blue']
rgb = [colorsys.hsv_to_rgb((plt[0]+plt[1]*x)/360.,
plt[2]+plt[3]*x,
plt[4]+plt[5]*x) for x in groupcols]
RGB = [(int(255.*r), int(255.*g), int(255.*b)) for (r, g, b) in rgb]
colors = ["#%02x%02x%02x" % (r, g, b) for (r, g, b) in RGB]
return colors
def str2bool(v): # pragma: no cover
return v.lower() in ("yes", "true", "t", "1")
def uniqify(seq, idfun=None):
# order preserving
if idfun is None:
def idfun(x):
return x
seen = {}
result = []
for item in seq:
marker = idfun(item)
# in old Python versions:
# if seen.has_key(marker)
# but in new ones:
if marker in seen:
continue
seen[marker] = 1
result.append(item)
return result
def serialize_list(value, token=','): # pragma: no cover
assert(isinstance(value, list) or isinstance(
value, tuple) or isinstance(value, np.ndarray))
return token.join([str(s) for s in value])
def deserialize_list(value, token=','): # pragma: no cover
if isinstance(value, list):
return value
elif isinstance(value, np.ndarray):
return value
return value.split(token)
def geo_distance(lat1, lon1, lat2, lon2):
""" Approximate distance and bearing between two points
defined by lat1,lon1 and lat2,lon2
This is a slight underestimate but is close enough for our purposes,
We're never moving more than 10 meters between trackpoints
Bearing calculation fails if one of the points is a pole.
(Hey, from the North pole you can walk South, East, North and end up
on the same spot!)
"""
# radius of our earth in km --> should be moved to settings if
# rowing takes off on other planets
R = 6373.0
# pi
pi = math.pi
lat1 = math.radians(lat1)
lat2 = math.radians(lat2)
lon1 = math.radians(lon1)
lon2 = math.radians(lon2)
dlon = lon2 - lon1
dlat = lat2 - lat1
a = math.sin(dlat / 2)**2 + math.cos(lat1) * \
math.cos(lat2) * math.sin(dlon / 2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
distance = R * c
tc1 = math.atan2(math.sin(lon2-lon1)*math.cos(lat2),
math.cos(lat1)*math.sin(lat2)-math.sin(lat1)*math.cos(lat2)*math.cos(lon2-lon1))
tc1 = tc1 % (2*pi)
bearing = math.degrees(tc1)
return [distance, bearing]
def move_one_meter(latitude, longitude, bearing):
# Earth radius in meters
earth_radius = 6371000
# Convert latitude and longitude from degrees to radians
lat_rad = math.radians(latitude)
lon_rad = math.radians(longitude)
# Convert bearing from degrees to radians
bearing_rad = math.radians(bearing)
# Calculate the new latitude
new_lat = math.asin(math.sin(lat_rad) * math.cos(1/earth_radius) +
math.cos(lat_rad) * math.sin(1/earth_radius) * math.cos(bearing_rad))
# Calculate the new longitude
new_lon = lon_rad + math.atan2(math.sin(bearing_rad) * math.sin(1/earth_radius) * math.cos(lat_rad),
math.cos(1/earth_radius) - math.sin(lat_rad) * math.sin(new_lat))
# Convert new latitude and longitude from radians to degrees
new_lat = math.degrees(new_lat)
new_lon = math.degrees(new_lon)
return new_lat, new_lon
def isbreakthrough(delta, cpvalues, p0, p1, p2, p3, ratio):
pwr = abs(p0)/(1+(delta/abs(p2)))
pwr += abs(p1)/(1+(delta/abs(p3)))
dd = 0.25*(ratio-1)
pwr2 = pwr*(1+dd)
pwr *= ratio
try:
delta = delta.cast(pl.Int32)
cpvalues = cpvalues.cast(pl.Int32)
pwr = pwr.cast(pl.Int32)
except:
return False, pl.DataFrame(), False
btdf = pl.DataFrame({
'delta': delta,
'cpvalues': cpvalues,
'pwr': pwr,
'pwr2': pwr2
})
res = btdf.select(pl.col("cpvalues")>pl.col("pwr")+1)['cpvalues'].sum()
res2 = btdf.select(pl.col("cpvalues")> pl.col("pwr2")+1)['cpvalues'].sum()
btdf = btdf.filter(pl.col("cpvalues")>pl.col("pwr"))
btdf = btdf.sort('delta')
return res >= 1, btdf, res2 >= 1
def myqueue(queue, function, *args, **kwargs):
class MockJob:
def __init__(self, *args, **kwargs):
self.result = 1
self.id = 1
def revoke(self): # pragma: no cover
return 1
if settings.TESTING:
return MockJob()
elif settings.CELERY: # pragma: no cover
kwargs['debug'] = True
job = function.delay(*args, **kwargs)
else: # pragma: no cover
if settings.DEBUG:
kwargs['debug'] = True
job_id = str(uuid.uuid4())
kwargs['job_id'] = job_id
kwargs['jobkey'] = job_id
kwargs['timeout'] = 3600
dologging('queue.log',function.__name__)
job = queue.enqueue(function, *args, **kwargs)
return job # pragma: no cover
def calculate_age(born, today=None):
if not today:
today = timezone.now()
if born:
try:
return today.year - born.year - ((today.month, today.day) < (born.month, born.day))
except AttributeError:
return None
else:
return None
def my_dict_from_instance(instance, model):
thedict = {}
thedict['id'] = instance.id
for f in instance._meta.fields:
fname = f.name
try:
verbosename = f.verbose_name
except: # pragma: no cover
verbosename = f.name
get_choice = 'get_'+fname+'_display'
if hasattr(instance, get_choice):
value = getattr(instance, get_choice)()
else:
try:
value = getattr(instance, fname)
except AttributeError: # pragma: no cover
value = None
if f.editable and value:
thedict[fname] = (verbosename, value)
return thedict
def wavg(group, avg_name, weight_name):
""" http://stackoverflow.com/questions/10951341/pandas-dataframe-aggregate-function-using-multiple-columns
In rare instance, we may not have weights, so just return the mean. Customize this if your business case
should return otherwise.
"""
try:
d = group[avg_name]
except (KeyError, ColumnNotFoundError):
return 0
try:
w = group[weight_name]
except (KeyError, ColumnNotFoundError):
return d.mean()
try:
return (d * w).sum() / w.sum()
except ZeroDivisionError: # pragma: no cover
return d.mean()
return 0
from string import Formatter
def totaltime_sec_to_string(totaltime, shorten=False):
if np.isnan(totaltime):
return ''
if totaltime < 0:
totaltime = -totaltime
fmt = '{H:02}:{M:02}:{S:02}.{t}'
if shorten:
fmt = '{H}:{M:02}:{S:02}'
if totaltime < 3600:
fmt = '{M}:{S:02}'
remainder = int(totaltime)
tenths = totaltime-remainder
f = Formatter()
desired_fields = [field_tuple[1] for field_tuple in f.parse(fmt)]
possible_fields = ('D','H', 'M', 'S')
constants = {'D':86400, 'H': 3600, 'M': 60, 'S': 1}
values = {}
for field in possible_fields:
if field in desired_fields and field in constants:
values[field], remainder = divmod(remainder, constants[field])
values['t'] = round(10*tenths)
return f.format(fmt, **values)
def totaltime_sec_to_string_old(totaltime, shorten=False):
if np.isnan(totaltime):
return ''
hours = int(totaltime / 3600.)
if hours > 23: # pragma: no cover
message = 'Warning: The workout duration was longer than 23 hours. '
hours = 23
elif hours < 0:
message = 'Warning: invalid workout duration'
hours = 0
minutes = int((totaltime - 3600. * hours) / 60.)
if minutes > 59: # pragma: no cover
minutes = 59
if not message: # pragma: no cover
message = 'Warning: there is something wrong with the workout duration'
elif minutes < 0:
minutes = 0
if not message:
message = 'Warning: invalid workout duration'
seconds = int(totaltime - 3600. * hours - 60. * minutes)
if seconds > 59: # pragma: no cover
seconds = 59
if not message: # pragma: no cover
message = 'Warning: there is something wrong with the workout duration'
elif seconds < 0:
seconds = 0
if not message:
message = 'Warning: invalid workout duration'
tenths = round(10 * (totaltime - 3600. * hours - 60. * minutes - seconds))
if tenths > 9: # pragma: no cover
tenths = 9
if not message: # pragma: no cover
message = 'Warning: there is something wrong with the workout duration'
elif tenths < 0:
tenths = 0
if not message:
message = 'Warning: invalid workout duration'
duration = ""
if not shorten:
duration = "{hours:02d}:{minutes:02d}:{seconds:02d}.{tenths}".format(
hours=hours,
minutes=minutes,
seconds=seconds,
tenths=tenths
)
else:
if hours != 0: # pragma: no cover
duration = "{hours}:{minutes:02d}:{seconds:02d}".format(
hours=hours,
minutes=minutes,
seconds=seconds,
# tenths=tenths
)
else:
duration = "{minutes}:{seconds:02d}".format(
# hours=hours,
minutes=minutes,
seconds=seconds,
# tenths=tenths
)
return duration
def iscoach(m, r): # pragma: no cover
result = False
result = m in r.coaches
return result
# Exponentially weighted moving average
# Used for data smoothing of the jagged data obtained by Strava
# See bitbucket issue 72
def ewmovingaverage(interval, window_size):
# Experimental code using Exponential Weighted moving average
try:
intervaldf = pd.DataFrame({'v': interval})
idf_ewma1 = intervaldf.ewm(span=window_size)
idf_ewma2 = intervaldf[::-1].ewm(span=window_size)
i_ewma1 = idf_ewma1.mean().loc[:, 'v']
i_ewma2 = idf_ewma2.mean().loc[:, 'v']
interval2 = np.vstack((i_ewma1, i_ewma2[::-1]))
interval2 = np.mean(interval2, axis=0) # average
except ValueError: # pragma: no cover
interval2 = interval
return interval2
# Exceptions
# Custom error class - to raise a NoTokenError
class NoTokenError(BaseException):
def __init__(self, value):
self.value = value
def __str__(self): # pragma: no cover
return repr(self.value)
class ProcessorCustomerError(Exception): # pragma: no cover
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
# Custom exception handler, returns a 401 HTTP message
# with exception details in the json data
def custom_exception_handler(exc, message):
response = {
"errors": [
{
"code": str(exc),
"detail": message,
}
]
}
res = HttpResponse(message)
res.status_code = 401
res.json = json.dumps(response)
return res
def get_strava_stream(r, metric, stravaid, series_type='time', fetchresolution='high', authorizationstring=''):
if r is not None: # pragma: no cover
authorizationstring = str('Bearer ' + r.stravatoken)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json',
'resolution': 'medium', }
if metric == 'power': # pragma: no cover
metric = 'watts'
url = "https://www.strava.com/api/v3/activities/{stravaid}" \
"/streams/{metric}?resolution={fetchresolution}&series_type={series_type}".format(
stravaid=stravaid,
fetchresolution=fetchresolution,
series_type=series_type,
metric=metric)
s = requests.get(url, headers=headers)
if metric == 'power': # pragma: no cover
with open('data.txt', 'w') as outfile:
json.dump(s.json(), outfile)
try:
for data in s.json():
try:
if data['type'] == metric:
return np.array(data['data'])
except TypeError: # pragma: no cover
return None
except: # pragma: no cover
return None
return None # pragma: no cover
def allmonths(startdate, enddate):
d = startdate
while d < enddate:
yield d
d = datetime.date(d.year+(d.month // 12), ((d.month % 12) + 1), 1)
def allsundays(startdate, enddate):
d = startdate
d += datetime.timedelta(days=6 - d.weekday()) # first Sunday
while d <= enddate:
yield d
d += datetime.timedelta(days=7)
def steps_read_fit(filename, name='', sport='Custom'): # pragma: no cover
authorizationstring = 'Bearer '+settings.WORKOUTS_FIT_TOKEN
url = settings.WORKOUTS_FIT_URL+"/tojson"
headers = {'Authorization': authorizationstring}
response = requests.post(url=url, headers=headers,
json={'filename': filename})
if response.status_code != 200: # pragma: no cover
return None
w = response.json()
try:
d = w['workout']
except KeyError: # pragma: no cover
return None
return d
def steps_read_intervals(filename, name='', sport='Custom'): # pragma: no cover
authorizationstring = 'Bearer '+settings.WORKOUTS_FIT_TOKEN
url = settings.WORKOUTS_FIT_URL+"/tointervals"
headers = {'Authorization': authorizationstring}
response = requests.post(url=url, headers=headers,
json={'filename': filename})
if response.status_code != 200: # pragma: no cover
return None
w = response.text
return w
def steps_write_fit(steps):
authorizationstring = 'Bearer '+settings.WORKOUTS_FIT_TOKEN
url = settings.WORKOUTS_FIT_URL+"/tofit"
headers = {'Authorization': authorizationstring}
# convert to json, value of keys called wkt_step_name to string
for step in steps['steps']:
step['wkt_step_name'] = str(step['wkt_step_name'])
# convert numerical values in the dict to integers
for key in step.keys():
if isinstance(step[key], (int, float)):
step[key] = int(step[key])
response = requests.post(url=url, headers=headers, json=steps)
if response.status_code != 200: # pragma: no cover
return None
w = response.json()
try:
filename = w['filename']
except KeyError: # pragma: no cover
return None
return filename
def step_to_time_dist(step, avgspeed=3.2, ftp=200, ftspm=25, ftv=3.7, powerzones=None):
seconds = 0
distance = 0
rscore = 0
durationtype = step.get('durationType', '')
value = step.get('durationValue', 0)
if value == 0: # pragma: no cover
return 0, 0, 0
targettype = step.get('targetType', '')
if durationtype.lower() == 'time':
seconds = value/1000.
distance = avgspeed*seconds
rscore = 60.*seconds/3600.
if targettype is None:
return seconds, distance, rscore
if targettype.lower() == 'speed': # pragma: no cover
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
velomid = 0.
if value != 0: # pragma: no cover
distance = seconds*value/1000.
velomid = value/1000.
elif valuelow != 0 and valuehigh != 0: # pragma: no cover
distance = seconds*(valuelow+valuehigh)/2.
velomid = (valuelow+valuehigh)/2000.
else:
velomid = avgspeed
distance = velomid*seconds
veloratio = (velomid/ftv)**(3.0)
rscoreperhour = 100.*veloratio
rscore = rscoreperhour*seconds/3600.
if targettype.lower() in ['power','powerlap','power_lap']:
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
if value != 0:
if value < 10 and value > 0: # pragma: no cover
targetpower = ftp*0.6 # dit is niet correct
if powerzones is not None and value < len(powerzones)-1 and value>1:
targetpower = (powerzones[int(value)-1]+powerzones[int(value)])/(2.)
elif value > 10 and value < 1000: # pragma: no cover
targetpower = value*ftp/100.
elif value > 1000:
targetpower = value-1000
avgspeed = ftv*(targetpower/ftp)**(1./3.)
distance = avgspeed*seconds
avgpower = targetpower
if valuelow != 0 and valuehigh != 0: # pragma: no cover
avgpower = (valuelow+valuehigh)/2.
if avgpower < 10 and avgpower > 0:
targetpower = ftp*0.6
elif avgpower > 10 and avgpower < 1000:
targetpower = avgpower*ftp/100.
elif avgpower > 1000:
targetpower = avgpower-1000
avgspeed = ftv*(targetpower/ftp)**(1./3.)
distance = avgspeed*seconds
rscore = 100.*(targetpower/ftp)*seconds/3600.
if targettype.lower() in ['cadence','cadencelap','cadence_lap']:
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
if value != 0:
avgpower = ftp*value/ftspm
if valuelow != 0 and valuehigh != 0: # pragma: no cover
avgspm = (valuelow+valuehigh)/2.
avgpower = ftp*avgspm/ftspm
avgspeed = ftv*(avgpower/ftp)**(1./3.)
distance = avgspeed*seconds
rscore = 100*(avgpower/ftp)*seconds/3600.
return seconds, distance, rscore
elif durationtype.lower() == 'distance':
distance = value/100.
seconds = distance/avgspeed
rscore = 60.*float(seconds)/3600.
if targettype is None:
return seconds, distance, rscore
if targettype.lower() in ['speed', 'speedlap','speed_lap']: # pragma: no cover
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
velomid = 0
if value != 0:
seconds = distance/value
velomid = value/1000.
elif valuelow != 0 and valuehigh != 0:
velomid = (valuelow+valuehigh)/2000.
seconds = distance/velomid
veloratio = (velomid/ftv)**(3.0)
rscoreperhour = 100.*veloratio
rscore = rscoreperhour*seconds/3600.
if velomid > 0:
seconds = distance/velomid
if targettype.lower() in ['power','powerlap','power_lap']: # pragma: no cover
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
if value != 0:
if value < 10 and value > 0:
targetpower = ftp*0.6
elif value > 10 and value < 1000:
targetpower = value*ftp/100.
elif value > 1000:
targetpower = value-1000
avgspeed = ftv*(targetpower/ftp)**(1./3.)
seconds = distance/avgspeed
avgpower = targetpower
if valuelow != 0 and valuehigh != 0:
avgpower = (valuelow+valuehigh)/2.
if avgpower < 10 and avgpower > 0:
targetpower = ftp*0.6
elif avgpower > 10 and avgpower < 1000:
targetpower = avgpower*ftp/100.
elif avgpower > 1000:
targetpower = avgpower-1000
avgspeed = ftv*(targetpower/ftp)**(1./3.)
seconds = distance/avgspeed
rscore = 100.*(targetpower/ftp)*seconds/3600.
if targettype.lower() in ['cadence','cadencelap','cadence_lap']: # pragma: no cover
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
if value != 0:
avgpower = ftp*value/ftspm
if valuelow != 0 and valuehigh != 0:
avgspm = (valuelow+valuehigh)/2.
avgpower = ftp*avgspm/ftspm
rscore = 100*(avgpower/ftp)*seconds/3600.
avgspeed = ftv*(avgpower/ftp)**(1./3.)
seconds = distance/avgspeed
return seconds, distance, rscore
elif durationtype.lower() in ['powerlessthan', 'powergreaterthan', 'hrlessthan', 'hrgreaterthan',
'power_less_than','power_greater_than','hr_less_than', 'hr_greater_than']: # pragma: no cover
seconds = 600
distance = seconds*avgspeed
veloratio = (avgspeed/ftv)**(3.0)
rscoreperhour = 100.*veloratio
rscore = rscoreperhour*seconds/3600.
return seconds, distance, rscore
return seconds, distance, rscore
def get_step_type(step): # pragma: no cover
t = 'WorkoutStep'
if step['durationType'].lower() in ['repeatuntilstepscmplt', 'repeatuntilhrlessthan', 'repeatuntilhrgreaterthan',
'repeat_until_steps_cmplt', 'repeat_until_hr_less_than', 'repeat_until_hr_greater_than']:
t = 'WorkoutRepeatStep'
return t
def peel(listToPeel):
if len(listToPeel) == 0: # pragma: no cover
return None, None
if len(listToPeel) == 1:
return listToPeel[0], None
first = listToPeel[0]
rest = listToPeel[1:]
if first['type'] == 'Step': # pragma: no cover
return first, rest
# repeatstep
theID = -1
lijst = []
while theID != first['repeatID']:
f, rest = peel(rest)
lijst.append(f)
theID = f['stepID']
first['steps'] = list(reversed(lijst))
return first, rest
def ps_dict_order_dict(d, short=False):
steps = d['steps']
sdicts = []
for step in steps:
sstring, type, stepID, repeatID, repeatValue = step_to_string(
step, short=short)
seconds, meters, rscore = step_to_time_dist(step)
sdict = {
'type': type,
'stepID': stepID,
'repeatID': repeatID,
'repeatValue': repeatValue,
'dict': step,
}
sdicts.append(sdict)
sdict2 = list(reversed(sdicts))
return sdict2
def ps_dict_order(d, short=False, rower=None, html=True):
sdict = collections.OrderedDict({})
steps = d['steps']
if steps is None:
steps = []
ftp = 200
powerzones = None
if rower is not None:
ftp = rower.ftp
powerzones = [rower.pw_ut2, rower.pw_ut1, rower.pw_at, rower.pw_tr, rower.pw_an]
# ftv = rower.ftv
# ftspm = rower.ftspm
for step in steps:
sstring, type, stepID, repeatID, repeatValue = step_to_string(
step, short=short)
seconds, meters, rscore = step_to_time_dist(step, ftp=ftp, powerzones=powerzones)
sdict[stepID] = {
'string': sstring,
'type': type,
'stepID': stepID,
'repeatID': repeatID,
'repeatValue': repeatValue,
'seconds': seconds,
'meters': meters,
'rscore': rscore,
}
sdict2 = collections.OrderedDict(reversed(list(sdict.items())))
for step in steps:
sstring, type, stepID, repeatID, repeatValue = step_to_string(
step, short=short)
seconds, meters, rscore = step_to_time_dist(step)
sdict[stepID] = {
'string': sstring,
'type': type,
'stepID': stepID,
'repeatID': repeatID,
'repeatValue': repeatValue,
'seconds': seconds,
'meters': meters,
'rscore': rscore,
}
sdict2 = collections.OrderedDict(reversed(list(sdict.items())))
sdict3 = []
hold = []
multiplier = []
holduntil = []
spaces = ''
totalmeters = 0
totalseconds = 0
totalrscore = 0
factor = 1
for key, item in sdict2.items():
if item['type'] == 'RepeatStep':
hold.append(item)
holduntil.append(item['repeatID'])
multiplier.append(item['repeatValue'])
factor *= item['repeatValue']
if html:
spaces += '&nbsp;&nbsp;&nbsp;'
else:
spaces += ' '
if item['type'] == 'Step':
item['string'] = spaces+item['string']
sdict3.append(item)
totalmeters += factor*item['meters']
totalseconds += factor*item['seconds']
totalrscore += factor*item['rscore']
if len(holduntil) > 0 and item['stepID'] <= holduntil[-1]:
if item['stepID'] == holduntil[-1]:
sdict3.append(hold.pop())
factor /= multiplier.pop()
if html:
spaces = spaces[:-18]
else:
spaces = spaces[:-3]
holduntil.pop()
else: # pragma: no cover
prevstep = sdict3.pop()
prevstep['string'] = prevstep['string'][18:]
prevprevstep = sdict3.pop()
prevprevstep['string'] = spaces+prevprevstep['string']
sdict3.append(prevprevstep)
curstep = hold.pop()
curstep['string'] = curstep['string']
sdict3.append(curstep)
factor /= multiplier.pop()
sdict3.append(prevstep)
holduntil.pop()
if html:
spaces = spaces[:-18]
else:
spaces = spaces[:-3]
sdict = list(reversed(sdict3))
return sdict, totalmeters, totalseconds, totalrscore
def step_to_string(step, short=False):
stype = 'Step'
repeatID = -1
repeatValue = 1
nr = 0
intensity = ''
duration = ''
unit = ''
target = ''
repeat = ''
notes = ''
durationtype = step['durationType']
if step['durationValue'] == 0:
if durationtype.lower() not in [
'repeatuntilstepscmplt',
'repeatuntilhrlessthan',
'repeatuntilhrgreaterthan',
'repeat_until_steps_cmplt',
'repeat_until_hr_less_than',
'repeat_until_hr_greater_than'
]: # pragma: no cover
return '', stype, -1, -1, 1
if durationtype.lower() == 'time':
unit = 'min'
value = step['durationValue']
if value/1000. >= 3600: # pragma: no cover
unit = 'h'
dd = timedelta(seconds=value/1000.)
duration = '{v}'.format(v=str(dd))
elif durationtype.lower() == 'distance':
unit = 'm'
value = step['durationValue']/100.
duration = int(value)
elif durationtype.lower() in ['hrlessthan', 'hr_less_than']: # pragma: no cover
value = step['durationValue']
if value <= 100:
duration = 'until heart rate lower than {v}% of max'.format(
v=value)
if short:
duration = 'until HR<{v}% of max'.format(v=value)
else:
duration = 'until heart rate lower than {v}'.format(v=value-100)
if short:
duration = 'until HR<{v}'.format(v=value-100)
elif durationtype.lower() in ['hrgreaterthan','hr_greater_than']: # pragma: no cover
value = step['durationValue']
if value <= 100:
duration = 'until heart rate greater than {v}% of max'.format(
v=value)
if short:
duration = 'until R>{v}% of max'.format(v=value)
else:
duration = 'until heart rate greater than {v}'.format(v=value-100)
if short:
duration = 'until HR>{v}'.format(v=value/100)
elif durationtype.lower() in ['powerlessthan','power_less_than']: # pragma: no cover
value = step['durationValue']
targetvalue = step.get('targetValue', 0)
if value <= 1000:
duration = 'Repeat until Power is less than {targetvalue} % of FTP'.format(
targetvalue=targetvalue
)
if short:
'until < {targetvalue}% of FTP'.format(targetvalue=targetvalue)
else:
duration = 'Repeat until Power is less than {targetvalue} Watt'.format(
targetvalue=targetvalue-1000
)
if short:
'until < {targetvalue} W'.format(targetvalue=targetvalue-1000)
elif durationtype.lower() in ['powergreaterthan','power_greater_than']: # pragma: no cover
value = step['durationValue']
targetvalue = step.get('targetValue', 0)
if value <= 1000:
duration = 'Repeat until Power is greater than {targetvalue} % of FTP'.format(
targetvalue=targetvalue
)
if short:
duration = 'until > {targetvalue}% of FTP'.format(
targetvalue=targetvalue)
else:
duration = 'Repeat until Power is greater than {targetvalue} Watt'.format(
targetvalue=targetvalue-1000
)
if short:
duration = 'until > {targetvalue} W'.format(
targetvalue=targetvalue)
elif durationtype.lower() in ['repeatuntilstepscmplt','repeat_until_steps_cmplt']: # pragma: no cover
stype = 'RepeatStep'
ntimes = ': {v}x'.format(v=step['targetValue'])
repeatID = step['durationValue']
duration = ntimes
repeatValue = step.get('targetValue', 0)
elif durationtype.lower() in ['repeatuntilhrgreaterthan','repeat_until_hr_greater_than']: # pragma: no cover
stype = 'RepeatStep'
targetvalue = step.get('targetValue', 0)
if targetvalue <= 100:
duration = 'until Heart Rate is greater than {targetvalue} % of max'.format(
targetvalue=targetvalue
)
if short:
duration = ': until HR>{targetvalue} % of max'.format(
targetvalue=targetvalue)
else:
duration = 'until Heart Rate is greater than {targetvalue}:'.format(
targetvalue=targetvalue-100.
)
if short:
duration = ': untl HR>{targetvalue}'.format(
targetvalue=targetvalue-100)
repeatID = step['durationValue']
elif durationtype.lower() in ['repeatuntilhrlessthan','repeat_until_hr_less_than']: # pragma: no cover
stype = 'RepeatStep'
targetvalue = step.get('targetValue', 0)
if targetvalue <= 100:
duration = 'until Heart Rate is less than {targetvalue} % of max'.format(
targetvalue=targetvalue
)
if short:
duration = ': until HR<{targetvalue}% of max'.format(
targetvalue=targetvalue)
else:
duration = 'until Heart Rate is less than {targetvalue}:'.format(
targetvalue=targetvalue-100.
)
if short:
duration = ': until HR<{targetvalue}'.format(
targetvalue=targetvalue-100)
repeatID = step['durationValue']
#
try:
targettype = step['targetType']
except KeyError:
targettype = ''
if targettype is None:
targettype = ''
if targettype.lower() in ['heartrate','heartratelap','heart_rate','heart_rate_lap']: # pragma: no cover
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
if value < 10 and value > 0:
target = '@ HR zone {v}'.format(v=value)
else:
if valuelow < 100:
target = '@ HR {l} - {h} % of max'.format(
l=valuelow,
h=valuehigh,
)
else:
target = '@ HR {l} - {h}'.format(
l=valuelow - 100,
h=valuehigh - 100,
)
elif targettype.lower() in ['power', 'powerlap','power_lap']: # pragma: no cover
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
if value < 10 and value > 0:
target = '@ Power zone {v}'.format(v=value)
elif value > 10 and value < 1000:
target = '@ {v}% of FTP'.format(v=value)
elif value > 1000:
target = '@ {v} W'.format(v=value-1000)
elif valuelow > 0 and valuehigh > 0:
if valuelow < 1000:
target = '@ {l} - {h} % of FTP'.format(
l=valuelow,
h=valuehigh,
)
else:
target = '@ {l} - {h} W'.format(
l=valuelow-1000,
h=valuehigh-1000,
)
elif targettype.lower() in ['speed', 'speedlap','speed_lap']: # pragma: no cover
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
if value != 0:
v = value/1000.
pace = 500./v
pacestring = to_pace(pace)
target = '@ {v} m/s {p}, per 500m'.format(
v=value/1000.,
p=pacestring)
if short: # pragma: no cover
target = '@ {p}'.format(p=pacestring)
elif valuelow != 0 and valuehigh != 0: # pragma: no cover
v = valuelow/1000.
pace = 500./v
pacestringlow = to_pace(pace)
v = valuehigh/1000.
pace = 500./v
pacestringhigh = to_pace(pace)
target = '@ {l:1.2f} and {h:1.2f} m/s, ({ph} to {pl} per 500m)'.format(
l=valuelow/1000.,
h=valuehigh/1000.,
pl=pacestringlow,
ph=pacestringhigh,
)
if short:
target = '@ {pl} - {ph}'.format(
pl=pacestringlow,
ph=pacestringhigh,
)
elif targettype.lower() in ['cadence','cadencelap','cadence_lap']: # pragma: no cover
value = step.get('targetValue', 0)
valuelow = step.get('targetValueLow', 0)
valuehigh = step.get('targetValueHigh', 0)
if value != 0:
target = '@ {v} SPM'.format(v=value)
elif valuelow != 0 and valuehigh != 0:
target = '@ {l} - {h} SPM'.format(
l=valuelow,
h=valuehigh
)
nr = step['stepId']
# name = step['wkt_step_name']
notes = ''
try:
if len(step['description']): # pragma: no cover
notes = ' - '+step['description']
except KeyError:
notes = ''
try:
intensity = step['intensity']
except KeyError:
intensity = ''
s = '{duration} {unit} {target} {repeat} {notes}'.format(
# nr=nr,
# name=name,
unit=unit,
# intensity=intensity,
duration=duration,
target=target,
repeat=repeat,
notes=notes,
)
if short:
s = '{duration} {unit} {target} {repeat}'.format(
duration=duration,
target=target,
repeat=repeat,
unit=unit,
)
if isinstance(intensity, int):
intensity = intensitymap[intensity]
if short and intensity.lower() in ['warmup', 'cooldown', 'rest']:
s = '{intensity} {duration} {unit} {target} {repeat}'.format(
intensity=intensity,
duration=duration,
target=target,
repeat=repeat,
unit=unit
)
elif intensity.lower() in ['warmup', 'cooldown', 'rest']:
s = '{intensity} {duration} {unit} {target} {repeat} {notes}'.format(
intensity=intensity,
duration=duration,
target=target,
repeat=repeat,
unit=unit,
notes=notes,
)
if stype == 'RepeatStep':
s = 'Repeat {duration}'.format(duration=duration)
return s, stype, nr, repeatID, repeatValue
def strfdelta(tdelta): # pragma: no cover
try:
minutes, seconds = divmod(tdelta.seconds, 60)
tenths = int(tdelta.microseconds / 1e5)
except AttributeError:
minutes, seconds = divmod(tdelta.view(np.int64), 60e9)
seconds, rest = divmod(seconds, 1e9)
tenths = int(rest / 1e8)
res = "{minutes:0>2}:{seconds:0>2}.{tenths:0>1}".format(
minutes=minutes,
seconds=seconds,
tenths=tenths,
)
return res
def request_is_ajax(request):
is_ajax = request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest'
# if settings.TESTING:
# is_ajax = True
return is_ajax
def intervals_to_string(vals, units, typ):
if vals is None or units is None or typ is None: # pragma: no cover
return ''
if len(vals) != len(units) or len(vals) != len(typ): # pragma: no cover
return ''
s = ''
previous = 'rest'
for i in range(len(vals)):
if typ[i] == 'rest' and previous == 'rest':
if units[i] == 'min': # pragma: no cover
val = int(vals[i])*60
unit = 'sec'
else:
val = int(vals[i])
if units[i] == 'meters': # pragma: no cover
unit = 'm'
if units[i] == 'seconds':
unit = 'sec'
s += '+0min/{val}{unit}'.format(val=val, unit=unit)
elif typ[i] == 'rest': # pragma: no cover
if units[i] == 'min':
val = int(vals[i])*60
unit = 'sec'
else:
val = int(vals[i])
if units[i] == 'meters':
unit = 'm'
if units[i] == 'seconds':
unit = 'sec'
s += '/{val}{unit}'.format(val=val, unit=unit)
previous = 'rest'
else: # pragma: no cover # work interval
if units[i] == 'min':
val = int(vals[i])*60
unit = 'sec'
else:
val = int(vals[i])
if units[i] == 'meters':
unit = 'm'
if units[i] == 'seconds':
unit = 'sec'
s += '+{val}{unit}'.format(val=val, unit=unit)
previous = 'work'
if s[0] == '+':
s = s[1:]
return s
def get_timezone_from_c2data(data):
try:
timezone = pytz.timezone(data['timezone'])
except UnknownTimeZoneError:
timezone = pytz.utc
except KeyError: # pragma: no cover
timezone = pytz.utc
return timezone
def get_startdatetime_from_c2data(data):
timezone = get_timezone_from_c2data(data)
try:
startdatetime = iso8601.parse_date(data['date_utc'])
except: # pragma: no cover
startdatetime = iso8601.parse_date(data['date'])
totaltime = data['time']/10.
try:
rest_time = data['rest_time']/10.
except KeyError:
rest_time = 0
totaltime = totaltime+rest_time
duration = totaltime_sec_to_string(totaltime)
starttimeunix = arrow.get(startdatetime).timestamp()-totaltime
startdatetime = arrow.get(starttimeunix)
startdatetime = startdatetime.astimezone(timezone)
workoutdate = startdatetime.astimezone(
timezone
).strftime('%Y-%m-%d')
starttime = startdatetime.astimezone(
timezone
).strftime('%H:%M:%S')
return startdatetime, starttime, workoutdate, duration, starttimeunix, timezone