from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

# Standard library
import collections
import colorsys
import datetime
import json
import math
import time
import uuid
from datetime import timedelta

# Third party
import arrow
import humanize
import iso8601
import numpy as np
import pandas as pd
import pytz
import requests
from django.conf import settings
from django.http import HttpResponse
from django.utils import timezone
from fitparse import FitFile
from iso8601 import ParseError
from pytz.exceptions import UnknownTimeZoneError

# Conversion factor from pounds-force to Newton.
lbstoN = 4.44822

# (identifier, human readable label) pairs for the selectable landing pages.
landingpages = (
    ('workout_edit_view', 'Edit View'),
    ('workout_workflow_view', 'Workflow View'),
    ('workout_stats_view', 'Stats View'),
    ('workout_data_view', 'Data Explore View'),
    ('workout_summary_edit_view', 'Intervals Editor'),
)

# (template file, human readable label) pairs for the workflow middle panel.
workflowmiddlepanel = (
    ('panel_statcharts.html', 'Static Charts'),
    ('flexthumbnails.html', 'Flex Charts'),
    ('panel_summary.html', 'Summary'),
    ('panel_map.html', 'Map'),
    ('panel_comments.html', 'Basic Info and Links'),
    ('panel_notes.html', 'Workout Notes'),
    ('panel_shortcomment.html', 'Comment Link'),
    ('panel_middlesocial.html', 'Social Media Share Buttons'),
)

# Default selection (and order) of middle panel templates.
defaultmiddle = [
    'panel_middlesocial.html',
    'panel_statcharts.html',
    'flexthumbnails.html',
    'panel_summary.html',
    'panel_map.html',
]

# (template file, human readable label) pairs for the workflow left panel.
workflowleftpanel = (
    ('panel_navigationheader.html', 'Navigation Header'),
    ('panel_editbuttons.html', 'Edit Workout Button'),
    ('panel_delete.html', 'Delete Workout Button'),
    ('panel_export.html', 'Export Workout Button'),
    ('panel_social.html', 'Social Media Share Buttons'),
    ('panel_advancededit.html', 'Advanced Workout Edit Button'),
    ('panel_editintervals.html', 'Edit Intervals Button'),
    ('panel_stats.html', 'Workout Statistics Button'),
    ('panel_flexchart.html', 'Flex Chart'),
    ('panel_staticchart.html', 'Create Static Charts Buttons'),
    ('panel_uploadimage.html', 'Attach Image'),
    ('panel_geekyheader.html', 'Geeky Header'),
    ('panel_editwind.html', 'Edit Wind Data'),
    ('panel_editstream.html', 'Edit Stream Data'),
    ('panel_otwpower.html', 'Run OTW Power Calculations'),
    ('panel_mapview.html', 'Map'),
    ('panel_ranking.html', 'Ranking View'),
)
# Default selection (and order) of left panel templates.
defaultleft = [
    'panel_navigationheader.html',
    'panel_editbuttons.html',
    'panel_advancededit.html',
    'panel_editintervals.html',
    'panel_stats.html',
    'panel_staticchart.html',
    'panel_uploadimage.html',
]

# Coxswain calls; get_call() picks one at random.
coxes_calls = [
    'Sit Ready!',
    "Let's relax the shoulders, and give me a power ten to the finish!",
    "Almost there. Give me ten strokes on the legs!",
    "Let it run!",
    "Don't rush the slides!",
    "Quick hands.",
    "You are clearing the puddles.",
    "Let's push through now. Get me that open water.",
    "We're going for the line now. Power ten on the next.",
]

# "Please wait" messages; get_call() picks one at random.
info_calls = [
    "Please give us a minute to count all those strokes, you've been working hard!",
    "Please give us a minute to count all your strokes."
]

import random


def dologging(filename, s):
    """Append the string ``s`` to ``filename`` on a new line.

    The entry is prefixed with the local time formatted as
    ``%b-%d-%Y %H:%M:%S`` and a single space.
    """
    timestamp = time.strftime('%b-%d-%Y %H:%M:%S', time.localtime())
    with open(filename, 'a') as f:
        f.write('\n' + timestamp + ' ' + s)


def to_pace(pace):  # pragma: no cover
    """Format a number of seconds as ``MM:SS.t`` (tenths are truncated).

    The value is converted to whole tenths of a second before it is split
    into minutes, seconds and tenths.  Splitting the float directly loses a
    tenth for values such as 100.3 (stored as 100.2999...), which used to
    render as ``01:40.2``.
    """
    # The small epsilon only compensates binary representation error; it is
    # far too small to round a genuine x.x9 value up to the next tenth.
    total_tenths = int(pace * 10 + 1e-6)
    minutes, remainder = divmod(total_tenths, 600)
    seconds, tenths = divmod(remainder, 10)

    return '{m:0>2}:{s:0>2}.{t:0>1}'.format(m=minutes, s=seconds, t=tenths)


def get_call():
    """Return a random coxswain call followed by a random info message."""
    call1 = random.choice(coxes_calls)
    call2 = random.choice(info_calls)

    return "\n\n%s (%s)\n\n" % (call1, call2)
def absolute(request):
    """Return absolute URL pieces for ``request`` as a dict.

    Keys are ``ABSOLUTE_ROOT``, ``ABSOLUTE_ROOT_URL`` (both derived from
    ``request.build_absolute_uri('/')`` with slashes stripped) and ``PATH``
    (``request.build_absolute_uri()``).
    """
    root = request.build_absolute_uri('/')
    return {
        'ABSOLUTE_ROOT': root[:-1].strip("/"),
        'ABSOLUTE_ROOT_URL': root.strip("/"),
        'PATH': request.build_absolute_uri(),
    }


def trcolors(r1, g1, b1, r2, g2, b2):
    """Describe the transition between two 0-255 RGB colors in HSV terms.

    Returns ``(hue, delta_hue, sat, delta_sat, val, delta_val)`` where the
    hue figures are scaled by 360 and the deltas are "second color minus
    first color".  This is the tuple layout stored in ``palettes``.
    """
    h1, s1, v1 = colorsys.rgb_to_hsv(r1 / 255., g1 / 255., b1 / 255.)
    h2, s2, v2 = colorsys.rgb_to_hsv(r2 / 255., g2 / 255., b2 / 255.)

    return 360 * h1, 360 * (h2 - h1), s1, (s2 - s1), v1, (v2 - v1)


# Palette name -> (hue, delta_hue, sat, delta_sat, val, delta_val).
# 'cyan_green' used to be listed twice; the later trcolors() entry silently
# overrode the hand-written one, so only the effective value is kept, at the
# position of the first occurrence so the key order is unchanged.
palettes = {
    'monochrome_blue': (207, -4, 0.06, 0.89, 1.0, -0.38),
    'gold_sunset': (47, -31, .26, -0.12, 0.94, -0.5),
    'blue_red': (207, -200, .85, 0, .74, -.24),
    'blue_green': (207, -120, .85, 0, .75, .25),
    'cyan_green': trcolors(246, 239, 247, 1, 108, 89),
    'cyan_purple': trcolors(237, 248, 251, 136, 65, 157),
    'green_blue': trcolors(240, 249, 232, 8, 104, 172),
    'orange_red': trcolors(254, 240, 217, 179, 0, 0),
    'cyan_blue': trcolors(241, 238, 246, 4, 90, 141),
    'cyan_magenta': trcolors(241, 238, 246, 152, 0, 67),
    'beige_magenta': trcolors(254, 235, 226, 122, 1, 119),
    'yellow_green': trcolors(255, 255, 204, 0, 104, 55),
    'yellow_blue': trcolors(255, 255, 205, 37, 52, 148),
    'autumn': trcolors(255, 255, 212, 153, 52, 4),
    'yellow_red': trcolors(255, 255, 178, 189, 0, 39),
}

rankingdistances = [100, 500, 1000, 2000, 5000, 6000, 10000, 21097, 42195, 100000]

# NOTE(review): 1h15 is listed before 1h; the original order is preserved.
rankingdurations = [
    datetime.time(minute=1),
    datetime.time(minute=4),
    datetime.time(minute=30),
    datetime.time(hour=1, minute=15),
    datetime.time(hour=1),
]


def range_to_color_hex(groupcols, palette='monochrome_blue'):
    """Map each value ``x`` in ``groupcols`` to a ``#rrggbb`` string.

    The color is ``start + delta * x`` in HSV space, using the tuple stored
    under ``palette`` in ``palettes``.  An unknown palette name falls back to
    ``'monochrome_blue'``.
    """
    hue, dhue, sat, dsat, val, dval = palettes.get(
        palette, palettes['monochrome_blue'])

    colors = []
    for x in groupcols:
        r, g, b = colorsys.hsv_to_rgb(
            (hue + dhue * x) / 360., sat + dsat * x, val + dval * x)
        colors.append(
            "#%02x%02x%02x" % (int(255. * r), int(255. * g), int(255. * b)))

    return colors


def str2bool(v):  # pragma: no cover
    """Return True when ``v`` (case-insensitive) is yes, true, t or 1."""
    return v.lower() in ("yes", "true", "t", "1")
def uniqify(seq, idfun=None):
    """Return the items of ``seq`` without duplicates, preserving order.

    ``idfun`` maps an item to the (hashable) marker used to detect
    duplicates; by default the item itself is the marker.  The first item
    seen for each marker is kept.
    """
    if idfun is None:
        def idfun(x):
            return x

    seen = set()
    result = []
    for item in seq:
        marker = idfun(item)
        if marker in seen:
            continue
        seen.add(marker)
        result.append(item)

    return result


def serialize_list(value, token=','):  # pragma: no cover
    """Join the ``str()`` of every element of ``value`` with ``token``.

    ``value`` must be a list, tuple or numpy array.
    """
    # Kept as an assert so callers still see AssertionError for bad input.
    assert isinstance(value, (list, tuple, np.ndarray))
    return token.join([str(s) for s in value])


def deserialize_list(value, token=','):  # pragma: no cover
    """Split a ``token`` separated string; lists and arrays pass through."""
    if isinstance(value, (list, np.ndarray)):
        return value
    return value.split(token)


def geo_distance(lat1, lon1, lat2, lon2):
    """Approximate distance and bearing between two points.

    The points are given as latitude/longitude in degrees.  Returns
    ``[distance, bearing]`` with the distance in km (haversine formula on a
    sphere of radius 6373 km) and the initial bearing in degrees in
    ``[0, 360)``.

    This is a slight underestimate but close enough for consecutive track
    points.  The bearing is not meaningful if one of the points is a pole.
    """
    # radius of our earth in km --> should be moved to settings if
    # rowing takes off on other planets
    R = 6373.0

    lat1 = math.radians(lat1)
    lat2 = math.radians(lat2)
    lon1 = math.radians(lon1)
    lon2 = math.radians(lon2)

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = (math.sin(dlat / 2)**2
         + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2)
    # Rounding can push a marginally outside [0, 1] for (nearly) antipodal
    # points, which would make the square roots below raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    distance = R * c

    tc1 = math.atan2(
        math.sin(dlon) * math.cos(lat2),
        math.cos(lat1) * math.sin(lat2)
        - math.sin(lat1) * math.cos(lat2) * math.cos(dlon))
    tc1 = tc1 % (2 * math.pi)

    bearing = math.degrees(tc1)

    return [distance, bearing]
def isbreakthrough(delta, cpvalues, p0, p1, p2, p3, ratio):
    """Compare measured power values against a four-parameter power curve.

    ``delta`` and ``cpvalues`` are pandas Series of equal length (durations
    and the power measured for them).  The reference curve is
    ``|p0|/(1+delta/|p2|) + |p1|/(1+delta/|p3|)``.

    Returns ``(hard, btdf, soft)``:

    * ``hard`` - True if any value exceeds the curve scaled by ``ratio``
      (both truncated to integers);
    * ``btdf`` - DataFrame with columns ``delta``, ``cpvalues`` and ``pwr``
      for the exceeding points, sorted by ``delta``;
    * ``soft`` - True if any value exceeds the curve scaled by
      ``1 + 0.25*(ratio-1)``.
    """
    pwr = abs(p0) / (1 + (delta / abs(p2)))
    pwr += abs(p1) / (1 + (delta / abs(p3)))

    pwr2 = pwr * (1 + 0.25 * (ratio - 1))
    pwr = (pwr * ratio).astype(int)

    delta = delta.values.astype(int)
    cpvalues = cpvalues.values.astype(int)

    # Evaluate the breakthrough mask once instead of once per use.
    above = cpvalues > pwr
    res = np.sum(above)
    res2 = np.sum(cpvalues > pwr2)

    btdf = pd.DataFrame(
        {
            'delta': delta[above],
            'cpvalues': cpvalues[above],
            'pwr': pwr[above],
        }
    )
    btdf.sort_values('delta', axis=0, inplace=True)

    return res >= 1, btdf, res2 >= 1


def myqueue(queue, function, *args, **kwargs):
    """Schedule ``function(*args, **kwargs)`` as a background job.

    * ``settings.TESTING``: nothing is scheduled; a stand-in job object with
      ``result == 1``, ``id == 1`` and a ``revoke()`` method is returned.
    * ``settings.CELERY``: ``function.delay`` is called with ``debug=True``.
    * otherwise: the job is put on ``queue`` with a fresh uuid4 passed as
      ``job_id`` and ``jobkey``, ``timeout=3600`` and, when
      ``settings.DEBUG`` is set, ``debug=True``.

    Returns the job object.
    """
    class MockJob:
        def __init__(self, *args, **kwargs):
            self.result = 1
            self.id = 1

        def revoke(self):  # pragma: no cover
            return 1

    if settings.TESTING:
        return MockJob()

    if settings.CELERY:  # pragma: no cover
        kwargs['debug'] = True
        return function.delay(*args, **kwargs)

    # pragma: no cover
    if settings.DEBUG:
        kwargs['debug'] = True

    job_id = str(uuid.uuid4())
    kwargs['job_id'] = job_id
    kwargs['jobkey'] = job_id
    kwargs['timeout'] = 3600

    return queue.enqueue(function, *args, **kwargs)


from datetime import date


def calculate_age(born, today=None):
    """Return the age in whole years on ``today`` (default: now).

    Returns None when ``born`` is falsy or when either argument lacks the
    ``year``/``month``/``day`` attributes.
    """
    if not today:
        today = timezone.now()

    if not born:
        return None

    try:
        had_birthday = (today.month, today.day) >= (born.month, born.day)
        return today.year - born.year - (0 if had_birthday else 1)
    except AttributeError:
        return None


def my_dict_from_instance(instance, model):
    """Collect the editable, truthy field values of a model instance.

    Returns a dict with ``'id'`` mapped to ``instance.id`` and, for every
    field in ``instance._meta.fields`` that is editable and has a truthy
    value, the field name mapped to ``(verbose_name, value)``.  When the
    instance has a ``get_<field>_display`` method its result is used as the
    value.  ``model`` is accepted for backward compatibility and not used.
    """
    thedict = {'id': instance.id}

    for f in instance._meta.fields:
        fname = f.name
        # Fall back to the plain name for fields without a verbose name.
        verbosename = getattr(f, 'verbose_name', fname)

        get_choice = 'get_' + fname + '_display'
        if hasattr(instance, get_choice):
            value = getattr(instance, get_choice)()
        else:
            value = getattr(instance, fname, None)

        if f.editable and value:
            thedict[fname] = (verbosename, value)

    return thedict
def wavg(group, avg_name, weight_name):
    """Weighted average of column ``avg_name`` using ``weight_name`` weights.

    http://stackoverflow.com/questions/10951341/pandas-dataframe-aggregate-function-using-multiple-columns

    Returns 0 when ``avg_name`` is missing.  Returns the plain mean when the
    weight column is missing or when the weights sum to zero.
    """
    try:
        d = group[avg_name]
    except KeyError:
        return 0

    try:
        w = group[weight_name]
    except KeyError:
        return d.mean()

    total_weight = w.sum()
    # pandas/numpy division by zero yields NaN instead of raising, so the
    # zero-weight case has to be tested explicitly.
    if total_weight == 0:
        return d.mean()

    try:
        return (d * w).sum() / total_weight
    except ZeroDivisionError:  # pragma: no cover
        return d.mean()


def totaltime_sec_to_string(totaltime, shorten=False):
    """Format a duration in seconds as a clock string.

    * default: ``HH:MM:SS.t``
    * ``shorten=True``: ``H:MM:SS`` when the hour part is non-zero,
      otherwise ``M:SS``

    NaN gives an empty string.  Each component is truncated (not rounded)
    and capped at its largest clock value (23, 59, 59 and 9), so durations
    longer than 23:59:59.9 are clipped.
    """
    if np.isnan(totaltime):
        return ''

    hours = min(int(totaltime / 3600.), 23)
    minutes = min(int((totaltime - 3600. * hours) / 60.), 59)
    seconds = min(int(totaltime - 3600. * hours - 60. * minutes), 59)
    tenths = min(
        int(10 * (totaltime - 3600. * hours - 60. * minutes - seconds)), 9)

    if not shorten:
        return "{hours:02d}:{minutes:02d}:{seconds:02d}.{tenths}".format(
            hours=hours,
            minutes=minutes,
            seconds=seconds,
            tenths=tenths
        )

    if hours != 0:  # pragma: no cover
        return "{hours}:{minutes:02d}:{seconds:02d}".format(
            hours=hours,
            minutes=minutes,
            seconds=seconds,
        )

    return "{minutes}:{seconds:02d}".format(
        minutes=minutes,
        seconds=seconds,
    )
def iscoach(m, r):  # pragma: no cover
    """Return True when ``m`` is contained in ``r.coaches``."""
    return m in r.coaches


# Exponentially weighted moving average
# Used for data smoothing of the jagged data obtained by Strava
# See bitbucket issue 72
def ewmovingaverage(interval, window_size):
    """Smooth ``interval`` with a forward/backward averaged EWMA.

    An exponentially weighted mean with ``span=window_size`` is computed
    once over the data and once over the reversed data; the element-wise
    mean of both passes is returned as a numpy array.  When that raises
    ValueError the input is returned unchanged.
    """
    try:
        intervaldf = pd.DataFrame({'v': interval})
        forward = intervaldf.ewm(span=window_size).mean().loc[:, 'v']
        backward = intervaldf[::-1].ewm(span=window_size).mean().loc[:, 'v']

        # Flip the backward pass so both rows are in the original order.
        stacked = np.vstack((forward, backward[::-1]))
        interval2 = np.mean(stacked, axis=0)
    except ValueError:  # pragma: no cover
        interval2 = interval

    return interval2


# Exceptions
# Custom error class - to raise a NoTokenError
class NoTokenError(Exception):
    """Error carrying ``value``; ``str()`` gives ``repr(value)``."""

    def __init__(self, value):
        # Initialise the base class so that ``args`` is populated.
        super(NoTokenError, self).__init__(value)
        self.value = value

    def __str__(self):  # pragma: no cover
        return repr(self.value)


class ProcessorCustomerError(Exception):  # pragma: no cover
    """Error carrying ``value``; ``str()`` gives ``repr(value)``."""

    def __init__(self, value):
        super(ProcessorCustomerError, self).__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)


# Custom exception handler, returns a 401 HTTP message
# with exception details in the json data
def custom_exception_handler(exc, message):
    """Build a 401 HttpResponse describing ``exc``.

    The response body is ``message``; a JSON document with the error code
    (``str(exc)``) and detail (``message``) is attached as ``res.json``.
    """
    response = {
        "errors": [
            {
                "code": str(exc),
                "detail": message,
            }
        ]
    }

    res = HttpResponse(message)
    res.status_code = 401
    res.json = json.dumps(response)

    return res


def get_strava_stream(r, metric, stravaid, series_type='time',
                      fetchresolution='high', authorizationstring=''):
    """Fetch one data stream of a Strava activity.

    ``r`` is an object with a ``stravatoken`` attribute; when it is None the
    caller supplied ``authorizationstring`` is sent instead.  The metric
    name ``'power'`` is requested from Strava as ``'watts'``.

    Returns a numpy array with the ``data`` of the stream whose ``type``
    equals the requested metric, or None when no such stream is present or
    the response cannot be interpreted.
    """
    if r is not None:  # pragma: no cover
        authorizationstring = str('Bearer ' + r.stravatoken)

    headers = {'Authorization': authorizationstring,
               'user-agent': 'sanderroosendaal',
               'Content-Type': 'application/json',
               'resolution': 'medium', }

    if metric == 'power':  # pragma: no cover
        metric = 'watts'

    url = "https://www.strava.com/api/v3/activities/{stravaid}/streams/{metric}?resolution={fetchresolution}&series_type={series_type}".format(
        stravaid=stravaid,
        fetchresolution=fetchresolution,
        series_type=series_type,
        metric=metric
    )

    s = requests.get(url, headers=headers)

    # A former debug dump to data.txt was guarded by ``metric == 'power'``,
    # which can never hold after the remapping above; it has been removed.

    try:
        for data in s.json():
            try:
                if data['type'] == metric:
                    return np.array(data['data'])
            except TypeError:  # pragma: no cover
                # The payload is not a list of stream dicts.
                return None
    except Exception:  # pragma: no cover
        # Best effort: any malformed response is reported as "no data".
        return None

    return None  # pragma: no cover
get_strava_stream(r,metric,stravaid,series_type='time',fetchresolution='high',authorizationstring=''): if r is not None: # pragma: no cover authorizationstring = str('Bearer ' + r.stravatoken) headers = {'Authorization': authorizationstring, 'user-agent': 'sanderroosendaal', 'Content-Type': 'application/json', 'resolution': 'medium',} if metric == 'power': # pragma: no cover metric = 'watts' url = "https://www.strava.com/api/v3/activities/{stravaid}/streams/{metric}?resolution={fetchresolution}&series_type={series_type}".format( stravaid=stravaid, fetchresolution=fetchresolution, series_type=series_type, metric=metric ) s = requests.get(url,headers=headers) if metric=='power': # pragma: no cover with open('data.txt', 'w') as outfile: json.dump(s.json(), outfile) try: for data in s.json(): y = None try: if data['type'] == metric: return np.array(data['data']) except TypeError: # pragma: no cover return None except: # pragma: no cover return None return None # pragma: no cover def allmonths(startdate,enddate): d = startdate while d 0: # pragma: no cover targetpower = ftp*0.6 elif value > 10 and value < 1000: # pragma: no cover targetpower = value*ftp/100. elif value > 1000: targetpower = value-1000 avgspeed = ftv*(targetpower/ftp)**(1./3.) distance = avgspeed*seconds avgpower = targetpower if valuelow != 0 and valuehigh != 0: # pragma: no cover avgpower = (valuelow+valuehigh)/2. avgspeed = ftv*(avgspeed/ftv)**(1./3.) distance = avgspeed*seconds rscore = 100.*(avgpower/ftp)*seconds/3600. if targettype == 'Cadence': value = step.get('targetValue',0) valuelow = step.get('targetValueLow',0) valuehigh = step.get('targetValueHigh',0) if value != 0: avgpower = ftp*value/ftspm if valuelow != 0 and valuehigh != 0: # pragma: no cover avgspm = (valuelow+valuehigh)/2. avgpower = ftp*avgspm/ftspm rscore = 100*(avgpower/ftp)*seconds/3600. return seconds,distance,rscore elif durationtype == 'Distance': distance = value/100. 
seconds = distance/avgspeed rscore = 60.*float(seconds)/3600. if targettype == 'Speed': # pragma: no cover value = step.get('targetValue',0) valuelow = step.get('targetValueLow',0) valuehigh = step.get('targetValueHigh',0) velomid = 0 if value != 0: seconds = distance/value velomid = value/1000. elif valuelow != 0 and valuehigh != 0: velomid = (valuelow+valuehigh)/2000. seconds = distance/velomid veloratio = (velomid/ftv)**(3.0) rscoreperhour = 100.*veloratio rscore = rscoreperhour*seconds/3600. if targettype == 'Power': # pragma: no cover value = step.get('targetValue',0) valuelow = step.get('targetValueLow',0) valuehigh = step.get('targetValueHigh',0) if value != 0: if value < 10 and value > 0: targetpower = ftp*0.6 elif value > 10 and value < 1000: targetpower = value*ftp/100. elif value > 1000: targetpower = value-1000 avgspeed = ftv*(targetpower/ftp)**(1./3.) seconds = distance/avgspeed avgpower = targetpower if valuelow != 0 and valuehigh != 0: avgpower = (valuelow+valuehigh)/2. avgspeed = ftv*(avgspeed/ftv)**(1./3.) seconds = distance/avgspeed rscore = 100.*(avgpower/ftp)*seconds/3600. if targettype == 'Cadence': # pragma: no cover value = step.get('targetValue',0) valuelow = step.get('targetValueLow',0) valuehigh = step.get('targetValueHigh',0) if value != 0: avgpower = ftp*value/ftspm if valuelow != 0 and valuehigh != 0: avgspm = (valuelow+valuehigh)/2. avgpower = ftp*avgspm/ftspm rscore = 100*(avgpower/ftp)*seconds/3600. return seconds, distance, rscore elif durationtype in ['PowerLessThan','PowerGreaterThan','HrLessThan','HrGreaterThan']: # pragma: no cover seconds = 600 distance = seconds*avgspeed veloratio = (avgspeed/ftv)**(3.0) rscoreperhour = 100.*veloratio rscore = rscoreperhour*seconds/3600. 
def get_step_type(step):  # pragma: no cover
    """Return 'WorkoutRepeatStep' for repeat durations, else 'WorkoutStep'."""
    repeat_types = ['RepeatUntilStepsCmplt',
                    'RepeatUntilHrLessThan',
                    'RepeatUntilHrGreaterThan']
    if step['durationType'] in repeat_types:
        return 'WorkoutRepeatStep'
    return 'WorkoutStep'


def peel(l):
    """Take the first (possibly nested) step off a list of step dicts.

    Returns ``(first, rest)``.  A plain step (``type == 'Step'``) is returned
    as is.  Any other entry is treated as a repeat step: following entries
    are peeled recursively until one with ``stepID == first['repeatID']`` is
    reached, and they are stored in reverse order under ``first['steps']``
    (``first`` is modified in place).

    An empty list gives ``(None, None)``; a single element list gives
    ``(element, None)``.
    """
    if len(l) == 0:  # pragma: no cover
        return None, None
    if len(l) == 1:
        return l[0], None

    first, rest = l[0], l[1:]

    if first['type'] == 'Step':  # pragma: no cover
        return first, rest

    # repeatstep
    children = []
    last_id = -1
    while last_id != first['repeatID']:
        child, rest = peel(rest)
        children.append(child)
        last_id = child['stepID']

    first['steps'] = list(reversed(children))

    return first, rest


def ps_dict_order_dict(d, short=False):
    """Return one summary dict per step of ``d['steps']``, last step first.

    Each entry has the keys ``type``, ``stepID``, ``repeatID`` and
    ``repeatValue`` (as returned by ``step_to_string``) plus ``dict`` (the
    original step).
    """
    sdicts = []
    for step in d['steps']:
        _, steptype, stepID, repeatID, repeatValue = step_to_string(
            step, short=short)
        # NOTE(review): the result is unused; the call is kept so that any
        # error it raises for a malformed step still surfaces here.
        step_to_time_dist(step)

        sdicts.append({
            'type': steptype,
            'stepID': stepID,
            'repeatID': repeatID,
            'repeatValue': repeatValue,
            'dict': step,
        })

    return list(reversed(sdicts))


def ps_dict_order(d, short=False, rower=None, html=True):
    """Order the steps of a workout plan for display and total them.

    Every step in ``d['steps']`` is converted with ``step_to_string`` and
    ``step_to_time_dist`` (using ``rower.ftp``, or 200 when no rower is
    given).  Steps inside a repeat block are indented one level per nesting
    depth and each repeat step is placed in front of the steps it repeats.

    Returns ``(steps, totalmeters, totalseconds, totalrscore)`` where the
    totals count every step as often as its enclosing repeats say.
    """
    ftp = 200
    if rower is not None:
        ftp = rower.ftp

    # One indentation level.  NOTE(review): the code used to strip 18 (html)
    # or 3 (text) characters per level, which matches three '&nbsp;'
    # entities and three spaces; confirm against the rendered output.
    indent = '&nbsp;&nbsp;&nbsp;' if html else '   '

    # The steps used to be converted a second time without ``ftp``, which
    # overwrote the rower specific results; one pass is enough.
    by_id = collections.OrderedDict()
    for step in d['steps']:
        sstring, steptype, stepID, repeatID, repeatValue = step_to_string(
            step, short=short)
        seconds, meters, rscore = step_to_time_dist(step, ftp=ftp)
        by_id[stepID] = {
            'string': sstring,
            'type': steptype,
            'stepID': stepID,
            'repeatID': repeatID,
            'repeatValue': repeatValue,
            'seconds': seconds,
            'meters': meters,
            'rscore': rscore,
        }

    ordered = []
    hold = []        # repeat steps waiting for their first child
    multiplier = []  # repeat counts of the held repeat steps
    holduntil = []   # stepID at which each held repeat step starts
    spaces = ''
    totalmeters = 0
    totalseconds = 0
    totalrscore = 0
    factor = 1

    # Walk the plan backwards: a repeat step is listed after its children.
    for item in reversed(list(by_id.values())):
        if item['type'] == 'RepeatStep':
            hold.append(item)
            holduntil.append(item['repeatID'])
            multiplier.append(item['repeatValue'])
            factor *= item['repeatValue']
            spaces += indent

        if item['type'] == 'Step':
            item['string'] = spaces + item['string']
            ordered.append(item)
            totalmeters += factor * item['meters']
            totalseconds += factor * item['seconds']
            totalrscore += factor * item['rscore']

            if len(holduntil) > 0 and item['stepID'] <= holduntil[-1]:
                if item['stepID'] == holduntil[-1]:
                    # First step of the repeat block: emit the header.
                    ordered.append(hold.pop())
                    factor /= multiplier.pop()
                    spaces = spaces[:-len(indent)]
                    holduntil.pop()
                else:  # pragma: no cover
                    # The first step of the block was skipped: slot the
                    # header in between the two most recent steps.
                    prevstep = ordered.pop()
                    prevstep['string'] = prevstep['string'][len(indent):]
                    prevprevstep = ordered.pop()
                    prevprevstep['string'] = spaces + prevprevstep['string']
                    ordered.append(prevprevstep)
                    ordered.append(hold.pop())
                    factor /= multiplier.pop()
                    ordered.append(prevstep)
                    holduntil.pop()
                    spaces = spaces[:-len(indent)]

    return list(reversed(ordered)), totalmeters, totalseconds, totalrscore
def step_to_string(step, short=False):
    """Describe one workout step (a dict) as text.

    Returns ``(text, type, stepId, repeatID, repeatValue)`` where ``type`` is
    ``'Step'`` or ``'RepeatStep'``.  For repeat steps ``repeatID`` is the
    step's ``durationValue`` and, for ``RepeatUntilStepsCmplt``,
    ``repeatValue`` is its ``targetValue``; otherwise they are -1 and 1.

    A non-repeat step with ``durationValue == 0`` gives
    ``('', 'Step', -1, -1, 1)``.  With ``short=True`` abbreviated wording is
    used and the step description is left out.

    Encodings as read by this function: ``Time`` durations are divided by
    1000 to get seconds, ``Distance`` durations by 100 to get meters; heart
    rate values above 100 are shown minus 100 and power values above 1000
    are shown minus 1000 (values up to those limits are shown as
    percentages).
    """
    steptype = 'Step'
    repeatID = -1
    repeatValue = 1
    duration = ''
    unit = ''
    target = ''
    repeat = ''

    repeat_types = ['RepeatUntilStepsCmplt',
                    'RepeatUntilHrLessThan',
                    'RepeatUntilHrGreaterThan']

    durationtype = step['durationType']
    if step['durationValue'] == 0 and durationtype not in repeat_types:  # pragma: no cover
        return '', steptype, -1, -1, 1

    # --- duration -------------------------------------------------------
    if durationtype == 'Time':
        unit = 'min'
        value = step['durationValue']
        if value / 1000. >= 3600:  # pragma: no cover
            unit = 'h'
        duration = '{v}'.format(v=str(timedelta(seconds=value / 1000.)))
    elif durationtype == 'Distance':
        unit = 'm'
        duration = int(step['durationValue'] / 100.)
    elif durationtype == 'HrLessThan':  # pragma: no cover
        value = step['durationValue']
        if value <= 100:
            template = ('until HR<{v}% of max' if short
                        else 'until heart rate lower than {v}% of max')
            duration = template.format(v=value)
        else:
            template = ('until HR<{v}' if short
                        else 'until heart rate lower than {v}')
            duration = template.format(v=value - 100)
    elif durationtype == 'HrGreaterThan':  # pragma: no cover
        value = step['durationValue']
        if value <= 100:
            # The short form used to read 'until R>...' (typo).
            template = ('until HR>{v}% of max' if short
                        else 'until heart rate greater than {v}% of max')
            duration = template.format(v=value)
        else:
            # The short form used to divide by 100 instead of removing the
            # offset of 100, unlike the long form and HrLessThan.
            template = ('until HR>{v}' if short
                        else 'until heart rate greater than {v}')
            duration = template.format(v=value - 100)
    elif durationtype == 'PowerLessThan':  # pragma: no cover
        value = step['durationValue']
        targetvalue = step.get('targetValue', 0)
        # The short strings used to be built but never assigned.
        if value <= 1000:
            template = ('until < {targetvalue}% of FTP' if short
                        else 'Repeat until Power is less than {targetvalue} % of FTP')
            duration = template.format(targetvalue=targetvalue)
        else:
            template = ('until < {targetvalue} W' if short
                        else 'Repeat until Power is less than {targetvalue} Watt')
            duration = template.format(targetvalue=targetvalue - 1000)
    elif durationtype == 'PowerGreaterThan':  # pragma: no cover
        value = step['durationValue']
        targetvalue = step.get('targetValue', 0)
        if value <= 1000:
            template = ('until > {targetvalue}% of FTP' if short
                        else 'Repeat until Power is greater than {targetvalue} % of FTP')
            duration = template.format(targetvalue=targetvalue)
        else:
            # The short form used to omit the offset of 1000.
            template = ('until > {targetvalue} W' if short
                        else 'Repeat until Power is greater than {targetvalue} Watt')
            duration = template.format(targetvalue=targetvalue - 1000)
    elif durationtype == 'RepeatUntilStepsCmplt':  # pragma: no cover
        steptype = 'RepeatStep'
        duration = ': {v}x'.format(v=step['targetValue'])
        repeatID = step['durationValue']
        repeatValue = step.get('targetValue', 0)
    elif durationtype == 'RepeatUntilHrGreaterThan':  # pragma: no cover
        steptype = 'RepeatStep'
        targetvalue = step.get('targetValue', 0)
        if targetvalue <= 100:
            template = (': until HR>{targetvalue} % of max' if short
                        else 'Repeat until Heart Rate is greater than {targetvalue} % of max')
            duration = template.format(targetvalue=targetvalue)
        elif short:
            # Used to read ': untl HR>...' (typo).
            duration = ': until HR>{targetvalue}'.format(
                targetvalue=targetvalue - 100)
        else:
            duration = 'Repeat until Heart Rate is greater than {targetvalue}:'.format(
                targetvalue=targetvalue - 100.)
        repeatID = step['durationValue']
    elif durationtype == 'RepeatUntilHrLessThan':  # pragma: no cover
        steptype = 'RepeatStep'
        targetvalue = step.get('targetValue', 0)
        if targetvalue <= 100:
            template = (': until HR<{targetvalue}% of max' if short
                        else 'Repeat until Heart Rate is less than {targetvalue} % of max')
            duration = template.format(targetvalue=targetvalue)
        elif short:
            duration = ': until HR<{targetvalue}'.format(
                targetvalue=targetvalue - 100)
        else:
            duration = 'Repeat until Heart Rate is less than {targetvalue}:'.format(
                targetvalue=targetvalue - 100.)
        repeatID = step['durationValue']

    # --- target ---------------------------------------------------------
    targettype = step.get('targetType')
    value = step.get('targetValue', 0)
    valuelow = step.get('targetValueLow', 0)
    valuehigh = step.get('targetValueHigh', 0)

    if targettype == 'HeartRate':  # pragma: no cover
        if 0 < value < 10:
            target = '@ HR zone {v}'.format(v=value)
        elif valuelow < 100:
            target = '@ HR {l} - {h} % of max'.format(l=valuelow, h=valuehigh)
        else:
            target = '@ HR {l} - {h}'.format(
                l=valuelow - 100, h=valuehigh - 100)
    elif targettype == 'Power':  # pragma: no cover
        if 0 < value < 10:
            target = '@ Power zone {v}'.format(v=value)
        elif 10 < value < 1000:
            target = '@ {v}% of FTP'.format(v=value)
        elif value > 1000:
            target = '@ {v} W'.format(v=value - 1000)
        elif valuelow > 0 and valuehigh > 0:
            if valuelow < 1000:
                target = '@ {l} - {h} % of FTP'.format(
                    l=valuelow, h=valuehigh)
            else:
                target = '@ {l} - {h} W'.format(
                    l=valuelow - 1000, h=valuehigh - 1000)
    elif targettype == 'Speed':  # pragma: no cover
        if value != 0:
            pacestring = to_pace(500. / (value / 1000.))
            target = '@ {v} m/s {p}, per 500m'.format(
                v=value / 1000., p=pacestring)
            if short:
                target = '@ {p}'.format(p=pacestring)
        elif valuelow != 0 and valuehigh != 0:
            pacestringlow = to_pace(500. / (valuelow / 1000.))
            pacestringhigh = to_pace(500. / (valuehigh / 1000.))
            target = '@ {l:1.2f} and {h:1.2f} m/s, ({ph} to {pl} per 500m)'.format(
                l=valuelow / 1000.,
                h=valuehigh / 1000.,
                pl=pacestringlow,
                ph=pacestringhigh,
            )
            if short:
                target = '@ {pl} - {ph}'.format(
                    pl=pacestringlow, ph=pacestringhigh)
    elif targettype == 'Cadence':  # pragma: no cover
        if value != 0:
            target = '@ {v} SPM'.format(v=value)
        elif valuelow != 0 and valuehigh != 0:
            target = '@ {l} - {h} SPM'.format(l=valuelow, h=valuehigh)

    # --- assemble -------------------------------------------------------
    nr = step['stepId']

    description = step.get('description', '')
    notes = ' - ' + description if description else ''

    intensity = step.get('intensity', 0)
    labelled = intensity in ['Warmup', 'Cooldown', 'Rest']

    if steptype == 'RepeatStep':
        s = 'Repeat {duration}'.format(duration=duration)
    else:
        template = '{duration} {unit} {target} {repeat}'
        if labelled:
            template = '{intensity} ' + template
        if not short:
            template += ' {notes}'
        s = template.format(
            intensity=intensity,
            duration=duration,
            unit=unit,
            target=target,
            repeat=repeat,
            notes=notes,
        )

    return s, steptype, nr, repeatID, repeatValue
def strfdelta(tdelta):  # pragma: no cover
    """Format a time difference as ``MM:SS.t``.

    ``tdelta`` is either an object with ``seconds`` and ``microseconds``
    attributes (such as ``datetime.timedelta``) or a value whose
    ``view(np.int64)`` is taken to be a number of nanoseconds.
    NOTE(review): for a ``timedelta`` the ``days`` part is not included and
    the minutes are not capped at 59; confirm this is intended.
    """
    try:
        minutes, seconds = divmod(tdelta.seconds, 60)
        tenths = int(tdelta.microseconds / 1e5)
    except AttributeError:
        # Integer arithmetic keeps the parts integral; dividing by float
        # constants used to produce output such as '2.0:5.0.3'.
        nanoseconds = int(tdelta.view(np.int64))
        minutes, remainder = divmod(nanoseconds, 60 * 10**9)
        seconds, remainder = divmod(remainder, 10**9)
        tenths = remainder // 10**8

    return "{minutes:0>2}:{seconds:0>2}.{tenths:0>1}".format(
        minutes=minutes,
        seconds=seconds,
        tenths=tenths,
    )
def request_is_ajax(request):
    """Return True when the request carries ``X-Requested-With: XMLHttpRequest``."""
    return request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest'


def _interval_amount(val, unit):
    """Render one interval size: minutes become seconds, units are abbreviated."""
    if unit == 'min':
        return '{val}{unit}'.format(val=int(val) * 60, unit='sec')

    # Unknown unit names are passed through unchanged; they used to raise
    # UnboundLocalError or silently reuse the unit of the previous interval.
    label = {'meters': 'm', 'seconds': 'sec'}.get(unit, unit)
    return '{val}{unit}'.format(val=int(val), unit=label)


def intervals_to_string(vals, units, typ):
    """Build an interval description such as ``240sec/60sec+500m``.

    ``vals``, ``units`` and ``typ`` are parallel sequences.  An entry whose
    type is ``'rest'`` is appended to the preceding work interval as
    ``/<rest>``; a rest that does not follow a work interval is written as
    ``0min/<rest>``.  Any other type is a work interval.  Intervals are
    joined with ``+``.

    Returns an empty string when an argument is None, when the lengths
    differ or when there are no intervals.
    """
    if vals is None or units is None or typ is None:  # pragma: no cover
        return ''
    if len(vals) != len(units) or len(vals) != len(typ):  # pragma: no cover
        return ''

    parts = []
    previous = 'rest'
    for val, unit, kind in zip(vals, units, typ):
        amount = _interval_amount(val, unit)
        if kind == 'rest':
            if previous == 'rest':
                parts.append('+0min/' + amount)
            else:  # pragma: no cover
                parts.append('/' + amount)
            previous = 'rest'
        else:  # pragma: no cover
            # work interval
            parts.append('+' + amount)
            previous = 'work'

    s = ''.join(parts)
    # startswith() is safe on the empty string, unlike s[0].
    if s.startswith('+'):
        s = s[1:]

    return s


def get_timezone_from_c2data(data):
    """Return the pytz timezone named by ``data['timezone']``.

    Falls back to UTC when the key is missing or the name is unknown.
    """
    try:
        return pytz.timezone(data['timezone'])
    except (UnknownTimeZoneError, KeyError):
        return pytz.utc


def get_startdatetime_from_c2data(data):
    """Derive the start time of a workout from a dict of workout data.

    The timestamp in ``data['date_utc']`` (or ``data['date']`` when that
    cannot be used) minus the total duration is taken as the start.  The
    duration is ``data['time']`` plus the optional ``data['rest_time']``,
    both divided by 10 to get seconds.

    Returns ``(startdatetime, starttime, workoutdate, duration,
    starttimeunix, timezone)``: the start as a datetime in the workout's
    timezone, its ``%H:%M:%S`` and ``%Y-%m-%d`` renderings, the duration
    formatted by ``totaltime_sec_to_string``, the start as a unix timestamp
    and the timezone object.
    """
    tz = get_timezone_from_c2data(data)

    try:
        startdatetime = iso8601.parse_date(data['date_utc'])
    except Exception:  # pragma: no cover
        # Best effort: any problem with 'date_utc' falls back to 'date'.
        startdatetime = iso8601.parse_date(data['date'])

    totaltime = data['time'] / 10.
    try:
        rest_time = data['rest_time'] / 10.
    except KeyError:
        rest_time = 0
    totaltime = totaltime + rest_time

    duration = totaltime_sec_to_string(totaltime)

    # The timestamp in the data marks the end of the workout.
    starttimeunix = arrow.get(startdatetime).timestamp() - totaltime
    startdatetime = arrow.get(starttimeunix).astimezone(tz)

    workoutdate = startdatetime.strftime('%Y-%m-%d')
    starttime = startdatetime.strftime('%H:%M:%S')

    return startdatetime, starttime, workoutdate, duration, starttimeunix, tz