Private
Public Access
1
0

coverage related changes

This commit is contained in:
Sander Roosendaal
2021-04-26 17:48:22 +02:00
parent 7626554ba9
commit 9e2a97e721
17 changed files with 1534 additions and 144 deletions

View File

@@ -322,7 +322,7 @@ def update_subscription(rower,data,method='up'):
return False,0
return False,0
return False,0 # pragma: no cover
def create_subscription(rower,data):

View File

@@ -13,15 +13,15 @@ def coordinate_in_path(latitude,longitude, p):
return p.contains_points([(latitude,longitude)])[0]
class InvalidTrajectoryError(Exception):
def __init__(self,value):
def __init__(self,value): # pragma: no cover
self.value=value
def __str__(self):
def __str__(self): # pragma: no cover
return repr(self.value)
def time_in_path(df,p,maxmin='max',getall=False,name='unknown',logfile=None):
if df.empty:
if df.empty: # pragma: no cover
return 0
latitude = df.latitude
@@ -33,12 +33,12 @@ def time_in_path(df,p,maxmin='max',getall=False,name='unknown',logfile=None):
if maxmin=='max':
b = (~df['inpolygon']).shift(-1)+df['inpolygon']
else:
else: # pragma: no cover
b = (~df['inpolygon']).shift(1)+df['inpolygon']
if len(df[b==2]):
if logfile is not None:
if logfile is not None: # pragma: no cover
t = time.localtime()
timestamp = bytes('{t}'.format(t=time.strftime('%b-%d-%Y_%H%M', t)),'utf-8')
with open(logfile,'ab') as f:
@@ -57,12 +57,12 @@ def time_in_path(df,p,maxmin='max',getall=False,name='unknown',logfile=None):
f.write(b' passes found')
else:
f.write(b' pass found')
if getall:
if getall: # pragma: no cover
return df[b==2]['time'],df[b==2]['cum_dist']
else:
return df[b==2]['time'].min(),df[b==2]['cum_dist'].min()
if logfile is not None:
if logfile is not None: # pragma: no cover
t = time.localtime()
timestamp = bytes('{t}'.format(t=time.strftime('%b-%d-%Y_%H%M', t)),'utf-8')
with open(logfile,'ab') as f:
@@ -78,10 +78,10 @@ def time_in_path(df,p,maxmin='max',getall=False,name='unknown',logfile=None):
f.write(bytes(str(len(df[b==2])),'utf-8'))
f.write(b' ')
f.write(b' pass not found')
raise InvalidTrajectoryError("Trajectory doesn't go through path")
raise InvalidTrajectoryError("Trajectory doesn't go through path") # pragma: no cover
return 0
return 0 # pragma: no cover
def coursetime_first(data,paths,polygons=[],logfile=None):
@@ -97,7 +97,7 @@ def coursetime_first(data,paths,polygons=[],logfile=None):
try:
entrytime,entrydistance = time_in_path(data,paths[0],maxmin='max',name=polygons[0][1],logfile=logfile)
coursecompleted = True
except InvalidTrajectoryError:
except InvalidTrajectoryError: # pragma: no cover
entrytime = data['time'].max()
entrydistance = data['cum_dist'].max()
coursecompleted = False
@@ -113,8 +113,8 @@ def coursetime_paths(data,paths,finalmaxmin='min',polygons=[],logfile=None):
polygons = [(0,str(i)) for i in range(len(paths))]
# corner case - empty list of paths
if len(paths) == 0:
return 0,True
if len(paths) == 0: # pragma: no cover
return 0,0,True
# end - just the Finish polygon
if len(paths) == 1:
@@ -124,7 +124,7 @@ def coursetime_paths(data,paths,finalmaxmin='min',polygons=[],logfile=None):
entrydistance
) = time_in_path(data,paths[0],maxmin=finalmaxmin,name = polygons[0][1],logfile=logfile)
coursecompleted = True
except InvalidTrajectoryError:
except InvalidTrajectoryError: # pragma: no cover
entrytime = data['time'].max()
entrydistance = data['cum_dist'].max()
coursecompleted = False
@@ -142,9 +142,9 @@ def coursetime_paths(data,paths,finalmaxmin='min',polygons=[],logfile=None):
coursecompleted
) = coursetime_paths(data,paths[1:],polygons=polygons[1:],logfile=logfile)
return time+timenext, dist+distnext,coursecompleted
except InvalidTrajectoryError:
except InvalidTrajectoryError: # pragma: no cover
entrytime = data['time'].max()
entrydistance = data['cum_dist'].max()
coursecompleted = False
return entrytime, entrydistance, coursecompleted
return entrytime, entrydistance, coursecompleted # pragma: no cover

View File

@@ -1210,7 +1210,7 @@ def fetchcp_new(rower,workouts):
try:
df = df[df['cp'] == df.groupby(['delta'])['cp'].transform('max')]
except KeyError: # pragma: no cover
pd.Series(),pd.Series(),0,pd.Series(),pd.Series()
return pd.Series(),pd.Series(),0,pd.Series(),pd.Series()
df = df.sort_values(['delta']).reset_index()

View File

@@ -27,7 +27,7 @@ rpetotss = {
10:140,
}
def updatecp(delta,cpvalues,r,workouttype='water'):
def updatecp(delta,cpvalues,r,workouttype='water'): # pragma: no cover
if workouttype in otwtypes:
p0 = r.p0
p1 = r.p1
@@ -88,7 +88,7 @@ def cpfit(powerdf,fraclimit=0.0001,nmax=1000):
if len(thesecs)>=4:
try:
p1, success = optimize.leastsq(errfunc, p0[:], args = (thesecs,theavpower))
except:
except: # pragma: no cover
factor = fitfunc(p0,thesecs.mean())/theavpower.mean()
p1 = [p0[0]/factor,p0[1]/factor,p0[2],p0[3]]
@@ -132,7 +132,7 @@ def getlogarr(maxt):
for la in logarr:
try:
v = 5+int(10.**(la))
except ValueError:
except ValueError: # pragma: no cover
v = 0
res.append(v)
@@ -142,7 +142,7 @@ def getlogarr(maxt):
logarr = logarr.values
return logarr
def getsinglecp(df):
def getsinglecp(df): # pragma: no cover
thesecs = df['TimeStamp (sec)'].max()-df['TimeStamp (sec)'].min()
if thesecs != 0:
maxt = 1.05*thesecs
@@ -164,12 +164,12 @@ def getsinglecp(df):
return delta,cpvalue,avgpower
def getcp_new(dfgrouped,logarr):
def getcp_new(dfgrouped,logarr): # pragma: no cover
delta = []
cpvalue = []
avgpower = {}
print(dfgrouped)
#print(dfgrouped)
for id, group in dfgrouped:
@@ -307,7 +307,7 @@ def getcp(dfgrouped,logarr):
try:
avgpower[id] = int(ww.mean())
except ValueError:
except ValueError: # pragma: no cover
avgpower[id] = '---'
if not np.isnan(ww.mean()):
length = len(ww)
@@ -375,7 +375,7 @@ def getmaxwattinterval(tt,ww,i):
except KeyError:
wmax = 0
deltat = 0
else:
else: # pragma: no cover
wmax = 0
deltat = 0
@@ -386,10 +386,10 @@ def getfastest(df,thevalue,mode='distance'):
dd = df['cumdist'].copy()
tmax = tt.max()
if mode == 'distance':
if mode == 'distance': # pragma: no cover
if dd.max() < thevalue:
return 0
else:
else: # pragma: no cover
if tt.max() < thevalue:
return 0
@@ -463,7 +463,7 @@ def getfastest(df,thevalue,mode='distance'):
endtime = starttime+duration
#print(duration,starttime,endtime,'aa')
return duration[0]/1000.,starttime[0]/1000.,endtime[0]/1000.
else:
else: # pragma: no cover
distance = griddata(restime,distance,[thevalue*60*1000],method='linear',rescale=True)
starttime = griddata(restime,starttimes,[thevalue*60*1000],method='linear',rescale=True)
duration = griddata(restime,restime,[thevalue*60*1000],method='linear',rescale=True)
@@ -471,4 +471,4 @@ def getfastest(df,thevalue,mode='distance'):
print(distance,starttime,endtime )
return distance[0],starttime[0]/1000.,endtime[0]/1000.
return 0
return 0 # pragma: no cover

View File

@@ -43,7 +43,7 @@ TEST_CLIENT_SECRET = "aapnootmies"
TEST_REDIRECT_URI = "http://localhost:8000/rowers/test_callback"
def custom_exception_handler(exc,message):
def custom_exception_handler(exc,message): # pragma: no cover
response = {
"errors": [
@@ -60,7 +60,7 @@ def custom_exception_handler(exc,message):
return res
def do_refresh_token(refreshtoken):
def do_refresh_token(refreshtoken): # pragma: no cover
client_auth = requests.auth.HTTPBasicAuth(TEST_CLIENT_ID, TEST_CLIENT_SECRET)
post_data = {"grant_type": "refresh_token",
"client_secret": TEST_CLIENT_SECRET,
@@ -88,7 +88,7 @@ def do_refresh_token(refreshtoken):
return [thetoken,expires_in,refresh_token]
def get_token(code):
def get_token(code): # pragma: no cover
client_auth = requests.auth.HTTPBasicAuth(TEST_CLIENT_ID, TEST_CLIENT_SECRET)
post_data = {"grant_type": "authorization_code",
"code": code,
@@ -114,7 +114,7 @@ def get_token(code):
return [thetoken,expires_in,refresh_token]
def make_authorization_url(request):
def make_authorization_url(request): # pragma: no cover
# Generate a random string for the state parameter
# Save it for use later to prevent xsrf attacks
from uuid import uuid4
@@ -133,7 +133,7 @@ def make_authorization_url(request):
return HttpResponseRedirect(url)
def rower_ownapi_token_refresh(user):
def rower_ownapi_token_refresh(user): # pragma: no cover
r = Rower.objects.get(user=user)
res = do_refresh_token(r.ownapirefreshtoken)
access_token = res[0]
@@ -149,7 +149,7 @@ def rower_ownapi_token_refresh(user):
r.save()
return r.ownapitoken
def get_ownapi_workout_list(user):
def get_ownapi_workout_list(user): # pragma: no cover
r = Rower.objects.get(user=user)
if (r.ownapitoken == '') or (r.ownapitoken is None):
s = "Token doesn't exist. Need to authorize"
@@ -169,7 +169,7 @@ def get_ownapi_workout_list(user):
return s
def get_ownapi_workout(user,ownapiid):
def get_ownapi_workout(user,ownapiid): # pragma: no cover
r = Rower.objects.get(user=user)
if (r.ownapitoken == '') or (r.ownapitoken is None):
return custom_exception_handler(401,s)
@@ -188,7 +188,7 @@ def get_ownapi_workout(user,ownapiid):
return s
def createownapiworkoutdata(w):
def createownapiworkoutdata(w): # pragma: no cover
filename = w.csvfilename
row = rowingdata(csvfile=filename)
averagehr = int(row.df[' HRCur (bpm)'].mean())
@@ -277,7 +277,7 @@ def createownapiworkoutdata(w):
return data
def getidfromresponse(response):
def getidfromresponse(response): # pragma: no cover
t = json.loads(response.text)
uri = t['uris'][0]
id = uri[len(uri)-13:len(uri)-5]

View File

@@ -62,7 +62,7 @@ from rowers.utils import NoTokenError, custom_exception_handler
import rowers.mytypes as mytypes
# Exchange access code for long-lived access token
def get_token(code):
def get_token(code): # pragma: no cover
post_data = {"grant_type": "authorization_code",
"code": code,
@@ -130,7 +130,7 @@ def get_polar_notifications():
try:
response = requests.get(url, headers=headers)
except ConnectionError:
except ConnectionError: # pragma: no cover
response = {
'status_code':400,
}
@@ -170,10 +170,10 @@ def get_polar_workouts(user):
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate):
elif (timezone.now()>r.polartokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh"
return custom_exception_handler(401,s)
else:
else: # pragma: no cover
authorizationstring = str('Bearer ' + r.polartoken)
headers = {'Authorization':authorizationstring,
'Accept': 'application/json'}
@@ -247,7 +247,7 @@ def get_polar_workouts(user):
return exercise_list
def get_polar_user_info(user,physical=False):
def get_polar_user_info(user,physical=False): # pragma: no cover
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
@@ -285,7 +285,7 @@ def get_polar_user_info(user,physical=False):
return response
def get_polar_workout(user,id,transactionid):
def get_polar_workout(user,id,transactionid): # pragma: no cover
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None):

View File

@@ -14,21 +14,21 @@ import uuid
from django.core.exceptions import ValidationError
def format_pace_tick(x,pos=None):
def format_pace_tick(x,pos=None): # pragma: no cover
minu=int(x/60)
sec=int(x-minu*60.)
sec_str=str(sec).zfill(2)
template='%d:%s'
return template % (minu,sec_str)
def format_time_tick(x,pos=None):
def format_time_tick(x,pos=None): # pragma: no cover
hour=int(x/3600)
min=int((x-hour*3600.)/60)
min_str=str(min).zfill(2)
template='%d:%s'
return template % (hour,min_str)
def format_pace(x,pos=None):
def format_pace(x,pos=None): # pragma: no cover
if isinf(x) or isnan(x):
x=0
@@ -42,7 +42,7 @@ def format_pace(x,pos=None):
return str1
def format_time(x,pos=None):
def format_time(x,pos=None): # pragma: no cover
min = int(x/60.)
@@ -60,7 +60,7 @@ def validate_image_extension(value):
ext = os.path.splitext(value.name)[1].lower()
valid_extension = ['.jpg','.jpeg','.png','.gif']
if not ext in valid_extension:
if not ext in valid_extension: # pragma: no cover
raise ValidationError(u'File not supported')
def validate_file_extension(value):
@@ -69,25 +69,25 @@ def validate_file_extension(value):
valid_extensions = ['.tcx','.csv','.TCX','.gpx','.GPX',
'.CSV','.fit','.FIT','.zip','.ZIP',
'.gz','.GZ','.xls']
if not ext in valid_extensions:
if not ext in valid_extensions: # pragma: no cover
raise ValidationError(u'File not supported!')
def must_be_csv(value):
import os
ext = os.path.splitext(value.name)[1]
valid_extensions = ['.csv','.CSV']
if not ext in valid_extensions:
if not ext in valid_extensions: # pragma: no cover
raise ValidationError(u'File not supported!')
def validate_kml(value):
import os
ext = os.path.splitext(value.name)[1]
valid_extensions = ['.kml','.KML']
if not ext in valid_extensions:
if not ext in valid_extensions: # pragma: no cover
raise ValidationError(u'File not supported!')
def handle_uploaded_image(i):
def handle_uploaded_image(i): # pragma: no cover
from io import StringIO, BytesIO
from PIL import Image, ImageOps, ExifTags
import os
@@ -121,7 +121,7 @@ def handle_uploaded_image(i):
try:
if exif[orientation] == 3:
mage=image.rotate(180, expand=True)
image=image.rotate(180, expand=True)
elif exif[orientation] == 6:
image=image.rotate(270, expand=True)
elif exif[orientation] == 8:

View File

@@ -61,11 +61,11 @@ def rp3_open(user):
return imports_open(user, oauth_data)
# Refresh ST token using refresh token
def do_refresh_token(refreshtoken):
def do_refresh_token(refreshtoken): # pragma: no cover
return imports_do_refresh_token(refreshtoken, oauth_data)
# Exchange access code for long-lived access token
def get_token(code):
def get_token(code): # pragma: no cover
client_auth = requests.auth.HTTPBasicAuth(RP3_CLIENT_KEY, RP3_CLIENT_SECRET)
post_data = {
"client_id":RP3_CLIENT_KEY,
@@ -124,7 +124,7 @@ def get_rp3_workout_list(user):
return response
def get_rp3_workouts(rower,do_async=True):
def get_rp3_workouts(rower,do_async=True): # pragma: no cover
try:
auth_token = rp3_open(rower.user)
except NoTokenError:
@@ -160,7 +160,7 @@ def get_rp3_workouts(rower,do_async=True):
return 1
def download_rp3_file(url,auth_token,filename):
def download_rp3_file(url,auth_token,filename): # pragma: no cover
headers = {'Authorization': 'Bearer ' + auth_token }
res = requests.get(url,headers=headers)
@@ -171,7 +171,7 @@ def download_rp3_file(url,auth_token,filename):
return res.status_code
def get_rp3_workout_token(workout_id,auth_token,waittime=3,max_attempts=20):
def get_rp3_workout_token(workout_id,auth_token,waittime=3,max_attempts=20): # pragma: no cover
headers = {'Authorization': 'Bearer ' + auth_token }
get_download_link = """{
@@ -211,7 +211,7 @@ def get_rp3_workout_token(workout_id,auth_token,waittime=3,max_attempts=20):
return download_url
def get_rp3_workout_link(user,workout_id,waittime=3,max_attempts=20):
def get_rp3_workout_link(user,workout_id,waittime=3,max_attempts=20): # pragma: no cover
r = Rower.objects.get(user=user)
auth_token = rp3_open(user)

View File

@@ -151,7 +151,7 @@ class PlannedSessionSerializer(serializers.ModelSerializer):
'fitfile'
)
def create(self, validated_data):
def create(self, validated_data): # pragma: no cover
if self.context['request'].user.is_authenticated:
r = Rower.objects.get(user=self.context['request'].user)
else:
@@ -206,7 +206,7 @@ class WorkoutSerializer(serializers.ModelSerializer):
'rankingpiece'
)
def create(self, validated_data):
def create(self, validated_data): # pragma: no cover
if self.context['request'].user.is_authenticated:
r = Rower.objects.get(user=self.context['request'].user)
else:
@@ -263,7 +263,7 @@ class StrokeDataSerializer(serializers.Serializer):
workoutid = serializers.IntegerField
strokedata = serializers.JSONField
def create(self, workoutid, strokedata):
def create(self, workoutid, strokedata): # pragma: no cover
"""
Create and enter a new set of stroke data into the DB
"""
@@ -307,7 +307,7 @@ class GeoCourseSerializer(serializers.ModelSerializer):
'polygons',
)
def update(self, instance, validated_data):
def update(self, instance, validated_data): # pragma: no cover
instance.name = validated_data.get('name',instance.name)
instance.country = validated_data.get('country',instance.country)
instance.notes = validated_data.get('notes',instance.notes)

View File

@@ -55,11 +55,11 @@ def get_token(code):
return imports_get_token(code,oauth_data)
# Make authorization URL including random string
def make_authorization_url(request):
def make_authorization_url(request): # pragma: no cover
return imports_make_authorization_url(oauth_data)
# This is token refresh. Looks for tokens in our database, then refreshes
def rower_sporttracks_token_refresh(user):
def rower_sporttracks_token_refresh(user): # pragma: no cover
r = Rower.objects.get(user=user)
res = do_refresh_token(r.sporttracksrefreshtoken)
access_token = res[0]
@@ -82,7 +82,7 @@ def get_sporttracks_workout_list(user):
if (r.sporttrackstoken == '') or (r.sporttrackstoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.sporttrackstokenexpirydate):
elif (timezone.now()>r.sporttrackstokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh."
return custom_exception_handler(401,s)
else:
@@ -99,10 +99,10 @@ def get_sporttracks_workout_list(user):
# Get workout summary data by SportTracks ID
def get_workout(user,sporttracksid,do_async=False):
r = Rower.objects.get(user=user)
if (r.sporttrackstoken == '') or (r.sporttrackstoken is None):
if (r.sporttrackstoken == '') or (r.sporttrackstoken is None): # pragma: no cover
return custom_exception_handler(401,s)
s = "Token doesn't exist. Need to authorize"
elif (timezone.now()>r.sporttrackstokenexpirydate):
elif (timezone.now()>r.sporttrackstokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh."
return custom_exception_handler(401,s)
else:
@@ -129,13 +129,13 @@ def createsporttracksworkoutdata(w):
filename = w.csvfilename
try:
row = rowingdata(csvfile=filename)
except:
except: # pragma: no cover
return 0
try:
averagehr = int(row.df[' HRCur (bpm)'].mean())
maxhr = int(row.df[' HRCur (bpm)'].max())
except KeyError:
except KeyError: # pragma: no cover
averagehr = 0
maxhr = 0
@@ -162,7 +162,7 @@ def createsporttracksworkoutdata(w):
try:
lat = row.df[' latitude'].values
lon = row.df[' longitude'].values
if not lat.std() and not lon.std():
if not lat.std() and not lon.std(): # pragma: no cover
haslatlon = 0
except KeyError:
haslatlon = 0
@@ -171,7 +171,7 @@ def createsporttracksworkoutdata(w):
haspower = 1
try:
power = row.df[' Power (watts)'].astype(int).values
except KeyError:
except KeyError: # pragma: no cover
haspower = 0
locdata = []
@@ -259,13 +259,13 @@ def getidfromresponse(response):
return int(id)
def default(o):
def default(o): # pragma: no cover
if isinstance(o, numpy.int64): return int(o)
raise TypeError
def workout_sporttracks_upload(user,w,asynchron=False):
def workout_sporttracks_upload(user,w,asynchron=False): # pragma: no cover
message = "Uploading to SportTracks"
stid = 0
# ready to upload. Hurray
@@ -325,7 +325,7 @@ def add_workout_from_data(user,importid,data,strokedata,source='sporttracks',
workoutsource='sporttracks'):
try:
workouttype = data['type']
except KeyError:
except KeyError: # pragma: no cover
workouttype = 'other'
if workouttype not in [x[0] for x in Workout.workouttypes]:
@@ -339,7 +339,7 @@ def add_workout_from_data(user,importid,data,strokedata,source='sporttracks',
r = Rower.objects.get(user=user)
try:
rowdatetime = iso8601.parse_date(data['start_time'])
except iso8601.ParseError:
except iso8601.ParseError: # pragma: no cover
try:
rowdatetime = datetime.datetime.strptime(data['start_time'],"%Y-%m-%d %H:%M:%S")
rowdatetime = thetimezone.localize(rowdatetime).astimezone(utc)
@@ -354,14 +354,14 @@ def add_workout_from_data(user,importid,data,strokedata,source='sporttracks',
try:
title = data['name']
except:
except: # pragma: no cover
title = "Imported data"
try:
res = splitstdata(data['distance'])
distance = res[1]
times_distance = res[0]
except KeyError:
except KeyError: # pragma: no cover
try:
res = splitstdata(data['heartrate'])
times_distance = res[0]
@@ -388,14 +388,14 @@ def add_workout_from_data(user,importid,data,strokedata,source='sporttracks',
times_location = times_distance
latcoord = np.zeros(len(times_distance))
loncoord = np.zeros(len(times_distance))
if workouttype in mytypes.otwtypes:
if workouttype in mytypes.otwtypes: # pragma: no cover
workouttype = 'rower'
try:
res = splitstdata(data['cadence'])
times_spm = res[0]
spm = res[1]
except KeyError:
except KeyError: # pragma: no cover
times_spm = times_distance
spm = 0*times_distance

View File

@@ -533,7 +533,7 @@
"time": [3200, 6700, 10099],
"spm": [16.4, 21.2, 19.8],
"pace": [155068, 144402, 138830],
"power": [84,6, 117.2, 141.3],
"power": [84.6, 117.2, 141.3],
"hr": [85, 91, 95]
}
</pre>
@@ -586,7 +586,7 @@
</td>
<td>GET, POST</td>
<td>
<pre>[
<pre>{
"data": [
{
"time": 3200.0000476837,
@@ -613,7 +613,7 @@
"spm": 19.8095238095
}
]
]
}
</pre>
You can only post stroke data to an existing workout with
workout number {id}. If the workout already has stroke data, you

185
rowers/tests/test_api.py Normal file
View File

@@ -0,0 +1,185 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from django.db import transaction
#from __future__ import print_function
from .statements import *
nu = datetime.datetime.now()
import rowers
from rowers import dataprep
from rowers import tasks
from rowers import c2stuff
from rowers import stravastuff
import urllib
import json
import pandas as pd
from rowers.opaque import encoder
from rest_framework.test import APIRequestFactory, force_authenticate
import json
from rowers.ownapistuff import *
from rowers.views.apiviews import *
class OwnApi(TestCase):
def setUp(self):
self.u = UserFactory()
self.r = Rower.objects.create(user=self.u,
birthdate=faker.profile()['birthdate'],
gdproptin=True,surveydone=True,
gdproptindate=timezone.now(),
rowerplan='coach',subscription_id=1)
workoutsbox = Mailbox.objects.create(name='workouts')
workoutsbox.save()
failbox = Mailbox.objects.create(name='Failed')
failbox.save()
self.c = Client()
self.user_workouts = WorkoutFactory.create_batch(5, user=self.r)
self.factory = RequestFactory()
self.password = faker.word()
self.u.set_password(self.password)
self.u.save()
self.factory = APIRequestFactory()
def test_strokedataform(self):
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)
w = self.user_workouts[0]
url = reverse('strokedataform',kwargs={'id':encoder.encode_hex(w.id)})
response = self.c.get(url)
self.assertEqual(response.status_code,200)
url = reverse('strokedatajson',kwargs={'id':w.id})
request = self.factory.get(url)
request.user = self.u
force_authenticate(request, user=self.u)
response = strokedatajson(request,id=w.id)
self.assertEqual(response.status_code,200)
# response must be json
strokedata = json.loads(response.content)
df = pd.DataFrame(strokedata)
self.assertFalse(df.empty)
form_data = {
"distance": [23, 46, 48],
"time": [3200, 6700, 10099],
"spm": [16.4, 21.2, 19.8],
"pace": [155068, 144402, 138830],
"power": [84.6, 117.2, 141.3],
"hr": [85, 91, 95]
}
result = get_random_file(filename='rowers/tests/testdata/thyro.csv')
w2 = Workout.objects.create(
user=self.r,
csvfilename=result['filename'],
duration=result['duration'],
startdatetime=result['startdatetime'],
workouttype='water',
starttime=result['starttime'],
)
url = reverse('strokedatajson',kwargs={'id':w2.id})
request = self.factory.post(url,{'strokedata':form_data},format='json')
request.user = self.u
request.data = json.dumps({'strokedata':form_data})
strokedata = json.loads(request.data)['strokedata']
force_authenticate(request, user=self.u)
with patch('rowers.dataprep.getrowdata_db') as mock_getrowdata:
mock_getrowdata.return_value = (pd.DataFrame(),None)
response = strokedatajson(request,id=w.id)
self.assertEqual(response.status_code,201)
def test_strokedataform_v2(self):
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)
w = self.user_workouts[0]
url = reverse('strokedataform_v2',kwargs={'id':encoder.encode_hex(w.id)})
response = self.c.get(url)
self.assertEqual(response.status_code,200)
url = reverse('strokedatajson_v2',kwargs={'id':w.id})
request = self.factory.get(url)
request.user = self.u
force_authenticate(request, user=self.u)
response = strokedatajson_v2(request,id=w.id)
self.assertEqual(response.status_code,200)
# response must be json
strokedata = json.loads(response.content)
df = pd.DataFrame(strokedata)
self.assertFalse(df.empty)
form_data = {
"data": [
{
"time": 3200.0000476837,
"pace": 155068.4885951763,
"hr": 85.7857142857,
"power": 84.6531131591,
"distance": 23,
"spm": 16.380952381
},
{
"time": 6700.0000476837,
"pace" : 144402.6407586741,
"hr": 91.2142857143,
"power": 117.458827834,
"distance": 36,
"spm": 21.1666666667
},
{
"time": 10099.9999046326,
"pace": 138830.8712654931,
"hr": 95.7142857143,
"power": 141.31057207,
"distance": 48,
"spm": 19.8095238095
}
]
}
result = get_random_file(filename='rowers/tests/testdata/thyro.csv')
w2 = Workout.objects.create(
user=self.r,
csvfilename=result['filename'],
duration=result['duration'],
startdatetime=result['startdatetime'],
workouttype='water',
starttime=result['starttime'],
)
url = reverse('strokedatajson_v2',kwargs={'id':w2.id})
request = self.factory.post(url,form_data,format='json')
request.user = self.u
request.data = json.dumps(form_data)
force_authenticate(request, user=self.u)
with patch('rowers.dataprep.getrowdata_db') as mock_getrowdata:
mock_getrowdata.return_value = (pd.DataFrame(),None)
response = strokedatajson_v2(request,id=w.id)
self.assertEqual(response.status_code,200)

View File

@@ -4,6 +4,122 @@ from __future__ import print_function
from __future__ import unicode_literals
from .statements import *
import rowers.courses as courses
import rowers.dataprep as dataprep
from rowers.courseutils import *
from rowingdata import rowingdata as rdata
from rowers.models import polygon_to_path
class CourseUnitTest(TestCase):
def setUp(self):
self.c = Client()
self.u = User.objects.create_user('john',
'sander@ds.ds',
'koeinsloot')
self.r = Rower.objects.create(user=self.u,gdproptin=True,surveydone=True,
gdproptindate=timezone.now(),
rowerplan='coach',
)
self.nu = datetime.datetime.now()
cs = courses.kmltocourse('rowers/tests/testdata/thyro.kml')
course = cs[0]
cname = course['name']
cnotes = course['description']
self.polygons = course['polygons']
pstart = self.polygons[0]
self.ThyroBaantje = courses.createcourse(self.r,cname,self.polygons,notes=cnotes)
self.start = GeoPolygon.objects.filter(course=self.ThyroBaantje,order_in_course=0)[0]
self.ThyroBaantje.save()
result = get_random_file(filename='rowers/tests/testdata/thyro.csv')
self.wthyro = WorkoutFactory(user=self.r,
csvfilename=result['filename'],
starttime=result['starttime'],
startdatetime=result['startdatetime'],
duration=result['duration'],
distance=result['totaldist'],
workouttype = 'water',
)
self.wthyro.startdatetime = arrow.get(self.nu).datetime
self.wthyro.date = self.nu.date()
self.wthyro.save()
def test_time_in_path(self):
row = rdata(csvfile='rowers/tests/testdata/thyro.csv')
time = row.df['TimeStamp (sec)']
lat = row.df[' latitude']
lon = row.df[' longitude']
cum_dist = row.df['cum_dist']
data = pd.DataFrame(
{
'time':time,
'latitude':lat,
'longitude':lon,
'cum_dist':cum_dist,
}
)
startpath = polygon_to_path(self.start)
mintime,mindist = time_in_path(data,startpath)
self.assertEqual(mintime,78)
self.assertEqual(mindist,207.1)
def test_coursetime_first(self):
row = rdata(csvfile='rowers/tests/testdata/thyro.csv')
time = row.df['TimeStamp (sec)']
lat = row.df[' latitude']
lon = row.df[' longitude']
cum_dist = row.df['cum_dist']
data = pd.DataFrame(
{
'time':time,
'latitude':lat,
'longitude':lon,
'cum_dist':cum_dist,
}
)
paths = []
polygons = GeoPolygon.objects.filter(course=self.ThyroBaantje).order_by("order_in_course")
for p in polygons:
paths.append(polygon_to_path(p))
entrytime,entrydistance,coursecompleted = coursetime_first(data,paths)
self.assertEqual(entrytime,78)
self.assertEqual(entrydistance,207.1)
self.assertTrue(coursecompleted)
def test_coursetime_paths(self):
row = rdata(csvfile='rowers/tests/testdata/thyro.csv')
time = row.df['TimeStamp (sec)']
lat = row.df[' latitude']
lon = row.df[' longitude']
cum_dist = row.df['cum_dist']
data = pd.DataFrame(
{
'time':time,
'latitude':lat,
'longitude':lon,
'cum_dist':cum_dist,
}
)
paths = []
polygons = GeoPolygon.objects.filter(course=self.ThyroBaantje).order_by("order_in_course")
for p in polygons:
paths.append(polygon_to_path(p))
entrytime,entrydistance,coursecompleted = coursetime_paths(data,paths)
self.assertEqual(entrytime,435)
self.assertEqual(entrydistance,1348.8)
self.assertTrue(coursecompleted)
class CoursesTest(TestCase):
def setUp(self):

1081
rowers/tests/testdata/thyro2.csv vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -47,7 +47,7 @@ def get_token(code):
# Make authorization URL including random string
def make_authorization_url(request):
return imports_make_authorization_url(oauth_data)
return imports_make_authorization_url(oauth_data) # pragma: no cover
# Get list of workouts available on Underarmour
def get_underarmour_workout_list(user):
@@ -72,7 +72,7 @@ def get_underarmour_workout_list(user):
# Get workout summary data by Underarmour ID
def get_workout(user,underarmourid,do_async=False):
r = Rower.objects.get(user=user)
if (r.underarmourtoken == '') or (r.underarmourtoken is None):
if (r.underarmourtoken == '') or (r.underarmourtoken is None): # pragma: no cover
return custom_exception_handler(401,s)
s = "Token doesn't exist. Need to authorize"
else:
@@ -99,7 +99,7 @@ def createunderarmourworkoutdata(w):
filename = w.csvfilename
try:
row = rowingdata(csvfile=filename)
except:
except: # pragma: no cover
return 0
st = w.startdatetime.astimezone(pytz.timezone(w.timezone))
@@ -151,7 +151,7 @@ def createunderarmourworkoutdata(w):
haslatlon=1
try:
try: # pragma: no cover
lat = row.df[' latitude']
lon = row.df[' longitude']
if not lat.std() and not lon.std():
@@ -161,7 +161,7 @@ def createunderarmourworkoutdata(w):
# path data
if haslatlon:
if haslatlon: # pragma: no cover
locdata = []
for e in zip(t,lat.values,lon.values):
point = {
@@ -248,7 +248,7 @@ def getidfromresponse(response):
return int(id)
def refresh_ua_actlist(user):
def refresh_ua_actlist(user): # pragma: no cover
r = Rower.objects.get(user=user)
authorizationstring = str('Bearer ' + r.underarmourtoken)
headers = {'Authorization': authorizationstring,
@@ -312,17 +312,17 @@ def get_userid(access_token):
try:
res = me_json['id']
except KeyError:
except KeyError: # pragma: no cover
res = 0
return res
def default(o):
def default(o): # pragma: no cover
if isinstance(o, numpy.int64): return int(o)
raise TypeError
def workout_ua_upload(user,w):
def workout_ua_upload(user,w): # pragma: no cover
message = "Uploading to MapMyFitness"
uaid = 0
@@ -383,7 +383,7 @@ def add_workout_from_data(user,importid,data,strokedata,
try:
comments = data['notes']
except:
except: # pragma: no cover
comments = ''
try:
@@ -394,7 +394,7 @@ def add_workout_from_data(user,importid,data,strokedata,
r = Rower.objects.get(user=user)
try:
rowdatetime = iso8601.parse_date(data['start_datetime'])
except iso8601.ParseError:
except iso8601.ParseError: # pragma: no cover
try:
rowdatetime = datetime.strptime(data['start_datetime'],"%Y-%m-%d %H:%M:%S")
rowdatetime = thetimezone.localize(rowdatetime).astimezone(utc)
@@ -410,7 +410,7 @@ def add_workout_from_data(user,importid,data,strokedata,
try:
title = data['name']
except:
except: # pragma: no cover
title = "Imported data"
timeseries = data['time_series']
@@ -421,7 +421,7 @@ def add_workout_from_data(user,importid,data,strokedata,
res = splituadata(timeseries['distance'])
distance = res[1]
times_distance = res[0]
except KeyError:
except KeyError: # pragma: no cover
message = "Error. No distance data"
return (0,message)
@@ -440,7 +440,7 @@ def add_workout_from_data(user,importid,data,strokedata,
lon = coord['lng']
latcoord.append(lat)
loncoord.append(lon)
except:
except: # pragma: no cover
times_location = times_distance
latcoord = np.zeros(len(times_distance))
loncoord = np.zeros(len(times_distance))
@@ -451,7 +451,7 @@ def add_workout_from_data(user,importid,data,strokedata,
res = splituadata(timeseries['cadence'])
times_spm = res[0]
spm = res[1]
except KeyError:
except KeyError: # pragma: no cover
times_spm = times_distance
spm = 0*times_distance
@@ -459,7 +459,7 @@ def add_workout_from_data(user,importid,data,strokedata,
res = splituadata(timeseries['heartrate'])
hr = res[1]
times_hr = res[0]
except KeyError:
except KeyError: # pragma: no cover
times_hr = times_distance
hr = 0*times_distance

View File

@@ -234,8 +234,10 @@ urlpatterns = [
re_path(r'^', include(router.urls)),
re_path(r'^api-docs/$', views.schema_view,name='schema_view'),
re_path(r'^api-auth/', include('rest_framework.urls', namespace='rest_framework')),
re_path(r'^api/workouts/(?P<id>\b[0-9A-Fa-f]+\b)/strokedata/$',views.strokedatajson,name='strokedatajson'),
re_path(r'^api/v2/workouts/(?P<id>\b[0-9A-Fa-f]+\b)/strokedata/$',views.strokedatajson_v2,name='strokedatajson_v2'),
re_path(r'^api/workouts/(?P<id>\b[0-9A-Fa-f]+\b)/strokedata/$',views.strokedatajson,
name='strokedatajson'),
re_path(r'^api/v2/workouts/(?P<id>\b[0-9A-Fa-f]+\b)/strokedata/$',views.strokedatajson_v2,
name='strokedatajson_v2'),
re_path(r'^500v/$',views.error500_view,name='error500_view'),
path('502/', TemplateView.as_view(template_name='502.html'),name='502'),
path('500/', TemplateView.as_view(template_name='500.html'),name='500'),
@@ -714,8 +716,10 @@ urlpatterns = [
re_path(r'^edittarget/(?P<pk>\d+)/$',login_required(
views.TrainingTargetUpdate.as_view()),
name='trainingtarget_update_view'),
re_path(r'^workout/(?P<id>\b[0-9A-Fa-f]+\b)/test\_strokedata/$',views.strokedataform),
re_path(r'^workout/(?P<id>\b[0-9A-Fa-f]+\b)/v2/test\_strokedata/$',views.strokedataform_v2),
re_path(r'^workout/(?P<id>\b[0-9A-Fa-f]+\b)/test\_strokedata/$',views.strokedataform,
name='strokedataform'),
re_path(r'^workout/(?P<id>\b[0-9A-Fa-f]+\b)/v2/test\_strokedata/$',views.strokedataform_v2,
name='strokedataform_v2'),
re_path(r'^sessions/library/$',views.template_library_view,name="template_library_view"),
re_path(r'^sessions/teamcreate/user/(?P<userid>\d+)/$',views.plannedsession_teamcreate_view,
name='plannedsession_teamcreate_view'),

View File

@@ -6,6 +6,7 @@ from __future__ import unicode_literals
from rowers.views.statements import *
from rowers.tasks import handle_calctrimp
from rowers.mailprocessing import send_confirm
from rowers.opaque import encoder
import sys
import arrow
@@ -14,14 +15,11 @@ import arrow
@login_required()
def strokedataform(request,id=0):
try:
id=int(id)
except ValueError:
id = 0
id = encoder.decode_hex(id)
try:
w = Workout.objects.get(id=id)
except Workout.DoesNotExist:
except Workout.DoesNotExist: # pragma: no cover
raise Http404("Workout doesn't exist")
if request.method == 'GET':
@@ -33,7 +31,7 @@ def strokedataform(request,id=0):
'id':id,
'workout':w,
})
elif request.method == 'POST':
elif request.method == 'POST': # pragma: no cover
form = StrokeDataForm()
return render(request, 'strokedata_form.html',
@@ -42,19 +40,17 @@ def strokedataform(request,id=0):
'teams':get_my_teams(request.user),
'id':id,
'workout':w,
})
}) # pragma: no cover
@login_required()
def strokedataform_v2(request,id=0):
try:
id=int(id)
except ValueError:
id = 0
id = encoder.decode_hex(id)
try:
w = Workout.objects.get(id=id)
except Workout.DoesNotExist:
except Workout.DoesNotExist: # pragma: no cover
raise Http404("Workout doesn't exist")
if request.method == 'GET':
@@ -66,7 +62,7 @@ def strokedataform_v2(request,id=0):
'id':id,
'workout':w,
})
elif request.method == 'POST':
elif request.method == 'POST': # pragma: no cover
form = StrokeDataForm()
return render(request, 'strokedata_form_v2.html',
@@ -95,7 +91,7 @@ def strokedatajson_v2(request,id):
"""
row = get_object_or_404(Workout,pk=id)
if row.user != request.user.rower:
if row.user != request.user.rower: # pragma: no cover
return HttpResponse("You do not have permission to perform this action",status=403)
try:
@@ -124,7 +120,7 @@ def strokedatajson_v2(request,id):
for d in request.data['data']:
logfile.write(json.dumps(d))
logfile.write("\n")
except KeyError:
except KeyError: # pragma: no cover
try:
for d in request.data['strokedata']:
logfile.write(json.dumps(d))
@@ -134,14 +130,14 @@ def strokedatajson_v2(request,id):
except (AttributeError,TypeError):
logfile.write("No data in request\n")
checkdata, r = dataprep.getrowdata_db(id=row.id)
if not checkdata.empty:
if not checkdata.empty: # pragma: no cover
return HttpResponse("Duplicate Error",status=409)
df = pd.DataFrame()
try:
df = pd.DataFrame(request.data['data'])
except KeyError:
except KeyError: # pragma: no cover
try:
df = pd.DataFrame(request.data['strokedata'])
except:
@@ -154,7 +150,7 @@ def strokedatajson_v2(request,id):
#time, pace, distance,spm
try:
time = df['time']/1.e3
except KeyError:
except KeyError: # pragma: no cover
try:
time = df['t']/10.
except KeyError:
@@ -162,12 +158,12 @@ def strokedatajson_v2(request,id):
try:
spm = df['spm']
except KeyError:
except KeyError: # pragma: no cover
return HttpResponse("Missing spm",status=400)
try:
distance = df['distance']
except KeyError:
except KeyError: # pragma: no cover
try:
distance = df['d']/10.
except KeyError:
@@ -175,7 +171,7 @@ def strokedatajson_v2(request,id):
try:
pace = df['pace']/1.e3
except KeyError:
except KeyError: # pragma: no cover
try:
pace = df['p']/10.
except KeyError:
@@ -185,7 +181,7 @@ def strokedatajson_v2(request,id):
try:
power = df['power']
except KeyError:
except KeyError: # pragma: no cover
power = 0*time
try:
drivelength = df['drivelength']
@@ -245,7 +241,7 @@ def strokedatajson_v2(request,id):
lapidx = 0*time
try:
hr = df['hr']
except KeyError:
except KeyError: # pragma: no cover
hr = 0*df['time']
try:
@@ -331,7 +327,7 @@ def strokedatajson_v2(request,id):
isbreakthrough, ishard = dataprep.checkbreakthrough(row, r)
if r.getemailnotifications and not r.emailbounced:
if r.getemailnotifications and not r.emailbounced: # pragma: no cover
link = settings.SITE_URL+reverse(
r.defaultlandingpage,
kwargs = {
@@ -354,7 +350,7 @@ def strokedatajson_v2(request,id):
}))
#return(HttpResponse(encoder.encode_hex(row.id),status=201))
return HttpResponseNotAllowed("Method not supported")
return HttpResponseNotAllowed("Method not supported") # pragma: no cover
@@ -362,18 +358,19 @@ def strokedatajson_v2(request,id):
@login_required()
@api_view(['GET','POST'])
@permission_classes([IsAuthenticated])
def strokedatajson(request,id):
def strokedatajson(request,id=0):
"""
POST: Add Stroke data to workout
GET: Get stroke data of workout
"""
row = get_object_or_404(Workout,pk=id)
if row.user != request.user.rower:
if row.user != request.user.rower: # pragma: no cover
raise PermissionDenied("You have no access to this workout")
try:
id = int(id)
except ValueError:
except ValueError: # pragma: no cover
return HttpResponse("Not a valid workout number",status=403)
@@ -389,39 +386,46 @@ def strokedatajson(request,id):
if request.method == 'POST':
with open('apilog.log','a') as logfile:
logfile.write(str(timezone.now())+": ")
logfile.write(request.user.username+"(strokedatjson POST) \n")
logfile.write(request.user.username+"(strokedatajson POST) \n")
checkdata,r = dataprep.getrowdata_db(id=row.id)
if not checkdata.empty:
if not checkdata.empty: # pragma: no cover
return HttpResponse("Duplicate Error",status=409)
# strokedata = request.POST['strokedata']
# checking/validating and cleaning
try:
strokedata = json.loads(request.data['strokedata'])
except:
strokedata = json.loads(request.data)['strokedata']
except: # pragma: no cover
try:
s = json.dumps(request.data)
strokedata = json.loads(s)['strokedata']
except: # pragma: no cover
return HttpResponse("No JSON object could be decoded",status=400)
try:
df = pd.DataFrame(strokedata)
except ValueError: # pragma: no cover
return HttpResponse("Arrays must all be same length",status=400)
df.index = df.index.astype(int)
df.sort_index(inplace=True)
# time, hr, pace, spm, power, drivelength, distance, drivespeed, dragfactor, strokerecoverytime, averagedriveforce, peakdriveforce, lapidx
try:
time = df['time']/1.e3
except KeyError:
except KeyError: # pragma: no cover
return HttpResponse("There must be time values",status=400)
aantal = len(time)
pace = df['pace']/1.e3
if len(pace) != aantal:
if len(pace) != aantal: # pragma: no cover
return HttpResponse("Pace array has incorrect length",status=400)
distance = df['distance']
if len(distance) != aantal:
if len(distance) != aantal: # pragma: no cover
return HttpResponse("Distance array has incorrect length",status=400)
spm = df['spm']
if len(spm) != aantal:
if len(spm) != aantal: # pragma: no cover
return HttpResponse("SPM array has incorrect length",status=400)
res = dataprep.testdata(time,distance,pace,spm)
if not res:
if not res: # pragma: no cover
return HttpResponse("Data are not numerical",status=400)
power = trydf(df,aantal,'power')
@@ -503,4 +507,4 @@ def strokedatajson(request,id):
return HttpResponse(encoder.encode_hex(row.id),status=201)
#Method not supported
return HttpResponseNotAllowed("Method not supported")
return HttpResponseNotAllowed("Method not supported") # pragma: no cover