Private
Public Access
1
0

removing imports.py and increasing coverage

This commit is contained in:
2023-09-21 16:09:48 +02:00
parent b1d4534ff2
commit ddac7ef18f
15 changed files with 177 additions and 406 deletions

View File

@@ -23,6 +23,7 @@
<div>
<a href="/boatmovers/">Ranglijst</a>
<a href="/boatmovers/faq/">F.A.Q.</a>
<a href="/boatmovers/compare/">History</a>
</div>
{% if WARNING_MESSAGE != '' %}
<p class="message">

View File

@@ -7,14 +7,13 @@ from rowers.utils import myqueue
# (There is still some stuff defined directly in views.py. Need to
# move that here.)
from rowers.imports import *
import datetime
from requests import Request, Session
import rowers.mytypes as mytypes
from rowers.mytypes import otwtypes
from rowers.rower_rules import is_workout_user, ispromember
from iso8601 import ParseError
import pandas as pd
import numpy
import json
from scipy import optimize
@@ -22,6 +21,7 @@ from json.decoder import JSONDecodeError
from pytz.exceptions import UnknownTimeZoneError
from rowers.utils import dologging
import numpy as np
from rowsandall_app.settings import (
C2_CLIENT_ID, C2_REDIRECT_URI, C2_CLIENT_SECRET,
@@ -83,6 +83,8 @@ def getagegrouprecord(age, sex='male', weightcategory='hwt',
p0 = [700, 120, 700, 10, 100, 100]
p1, success = optimize.leastsq(errfunc, p0[:],
args=(ages, powers))
try:
p1, success = optimize.leastsq(errfunc, p0[:],
args=(ages, powers))
@@ -99,5 +101,6 @@ def getagegrouprecord(age, sex='male', weightcategory='hwt',
else:
power = 0
return power

View File

@@ -4,7 +4,6 @@ from rowers.utils import custom_exception_handler, NoTokenError
from rowingdata import rowingdata
from django.core.exceptions import PermissionDenied
from rowers.utils import myqueue
from rowers.imports import *
import datetime
import requests
@@ -20,7 +19,9 @@ from iso8601 import ParseError
from rowers.plannedsessions import ps_dict_get_description
import pandas as pd
import arrow
from rowers import dataprep
import pytz
import numpy
import json

View File

@@ -1,291 +0,0 @@
# All the functionality to connect to SportTracks
# Python
import oauth2 as oauth
import cgi
import pytz
import requests
import requests.auth
import json
from django.utils import timezone
from datetime import datetime
from datetime import timedelta
import arrow
import numpy as np
from dateutil import parser
import time
from time import strftime
import rowers.dataprep as dataprep
import math
from math import sin, cos, atan2, sqrt
import os
import sys
import urllib
import iso8601
from uuid import uuid4
# Django
from django.http import HttpResponseRedirect, HttpResponse, JsonResponse
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.contrib.auth.decorators import login_required
# Project
# from .models import Profile
from rowingdata import rowingdata, make_cumvalues
import pandas as pd
from rowers.models import Rower, Workout, TombStone
import rowers.mytypes as mytypes
from rowsandall_app.settings import (
C2_CLIENT_ID, C2_REDIRECT_URI, C2_CLIENT_SECRET,
STRAVA_CLIENT_ID, STRAVA_REDIRECT_URI,
STRAVA_CLIENT_SECRET, SPORTTRACKS_CLIENT_SECRET,
SPORTTRACKS_CLIENT_ID, SPORTTRACKS_REDIRECT_URI
)
from rowers.utils import (
NoTokenError, custom_exception_handler, ewmovingaverage,
geo_distance, uniqify
)
# Splits SportTracks data which is one long sequence of
# [t,[lat,lon],t2,[lat2,lon2] ...]
# to [t,t2,t3, ...], [[lat,long],[lat2,long2],...
def splitstdata(lijst):
t = []
latlong = []
while len(lijst) >= 2:
t.append(lijst[0])
latlong.append(lijst[1])
lijst = lijst[2:]
return [np.array(t), np.array(latlong)]
# covered in integrations
def imports_open(user, oauth_data):
r = Rower.objects.get(user=user)
token = getattr(r, oauth_data['tokenname'])
try:
tokenexpirydate = getattr(r, oauth_data['expirydatename'])
except (TypeError, AttributeError, KeyError): # pragma: no cover
tokenexpirydate = None
if (token == '') or (token is None):
raise NoTokenError("User has no token")
else:
tokenname = oauth_data['tokenname']
refreshtokenname = oauth_data['refreshtokenname']
expirydatename = oauth_data['expirydatename']
if tokenexpirydate and timezone.now()+timedelta(seconds=60) > tokenexpirydate:
token = imports_token_refresh(
user,
tokenname,
refreshtokenname,
expirydatename,
oauth_data,
)
elif tokenexpirydate is None and expirydatename is not None and 'strava' in expirydatename: # pragma: no cover
token = imports_token_refresh(
user,
tokenname,
refreshtokenname,
expirydatename,
oauth_data,
)
return token
# covered in integrations
# Refresh token using refresh token
def imports_do_refresh_token(refreshtoken, oauth_data, access_token=''):
# client_auth = requests.auth.HTTPBasicAuth(
# oauth_data['client_id'],
# oauth_data['client_secret']
# )
post_data = {"grant_type": "refresh_token",
"client_secret": oauth_data['client_secret'],
"client_id": oauth_data['client_id'],
"refresh_token": refreshtoken,
}
headers = {'user-agent': 'sanderroosendaal',
'Accept': 'application/json',
'Content-Type': oauth_data['content_type']}
# for Strava
if 'grant_type' in oauth_data:
if oauth_data['grant_type']:
post_data['grant_type'] = oauth_data['grant_type']
if oauth_data['bearer_auth']:
headers['authorization'] = 'Bearer %s' % access_token
baseurl = oauth_data['base_url']
if 'json' in oauth_data['content_type']:
try:
response = requests.post(baseurl,
data=json.dumps(post_data),
headers=headers, verify=False)
except: # pragma: no cover
raise NoTokenError("Failed to get token")
else:
try:
response = requests.post(baseurl,
data=post_data,
headers=headers, verify=False,
)
except: # pragma: no cover
raise NoTokenError("Failed to get token")
if response.status_code == 200 or response.status_code == 201:
token_json = response.json()
else: # pragma: no cover
raise NoTokenError("User has no token")
try:
thetoken = token_json['access_token']
except KeyError: # pragma: no cover
raise NoTokenError("User has no token")
try:
expires_in = token_json['expires_in']
except KeyError:
try:
expires_at = arrow.get(token_json['expires_at']).timestamp()
expires_in = expires_at - arrow.now().timestamp()
except KeyError: # pragma: no cover
expires_in = 0
try:
refresh_token = token_json['refresh_token']
except KeyError: # pragma: no cover
refresh_token = refreshtoken
try:
expires_in = int(expires_in)
except (TypeError, ValueError): # pragma: no cover
expires_in = 0
return [thetoken, expires_in, refresh_token]
# Exchange ST access code for long-lived ST access token
# implemented in integrations
def imports_get_token(
code, oauth_data
):
redirect_uri = oauth_data['redirect_uri']
client_secret = oauth_data['client_secret']
client_id = oauth_data['client_id']
base_uri = oauth_data['base_url']
# client_auth = requests.auth.HTTPBasicAuth(
# client_id, client_secret
# )
post_data = {"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_secret": client_secret,
"client_id": client_id,
}
try:
headers = oauth_data['headers']
except KeyError:
headers = {'Accept': 'application/json',
'Api-Key': client_id,
'Content-Type': 'application/json',
'user-agent': 'sanderroosendaal'}
if 'grant_type' in oauth_data:
if oauth_data['grant_type']:
post_data['grant_type'] = oauth_data['grant_type']
if 'strava' in oauth_data['autorization_uri']:
post_data['grant_type'] = "authorization_code"
if 'json' in oauth_data['content_type']:
response = requests.post(
base_uri,
data=json.dumps(post_data),
headers=headers)
else: # pragma: no cover
response = requests.post(
base_uri,
data=post_data,
headers=headers, verify=False)
if response.status_code == 200 or response.status_code == 201:
token_json = response.json()
try:
thetoken = token_json['access_token']
except KeyError: # pragma: no cover
return [0, 0, 0]
try:
refresh_token = token_json['refresh_token']
except KeyError: # pragma: no cover
refresh_token = ''
try:
expires_in = token_json['expires_in']
except KeyError: # pragma: no cover
expires_in = 0
try:
expires_in = int(expires_in)
except (ValueError, TypeError): # pragma: no cover
expires_in = 0
else: # pragma: no cover
return [0, response.text, 0]
return [thetoken, expires_in, refresh_token]
# Make authorization URL including random string
def imports_make_authorization_url(oauth_data): # pragma: no cover
# Generate a random string for the state parameter
# Save it for use later to prevent xsrf attacks
state = str(uuid4())
params = {"client_id": oauth_data['client_id'],
"response_type": "code",
"redirect_uri": oauth_data['redirect_uri'],
"scope": oauth_data['scope'],
"state": state}
url = oauth_data['authorizaton_uri']+urllib.parse.urlencode(params)
return HttpResponseRedirect(url)
# This is token refresh. Looks for tokens in our database, then refreshes
def imports_token_refresh(user, tokenname, refreshtokenname, expirydatename, oauth_data):
r = Rower.objects.get(user=user)
refreshtoken = getattr(r, refreshtokenname)
# for Strava transition
if not refreshtoken: # pragma: no cover
refreshtoken = getattr(r, tokenname)
res = imports_do_refresh_token(refreshtoken, oauth_data)
access_token = res[0]
expires_in = res[1]
refresh_token = res[2]
expirydatetime = timezone.now()+timedelta(seconds=expires_in)
setattr(r, tokenname, access_token)
if expirydatename is not None:
setattr(r, expirydatename, expirydatetime)
if refreshtokenname is not None:
setattr(r, refreshtokenname, refresh_token)
r.save()
return access_token

View File

@@ -10,13 +10,17 @@ from rowers.rower_rules import is_workout_user, ispromember
from rowers.utils import get_strava_stream
from rowers.utils import myqueue, dologging
from rowers.imports import *
#from rowers.imports import *
import gzip
import time
import requests
import arrow
import datetime
import os
from uuid import uuid4
from django.utils import timezone
from datetime import timedelta
from rowsandall_app.settings import (
STRAVA_CLIENT_ID, STRAVA_REDIRECT_URI, STRAVA_CLIENT_SECRET,
@@ -154,7 +158,7 @@ class StravaIntegration(SyncIntegration):
outF.write(s)
try:
os.remove(tcxfilename)
except WindowError: # pragma: no cover
except WindowsError: # pragma: no cover
pass
except FileNotFoundError:
return ''

View File

@@ -718,7 +718,7 @@ def interactive_activitychart2(workouts, startdate, enddate, stack='type', toolb
try:
rowers.append(rowers[0])
except IndexError:
except IndexError: # pragma: no cover
try:
rowers.append(str(workouts[0].user))
except IndexError:
@@ -1278,7 +1278,7 @@ def interactive_forcecurve(theworkouts, workstrokesonly=True, plottype='scatter'
plot.yaxis.axis_label = "Force (N)"
try:
plot.title.text = theworkouts[0].name
except ValueError:
except ValueError: # pragma: no cover
plot.title.text = ""
plot.title.text_font_size = "1.0em"
@@ -1507,10 +1507,10 @@ def weightfromrecord(row,metricchoice):
vv = row[metricchoice]
if vv > 0:
return vv
if metricchoice == 'rscore':
if metricchoice == 'rscore': # pragma: no cover
return rscore_approx(row)
return 0
return 0 # pragma: no cover
def getfatigues(
@@ -1538,7 +1538,7 @@ def getfatigues(
try:
df2 = df.loc[date.date()]
if type(df2) == pd.Series:
if type(df2) == pd.Series: # pragma: no cover
weight += weightfromrecord(df2,metricchoice)
else:
for index, row in df2.iterrows():
@@ -1772,6 +1772,7 @@ def performance_chart(user, startdate=None, enddate=None, kfitness=42, kfatigue=
# make fast dict for dates / workouts
records = []
for w in workouts:
dd = {
'date':w.date,
@@ -1785,7 +1786,7 @@ def performance_chart(user, startdate=None, enddate=None, kfitness=42, kfatigue=
records.append(dd)
df = pd.DataFrame.from_records(records)
if df.empty:
if df.empty: # pragma: no cover
return ['', 'No Data', 0, 0, 0, outids]
df.set_index('date', inplace=True)
@@ -1895,7 +1896,7 @@ def performance_chart(user, startdate=None, enddate=None, kfitness=42, kfatigue=
rightaxlabel = 'Freshness'
if dofatigue: # pragma: no cover
yaxlabel = 'Fitness/Fatigue'
else:
else: # pragma: no cover
yaxlabel = 'Fitness'
if modelchoice == 'banister': # pragma: no cover
@@ -1917,7 +1918,7 @@ def performance_chart(user, startdate=None, enddate=None, kfitness=42, kfatigue=
if dofatigue: # pragma: no cover
y1rangemax = df.loc[:, ['fitness', 'fatigue']].max().max()*1.02
else:
else: # pragma: no cover
y1rangemax = df.loc[:, ['fitness']].max().max()*1.02
if doform: # pragma: no cover
@@ -2676,7 +2677,7 @@ def leaflet_chart_compare(course, workoutids, labeldict={}, startenddict={}):
pass
try:
df = pd.concat(data, axis=0)
except ValueError:
except ValueError: # pragma: no cover
df = pd.DataFrame()
latmean, lonmean, coordinates = course_coord_center(course)
@@ -2735,7 +2736,7 @@ def leaflet_chart_compare(course, workoutids, labeldict={}, startenddict={}):
try:
lat = df['lat']
lon = df['lon']
except KeyError:
except KeyError: # pragma: no cover
return [0, "invalid coordinate data"]
if lat.empty or lon.empty: # pragma: no cover
return [0, "invalid coordinate data"]
@@ -4052,7 +4053,7 @@ def interactive_windchart(id=0, promember=0):
legend_label="Tail (+)/Head (-) Wind (m/s)", color='black')
try:
plot.title.text = row.name
except ValueError:
except ValueError: # pragma: no cover
plot.title.text = ""
# plot.title.text_font_size="1.0em"
plot.title.text_font = "1.0em"
@@ -4122,7 +4123,7 @@ def interactive_streamchart(id=0, promember=0):
plot.line(dist, vstream, legend_label="River Stream Velocity (m/s)")
try:
plot.title.text = row.name
except ValueError:
except ValueError: # pragma: no cover
plot.title.text = ""
plot.title.text_font_size = "1.0em"
plot.xaxis.axis_label = "Distance (m)"
@@ -4134,7 +4135,7 @@ def interactive_streamchart(id=0, promember=0):
return [script, div]
def forcecurve_multi_interactive_chart(selected):
def forcecurve_multi_interactive_chart(selected): # pragma: no cover
df_plot = pd.DataFrame()
ids = [analysis.id for analysis in selected]
@@ -4248,7 +4249,7 @@ def forcecurve_multi_interactive_chart(selected):
return (script, div)
def instroke_multi_interactive_chart(selected, *args, **kwargs):
def instroke_multi_interactive_chart(selected, *args, **kwargs): # pragma: no cover
df_plot = pd.DataFrame()
ids = [analysis.id for analysis in selected]
metrics = list(set([analysis.metric for analysis in selected]))
@@ -4351,7 +4352,7 @@ def instroke_multi_interactive_chart(selected, *args, **kwargs):
def instroke_interactive_chart(df,metric, workout, spm_min, spm_max,
activeminutesmin, activeminutesmax,
individual_curves,
name='',notes=''):
name='',notes=''): # pragma: no cover
df_pos = (df+abs(df))/2.
df_min = -(-df+abs(-df))/2.
@@ -4578,7 +4579,7 @@ def interactive_chart(id=0, promember=0, intervaldata={}):
plot.line('time', 'pace', source=source, legend_label="Pace", name="pace")
try:
plot.title.text = row.name
except ValueError:
except ValueError: # pragma: no cover
plot.title.text = ""
plot.title.text_font_size = "1.0em"
#plot.sizing_mode = 'stretch_both'
@@ -5776,7 +5777,7 @@ def interactive_flex_chart2(id, r, promember=0,
doclean=False,
workstrokesonly=False)
workstrokesonly = False
except TypeError:
except TypeError: # pragma: no cover
workstrokesonly = False
try:
_ = rowdata[yparam2]
@@ -6029,7 +6030,7 @@ def interactive_flex_chart2(id, r, promember=0,
legend_label="Constant Power")
# trendline
if trendline:
if trendline: # pragma: no cover
plot.line('x1', 'ytrend', source=source2, legend_label=yaxlabel+' (trend)')
if plottype == 'line':
@@ -6040,7 +6041,7 @@ def interactive_flex_chart2(id, r, promember=0,
try:
plot.title.text = row.name
except ValueError:
except ValueError: # pragma: no cover
plot.title.text = ""
plot.title.text_font_size = "1.0em"
@@ -6340,7 +6341,7 @@ def thumbnails_set(r, id, favorites):
try:
rowdata.dropna(axis=1, how='all', inplace=True)
except TypeError:
except TypeError: # pragma: no cover
return [
{'script': "",
'div': "",
@@ -6844,7 +6845,7 @@ def interactive_otw_advanced_pace_chart(id=0, promember=0):
try:
plot.title.text = row.name
except ValueError:
except ValueError: # pragma: no cover
plot.title.text = ""
#plot.title.text_font_size = value("1.2em")
plot.xaxis.axis_label = "Time"

View File

@@ -1,8 +1,9 @@
from rowers.models import Rower, Workout, TombStone
from rowers import utils
import datetime
from django.utils import timezone
from datetime import timedelta
from rowers.imports import *
from rowsandall_app.settings import (
ROJABO_CLIENT_ID, ROJABO_REDIRECT_URI, ROJABO_CLIENT_SECRET,
SITE_URL, ROJABO_OAUTH_LOCATION,

View File

@@ -57,7 +57,7 @@ from redis import StrictRedis
redis_connection = StrictRedis()
def mocked_grpc(*args, **kwargs):
def mocked_grpc(*args, **kwargs): # pragma: no cover
class insecure_channel:
def __init__(*args,**kwargs):
pass
@@ -84,7 +84,7 @@ def mocked_grpc(*args, **kwargs):
def mocked_send_template_email(*args,**kwargs):
return 1
def mocked_myqueue(*args, **kwargs):
def mocked_myqueue(*args, **kwargs): # pragma: no cover
class Job:
def __init__(self,*args, **kwargs):
self.result = 1
@@ -95,7 +95,7 @@ def mocked_myqueue(*args, **kwargs):
return Job()
def mock_c2open(*args, **kwargs):
def mock_c2open(*args, **kwargs): # pragma: no cover
return('aap')
def mocked_session(*args, **kwargs):
@@ -124,7 +124,7 @@ def mocked_session(*args, **kwargs):
def json(self):
return self.json_data
class MockContentResponse:
class MockContentResponse: # pragma: no cover
def __init__(self,filename,status_code):
with open(filename,'rb') as f:
s = f.read()
@@ -135,7 +135,7 @@ def mocked_session(*args, **kwargs):
return MockEngine()
def mocked_sqlalchemy(*args, **kwargs):
def mocked_sqlalchemy(*args, **kwargs): # pragma: no cover
# return object with method
class MockEngine:
@@ -177,7 +177,7 @@ def mocked_sqlalchemy(*args, **kwargs):
from rowers import courses
def mocked_sqlalchemy_courses(*args, **kwargs):
def mocked_sqlalchemy_courses(*args, **kwargs): # pragma: no cover
# return object with method
cs = courses.kmltocourse('rowers/tests/testdata/thyro.kml')
course = cs[0]
@@ -251,7 +251,7 @@ class DjangoTestCase(TestCase): #, MockTestCase):
#MockTestCase.tearDown(self)
# delete_strokedata(1)
def mocked_tcx_parser(*args, **kwargs):
def mocked_tcx_parser(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/fake_strokedata.csv')
return rowingdata(df=df)
@@ -272,12 +272,12 @@ def mocked_fetchcperg(*args, **kwargs):
return df
import pandas as pd
def mocked_read_df_sql(id):
def mocked_read_df_sql(id): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/fake_strokedata.csv')
return df
def mocked_sendmail(*args,**kwargs):
def mocked_sendmail(*args,**kwargs): # pragma: no cover
return HttpResponseRedirect('/rowers/email/')
def mocked_get_video_data(*args, **kwargs):
@@ -299,7 +299,7 @@ def mocked_getrowdata_db(*args, **kwargs):
return df,row
def mocked_getrowdata_uh(*args, **kwargs):
def mocked_getrowdata_uh(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/uhfull.csv')
id = kwargs['id']
@@ -308,7 +308,7 @@ def mocked_getrowdata_uh(*args, **kwargs):
return df, row
def mocked_getsmallrowdata_uh(*args, **kwargs):
def mocked_getsmallrowdata_uh(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/uhfull.csv')
return df
@@ -335,33 +335,33 @@ def mocked_getsmallrowdata_db(*args, **kwargs):
return df
def mocked_getsmallrowdata_db_updatecp(*args, **kwargs):
def mocked_getsmallrowdata_db_updatecp(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/colsfromdb.csv')
return df
def mocked_getsmallrowdata_db_setcp(*args, **kwargs):
def mocked_getsmallrowdata_db_setcp(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/colsfromdb2.csv')
return df
def mocked_getsmallrowdata_db_water(*args, **kwargs):
def mocked_getsmallrowdata_db_water(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/colsfromdb3.csv')
return df
def mocked_getsmallrowdata_db_wps(*args, **kwargs):
def mocked_getsmallrowdata_db_wps(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/driveenergies.csv')
return df
def mocked_getpowerdata_db(*args, **kwargs):
def mocked_getpowerdata_db(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/fake_powerdata.csv')
return df
def mock_for_interactive_chart(*args, **kwargs):
def mock_for_interactive_chart(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/interactivechart.csv')
return df
@@ -371,7 +371,7 @@ def mocked_getempowerdata_db(*args, **kwargs):
return df
def mocked_read_df_cols_sql_multistats(ids,columns,convertnewtons=True):
def mocked_read_df_cols_sql_multistats(ids,columns,convertnewtons=True): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/cumstats.csv')
extracols = []
@@ -387,13 +387,13 @@ def mock_workout_summaries(*args, **kwargs):
df = pd.read_csv('rowers/tests/testdata/workout_summaries.csv')
return df
def mocked_read_df_cols_sql_multi(ids, columns, convertnewtons=True):
def mocked_read_df_cols_sql_multi(ids, columns, convertnewtons=True): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/fake_strokedata2.csv')
extracols = []
return df, extracols
def mocked_read_df_cols_sql_multiflex(ids, columns, convertnewtons=True):
def mocked_read_df_cols_sql_multiflex(ids, columns, convertnewtons=True): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/multiflexdata.csv')
extracols = []
@@ -402,11 +402,11 @@ def mocked_read_df_cols_sql_multiflex(ids, columns, convertnewtons=True):
# Mocked Strava
def mocked_stravaexport(f2,workoutname,stravatoken,description='',
activity_type='Rowing'):
activity_type='Rowing'): # pragma: no cover
print("this is mocked strava export")
return 1,'success'
def mocked_fetchcp(*args, **kwargs):
def mocked_fetchcp(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/otwcpresult.csv')
delta = df['delta']
cpvalue = df['cpvalue']
@@ -426,14 +426,14 @@ def mocked_fetchcp(*args, **kwargs):
return delta, cpvalue, avgpower
def mocked_getcpdata_sql(*args, **kwargs):
def mocked_getcpdata_sql(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/otwcpresult.csv')
df['cp'] = df['cpvalue']
return df
def mocked_cpraw(*args, **kwargs):
def mocked_cpraw(*args, **kwargs): # pragma: no cover
df = pd.read_csv('rowers/tests/testdata/otwcp_df.csv')
return df
@@ -465,17 +465,17 @@ class MockStravalibClient():
## Higher level - unfortunately didn't succeed in mocking the gateway
def get_client_token(*args, **kwargs):
def get_client_token(*args, **kwargs): # pragma: no cover
return "aap"
def mock_create_customer(*args, **kwargs):
return 121
def mock_make_payment(*args, **kwargs):
def mock_make_payment(*args, **kwargs): # pragma: no cover
return 15,''
def mock_update_subscription(*args, **kwargs):
try:
try: # pragma: no cover
rower = args[0]
data = args[1]
planid = data['plan']
@@ -497,7 +497,7 @@ def mock_update_subscription(*args, **kwargs):
def mock_create_subscription(*args, **kwargs):
return mock_update_subscription(*args, **kwargs)
def mock_cancel_subscription(*args, **kwargs):
def mock_cancel_subscription(*args, **kwargs): # pragma: no cover
themessages = []
errormessages = []
try:
@@ -521,7 +521,7 @@ def mock_mocktest(*args, **kwargs):
## Gateway stuff (not working)
class gatewayresult():
class gatewayresult(): # pragma: no cover
def __init__(self,*args,**kwargs):
self.is_success = kwargs.pop('is_success',True)
self.customer_id = 1
@@ -533,22 +533,22 @@ class gatewayresult():
def __unicode__():
return "mockedgatewayresult"
class credit_card():
class credit_card(): # pragma: no cover
def __init__(self, *args, **kwargs):
self.subscriptions = [vsubscription()]
self.country_of_issuance = 'US'
class paypal_account():
class paypal_account(): # pragma: no cover
def __init__(self, *args, **kwargs):
self.subscriptions = [vsubscription(),vsubscription()]
class customercreateresult:
class customercreateresult: # pragma: no cover
def __init__(self, *args, **kwargs):
self.customer = kwargs.pop('customer',customer())
self.is_success = kwargs.pop('is_success',True)
self.customer_id = 1
class customer():
class customer(): # pragma: no cover
def find(*arg, **kwargs):
return self
@@ -560,17 +560,17 @@ class customer():
self.paypal_accounts = [paypal_account()]
self.id = 1
class client_token():
class client_token(): # pragma: no cover
def generate(*args, **kwargs):
return 'aapnooit'
class plan():
class plan(): # pragma: no cover
def all(*args, **kwargs):
return []
class transaction():
class transaction(): # pragma: no cover
def sale(*args, **kwargs):
return gatewayresult(is_success=True)
@@ -588,7 +588,7 @@ class transaction():
self.created_at = datetime.datetime.now()
self.currency_iso_code = 'EUR'
class vtransaction():
class vtransaction(): # pragma: no cover
def __init__(self,*args, **kwargs):
self.amount = 15
self.credit_card_details = credit_card()
@@ -600,7 +600,7 @@ class vtransaction():
self.created_at = datetime.datetime.now()
self.currency_iso_code = 'EUR'
class vsubscription():
class vsubscription(): # pragma: no cover
def update(*args, **kwargs):
return gatewayresult(is_success=True)
@@ -615,7 +615,7 @@ class vsubscription():
self.price = 15
self.never_expires = True
class subscription():
class subscription(): # pragma: no cover
def create(*args, **kwargs):
return gatewayresult(is_success=True)
@@ -634,7 +634,7 @@ class subscription():
self.price = 15
self.never_expires = True
class vpayment_method():
class vpayment_method(): # pragma: no cover
def create(*args, **kwargs):
return gatewayresult()
@@ -642,25 +642,25 @@ class vpayment_method():
self.token = 'liesjeleerdelotje'
class payment_method():
class payment_method(): # pragma: no cover
def create(*args, **kwargs):
return gatewayresult()
def __init__(self, *args, **kwargs):
self.token = 'liesjeleerdelotje'
class notification():
class notification(): # pragma: no cover
def __init__(self, *args, **kwargs):
print('notifucation')
self.kind = 'subscription_canceled'
class webhook_notification():
class webhook_notification(): # pragma: no cover
def parse(*args, **kwargs):
print(args,kwargs,'parse')
return notification()
# mock braintree gateway
class MockBraintreeGateway:
class MockBraintreeGateway: # pragma: no cover
def __init__(self,*args, **kwargs):
self.customer = customer()
self.client_token = client_token()
@@ -671,13 +671,13 @@ class MockBraintreeGateway:
self.webhook_notification = webhook_notification()
def mocked_gateway(*args, **kwargs):
def mocked_gateway(*args, **kwargs): # pragma: no cover
return MockBraintreeGateway()
# Mocked Rowingdata
class mocked_rowingdata(rowingdata):
class mocked_rowingdata(rowingdata): # pragma: no cover
def __init__(self, *args, **kwargs):
super(mocked_rowingdata).__init__(*args, **kwargs)
@@ -938,19 +938,19 @@ def mocked_requests(*args, **kwargs):
class MockHeaderResponse:
class MockHeaderResponse: # pragma: no cover
def __init__(self, header_data, status_code):
self.headers = header_data
self.status_code = status_code
class MockStreamResponse:
class MockStreamResponse: # pragma: no cover
def __init__(self, file_name, status_code):
self.raw = open(file_name,'rb')
self.status_code = status_code
self.ok = True
self.file_name = file_name
def __enter__(self):
def __enter__(self): # pragma: no cover
return self
def __exit__(self, exc_type, exc_value, traceback):
@@ -977,7 +977,7 @@ def mocked_requests(*args, **kwargs):
class MockSession:
class headers:
def __init__(self,*args,**kwargs):
def __init__(self,*args,**kwargs): # pragma: no cover
pass
def update(self,*args,**kwargs):
@@ -1013,7 +1013,7 @@ def mocked_requests(*args, **kwargs):
args = [kwargs['url']]
if "tofit" in kwargs['url']:
args = [kwargs['url']]
if "tojson" in kwargs['url']:
if "tojson" in kwargs['url']: # pragma: no cover
args = [kwargs['url']]
if not args:
@@ -1156,7 +1156,7 @@ def mocked_requests(*args, **kwargs):
garmintrainingscheduletester = re.compile(garmintrainingscheduleregex)
if garmintester.match(args[0]):
if garmindownloadtester.match(args[0]):
if garmindownloadtester.match(args[0]): # pragma: no cover
return MockStreamResponse('rowers/tests/testdata/3x250m.fit',200)
if garmintrainingtester.match(args[0]):
json_data = {
@@ -1222,10 +1222,10 @@ def mocked_requests(*args, **kwargs):
'refresh_token': 'jHJhFzCfOOKB8oyiayubhLAlxaMkG3ruC1E8YxaR'
}
return MockResponse(json_data,200)
elif tpuploadtester.match(args[0]):
elif tpuploadtester.match(args[0]): # pragma: no cover
return MockResponse(tpuploadresponse,200)
if uaapitester.match(args[0]):
if uaapitester.match(args[0]): # pragma: no cover
if 'access_token' in args[0]:
json_data = {
'access_token': 'TA3n1vrNjuQJWw0TdCDHnjSmrjIPULhTlejMIWqq',
@@ -1244,7 +1244,7 @@ def mocked_requests(*args, **kwargs):
return MockResponse(uauserjson,200)
if uatester.match(args[0]):
if uatester.match(args[0]): # pragma: no cover
if 'access_token' in args[0]:
json_data = {
'access_token': 'TA3n1vrNjuQJWw0TdCDHnjSmrjIPULhTlejMIWqq',
@@ -1254,7 +1254,7 @@ def mocked_requests(*args, **kwargs):
return MockResponse(json_data,200)
if rktester.match(args[0]):
if rktester.match(args[0]): # pragma: no cover
if 'token' in args[0]:
json_data = {
'access_token': 'TA3n1vrNjuQJWw0TdCDHnjSmrjIPULhTlejMIWqq',
@@ -1272,7 +1272,7 @@ def mocked_requests(*args, **kwargs):
else:
json_data = rkworkoutlistjson
return MockResponse(json_data,200)
elif rkusertester.match(args[0]):
elif rkusertester.match(args[0]): # pragma: no cover
json_data = {
"userID": 1234567890,
"profile": "/profile",
@@ -1289,10 +1289,10 @@ def mocked_requests(*args, **kwargs):
"team": "/team"
}
return MockResponse(json_data, 200)
elif rkstrokestester.match(args[0]):
elif rkstrokestester.match(args[0]): # pragma: no cover
return MockResponse(rkstrokesjson,200)
if sttester.match(args[0]):
if sttester.match(args[0]): # pragma: no cover
if 'oauth2/token' in args[0]:
json_data = {
'access_token': 'TA3n1vrNjuQJWw0TdCDHnjSmrjIPULhTlejMIWqq',
@@ -1327,13 +1327,13 @@ def mocked_requests(*args, **kwargs):
if nkstrokestester.match(args[0]):
params = kwargs.pop('params',{})
if 'sessionIds' in params and params['sessionIds'] == '404':
if 'sessionIds' in params and params['sessionIds'] == '404': # pragma: no cover
return MockResponse(nkimpellerstrokedata, 200)
return MockResponse(nkstrokedata,200)
if nkworkoutlisttester.match(args[0]):
params = kwargs.pop('params',{})
if 'after' in params and params['after'] == '1267049972000':
if 'after' in params and params['after'] == '1267049972000': # pragma: no cover
return MockResponse(nkimpellerworkoutlist,200)
return MockResponse(nkworkoutlist,200)
@@ -1357,7 +1357,7 @@ def mocked_requests(*args, **kwargs):
query = kwargs['json']['query']
if 'download' in query:
return MockResponse(rp3linkready,200)
except KeyError:
except KeyError: # pragma: no cover
pass
json_data = rp3workoutlist
return MockResponse(json_data,200)
@@ -1392,13 +1392,13 @@ def mocked_requests(*args, **kwargs):
if c2strokestester.match(args[0]):
return MockResponse(c2strokedata,200)
elif c2importtester.match(args[0]):
if '12' in args[0]:
if '12' in args[0]: # pragma: no cover
return MockResponse(c2workoutdata,200)
elif '31' in args[0]:
return MockResponse(c2timezoneworkoutdata2,200)
elif '32' in args[0]:
elif '32' in args[0]: # pragma: no cover
return MockResponse(c2timezoneworkoutdatabad,200)
else:
else: # pragma: no cover
return MockResponse(c2timezoneworkoutdata,200)
elif c2workoutlisttester.match(args[0]):
return MockResponse(c2workoutlist,200)
@@ -1417,13 +1417,13 @@ def mocked_requests(*args, **kwargs):
}
}
return MockResponse(json_data,200)
elif 'results' in args[0]:
elif 'results' in args[0]: # pragma: no cover
json_data = {
'data': {
'id': 1223,
}
}
else:
else: # pragma: no cover
return MockResponse(c2workoutdata,200)
if fakturoidtester.match(args[0]):
@@ -1451,7 +1451,7 @@ class MockEmailMessage:
def send(self):
return 1
class MockResponse:
class MockResponse: # pragma: no cover
def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
@@ -1465,7 +1465,7 @@ class MockResponse:
class MockOAuth1Session:
class MockOAuth1Session: # pragma: no cover
def __init__(self,*args, **kwargs):
pass

View File

@@ -26,6 +26,7 @@ pytestmark = pytest.mark.django_db
from bs4 import BeautifulSoup
import re
import pytz
from parameterized import parameterized
from django.test import TestCase, Client,override_settings, RequestFactory, TransactionTestCase
@@ -34,6 +35,8 @@ from django.core.management import call_command
from django.core.files.uploadedfile import SimpleUploadedFile
from io import StringIO
import factory
from factory.fuzzy import FuzzyDateTime
from django.test.client import RequestFactory
from rowers.integrations import *
@@ -41,7 +44,7 @@ from rowers.integrations import *
from rowers.forms import (
DocumentsForm,CNsummaryForm,RegistrationFormUniqueEmail,
ChartParamChoiceForm,WorkoutMultipleCompareForm,
BoxPlotChoiceForm,PerformanceManagerForm,
BoxPlotChoiceForm,PerformanceManagerForm,ForceCurveOptionsForm
)
import rowers.plots as plots
import rowers.interactiveplots as iplots
@@ -94,7 +97,7 @@ from rowers.models import *
from rowers.forms import *
from rowers.tests.mocks import *
import factory
from faker import Factory
from faker import Factory, Faker
from uuid import uuid4
faker = Factory.create()
@@ -181,13 +184,18 @@ class RaceFactory(factory.DjangoModelFactory):
sessionvalue = 1
sessionmode = 'time'
fake = Faker()
now = datetime.datetime.now(tz=pytz.timezone("America/New_York"))
one_month_ago = now - datetime.timedelta(days=30)
class WorkoutFactory(factory.DjangoModelFactory):
class Meta:
model = Workout
name = factory.LazyAttribute(lambda _: faker.word())
notes = faker.text()
startdatetime = get_random_file(name=faker.word())['startdatetime']
startdatetime = FuzzyDateTime(start_dt=one_month_ago, force_year=datetime.date.today().year)
starttime = get_random_file(name=faker.word())['starttime']
workouttype='water'
date=timezone.now().date()

View File

@@ -154,6 +154,37 @@ class ForcecurveTest(TestCase):
response = self.c.get(url)
self.assertEqual(response.status_code,200)
# saving
form_data = {
'spm_min': 19,
'spm_max': 25,
'dist_min': 0,
'dist_max': 10000,
'work_min': 0,
'work_max': 3444,
'notes': 'aap',
'name': 'sdsd',
'includereststrokes': True,
'plottype':'none',
'_save_as_new':'Save'
}
form = ForceCurveOptionsForm(form_data)
result = form.is_valid()
if not result:
print(form.errors)
self.assertTrue(form.is_valid())
response = self.c.post(url, form_data)
self.assertEqual(response.status_code,200)
url = '/rowers/analysis/forcecurveanalysis/'
response = self.c.get(url)
self.assertEqual(response.status_code,200)
class WorkoutCompareTestNew(TestCase):
def setUp(self):

View File

@@ -1332,7 +1332,7 @@ class STObjects(DjangoTestCase):
self.assertEqual(response.status_code,200)
@patch('rowers.imports.requests.get', side_effect=mocked_requests)
@patch('rowers.integrations.sporttracks.requests.get', side_effect=mocked_requests)
def test_sporttracks_import(self, mock_get):
response = self.c.get('/rowers/workout/sporttracksimport/12/',follow=True)
@@ -1343,7 +1343,7 @@ class STObjects(DjangoTestCase):
self.assertEqual(response.status_code, 200)
@patch('rowers.imports.requests.get', side_effect=mocked_requests)
@patch('rowers.integrations.sporttracks.requests.get', side_effect=mocked_requests)
def test_sporttracks_import(self, mock_get):
response = self.c.get('/rowers/workout/sporttracksimport/13/',follow=True)
@@ -1356,7 +1356,7 @@ class STObjects(DjangoTestCase):
self.assertEqual(response.status_code, 200)
@patch('rowers.imports.requests.get', side_effect=mocked_requests)
@patch('rowers.integrations.sporttracks.requests.get', side_effect=mocked_requests)
def test_sporttracks_import_all(self, mock_get):
response = self.c.get('/rowers/workout/sporttracksimport/?selectallnew=true')
@@ -1422,7 +1422,7 @@ class TPObjects(DjangoTestCase):
csvfilename=filename
)
@patch('rowers.imports.requests.post', side_effect=mocked_requests)
@patch('rowers.integrations.trainingpeaks.requests.post', side_effect=mocked_requests)
def test_tp_callback(self, mock_post):
response = self.c.get('/tp_callback?code=dsdoij232s',follow=True)

View File

@@ -89,6 +89,10 @@ class URLTests(TestCase):
'/rowers/laboratory/',
'/rowers/laboratory/user/1/',
'/rowers/legal/',
'/rowers/workouts/setrpe/',
'/boatmovers/',
'/boatmovers/faq/',
'/boatmovers/compare/',
'/rowers/list-courses/',
'/rowers/list-graphs/',
'/rowers/list-jobs/',

Binary file not shown.

View File

@@ -292,6 +292,8 @@ urlpatterns = [
views.agegrouprecordview, name='agegrouprecordview'),
re_path(r'^workouts/setrpe/$', views.workouts_setrpe_view,
name='workouts_setrpe_view'),
re_path(r'^workouts/setrpe/user/(?P<userid>\d+)/$', views.workouts_setrpe_view,
name='workouts_setrpe_view'),
re_path(r'^list-workouts/team/(?P<teamid>\d+)/$', views.workouts_view,
name='workouts_view'),
re_path(r'^(?P<rowerid>\d+)/list-workouts/$', views.workouts_view,

View File

@@ -417,7 +417,7 @@ def workout_forcecurve_view(request, id=0, analysis=0, userid=0, workstrokesonly
if r == row.user:
mayedit = 1
if analysis:
if analysis: # pragma: no cover
try:
forceanalysis = ForceCurveAnalysis.objects.get(id=analysis)
dist_min = forceanalysis.dist_min
@@ -473,7 +473,7 @@ def workout_forcecurve_view(request, id=0, analysis=0, userid=0, workstrokesonly
plottype = form.cleaned_data['plottype']
workstrokesonly = not includereststrokes
if "_save" in request.POST and "new" not in request.POST:
if "_save" in request.POST and "new" not in request.POST: # pragma: no cover
if not analysis:
forceanalysis = ForceCurveAnalysis(
workout = row,
@@ -929,7 +929,7 @@ def workout_recalcsummary_view(request, id=0):
def workouts_duplicates_select_view(request, userid=0):
r = getrequestrower(request, userid=userid)
if request.method == 'POST':
if request.method == 'POST': # pragma: no cover
form = WorkoutMultipleCompareForm(request.POST)
if form.is_valid():
workouts = form.cleaned_data['workouts']
@@ -1922,16 +1922,17 @@ def plannedsession_compare_view(request, id=0, userid=0):
# set RPE for list of workouts
@login_required()
def workouts_setrpe_view(request):
def workouts_setrpe_view(request,userid=0):
today = timezone.now()
startdate = today-timezone.timedelta(days=32)
enddate = today+timezone.timedelta(days=1)
r = getrequestrower(request)
r = getrequestrower(request,userid=userid)
startdate = datetime.datetime.combine(startdate, datetime.time())
enddate = datetime.datetime.combine(enddate, datetime.time(23, 59, 59))
startdate = pytz.utc.localize(startdate)
enddate = pytz.utc.localize(enddate)
workouts = Workout.objects.filter(
user=r,rpe=0,
@@ -1950,9 +1951,14 @@ def workouts_setrpe_view(request):
)
if request.method == 'POST':
if request.method == 'POST': # pragma: no cover
dateform = DateRangeForm(request.POST)
rpe_formset = WorkoutsRPEFormSet(request.POST)
rpe_formset = WorkoutsRPEFormSet(request.POST,
queryset=Workout.objects.filter(user=r,rpe=0,duplicate=False,
startdatetime__gte=startdate,
startdatetime__lte=enddate
)
)
if dateform.is_valid(): # pragma: no cover
startdate = dateform.cleaned_data['startdate']
enddate = dateform.cleaned_data['enddate']
@@ -2209,7 +2215,7 @@ def workouts_view(request, message='', successmessage='',
'%Y-%m-%d')+'/'+enddate.strftime('%Y-%m-%d')
norpecount = len([w for w in workouts if w.rpe==0 and not w.duplicate])
if norpecount and r.get_rpe_warnings:
if norpecount and r.get_rpe_warnings: # pragma: no cover
messages.info(request,'You have workouts with no RPE value set. \
Click <a href="/rowers/workouts/setrpe">here</a> to update them. \
You can switch off this warning in <a href="/rowers/me/edit">settings</a>.')
@@ -3138,7 +3144,7 @@ def instroke_chart(request, id=0, metric=''): # pragma: no cover
" If you are already a Pro user, please log in to access this functionality",
redirect_field_name=None)
@permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True)
def instroke_data(request, metric='', spm_min=15, spm_max=45, activeminutesmin=0, activeminutesmax=0, id=0, ):
def instroke_data(request, metric='', spm_min=15, spm_max=45, activeminutesmin=0, activeminutesmax=0, id=0, ): # pragma: no cover
r = getrequestrower(request, userid=0)
w = get_workoutuser(id, request)
rowdata = rrdata(csvfile=w.csvfilename)
@@ -3255,7 +3261,7 @@ def instroke_chart_interactive(request, id=0, analysis=0, userid=0):
'maxminutes': maxminutes,
})
if analysis:
if analysis: # pragma: no cover
try:
instroke_analysis = InStrokeAnalysis.objects.get(id=analysis)
if instroke_analysis.rower != r: