Private
Public Access
1
0
Files
rowsandall/rowers/utils.py
Sander Roosendaal 5df1a2272a adding test duration
2020-12-07 15:00:27 +01:00

543 lines
14 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import math
import numpy as np
import pandas as pd
import colorsys
from django.conf import settings
import uuid
import datetime
import json
from django.http import HttpResponse
import requests
from django.http import HttpResponse
lbstoN = 4.44822
landingpages = (
('workout_edit_view','Edit View'),
('workout_workflow_view','Workflow View'),
('workout_stats_view','Stats View'),
('workout_data_view','Data Explore View'),
('workout_summary_edit_view','Intervals Editor'),
)
workflowmiddlepanel = (
('panel_statcharts.html','Static Charts'),
('flexthumbnails.html','Flex Charts'),
('panel_summary.html','Summary'),
('panel_map.html','Map'),
('panel_comments.html','Basic Info and Links'),
('panel_notes.html','Workout Notes'),
('panel_shortcomment.html','Comment Link'),
('panel_middlesocial.html','Social Media Share Buttons'),
)
defaultmiddle = ['panel_middlesocial.html',
'panel_statcharts.html',
'flexthumbnails.html',
'panel_summary.html',
'panel_map.html']
workflowleftpanel = (
('panel_navigationheader.html','Navigation Header'),
('panel_editbuttons.html','Edit Workout Button'),
('panel_delete.html','Delete Workout Button'),
('panel_export.html','Export Workout Button'),
('panel_social.html','Social Media Share Buttons'),
('panel_advancededit.html','Advanced Workout Edit Button'),
('panel_editintervals.html','Edit Intervals Button'),
('panel_stats.html','Workout Statistics Button'),
('panel_flexchart.html','Flex Chart'),
('panel_staticchart.html','Create Static Charts Buttons'),
('panel_uploadimage.html','Attach Image'),
('panel_geekyheader.html','Geeky Header'),
('panel_editwind.html','Edit Wind Data'),
('panel_editstream.html','Edit Stream Data'),
('panel_otwpower.html','Run OTW Power Calculations'),
('panel_mapview.html','Map'),
('panel_ranking.html','Ranking View'),
)
defaultleft = [
'panel_navigationheader.html',
'panel_editbuttons.html',
'panel_advancededit.html',
'panel_editintervals.html',
'panel_stats.html',
'panel_staticchart.html',
'panel_uploadimage.html',
]
coxes_calls = [
'Sit Ready!',
"Let's relax the shoulders, and give me a power ten to the finish!",
"Almost there. Give me ten strokes on the legs!",
"Let it run!",
"Don't rush the slides!",
"Quick hands.",
"You are clearing the puddles.",
"Let's push through now. Get me that open water.",
"We're going for the line now. Power ten on the next.",
]
info_calls = [
"Please give us a minute to count all those strokes, you've been working hard!",
"Please give us a minute to count all your strokes."
]
import random
def get_call():
call1 = random.choice(coxes_calls)
call2 = random.choice(info_calls)
call = """<div id="id_sitready" class="successmessage">
<p>
%s (%s)
</p>
</div>
""" % (call1, call2)
return call
def absolute(request):
urls = {
'ABSOLUTE_ROOT': request.build_absolute_uri('/')[:-1].strip("/"),
'ABSOLUTE_ROOT_URL': request.build_absolute_uri('/').strip("/"),
'PATH':request.build_absolute_uri(),
}
return urls
def trcolors(r1,g1,b1,r2,g2,b2):
r1 = r1/255.
r2 = r2/255.
g1 = g1/255.
g2 = g2/255.
b2 = b2/255.
b1 = b1/255.
h1,s1,v1 = colorsys.rgb_to_hsv(r1,g1,b1)
h2,s2,v2 = colorsys.rgb_to_hsv(r2,g2,b2)
return 360*h1,360*(h2-h1),s1,(s2-s1),v1,(v2-v1)
palettes = {
'monochrome_blue':(207,-4,0.06,0.89,1.0,-0.38),
'gold_sunset':(47,-31,.26,-0.12,0.94,-0.5),
'blue_red':(207,-200,.85,0,.74,-.24),
'blue_green':(207,-120,.85,0,.75,.25),
'cyan_green':(192,-50,.08,.65,.98,-.34),
'cyan_purple':trcolors(237,248,251,136,65,157),
'green_blue':trcolors(240,249,232,8,104,172),
'orange_red':trcolors(254,240,217,179,0,0),
'cyan_blue':trcolors(241,238,246,4,90,141),
'cyan_green':trcolors(246,239,247,1,108,89),
'cyan_magenta':trcolors(241,238,246,152,0,67),
'beige_magenta':trcolors(254,235,226,122,1,119),
'yellow_green':trcolors(255,255,204,0,104,55),
'yellow_blue':trcolors(255,255,205,37,52,148),
'autumn':trcolors(255,255,212,153,52,4),
'yellow_red':trcolors(255,255,178,189,0,39)
}
rankingdistances = [100,500,1000,2000,5000,6000,10000,21097,42195,100000]
rankingdurations = []
rankingdurations.append(datetime.time(minute=1))
rankingdurations.append(datetime.time(minute=4))
rankingdurations.append(datetime.time(minute=30))
rankingdurations.append(datetime.time(hour=1,minute=15))
rankingdurations.append(datetime.time(hour=1))
def is_ranking_piece(workout):
if workout.distance in rankingdistances:
return True
elif workout.duration in rankingdurations:
return True
return False
def range_to_color_hex(groupcols,palette='monochrome_blue'):
try:
plt = palettes[palette]
except KeyError:
plt = palettes['monochrome_blue']
rgb = [colorsys.hsv_to_rgb((plt[0]+plt[1]*x)/360.,
plt[2]+plt[3]*x,
plt[4]+plt[5]*x) for x in groupcols]
RGB = [(int(255.*r),int(255.*g),int(255.*b)) for (r, g, b) in rgb]
colors = ["#%02x%02x%02x" % (r, g, b) for (r, g, b) in RGB]
return colors
def str2bool(v):
return v.lower() in ("yes", "true", "t", "1")
def uniqify(seq, idfun=None):
# order preserving
if idfun is None:
def idfun(x): return x
seen = {}
result = []
for item in seq:
marker = idfun(item)
# in old Python versions:
# if seen.has_key(marker)
# but in new ones:
if marker in seen: continue
seen[marker] = 1
result.append(item)
return result
def serialize_list(value,token=','):
assert(isinstance(value, list) or isinstance(value, tuple) or isinstance(value,np.ndarray))
return token.join([str(s) for s in value])
def deserialize_list(value,token=','):
if isinstance(value, list):
return value
elif isinstance(value, np.ndarray):
return value
return value.split(token)
def geo_distance(lat1,lon1,lat2,lon2):
""" Approximate distance and bearing between two points
defined by lat1,lon1 and lat2,lon2
This is a slight underestimate but is close enough for our purposes,
We're never moving more than 10 meters between trackpoints
Bearing calculation fails if one of the points is a pole.
(Hey, from the North pole you can walk South, East, North and end up
on the same spot!)
"""
# radius of our earth in km --> should be moved to settings if
# rowing takes off on other planets
R = 6373.0
# pi
pi = math.pi
lat1 = math.radians(lat1)
lat2 = math.radians(lat2)
lon1 = math.radians(lon1)
lon2 = math.radians(lon2)
dlon = lon2 - lon1
dlat = lat2 - lat1
a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
distance = R * c
tc1 = math.atan2(math.sin(lon2-lon1)*math.cos(lat2),
math.cos(lat1)*math.sin(lat2)-math.sin(lat1)*math.cos(lat2)*math.cos(lon2-lon1))
tc1 = tc1 % (2*pi)
bearing = math.degrees(tc1)
return [distance,bearing]
def isbreakthrough(delta,cpvalues,p0,p1,p2,p3,ratio):
pwr = abs(p0)/(1+(delta/abs(p2)))
pwr += abs(p1)/(1+(delta/abs(p3)))
dd = 0.25*(ratio-1)
pwr2 = pwr*(1+dd)
pwr *= ratio
delta = delta.values.astype(int)
cpvalues = cpvalues.values.astype(int)
pwr = pwr.astype(int)
res = np.sum(cpvalues>pwr)
res2 = np.sum(cpvalues>pwr2)
btdf = pd.DataFrame(
{
'delta':delta[cpvalues>pwr],
'cpvalues':cpvalues[cpvalues>pwr],
'pwr':pwr[cpvalues>pwr],
}
)
btdf.sort_values('delta',axis=0,inplace=True)
return res>=1,btdf,res2>=1
def myqueue(queue,function,*args,**kwargs):
class MockJob:
def __init__(self,*args, **kwargs):
self.result = 1
self.id = 1
def revoke(self):
return 1
if settings.TESTING:
return MockJob()
elif settings.CELERY:
kwargs['debug'] = True
job = function.delay(*args,**kwargs)
else:
if settings.DEBUG:
kwargs['debug'] = True
job_id = str(uuid.uuid4())
kwargs['job_id'] = job_id
kwargs['jobkey'] = job_id
kwargs['timeout'] = 3600
job = queue.enqueue(function,*args,**kwargs)
return job
from datetime import date
def calculate_age(born,today=None):
if not today:
today = date.today()
if born:
try:
return today.year - born.year - ((today.month, today.day) < (born.month, born.day))
except AttributeError:
return None
else:
return None
def my_dict_from_instance(instance,model):
thedict = {}
thedict['id'] = instance.id
for f in instance._meta.fields:
fname = f.name
try:
verbosename = f.verbose_name
except:
verbosename = f.name
get_choice = 'get_'+fname+'_display'
if hasattr( instance, get_choice):
value = getattr(instance, get_choice)()
else:
try:
value = getattr(instance,fname)
except AttributeError:
value = None
if f.editable and value:
thedict[fname] = (verbosename,value)
return thedict
def wavg(group, avg_name, weight_name):
""" http://stackoverflow.com/questions/10951341/pandas-dataframe-aggregate-function-using-multiple-columns
In rare instance, we may not have weights, so just return the mean. Customize this if your business case
should return otherwise.
"""
d = group[avg_name]
try:
w = group[weight_name]
except KeyError:
return d.mean()
try:
return (d * w).sum() / w.sum()
except ZeroDivisionError:
return d.mean()
def totaltime_sec_to_string(totaltime,shorten=False):
if np.isnan(totaltime):
return ''
hours = int(totaltime / 3600.)
if hours > 23:
message = 'Warning: The workout duration was longer than 23 hours. '
hours = 23
minutes = int((totaltime - 3600. * hours) / 60.)
if minutes > 59:
minutes = 59
if not message:
message = 'Warning: there is something wrong with the workout duration'
seconds = int(totaltime - 3600. * hours - 60. * minutes)
if seconds > 59:
seconds = 59
if not message:
message = 'Warning: there is something wrong with the workout duration'
tenths = int(10 * (totaltime - 3600. * hours - 60. * minutes - seconds))
if tenths > 9:
tenths = 9
if not message:
message = 'Warning: there is something wrong with the workout duration'
duration = ""
if not shorten:
duration = "{hours:02d}:{minutes:02d}:{seconds:02d}.{tenths}".format(
hours=hours,
minutes=minutes,
seconds=seconds,
tenths=tenths
)
else:
if hours != 0:
duration = "{hours}:{minutes:02d}:{seconds:02d}".format(
hours=hours,
minutes=minutes,
seconds=seconds,
tenths=tenths
)
else:
duration = "{minutes}:{seconds:02d}".format(
hours=hours,
minutes=minutes,
seconds=seconds,
tenths=tenths
)
return duration
def iscoach(m,r):
result = False
result = m in r.coaches
return result
# Exponentially weighted moving average
# Used for data smoothing of the jagged data obtained by Strava
# See bitbucket issue 72
def ewmovingaverage(interval,window_size):
# Experimental code using Exponential Weighted moving average
try:
intervaldf = pd.DataFrame({'v':interval})
idf_ewma1 = intervaldf.ewm(span=window_size)
idf_ewma2 = intervaldf[::-1].ewm(span=window_size)
i_ewma1 = idf_ewma1.mean().loc[:,'v']
i_ewma2 = idf_ewma2.mean().loc[:,'v']
interval2 = np.vstack((i_ewma1,i_ewma2[::-1]))
interval2 = np.mean( interval2, axis=0) # average
except ValueError:
interval2 = interval
return interval2
# Exceptions
# Custom error class - to raise a NoTokenError
class NoTokenError(Exception):
def __init__(self,value):
self.value=value
def __str__(self):
return repr(self.value)
class ProcessorCustomerError(Exception):
def __init__(self, value):
self.value=value
def __str__(self):
return repr(self.value)
# Custom exception handler, returns a 401 HTTP message
# with exception details in the json data
def custom_exception_handler(exc,message):
response = {
"errors": [
{
"code": str(exc),
"detail": message,
}
]
}
res = HttpResponse(message)
res.status_code = 401
res.json = json.dumps(response)
return res
def get_strava_stream(r,metric,stravaid,series_type='time',fetchresolution='high',authorizationstring=''):
if r is not None:
authorizationstring = str('Bearer ' + r.stravatoken)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json',
'resolution': 'medium',}
if metric == 'power':
metric = 'watts'
url = "https://www.strava.com/api/v3/activities/{stravaid}/streams/{metric}?resolution={fetchresolution}&series_type={series_type}".format(
stravaid=stravaid,
fetchresolution=fetchresolution,
series_type=series_type,
metric=metric
)
s = requests.get(url,headers=headers)
if metric=='power':
with open('data.txt', 'w') as outfile:
json.dump(s.json(), outfile)
print('saved to file')
for data in s.json():
y = None
try:
if data['type'] == metric:
return np.array(data['data'])
except TypeError:
return None
return None
def allmonths(startdate,enddate):
d = startdate
while d<enddate:
yield d
d = datetime.date(d.year+(d.month // 12),((d.month % 12) + 1),1)
def allsundays(startdate,enddate):
d = startdate
d += datetime.timedelta(days = 6 - d.weekday()) # first Sunday
while d<=enddate:
yield d
d += datetime.timedelta(days=7)