Private
Public Access
1
0

increase test coverage

This commit is contained in:
Sander Roosendaal
2021-05-21 14:55:49 +02:00
parent bc5ad4109d
commit 71fdecaf42
15 changed files with 2830 additions and 667 deletions

View File

@@ -838,25 +838,6 @@ def get_workout(user,c2id,do_async=True):
return 1
# Get stroke data belonging to C2 ID
def get_c2_workout_strokes(user,c2id):
    """Fetch the per-stroke data for one Concept2 logbook result.

    Looks up the rower's stored Concept2 OAuth token and, if it is
    present and unexpired, GETs the strokes endpoint for result *c2id*.

    Returns the ``requests.Response`` on success, or the value of
    ``custom_exception_handler(401, message)`` when the token is
    missing or expired.
    """
    r = Rower.objects.get(user=user)
    if (r.c2token == '') or (r.c2token is None): # pragma: no cover
        # Bug fix: build the message BEFORE returning.  Previously the
        # return statement came first, so `s` was referenced before
        # assignment and the message line was unreachable (compare the
        # correctly-ordered expired-token branch below).
        s = "Token doesn't exist. Need to authorize"
        return custom_exception_handler(401,s)
    elif (timezone.now()>r.tokenexpirydate): # pragma: no cover
        s = "Token expired. Needs to refresh."
        return custom_exception_handler(401,s)
    else:
        # ready to fetch. Hurray
        authorizationstring = str('Bearer ' + r.c2token)
        headers = {'Authorization': authorizationstring,
                   'user-agent': 'sanderroosendaal',
                   'Content-Type': 'application/json'}
        url = "https://log.concept2.com/api/users/me/results/"+str(c2id)+"/strokes"
        s = requests.get(url,headers=headers)
        return s
# Get list of C2 workouts. We load only the first page,
# assuming that users don't want to import their old workouts
@@ -1028,190 +1009,3 @@ def rower_c2_token_refresh(user):
return r.c2token
else: # pragma: no cover
return None
# Create workout data from Strava or Concept2
# data and create the associated Workout object and save it
def add_workout_from_data(user,importid,data,strokedata,
                          source='c2',splitdata=None,
                          workoutsource='concept2'):
    """Build a Workout from Concept2 result data plus stroke data.

    Converts the Concept2 result dict *data* and the stroke DataFrame
    *strokedata* into the internal CSV column layout, writes it as a
    gzipped CSV under ``media/``, saves it to the database via
    ``dataprep.save_workout_database`` and post-fixes the Workout's
    timezone/duration/distance fields.

    NOTE(review): indentation was reconstructed from a diff dump —
    verify nesting against the original file.

    :param user: Django user owning the workout.
    :param importid: external (Concept2) id, used in the CSV file name.
    :param data: Concept2 result summary dict.
    :param strokedata: DataFrame with tenth-unit columns 't', 'd', 'p'
        and optional 'spm', 'hr', 'lat', 'lon', 'strokelength'.
    :param splitdata: unused here.
    :returns: ``(id, message)`` from save_workout_database.
    """
    # Map the Concept2 machine type to an internal workout type.
    try:
        workouttype = mytypes.c2mappinginv[data['type']]
    except KeyError: # pragma: no cover
        workouttype = 'rower'
    if workouttype not in [x[0] for x in Workout.workouttypes]: # pragma: no cover
        workouttype = 'other'
    try:
        comments = data['comments']
    except: # pragma: no cover
        comments = ' '
    try:
        thetimezone = pytz.timezone(data['timezone'])
    except UnknownTimeZoneError:
        # NOTE(review): falls back to the string 'UTC', not pytz.utc.
        thetimezone = 'UTC'
    r = Rower.objects.get(user=user)
    # Parse the workout start; prefer the UTC field when present.
    try:
        rowdatetime = iso8601.parse_date(data['date_utc'])
        thetimezone = 'UTC'
    except KeyError: # pragma: no cover
        rowdatetime = iso8601.parse_date(data['start_date'])
        # NOTE(review): datetime has no make_aware method — this
        # uncovered branch looks like it would raise; confirm intent.
        rowdatetime = rowdatetime.make_aware(thetimezone)
    except ParseError: # pragma: no cover
        rowdatetime = iso8601.parse_date(data['date'])
        rowdatetime = rowdatetime.make_aware(thetimezone)
    try:
        c2intervaltype = data['workout_type']
    except KeyError: # pragma: no cover
        c2intervaltype = ''
    try:
        title = data['name']
    except KeyError:
        title = ""
    # Append the first line of the comments (max 40 chars) to the title.
    try:
        t = data['comments'].split('\n', 1)[0]
        title += t[:40]
    except: # pragma: no cover
        title = ''
    # NOTE(review): comments was already read above; this re-read is
    # redundant but harmless.
    try:
        comments = data['comments']
    except KeyError: # pragma: no cover
        comments = ''
    starttimeunix = arrow.get(rowdatetime).timestamp()
    # Stroke times are in tenths of a second; make them cumulative and
    # derive per-lap indices.
    res = make_cumvalues(0.1*strokedata['t'])
    cum_time = res[0]
    lapidx = res[1]
    totaltime = data['time']/10.
    # Concept2 timestamps mark the END of the piece; shift back so the
    # series starts at the true start time.
    starttimeunix = starttimeunix - totaltime
    unixtime = cum_time+starttimeunix
    # unixtime[0] = starttimeunix
    seconds = 0.1*strokedata.loc[:,'t']
    nr_rows = len(unixtime)
    # Optional columns default to zeros when absent.
    try: # pragma: no cover
        latcoord = strokedata.loc[:,'lat']
        loncoord = strokedata.loc[:,'lon']
    except:
        latcoord = np.zeros(nr_rows)
        loncoord = np.zeros(nr_rows)
    try:
        strokelength = strokedata.loc[:,'strokelength']
    except:
        strokelength = np.zeros(nr_rows)
    dist2 = 0.1*strokedata.loc[:,'d']
    try:
        spm = strokedata.loc[:,'spm']
    except KeyError: # pragma: no cover
        spm = 0*dist2
    try:
        hr = strokedata.loc[:,'hr']
    except KeyError: # pragma: no cover
        hr = 0*spm
    # Pace arrives in tenths of sec/500m; clip outliers and replace
    # zero pace (division guard) with a slow default of 300 s/500m.
    pace = strokedata.loc[:,'p']/10.
    pace = np.clip(pace,0,1e4)
    pace = pace.replace(0,300)
    velo = 500./pace
    power = 2.8*velo**3
    # BikeErg pace is per 1000m, so recompute velocity and pace.
    if workouttype in ['bike','bikeerg']: # pragma: no cover
        velo = 1000./pace
        pace = 500./velo
    # save csv
    # Create data frame with all necessary data to write to csv
    df = pd.DataFrame({'TimeStamp (sec)':unixtime,
                       ' Horizontal (meters)': dist2,
                       ' Cadence (stokes/min)':spm,
                       ' HRCur (bpm)':hr,
                       ' longitude':loncoord,
                       ' latitude':latcoord,
                       ' Stroke500mPace (sec/500m)':pace,
                       ' Power (watts)':power,
                       ' DragFactor':np.zeros(nr_rows),
                       ' DriveLength (meters)':np.zeros(nr_rows),
                       ' StrokeDistance (meters)':strokelength,
                       ' DriveTime (ms)':np.zeros(nr_rows),
                       ' StrokeRecoveryTime (ms)':np.zeros(nr_rows),
                       ' AverageDriveForce (lbs)':np.zeros(nr_rows),
                       ' PeakDriveForce (lbs)':np.zeros(nr_rows),
                       ' lapIdx':lapidx,
                       ' WorkoutState': 4,
                       ' ElapsedTime (sec)':seconds
                       })
    # NOTE(review): sort_values is not in-place and its result is
    # discarded — confirm whether sorting was intended to stick.
    df.sort_values(by='TimeStamp (sec)',ascending=True)
    timestr = strftime("%Y%m%d-%H%M%S")
    # Create CSV file name and save data to CSV file
    csvfilename ='media/{code}_{importid}.csv'.format(
        importid=importid,
        code = uuid4().hex[:16]
    )
    res = df.to_csv(csvfilename+'.gz',index_label='index',
                    compression='gzip')
    # with Concept2
    if source=='c2':
        try:
            totaldist = data['distance']
            totaltime = data['time']/10.
        except KeyError: # pragma: no cover
            totaldist = 0
            totaltime = 0
    else: # pragma: no cover
        totaldist = 0
        totaltime = 0
    id,message = dataprep.save_workout_database(
        csvfilename,r,
        workouttype=workouttype,
        title=title,notes=comments,
        workoutsource=workoutsource,
        dosummary=True,dosmooth=False,
    )
    # Post-fix presentation fields on the stored Workout: local
    # timezone, start time string, formatted duration, distance.
    w = Workout.objects.get(id=id)
    try:
        local_tz = pytz.timezone(data['timezone'])
    except UnknownTimeZoneError:
        local_tz = pytz.utc
    # local_tz = pytz.timezone(thetimezone)
    w.startdatetime = w.startdatetime.astimezone(local_tz)
    w.starttime = w.startdatetime.strftime('%H:%M:%S')
    w.timezone = local_tz
    w.duration = dataprep.totaltime_sec_to_string(totaltime)
    w.distance = totaldist
    w.save()
    return id,message

View File

@@ -800,7 +800,7 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True,
def getpartofday(row,r):
workoutstartdatetime = row.rowdatetime
try:
try: # pragma: no cover
latavg = row.df[' latitude'].mean()
lonavg = row.df[' longitude'].mean()

View File

@@ -71,26 +71,18 @@ def splitstdata(lijst):
return [np.array(t),np.array(latlong)]
def splituadata(lijst):
    """Split a sequence of (time, value) pairs into two numpy arrays.

    :param lijst: iterable of indexable pairs; element [0] is the time,
        element [1] the value. Extra elements are ignored.
    :returns: tuple ``(times, values)`` of numpy arrays.
    """
    times = [pair[0] for pair in lijst]
    values = [pair[1] for pair in lijst]
    return np.array(times), np.array(values)
def imports_open(user,oauth_data):
r = Rower.objects.get(user=user)
token = getattr(r,oauth_data['tokenname'])
try:
refreshtoken = getattr(r,oauth_data['refreshtokenname'])
except (TypeError,AttributeError,KeyError):
except (TypeError,AttributeError,KeyError): # pragma: no cover
refreshtoken = None
try:
tokenexpirydate = getattr(r,oauth_data['expirydatename'])
except (TypeError,AttributeError,KeyError):
except (TypeError,AttributeError,KeyError): # pragma: no cover
tokenexpirydate = None
@@ -243,7 +235,7 @@ def imports_get_token(
base_uri,
data=json.dumps(post_data),
headers=headers)
else:
else: # pragma: no cover
response = requests.post(
base_uri,
data=post_data,

View File

@@ -35,7 +35,7 @@ def add_workout_from_data(userid,nkid,data,strokedata,source='nk',splitdata=None
try:
userid=int(userid)
except TypeError:
except TypeError: # pragma: no cover
userid = userid.id
strokedata.to_csv(csvfilename, index_label='index', compression='gzip')
@@ -108,7 +108,7 @@ def add_workout_from_data(userid,nkid,data,strokedata,source='nk',splitdata=None
try:
workoutid = response.json()['id']
except KeyError:
except KeyError: # pragma: no cover
workoutid = 1

View File

@@ -247,7 +247,7 @@ def get_nk_workout_list(user,fake=False,after=0,before=0):
#
def get_workout(user,nkid,do_async=False):
def get_workout(user,nkid,do_async=True):
r = Rower.objects.get(user=user)
if (r.nktoken == '') or (r.nktoken is None): # pragma: no cover
s = "Token doesn't exist. Need to authorize"
@@ -260,79 +260,22 @@ def get_workout(user,nkid,do_async=False):
'sessionIds': nkid,
}
if do_async: # pragma: no cover
res = get_nk_workout_list(r.user)
if res.status_code != 200:
return 0
alldata = {}
for item in res.json():
alldata[item['id']] = item
res = get_nk_workout_list(r.user)
if res.status_code != 200: # pragma: no cover
return 0
alldata = {}
for item in res.json():
alldata[item['id']] = item
res = myqueue(
queuehigh,
handle_nk_async_workout,
alldata,
r.user.id,
r.nktoken,
nkid,
0,
r.defaulttimezone,
)
res = myqueue(
queuehigh,
handle_nk_async_workout,
alldata,
r.user.id,
r.nktoken,
nkid,
0,
r.defaulttimezone,
)
return {},pd.DataFrame()
authorizationstring = str('Bearer ' + r.nktoken)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json',
}
# get strokes
url = NK_API_LOCATION+"api/v1/sessions/strokes"
response = requests.get(url,headers=headers,params=params)
if response.status_code != 200: # pragma: no cover
# error handling and logging
return {},pd.DataFrame()
jsonData = response.json()
strokeData = jsonData[str(nkid)]
df = strokeDataToDf(strokeData)
# get workout data
after = df['timestamp'].min()
before = df['timestamp'].max()
after = arrow.get(after/1000.)
before = arrow.get(before/1000.)
after = after-timedelta(days=1)
before = before+timedelta(days=1)
before = str(int(before.timestamp())*1000)
after = str(int(after.timestamp())*1000)
url = NK_API_LOCATION+"api/v1/sessions/"
params = {
'after':after,
'before': before,
}
response = requests.get(url, headers=headers,params=params)
if response.status_code != 200: # pragma: no cover
# error handling and logging
return {},df
jsondata = response.json()
workoutdata = {}
for w in jsondata:
if str(w['id']) == str(nkid):
workoutdata = w
return workoutdata, df
return res

View File

@@ -468,162 +468,6 @@ def handle_stravaexport(f2,workoutname,stravatoken,description='',
return (res.id,message)
# Create workout data from Strava or Concept2
# data and create the associated Workout object and save it
def add_workout_from_data(user,importid,data,strokedata,
                          source='strava',splitdata=None,
                          workoutsource='strava'):
    """Build a Workout from Strava activity data plus stroke samples.

    Strava counterpart of the Concept2 importer: converts activity
    dict *data* and sample DataFrame *strokedata* into the internal
    gzipped-CSV layout under ``media/`` and stores it through
    ``dataprep.save_workout_database``.

    NOTE(review): indentation was reconstructed from a diff dump —
    verify nesting against the original file.

    :returns: ``(id, message)`` from save_workout_database.
    """
    # Map the Strava activity type to an internal workout type.
    try:
        workouttype = mytypes.stravamappinginv[data['type']]
    except KeyError: # pragma: no cover
        workouttype = 'other'
    if workouttype.lower() == 'rowing': # pragma: no cover
        workouttype = 'rower'
    # A GPS polyline on a rowing activity implies on-the-water rowing.
    if 'summary_polyline' in data['map'] and workouttype=='rower': # pragma: no cover
        workouttype = 'water'
    if workouttype not in [x[0] for x in Workout.workouttypes]: # pragma: no cover
        workouttype = 'other'
    try:
        comments = data['comments']
    except:
        comments = ' '
    try:
        thetimezone = tz(data['timezone'])
    except:
        thetimezone = 'UTC'
    r = Rower.objects.get(user=user)
    # Parse the activity start; prefer the UTC field when present.
    try:
        rowdatetime = iso8601.parse_date(data['date_utc'])
    except KeyError:
        rowdatetime = iso8601.parse_date(data['start_date'])
    except ParseError: # pragma: no cover
        rowdatetime = iso8601.parse_date(data['date'])
    try:
        intervaltype = data['workout_type']
    except KeyError:
        intervaltype = ''
    try:
        title = data['name']
    except KeyError: # pragma: no cover
        title = ""
    # Append the first line of the comments (max 20 chars) to the title.
    try:
        t = data['comments'].split('\n', 1)[0]
        title += t[:20]
    except:
        title = ''
    starttimeunix = arrow.get(rowdatetime).timestamp()
    # Sample times are in tenths of a second; make them cumulative and
    # derive per-lap indices.
    res = make_cumvalues(0.1*strokedata['t'])
    cum_time = res[0]
    lapidx = res[1]
    unixtime = cum_time+starttimeunix
    seconds = 0.1*strokedata.loc[:,'t']
    nr_rows = len(unixtime)
    # GPS columns: constant (zero-variance) coordinates mean there is
    # no real track, so demote 'water' back to erg rowing.
    try:
        latcoord = strokedata.loc[:,'lat']
        loncoord = strokedata.loc[:,'lon']
        if latcoord.std() == 0 and loncoord.std() == 0 and workouttype == 'water': # pragma: no cover
            workouttype = 'rower'
    except: # pragma: no cover
        latcoord = np.zeros(nr_rows)
        loncoord = np.zeros(nr_rows)
        if workouttype == 'water':
            workouttype = 'rower'
    try:
        strokelength = strokedata.loc[:,'strokelength']
    except: # pragma: no cover
        strokelength = np.zeros(nr_rows)
    dist2 = 0.1*strokedata.loc[:,'d']
    try:
        spm = strokedata.loc[:,'spm']
    except KeyError: # pragma: no cover
        spm = 0*dist2
    try:
        hr = strokedata.loc[:,'hr']
    except KeyError: # pragma: no cover
        hr = 0*spm
    # Pace arrives in tenths of sec/500m; clip outliers and replace
    # zero pace (division guard) with a slow default of 300 s/500m.
    pace = strokedata.loc[:,'p']/10.
    pace = np.clip(pace,0,1e4)
    pace = pace.replace(0,300)
    velo = 500./pace
    # Use measured power when Strava provides it; otherwise estimate
    # from boat speed with the cubic 2.8*v^3 model.
    try:
        power = strokedata.loc[:,'power']
    except KeyError: # pragma: no cover
        power = 2.8*velo**3
    #if power.std() == 0 and power.mean() == 0:
    # power = 2.8*velo**3
    # save csv
    # Create data frame with all necessary data to write to csv
    df = pd.DataFrame({'TimeStamp (sec)':unixtime,
                       ' Horizontal (meters)': dist2,
                       ' Cadence (stokes/min)':spm,
                       ' HRCur (bpm)':hr,
                       ' longitude':loncoord,
                       ' latitude':latcoord,
                       ' Stroke500mPace (sec/500m)':pace,
                       ' Power (watts)':power,
                       ' DragFactor':np.zeros(nr_rows),
                       ' DriveLength (meters)':np.zeros(nr_rows),
                       ' StrokeDistance (meters)':strokelength,
                       ' DriveTime (ms)':np.zeros(nr_rows),
                       ' StrokeRecoveryTime (ms)':np.zeros(nr_rows),
                       ' AverageDriveForce (lbs)':np.zeros(nr_rows),
                       ' PeakDriveForce (lbs)':np.zeros(nr_rows),
                       ' lapIdx':lapidx,
                       ' ElapsedTime (sec)':seconds
                       })
    # NOTE(review): sort_values is not in-place and its result is
    # discarded — confirm whether sorting was intended to stick.
    df.sort_values(by='TimeStamp (sec)',ascending=True)
    timestr = strftime("%Y%m%d-%H%M%S")
    # Create CSV file name and save data to CSV file
    csvfilename ='media/{code}_{importid}.csv'.format(
        importid=importid,
        code = uuid4().hex[:16]
    )
    res = df.to_csv(csvfilename+'.gz',index_label='index',
                    compression='gzip')
    id,message = dataprep.save_workout_database(
        csvfilename,r,
        dosmooth=r.dosmooth,
        workouttype=workouttype,
        title=title,notes=comments,
        workoutsource=workoutsource,
        dosummary=True
    )
    return id,message
def workout_strava_upload(user,w, quick=False,asynchron=True):
try:

View File

@@ -484,21 +484,6 @@ def lookuplong(dict, key):
return s
@register.filter
def ualookup(dict, key):
    """Template filter: fetch *key* from a user-activity dict and
    convert known fields to display-friendly values.

    'distance' -> int meters, 'duration' -> H:M:S string,
    'starttime' -> parsed datetime; any other key is returned as-is.
    """
    value = dict.get(key)
    if key == 'distance':
        value = int(float(value))
    elif key == 'duration':
        value = secondstotimestring(int(value))
    elif key == 'starttime':
        value = dateutil.parser.parse(value)
    return value
from rowers.models import PlannedSession
@register.filter

View File

@@ -340,13 +340,13 @@ def mocked_getsmallrowdata_db_updatecp(*args, **kwargs):
return df
def mocked_getsmallrowdata_db_setcp(*args, **kwargs):
df = pd.read_csv('rowers/tests/testdata/colsfromdb.csv')
df = pd.read_csv('rowers/tests/testdata/colsfromdb2.csv')
return df
def mocked_getsmallrowdata_db_water(*args, **kwargs):
df = pd.read_csv('rowers/tests/testdata/colsfromdb.csv')
df = pd.read_csv('rowers/tests/testdata/colsfromdb3.csv')
return df

View File

@@ -502,15 +502,15 @@ class AsyncTaskTests(TestCase):
self.assertEqual(res,1)
@patch('rowers.dataprepnodjango.getsmallrowdata_db_updatecp')
def test_handle_updatecp(self,mocked_getsmallrowdata_db):
@patch('rowers.dataprepnodjango.getsmallrowdata_db')
def test_handle_updatecp(self,mocked_getsmallrowdata_db_updatecp):
rower_id = 1
workoutids = [1]
res = tasks.handle_updatecp(rower_id,workoutids)
self.assertEqual(res,1)
@patch('rowers.dataprepnodjango.getsmallrowdata_db_setcp')
def test_handle_setcp(self,mocked_getsmallrowdata_db):
@patch('rowers.dataprepnodjango.getsmallrowdata_db')
def test_handle_setcp(self,mocked_getsmallrowdata_db_db_setcp):
strokesdf = pd.read_csv('rowers/tests/testdata/uhfull.csv')
filename = 'rowers/tests/testdata/temp/pq.gz'
workoutids = 1

View File

@@ -385,39 +385,6 @@ class C2Objects(DjangoTestCase):
    @patch('rowers.dataprep.create_engine')
    def test_strokedata(self, mocked_sqlalchemy):
        """Smoke-test add_workout_from_strokedata with canned C2 data.

        Loads recorded Concept2 stroke and result JSON fixtures and
        runs the importer with the SQLAlchemy engine mocked out; the
        test passes if no exception is raised.
        """
        # Stroke-level fixture -> DataFrame expected by the importer.
        with open('rowers/tests/testdata/c2stroketestdata.txt','r') as infile:
            res = json.load(infile)
        strokedata = pd.DataFrame.from_dict(res['data'])
        # Result-summary fixture (workout metadata).
        with open('rowers/tests/testdata/c2testdata.txt','r') as infile:
            res = json.load(infile)
        data = res['data']
        from rowers.views import add_workout_from_strokedata
        res = add_workout_from_strokedata(self.u,1,data,strokedata,source='c2')
    @patch('rowers.dataprep.create_engine')
    def test_strokedatanohr(self, mocked_sqlalchemy):
        """Same smoke test as test_strokedata, but the stroke fixture
        lacks a heart-rate column, exercising the importer's HR
        fallback path."""
        with open('rowers/tests/testdata/c2strokedatanohr.txt','r') as infile:
            res = json.load(infile)
        strokedata = pd.DataFrame.from_dict(res['data'])
        with open('rowers/tests/testdata/c2testdata.txt','r') as infile:
            res = json.load(infile)
        data = res['data']
        from rowers.views import add_workout_from_strokedata
        res = add_workout_from_strokedata(self.u,1,data,strokedata,source='c2')
@patch('rowers.tasks.requests.get',side_effect=mocked_requests)
@patch('rowers.tasks.requests.post',side_effect=mocked_requests)
@patch('rowers.tasks.requests.session', side_effect=mocked_requests)

1399
rowers/tests/testdata/colsfromdb2.csv vendored Normal file

File diff suppressed because it is too large Load Diff

1399
rowers/tests/testdata/colsfromdb3.csv vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1244,7 +1244,7 @@ def get_timezone_from_c2data(data):
timezone = pytz.timezone(data['timezone'])
except UnknownTimeZoneError:
timezone = pytz.utc
except KeyError:
except KeyError: # pragma: no cover
timezone = pytz.utc
return timezone

View File

@@ -1755,7 +1755,7 @@ def workout_getimportview(request,externalid,source = 'c2',do_async=True):
if result: # pragma: no cover
messages.info(request,"Your workout will be imported in the background")
# this should return to the respective import list page
else:
else: # pragma: no cover
messages.error(request,'Error getting the workout')
url = reverse(importlistviews[source])

View File

@@ -1273,166 +1273,6 @@ def sendmail(request):
return HttpResponseRedirect('/rowers/email/')
# Create workout data from Strava or Concept2
# data and create the associated Workout object and save it
def add_workout_from_strokedata(user,importid,data,strokedata,
                                source='c2',splitdata=None,
                                workoutsource='concept2'):
    """Create a Workout from imported result + stroke data (views-level
    variant of the Concept2 importer).

    Converts result dict *data* and stroke DataFrame *strokedata* into
    the internal gzipped-CSV column layout under ``media/`` and stores
    it via ``dataprep.save_workout_database``.

    NOTE(review): indentation was reconstructed from a diff dump —
    verify nesting against the original file.

    :returns: ``(id, message)`` from save_workout_database.
    """
    # Unlike the tasks-level importer, the type is taken verbatim.
    try:
        workouttype = data['type']
    except KeyError: # pragma: no cover
        workouttype = 'rower'
    if workouttype not in [x[0] for x in Workout.workouttypes]: # pragma: no cover
        workouttype = 'other'
    try:
        comments = data['comments']
    except: # pragma: no cover
        comments = ' '
    # comments = "Imported data \n %s" % comments
    # comments = "Imported data \n"+comments # str(comments)
    # NOTE(review): thetimezone is computed but never used below.
    try:
        thetimezone = tz(data['timezone'])
    except: # pragma: no cover
        thetimezone = 'UTC'
    r = getrower(user)
    # Parse the workout start; prefer the UTC field when present.
    try:
        rowdatetime = iso8601.parse_date(data['date_utc'])
    except KeyError: # pragma: no cover
        rowdatetime = iso8601.parse_date(data['start_date'])
    except ParseError: # pragma: no cover
        rowdatetime = iso8601.parse_date(data['date'])
    # NOTE(review): c2intervaltype is read but unused in this function.
    try:
        c2intervaltype = data['workout_type']
    except KeyError: # pragma: no cover
        c2intervaltype = ''
    try:
        title = data['name']
    except KeyError: # pragma: no cover
        title = ""
    # Append the first line of the comments (max 20 chars) to the title.
    try:
        t = data['comments'].split('\n', 1)[0]
        title += t[:20]
    except:
        title = ''
    starttimeunix = arrow.get(rowdatetime).timestamp()
    # Stroke times are in tenths of a second; make them cumulative and
    # derive per-lap indices.
    res = make_cumvalues(0.1*strokedata['t'])
    cum_time = res[0]
    lapidx = res[1]
    unixtime = cum_time+starttimeunix
    # unixtime[0] = starttimeunix
    seconds = 0.1*strokedata.loc[:,'t']
    nr_rows = len(unixtime)
    # Optional columns default to zeros when absent.
    try: # pragma: no cover
        latcoord = strokedata.loc[:,'lat']
        loncoord = strokedata.loc[:,'lon']
    except:
        latcoord = np.zeros(nr_rows)
        loncoord = np.zeros(nr_rows)
    try:
        strokelength = strokedata.loc[:,'strokelength']
    except:
        strokelength = np.zeros(nr_rows)
    dist2 = 0.1*strokedata.loc[:,'d']
    try:
        spm = strokedata.loc[:,'spm']
    except KeyError: # pragma: no cover
        spm = 0*dist2
    try:
        hr = strokedata.loc[:,'hr']
    except KeyError:
        hr = 0*spm
    # Pace arrives in tenths of sec/500m; clip outliers and replace
    # zero pace (division guard) with a slow default of 300 s/500m,
    # then estimate power from speed with the cubic 2.8*v^3 model.
    pace = strokedata.loc[:,'p']/10.
    pace = np.clip(pace,0,1e4)
    pace = pace.replace(0,300)
    velo = 500./pace
    power = 2.8*velo**3
    # save csv
    # Create data frame with all necessary data to write to csv
    df = pd.DataFrame({'TimeStamp (sec)':unixtime,
                       ' Horizontal (meters)': dist2,
                       ' Cadence (stokes/min)':spm,
                       ' HRCur (bpm)':hr,
                       ' longitude':loncoord,
                       ' latitude':latcoord,
                       ' Stroke500mPace (sec/500m)':pace,
                       ' Power (watts)':power,
                       ' DragFactor':np.zeros(nr_rows),
                       ' DriveLength (meters)':np.zeros(nr_rows),
                       ' StrokeDistance (meters)':strokelength,
                       ' DriveTime (ms)':np.zeros(nr_rows),
                       ' StrokeRecoveryTime (ms)':np.zeros(nr_rows),
                       ' AverageDriveForce (lbs)':np.zeros(nr_rows),
                       ' PeakDriveForce (lbs)':np.zeros(nr_rows),
                       ' lapIdx':lapidx,
                       ' ElapsedTime (sec)':seconds
                       })
    # NOTE(review): sort_values is not in-place and its result is
    # discarded — confirm whether sorting was intended to stick.
    df.sort_values(by='TimeStamp (sec)',ascending=True)
    timestr = strftime("%Y%m%d-%H%M%S")
    # Create CSV file name and save data to CSV file
    csvfilename ='media/{code}_{importid}.csv'.format(
        importid=importid,
        code = uuid4().hex[:16]
    )
    res = df.to_csv(csvfilename+'.gz',index_label='index',
                    compression='gzip')
    # with Concept2
    # NOTE(review): totaldist/totaltime are computed but not passed on;
    # the corresponding kwargs below are commented out.
    if source=='c2':
        try:
            totaldist = data['distance']
            totaltime = data['time']/10.
        except KeyError: # pragma: no cover
            totaldist = 0
            totaltime = 0
    else: # pragma: no cover
        totaldist = 0
        totaltime = 0
    id,message = dataprep.save_workout_database(
        csvfilename,r,
        workouttype=workouttype,
        dosmooth=r.dosmooth,
        title=title,notes=comments,
        # totaldist=totaldist,
        # totaltime=totaltime,
        workoutsource=workoutsource,
        dosummary=True
    )
    return id,message
def keyvalue_get_default(key,options,def_options): # pragma: no cover
try: