from datetime import date import random from datetime import timedelta from django.utils import timezone import math import numpy as np import pandas as pd import colorsys from django.conf import settings import collections import uuid import datetime import json import time from fitparse import FitFile from django.http import HttpResponse import requests import humanize from pytz.exceptions import UnknownTimeZoneError import pytz import iso8601 from iso8601 import ParseError import arrow from django.conf import settings lbstoN = 4.44822 landingpages = ( ('workout_view', 'Workout View'), ('workout_edit_view', 'Edit View'), ('workout_workflow_view', 'Workflow View'), ('workout_stats_view', 'Stats View'), ('workout_data_view', 'Data Explore View'), ('workout_summary_edit_view', 'Intervals Editor'), ('workout_flexchart_stacked_view', 'Workout Stacked Chart'), ('workout_flexchart3_view', 'Workout Flex Chart') ) landingpages2 = ( ('workout_view', 'Workout View'), ('workout_edit_view', 'Edit View'), ('workout_workflow_view', 'Workflow View'), ('workout_stats_view', 'Stats View'), ('workout_data_view', 'Data Explore View'), ('workout_summary_edit_view', 'Intervals Editor'), ('workout_flexchart_stacked_view', 'Workout Stacked Chart'), ('workout_flexchart3_view', 'Workout Flex Chart'), ('workout_delete', 'Remove Workout') ) workflowmiddlepanel = ( ('panel_statcharts.html', 'Static Charts'), ('flexthumbnails.html', 'Flex Charts'), ('panel_summary.html', 'Summary'), ('panel_map.html', 'Map'), ('panel_comments.html', 'Basic Info and Links'), ('panel_notes.html', 'Workout Notes'), ('panel_shortcomment.html', 'Comment Link'), ('panel_middlesocial.html', 'Social Media Share Buttons'), ) defaultmiddle = ['panel_middlesocial.html', 'panel_statcharts.html', 'flexthumbnails.html', 'panel_summary.html', 'panel_map.html'] workflowleftpanel = ( ('panel_navigationheader.html', 'Navigation Header'), ('panel_editbuttons.html', 'Edit Workout Button'), ('panel_delete.html', 'Delete Workout Button'), ('panel_export.html', 'Export Workout Button'), ('panel_social.html', 'Social Media Share Buttons'), ('panel_advancededit.html', 'Advanced Workout Edit Button'), ('panel_editintervals.html', 'Edit Intervals Button'), ('panel_stats.html', 'Workout Statistics Button'), ('panel_flexchart.html', 'Flex Chart'), ('panel_staticchart.html', 'Create Static Charts Buttons'), ('panel_uploadimage.html', 'Attach Image'), ('panel_geekyheader.html', 'Geeky Header'), ('panel_editwind.html', 'Edit Wind Data'), ('panel_editstream.html', 'Edit Stream Data'), ('panel_otwpower.html', 'Run OTW Power Calculations'), ('panel_mapview.html', 'Map'), ('panel_ranking.html', 'Ranking View'), ) defaultleft = [ 'panel_navigationheader.html', 'panel_editbuttons.html', 'panel_advancededit.html', 'panel_editintervals.html', 'panel_stats.html', 'panel_staticchart.html', 'panel_uploadimage.html', ] coxes_calls = [ 'Sit Ready!', "Let's relax the shoulders, and give me a power ten to the finish!", "Almost there. Give me ten strokes on the legs!", "Let it run!", "Don't rush the slides!", "Quick hands.", "You are clearing the puddles.", "Let's push through now. Get me that open water.", "We're going for the line now. Power ten on the next.", ] info_calls = [ "Please give us a minute to count all those strokes, you've been working hard!", "Please give us a minute to count all your strokes." ] def dologging(filename, s): do_logging = settings.MYLOGGING.get(filename,True) if not do_logging: return tstamp = time.localtime() timestamp = time.strftime('%b-%d-%Y %H:%M:%S', tstamp) with open(filename, 'a') as f: f.write('\n') f.write(timestamp) f.write(' ') try: f.write(s) except TypeError: f.write(str(s)) def to_pace(pace): # pragma: no cover minutes, seconds = divmod(pace, 60) seconds, rest = divmod(seconds, 1) tenths = int(rest*10) s = '{m:0>2}:{s:0>2}.{t:0>1}'.format( m=int(minutes), s=int(seconds), t=int(tenths) ) return s def get_call(): call1 = random.choice(coxes_calls) call2 = random.choice(info_calls) call = """
""" % (call1, call2) return call def absolute(request): urls = { 'ABSOLUTE_ROOT': request.build_absolute_uri('/')[:-1].strip("/"), 'ABSOLUTE_ROOT_URL': request.build_absolute_uri('/').strip("/"), 'PATH': request.build_absolute_uri(), } return urls def trcolors(r1, g1, b1, r2, g2, b2): r1 = r1/255. r2 = r2/255. g1 = g1/255. g2 = g2/255. b2 = b2/255. b1 = b1/255. h1, s1, v1 = colorsys.rgb_to_hsv(r1, g1, b1) h2, s2, v2 = colorsys.rgb_to_hsv(r2, g2, b2) return 360*h1, 360*(h2-h1), s1, (s2-s1), v1, (v2-v1) palettes = { 'monochrome_blue': (207, -4, 0.06, 0.89, 1.0, -0.38), 'gold_sunset': (47, -31, .26, -0.12, 0.94, -0.5), 'blue_red': (207, -200, .85, 0, .74, -.24), 'blue_green': (207, -120, .85, 0, .75, .25), 'cyan_purple': trcolors(237, 248, 251, 136, 65, 157), 'green_blue': trcolors(240, 249, 232, 8, 104, 172), 'orange_red': trcolors(254, 240, 217, 179, 0, 0), 'cyan_blue': trcolors(241, 238, 246, 4, 90, 141), 'cyan_green': trcolors(246, 239, 247, 1, 108, 89), 'cyan_magenta': trcolors(241, 238, 246, 152, 0, 67), 'beige_magenta': trcolors(254, 235, 226, 122, 1, 119), 'yellow_green': trcolors(255, 255, 204, 0, 104, 55), 'yellow_blue': trcolors(255, 255, 205, 37, 52, 148), 'autumn': trcolors(255, 255, 212, 153, 52, 4), 'yellow_red': trcolors(255, 255, 178, 189, 0, 39) } rankingdistances = [100, 500, 1000, 2000, 5000, 6000, 10000, 21097, 42195, 100000] rankingdurations = [] rankingdurations.append(datetime.time(minute=1)) rankingdurations.append(datetime.time(minute=4)) rankingdurations.append(datetime.time(minute=30)) rankingdurations.append(datetime.time(hour=1, minute=15)) rankingdurations.append(datetime.time(hour=1)) def range_to_color_hex(groupcols, palette='monochrome_blue'): try: plt = palettes[palette] except KeyError: # pragma: no cover plt = palettes['monochrome_blue'] rgb = [colorsys.hsv_to_rgb((plt[0]+plt[1]*x)/360., plt[2]+plt[3]*x, plt[4]+plt[5]*x) for x in groupcols] RGB = [(int(255.*r), int(255.*g), int(255.*b)) for (r, g, b) in rgb] colors = ["#%02x%02x%02x" % (r, g, b) for (r, g, b) in RGB] return colors def str2bool(v): # pragma: no cover return v.lower() in ("yes", "true", "t", "1") def uniqify(seq, idfun=None): # order preserving if idfun is None: def idfun(x): return x seen = {} result = [] for item in seq: marker = idfun(item) # in old Python versions: # if seen.has_key(marker) # but in new ones: if marker in seen: continue seen[marker] = 1 result.append(item) return result def serialize_list(value, token=','): # pragma: no cover assert(isinstance(value, list) or isinstance( value, tuple) or isinstance(value, np.ndarray)) return token.join([str(s) for s in value]) def deserialize_list(value, token=','): # pragma: no cover if isinstance(value, list): return value elif isinstance(value, np.ndarray): return value return value.split(token) def geo_distance(lat1, lon1, lat2, lon2): """ Approximate distance and bearing between two points defined by lat1,lon1 and lat2,lon2 This is a slight underestimate but is close enough for our purposes, We're never moving more than 10 meters between trackpoints Bearing calculation fails if one of the points is a pole. (Hey, from the North pole you can walk South, East, North and end up on the same spot!) """ # radius of our earth in km --> should be moved to settings if # rowing takes off on other planets R = 6373.0 # pi pi = math.pi lat1 = math.radians(lat1) lat2 = math.radians(lat2) lon1 = math.radians(lon1) lon2 = math.radians(lon2) dlon = lon2 - lon1 dlat = lat2 - lat1 a = math.sin(dlat / 2)**2 + math.cos(lat1) * \ math.cos(lat2) * math.sin(dlon / 2)**2 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) distance = R * c tc1 = math.atan2(math.sin(lon2-lon1)*math.cos(lat2), math.cos(lat1)*math.sin(lat2)-math.sin(lat1)*math.cos(lat2)*math.cos(lon2-lon1)) tc1 = tc1 % (2*pi) bearing = math.degrees(tc1) return [distance, bearing] def isbreakthrough(delta, cpvalues, p0, p1, p2, p3, ratio): pwr = abs(p0)/(1+(delta/abs(p2))) pwr += abs(p1)/(1+(delta/abs(p3))) dd = 0.25*(ratio-1) pwr2 = pwr*(1+dd) pwr *= ratio delta = delta.values.astype(int) cpvalues = cpvalues.values.astype(int) pwr = pwr.astype(int) res = np.sum(cpvalues > pwr+1) res2 = np.sum(cpvalues > pwr2+1) btdf = pd.DataFrame( { 'delta': delta[cpvalues > pwr], 'cpvalues': cpvalues[cpvalues > pwr], 'pwr': pwr[cpvalues > pwr], } ) btdf.sort_values('delta', axis=0, inplace=True) return res >= 1, btdf, res2 >= 1 def myqueue(queue, function, *args, **kwargs): class MockJob: def __init__(self, *args, **kwargs): self.result = 1 self.id = 1 def revoke(self): # pragma: no cover return 1 if settings.TESTING: return MockJob() elif settings.CELERY: # pragma: no cover kwargs['debug'] = True job = function.delay(*args, **kwargs) else: # pragma: no cover if settings.DEBUG: kwargs['debug'] = True job_id = str(uuid.uuid4()) kwargs['job_id'] = job_id kwargs['jobkey'] = job_id kwargs['timeout'] = 3600 dologging('queue.log',function.__name__) job = queue.enqueue(function, *args, **kwargs) return job # pragma: no cover def calculate_age(born, today=None): if not today: today = timezone.now() if born: try: return today.year - born.year - ((today.month, today.day) < (born.month, born.day)) except AttributeError: return None else: return None def my_dict_from_instance(instance, model): thedict = {} thedict['id'] = instance.id for f in instance._meta.fields: fname = f.name try: verbosename = f.verbose_name except: # pragma: no cover verbosename = f.name get_choice = 'get_'+fname+'_display' if hasattr(instance, get_choice): value = getattr(instance, get_choice)() else: try: value = getattr(instance, fname) except AttributeError: # pragma: no cover value = None if f.editable and value: thedict[fname] = (verbosename, value) return thedict def wavg(group, avg_name, weight_name): """ http://stackoverflow.com/questions/10951341/pandas-dataframe-aggregate-function-using-multiple-columns In rare instance, we may not have weights, so just return the mean. Customize this if your business case should return otherwise. """ try: d = group[avg_name] except KeyError: return 0 try: w = group[weight_name] except KeyError: return d.mean() try: return (d * w).sum() / w.sum() except ZeroDivisionError: # pragma: no cover return d.mean() from string import Formatter def totaltime_sec_to_string(totaltime, shorten=False): if np.isnan(totaltime): return '' fmt = '{H:02}:{M:02}:{S:02}.{t}' if shorten: fmt = '{H}:{M:02}:{S:02}' if totaltime < 3600: fmt = '{M}:{S:02}' remainder = int(totaltime) tenths = totaltime-remainder f = Formatter() desired_fields = [field_tuple[1] for field_tuple in f.parse(fmt)] possible_fields = ('D','H', 'M', 'S') constants = {'D':86400, 'H': 3600, 'M': 60, 'S': 1} values = {} for field in possible_fields: if field in desired_fields and field in constants: values[field], remainder = divmod(remainder, constants[field]) values['t'] = round(10*tenths) return f.format(fmt, **values) def totaltime_sec_to_string_old(totaltime, shorten=False): if np.isnan(totaltime): return '' hours = int(totaltime / 3600.) if hours > 23: # pragma: no cover message = 'Warning: The workout duration was longer than 23 hours. ' hours = 23 elif hours < 0: message = 'Warning: invalid workout duration' hours = 0 minutes = int((totaltime - 3600. * hours) / 60.) if minutes > 59: # pragma: no cover minutes = 59 if not message: # pragma: no cover message = 'Warning: there is something wrong with the workout duration' elif minutes < 0: minutes = 0 if not message: message = 'Warning: invalid workout duration' seconds = int(totaltime - 3600. * hours - 60. * minutes) if seconds > 59: # pragma: no cover seconds = 59 if not message: # pragma: no cover message = 'Warning: there is something wrong with the workout duration' elif seconds < 0: seconds = 0 if not message: message = 'Warning: invalid workout duration' tenths = round(10 * (totaltime - 3600. * hours - 60. * minutes - seconds)) if tenths > 9: # pragma: no cover tenths = 9 if not message: # pragma: no cover message = 'Warning: there is something wrong with the workout duration' elif tenths < 0: tenths = 0 if not message: message = 'Warning: invalid workout duration' duration = "" if not shorten: duration = "{hours:02d}:{minutes:02d}:{seconds:02d}.{tenths}".format( hours=hours, minutes=minutes, seconds=seconds, tenths=tenths ) else: if hours != 0: # pragma: no cover duration = "{hours}:{minutes:02d}:{seconds:02d}".format( hours=hours, minutes=minutes, seconds=seconds, # tenths=tenths ) else: duration = "{minutes}:{seconds:02d}".format( # hours=hours, minutes=minutes, seconds=seconds, # tenths=tenths ) return duration def iscoach(m, r): # pragma: no cover result = False result = m in r.coaches return result # Exponentially weighted moving average # Used for data smoothing of the jagged data obtained by Strava # See bitbucket issue 72 def ewmovingaverage(interval, window_size): # Experimental code using Exponential Weighted moving average try: intervaldf = pd.DataFrame({'v': interval}) idf_ewma1 = intervaldf.ewm(span=window_size) idf_ewma2 = intervaldf[::-1].ewm(span=window_size) i_ewma1 = idf_ewma1.mean().loc[:, 'v'] i_ewma2 = idf_ewma2.mean().loc[:, 'v'] interval2 = np.vstack((i_ewma1, i_ewma2[::-1])) interval2 = np.mean(interval2, axis=0) # average except ValueError: # pragma: no cover interval2 = interval return interval2 # Exceptions # Custom error class - to raise a NoTokenError class NoTokenError(Exception): def __init__(self, value): self.value = value def __str__(self): # pragma: no cover return repr(self.value) class ProcessorCustomerError(Exception): # pragma: no cover def __init__(self, value): self.value = value def __str__(self): return repr(self.value) # Custom exception handler, returns a 401 HTTP message # with exception details in the json data def custom_exception_handler(exc, message): response = { "errors": [ { "code": str(exc), "detail": message, } ] } res = HttpResponse(message) res.status_code = 401 res.json = json.dumps(response) return res def get_strava_stream(r, metric, stravaid, series_type='time', fetchresolution='high', authorizationstring=''): if r is not None: # pragma: no cover authorizationstring = str('Bearer ' + r.stravatoken) headers = {'Authorization': authorizationstring, 'user-agent': 'sanderroosendaal', 'Content-Type': 'application/json', 'resolution': 'medium', } if metric == 'power': # pragma: no cover metric = 'watts' url = "https://www.strava.com/api/v3/activities/{stravaid}" \ "/streams/{metric}?resolution={fetchresolution}&series_type={series_type}".format( stravaid=stravaid, fetchresolution=fetchresolution, series_type=series_type, metric=metric) s = requests.get(url, headers=headers) if metric == 'power': # pragma: no cover with open('data.txt', 'w') as outfile: json.dump(s.json(), outfile) try: for data in s.json(): try: if data['type'] == metric: return np.array(data['data']) except TypeError: # pragma: no cover return None except: # pragma: no cover return None return None # pragma: no cover def allmonths(startdate, enddate): d = startdate while d < enddate: yield d d = datetime.date(d.year+(d.month // 12), ((d.month % 12) + 1), 1) def allsundays(startdate, enddate): d = startdate d += datetime.timedelta(days=6 - d.weekday()) # first Sunday while d <= enddate: yield d d += datetime.timedelta(days=7) def steps_read_fit(filename, name='', sport='Custom'): # pragma: no cover authorizationstring = 'Bearer '+settings.WORKOUTS_FIT_TOKEN url = settings.WORKOUTS_FIT_URL+"/tojson" headers = {'Authorization': authorizationstring} response = requests.post(url=url, headers=headers, json={'filename': filename}) if response.status_code != 200: # pragma: no cover return None w = response.json() try: d = w['workout'] except KeyError: # pragma: no cover return None return d def steps_write_fit(steps): authorizationstring = 'Bearer '+settings.WORKOUTS_FIT_TOKEN url = settings.WORKOUTS_FIT_URL+"/tofit" headers = {'Authorization': authorizationstring} response = requests.post(url=url, headers=headers, json=steps) if response.status_code != 200: # pragma: no cover return None w = response.json() try: filename = w['filename'] except KeyError: # pragma: no cover return None return filename def step_to_time_dist(step, avgspeed=3.2, ftp=200, ftspm=25, ftv=3.7, powerzones=None): seconds = 0 distance = 0 rscore = 0 durationtype = step.get('durationType', 0) value = step.get('durationValue', 0) if value == 0: # pragma: no cover return 0, 0, 0 targettype = step.get('targetType', 0) if durationtype == 'Time': seconds = value/1000. distance = avgspeed*seconds rscore = 60.*seconds/3600. if targettype == 'Speed': # pragma: no cover value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) velomid = 0. if value != 0: # pragma: no cover distance = seconds*value/1000. velomid = value/1000. elif valuelow != 0 and valuehigh != 0: # pragma: no cover distance = seconds*(valuelow+valuehigh)/2. velomid = (valuelow+valuehigh)/2000. else: velomid = avgspeed distance = velomid*seconds veloratio = (velomid/ftv)**(3.0) rscoreperhour = 100.*veloratio rscore = rscoreperhour*seconds/3600. if targettype == 'Power': value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) if value != 0: if value < 10 and value > 0: # pragma: no cover targetpower = ftp*0.6 # dit is niet correct if powerzones is not None and value < len(powerzones)-1 and value>1: targetpower = (powerzones[int(value)-1]+powerzones[int(value)])/(2.) elif value > 10 and value < 1000: # pragma: no cover targetpower = value*ftp/100. elif value > 1000: targetpower = value-1000 avgspeed = ftv*(targetpower/ftp)**(1./3.) distance = avgspeed*seconds avgpower = targetpower if valuelow != 0 and valuehigh != 0: # pragma: no cover avgpower = (valuelow+valuehigh)/2. avgspeed = ftv*(avgpower/ftp)**(1./3.) distance = avgspeed*seconds rscore = 100.*(avgpower/ftp)*seconds/3600. if targettype == 'Cadence': value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) if value != 0: avgpower = ftp*value/ftspm if valuelow != 0 and valuehigh != 0: # pragma: no cover avgspm = (valuelow+valuehigh)/2. avgpower = ftp*avgspm/ftspm avgspeed = ftv*(avgpower/ftp)**(1./3.) distance = avgspeed*seconds rscore = 100*(avgpower/ftp)*seconds/3600. return seconds, distance, rscore elif durationtype == 'Distance': distance = value/100. seconds = distance/avgspeed rscore = 60.*float(seconds)/3600. if targettype == 'Speed': # pragma: no cover value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) velomid = 0 if value != 0: seconds = distance/value velomid = value/1000. elif valuelow != 0 and valuehigh != 0: velomid = (valuelow+valuehigh)/2000. seconds = distance/velomid veloratio = (velomid/ftv)**(3.0) rscoreperhour = 100.*veloratio rscore = rscoreperhour*seconds/3600. seconds = distance/velomid if targettype == 'Power': # pragma: no cover value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) if value != 0: if value < 10 and value > 0: targetpower = ftp*0.6 elif value > 10 and value < 1000: targetpower = value*ftp/100. elif value > 1000: targetpower = value-1000 avgspeed = ftv*(targetpower/ftp)**(1./3.) seconds = distance/avgspeed avgpower = targetpower if valuelow != 0 and valuehigh != 0: avgpower = (valuelow+valuehigh)/2. avgspeed = ftv*(avgpower/ftp)**(1./3.) seconds = distance/avgspeed rscore = 100.*(avgpower/ftp)*seconds/3600. if targettype == 'Cadence': # pragma: no cover value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) if value != 0: avgpower = ftp*value/ftspm if valuelow != 0 and valuehigh != 0: avgspm = (valuelow+valuehigh)/2. avgpower = ftp*avgspm/ftspm rscore = 100*(avgpower/ftp)*seconds/3600. avgspeed = ftv*(avgpower/ftp)**(1./3.) seconds = distance/avgspeed return seconds, distance, rscore elif durationtype in ['PowerLessThan', 'PowerGreaterThan', 'HrLessThan', 'HrGreaterThan']: # pragma: no cover seconds = 600 distance = seconds*avgspeed veloratio = (avgspeed/ftv)**(3.0) rscoreperhour = 100.*veloratio rscore = rscoreperhour*seconds/3600. return seconds, distance, rscore return seconds, distance, rscore def get_step_type(step): # pragma: no cover t = 'WorkoutStep' if step['durationType'] in ['RepeatUntilStepsCmplt', 'RepeatUntilHrLessThan', 'RepeatUntilHrGreaterThan']: t = 'WorkoutRepeatStep' return t def peel(listToPeel): if len(listToPeel) == 0: # pragma: no cover return None, None if len(listToPeel) == 1: return listToPeel[0], None first = listToPeel[0] rest = listToPeel[1:] if first['type'] == 'Step': # pragma: no cover return first, rest # repeatstep theID = -1 lijst = [] while theID != first['repeatID']: f, rest = peel(rest) lijst.append(f) theID = f['stepID'] first['steps'] = list(reversed(lijst)) return first, rest def ps_dict_order_dict(d, short=False): steps = d['steps'] sdicts = [] for step in steps: sstring, type, stepID, repeatID, repeatValue = step_to_string( step, short=short) seconds, meters, rscore = step_to_time_dist(step) sdict = { 'type': type, 'stepID': stepID, 'repeatID': repeatID, 'repeatValue': repeatValue, 'dict': step, } sdicts.append(sdict) sdict2 = list(reversed(sdicts)) return sdict2 def ps_dict_order(d, short=False, rower=None, html=True): sdict = collections.OrderedDict({}) steps = d['steps'] if steps is None: steps = [] ftp = 200 powerzones = None if rower is not None: ftp = rower.ftp powerzones = [rower.pw_ut2, rower.pw_ut1, rower.pw_at, rower.pw_tr, rower.pw_an] # ftv = rower.ftv # ftspm = rower.ftspm for step in steps: sstring, type, stepID, repeatID, repeatValue = step_to_string( step, short=short) seconds, meters, rscore = step_to_time_dist(step, ftp=ftp, powerzones=powerzones) sdict[stepID] = { 'string': sstring, 'type': type, 'stepID': stepID, 'repeatID': repeatID, 'repeatValue': repeatValue, 'seconds': seconds, 'meters': meters, 'rscore': rscore, } sdict2 = collections.OrderedDict(reversed(list(sdict.items()))) for step in steps: sstring, type, stepID, repeatID, repeatValue = step_to_string( step, short=short) seconds, meters, rscore = step_to_time_dist(step) sdict[stepID] = { 'string': sstring, 'type': type, 'stepID': stepID, 'repeatID': repeatID, 'repeatValue': repeatValue, 'seconds': seconds, 'meters': meters, 'rscore': rscore, } sdict2 = collections.OrderedDict(reversed(list(sdict.items()))) sdict3 = [] hold = [] multiplier = [] holduntil = [] spaces = '' totalmeters = 0 totalseconds = 0 totalrscore = 0 factor = 1 for key, item in sdict2.items(): if item['type'] == 'RepeatStep': hold.append(item) holduntil.append(item['repeatID']) multiplier.append(item['repeatValue']) factor *= item['repeatValue'] if html: spaces += ' ' else: spaces += ' ' if item['type'] == 'Step': item['string'] = spaces+item['string'] sdict3.append(item) totalmeters += factor*item['meters'] totalseconds += factor*item['seconds'] totalrscore += factor*item['rscore'] if len(holduntil) > 0 and item['stepID'] <= holduntil[-1]: if item['stepID'] == holduntil[-1]: sdict3.append(hold.pop()) factor /= multiplier.pop() if html: spaces = spaces[:-18] else: spaces = spaces[:-3] holduntil.pop() else: # pragma: no cover prevstep = sdict3.pop() prevstep['string'] = prevstep['string'][18:] prevprevstep = sdict3.pop() prevprevstep['string'] = spaces+prevprevstep['string'] sdict3.append(prevprevstep) curstep = hold.pop() curstep['string'] = curstep['string'] sdict3.append(curstep) factor /= multiplier.pop() sdict3.append(prevstep) holduntil.pop() if html: spaces = spaces[:-18] else: spaces = spaces[:-3] sdict = list(reversed(sdict3)) return sdict, totalmeters, totalseconds, totalrscore def step_to_string(step, short=False): type = 'Step' repeatID = -1 repeatValue = 1 nr = 0 intensity = '' duration = '' unit = '' target = '' repeat = '' notes = '' durationtype = step['durationType'] if step['durationValue'] == 0: if durationtype not in [ 'RepeatUntilStepsCmplt', 'RepeatUntilHrLessThan', 'RepeatUntilHrGreaterThan' ]: # pragma: no cover return '', type, -1, -1, 1 if durationtype == 'Time': unit = 'min' value = step['durationValue'] if value/1000. >= 3600: # pragma: no cover unit = 'h' dd = timedelta(seconds=value/1000.) duration = '{v}'.format(v=str(dd)) elif durationtype == 'Distance': unit = 'm' value = step['durationValue']/100. duration = int(value) elif durationtype == 'HrLessThan': # pragma: no cover value = step['durationValue'] if value <= 100: duration = 'until heart rate lower than {v}% of max'.format( v=value) if short: duration = 'until HR<{v}% of max'.format(v=value) else: duration = 'until heart rate lower than {v}'.format(v=value-100) if short: duration = 'until HR<{v}'.format(v=value-100) elif durationtype == 'HrGreaterThan': # pragma: no cover value = step['durationValue'] if value <= 100: duration = 'until heart rate greater than {v}% of max'.format( v=value) if short: duration = 'until R>{v}% of max'.format(v=value) else: duration = 'until heart rate greater than {v}'.format(v=value-100) if short: duration = 'until HR>{v}'.format(v=value/100) elif durationtype == 'PowerLessThan': # pragma: no cover value = step['durationValue'] targetvalue = step.get('targetValue', 0) if value <= 1000: duration = 'Repeat until Power is less than {targetvalue} % of FTP'.format( targetvalue=targetvalue ) if short: 'until < {targetvalue}% of FTP'.format(targetvalue=targetvalue) else: duration = 'Repeat until Power is less than {targetvalue} Watt'.format( targetvalue=targetvalue-1000 ) if short: 'until < {targetvalue} W'.format(targetvalue=targetvalue-1000) elif durationtype == 'PowerGreaterThan': # pragma: no cover value = step['durationValue'] targetvalue = step.get('targetValue', 0) if value <= 1000: duration = 'Repeat until Power is greater than {targetvalue} % of FTP'.format( targetvalue=targetvalue ) if short: duration = 'until > {targetvalue}% of FTP'.format( targetvalue=targetvalue) else: duration = 'Repeat until Power is greater than {targetvalue} Watt'.format( targetvalue=targetvalue-1000 ) if short: duration = 'until > {targetvalue} W'.format( targetvalue=targetvalue) elif durationtype == 'RepeatUntilStepsCmplt': # pragma: no cover type = 'RepeatStep' ntimes = ': {v}x'.format(v=step['targetValue']) repeatID = step['durationValue'] duration = ntimes repeatValue = step.get('targetValue', 0) elif durationtype == 'RepeatUntilHrGreaterThan': # pragma: no cover type = 'RepeatStep' targetvalue = step.get('targetValue', 0) if targetvalue <= 100: duration = 'Repeat until Heart Rate is greater than {targetvalue} % of max'.format( targetvalue=targetvalue ) if short: duration = ': until HR>{targetvalue} % of max'.format( targetvalue=targetvalue) else: duration = 'Repeat until Heart Rate is greater than {targetvalue}:'.format( targetvalue=targetvalue-100. ) if short: duration = ': untl HR>{targetvalue}'.format( targetvalue=targetvalue-100) repeatID = step['durationValue'] elif durationtype == 'RepeatUntilHrLessThan': # pragma: no cover type = 'RepeatStep' targetvalue = step.get('targetValue', 0) if targetvalue <= 100: duration = 'Repeat until Heart Rate is less than {targetvalue} % of max'.format( targetvalue=targetvalue ) if short: duration = ': until HR<{targetvalue}% of max'.format( targetvalue=targetvalue) else: duration = 'Repeat until Heart Rate is less than {targetvalue}:'.format( targetvalue=targetvalue-100. ) if short: duration = ': until HR<{targetvalue}'.format( targetvalue=targetvalue-100) repeatID = step['durationValue'] # try: targettype = step['targetType'] except KeyError: targettype = None if targettype == 'HeartRate': # pragma: no cover value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) if value < 10 and value > 0: target = '@ HR zone {v}'.format(v=value) else: if valuelow < 100: target = '@ HR {l} - {h} % of max'.format( l=valuelow, h=valuehigh, ) else: target = '@ HR {l} - {h}'.format( l=valuelow - 100, h=valuehigh - 100, ) elif targettype == 'Power': # pragma: no cover value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) if value < 10 and value > 0: target = '@ Power zone {v}'.format(v=value) elif value > 10 and value < 1000: target = '@ {v}% of FTP'.format(v=value) elif value > 1000: target = '@ {v} W'.format(v=value-1000) elif valuelow > 0 and valuehigh > 0: if valuelow < 1000: target = '@ {l} - {h} % of FTP'.format( l=valuelow, h=valuehigh, ) else: target = '@ {l} - {h} W'.format( l=valuelow-1000, h=valuehigh-1000, ) elif targettype == 'Speed': # pragma: no cover value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) if value != 0: v = value/1000. pace = 500./v pacestring = to_pace(pace) target = '@ {v} m/s {p}, per 500m'.format( v=value/1000., p=pacestring) if short: # pragma: no cover target = '@ {p}'.format(p=pacestring) elif valuelow != 0 and valuehigh != 0: # pragma: no cover v = valuelow/1000. pace = 500./v pacestringlow = to_pace(pace) v = valuehigh/1000. pace = 500./v pacestringhigh = to_pace(pace) target = '@ {l:1.2f} and {h:1.2f} m/s, ({ph} to {pl} per 500m)'.format( l=valuelow/1000., h=valuehigh/1000., pl=pacestringlow, ph=pacestringhigh, ) if short: target = '@ {pl} - {ph}'.format( pl=pacestringlow, ph=pacestringhigh, ) elif targettype == 'Cadence': # pragma: no cover value = step.get('targetValue', 0) valuelow = step.get('targetValueLow', 0) valuehigh = step.get('targetValueHigh', 0) if value != 0: target = '@ {v} SPM'.format(v=value) elif valuelow != 0 and valuehigh != 0: target = '@ {l} - {h} SPM'.format( l=valuelow, h=valuehigh ) nr = step['stepId'] # name = step['wkt_step_name'] notes = '' try: if len(step['description']): # pragma: no cover notes = ' - '+step['description'] except KeyError: notes = '' try: intensity = step['intensity'] except KeyError: intensity = 0 s = '{duration} {unit} {target} {repeat} {notes}'.format( # nr=nr, # name=name, unit=unit, # intensity=intensity, duration=duration, target=target, repeat=repeat, notes=notes, ) if short: s = '{duration} {unit} {target} {repeat}'.format( duration=duration, target=target, repeat=repeat, unit=unit, ) if short and intensity in ['Warmup', 'Cooldown', 'Rest']: s = '{intensity} {duration} {unit} {target} {repeat}'.format( intensity=intensity, duration=duration, target=target, repeat=repeat, unit=unit ) elif intensity in ['Warmup', 'Cooldown', 'Rest']: s = '{intensity} {duration} {unit} {target} {repeat} {notes}'.format( intensity=intensity, duration=duration, target=target, repeat=repeat, unit=unit, notes=notes, ) if type == 'RepeatStep': s = 'Repeat {duration}'.format(duration=duration) return s, type, nr, repeatID, repeatValue def strfdelta(tdelta): # pragma: no cover try: minutes, seconds = divmod(tdelta.seconds, 60) tenths = int(tdelta.microseconds / 1e5) except AttributeError: minutes, seconds = divmod(tdelta.view(np.int64), 60e9) seconds, rest = divmod(seconds, 1e9) tenths = int(rest / 1e8) res = "{minutes:0>2}:{seconds:0>2}.{tenths:0>1}".format( minutes=minutes, seconds=seconds, tenths=tenths, ) return res def request_is_ajax(request): is_ajax = request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest' # if settings.TESTING: # is_ajax = True return is_ajax def intervals_to_string(vals, units, typ): if vals is None or units is None or typ is None: # pragma: no cover return '' if len(vals) != len(units) or len(vals) != len(typ): # pragma: no cover return '' s = '' previous = 'rest' for i in range(len(vals)): if typ[i] == 'rest' and previous == 'rest': if units[i] == 'min': # pragma: no cover val = int(vals[i])*60 unit = 'sec' else: val = int(vals[i]) if units[i] == 'meters': # pragma: no cover unit = 'm' if units[i] == 'seconds': unit = 'sec' s += '+0min/{val}{unit}'.format(val=val, unit=unit) elif typ[i] == 'rest': # pragma: no cover if units[i] == 'min': val = int(vals[i])*60 unit = 'sec' else: val = int(vals[i]) if units[i] == 'meters': unit = 'm' if units[i] == 'seconds': unit = 'sec' s += '/{val}{unit}'.format(val=val, unit=unit) previous = 'rest' else: # pragma: no cover # work interval if units[i] == 'min': val = int(vals[i])*60 unit = 'sec' else: val = int(vals[i]) if units[i] == 'meters': unit = 'm' if units[i] == 'seconds': unit = 'sec' s += '+{val}{unit}'.format(val=val, unit=unit) previous = 'work' if s[0] == '+': s = s[1:] return s def get_timezone_from_c2data(data): try: timezone = pytz.timezone(data['timezone']) except UnknownTimeZoneError: timezone = pytz.utc except KeyError: # pragma: no cover timezone = pytz.utc return timezone def get_startdatetime_from_c2data(data): timezone = get_timezone_from_c2data(data) try: startdatetime = iso8601.parse_date(data['date_utc']) except: # pragma: no cover startdatetime = iso8601.parse_date(data['date']) totaltime = data['time']/10. try: rest_time = data['rest_time']/10. except KeyError: rest_time = 0 totaltime = totaltime+rest_time duration = totaltime_sec_to_string(totaltime) starttimeunix = arrow.get(startdatetime).timestamp()-totaltime startdatetime = arrow.get(starttimeunix) startdatetime = startdatetime.astimezone(timezone) workoutdate = startdatetime.astimezone( timezone ).strftime('%Y-%m-%d') starttime = startdatetime.astimezone( timezone ).strftime('%H:%M:%S') return startdatetime, starttime, workoutdate, duration, starttimeunix, timezone