Private
Public Access
1
0

more coverage

This commit is contained in:
Sander Roosendaal
2021-04-26 18:26:16 +02:00
parent 9e2a97e721
commit 594ee6239a
11 changed files with 133 additions and 91 deletions

View File

@@ -10,7 +10,7 @@ def create_alert(manager, rower, measured,period=7, emailalert=True,
name='',**kwargs):
# check if manager is coach of rower. If not return 0
if manager.rower != rower:
if manager.rower != rower: # pragma: no cover
if rower not in coach_getcoachees(manager.rower):
return 0,'You are not allowed to create this alert'
@@ -85,7 +85,7 @@ def alert_add_filters(alert,filters):
# get alert stats
# nperiod = 0: current period, i.e. next_run - n days to today
# nperiod = 1: 1 period ago , i.e. next_run -2n days to next_run -n days
def alert_get_stats(alert,nperiod=0):
def alert_get_stats(alert,nperiod=0): # pragma: no cover
# get strokes
workstrokesonly = not alert.reststrokes
startdate = (alert.next_run - datetime.timedelta(days=(nperiod+1)*alert.period-1))
@@ -117,10 +117,10 @@ def alert_get_stats(alert,nperiod=0):
}
# check if filters are in columns list
pdcolumns = set(df.columns)
pdcolumns = set(df.columns) # pragma: no cover
# drop strokes through filter
if set(columns) <= pdcolumns:
if set(columns) <= pdcolumns: # pragma: no cover
for condition in alert.filter.all():
if condition.condition == '>':
mask = df[condition.metric] > condition.value1
@@ -137,7 +137,7 @@ def alert_get_stats(alert,nperiod=0):
df.loc[mask,alert.measured.metric] = np.nan
df.dropna(inplace=True,axis=0)
else:
else: # pragma: no cover
return {
'workouts':workouts.count(),
'startdate':startdate,

View File

@@ -20,7 +20,7 @@ import requests
from rowsandall_app.settings import SITE_URL
from rowsandall_app.settings_dev import SITE_URL as SITE_URL_DEV
def getvalue(data):
def getvalue(data): # pragma: no cover
perc = 0
total = 1
done = 0
@@ -41,7 +41,7 @@ def getvalue(data):
def longtask(aantal,jobid=None,debug=False,
session_key=None):
session_key=None): # pragma: no cover
counter = 0
channel = 'tasks'
@@ -66,7 +66,7 @@ def longtask(aantal,jobid=None,debug=False,
return 1
def longtask2(aantal,jobid=None,debug=False,secret=''):
def longtask2(aantal,jobid=None,debug=False,secret=''): # pragma: no cover
counter = 0
channel = 'tasks'

View File

@@ -50,7 +50,7 @@ os.environ['DJANGO_SETTINGS_MODULE'] = '$project_name$.settings'
if not getattr(__builtins__, "WindowsError", None):
class WindowsError(OSError): pass
def rdata(file_obj, rower=rrower()):
def rdata(file_obj, rower=rrower()): # pragma: no cover
""" Read rowing data file and return 0 if file doesn't exist"""
try:
result = rrdata(file_obj, rower=rower)
@@ -71,7 +71,7 @@ def processattachment(rower, fileobj, title, uploadoptions,testing=False):
try:
with io.open('media/'+filename,'rb') as fop:
line = fop.readline()
except (IOError, UnicodeEncodeError):
except (IOError, UnicodeEncodeError): # pragma: no cover
return 0
@@ -80,9 +80,9 @@ def processattachment(rower, fileobj, title, uploadoptions,testing=False):
users = User.objects.filter(username=uploadoptions['username'])
if len(users)==1:
therower = users[0].rower
elif uploadoptions['username'] == '':
elif uploadoptions['username'] == '': # pragma: no cover
therower = rower
else:
else: # pragma: no cover
return 0
else:
therower = rower
@@ -94,7 +94,7 @@ def processattachment(rower, fileobj, title, uploadoptions,testing=False):
uploadoptions['title'] = title
url = settings.UPLOAD_SERVICE_URL
if not testing:
if not testing: # pragma: no cover
response = requests.post(url,data=uploadoptions)
# print("Upload response status code",response.status_code, response.json())
if response.status_code == 200:
@@ -125,7 +125,7 @@ def processattachment(rower, fileobj, title, uploadoptions,testing=False):
race = VirtualRace.objects.get(id=uploadoptions['raceid'])
if race.manager == rower.user:
result = email_submit_race(therower,race,workoutid[0])
except VirtualRace.DoesNotExist:
except VirtualRace.DoesNotExist: # pragma: no cover
pass
@@ -135,19 +135,19 @@ def get_from_address(message):
from_address = message.from_address[0].lower()
if message.encoded:
if message.encoded: # pragma: no cover
body = message.text.splitlines()
else:
body = message.get_body().splitlines()
try:
first_line = body[0].lower()
except IndexError:
except IndexError: # pragma: no cover
first_line = ''
try:
first_line = first_line.decode('utf-8')
except AttributeError:
except AttributeError: # pragma: no cover
pass
if "quiske" in first_line:
@@ -178,17 +178,17 @@ class Command(BaseCommand):
def handle(self, *args, **options):
if 'testing' in options:
testing = options['testing']
else:
else: # pragma: no cover
testing = False
if 'mailbox' in options:
workoutmailbox = Mailbox.objects.get(name=options['mailbox'])
else:
else: # pragma: no cover
workoutmailbox = Mailbox.objects.get(name='workouts')
if 'failedmailbox' in options:
failedmailbox = Mailbox.objects.get(name=options['failedmailbox'])
else:
else: # pragma: no cover
failedmailbox = Mailbox.objects.get(name='Failed')
# Polar
@@ -197,17 +197,17 @@ class Command(BaseCommand):
# Concept2
rowers = Rower.objects.filter(c2_auto_import=True)
for r in rowers:
for r in rowers: # pragma: no cover
if user_is_not_basic(r.user):
c2stuff.get_c2_workouts(r)
rowers = Rower.objects.filter(rp3_auto_import=True)
for r in rowers:
for r in rowers: # pragma: no cover
if user_is_not_basic(r.user):
res = rp3stuff.get_rp3_workouts(r)
rowers = Rower.objects.filter(nk_auto_import=True)
for r in rowers:
for r in rowers: # pragma: no cover
if user_is_not_basic(r.user):
res = nkstuff.get_nk_workouts(r)
@@ -223,7 +223,7 @@ class Command(BaseCommand):
# extension = attachment.document.name[-3:].lower()
try:
message = Message.objects.get(id=attachment.message_id)
if message.encoded:
if message.encoded: # pragma: no cover
# if message.text:
body = "\n".join(message.text.splitlines())
else:
@@ -239,16 +239,16 @@ class Command(BaseCommand):
rowers = [
r for r in Rower.objects.all() if r.user.email.lower() == from_address
]
try:
try: # pragma: no cover
rowers2 = [
r for r in Rower.objects.all() if from_address in r.emailalternatives
]
rowers = rowers+rowers2
except TypeError:
pass
except IOError:
except IOError: # pragma: no cover
rowers = []
except Message.DoesNotExist:
except Message.DoesNotExist: # pragma: no cover
try:
attachment.delete()
except:
@@ -268,7 +268,7 @@ class Command(BaseCommand):
rower, datafile, title, uploadoptions,
testing=testing
)
except BadZipFile:
except BadZipFile: # pragma: no cover
pass
else:
@@ -282,16 +282,16 @@ class Command(BaseCommand):
# We're done with the attachment. It can be deleted
try:
attachment.delete()
except IOError:
except IOError: # pragma: no cover
pass
except WindowsError:
except WindowsError: # pragma: no cover
if not testing:
time.sleep(2)
try:
attachment.delete()
except WindowsError:
pass
except:
except: # pragma: no cover
message.mailbox = failedmailbox
message.save()

View File

@@ -38,7 +38,7 @@ from rowers.tasks import handle_nk_async_workout
try:
from json.decoder import JSONDecodeError
except ImportError:
except ImportError: # pragma: no cover
JSONDecodeError = ValueError
from rowers.imports import *
@@ -60,7 +60,7 @@ oauth_data = {
from requests.auth import HTTPBasicAuth
def get_token(code):
def get_token(code): # pragma: no cover
url = oauth_data['base_url']
@@ -97,14 +97,14 @@ def get_token(code):
def nk_open(user):
r = Rower.objects.get(user=user)
if (r.nktoken == '') or (r.nktoken is None):
if (r.nktoken == '') or (r.nktoken is None): # pragma: no cover
s = "Token doesn't exist. Need to authorize"
raise NoTokenError("User has no token")
else:
if (timezone.now()>r.nktokenexpirydate):
thetoken = rower_nk_token_refresh(user)
if thetoken == None:
if thetoken == None: # pragma: no cover
raise NoTokenError("User has no token")
return thetoken
else:
@@ -115,12 +115,12 @@ def nk_open(user):
def get_nk_workouts(rower, do_async=True):
try:
thetoken = nk_open(rower.user)
except NoTokenError:
except NoTokenError: # pragma: no cover
return 0
res = get_nk_workout_list(rower.user)
if res.status_code != 200:
if res.status_code != 200: # pragma: no cover
return 0
nkids = [item['id'] for item in res.json()]
@@ -141,7 +141,7 @@ def get_nk_workouts(rower, do_async=True):
with open('nkblocked.json','r') as nkblocked:
jsondata = json.load(nkblocked)
parkedids = jsondata['ids']
except FileNotFoundError:
except FileNotFoundError: # pragma: no cover
pass
knownnkids = uniqify(knownnkids+tombstones+parkedids)
@@ -180,7 +180,7 @@ def do_refresh_token(refreshtoken):
response = requests.post(url,data=post_data,auth=HTTPBasicAuth(oauth_data['client_id'],oauth_data['client_secret']))
if response.status_code != 200:
if response.status_code != 200: # pragma: no cover
return [0,0,0]
token_json = response.json()
@@ -207,16 +207,16 @@ def rower_nk_token_refresh(user):
return r.nktoken
def make_authorization_url(request):
def make_authorization_url(request): # pragma: no cover
return imports_make_authorization_url(oauth_data)
def get_nk_workout_list(user,fake=False,after=0,before=0):
r = Rower.objects.get(user=user)
if (r.nktoken == '') or (r.nktoken is None):
if (r.nktoken == '') or (r.nktoken is None): # pragma: no cover
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (r.nktokenexpirydate is None or timezone.now()+timedelta(seconds=10)>r.nktokenexpirydate):
elif (r.nktokenexpirydate is None or timezone.now()+timedelta(seconds=10)>r.nktokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh."
return custom_exception_handler(401,s)
else:
@@ -249,10 +249,10 @@ def get_nk_workout_list(user,fake=False,after=0,before=0):
def get_workout(user,nkid,do_async=False):
r = Rower.objects.get(user=user)
if (r.nktoken == '') or (r.nktoken is None):
if (r.nktoken == '') or (r.nktoken is None): # pragma: no cover
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s) ,0
elif (timezone.now()>r.nktokenexpirydate):
elif (timezone.now()>r.nktokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh."
return custom_exception_handler(401,s),0
@@ -260,7 +260,7 @@ def get_workout(user,nkid,do_async=False):
'sessionIds': nkid,
}
if do_async:
if do_async: # pragma: no cover
res = get_nk_workout_list(r.user)
if res.status_code != 200:
return 0
@@ -293,7 +293,7 @@ def get_workout(user,nkid,do_async=False):
response = requests.get(url,headers=headers,params=params)
if response.status_code != 200:
if response.status_code != 200: # pragma: no cover
# error handling and logging
return {},pd.DataFrame()
@@ -323,7 +323,7 @@ def get_workout(user,nkid,do_async=False):
response = requests.get(url, headers=headers,params=params)
if response.status_code != 200:
if response.status_code != 200: # pragma: no cover
# error handling and logging
return {},df

View File

@@ -12,10 +12,10 @@ def save_scoring(name,user,filename,id=0,notes=""):
collection = StandardCollection(name=name,manager=user,notes=notes)
collection.save()
standards = CourseStandard.objects.filter(standardcollection=collection)
for standard in standards:
for standard in standards: # pragma: no cover
standards.delete()
else:
else: # pragma: no cover
try:
collection = StandardCollection.objects.get(id=id)
collection.name = name
@@ -34,7 +34,7 @@ def save_scoring(name,user,filename,id=0,notes=""):
try:
df = pd.read_csv(filename)
except:
except: # pragma: no cover
return 0
df.rename(
@@ -58,7 +58,7 @@ def save_scoring(name,user,filename,id=0,notes=""):
for index, row in df.iterrows():
try:
name = row['Name']
except KeyError:
except KeyError: # pragma: no cover
continue
try:
@@ -69,7 +69,7 @@ def save_scoring(name,user,filename,id=0,notes=""):
seconds = delta.total_seconds()
referencespeed = coursedistance/seconds
except KeyError:
except KeyError: # pragma: no cover
continue
try:
@@ -77,7 +77,7 @@ def save_scoring(name,user,filename,id=0,notes=""):
agemax = row['MaxAge']
agemin = int(agemin)
agemax = int(agemax)
except KeyError:
except KeyError: # pragma: no cover
agemin = 0
agemax = 120
@@ -85,19 +85,19 @@ def save_scoring(name,user,filename,id=0,notes=""):
boatclass = row['BoatClass']
if boatclass.lower() in ['standard','olympic','normal','water']:
boatclass = 'water'
elif boatclass.lower() in ['erg','c2','concept','static','rower']:
elif boatclass.lower() in ['erg','c2','concept','static','rower']: # pragma: no cover
boatclass = 'rower'
elif boatclass.lower() in ['dynamic']:
elif boatclass.lower() in ['dynamic']: # pragma: no cover
boatclass = 'dynamic'
elif boatclass.lower() in ['slides','slide','slider','sliders']:
elif boatclass.lower() in ['slides','slide','slider','sliders']: # pragma: no cover
boatclass = 'slides'
elif boatclass.lower() in ['c','c-boat']:
elif boatclass.lower() in ['c','c-boat']: # pragma: no cover
boatclass = 'c-boat'
elif boatclass.lower() in ['coastal','coast']:
elif boatclass.lower() in ['coastal','coast']: # pragma: no cover
boatclass = 'coastal'
elif boatclass.lower() in ['church','churchboat','finnish','finland']:
elif boatclass.lower() in ['church','churchboat','finnish','finland']: # pragma: no cover
boatclass = 'churchboat'
except KeyError:
except KeyError: # pragma: no cover
boatclass = 'water'
try:
@@ -122,7 +122,7 @@ def save_scoring(name,user,filename,id=0,notes=""):
weightclass = 'hwt'
elif weightclass.lower() in ['lwt','l','light','lights','lighties']:
weightclass = 'lwt'
except KeyError:
except KeyError: # pragma: no cover
weightclass = 'hwt'
adaptiveclass = 'None'
@@ -130,7 +130,7 @@ def save_scoring(name,user,filename,id=0,notes=""):
adaptiveclass = row['AdaptiveClass']
if adaptiveclass.lower() in ['o','open','none','no']:
adaptiveclass = 'None'
except KeyError:
except KeyError: # pragma: no cover
adaptiveclass = 'None'
try:
@@ -141,7 +141,7 @@ def save_scoring(name,user,filename,id=0,notes=""):
# finding existing standard
existingstandards = CourseStandard.objects.filter(name=name,standardcollection=collection)
#print(existingstandards,collection)
if existingstandards:
if existingstandards: # pragma: no cover
existingstandards.update(
name=name,
coursedistance=coursedistance,

View File

@@ -13,11 +13,35 @@ from rowers.models import update_records
class MiscTests(TestCase):
def setUp(self):
pass
self.u = UserFactory(is_staff=True)
self.r = Rower.objects.create(user=self.u,
birthdate=faker.profile()['birthdate'],
gdproptin=True,surveydone=True,
gdproptindate=timezone.now(),
rowerplan='coach',subscription_id=1)
self.c = Client()
self.user_workouts = WorkoutFactory.create_batch(5, user=self.r)
self.factory = RequestFactory()
self.password = faker.word()
self.u.set_password(self.password)
self.u.save()
def test_c2records(self):
update_records(verbose=False)
def test_failed_que(self):
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)
url = reverse('failed_queue_view')
response = self.c.get(url)
self.assertEqual(response.status_code,200)
url2 = reverse('failed_queue_empty')
response = self.c.get(url2,follow=True)
self.assertRedirects(response,expected_url=url,status_code=302,target_status_code=200)
#@pytest.mark.django_db
class WorkoutTests(TestCase):
def setUp(self):

View File

@@ -14,9 +14,12 @@ import rowers.plannedsessions as plannedsessions
from django.db import transaction
from rowers.views.workoutviews import plannedsession_compare_view
from rowers.views.otherviews import download_fit
from rowers.opaque import encoder
from django.utils.crypto import get_random_string
from django.http.response import Http404
@override_settings(TESTING=True)
class TrainingPlanTest(TestCase):
def setUp(self):
@@ -1898,6 +1901,17 @@ description: ""
response = garmin_stuff.ps_to_garmin(self.ps_trimp,self.r)
self.assertEqual(response.status_code,200)
url = '0'
request = self.factory.get(url)
request.user = self.u
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)
with self.assertRaises(Http404) as context:
response = download_fit(request,filename=self.ps_trimp.fitfile)
self.assertTrue('File not found' in context.exception)
def test_plannedsessions_dateform_view(self):
login = self.c.login(username=self.u.username, password=self.password)

View File

@@ -78,7 +78,7 @@ def get_token(code):
thetoken = token_json['access_token']
expires_in = token_json['expires_in']
refresh_token = token_json['refresh_token']
except KeyError:
except KeyError: # pragma: no cover
thetoken = 0
expires_in = 0
refresh_token = 0
@@ -86,11 +86,11 @@ def get_token(code):
return thetoken,expires_in,refresh_token
# Make authorization URL including random string
def make_authorization_url(request):
def make_authorization_url(request): # pragma: no cover
return imports_make_authorization_url(oauth_data)
def getidfromresponse(response):
def getidfromresponse(response): # pragma: no cover
t = json.loads(response.text)
links = t["_links"]
@@ -113,7 +113,7 @@ def createtpworkoutdata(w):
return tcxfilename
def tp_check(access_token):
def tp_check(access_token): # pragma: no cover
headers = {
"Content-Type": "application/json",
'Accept': 'application/json',
@@ -157,15 +157,15 @@ def uploadactivity(access_token,filename,description='',
data = json.dumps(data),
headers=headers,verify=False)
if resp.status_code != 200:
if resp.status_code != 200: # pragma: no cover
return 0,resp.reason,resp.status_code,headers
else:
return resp.json()[0]["Id"],"ok",200,""
return 0,0,0,0
return 0,0,0,0 # pragma: no cover
def workout_tp_upload(user,w):
def workout_tp_upload(user,w): # pragma: no cover
message = "Uploading to TrainingPeaks"
tpid = 0
r = w.user

View File

@@ -55,7 +55,7 @@ class PlannedSessionViewSet(viewsets.ModelViewSet):
model = PlannedSession
serializer_class = PlannedSessionSerializer
def get_queryset(self):
def get_queryset(self): # pragma: no cover
try:
r = Rower.objects.get(user=self.request.user)
if r.rowerplan not in ['basic','pro']:
@@ -75,7 +75,7 @@ class WorkoutViewSet(viewsets.ModelViewSet):
#queryset = Workout.objects.all().order_by("-date", "-starttime")
serializer_class = WorkoutSerializer
def get_queryset(self):
def get_queryset(self): # pragma: no cover
try:
r = Rower.objects.get(user=self.request.user)
return Workout.objects.filter(user=r).order_by("-date","-starttime")
@@ -94,7 +94,7 @@ class RowerViewSet(viewsets.ModelViewSet):
serializer_class = RowerSerializer
#queryset = Rower.objects.all()
def get_queryset(self):
def get_queryset(self): # pragma: no cover
try:
r = Rower.objects.filter(user=self.request.user)
return r
@@ -113,7 +113,7 @@ class FavoriteChartViewSet(viewsets.ModelViewSet):
serializer_class = FavoriteChartSerializer
#queryset = FavoriteChart.objects.all()
def get_queryset(self):
def get_queryset(self): # pragma: no cover
try:
r = Rower.objects.get(user=self.request.user)
return FavoriteChart.objects.filter(user=r)
@@ -146,7 +146,7 @@ class VirtualRaceViewSet(viewsets.ModelViewSet):
model = VirtualRace
serializer_class = VirtualRaceSerializer
def get_queryset(self):
def get_queryset(self): # pragma: no cover
try:
return VirtualRace.objects.all()
except TypeError:
@@ -158,7 +158,7 @@ class CourseStandardViewSet(viewsets.ModelViewSet):
model = CourseStandard
serializer_class = CourseStandardSerializer
def get_queryset(self):
def get_queryset(self): # pragma: no cover
try:
return CourseStandard.objects.all()
except TypeError:
@@ -171,7 +171,7 @@ class StandardCollectionViewSet(viewsets.ModelViewSet):
serializer_class = StandardCollectionSerializer
def get_queryset(self):
def get_queryset(self): # pragma: no cover
try:
return StandardCollection.objects.all()
except TypeError:
@@ -183,7 +183,7 @@ class GeoCourseViewSet(viewsets.ModelViewSet):
model = GeoCourse,
serializer_class = GeoCourseSerializer
def get_queryset(self):
def get_queryset(self): # pragma: no cover
try:
return GeoCourse.objects.all()
except TypeError:
@@ -205,18 +205,18 @@ router.register(r'api/standards',CourseStandardViewSet,'standards')
router.register(r'api/standardcollections',StandardCollectionViewSet,'standardcollections')
router.register(r'api/geocourses',GeoCourseViewSet,'geocourses')
def permissiondenied_view(request):
def permissiondenied_view(request): # pragma: no cover
raise PermissionDenied
def filenotfound_view(request):
def filenotfound_view(request): # pragma: no cover
return rowers.views.error403_view(request)
def response_error_handler(request, exception=None):
def response_error_handler(request, exception=None): # pragma: no cover
return HttpResponse('Error handler content', status=403)
def filenotfound_handler(request, exception=None):
def filenotfound_handler(request, exception=None): # pragma: no cover
return HttpResponse('Error handler content', status=404)
handler403 = views.error403_view
@@ -853,7 +853,7 @@ urlpatterns = [
re_path(r'^braintree/$',views.braintree_webhook_view,name="braintree_webhook_view"),
]
if settings.DEBUG:
if settings.DEBUG: # pragma: no cover
urlpatterns += [
re_path(r'^c2listug/(?P<page>\d+)/$',views.c2listdebug_view),
re_path(r'^c2listug/$',views.c2listdebug_view),

View File

@@ -28,7 +28,11 @@ def download_fit(request,filename=''):
raise PermissionDenied("You are not allowed to download this file")
fitfile = ps.fitfile
response = HttpResponse(fitfile)
try:
response = HttpResponse(fitfile)
except FileNotFoundError:
raise Http404("File not found")
response['Content-Disposition'] = 'attachment; filename="%s"' % filename
response['Content-Type'] = 'application/octet-stream'

View File

@@ -35,12 +35,12 @@ def get_weather_data(long,lat,unixtime):
try:
s = requests.get(url)
except ConnectionError:
except ConnectionError: # pragma: no cover
return 0
if s.ok:
return s.json()
else:
else: # pragma: no cover
return 0
# Get Metar data
@@ -56,12 +56,12 @@ def get_metar_data(airportcode,unixtime):
try:
s = requests.get(url)
except:
except: # pragma: no cover
message = 'Failed to download METAR data'
return [0,0,message,'','']
if s.ok:
if s.ok: # pragma: no cover
try:
doc = etree.fromstring(s.content)
except AttributeError:
@@ -108,7 +108,7 @@ def get_wind_data(lat,long,unixtime):
data = get_weather_data(lat,long,unixtime)
summary = ''
temperature = 20
if data:
if data: # pragma: no cover
try:
# we are getting wind in mph
windspeed = data['currently']['windSpeed']*0.44704
@@ -157,7 +157,7 @@ def get_wind_data(lat,long,unixtime):
message = 'Summary for your location at '+timestamp+': '+summary
message += '. Temperature '+str(temperature)+'F/'+str(temperaturec)+'C'
if data:
if data: # pragma: no cover
message += '. Wind: '+str(windspeed)+' m/s. Wind Bearing: '+str(windbearing)+' degrees'