Private
Public Access
1
0

more coverage related

This commit is contained in:
Sander Roosendaal
2021-04-23 14:18:13 +02:00
parent 99dfb171b5
commit e19158926d
10 changed files with 383 additions and 297 deletions

View File

@@ -307,14 +307,14 @@ def sharedPage(request, key):
try:
try:
shareKey = ShareKey.objects.get(pk=key)
except:
except: # pragma: no cover
raise SharifyError
if shareKey.expired:
if shareKey.expired: # pragma: no cover
raise SharifyError
func, args, kwargs = resolve(shareKey.location)
kwargs["__shared"] = True
return func(request, *args, **kwargs)
except SharifyError:
except SharifyError: # pragma: no cover
raise Http404 # or add a more detailed error page. This either means that the key doesnt exist or is expired.
def createShareURL(request):
@@ -326,10 +326,10 @@ def createShareURL(request):
location = url)
key.save()
return render(request, 'share.html', {"key":key})
else:
else: # pragma: no cover
raise Http404
def createShareModel(request, model_id):
def createShareModel(request, model_id): # pragma: no cover
task = MyModel.objects.get(pk=model_id)
key = ShareKey.objects.create(pk=get_random_string(40),
expiration_seconds=60*60*24, # 1 day
@@ -353,7 +353,7 @@ def getfavorites(r,row):
matchworkouttypes = [workouttype,'all']
workoutsource = row.workoutsource
if 'speedcoach2' in row.workoutsource:
if 'speedcoach2' in row.workoutsource: # pragma: no cover
workoutsource = 'speedcoach2'
favorites = FavoriteChart.objects.filter(user=r,
@@ -370,7 +370,7 @@ def getfavorites(r,row):
return favorites,maxfav
def get_logo_by_pk(request,*args,**kwargs):
def get_logo_by_pk(request,*args,**kwargs): # pragma: no cover
id = kwargs['id']
return get_object_or_404(RaceLogo,pk=id)
@@ -378,7 +378,7 @@ def get_virtualevent_by_pk(request,*args,**kwargs):
id = kwargs['id']
return get_object_or_404(VirtualRace,pk=id)
def get_promember(request,*args,**kwargs):
def get_promember(request,*args,**kwargs): # pragma: no cover
return request.user
def get_course_by_pk(request,*args,**kwargs):
@@ -401,15 +401,15 @@ def get_plan_by_pk(request,*args,**kwargs):
id = kwargs['id']
return get_object_or_404(TrainingPlan,pk=id)
def get_macro_by_pk(request,*args,**kwargs):
def get_macro_by_pk(request,*args,**kwargs): # pragma: no cover
id = kwargs['id']
return get_object_or_404(TrainingMacroCycle,pk=id)
def get_meso_by_pk(request,*args,**kwargs):
def get_meso_by_pk(request,*args,**kwargs): # pragma: no cover
id = kwargs['id']
return get_object_or_404(TrainingMesoCycle,pk=id)
def get_micro_by_pk(request,*args,**kwargs):
def get_micro_by_pk(request,*args,**kwargs): # pragma: no cover
id = kwargs['id']
return get_object_or_404(TrainingMicroCycle,pk=id)
@@ -420,7 +420,7 @@ def get_workout_default_page(request,id):
r = Rower.objects.get(user=request.user)
if r.defaultlandingpage == 'workout_edit_view':
return reverse('workout_edit_view',kwargs={'id':id})
else:
else: # pragma: no cover
return reverse('workout_workflow_view',kwargs={'id':id})
def get_user_by_userid(*args,**kwargs):
@@ -436,7 +436,7 @@ def get_user_by_userid(*args,**kwargs):
u = get_object_or_404(User,pk=id)
return u
def get_user_by_id(*args,**kwargs):
def get_user_by_id(*args,**kwargs): # pragma: no cover
request = args[0]
try:
id = args[1]
@@ -448,7 +448,7 @@ def get_user_by_id(*args,**kwargs):
return get_object_or_404(User,pk=id)
def get_rower_by_id(request,id):
def get_rower_by_id(request,id): # pragma: no cover
u = User.objects.get(id=id)
return u.rower
@@ -474,14 +474,14 @@ def getrequestrower(request,rowerid=0,userid=0,notpermanent=False):
elif userid != 0:
u = User.objects.get(id=userid)
r = getrower(u)
elif request.user.is_anonymous:
elif request.user.is_anonymous: # pragma: no cover
return None
else:
r = getrower(request.user)
u = r.user
except Rower.DoesNotExist:
except Rower.DoesNotExist: # pragma: no cover: # pragma: no cover
raise Http404("Rower doesn't exist")
if r.user == request.user:
@@ -520,21 +520,21 @@ def getrequestrowercoachee(request,rowerid=0,userid=0,notpermanent=False):
elif userid != 0:
u = User.objects.get(id=userid)
r = getrower(u)
elif request.user.is_anonymous:
elif request.user.is_anonymous: # pragma: no cover
return None
else:
r = getrower(request.user)
u = r.user
except Rower.DoesNotExist:
except Rower.DoesNotExist: # pragma: no cover: # pragma: no cover
raise Http404("Rower doesn't exist")
if r.user == request.user:
request.session['rowerid'] = r.id
return r
if userid != 0 and not is_coach_user(request.user,u):
if userid != 0 and not is_coach_user(request.user,u): # pragma: no cover
request.session['rowerid'] = request.user.rower.id
raise PermissionDenied("You have no access to this user")
@@ -563,16 +563,16 @@ def getrequestplanrower(request,rowerid=0,userid=0,notpermanent=False):
elif userid != 0:
try:
u = User.objects.get(id=userid)
except User.DoesNotExist:
except User.DoesNotExist: # pragma: no cover: # pragma: no cover
raise Http404("User does not exist")
r = getrower(u)
else:
r = getrower(request.user)
except Rower.DoesNotExist:
except Rower.DoesNotExist: # pragma: no cover: # pragma: no cover
raise Http404("Rower doesn't exist")
if 'shared' in request.session and request.session['shared']:
if 'shared' in request.session and request.session['shared']: # pragma: no cover
return r
if r.user != request.user and not can_plan_user(request.user,r ):
@@ -589,12 +589,12 @@ def getrower(user):
try:
if user is None or user.is_anonymous:
return None
except AttributeError:
except AttributeError: # pragma: no cover
if User.objects.get(id=user).is_anonymous:
return None
try:
r = Rower.objects.get(user=user)
except Rower.DoesNotExist:
except Rower.DoesNotExist: # pragma: no cover:
r = Rower(user=user)
r.save()
@@ -605,7 +605,7 @@ def get_workout(id):
try:
id = encoder.decode_hex(id)
w = Workout.objects.get(id=id)
except Workout.DoesNotExist:
except Workout.DoesNotExist: # pragma: no cover:
raise Http404("Workout doesn't exist")
return w
@@ -614,15 +614,15 @@ def get_workoutuser(id,request):
try:
id = encoder.decode_hex(id)
w = Workout.objects.get(id=id)
except Workout.DoesNotExist:
except Workout.DoesNotExist: # pragma: no cover:
raise Http404("Workout doesn't exist")
if not is_workout_user(request.user,w):
if not is_workout_user(request.user,w): # pragma: no cover
raise PermissionDenied
return w
def getvalue(data):
def getvalue(data): # pragma: no cover
perc = 0
total = 1
done = 0
@@ -640,7 +640,7 @@ def getvalue(data):
return total,done,id,session_key
class SessionTaskListener(threading.Thread):
class SessionTaskListener(threading.Thread): # pragma: no cover
def __init__(self, r, channels):
threading.Thread.__init__(self)
self.redis = r
@@ -685,7 +685,7 @@ from rq.job import Job
try:
from rest_framework_swagger.views import get_swagger_view
except ImportError:
except ImportError: # pragma: no cover
pass
from rest_framework.renderers import JSONRenderer
@@ -694,7 +694,7 @@ from rest_framework.response import Response
from rowers.serializers import RowerSerializer,WorkoutSerializer
try:
from rest_framework import status,permissions,generics
except ImportError:
except ImportError: # pragma: no cover
pass
from rest_framework.decorators import api_view, renderer_classes, permission_classes
@@ -723,10 +723,10 @@ from rowers.celery import result as celery_result
# Define the API documentation
try:
schema_view = get_swagger_view(title='Rowsandall API')
except NameError:
except NameError: # pragma: no cover
pass
def remove_asynctask(request,id):
def remove_asynctask(request,id): # pragma: no cover
try:
oldtasks = request.session['async_tasks']
except KeyError:
@@ -739,7 +739,7 @@ def remove_asynctask(request,id):
request.session['async_tasks'] = newtasks
def get_job_result(jobid):
def get_job_result(jobid): # pragma: no cover
if settings.TESTING:
return None
elif settings.CELERY:
@@ -771,7 +771,7 @@ verbose_job_status = {
'submit_race': 'Checking Race Course Result',
}
def get_job_status(jobid):
def get_job_status(jobid): # pragma: no cover
if settings.TESTING:
summary = {
'status': 'failed',
@@ -836,7 +836,7 @@ def get_job_status(jobid):
return summary
def kill_async_job(request,id='aap'):
def kill_async_job(request,id='aap'): # pragma: no cover
if settings.CELERY:
job = celery_result.AsyncResult(id)
job.revoke()
@@ -853,7 +853,7 @@ def kill_async_job(request,id='aap'):
return HttpResponseRedirect(url)
@login_required()
def raise_500(request):
def raise_500(request): # pragma: no cover
if request.user.is_superuser:
raise ValueError
else:
@@ -895,7 +895,7 @@ def raise_500(request):
# return HttpResponseRedirect(url)
@csrf_exempt
def post_progress(request,id=None,value=0):
def post_progress(request,id=None,value=0): # pragma: no cover
if request.method == 'POST':
try:
secret = request.POST['secret']
@@ -924,7 +924,7 @@ def post_progress(request,id=None,value=0):
else: # request method is not POST
return HttpResponse('GET method not allowed',status=405)
def get_all_queued_jobs(userid=0):
def get_all_queued_jobs(userid=0): # pragma: no cover
r = StrictRedis()
jobs = []
@@ -969,7 +969,7 @@ def get_stored_tasks_status(request):
taskids = []
taskstatus = []
for id,func_name in reversed(taskids):
for id,func_name in reversed(taskids): # pragma: no cover
progress = 0
try:
cached_progress = cache.get(id)
@@ -1027,13 +1027,13 @@ def get_thumbnails(request,id):
try:
if charts[0]['script'] == '':
charts = []
except IndexError:
except IndexError: # pragma: no cover
charts = []
return JSONResponse(charts)
def get_blog_posts(request):
def get_blog_posts(request): # pragma: no cover
blogposts = BlogPost.objects.all().order_by("-date")
jsondata = []
@@ -1049,7 +1049,7 @@ def get_blog_posts(request):
return JSONResponse(jsondata)
def get_blog_posts_old(request):
def get_blog_posts_old(request): # pragma: no cover
try:
response = requests.get(
'https://analytics.rowsandall.com/wp-json/wp/v2/posts?per_page=3')
@@ -1116,12 +1116,12 @@ def rowhascoordinates(row):
try:
latitude = rowdata.df[' latitude']
if not latitude.std():
if not latitude.std(): # pragma: no cover
hascoordinates = 0
except (KeyError,AttributeError):
hascoordinates = 0
else:
else: # pragma: no cover
hascoordinates = 0
return hascoordinates
@@ -1130,13 +1130,13 @@ def rowhascoordinates(row):
# Wrapper around the rowingdata call to catch some exceptions
# Checks for CSV file, then for gzipped CSV file, and if all fails, returns 0
def rdata(csvfile=None,rower=rrower()):
if csvfile is None:
if csvfile is None: # pragma: no cover
return 0
try:
res = rrdata(csvfile=csvfile,rower=rower)
except pd.errors.EmptyDataError:
except pd.errors.EmptyDataError: # pragma: no cover
res = 0
except (IOError, IndexError, EOFError,FileNotFoundError):
except (IOError, IndexError, EOFError,FileNotFoundError): # pragma: no cover
try:
res = rrdata(csvfile=file+'.gz',rower=rower)
except (IOError, IndexError, EOFError,FileNotFoundError):
@@ -1150,7 +1150,7 @@ def get_my_teams(user):
therower = Rower.objects.get(user=user)
try:
teams1 = therower.team.all()
except AttributeError:
except AttributeError: # pragma: no cover
teams1 = []
teams2 = Team.objects.filter(manager=user)
@@ -1167,7 +1167,7 @@ def get_time(second):
minutes=0
sec=0
microsecond = 0
elif math.isnan(second):
elif math.isnan(second): # pragma: no cover
hours = 0
minutes=0
sec=0
@@ -1182,12 +1182,12 @@ def get_time(second):
# get the workout ID from the SportTracks URI
def getidfromsturi(uri,length=8):
def getidfromsturi(uri,length=8): # pragma: no cover
return uri[len(uri)-length:]
import re
def getidfromuri(uri):
def getidfromuri(uri): # pragma: no cover
m = re.search('/(\w.*)\/(\d+)',uri)
return m.group(2)
@@ -1197,7 +1197,7 @@ from rowers.utils import (
geo_distance,serialize_list,deserialize_list,uniqify,
str2bool,range_to_color_hex,absolute,myqueue,get_call,
calculate_age,rankingdistances,rankingdurations,
is_ranking_piece,my_dict_from_instance,wavg,NoTokenError,
my_dict_from_instance,wavg,NoTokenError,
request_is_ajax
)
@@ -1208,12 +1208,12 @@ def iscoachmember(user):
if not user.is_anonymous:
try:
r = Rower.objects.get(user=user)
except Rower.DoesNotExist:
except Rower.DoesNotExist: # pragma: no cover:
r = Rower(user=user)
r.save()
result = user.is_authenticated and ('coach' in r.rowerplan)
else:
else: # pragma: no cover
result = False
return result
@@ -1256,7 +1256,7 @@ def sendmail(request):
success = response.json().get('success')
form = EmailForm(request.POST)
if form.is_valid() and success:
if form.is_valid() and success: # pragma: no cover
firstname = form.cleaned_data['firstname']
lastname = form.cleaned_data['lastname']
email = form.cleaned_data['email']
@@ -1271,7 +1271,7 @@ def sendmail(request):
else:
if not success:
messages.error(request,'Bots are not welcome')
else:
else: # pragma: no cover
messages.error(request,'Something went wrong. Please try again')
return HttpResponseRedirect('/rowers/email/')
else:
@@ -1285,41 +1285,41 @@ def add_workout_from_strokedata(user,importid,data,strokedata,
workoutsource='concept2'):
try:
workouttype = data['type']
except KeyError:
except KeyError: # pragma: no cover
workouttype = 'rower'
if workouttype not in [x[0] for x in Workout.workouttypes]:
if workouttype not in [x[0] for x in Workout.workouttypes]: # pragma: no cover
workouttype = 'other'
try:
comments = data['comments']
except:
except: # pragma: no cover
comments = ' '
# comments = "Imported data \n %s" % comments
# comments = "Imported data \n"+comments # str(comments)
try:
thetimezone = tz(data['timezone'])
except:
except: # pragma: no cover
thetimezone = 'UTC'
r = getrower(user)
try:
rowdatetime = iso8601.parse_date(data['date_utc'])
except KeyError:
except KeyError: # pragma: no cover
rowdatetime = iso8601.parse_date(data['start_date'])
except ParseError:
except ParseError: # pragma: no cover
rowdatetime = iso8601.parse_date(data['date'])
try:
c2intervaltype = data['workout_type']
except KeyError:
except KeyError: # pragma: no cover
c2intervaltype = ''
try:
title = data['name']
except KeyError:
except KeyError: # pragma: no cover
title = ""
try:
t = data['comments'].split('\n', 1)[0]
@@ -1339,7 +1339,7 @@ def add_workout_from_strokedata(user,importid,data,strokedata,
nr_rows = len(unixtime)
try:
try: # pragma: no cover
latcoord = strokedata.loc[:,'lat']
loncoord = strokedata.loc[:,'lon']
except:
@@ -1356,7 +1356,7 @@ def add_workout_from_strokedata(user,importid,data,strokedata,
try:
spm = strokedata.loc[:,'spm']
except KeyError:
except KeyError: # pragma: no cover
spm = 0*dist2
try:
@@ -1413,10 +1413,10 @@ def add_workout_from_strokedata(user,importid,data,strokedata,
try:
totaldist = data['distance']
totaltime = data['time']/10.
except KeyError:
except KeyError: # pragma: no cover
totaldist = 0
totaltime = 0
else:
else: # pragma: no cover
totaldist = 0
totaltime = 0
@@ -1438,7 +1438,7 @@ def add_workout_from_strokedata(user,importid,data,strokedata,
def keyvalue_get_default(key,options,def_options):
def keyvalue_get_default(key,options,def_options): # pragma: no cover
try:
return options[key]
@@ -1449,14 +1449,14 @@ def keyvalue_get_default(key,options,def_options):
# Creates unix time stamp from a datetime object
def totimestamp(dt, epoch=datetime.datetime(1970,1,1,tzinfo=tz('UTC'))):
def totimestamp(dt, epoch=datetime.datetime(1970,1,1,tzinfo=tz('UTC'))): # pragma: no cover
td = dt - epoch
# return td.total_seconds()
return (td.microseconds + (td.seconds + td.days * 86400) * 10**6) / 10**6
# Check if a column of a dataframe has the required (aantal)
# number of elements. Also checks if the column is a numerical type
# Replaces any faulty columns with zeros
def trydf(df,aantal,column):
def trydf(df,aantal,column): # pragma: no cover
try:
s = df[column]
if len(s) != aantal: