Private
Public Access
1
0

adding a few tests

This commit is contained in:
Sander Roosendaal
2022-07-14 15:27:01 +02:00
parent 868b39cd1f
commit ecf9566f0b
9 changed files with 205 additions and 66 deletions

View File

@@ -23,12 +23,12 @@ def time_in_path(df, p, maxmin='max', getall=False, name='unknown', logfile=None
def f(x):
return coordinate_in_path(x['latitude'], x['longitude'], p)
df['inpolygon'] = df.apply(f, axis=1)
inpolygon = df.apply(lambda row:f(row), axis=1).copy()
if maxmin == 'max':
b = (~df['inpolygon']).shift(-1)+df['inpolygon']
b = (~inpolygon).shift(-1)+inpolygon
else: # pragma: no cover
b = (~df['inpolygon']).shift(1)+df['inpolygon']
b = (~inpolygon).shift(1)+inpolygon
if len(df[b == 2]):
if logfile is not None: # pragma: no cover
@@ -90,7 +90,7 @@ def coursetime_first(data, paths, polygons=[], logfile=None):
try:
entrytime, entrydistance = time_in_path(
data, paths[0], maxmin='max', name=polygons[0][1], logfile=logfile)
data, paths[0], maxmin='max', name=str(polygons[0]), logfile=logfile)
coursecompleted = True
except InvalidTrajectoryError: # pragma: no cover
entrytime = data['time'].max()
@@ -118,7 +118,7 @@ def coursetime_paths(data, paths, finalmaxmin='min', polygons=[], logfile=None):
(
entrytime,
entrydistance
) = time_in_path(data, paths[0], maxmin=finalmaxmin, name=polygons[0][1], logfile=logfile)
) = time_in_path(data, paths[0], maxmin=finalmaxmin, name=str(polygons[0]), logfile=logfile)
coursecompleted = True
except InvalidTrajectoryError: # pragma: no cover
entrytime = data['time'].max()
@@ -129,7 +129,7 @@ def coursetime_paths(data, paths, finalmaxmin='min', polygons=[], logfile=None):
if len(paths) > 1:
try:
time, dist = time_in_path(
data, paths[0], name=polygons[0][1], logfile=logfile)
data, paths[0], name=str(polygons[0]), logfile=logfile)
data2 = data[data['time'] > time].copy()
data2['time'] = data2['time'].apply(lambda x: x-time)
data2['cum_dist'] = data2['cum_dist'].apply(lambda x: x-dist)

View File

@@ -1258,7 +1258,6 @@ def new_workout_from_file(r, f2,
uploadoptions={'boattype': '1x', 'workouttype': 'rower'}):
message = ""
try:
fileformat = get_file_type(f2)
except (IOError, UnicodeDecodeError): # pragma: no cover

View File

@@ -104,6 +104,48 @@ thetimezone = get_current_timezone()
allowedcolumns = [key for key, value in strokedatafields.items()]
from rowsandall_app.settings_dev import use_sqlite
from rowsandall_app.settings_dev import DATABASES as DEV_DATABASES
try:
user = settings.DATABASES['default']['USER']
except KeyError: # pragma: no cover
user = ''
try:
password = settings.DATABASES['default']['PASSWORD']
except KeyError: # pragma: no cover
password = ''
try:
database_name = settings.DATABASES['default']['NAME']
except KeyError: # pragma: no cover
database_name = ''
try:
host = settings.DATABASES['default']['HOST']
except KeyError: # pragma: no cover
host = ''
try:
port = settings.DATABASES['default']['PORT']
except KeyError: # pragma: no cover
port = ''
database_url = 'mysql://{user}:{password}@{host}:{port}/{database_name}'.format(
user=user,
password=password,
database_name=database_name,
host=host,
port=port,
)
database_name_dev = DEV_DATABASES['default']['NAME']
if use_sqlite:
database_url_debug = 'sqlite:///'+database_name_dev
database_url = database_url_debug
database_url_debug = database_url
# mapping the DB column names to the CSV file column names
@@ -747,17 +789,9 @@ def paceformatsecs(values):
def update_c2id_sql(id, c2id):
engine = create_engine(database_url, echo=False)
table = 'rowers_workout'
query = "UPDATE %s SET uploadedtoc2 = %s WHERE `id` = %s;" % (
table, c2id, id)
with engine.connect() as conn, conn.begin():
_ = conn.execute(query)
conn.close()
engine.dispose()
workout = Workout.objects.get(id=id)
workout.uploadedtoc2 = c2id
workout.save()
return 1

View File

@@ -553,7 +553,7 @@ def polygon_coord_center(polygon):
return latitudes.mean(), longitudes.mean()
def polygon_to_path(polygon):
def polygon_to_path(polygon, debug=False):
points = GeoPoint.objects.filter(polygon=polygon).order_by("order_in_poly")
s = []

View File

@@ -1,4 +1,5 @@
import os
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
from YamJam import yamjam
CFG = yamjam()['rowsandallapp']
@@ -364,7 +365,7 @@ def handle_strava_sync(stravatoken, workoutid, filename, name, activity_type, de
tb = traceback.format_exc()
dologging('strava_fail.log', tb)
failed = True
except stravalib.exc.TimeoutExceeded:
except stravalib.exc.TimeoutExceeded: # pragma: no cover
dologging('strava_fail.log', 'Strava upload failed for Workout {id} TimeOutExceeded'.format(
id=workoutid))
tb = traceback.format_exc()
@@ -541,7 +542,7 @@ from rowers.models import polygon_to_path
def handle_check_race_course(self,
f1, workoutid, courseid,
recordid, useremail, userfirstname,
**kwargs): # pragma: no cover
**kwargs):
logfile = 'courselog_{workoutid}_{courseid}.log'.format(
workoutid=workoutid, courseid=courseid)
@@ -571,11 +572,11 @@ def handle_check_race_course(self,
mode = kwargs['mode']
summary = False
if 'summary' in kwargs:
if 'summary' in kwargs: # pragma: no cover
summary = kwargs['summary']
successemail = False
if 'successemail' in kwargs:
if 'successemail' in kwargs: # pragma: no cover
successemail = kwargs['successemail']
try:
@@ -610,17 +611,19 @@ def handle_check_race_course(self,
rowdata.fillna(method='backfill', inplace=True)
rowdata['time'] = rowdata['time']-rowdata.loc[0, 'time']
rowdata = rowdata[rowdata['time'] > splitsecond]
rowdata.loc[:, 'time'] = rowdata.loc[:, 'time'].copy()-rowdata.loc[0, 'time']
rowdata = rowdata.copy()[rowdata['time'] > splitsecond]
# we may want to expand the time (interpolate)
rowdata['dt'] = rowdata['time'].apply(
rowdata.loc[:,'dt'] = rowdata['time'].apply(
lambda x: safetimedelta(x)
)
).values
rowdata = rowdata.resample('100ms', on='dt').mean()
rowdata = rowdata.interpolate()
course = GeoCourse.objects.get(courseid)
course = GeoCourse.objects.get(id=courseid)
polygons = course.polygons.all()
@@ -637,7 +640,7 @@ def handle_check_race_course(self,
try:
entrytimes, entrydistances = time_in_path(rowdata, paths[0], maxmin='max', getall=True,
name=polygons[0].name, logfile=logfile)
except AttributeError: # for testing
except AttributeError: # pragma: no cover
entrytimes, entrydistances = time_in_path(rowdata, paths[0], maxmin='max', getall=True,
name='Start', logfile=logfile)
with open(logfile, 'ab') as f:
@@ -706,7 +709,8 @@ def handle_check_race_course(self,
'endsecond': endseconds,
})
records = records[records['coursecompleted'] is True]
#records = records[records['coursecompleted'] is True]
records = records.loc[records['coursecompleted'], : ]
if len(records):
coursecompleted = True
@@ -724,17 +728,19 @@ def handle_check_race_course(self,
coursedistance = coursemeters
velo = coursedistance/coursetimeseconds
points = 100*(2.-referencespeed/velo)
record = VirtualRaceResult.objects.get(id=recordid)
record.duration = totaltime_sec_to_string(coursetimeseconds)
record.distance=int(coursemeters)
record.points = points
record.startsecond = startsecond
record.endsecond = endsecond
record.workoutid = workoutid
record.coursecompleted = 1
record.save()
if mode == 'coursetest':
if mode != 'coursetest':
record = VirtualRaceResult.objects.get(id=recordid)
record.duration = totaltime_sec_to_string(coursetimeseconds)
record.distance=int(coursemeters)
record.points = points
record.startsecond = startsecond
record.endsecond = endsecond
record.workoutid = workoutid
record.coursecompleted = 1
record.save()
else: # pragma: no cover
record = CourseTestResult.objects.get(id=recordid)
record.duration = totaltime_sec_to_string(coursetimeseconds)
record.distance = int(coursemeters)
@@ -744,7 +750,7 @@ def handle_check_race_course(self,
record.points = points
record.save()
if summary:
if summary: # pragma: no cover
try:
row = rdata(csvfile=f1)
except IOError: # pragma: no cover
@@ -769,7 +775,7 @@ def handle_check_race_course(self,
workout.save()
if successemail:
if successemail: # pragma: no cover
handle_sendemail_coursesucceed(
useremail, userfirstname, logfile, workoutid
)
@@ -1929,7 +1935,7 @@ def handle_sendemail_ical(first_name, last_name, email, url, icsfile, **kwargs):
try:
os.remove(icsfile)
except:
except: # pragma: no cover
pass
return 1

View File

@@ -1554,6 +1554,36 @@ description: ""
self.instantplan.save()
self.startdate = (datetime.datetime.now()-datetime.timedelta(days=1)).date()
self.enddate = (datetime.datetime.now()+datetime.timedelta(days=1)).date()
self.preferreddate = datetime.datetime.now().date()
self.ps = SessionFactory(startdate=self.startdate,enddate=self.enddate,
sessiontype='session',
sessionmode = 'time',
criterium = 'none',
sessionvalue = 60,
sessionunit='min',
preferreddate=self.preferreddate,
manager=self.u,
interval_string = '5x(800m/5min)'
)
self.ps.save()
result = plannedsessions.add_rower_session(self.r,self.ps)
self.step = PlannedSessionStep(
manager = self.u,
name = 'cd',
durationvalue = '50000',
durationtype = 'Distance',
)
self.step.save()
def tearDown(self):
@@ -1562,6 +1592,72 @@ description: ""
except (IOError, FileNotFoundError,OSError):
pass
def test_stepadder(self):
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)
url = '/rowers/plans/stepeditor/{id}/'.format(id=self.ps.id)
response = self.c.get(url,follow=True)
self.assertEqual(response.status_code,200)
url = '/rowers/plans/stepadder/{id}/'.format(id=self.ps.id)
bdy = json.dumps([self.step.id])
response = self.c.post(url, bdy, content_type='application/json',
**{'HTTP_X_REQUESTED_WITH':'XMLHttpRequest'})
self.assertEqual(response.status_code, 200)
def test_stepdelete(self):
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)
url = '/rowers/plans/step/{id}/delete'.format(id=self.step.id)
response = self.c.get(url,follow=True)
self.assertEqual(response.status_code,200)
def test_stepedit(self):
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)
url = '/rowers/plans/step/{id}/edit/{psid}/'.format(
id=self.step.id, psid=self.ps.id
)
response = self.c.get(url)
self.assertEqual(response.status_code,200)
data = {
'durationtype': 'RepeatUntilStepsCmplt',
'durationvalue': '0.0',
#'targettype': None,
'targetvalue': '8',
'targetvaluelow': '0',
'targetvaluehigh': '0',
'intensity': 'Active',
'description': 'aap'
}
form = StepEditorForm(data)
self.assertTrue(form.is_valid())
reponse = self.c.post(url, data)
self.assertTrue(response.status_code,200)
def test_save_plan_yaml_view(self):
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)
url = '/rowers/sessions/saveasplan/?when={s}/{e}'.format(
s = self.startdate.strftime("%Y-%m-%d"),
e = self.enddate.strftime("%Y-%m-%d")
)
response = self.c.get(url,follow=True)
self.assertEqual(response.status_code,200)
def test_clone_view(self):
login = self.c.login(username=self.u.username, password=self.password)
self.assertTrue(login)

View File

@@ -888,8 +888,7 @@ class ChallengesTest(TestCase):
self.assertEqual(response.status_code, 200)
@patch('rowers.tasks.create_engine', side_effect=mocked_sqlalchemy_courses)
def notest_virtualevent_check_view(self,mocked_sqlalchemy_courses):
def test_virtualevent_check_view(self):
res = tasks.handle_check_race_course(
self.wthyro.csvfilename,
@@ -898,7 +897,6 @@ class ChallengesTest(TestCase):
self.result.id,
self.wthyro.user.user.email,
self.wthyro.user.user.first_name,
mode='coursetest',
)
self.assertEqual(res,1)

View File

@@ -515,7 +515,7 @@ def rower_process_garmincallback(request): # pragma: no cover
# Process Rojabo callback
@login_required()
def rower_process_rojabocallback(request): # prgrma: no cover
def rower_process_rojabocallback(request): # pragma: no cover
# do stuff
try:
code = request.GET.get('code', None)

View File

@@ -1409,7 +1409,7 @@ def save_plan_yaml(request, userid=0):
steps = ps.steps
steps['filename'] = ""
workouts.append(steps)
else:
else: # pragma: no cover
if ps.sessionmode == 'distance':
ps.interval_string = '{d}m'.format(d=ps.sessionvalue)
elif ps.sessionmode == 'time':
@@ -2994,7 +2994,8 @@ def rower_create_trainingplan(request, id=0):
redirect_field_name=None)
def stepadder(request, id=0):
is_ajax = request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest'
if not is_ajax:
if not is_ajax: # pragma: no cover
return JSONResponse(
status=403, data={
'status': 'false',
@@ -3005,7 +3006,7 @@ def stepadder(request, id=0):
is_save = request.GET.get('save',0)
if request.method != 'POST':
if request.method != 'POST': # pragma: no cover
message = {'status': 'false',
'message': 'this view cannot be accessed through GET'}
return JSONResponse(status=403, data=message)
@@ -3013,14 +3014,16 @@ def stepadder(request, id=0):
try:
json_data = json.loads(request.body)
post_data = json_data
except (KeyError, JSONDecodeError):
except (KeyError, JSONDecodeError): # pragma: no cover
q = request.POST
post_data = {k: q.getlist(k) if len(
q.getlist(k)) > 1 else v for k, v in q.items()}
# only allow local host
hostt = request.get_host().split(':')
if hostt[0] not in ['localhost', '127.0.0.1', 'dev.rowsandall.com', 'rowsandall.com']:
if hostt[0] not in ['localhost', '127.0.0.1', 'dev.rowsandall.com', 'rowsandall.com',
'testserver']: # pragma: no cover
message = {'status': 'false',
'message': 'permission denied for host '+hostt[0]}
return JSONResponse(status=403, data=message)
@@ -3028,7 +3031,7 @@ def stepadder(request, id=0):
if ps.steps:
filename = ps.steps.get('filename','')
sport = ps.steps.get('sport','rowing')
else:
else: # pragma: no cover
filename = ''
sport = 'rowing'
@@ -3045,10 +3048,10 @@ def stepadder(request, id=0):
d = step.asdict()
d['stepId'] = nr
steps['steps'].append(d)
except PlannedSessionStep.DoesNotExist:
except PlannedSessionStep.DoesNotExist: # pragma: no cover
pass
if is_save:
if is_save: # pragma: no cover
# save the darn thing
ps.steps = steps
@@ -3066,9 +3069,12 @@ def stepdelete(request, id=0):
step.delete()
backid = request.GET.get('id')
backid = request.GET.get('id',0)
url = reverse(stepeditor,kwargs={'id':backid})
if backid: # pragma: no cover
url = reverse(stepeditor,kwargs={'id':backid})
else:
url = reverse('plannedsessions_view')
return HttpResponseRedirect(url)
@@ -3079,7 +3085,7 @@ def stepedit(request, id=0, psid=0):
step = get_object_or_404(PlannedSessionStep, pk=id)
try:
ps = PlannedSession.objects.get(id=psid)
except PlannedSession.DoesNotExist:
except PlannedSession.DoesNotExist: # pragma: no cover
ps = None
form = StepEditorForm(instance=step)
@@ -3094,7 +3100,7 @@ def stepedit(request, id=0, psid=0):
ee = ss.copy()
ee.pop('stepId')
if (dd == ee):
if (dd == ee): # pragma: no cover
ss['durationType'] = form.cleaned_data['durationtype']
ss['durationValue'] = form.cleaned_data['durationvalue']
ss['targetType'] = form.cleaned_data['targettype']
@@ -3126,15 +3132,15 @@ def stepedit(request, id=0, psid=0):
step.name = form.cleaned_data['name']
step.description = form.cleaned_data['description']
if step.durationtype == 'Time':
if step.durationtype == 'Time': # pragma: no cover
step.durationvalue *= 60000
elif step.durationtype == 'Distance':
elif step.durationtype == 'Distance': # pragma: no cover
step.durationvalue *= 100
step.save()
if step.durationtype == 'Time':
if step.durationtype == 'Time': # pragma: no cover
form.fields['durationvalue'].initial = step.durationvalue / 60000
elif step.durationtype == 'Distance':
form.fields['durationvalue'].initial = step.durationvalue / 100
@@ -3143,7 +3149,7 @@ def stepedit(request, id=0, psid=0):
stepdescription = step_to_string(step.asdict(), short=False)[0]
if request.method == 'POST':
if 'stepsave_and_return' in request.POST:
if 'stepsave_and_return' in request.POST: # pragma: no cover
url = reverse('stepeditor',kwargs = {'id': ps.id})
return HttpResponseRedirect(url)
@@ -3204,7 +3210,7 @@ def stepeditor(request, id=0):
targetvaluehigh = targetvaluehigh,
intensity = intensity,
)
if not archived_steps.count() and durationvalue != 0:
if not archived_steps.count():
s = PlannedSessionStep(
manager = request.user,
durationtype = durationtype,
@@ -3217,14 +3223,14 @@ def stepeditor(request, id=0):
name = step.get('wkt_step_name','Step')
)
s.save()
else:
else: # pragma: no cover
s = archived_steps[0]
currentsteps.append(s)
form = StepEditorForm()
if request.method == 'POST':
if request.method == 'POST': # pragma: no cover
form = StepEditorForm(request.POST)
if form.is_valid():
step = form.save(commit=False)