from shutil import copyfile from rowers.views.statements import * import rowers.mytypes as mytypes from rowers.integrations import importsources import numpy import rowers.uploads as uploads import rowers.utils as utils from rowers.utils import intervals_to_string from urllib.parse import urlparse, parse_qs from json.decoder import JSONDecodeError import ruptures as rpt from rowers.courses import getnearestcourses from rowers.courses import pass_start from pandas.errors import IntCastingNaNError def default(o): # pragma: no cover if isinstance(o, numpy.int64): return int(o) if isinstance(o, numpy.int32): return int(o) raise TypeError def get_video_id(url): """Returns Video_ID extracting from the given url of Youtube Examples of URLs: Valid: 'http://youtu.be/_lOT2p_FCvA', 'www.youtube.com/watch?v=_lOT2p_FCvA&feature=feedu', 'http://www.youtube.com/embed/_lOT2p_FCvA', 'http://www.youtube.com/v/_lOT2p_FCvA?version=3&hl=en_US', 'https://www.youtube.com/watch?v=rTHlyTphWP0&index=6&list=PLjeDyYvG6-40qawYNR4juzvSOg-ezZ2a6', 'youtube.com/watch?v=_lOT2p_FCvA', Invalid: 'youtu.be/watch?v=_lOT2p_FCvA', """ if url.startswith(('youtu', 'www')): url = 'http://' + url elif 'http' not in url: # pragma: no cover # not sure if this is a valid case at all return url query = urlparse(url) if 'youtube' in query.hostname: if query.path == '/watch': return parse_qs(query.query)['v'][0] elif query.path.startswith(('/embed/', '/v/')): return query.path.split('/')[2] elif 'youtu.be' in query.hostname: return query.path[1:] else: # pragma: no cover raise ValueError # Show a video compared with data def workout_video_view_mini(request, id=''): try: id = encoder.decode_hex(id) analysis = VideoAnalysis.objects.get(id=id) except (VideoAnalysis.DoesNotExist, ValueError): # pragma: no cover raise Http404("Video Analysis does not exist") w = analysis.workout delay = analysis.delay if w.workouttype in mytypes.otwtypes: mode = 'water' else: mode = 'erg' if request.user.is_authenticated: mayedit = is_workout_user( request.user, w) and is_promember(request.user) rower = request.user.rower else: mayedit = False rower = None # get video ID and offset if mayedit and request.method == 'POST': form = VideoAnalysisCreateForm(request.POST) metricsform = VideoAnalysisMetricsForm(request.POST, mode=mode) if form.is_valid() and metricsform.is_valid(): video_id = form.cleaned_data['url'] try: video_id = get_video_id(form.cleaned_data['url']) except (TypeError, ValueError): # pragma: no cover pass delay = form.cleaned_data['delay'] metricsgroups = metricsform.cleaned_data['groups'] if 'save_button' in request.POST: analysis.name = form.cleaned_data['name'] analysis.video_id = video_id analysis.delay = delay analysis.metricsgroups = metricsgroups analysis.save() else: # pragma: no cover # invalid forms video_id = id delay = 0 elif mayedit: form = VideoAnalysisCreateForm( initial={ 'name': analysis.name, 'delay': analysis.delay, 'url': analysis.video_id, } ) metricsform = VideoAnalysisMetricsForm(initial={'groups': analysis.metricsgroups}, mode=mode) metricsgroups = analysis.metricsgroups video_id = analysis.video_id else: form = None metricsform = None metricsgroups = analysis.metricsgroups data, metrics, maxtime = dataprep.get_video_data( w, groups=metricsgroups, mode=mode) hascoordinates = pd.Series(data['latitude']).std() > 0 # create map if hascoordinates and mode == 'water': mapscript, mapdiv = leaflet_chart(data['latitude'], data['longitude']) else: mapscript, mapdiv = interactive_chart_video(data) data['longitude'] = data['spm'] data['latitude'] = list(range(len(data['spm']))) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(w.id)), 'name': w.name }, { 'url': reverse('workout_video_view_mini', kwargs={'id': encoder.encode_hex(analysis.id)}), 'name': 'Video Analysis' } ] return render(request, 'embedded_video_mini.html', { 'workout': w, 'rower': rower, 'data': json.dumps(data, default=default), 'mapscript': mapscript, 'mapdiv': mapdiv, 'video_id': analysis.video_id, 'form': form, 'breadcrumbs': breadcrumbs, 'analysis': analysis, 'maxtime': maxtime, 'metrics': metrics, 'locked': True, 'metricsform': metricsform, 'metricsgroups': metricsgroups, 'siteurl': settings.SITE_URL, }) # Show a video compared with data def workout_video_view(request, id=''): try: id = encoder.decode_hex(id) analysis = VideoAnalysis.objects.get(id=id) except (VideoAnalysis.DoesNotExist, ValueError): # pragma: no cover raise Http404("Video Analysis does not exist") w = analysis.workout delay = analysis.delay if w.workouttype in mytypes.otwtypes: mode = 'water' else: mode = 'erg' if request.user.is_authenticated: mayedit = is_promember( request.user) and is_workout_user(request.user, w) rower = request.user.rower else: mayedit = False rower = None # get video ID and offset if mayedit and request.method == 'POST': form = VideoAnalysisCreateForm(request.POST) metricsform = VideoAnalysisMetricsForm(request.POST, mode=mode) if form.is_valid() and metricsform.is_valid(): video_id = form.cleaned_data['url'] try: video_id = get_video_id(form.cleaned_data['url']) except (TypeError, ValueError): # pragma: no cover pass delay = form.cleaned_data['delay'] metricsgroups = metricsform.cleaned_data['groups'] if 'save_button' in request.POST: analysis.name = form.cleaned_data['name'] analysis.video_id = video_id analysis.delay = delay analysis.metricsgroups = metricsgroups analysis.save() else: # pragma: no cover video_id = id delay = 0 elif mayedit: form = VideoAnalysisCreateForm( initial={ 'name': analysis.name, 'delay': analysis.delay, 'url': analysis.video_id, } ) metricsform = VideoAnalysisMetricsForm(initial={'groups': analysis.metricsgroups}, mode=mode) metricsgroups = analysis.metricsgroups video_id = analysis.video_id else: form = None metricsform = None metricsgroups = analysis.metricsgroups data, metrics, maxtime = dataprep.get_video_data( w, groups=metricsgroups, mode=mode) hascoordinates = pd.Series(data['latitude']).std() > 0 # create map if hascoordinates and mode == 'water': mapscript, mapdiv = leaflet_chart(data['latitude'], data['longitude']) else: mapscript, mapdiv = interactive_chart_video(data) data['longitude'] = data['spm'] data['latitude'] = list(range(len(data['spm']))) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(w.id)), 'name': w.name }, { 'url': reverse('workout_video_view', kwargs={'id': encoder.encode_hex(analysis.id)}), 'name': 'Video Analysis' } ] return render(request, 'embedded_video.html', { 'workout': w, 'rower': rower, 'data': json.dumps(data, default=default), 'mapscript': mapscript, 'mapdiv': mapdiv, 'video_id': analysis.video_id, 'form': form, 'breadcrumbs': breadcrumbs, 'analysis': analysis, 'maxtime': maxtime, 'metrics': metrics, 'locked': True, 'metricsform': metricsform, 'metricsgroups': metricsgroups, 'siteurl': settings.SITE_URL, }) # Create a video compared with data @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) @user_passes_test(ispromember, login_url="/rowers/paidplans/", message="This functionality requires a Pro plan or higher", redirect_field_name=None) def workout_video_create_view(request, id=0): # get workout w = get_workout_by_opaqueid(request, id) if w.workouttype in mytypes.otwtypes: mode = 'water' else: mode = 'erg' # get video ID and offset if request.method == 'POST': form = VideoAnalysisCreateForm(request.POST) metricsform = VideoAnalysisMetricsForm(request.POST, mode=mode) if form.is_valid() and metricsform.is_valid(): url = form.cleaned_data['url'] delay = form.cleaned_data['delay'] metricsgroups = metricsform.cleaned_data['groups'] try: video_id = get_video_id(url) except ValueError: # pragma: no cover messages.error(request, "Not a valid YouTube video link") video_id = None if 'save_button' in request.POST: analysis = VideoAnalysis( workout=w, name=form.cleaned_data['name'], video_id=video_id, delay=delay, metricsgroups=metricsgroups ) try: analysis.save() url = reverse('workout_video_view', kwargs={'id': encoder.encode_hex(analysis.id)}) return HttpResponseRedirect(url) except IntegrityError: # pragma: no cover messages.error( request, 'You cannot save two video analysis with the same YouTube video and Workout." \ " Redirecting to your existing analysis') analysis = VideoAnalysis.objects.filter( workout=w, video_id=video_id) if analysis: url = reverse('workout_video_view', kwargs={'id': encoder.encode_hex(analysis[0].id)}) else: url = reverse('workout_video_create_view', kwargs={'id': encoder.encode_hex(w.id)}) return HttpResponseRedirect(url) else: # pragma: no cover video_id = None delay = 0 metricsgroups = ['basic'] else: form = VideoAnalysisCreateForm() metricsform = VideoAnalysisMetricsForm( initial={'groups': ['basic']}, mode=mode) video_id = None delay = 0 metricsgroups = ['basic'] # get data data, metrics, maxtime = dataprep.get_video_data( w, groups=metricsgroups, mode=mode) hascoordinates = pd.Series(data['latitude']).std() > 0 # create map if hascoordinates and mode == 'water': mapscript, mapdiv = leaflet_chart(data['latitude'], data['longitude']) else: mapscript, mapdiv = interactive_chart_video(data) data['longitude'] = data['spm'] data['latitude'] = list(range(len(data['spm']))) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(w.id)), 'name': w.name }, { 'url': reverse('workout_video_create_view', kwargs={'id': encoder.encode_hex(w.id)}), 'name': 'Video Analysis' } ] analysis = {'delay': delay} template = 'embedded_video.html' return render(request, template, { 'workout': w, 'rower': request.user.rower, 'data': json.dumps(data, default=default), 'mapscript': mapscript, 'mapdiv': mapdiv, 'video_id': video_id, 'form': form, 'metricsform': metricsform, 'analysis': analysis, 'breadcrumbs': breadcrumbs, 'maxtime': maxtime, 'metrics': metrics, 'metricsgroups': metricsgroups, 'locked': False, 'siteurl': settings.SITE_URL, }) # Show the EMpower Oarlock generated Stroke Profile @user_passes_test(ispromember, login_url="/rowers/paidplans/", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_forcecurve_view(request, id=0, analysis=0, userid=0, workstrokesonly=False): row = get_workoutuser(id, request) mayedit = 0 r = getrequestrower(request, userid=userid) if r == row.user: mayedit = 1 if analysis: # pragma: no cover try: forceanalysis = ForceCurveAnalysis.objects.get(id=analysis) dist_min = forceanalysis.dist_min dist_max = forceanalysis.dist_max spm_min = forceanalysis.spm_min spm_max = forceanalysis.spm_max work_min = forceanalysis.work_min work_max = forceanalysis.work_max notes = forceanalysis.notes name = forceanalysis.name plotcircles = forceanalysis.plotcircles plotlines = forceanalysis.plotlines includereststrokes = forceanalysis.include_rest_strokes except (ForceCurveAnalysis.DoesNotExist, ValueError): pass else: dist_min = 0 dist_max = row.distance spm_min = 15 spm_max = 55 work_min = 0 work_max = 1500 notes = '' includereststrokes = False name = '' plotcircles = True plotlines = False form = ForceCurveOptionsForm(initial={ 'spm_min': spm_min, 'spm_max': spm_max, 'dist_min': dist_min, 'dist_max': dist_max, 'work_min': work_min, 'work_max': work_max, 'notes': notes, 'name': name, 'plotcircles': plotcircles, 'plotlines': plotlines, }) if request.method == 'POST': form = ForceCurveOptionsForm(request.POST) if form.is_valid(): spm_min = form.cleaned_data['spm_min'] spm_max = form.cleaned_data['spm_max'] dist_min = form.cleaned_data['dist_min'] dist_max = form.cleaned_data['dist_max'] work_min = form.cleaned_data['work_min'] work_max = form.cleaned_data['work_max'] notes = form.cleaned_data['notes'] name = form.cleaned_data['name'] plotlines = form.cleaned_data['plotlines'] plotcircles = form.cleaned_data['plotcircles'] if not name: name = row.name includereststrokes = form.cleaned_data['includereststrokes'] workstrokesonly = not includereststrokes if "_save" in request.POST and "new" not in request.POST: # pragma: no cover if not analysis: forceanalysis = ForceCurveAnalysis( workout = row, name = name, date = timezone.now().date(), notes = notes, dist_min = dist_min, dist_max = dist_max, work_min = work_min, work_max = work_max, spm_min = spm_min, spm_max = spm_max, rower=row.user, plotlines = plotlines, plotcircles = plotcircles, include_rest_strokes = includereststrokes, ) else: forceanalysis.workout = row forceanalysis.name = name forceanalysis.date = timezone.now().date() forceanalysis.notes = notes forceanalysis.dist_min = dist_min forceanalysis.dist_max = dist_max forceanalysis.work_min = work_min forceanalysis.work_max = work_max forceanalysis.spm_min = spm_min forceanalysis.spm_max = spm_max forceanalysis.plotcircles = plotcircles forceanalysis.plotlines = plotlines forceanalysis.include_rest_strokes = includereststrokes forceanalysis.save() dosave = True messages.info(request,'Force Curve analysis saved') if "_save_as_new" in request.POST: forceanalysis = ForceCurveAnalysis( workout = row, name = name, date = timezone.now().date(), notes = notes, dist_min = dist_min, dist_max = dist_max, spm_min = spm_min, spm_max = spm_max, work_min = work_min, work_max = work_max, rower=row.user, include_rest_strokes = includereststrokes, ) forceanalysis.save() dosave = True messages.info(request,'Force Curve analysis saved') else: # pragma: no cover workstrokesonly = True script, div = interactive_forcecurve( [row] ) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': row.name }, { 'url': reverse('workout_forcecurve_view', kwargs={'id': id}), 'name': 'Empower Force Curve' } ] r = getrower(request.user) if dist_max == 0: dist_max = row.distance+100 return render(request, 'forcecurve_single.html', { 'the_script': script, 'rower': r, 'form': form, 'workout': row, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'spm_min': spm_min, 'spm_max': spm_max, 'dist_min': dist_min, 'dist_max': dist_max, 'work_min': work_min, 'work_max': work_max, 'annotation': notes, 'the_div': div, 'id': id, 'mayedit': mayedit, 'teams': get_my_teams(request.user), }) # Switch from GPS to Impeller (only for SpeedCoach 2, if impeller data) @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def otw_use_impeller(request, id=0): w = get_workoutuser(id, request) row = rdata(csvfile=w.csvfilename) success = row.use_impellerdata() if success: row.write_csv(w.csvfilename) dataprep.update_strokedata(w.id, row.df) w.impeller = True w.save() messages.info( request, 'The distance and speed data are now based on Impeller data') else: messages.error(request, 'No impeller data found') url = reverse('workout_edit_view', kwargs={'id': encoder.encode_hex(w.id)}) return HttpResponseRedirect(url) # Switch from Impeller to GPS (only for SpeedCoach 2, if impeller data) @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def otw_use_gps(request, id=0): w = get_workoutuser(id, request) row = rdata(csvfile=w.csvfilename) success = row.use_gpsdata() if success: row.write_csv(w.csvfilename) dataprep.update_strokedata(w.id, row.df) w.impeller = False w.save() messages.info( request, 'The distance and speed data are now based on GPS data') else: messages.error(request, 'No GPS data found') url = reverse('workout_edit_view', kwargs={'id': encoder.encode_hex(w.id)}) return HttpResponseRedirect(url) # add a workout manually @login_required() def addmanual_view(request, raceid=0): r = Rower.objects.get(user=request.user) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': reverse('addmanual_view'), 'name': 'Add Manual Entry' }, ] if request.method == 'POST': # Form was submitted form = WorkoutForm(request.POST) metricsform = MetricsForm(request.POST) if form.is_valid() and metricsform.is_valid(): # Get values from form name = form.cleaned_data['name'] if name == '': # pragma: no cover name = 'Manual Entry' date = form.cleaned_data['date'] starttime = form.cleaned_data['starttime'] workouttype = form.cleaned_data['workouttype'] duration = form.cleaned_data['duration'] weightcategory = form.cleaned_data['weightcategory'] adaptiveclass = form.cleaned_data['adaptiveclass'] distance = form.cleaned_data['distance'] try: rpe = form.cleaned_data['rpe'] if not rpe: rpe = -1 except KeyError: # pragma: no cover rpe = -1 notes = form.cleaned_data['notes'] thetimezone = form.cleaned_data['timezone'] private = form.cleaned_data['private'] avghr = metricsform.cleaned_data['avghr'] avgpwr = metricsform.cleaned_data['avgpwr'] avgspm = metricsform.cleaned_data['avgspm'] boattype = form.cleaned_data.get('boattype', '1x') privacy = form.cleaned_data.get('privacy', 'visible') rankingpiece = form.cleaned_data.get('rankingpiece', False) duplicate = form.cleaned_data.get('duplicate', False) seatnumber = form.cleaned_data.get('seatnumber', 1) boatname = form.cleaned_data.get('boatname', '') empowerside = form.cleaned_data.get('empowerside','port') if private: # pragma: no cover privacy = 'hidden' else: privacy = 'visible' startdatetime = (str(date) + ' ' + str(starttime)) startdatetime = datetime.datetime.strptime(startdatetime, "%Y-%m-%d %H:%M:%S") startdatetime = timezone.make_aware(startdatetime) startdatetime = startdatetime.astimezone( pytz.timezone(thetimezone) ) id, message = dataprep.create_row_df(r, distance, duration, startdatetime, rpe=rpe, weightcategory=weightcategory, adaptiveclass=adaptiveclass, avghr=avghr, rankingpiece=rankingpiece, avgpwr=avgpwr, duplicate=duplicate, avgspm=avgspm, title=name, notes=notes, workouttype=workouttype) if message: # pragma: no cover messages.error(request, message) if id: w = Workout.objects.get(id=id) w.rankingpiece = rankingpiece w.privacy = privacy w.weightcategory = weightcategory w.adaptiveclass = adaptiveclass w.notes = notes w.name = name w.rpe = rpe w.workouttype = workouttype w.boattype = boattype w.boatname = boatname w.empowerside = empowerside w.seatnumber = seatnumber w.distance = distance w.duration = duration w.save() # if ps: # add_workouts_plannedsession([w],ps,w.user) messages.info(request, 'New workout created') iform = ImageForm(request.POST, request.FILES) if iform.is_valid(): # this works but cannot get the tests to work f = iform.cleaned_data['file'] if f is not None: # pragma: no cover filename, path_and_filename = handle_uploaded_image(f) try: width, height = Image.open(path_and_filename).size except: message = "Not a valid image" messages.error(request, message) os.remove(path_and_filename) i = GraphImage(workout=w, creationdatetime=timezone.now(), filename=path_and_filename, width=width, height=height) i.save() if raceid != 0: try: race = VirtualRace.objects.get(id=raceid) except VirtualRace.DoesNotExist: # pragma: no cover messages.error(request, "Race does not exist") url = reverse('workout_edit_view', kwargs={'id': encoder.encode_hex(id)}) return HttpResponseRedirect(url) can_submit = race_can_submit( r, race) or race_can_resubmit(r, race) can_submit = can_submit and race.sessiontype == 'indoorrace' if not can_submit: # pragma: no cover messages.error( request, 'You cannot submit a result for this race') if can_submit: records = IndoorVirtualRaceResult.objects.filter( race=race, userid=r.id) if not records: # pragma: no cover messages.error( request, 'You have to register for the race first') url = reverse('virtualevent_view', kwargs={'id': race.id}) return HttpResponseRedirect(url) recordid = records[0].id result, comments, errors, jobid = add_workout_indoorrace( [w], race, r, recordid=recordid ) for c in comments: # pragma: no cover messages.info(request, c) for er in errors: # pragma: no cover messages.error(request, er) if result: otherrecords = IndoorVirtualRaceResult.objects.filter( race=race ).exclude(userid=r.id) for otherrecord in otherrecords: try: # pragma: no cover otheruser = Rower.objects.get( id=otherrecord.userid) othername = otheruser.user.first_name+' '+otheruser.user.last_name registeredname = r.user.first_name+' '+r.user.last_name if otherrecord.emailnotifications: _ = myqueue( queue, handle_sendemail_racesubmission, otheruser.user.email, othername, registeredname, race.name, race.id ) except Rower.DoesNotExist: # pragma: no cover pass url = reverse('virtualevent_view', kwargs={'id': race.id}) return HttpResponseRedirect(url) url = reverse( 'workout_edit_view', kwargs={'id': encoder.encode_hex(id)} ) return HttpResponseRedirect(url) else: # pragma: no cover iform = ImageForm() return render(request, 'manualadd.html', {'form': form, 'iform': iform, 'metricsform': metricsform, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', }) initial = { 'workouttype': 'rower', 'date': timezone.now(), 'starttime': timezone.now(), 'timezone': r.defaulttimezone, 'duration': datetime.timedelta(minutes=2), 'distance': 500, } form = WorkoutForm(initial=initial) iform = ImageForm() metricsform = MetricsForm() return render(request, 'manualadd.html', {'form': form, 'iform': iform, 'metricsform': metricsform, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', }) # Reload the workout and calculate the summary from the stroke data (lapIDx) @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_recalcsummary_view(request, id=0): row = get_workoutuser(id, request) filename = row.csvfilename rowdata = rdata(csvfile=filename) if rowdata: row.summary = rowdata.allstats() row.save() successmessage = "Summary Updated" messages.info(request, successmessage) url = reverse('workout_edit_view', kwargs={ 'id': id, }) else: message = "Something went wrong. Could not update summary" messages.error(request, message) url = reverse('workout_edit_view', kwargs={ 'id': id, }) return HttpResponseRedirect(url) # Selecting duplicates @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) def workouts_duplicates_select_view(request, userid=0): r = getrequestrower(request, userid=userid) if request.method == 'POST': # pragma: no cover form = WorkoutMultipleCompareForm(request.POST) if form.is_valid(): workouts = form.cleaned_data['workouts'] for w in workouts: try: w.delete() except OperationalError: pass workouts = Workout.objects.filter(user=r,duplicate=True).order_by("-startdatetime")[:19] form = WorkoutMultipleCompareForm() form.fields["workouts"].queryset = workouts breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, {'url': reverse('workouts_duplicates_select_view'), 'name': 'Select Duplicates' } ] return render(request,'workout_duplicates_select.html', { 'workouts':workouts, 'rower':r, 'form':form, 'breadcrumbs': breadcrumbs, }) # Joining workout @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) @permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True) def workouts_join_view(request, userid=0): r = getrequestrower(request, userid=userid) if request.method == 'POST' and 'workouts' in request.POST: form = WorkoutMultipleCompareForm(request.POST) paramform = WorkoutJoinParamForm(request.POST) if form.is_valid() and paramform.is_valid(): workout_name = paramform.cleaned_data['workout_name'] set_private = paramform.cleaned_data['set_private'] killparents = paramform.cleaned_data['killparents'] cd = form.cleaned_data workouts = cd['workouts'] ids = [int(w.id) for w in workouts] request.session['ids'] = ids id, message = dataprep.join_workouts(r, ids, title=workout_name, setprivate=set_private, killparents=killparents) if message: # pragma: no cover messages.error(request, message) url = reverse(r.defaultlandingpage, kwargs={ 'id': encoder.encode_hex(id), }) return HttpResponseRedirect(url) else: # pragma: no cover messages.error("Form is not valid") url = reverse('workouts_join_select') return HttpResponseRedirect(url) defaultoptions = { 'includereststrokes': False, 'workouttypes': ['rower', 'dynamic', 'slides'], 'waterboattype': mytypes.waterboattype, 'function': 'boxplot' } @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher", redirect_field_name=None) @permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True) def video_selectworkout(request, userid=0): r = getrequestrower(request, userid=userid) user = r.user userid = user.id workouts = Workout.objects.filter(user=r).order_by('-date') if 'options' in request.session: options = request.session['options'] else: options = defaultoptions options['userid'] = userid try: # pragma: no cover modalities = options['modalities'] except KeyError: # pragma: no cover modalities = [m[0] for m in mytypes.workouttypes_ordered.items()] query = request.GET.get('q') if query: # pragma: no cover query_list = query.split() workouts = workouts.filter( reduce(operator.and_, (Q(name__icontains=q) for q in query_list)) | reduce(operator.and_, (Q(notes__icontains=q) for q in query_list)) ) searchform = SearchForm(initial={'q': query}) else: searchform = SearchForm() if 'startdate' in request.session: startdate = iso8601.parse_date(request.session['startdate']) else: startdate = timezone.now()-datetime.timedelta(days=42) if 'enddate' in request.session: enddate = iso8601.parse_date(request.session['enddate']) else: enddate = timezone.now() workouts = workouts.filter(date__gte=startdate, date__lte=enddate) waterboattype = mytypes.waterboattype if request.method == 'POST': thediv = get_call() dateform = DateRangeForm(request.POST) if dateform.is_valid(): startdate = dateform.cleaned_data['startdate'] enddate = dateform.cleaned_data['enddate'] startdatestring = startdate.strftime('%Y-%m-%d') enddatestring = enddate.strftime('%Y-%m-%d') request.session['startdate'] = startdatestring request.session['enddate'] = enddatestring form = WorkoutSingleSelectForm(request.POST) if form.is_valid(): cd = form.cleaned_data selectedworkout = cd['workout'] url = reverse('workout_video_create_view', kwargs={'id': encoder.encode_hex(selectedworkout.id)}) return HttpResponseRedirect(url) else: form = WorkoutSingleSelectForm(workouts=workouts) thediv = '' dateform = DateRangeForm(initial={ 'startdate': startdate, 'enddate': enddate, }) negtypes = [] for b in mytypes.boattypes: if b[0] not in waterboattype: # pragma: no cover negtypes.append(b[0]) startdate = datetime.datetime.combine(startdate, datetime.time()) enddate = datetime.datetime.combine(enddate, datetime.time(23, 59, 59)) # make sure the dates are not naive startdate = pytz.utc.localize(startdate) enddate = pytz.utc.localize(enddate) if enddate < startdate: # pragma: no cover s = enddate enddate = startdate startdate = s negtypes = [] for b in mytypes.boattypes: if b[0] not in waterboattype: # pragma: no cover negtypes.append(b[0]) workouts = Workout.objects.filter(user=r, startdatetime__gte=startdate, startdatetime__lte=enddate, workouttype__in=modalities, ) workouts = workouts.order_by( "-date", "-starttime" ).exclude(boattype__in=negtypes) startdatestring = startdate.strftime('%Y-%m-%d') enddatestring = enddate.strftime('%Y-%m-%d') request.session['startdate'] = startdatestring request.session['enddate'] = enddatestring request.session['options'] = options breadcrumbs = [ { 'url': '/rowers/analysis', 'name': 'Analysis' }, { 'url': reverse('analysis_new', kwargs={'userid': userid}), 'name': 'Analysis Select' }, ] return render(request, 'video_selectworkout.html', {'workouts': workouts, 'dateform': dateform, 'startdate': startdate, 'enddate': enddate, 'rower': r, 'breadcrumbs': breadcrumbs, 'theuser': user, 'the_div': thediv, 'form': form, 'active': 'nav-analysis', 'searchform': searchform, 'teams': get_my_teams(request.user), }) @permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True) @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) def workouts_join_select(request, startdatestring="", enddatestring="", message='', successmessage='', userid=0, startdate=timezone.now()-datetime.timedelta(days=30), enddate=timezone.now()+datetime.timedelta(days=1), ): r = getrequestrower(request, userid=userid) waterboattype = mytypes.waterboattype+mytypes.ergtype modalities = [m[0] for m in mytypes.workouttypes] modality = 'all' if request.method == 'POST': dateform = DateRangeForm(request.POST) modalityform = TrendFlexModalForm(request.POST) if dateform.is_valid(): startdate = dateform.cleaned_data['startdate'] enddate = dateform.cleaned_data['enddate'] startdatestring = startdate.strftime('%Y-%m-%d') enddatestring = enddate.strftime('%Y-%m-%d') if modalityform.is_valid(): modality = modalityform.cleaned_data['modality'] waterboattype = modalityform.cleaned_data['waterboattype'] if modality == 'all': # pragma: no cover modalities = [m[0] for m in mytypes.workouttypes] else: modalities = [modality] else: dateform = DateRangeForm(initial={ 'startdate': startdate, 'enddate': enddate, }) negtypes = [] for b in mytypes.boattypes: if b[0] not in waterboattype: negtypes.append(b[0]) for b in mytypes.ergtypes: if b[0] not in waterboattype: # pragma: no cover negtypes.append(b[0]) startdate = datetime.datetime.combine(startdate, datetime.time()) enddate = datetime.datetime.combine(enddate, datetime.time(23, 59, 59)) if startdatestring: startdate = iso8601.parse_date(startdatestring) if enddatestring: enddate = iso8601.parse_date(enddatestring) if enddate < startdate: # pragma: no cover s = enddate enddate = startdate startdate = s # make sure the dates are not naive try: startdate = pytz.utc.localize(startdate) except ValueError: pass try: enddate = pytz.utc.localize(enddate) except ValueError: pass workouts = Workout.objects.filter( user=r, startdatetime__gte=startdate, startdatetime__lte=enddate, workouttype__in=modalities).order_by("-date", "-starttime").exclude(boattype__in=negtypes) query = request.GET.get('q') if query: # pragma: no cover query_list = query.split() workouts = workouts.filter( reduce(operator.and_, (Q(name__icontains=q) for q in query_list)) | reduce(operator.and_, (Q(notes__icontains=q) for q in query_list)) ) searchform = SearchForm(initial={'q': query}) else: searchform = SearchForm() form = WorkoutMultipleCompareForm() form.fields["workouts"].queryset = workouts joinparamform = WorkoutJoinParamForm() modalityform = TrendFlexModalForm(initial={ 'modality': modality, 'waterboattype': waterboattype }) messages.info(request, successmessage) messages.error(request, message) return render(request, 'workout_join_select.html', {'workouts': workouts, 'dateform': dateform, 'searchform': searchform, 'startdate': startdate, 'enddate': enddate, 'active': 'nav-workouts', 'form': form, 'joinparamform': joinparamform, 'modalityform': modalityform, 'teams': get_my_teams(request.user), }) @login_required() def remove_power_confirm_view(request, id=0): r = getrower(request.user) workout = get_workout_by_opaqueid(request, id) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(workout.id)), 'name': encoder.encode_hex(workout.id) }, {'url': reverse('remove_power_confirm_view', kwargs={'id': encoder.encode_hex(workout.id)}), 'name': 'Delete' } ] return render(request, 'workout_remove_power_confirm.html', { 'workout': workout, 'rower': r, 'breadcrumbs': breadcrumbs, }) @login_required() def remove_power_view(request, id=0): r = getrower(request.user) workout = get_workout_by_opaqueid(request, id) try: os.remove('media/cpdata_{id}.parquet.gz'.format(id=workout.id)) except OSError: # pragma: no cover pass f = workout.csvfilename powerperc = 100*np.array([r.pw_ut2, r.pw_ut1, r.pw_at, r.pw_tr, r.pw_an])/r.ftp rr = rrower(hrmax=r.max, hrut2=r.ut2, hrut1=r.ut1, hrat=r.at, hrtr=r.tr, hran=r.an, ftp=r.ftp, powerperc=powerperc, powerzones=r.powerzones, hrzones=r.hrzones) row = rdata(csvfile=f, rower=rr) row.df[' Power (watts)'] = 0 row.write_csv(f) _ = dataprep.dataplep(row.df, id=workout.id) cpdf, delta, cpvalues = dataprep.setcp(workout) workout.normp = 0 workout.rscore = 0 workout.save() dataprep.initiate_cp(r) url = reverse(r.defaultlandingpage, kwargs={ 'id': id, } ) return HttpResponseRedirect(url) # Team comparison @user_passes_test(ispromember, login_url='/rowers/paidplans/', message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) def course_mapcompare_view(request, id=0): results = [] r = None if not request.user.is_anonymous: r = getrower(request.user) try: course = GeoCourse.objects.get(id=id) except GeoCourse.DoesNotExist: # pragma: no cover raise Http404("Course does not exist") startdate = request.GET.get('startdate', None) enddate = request.GET.get('enddate', None) if startdate: startdate = iso8601.parse_date(startdate) else: startdate = timezone.now()-datetime.timedelta(days=30) if enddate: enddate = iso8601.parse_date(enddate) else: enddate = timezone.now()+datetime.timedelta(days=1) focususer = encoder.decode_hex(request.GET.get('user',None)) if focususer: try: focususer = Rower.objects.get(id=focususer).id except Rower.DoesNotExist: focususer = None focususers = request.GET.getlist('users',[]) focususers = [encoder.decode_hex(f) for f in focususers] results = VirtualRaceResult.objects.filter( course=course, workoutid__isnull=False, coursecompleted=True, ).order_by("distance", "duration") competitor_ids = [] for result in results: if result.userid not in competitor_ids: competitor_ids.append(result.userid) competitors = Rower.objects.filter(id__in=competitor_ids) if focususer: results2 = results.filter(userid=focususer) else: results2 = VirtualRaceResult.objects.none() if focususers: results3 = results.filter(userid__in=focususers) else: results3 = VirtualRaceResult.objects.none() if results2 or results3: results = results2 | results3 selected_ids = [] for result in results: if result.userid not in selected_ids: selected_ids.append(result.userid) selected_users = Rower.objects.filter(id__in=selected_ids) #workoutids = [result.workoutid for result in results] workoutids = [] for result in results: try: w = Workout.objects.get(id=result.workoutid) if w.startdatetime >= startdate and w.startdatetime <= enddate: workoutids.append(result.workoutid) except Workout.DoesNotExist: # pragma: no cover pass startenddict = {} for result in results: startenddict[result.workoutid] = (result.startsecond, result.endsecond) if len(workoutids) == 0: # pragma: no cover messages.info(request, 'There are no results to display. Perhaps you selected a date range that is not covered by the workouts.') workouts = [] for id in workoutids: try: workouts.append(Workout.objects.get(id=id)) except Workout.DoesNotExist: # pragma: no cover pass labeldict = { int(w.id): w.__str__() for w in workouts } script, div = leaflet_chart_compare(course, workoutids, labeldict=labeldict, startenddict=startenddict) dateform = DateRangeForm(initial={ 'startdate':startdate, 'enddate':enddate, }) breadcrumbs = [ { 'url': reverse('courses_view'), 'name': 'Courses' }, { 'url': reverse('course_view', kwargs={ 'id': course.id, } ), 'name': course.name }, { 'url': reverse('course_mapcompare_view', kwargs={ 'id': course.id, } ), 'name': 'Course Compare' } ] return render(request, 'mapcompare.html', {'mapscript': script, 'mapdiv': div, 'breadcrumbs': breadcrumbs, 'rower': r, 'course': course, 'results': results, 'active': 'nav-racing', 'teamid': 0, 'teams': [], 'competitors': competitors, 'selected_users': selected_users, 'dateform': dateform, }) def virtualevent_mapcompare_view(request, id=0): results = [] r = None if not request.user.is_anonymous: r = getrower(request.user) try: race = VirtualRace.objects.get(id=id) except VirtualRace.DoesNotExist: # pragma: no cover raise Http404("Virtual Challenge does not exist") if race.sessiontype != 'race': # pragma: no cover url = reverse(virtualevent_view, kwargs={'id': id}) messages.error(request, "This challenge doesn't have map data") return HttpResponseRedirect(request) results = VirtualRaceResult.objects.filter( race=race, workoutid__isnull=False, ).order_by("distance", "duration") workoutids = [result.workoutid for result in results] startenddict = {} if race.sessiontype == 'race': for result in results: startenddict[result.workoutid] = ( result.startsecond, result.endsecond) if len(workoutids) == 0: # pragma: no cover url = reverse('virtualevent_view', kwargs={ 'id': race.id, }) messages.info(request, 'There are no results to display') return HttpResponseRedirect(url) workouts = [] for id in workoutids: try: workouts.append(Workout.objects.get(id=id)) except Workout.DoesNotExist: # pragma: no cover pass labeldict = { int(w.id): w.__str__() for w in workouts } script, div = leaflet_chart_compare(race.course, workoutids, labeldict=labeldict, startenddict=startenddict) breadcrumbs = [ { 'url': reverse('virtualevents_view'), 'name': 'Challenges' }, { 'url': reverse('virtualevent_view', kwargs={ 'id': race.id, } ), 'name': race.name }, { 'url': reverse('virtualevent_mapcompare_view', kwargs={ 'id': race.id, } ), 'name': 'Course Compare' } ] return render(request, 'mapcompare.html', {'mapscript': script, 'mapdiv': div, 'breadcrumbs': breadcrumbs, 'rower': r, 'race': race, 'results': results, 'active': 'nav-racing', 'teamid': 0, 'teams': [] }) def course_compare_view(request, id=0): if request.method not in ['GET', 'POST']: # pragma: no cover raise Http404("Invalid request") results = [] xparam = 'distance' yparam = 'pace' plottype = 'line' promember = 0 if not request.user.is_anonymous: r = getrower(request.user) result = request.user.is_authenticated and ispromember(request.user) if result: promember = 1 else: # pragma: no cover r = None try: course = GeoCourse.objects.get(id=id) except GeoCourse.DoesNotExist: # pragma: no cover raise Http404("Course does not exist") script, div = course_map(course) resultobj = VirtualRaceResult results = resultobj.objects.filter( course=course, workoutid__isnull=False, coursecompleted=True, ).order_by("duration", "-distance") workoutids = [result.workoutid for result in results] startenddict = {} for result in results: startenddict[result.workoutid] = (result.startsecond, result.endsecond) if len(workoutids) == 0: # pragma: no cover url = reverse('course_view', kwargs={ 'id': course.id, }) messages.info(request, 'There are no results to display') return HttpResponseRedirect(url) if request.method == 'GET': xparam = 'distance' yparam = 'pace' plottype = 'line' request.session['ids'] = workoutids request.session['plottype'] = plottype request.session['xparam'] = xparam request.session['yparam'] = yparam workouts = [] for id in workoutids: try: workouts.append(Workout.objects.get(id=id)) except Workout.DoesNotExist: # pragma: no cover pass form = WorkoutMultipleCompareForm() form.fields["workouts"].queryset = Workout.objects.filter( id__in=workoutids) form.fields["workouts"].initial = Workout.objects.filter( id__in=workoutids) labeldict = { int(w.id): w.__str__() for w in workouts } chartform = ChartParamChoiceForm( initial={ 'xparam': xparam, 'yparam': yparam, 'plottype': plottype, 'teamid': 0 } ) if request.method == 'POST' and 'workouts' in request.POST: # pragma: no cover form = WorkoutMultipleCompareForm(request.POST) form.fields["workouts"].queryset = Workout.objects.filter( id__in=workoutids) chartform = ChartParamChoiceForm(request.POST) if form.is_valid() and chartform.is_valid(): cd = form.cleaned_data workouts = cd['workouts'] workoutids = [w.id for w in workouts] xparam = chartform.cleaned_data['xparam'] yparam = chartform.cleaned_data['yparam'] plottype = chartform.cleaned_data['plottype'] ids = [int(w.id) for w in workouts] request.session['ids'] = ids elif request.method == 'POST': # pragma: no cover form = WorkoutMultipleCompareForm() form.fields["workouts"].queryset = Workout.objects.filter( id__in=workoutids) request.session['ids'] = workoutids chartform = ChartParamChoiceForm(request.POST) if chartform.is_valid(): xparam = chartform.cleaned_data['xparam'] yparam = chartform.cleaned_data['yparam'] plottype = chartform.cleaned_data['plottype'] try: workoutids = request.session['ids'] except KeyError: # pragma: no cover pass workouts = [] for id in workoutids: try: workouts.append(Workout.objects.get( id=id)) except Workout.DoesNotExist: # pragma: no cover pass try: labeldict = { int(w.id): w.__str__() for w in workouts } except: # pragma: no cover labeldict = {} script, div = interactive_multiple_compare_chart(workoutids, xparam, yparam, promember=promember, plottype=plottype, labeldict=labeldict, startenddict=startenddict) breadcrumbs = [ { 'url': reverse('courses_view'), 'name': 'Courses' }, { 'url': reverse('course_view', kwargs={ 'id': course.id, } ), 'name': course.name }, { 'url': reverse('virtualevent_compare_view', kwargs={ 'id': course.id, } ), 'name': 'Compare' } ] return render(request, 'multicompare.html', {'interactiveplot': script, 'the_div': div, 'breadcrumbs': breadcrumbs, 'rower': r, 'course': course, 'results': results, 'active': 'nav-racing', 'promember': promember, 'teamid': 0, 'chartform': chartform, 'form': form, 'teams': [] }) def virtualevent_compare_view(request, id=0): results = [] promember = 0 if not request.user.is_anonymous: r = getrower(request.user) result = request.user.is_authenticated and ispromember(request.user) if result: promember = 1 else: # pragma: no cover r = None try: race = VirtualRace.objects.get(id=id) except VirtualRace.DoesNotExist: # pragma: no cover raise Http404("Virtual Challenge does not exist") if race.sessiontype == 'race': script, div = course_map(race.course) resultobj = VirtualRaceResult else: # pragma: no cover script = '' div = '' resultobj = IndoorVirtualRaceResult results = resultobj.objects.filter( race=race, workoutid__isnull=False, coursecompleted=True, ).order_by("duration", "-distance") workoutids = [result.workoutid for result in results] startenddict = {} if race.sessiontype == 'race': for result in results: startenddict[result.workoutid] = ( result.startsecond, result.endsecond) if len(workoutids) == 0: # pragma: no cover url = reverse('virtualevent_view', kwargs={ 'id': race.id, }) messages.info(request, 'There are no results to display') return HttpResponseRedirect(url) xparam = 'time' yparam = 'pace' if request.method in ('GET','HEAD'): if race.sessionmode == 'distance': xparam = 'cumdist' yparam = 'pace' plottype = 'line' request.session['ids'] = workoutids request.session['plottype'] = plottype request.session['xparam'] = xparam request.session['yparam'] = yparam workouts = [] for id in workoutids: try: workouts.append(Workout.objects.get(id=id)) except Workout.DoesNotExist: # pragma: no cover pass form = WorkoutMultipleCompareForm() form.fields["workouts"].queryset = Workout.objects.filter( id__in=workoutids) form.fields["workouts"].initial = Workout.objects.filter( id__in=workoutids) labeldict = { int(w.id): w.__str__() for w in workouts } chartform = ChartParamChoiceForm( initial={ 'xparam': xparam, 'yparam': yparam, 'plottype': plottype, 'teamid': 0 } ) if request.method == 'POST' and 'workouts' in request.POST: # pragma: no cover form = WorkoutMultipleCompareForm(request.POST) form.fields["workouts"].queryset = Workout.objects.filter( id__in=workoutids) chartform = ChartParamChoiceForm(request.POST) if form.is_valid() and chartform.is_valid(): cd = form.cleaned_data workouts = cd['workouts'] workoutids = [w.id for w in workouts] xparam = chartform.cleaned_data['xparam'] yparam = chartform.cleaned_data['yparam'] plottype = chartform.cleaned_data['plottype'] ids = [int(w.id) for w in workouts] request.session['ids'] = ids elif request.method == 'POST': form = WorkoutMultipleCompareForm() form.fields["workouts"].queryset = Workout.objects.filter( id__in=workoutids) request.session['ids'] = workoutids chartform = ChartParamChoiceForm(request.POST) if chartform.is_valid(): xparam = chartform.cleaned_data['xparam'] yparam = chartform.cleaned_data['yparam'] plottype = chartform.cleaned_data['plottype'] try: workoutids = request.session['ids'] except KeyError: # pragma: no cover pass workouts = [] for id in workoutids: try: workouts.append(Workout.objects.get( id=id)) except Workout.DoesNotExist: # pragma: no cover pass try: labeldict = { int(w.id): w.__str__() for w in workouts } except: # pragma: no cover labeldict = {} script, div = interactive_multiple_compare_chart(workoutids, xparam, yparam, promember=promember, plottype=plottype, labeldict=labeldict, startenddict=startenddict) breadcrumbs = [ { 'url': reverse('virtualevents_view'), 'name': 'Challenges' }, { 'url': reverse('virtualevent_view', kwargs={ 'id': race.id, } ), 'name': race.name }, { 'url': reverse('virtualevent_compare_view', kwargs={ 'id': race.id, } ), 'name': 'Compare' } ] return render(request, 'multicompare.html', {'interactiveplot': script, 'the_div': div, 'breadcrumbs': breadcrumbs, 'rower': r, 'race': race, 'results': results, 'active': 'nav-racing', 'promember': promember, 'teamid': 0, 'chartform': chartform, 'form': form, 'teams': [] }) @permission_required('plannedsession.view_session', fn=get_session_by_pk, raise_exception=True) def plannedsession_compare_view(request, id=0, userid=0): try: ps = PlannedSession.objects.get(id=id) except PlannedSession.DoesNotExist: # pragma: no cover raise Http404("Planned session does not exist") workouts = Workout.objects.filter(plannedsession=ps) ids = [int(w.id) for w in workouts] xparam = 'time' yparam = 'hr' plottype = 'line' request.session['ids'] = ids request.session['xparam'] = xparam request.session['yparam'] = yparam request.session['plottype'] = plottype request.session['ps'] = ps.id if ids: url = reverse('analysis_new', kwargs={ 'session': ps.id, 'id': encoder.encode_hex(ids[0]), 'function': 'compare'}) else: url = reverse('plannedsession_view', kwargs={'id': ps.id}) return HttpResponseRedirect(url) # set RPE for list of workouts @login_required() def workouts_setrpe_view(request,userid=0): today = timezone.now() startdate = today-timezone.timedelta(days=32) enddate = today+timezone.timedelta(days=1) r = getrequestrower(request,userid=userid) startdate = datetime.datetime.combine(startdate, datetime.time()) enddate = datetime.datetime.combine(enddate, datetime.time(23, 59, 59)) startdate = pytz.utc.localize(startdate) enddate = pytz.utc.localize(enddate) workouts = Workout.objects.filter( user=r,rpe=0, duplicate=False, startdatetime__gte=startdate, startdatetime__lte=enddate ).order_by("-date", "-starttime")[0:20] dateform = DateRangeForm(initial={ 'startdate': startdate, 'enddate': enddate, }) WorkoutsRPEFormSet = modelformset_factory( Workout, form=WorkoutRPEForm, ) if request.method == 'POST': # pragma: no cover dateform = DateRangeForm(request.POST) rpe_formset = WorkoutsRPEFormSet(request.POST, queryset=Workout.objects.filter(user=r,rpe=0,duplicate=False, startdatetime__gte=startdate, startdatetime__lte=enddate ) ) if dateform.is_valid(): # pragma: no cover startdate = dateform.cleaned_data['startdate'] enddate = dateform.cleaned_data['enddate'] if rpe_formset.is_valid(): for item in rpe_formset.cleaned_data: rpe = item['rpe'] workout = item['id'] if workout: workout.rpe = rpe workout.save() workouts = Workout.objects.filter( user=r,rpe=0, duplicate=False, startdatetime__gte=startdate, startdatetime__lte=enddate ).order_by("-date", "-starttime")[0:20] rpe_formset = WorkoutsRPEFormSet(queryset=workouts) usertimezone = pytz.timezone(r.defaulttimezone) startdate = datetime.datetime.combine( startdate, datetime.time()).astimezone(usertimezone) enddate = datetime.datetime.combine( enddate, datetime.time(23, 59, 59)).astimezone(usertimezone) if enddate < startdate: # pragma: no cover s = enddate enddate = startdate startdate = s today = timezone.now() announcements = SiteAnnouncement.objects.filter( expires__gte=today ).order_by( "-created", "-id" ) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': '/rowers/workouts/setrpe/', 'name': 'Set RPE' } ] return render(request,'workouts_rpe.html', { 'workouts': workouts, 'active': 'nav-workouts', 'rower': r, 'breadcrumbs': breadcrumbs, 'dateform': dateform, 'startdate': startdate, 'enddate': enddate, 'rpe_formset': rpe_formset, }) # Workout bulk actions @login_required() def workouts_bulk_actions(request): r = getrower(request.user) action = request.session.get('action','export') workoutids = request.session.get('ids',[]) workouts = [] exportchoice = 'strava' # exportchoice = ExportChoices() actionform = WorkoutBulkActions() actionform.fields["action"].initial = action assignchoices = AssignChoices() try: for encid in workoutids: w = get_workout_by_opaqueid(request, encid) if w.user == r: workouts.append(w) else: # pragma: no cover messages.error(request,'Bulk actions are not accessible to coaches') except KeyError: # pragma: no cover pass if request.method == 'POST': actionform = WorkoutBulkActions(request.POST) form = WorkoutMultipleCompareForm(request.POST) if form.is_valid() and actionform.is_valid(): workouts = form.cleaned_data['workouts'] if len(workouts) == 0: # pragma: no cover url = reverse('workouts_view') return HttpResponseRedirect(url) action = actionform.cleaned_data['action'] if action == 'remove': for w in workouts: messages.info(request,'Removed workout '+str(encoder.encode_hex(w.id))) w.delete() elif action == 'set private': for w in workouts: w.privacy = 'private' w.save() elif action == 'set commute': for w in workouts: w.is_commute = True w.sub_type = "Commute" w.save() elif action == 'unset commute': for w in workouts: w.is_commute = False w.sub_type = None w.save() elif action == 'set public': for w in workouts: w.privacy = 'public' w.save() elif action == 'export': exportchoice = ExportChoices(request.POST) if exportchoice.is_valid(): destination = exportchoice.cleaned_data['destination'] for w in workouts: integration = importsources[destination](r.user) try: id = integration.workout_export(w) messages.info(request, 'Workout {id} exported to {destination}'.format( id=encoder.encode_hex(w.id), destination=destination)) except NoTokenError: # pragma: no cover messages.error(request, 'Export to {destination} of workout {id} failed'.format( id=encoder.encode_hex(w.id), destination=destination)) elif action == 'rower assign': assignchoices = AssignChoices(request.POST) if assignchoices.is_valid(): remove_workout = assignchoices.cleaned_data['remove_workout'] rowers = assignchoices.cleaned_data['rowers'] _ = myqueue(queuehigh, handle_assignworkouts, workouts, rowers, remove_workout) messages.info(request, "Your action will be performed in the background. It may take a few minutes to complete") url = reverse('workouts_view') return HttpResponseRedirect(url) else: # pragma: no cover if len(workouts) == 0: url = reverse(workouts_view) return HttpResponseRedirect(url) else: exportchoice = ExportChoices() actionform = WorkoutBulkActions() actionform.fields["action"].initial = action assignchoices = AssignChoices() teams = Team.objects.filter(manager=request.user) assignchoices.fields["rowers"].queryset = Rower.objects.filter( team__in=teams ).distinct().order_by("user__last_name", "user__first_name") assignchoices.fields["rowers"].initial = [] form = WorkoutMultipleCompareForm() form.fields["workouts"].queryset = Workout.objects.filter(id__in=[w.id for w in workouts]) form.fields["workouts"].initial = workouts return render(request,'workout_bulk_actions.html', {'action':action, 'exportchoice':exportchoice, 'actionform':actionform, 'assignchoices': assignchoices, 'form':form, 'workouts':workouts}) # List Workouts @login_required() def workouts_view(request, message='', successmessage='', teamid=0, rowerid=0, userid=0): startdate, enddate = get_dates_timeperiod( request, defaulttimeperiod='lastyear') request.session['referer'] = absolute(request)['PATH'] r = getrequestrower(request, rowerid=rowerid, userid=userid) show_commutes = request.GET.get('show_commutes', False) if show_commutes == 'true': show_commutes = True show_wups_cds = request.GET.get('show_wups_cds', False) # check if access is allowed startdate = datetime.datetime.combine(startdate, datetime.time()) enddate = datetime.datetime.combine(enddate, datetime.time(23, 59, 59)) query = None if request.method == 'POST': if 'selectworkouts' in request.POST: request.session['action']=request.POST['action'] request.session['ids'] = request.POST.getlist('workoutid') url = reverse('workouts_bulk_actions') return HttpResponseRedirect(url) dateform = DateRangeForm(request.POST) searchform = SearchForm(request.POST) if dateform.is_valid(): # pragma: no cover startdate = dateform.cleaned_data['startdate'] enddate = dateform.cleaned_data['enddate'] if searchform.is_valid(): query = searchform.cleaned_data['q'] else: dateform = DateRangeForm(initial={ 'startdate': startdate, 'enddate': enddate, }) usertimezone = pytz.timezone(r.defaulttimezone) startdate = datetime.datetime.combine( startdate, datetime.time()).astimezone(usertimezone) enddate = datetime.datetime.combine( enddate, datetime.time(23, 59, 59)).astimezone(usertimezone) if enddate < startdate: # pragma: no cover s = enddate enddate = startdate startdate = s # start date for the small graph activity_startdate = enddate-datetime.timedelta(days=15) try: if enddate > timezone.now(): activity_enddate = timezone.now() activity_enddate = activity_enddate.replace( hour=23, minute=59, second=59).astimezone(usertimezone) activity_startdate = activity_enddate-datetime.timedelta(days=15) activity_startdate = activity_startdate.replace( hour=0, minute=0, second=0) else: # pragma: no cover activity_enddate = enddate except (ValueError, AttributeError): # pragma: no cover activity_enddate = enddate g_startdate = activity_startdate g_enddate = activity_enddate if teamid: try: theteam = Team.objects.get(id=teamid) except Team.DoesNotExist: # pragma: no cover raise Http404("Team doesn't exist") if theteam.viewing == 'allmembers' or theteam.manager == request.user: workouts = Workout.objects.filter( team=theteam, startdatetime__gte=startdate, startdatetime__lte=enddate, privacy='visible').order_by("-date", "-starttime").exclude(workoutsource='strava') g_workouts = Workout.objects.filter( team=theteam, startdatetime__gte=activity_startdate, startdatetime__lte=activity_enddate, duplicate=False, privacy='visible').order_by("-date", "-starttime").exclude(workoutsource='strava') elif theteam.viewing == 'coachonly': # pragma: no cover workouts = Workout.objects.filter( team=theteam, user=r, startdatetime__gte=startdate, startdatetime__lte=enddate, privacy='visible').order_by("-startdatetime").exclude(workoutsource='strava') g_workouts = Workout.objects.filter( team=theteam, user=r, startdatetime__gte=activity_startdate, startdatetime__lte=activity_enddate, duplicate=False, privacy='visible').order_by("-startdatetime").exclude(workoutsource='strava') elif request.user != r.user: theteam = None workouts = Workout.objects.filter( user=r, startdatetime__gte=startdate, startdatetime__lte=enddate, privacy='visible').order_by("-date", "-starttime").exclude(workoutsource='strava') g_workouts = Workout.objects.filter( user=r, startdatetime__gte=activity_startdate, startdatetime__lte=activity_enddate, duplicate=False, privacy='visible').order_by("-startdatetime").exclude(workoutsource='strava') else: theteam = None workouts = Workout.objects.filter( user=r, startdatetime__gte=startdate, startdatetime__lte=enddate).order_by("-date", "-starttime") g_workouts = Workout.objects.filter( user=r, duplicate=False, startdatetime__gte=activity_startdate, startdatetime__lte=activity_enddate).order_by("-startdatetime") if g_workouts.count() == 0: g_workouts = Workout.objects.filter( user=r, startdatetime__gte=timezone.now()-timedelta(days=15)).order_by("-startdatetime").exclude(workoutsource='strava') g_enddate = timezone.now() g_startdate = (timezone.now()-timedelta(days=15)) nr_commutes = 0 show_commutes = show_commutes or r.show_commutes if not show_commutes: nr_commutes = workouts.filter(is_commute=True).count() workouts = workouts.exclude(is_commute=True) nr_wups_cds = 0 show_wups_cds = show_wups_cds or r.show_wups_cds if not show_wups_cds: nr_wups_cds = workouts.filter(sub_type__in=['Warming Up', 'Cooling Down']).count() workouts = workouts.exclude(sub_type='Warming Up').exclude(sub_type='Cooling Down') workoutsnohr = workouts.exclude(averagehr__isnull=False) for w in workoutsnohr: # pragma: no cover _ = dataprep.workout_trimp(w) if query: # pragma: no cover query_list = query.split() workouts = workouts.filter( reduce(operator.and_, (Q(name__icontains=q) for q in query_list)) | reduce(operator.and_, (Q(notes__icontains=q) for q in query_list)), ).exclude(workoutsource='strava') searchform = SearchForm(initial={'q': query}) else: searchform = SearchForm() paginator = Paginator(workouts, 12) # show 25 workouts per page page = request.GET.get('page', 1) try: workouts = paginator.page(page) except EmptyPage: # pragma: no cover workouts = paginator.page(paginator.num_pages) today = timezone.now() announcements = SiteAnnouncement.objects.filter( expires__gte=today ).order_by( "-created", "-id" ) if theteam: stack = 'rower' else: stack = 'type' yaxis = request.GET.get('yaxis', 'duration') if yaxis not in ['duration', 'trimp', 'rscore']: # pragma: no cover yaxis = 'duration' script, div = interactive_activitychart2(g_workouts, g_startdate, g_enddate, stack=stack, yaxis=yaxis) totalmeters, totalhours, totalminutes, total_seconds = get_totals( g_workouts) totalminutes = '{totalminutes:02d}'.format(totalminutes=totalminutes) messages.info(request, successmessage) messages.error(request, message) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, ] timeperiod = startdate.strftime( '%Y-%m-%d')+'/'+enddate.strftime('%Y-%m-%d') norpecount = len([w for w in workouts if w.rpe==0 and not w.duplicate]) if norpecount and r.get_rpe_warnings: # pragma: no cover messages.info(request,'You have workouts with no RPE value set. \ Click here to update them. \ You can switch off this warning in settings.') actionform = WorkoutBulkActions() return render(request, 'list_workouts.html', {'workouts': workouts, 'active': 'nav-workouts', 'rower': r, 'actionform': actionform, 'searchform': searchform, 'breadcrumbs': breadcrumbs, 'dateform': dateform, 'startdate': startdate, 'enddate': enddate, 'announcements': announcements[0:4], 'team': theteam, 'teams': get_my_teams(request.user), 'interactiveplot': script, 'the_div': div, 'timeperiod': timeperiod, 'totalmeters': totalmeters, 'totalminutes': totalminutes, 'totalhours': totalhours, 'nr_commutes': nr_commutes, 'show_commutes': show_commutes, 'show_wups_cds': show_wups_cds, 'nr_wups_cds': nr_wups_cds, }) # List of workouts to compare a selected workout to @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_fusion_list(request, id=0, startdate=timezone.now()-datetime.timedelta(days=365), enddate=timezone.now()): r = getrequestrower(request) u = r.user if request.method == 'POST': # pragma: no cover dateform = DateRangeForm(request.POST) if dateform.is_valid(): startdate = dateform.cleaned_data['startdate'] enddate = dateform.cleaned_data['enddate'] else: dateform = DateRangeForm(initial={ 'startdate': startdate, 'enddate': enddate, }) startdate = datetime.datetime.combine(startdate, datetime.time()) enddate = datetime.datetime.combine(enddate, datetime.time(23, 59, 59)) startdate = arrow.get(startdate).datetime enddate = arrow.get(enddate).datetime if enddate < startdate: # pragma: no cover s = enddate enddate = startdate startdate = s if id: theid = encoder.decode_hex(id) w = get_workoutuser(id, request) r = w.user workouts = Workout.objects.filter(user=r, startdatetime__gte=startdate, startdatetime__lte=enddate).order_by("-date", "-starttime").exclude(id=theid) query = request.GET.get('q') if query: # pragma: no cover query_list = query.split() workouts = workouts.filter( reduce(operator.and_, (Q(name__icontains=q) for q in query_list)) | reduce(operator.and_, (Q(notes__icontains=q) for q in query_list)) ) searchform = SearchForm(initial={'q': query}) else: searchform = SearchForm() paginator = Paginator(workouts, 15) # show 25 workouts per page page = request.GET.get('page', 1) try: workouts = paginator.page(page) except EmptyPage: # pragma: no cover workouts = paginator.page(paginator.num_pages) row = get_workoutuser(id, request) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(row.id)), 'name': row.name }, { 'url': reverse('workout_fusion_list', kwargs={'id': id}), 'name': 'Sensor Fusion' } ] return render(request, 'fusion_list.html', {'id': id, 'workout': row, 'rower': r, 'searchform': searchform, 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, 'workouts': workouts, 'last_name': u.last_name, 'first_name': u.first_name, 'dateform': dateform, 'startdate': startdate, 'enddate': enddate, 'teams': get_my_teams(request.user), }) # Basic view of workout @permission_required('workout.view_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_view(request, id=0, raceresult=0, sessionresult=0, nocourseraceresult=0): request.session['referer'] = absolute(request)['PATH'] if not request.user.is_anonymous: rower = getrower(request.user) else: rower = None # get row row = get_workout_by_opaqueid(request, id) f1 = row.csvfilename rowdata = rdata(csvfile=f1) summary = row.summary comments = WorkoutComment.objects.filter(workout=row) aantalcomments = len(comments) g = GraphImage.objects.filter(workout=row).order_by("-creationdatetime") for i in g: # pragma: no cover try: width, height = Image.open(i.filename).size i.width = width i.height = height i.save() except: pass # get raceresult or session result intervaldata = {} if sessionresult != 0: # pragma: no cover try: result = CourseTestResult.objects.get(id=sessionresult) startsecond = result.startsecond endsecond = result.endsecond itime = [startsecond, endsecond-startsecond] itype = [3, 4] intervaldata['itime'] = itime intervaldata['itype'] = itype vals, units, typ = rowdata.updateinterval_metric(' AverageBoatSpeed (m/s)', 0.1, mode='larger', debug=False, smoothwindow=15., activewindow=[startsecond, endsecond]) summary = rowdata.allstats() except CourseTestResult.DoesNotExist: pass if nocourseraceresult != 0: # pragma: no cover try: result = IndoorVirtualRaceResult.objects.get(id=nocourseraceresult) startsecond = result.startsecond endsecond = result.endsecond itime = [startsecond, endsecond-startsecond] itype = [3, 4] intervaldata['itime'] = itime intervaldata['itype'] = itype vals, units, typ = rowdata.updateinterval_metric(' AverageBoatSpeed (m/s)', 0.1, mode='larger', debug=False, smoothwindow=15., activewindow=[startsecond, endsecond]) summary = rowdata.allstats() except CourseTestResult.DoesNotExist: pass if raceresult != 0: # pragma: no cover try: result = VirtualRaceResult.objects.get(id=raceresult) startsecond = result.startsecond endsecond = result.endsecond itime = [startsecond, endsecond-startsecond] itype = [3, 4] intervaldata['itime'] = itime intervaldata['itype'] = itype vals, units, typ = rowdata.updateinterval_metric(' AverageBoatSpeed (m/s)', 0.1, mode='larger', debug=False, smoothwindow=15., activewindow=[startsecond, endsecond]) summary = rowdata.allstats() except VirtualRaceResult.DoesNotExist: pass # create interactive plot res = interactive_chart(encoder.decode_hex(id), intervaldata=intervaldata) script = res[0] div = res[1] # create map hascoordinates = 1 if rowdata != 0: try: latitude = rowdata.df[' latitude'] if not latitude.std(): # pragma: no cover hascoordinates = 0 except (KeyError, AttributeError): hascoordinates = 0 else: # pragma: no cover hascoordinates = 0 courses = [] if hascoordinates: if intervaldata: # pragma: no cover rowdata.df['reltime'] = rowdata.df['TimeStamp (sec)'] - \ rowdata.df.loc[0, 'TimeStamp (sec)'] mask = (rowdata.df['reltime'] > startsecond) & ( rowdata.df['reltime'] < endsecond) latitudes = rowdata.df.loc[mask, ' latitude'] longitudes = rowdata.df.loc[mask, ' longitude'] else: latitudes = rowdata.df[' latitude'] longitudes = rowdata.df[' longitude'] mapscript, mapdiv = leaflet_chart( latitudes, longitudes, row.name, raceresult=raceresult) records = VirtualRaceResult.objects.filter( workoutid=row.id, userid=row.user.id, coursecompleted=True) if records.count() > 0: # pragma: no cover courses = list(set([record.course for record in records])) else: mapscript = "" mapdiv = "" breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': reverse('workout_view', kwargs={'id': id}), 'name': row.name, } ] u = row.user.user recordsindoor = IndoorVirtualRaceResult.objects.filter(workoutid=row.id) records = VirtualRaceResult.objects.filter(workoutid=row.id) return render(request, 'workout_view.html', {'workout': row, 'rower': rower, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'graphs': g, 'last_name': u.last_name, 'records': records, 'summary': summary, 'recordsindoor': recordsindoor, 'first_name': u.first_name, 'interactiveplot': script, 'aantalcomments': aantalcomments, 'mapscript': mapscript, 'mapdiv': mapdiv, 'teams': get_my_teams(request.user), 'courses': courses, 'the_div': div}) # Resets stroke data to raw data (pace) @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) def workout_undo_smoothenpace_view( request, id=0, message="", successmessage="" ): row = get_workoutuser(id, request) r = getrower(request.user) filename = row.csvfilename row = rdata(csvfile=filename) if row == 0: # pragma: no cover return HttpResponse("Error: CSV Data File Not Found") if 'originalvelo' in row.df: velo = row.df['originalvelo'].values row.df[' Stroke500mPace (sec/500m)'] = 500./velo row.write_csv(filename, gzip=True) dataprep.update_strokedata(encoder.decode_hex(id), row.df) url = reverse(r.defaultlandingpage, kwargs={ 'id': id, } ) return HttpResponseRedirect(url) # Data smoothing of pace data @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality.", redirect_field_name=None) def workout_smoothenpace_view(request, id=0, message="", successmessage=""): row = get_workoutuser(id, request) previousurl = request.META.get('HTTP_REFERER') r = getrower(request.user) filename = row.csvfilename row = rdata(csvfile=filename) if row == 0: # pragma: no cover return HttpResponse("Error: CSV Data File Not Found") try: pace = row.df[' Stroke500mPace (sec/500m)'].values velo = 500./pace except KeyError: messages.error(request, 'Unable to find the data') if previousurl: # pragma: no cover url = previousurl else: url = reverse(r.defaultlandingpage, kwargs={ 'id': id, } ) return HttpResponseRedirect(url) if 'originalvelo' not in row.df: row.df['originalvelo'] = velo velo2 = utils.ewmovingaverage(velo, 5) pace2 = 500./abs(velo2) row.df[' Stroke500mPace (sec/500m)'] = pace2 row.df = row.df.fillna(0) row.write_csv(filename, gzip=True) dataprep.update_strokedata(encoder.decode_hex(id), row.df) messages.info( request, 'A smoothening filter was applied to your pace data') if previousurl: # pragma: no cover url = previousurl else: url = reverse(r.defaultlandingpage, kwargs={ 'id': id, } ) return HttpResponseRedirect(url) # Get weather for given location and date/time @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) def workout_downloadwind_view(request, id=0, airportcode=None, message="", successmessage=""): row = get_workoutuser(id, request) f1 = row.csvfilename # create bearing rowdata = rdata(csvfile=f1) if rowdata == 0: # pragma: no cover return HttpResponse("Error: CSV Data File Not Found") try: _ = rowdata.df.loc[:, 'bearing'].values except KeyError: rowdata.add_bearing() rowdata.write_csv(f1, gzip=True) # get wind try: avglat = rowdata.df[' latitude'].mean() avglon = rowdata.df[' longitude'].mean() avgtime = int(rowdata.df['TimeStamp (sec)'].mean()-rowdata.df.loc[:, 'TimeStamp (sec)'].iloc[0]) starttimeunix = int(arrow.get(row.startdatetime).timestamp()) avgtime = starttimeunix+avgtime winddata = get_wind_data(avglat, avglon, avgtime) windspeed = winddata[0] windbearing = winddata[1] message = winddata[2] if message is not None: try: row.notes += "\n"+message except TypeError: # pragma: no cover if message is not None and row.notes is not None: row.notes += message row.save() rowdata.add_wind(windspeed, windbearing) rowdata.write_csv(f1, gzip=True) messages.info(request, message) kwargs = { 'id': id} url = reverse('workout_wind_view', kwargs=kwargs) response = HttpResponseRedirect(url) except KeyError: message = "No latitude/longitude data" messages.error(request, message) kwargs = { 'id': id } url = reverse('workout_wind_view', kwargs=kwargs) response = HttpResponseRedirect(url) return response # Get weather for given location and date/time @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) def workout_downloadmetar_view(request, id=0, airportcode=None, message="", successmessage=""): row = get_workoutuser(id, request) f1 = row.csvfilename # create bearing rowdata = rdata(csvfile=f1) if rowdata == 0: # pragma: no cover return HttpResponse("Error: CSV Data File Not Found") try: _ = rowdata.df.loc[:, 'bearing'].values except KeyError: rowdata.add_bearing() rowdata.write_csv(f1, gzip=True) # get wind try: avglat = rowdata.df[' latitude'].mean() avglon = rowdata.df[' longitude'].mean() airportcode = get_airport_code(avglat, avglon)[0] avgtime = int(rowdata.df['TimeStamp (sec)'].mean( )-rowdata.df.loc[:, 'TimeStamp (sec)'].iloc[0]) starttimeunix = arrow.get(row.startdatetime).timestamp() avgtime = starttimeunix + avgtime winddata = get_metar_data(airportcode, avgtime) windspeed = winddata[0] windbearing = winddata[1] message = winddata[2] try: row.notes += "\n"+message except TypeError: # pragma: no cover if message is not None: try: row.notes += message except TypeError: pass row.save() rowdata.add_wind(windspeed, windbearing) rowdata.write_csv(f1, gzip=True) messages.info(request, message) kwargs = { 'id': id} url = reverse('workout_wind_view', kwargs=kwargs) response = HttpResponseRedirect(url) except KeyError: message = "No latitude/longitude data" messages.error(request, message) kwargs = { 'id': id } url = reverse('workout_wind_view', kwargs=kwargs) response = HttpResponseRedirect(url) return response @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def instroke_view(request, id=0): w = get_workoutuser(id, request) r = getrower(request.user) mayedit = 1 breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('instroke_view', kwargs={'id': id}), 'name': 'In-Stroke Metrics' } ] rowdata = rrdata(csvfile=w.csvfilename) try: instrokemetrics = rowdata.get_instroke_columns() instrokemetrics = [m for m in instrokemetrics if m not in nometrics] except AttributeError: # pragma: no cover instrokemetrics = [] return render(request, 'instroke.html', {'workout': w, 'rower': r, 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, 'mayedit': mayedit, 'teams': get_my_teams(request.user), 'instrokemetrics': instrokemetrics, }) # generate instroke chart @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def instroke_chart(request, id=0, metric=''): # pragma: no cover w = get_workoutuser(id, request) rowdata = rrdata(csvfile=w.csvfilename) instrokemetrics = rowdata.get_instroke_columns() if metric in instrokemetrics: job = myqueue(queuelow, instroke_static,w, metric) r = getrower(request.user) url = reverse(r.defaultlandingpage, kwargs={ 'id': id, }) return HttpResponseRedirect(url) @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def instroke_data(request, metric='', spm_min=15, spm_max=45, activeminutesmin=0, activeminutesmax=0, id=0, ): # pragma: no cover r = getrequestrower(request, userid=0) w = get_workoutuser(id, request) rowdata = rrdata(csvfile=w.csvfilename) instrokemetrics = rowdata.get_instroke_columns() if not metric: metric = instrokemetrics[0] if activeminutesmax == 0: activeminutesmax = rowdata.duration/60. try: spm_min = float(spm_min) except ValueError: pass try: spm_max = float(spm_max) except ValueError: pass try: activeminutesmin = float(activeminutesmin) except ValueError: pass try: activeminutesmax = float(activeminutesmax) except ValueError: pass factor = 1 if metric == 'boat accelerator curve': factor = g_acc data = rowdata.get_instroke_data(metric, spm_min=spm_min, spm_max=spm_max, activeminutesmin=activeminutesmin, activeminutesmax=activeminutesmax, factor=factor, ) filename = str(uuid4())+'.csv' data.to_csv(filename) with open(filename, 'r') as f: response = HttpResponse(f) response['Content-Disposition'] = 'attachment; filename="%s"' % filename response['Content-Type'] = 'application/octet-stream' os.remove(filename) return response @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def instroke_chart_interactive(request, id=0, analysis=0, userid=0): is_ajax = request_is_ajax(request) r = getrequestrower(request, userid=userid) w = get_workoutuser(id, request) rowdata = rrdata(csvfile=w.csvfilename) instrokemetrics = rowdata.get_instroke_columns() form = InstrokeForm(choices=instrokemetrics) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('instroke_chart_interactive', kwargs={'id': id}), 'name': 'In-Stroke Metrics' } ] if not instrokemetrics: script = '' div = 'No Instroke Metrics available for this workout' spm_min = 15 spm_max = 45 ds = '' dd = '' activeminutesmin = 0 activeminutesmax = int(rowdata.duration/60.) maxminutes = activeminutesmax return render(request, 'instroke_interactive.html', { 'workout': w, 'rower': w.user, 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, 'teams': get_my_teams(request.user), 'instrokemetrics': instrokemetrics, 'form':form, 'the_script': script, 'the_div': div, 'spm_min': spm_min, 'spm_max': spm_max, 'ds': ds, 'dd': dd, 'activeminutesmin': activeminutesmin, 'activeminutesmax': activeminutesmax, 'maxminutes': maxminutes, }) if analysis: # pragma: no cover try: instroke_analysis = InStrokeAnalysis.objects.get(id=analysis) if instroke_analysis.rower != r: analysis = 0 messages.error(request,'Access to this saved analysis denied') raise ValueError if instroke_analysis.workout != w: messages.error(request,'This saved analysis belongs to a different workout') form = InstrokeForm( choices=instrokemetrics, initial={ 'metric':instroke_analysis.metric, 'name': instroke_analysis.name, 'notes': instroke_analysis.notes, 'activeminutesmin':int(instroke_analysis.start_second/60.), 'activeminutesmax':int(instroke_analysis.end_second/60.), 'spm_min': instroke_analysis.spm_min, 'spm_max': instroke_analysis.spm_max, } ) metric = instroke_analysis.metric name = instroke_analysis.name notes = instroke_analysis.notes activeminutesmin = int(instroke_analysis.start_second/60.) activeminutesmax = int(instroke_analysis.end_second/60.) spm_min = instroke_analysis.spm_min spm_max = instroke_analysis.spm_max except ValueError: metric = instrokemetrics[0] spm_min = 15 spm_max = 45 name = '' notes = '' activeminutesmax = int(rowdata.duration/60.) activeminutesmin = 0 except InStrokeAnalysis.DoesNotExist: instroke_analysis = InStrokeAnalysis( workout = w, metric = instrokemetrics[0], spm_min = 15, spm_max = 45, name = '', notes = '', activeminutesmax = int(rowdata.duration/60.), activeminutesmin = 0 ) instroke_analysis.save() analysis = instroke_analysis.id else: metric = instrokemetrics[0] spm_min = 15 spm_max = 45 name = '' notes = '' activeminutesmax = int(rowdata.duration/60.) activeminutesmin = 0 maxminutes = int(rowdata.duration/60.) individual_curves = False script = '' div = get_call() dosave = False if request.method == 'POST': form = InstrokeForm(request.POST,choices=instrokemetrics) if form.is_valid(): metric = form.cleaned_data['metric'] spm_min = form.cleaned_data['spm_min'] spm_max = form.cleaned_data['spm_max'] activeminutesmin = form.cleaned_data['activeminutesmin'] activeminutesmax = form.cleaned_data['activeminutesmax'] individual_curves = form.cleaned_data['individual_curves'] notes = form.cleaned_data['notes'] name = form.cleaned_data['name'] if "_save" in request.POST and "new" not in request.POST: if not analysis: instroke_analysis = InStrokeAnalysis( workout = w, metric = metric, name = name, date = timezone.now().date(), notes = notes, start_second = 60*activeminutesmin, end_second = 60*activeminutesmax, spm_min = spm_min, spm_max = spm_max, rower=w.user, ) else: instroke_analysis = InStrokeAnalysis.objects.get(id=analysis) instroke_analysis.workout = w instroke_analysis.metric = metric instroke_analysis.name = name instroke_analysis.date = timezone.now().date() instroke_analysis.notes = notes instroke_analysis.start_second = 60*activeminutesmin instroke_analysis.end_second = 60*activeminutesmax instroke_analysis.spm_min = spm_min instroke_analysis.spm_max = spm_max instroke_analysis.rower=w.user instroke_analysis.save() dosave = True messages.info(request,'In-Stroke Analysis saved') if "_save_as_new" in request.POST: instroke_analysis = InStrokeAnalysis( workout = w, metric = metric, name = name, date = timezone.now().date(), notes = notes, start_second = 60*activeminutesmin, end_second = 60*activeminutesmax, spm_min = spm_min, spm_max = spm_max, rower=w.user, ) instroke_analysis.save() dosave = True messages.info(request,'In-Stroke Analysis saved') activesecondsmin = 60.*activeminutesmin activesecondsmax = 60.*activeminutesmax factor = 1 if metric == 'boat accelerator curve': factor = g_acc data = rowdata.get_instroke_data( metric, spm_min=spm_min, spm_max=spm_max, activeminutesmin=activeminutesmin, activeminutesmax=activeminutesmax, factor=factor, ) data = data.fillna(method='ffill').fillna(method='bfill') if metric == 'boat accelerator curve' and data.median().max() < 0.5: data = 9.81*data if metric == 'instroke boat speed' and data.median().max() < 0.15: rowdata.add_instroke_speed() rowdata.write_csv(w.csvfilename, gzip=True) data = rowdata.get_instroke_data( metric, spm_min=spm_min, spm_max=spm_max, activeminutesmin=activeminutesmin, activeminutesmax=activeminutesmax, ) script, div = instroke_interactive_chart( data, metric, w, spm_min, spm_max, activeminutesmin, activeminutesmax, individual_curves, name=name,notes=notes, ) # change to range spm_min to spm_max vals, units, typ = rowdata.updateinterval_range( ' Cadence (stokes/min)', spm_min, spm_max, debug=False, smoothwindow=2., activewindow=[activesecondsmin, activesecondsmax]) intervalstats = rowdata.allstats() itime, idist, itype = rowdata.intervalstats_values() totaldist = 0 totaltime = 0 avg_speed = 0 for i in range(len(idist)): if itype[i] == 4: totaldist += idist[i] totaltime += itime[i] if totaltime > 0: avg_speed = totaldist/totaltime if dosave: instroke_analysis.average_boatspeed = avg_speed instroke_analysis.save() intervaldata = { 'itime': itime, 'idist': idist, 'itype': itype, 'selector': '', 'normp': 0, 'normv': 0, } ds, dd = interactive_chart(encoder.decode_hex( id), promember=1, intervaldata=intervaldata) if is_ajax: response = json.dumps({ 'script': script, 'div': div, 'ds': ds, 'dd': dd, }) return HttpResponse(response, content_type='application/json') return render(request, 'instroke_interactive.html', { 'workout': w, 'rower': w.user, 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, 'teams': get_my_teams(request.user), 'instrokemetrics': instrokemetrics, 'form':form, 'metric': metric, 'the_script': script, 'the_div': div, 'spm_min': spm_min, 'spm_max': spm_max, 'ds': ds, 'dd': dd, 'activeminutesmin': activeminutesmin, 'activeminutesmax': activeminutesmax, 'maxminutes': maxminutes, }) # erase column @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_erase_column_view(request, id=0, column=''): r = getrower(request.user) w = get_workoutuser(id, request) protected = ['time', 'pace', 'velo', 'cumdist', 'ftime', 'fpace', ] if column in protected: # pragma: no cover messages.error(request, 'You cannot erase this protected column') url = reverse('workout_data_view', kwargs={ 'id': encoder.encode_hex(w.id) }) return HttpResponseRedirect(url) try: data = dataprep.read_data([column], ids=[w.id]) except: # pragma: no cover messages.error(request, 'Invalid column') url = reverse('workout_data_view', kwargs={ 'id': encoder.encode_hex(w.id) }) return HttpResponseRedirect(url) try: _ = data[column] except KeyError: # pragma: no cover url = reverse('workout_data_view', kwargs={ 'id': encoder.encode_hex(w.id) }) return HttpResponseRedirect(url) if not column: # pragma: no cover url = reverse('workout_data_view', kwargs={ 'id': encoder.encode_hex(w.id) }) return HttpResponseRedirect(url) if request.method == 'POST': mms = {} for m in rowingmetrics: mms[m[0]] = m[1] try: defaultvalue = mms[column]['default'] except KeyError: if not mms[column]['null']: # pragma: no cover messages.error( request, 'You cannot erase this protected column') url = reverse('workout_data_view', kwargs={ 'id': encoder.encode_hex(w.id) }) return HttpResponseRedirect(url) defaultvalue = 0 try: columnl = dataprep.columndict[column] except KeyError: # pragma: no cover messages.error(request, 'You cannot erase this column') url = reverse('workout_data_view', kwargs={ 'id': encoder.encode_hex(w.id) }) return HttpResponseRedirect(url) row, workout = dataprep.getrowdata(id=w.id) row.df[columnl] = defaultvalue try: os.remove(w.csvfilename+'.gz') except FileNotFoundError: try: os.remove(w.csvfilename) except FileNotFoundError: pass row.write_csv(w.csvfilename, gzip=True) row, workout = dataprep.getrowdata(id=w.id) _ = dataprep.dataplep(row.df, id=w.id) if column == 'hr': w.hrtss = 0 w.trimp = 0 w.save() if column == 'power': # pragma: no cover w.rscore = 0 w.normp = 0 w.goldmedalstandard = -1 w.goldmedalseconds = 0 w.save() trimp, hrtss = dataprep.workout_trimp(w, reset=True) rscore, normp = dataprep.workout_rscore(w, reset=True) goldstandard, goldstandardduration = dataprep.workout_goldmedalstandard( w, reset=True) messages.info(request, 'Data for column '+column+' have been erased') url = reverse('workout_data_view', kwargs={ 'id': encoder.encode_hex(w.id) }) return HttpResponseRedirect(url) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('workout_data_view', kwargs={'id': id}), 'name': 'Data Explorer' } ] return render(request, 'workout_erase_column.html', { 'column': column, 'teams': get_my_teams(request.user), 'workout': w, 'rower': r, 'breadcrumbs': breadcrumbs, } ) # resample to 1s intervals @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_resample_view(request, id=0): r = getrower(request.user) w = get_workoutuser(id, request) form = ResampleForm() if request.method == 'POST': form = ResampleForm(request.POST) if form.is_valid(): overwrite = form.cleaned_data['resamplechoice'] datadf, id, msgs = dataprep.resample( encoder.decode_hex(id), r, w, overwrite=overwrite) for message in msgs: messages.info(request, message) url = get_workout_default_page(request, encoder.encode_hex(id)) messages.info( request, 'The workout has been resampled: here'.format(url=url)) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('workout_resample_view', kwargs={'id': id}), 'name': 'Resample Data' } ] return render(request, 'workout_resample.html', { 'form': form, 'teams': get_my_teams(request.user), 'workout': w, 'breadcrumbs': breadcrumbs, } ) # data explorer @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_data_view(request, id=0): r = getrower(request.user) w = get_workoutuser(id, request) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('workout_data_view', kwargs={'id': id}), 'name': 'Data Explorer' } ] datadf, row = dataprep.getrowdata_db(id=encoder.decode_hex(id)) try: datadf.sort_values(['ftime'], inplace=True) except KeyError: pass columns = datadf.columns.values to_be_dropped_all = [ 'id', 'time', 'hr_an', 'hr_at', 'hr_bottom', 'hr_max', 'hr_tr', 'hr_ut1', 'hr_ut2', 'x_right', ] to_be_dropped = [] for c in to_be_dropped_all: if c in columns: to_be_dropped.append(c) datadf.drop(labels=to_be_dropped, inplace=True, axis=1) cols = ['ftime', 'cumdist', 'fpace', 'spm', 'hr', 'power', 'driveenergy', 'drivelength', 'averageforce', 'peakforce', 'distance', 'drivespeed', 'workoutstate', 'catch', 'finish', 'peakforceangle', 'wash', 'slip', 'rhythm', 'effectiveangle', 'totalangle', 'distanceperstroke', 'velo'] cols = [c for c in cols if c in datadf.columns] tcols = ['ftime', 'cumdist', 'fpace', 'spm', 'hr', 'power'] datadf = datadf[cols] try: datadf.loc[:, 'hr'] = datadf['hr'].astype('int') datadf.loc[:, 'power'] = datadf['power'].astype('int') datadf.loc[:, 'distance'] = datadf['distance'].astype('int') datadf.loc[:, 'spm'] = 10*datadf['spm'].astype('int')/10. except (KeyError, IntCastingNaNError): pass if request.method == 'POST': form = DataFrameColumnsForm(request.POST) if form.is_valid(): tcols = form.cleaned_data['cols'] else: form = DataFrameColumnsForm(initial={'cols': tcols}) try: datadf = datadf[tcols] except KeyError: # pragma: no cover # tcols = list(set(datadf.columns(tolist)+tcols)) try: datadf = datadf[tcols] datadf = datadf.fillna(value=0) except KeyError: pass for col in cols: try: if datadf[col].mean() == 0 and datadf[col].std() == 0: datadf.drop(labels=[col], axis=1, inplace=True) except (TypeError, KeyError): pass htmltable = datadf.to_html( bold_rows=True, show_dimensions=True, border=1, classes=['pandastable'], justify='justify' ) return render(request, 'workout_data.html', { 'htmltable': htmltable, 'data': datadf, 'cols': datadf.columns, 'form': form, 'rower': r, 'teams': get_my_teams(request.user), 'workout': w, 'breadcrumbs': breadcrumbs, } ) # Stats page @permission_required('workout.view_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_stats_view(request, id=0, message="", successmessage=""): r = getrower(request.user) w = get_workout(id) mayedit = 0 if request.user == w.user.user: mayedit = 1 if is_workout_user(request.user, w): mayedit = 1 breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('workout_stats_view', kwargs={'id': id}), 'name': 'Stats' } ] workstrokesonly = True if request.method == 'POST' and 'workstrokesonly' in request.POST: # pragma: no cover workstrokesonly = str2bool(request.POST['workstrokesonly']) # prepare data frame datadf, row = dataprep.getrowdata_db(id=encoder.decode_hex(id)) datadf = dataprep.clean_df_stats(datadf, workstrokesonly=workstrokesonly, ignoreadvanced=False) if datadf.empty: datadf, row = dataprep.getrowdata_db(id=encoder.decode_hex(id)) datadf = dataprep.clean_df_stats(datadf, workstrokesonly=False, ignoreadvanced=True) workstrokesonly = False if datadf.empty: return HttpResponse("CSV data file not found") # Create stats stats = {} fieldlist, fielddict = dataprep.getstatsfields() try: fielddict.pop('pace') except KeyError: # pragma: no cover pass for field, verbosename in fielddict.items(): try: thedict = { 'mean': datadf[field].mean(), 'wmean': wavg(datadf, field, 'deltat'), 'min': datadf[field].min(), 'std': datadf[field].std(), 'max': datadf[field].max(), 'median': datadf[field].median(), 'firstq': datadf[field].quantile(q=0.25), 'thirdq': datadf[field].quantile(q=0.75), 'verbosename': verbosename, } stats[field] = thedict except KeyError: # pragma: no cover pass datadf = datadf.select_dtypes([np.number]) # Create a dict with correlation values cor = datadf.corr(method='spearman') cor.fillna(value=0, inplace=True) cordict = {} for field1, verbosename1 in fielddict.items(): thedict = {} for field2, verbosename2 in fielddict.items(): try: thedict[verbosename2] = cor.loc[field1, field2] except KeyError: # pragma: no cover thedict[verbosename2] = 0 cordict[verbosename1] = thedict # additional non-automated stats otherstats = {} # Normalized power & TSS tss, normp = dataprep.workout_rscore(w) goldmedalstandard, goldmedalseconds = dataprep.workout_goldmedalstandard(w) if not np.isnan(goldmedalstandard) and goldmedalstandard > 0: otherstats['goldmedalstandard'] = { 'verbose_name': 'Gold Medal Standard', 'value': int(goldmedalstandard), 'unit': '%', } if not np.isnan(goldmedalseconds) and goldmedalseconds > 0: otherstats['goldmedalseconds'] = { 'verbose_name': 'Gold Medal Standard Duration', 'value': utils.totaltime_sec_to_string(goldmedalseconds, shorten=True), 'unit': '', } if not np.isnan(tss) and tss != 0: otherstats['tss'] = { 'verbose_name': 'rScore', 'value': int(tss), 'unit': '' } if not np.isnan(normp): otherstats['np'] = { 'verbose_name': 'rPower', 'value': int(10*normp)/10., 'unit': 'Watt' } # HR Drift tmax = datadf['time'].max() tmin = datadf['time'].min() thalf = tmin+0.5*(tmax-tmin) mask1 = datadf['time'] < thalf mask2 = datadf['time'] > thalf hr1 = datadf.loc[mask1, 'hr'].mean() hr2 = datadf.loc[mask2, 'hr'].mean() pwr1 = datadf.loc[mask1, 'power'].mean() pwr2 = datadf.loc[mask2, 'power'].mean() try: hrdrift = ((pwr1/hr1)-(pwr2/hr2))/(pwr1/hr1) hrdrift *= 100. if not np.isnan(hrdrift): try: hrdrift = int(100*hrdrift)/100. except: # pragma: no cover hrdrift = 0 otherstats['hrdrift'] = { 'verbose_name': 'Heart Rate Drift', 'value': hrdrift, 'unit': '%', } except (ZeroDivisionError, ValueError): # pragma: no cover pass # TRIMP trimp, hrtss = dataprep.workout_trimp(w) # SPMTSS spmtss = dataprep.workout_spmtss(w) otherstats['trimp'] = { 'verbose_name': 'TRIMP', 'value': int(trimp), 'unit': '' } otherstats['hrScore'] = { 'verbose_name': 'rScore (HR)', 'value': int(hrtss), 'unit': '' } otherstats['spmScore'] = { 'verbose_name': 'rScore (SPM)', 'value': int(spmtss), 'unit': '' } return render(request, 'workoutstats.html', { 'stats': stats, 'teams': get_my_teams(request.user), 'workout': w, 'rower': r, 'mayedit': mayedit, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'workstrokesonly': workstrokesonly, 'cordict': cordict, 'otherstats': otherstats, }) # Change default landing page @login_required() def workflow_default_view(request): r = getrower(request.user) if r.defaultlandingpage == 'workout_edit_view': r.defaultlandingpage = 'workout_workflow_view' else: # pragma: no cover r.defaultlandingpage = 'workout_edit_view' r.save() url = reverse('workout_workflow_config2_view') return HttpResponseRedirect(url) # Workflow configuration @login_required() def workout_workflow_config2_view(request, userid=0): request.session['referer'] = absolute(request)['PATH'] try: workoutid = request.session['lastworkout'] except KeyError: workoutid = 0 r = getrequestrower(request, userid=userid, notpermanent=True) MiddlePanelFormSet = formset_factory(WorkFlowMiddlePanelElement, extra=1) if request.method == 'POST': middlepanel_formset = MiddlePanelFormSet(request.POST, prefix='middlepanel') newmiddlepanel = [] if middlepanel_formset.is_valid(): for form in middlepanel_formset: value = form.cleaned_data.get('panel') if value != 'None': newmiddlepanel.append(value) newmiddlepanel = [i for i in newmiddlepanel if i is not None] r.workflowmiddlepanel = newmiddlepanel try: r.save() except IntegrityError: # pragma: no cover messages.error(request, 'Something went wrong') middlepanelform_data = [{'panel': panel} for panel in r.workflowmiddlepanel] middlepanel_formset = MiddlePanelFormSet(initial=middlepanelform_data, prefix='middlepanel') tmplt = 'workflowconfig2.html' return render(request, tmplt, { 'rower': r, 'middlepanel_formset': middlepanel_formset, 'workoutid': workoutid, }) # Workflow View @login_required() @permission_required('workout.view_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_workflow_view(request, id): request.session['referer'] = absolute(request)['PATH'] request.session['lastworkout'] = id row = get_workout_by_opaqueid(request, id) r = getrower(request.user) comments = WorkoutComment.objects.filter(workout=row) aantalcomments = len(comments) favorites, maxfav = getfavorites(r, row) charts = get_call() if 'panel_map.html' in r.workflowmiddlepanel and rowhascoordinates(row): rowdata = rdata(csvfile=row.csvfilename) mapscript, mapdiv = leaflet_chart(rowdata.df[' latitude'], rowdata.df[' longitude'], row.name) else: mapscript = '' mapdiv = '' statcharts = GraphImage.objects.filter(workout=row) middleTemplates = [] for t in r.workflowmiddlepanel: try: template.loader.get_template(t) middleTemplates.append(t) except template.TemplateDoesNotExist: # pragma: no cover pass leftTemplates = [] for t in r.workflowleftpanel: try: template.loader.get_template(t) leftTemplates.append(t) except template.TemplateDoesNotExist: # pragma: no cover pass breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': row.name }, { 'url': reverse('workout_workflow_view', kwargs={'id': id}), 'name': 'View' } ] return render(request, 'workflow.html', { 'middleTemplates': middleTemplates, 'leftTemplates': leftTemplates, 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, 'charts': charts, 'workout': row, 'mapscript': mapscript, 'mapdiv': mapdiv, 'statcharts': statcharts, 'rower': r, 'aantalcomments': aantalcomments, }) # The famous flex chart @login_required() @permission_required('workout.view_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_flexchart3_view(request, *args, **kwargs): try: id = kwargs['id'] except KeyError: # pragma: no cover raise Http404("Invalid workout number") if 'promember' in kwargs: # pragma: no cover promember = kwargs['promember'] else: promember = 0 try: favoritenr = int(request.GET['favoritechart']) except: favoritenr = -1 row = get_workout(id) r = getrequestrower(request) promember = 0 mayedit = 0 if not request.user.is_anonymous: result = ispromember(request.user) if result: promember = 1 if request.user == row.user.user: mayedit = 1 if is_workout_user(request.user, row): mayedit = 1 workouttype = 'ote' if row.workouttype in mytypes.otwtypes: workouttype = 'otw' favorites, maxfav = getfavorites(r, row) # check if favoritenr is not out of range if favorites: try: _ = favorites[favoritenr].xparam except IndexError: # pragma: no cover favoritenr = 0 except AssertionError: favoritenr = 0 except ValueError: favoritenr = 0 if 'xparam' in kwargs: xparam = kwargs['xparam'] else: if favorites: xparam = favorites[favoritenr].xparam else: xparam = 'distance' if 'yparam1' in kwargs: yparam1 = kwargs['yparam1'] else: if favorites: yparam1 = favorites[favoritenr].yparam1 else: yparam1 = 'pace' if 'yparam2' in kwargs: yparam2 = kwargs['yparam2'] if yparam2 == '': # pragma: no cover yparam2 = 'None' else: if favorites: yparam2 = favorites[favoritenr].yparam2 if yparam2 == '': # pragma: no cover yparam2 = 'None' else: yparam2 = 'hr' if not request.user.is_anonymous: r = getrower(request.user) if favoritenr >= 0 and r.showfavoritechartnotes: try: favoritechartnotes = favorites[favoritenr].notes except IndexError: # pragma: no cover favoritechartnotes = '' else: favoritechartnotes = '' else: # pragma: no cover favoritechartnotes = '' favoritenr = 0 if 'plottype' in kwargs: # pragma: no cover plottype = kwargs['plottype'] else: if favorites: plottype = favorites[favoritenr].plottype else: plottype = 'line' if 'trendline' in kwargs: # pragma: no cover trendline = kwargs['trendline'] else: trendline = False if 'workstrokesonly' in kwargs: # pragma: no cover workstrokesonly = kwargs['workstrokesonly'] else: if favorites: workstrokesonly = not favorites[favoritenr].reststrokes else: workstrokesonly = False if request.method == 'POST' and 'savefavorite' in request.POST: if not request.user.is_anonymous: workstrokesonly = request.POST['workstrokesonlysave'] reststrokes = not workstrokesonly r = getrower(request.user) try: f = FavoriteChart(user=r, xparam=xparam, yparam1=yparam1, yparam2=yparam2, plottype=plottype, workouttype=workouttype, reststrokes=reststrokes) f.save() except KeyError: # pragma: no cover messages.error( request, 'We cannot save the ad hoc metrics in a favorite chart') if request.method == 'POST' and 'xaxis' in request.POST: flexoptionsform = FlexOptionsForm(request.POST) if flexoptionsform.is_valid(): cd = flexoptionsform.cleaned_data includereststrokes = cd['includereststrokes'] plottype = cd['plottype'] trendline = cd['trendline'] workstrokesonly = not includereststrokes flexaxesform = FlexAxesForm(request, request.POST) if flexaxesform.is_valid(): cd = flexaxesform.cleaned_data xparam = cd['xaxis'] yparam1 = cd['yaxis1'] yparam2 = cd['yaxis2'] else: pass if not promember: for name, d in rowingmetrics: if d['type'] != 'basic': if xparam == name: # pragma: no cover xparam = 'time' messages.info( request, 'To use '+d['verbose_name']+', you have to be Pro member') if yparam1 == name: # pragma: no cover yparam1 = 'pace' messages.info( request, 'To use '+d['verbose_name']+', you have to be Pro member') if yparam2 == name: # pragma: no cover yparam2 = 'spm' messages.info( request, 'To use '+d['verbose_name']+', you have to be Pro member') # bring back slashes # yparam1 = yparam1.replace('_slsh_','/') # yparam2 = yparam2.replace('_slsh_','/') # xparam = xparam.replace('_slsh_','/') # create interactive plot ( script, div, workstrokesonly ) = interactive_flex_chart2( encoder.decode_hex(id), request.user.rower, xparam=xparam, yparam1=yparam1, yparam2=yparam2, promember=promember, plottype=plottype, workstrokesonly=workstrokesonly, trendline=trendline, mode=row.workouttype ) axchoicesbasic = {ax[0]: ax[1] for ax in axes if ax[4] == 'basic'} axchoicespro = {ax[0]: ax[1] for ax in axes if ax[4] == 'pro'} noylist = ["time", "distance"] axchoicesbasic.pop("cumdist") if row.workouttype in mytypes.otwtypes: for name, d in rowingmetrics: if d['mode'] == 'erg': axchoicespro.pop(name) else: for name, d in rowingmetrics: if d['mode'] == 'water': axchoicespro.pop(name) from rowers.metrics import nometrics rowdata = rdata(csvfile=row.csvfilename) try: rowdata.set_instroke_metrics() except (AttributeError, TypeError): # pragma: no cover pass try: additionalmetrics = rowdata.get_additional_metrics() additionalmetrics = [ m for m in additionalmetrics if m not in nometrics] except AttributeError: # pragma: no cover additionalmetrics = [] # extrametrics = {m.replace('/','_slsh_'):m for m in additionalmetrics} extrametrics = additionalmetrics initial = { 'xaxis': xparam, 'yaxis1': yparam1, 'yaxis2': yparam2, } flexaxesform = FlexAxesForm(request, initial=initial) initial = { 'includereststrokes': not workstrokesonly, 'plottype': plottype, 'trendline': trendline, } flexoptionsform = FlexOptionsForm(initial=initial) row = Workout.objects.get(id=encoder.decode_hex(id)) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': row.name }, { 'url': reverse('workout_flexchart3_view', kwargs=kwargs), 'name': 'Flex Chart' } ] return render(request, 'flexchart3otw.html', {'the_script': script, 'the_div': div, 'breadcrumbs': breadcrumbs, 'rower': r, 'active': 'nav-workouts', 'workout': row, 'chartform': flexaxesform, 'optionsform': flexoptionsform, 'teams': get_my_teams(request.user), 'id': id, 'xparam': xparam, 'yparam1': yparam1, 'yparam2': yparam2, 'plottype': plottype, 'axchoicesbasic': axchoicesbasic, 'axchoicespro': axchoicespro, 'extrametrics': extrametrics, 'favoritechartnotes': favoritechartnotes, 'noylist': noylist, 'mayedit': mayedit, 'promember': promember, 'workstrokesonly': not workstrokesonly, 'favoritenr': favoritenr, 'maxfav': maxfav, }) @login_required() @permission_required('workout.view_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_flexchart_stacked_view(request, *args, **kwargs): try: id = kwargs['id'] except KeyError: # pragma: no cover raise Http404("Invalid workout number") workout = get_workout(id) r = getrequestrower(request) xparam = 'time' yparam1 = 'pace' yparam2 = 'power' yparam3 = 'hr' yparam4 = 'spm' xparam = r.chartstacktemplate_x yparam1 = r.chartstacktemplate_y[0] yparam2 = r.chartstacktemplate_y[1] yparam3 = r.chartstacktemplate_y[2] yparam4 = r.chartstacktemplate_y[3] if request.method == 'POST': flexaxesform = StravaChartForm(request, request.POST) if flexaxesform.is_valid(): cd = flexaxesform.cleaned_data xparam = cd['xaxis'] yparam1 = cd['yaxis1'] yparam2 = cd['yaxis2'] yparam3 = cd['yaxis3'] yparam4 = cd['yaxis4'] if 'save' in request.POST: r.chartstacktemplate_x = xparam r.chartstacktemplate_y = [yparam1, yparam2, yparam3, yparam4] r.save() ( script, div ) = interactive_flexchart_stacked( encoder.decode_hex(id), r, xparam=xparam, yparam1=yparam1, yparam2=yparam2, yparam3=yparam3, yparam4=yparam4, mode=workout.workouttype, ) initial = { 'xaxis': xparam, 'yaxis1': yparam1, 'yaxis2': yparam2, 'yaxis3': yparam3, 'yaxis4': yparam4, } flexaxesform = StravaChartForm(request, initial=initial, ) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': workout.name }, { 'url': reverse('workout_flexchart_stacked_view', kwargs=kwargs), 'name': 'Stacked Flex Chart' } ] return render(request, 'flexchartstacked.html', { 'the_script': script, 'the_div': div, 'breadcrumbs': breadcrumbs, 'rower': r, 'active': 'nav-workouts', 'workout': workout, 'chartform': flexaxesform, 'id': id, 'xparam': xparam, 'yparam1': yparam1, 'yparam2': yparam2, 'yparam3': yparam3, 'yparam4': yparam4, } ) # The interactive plot with wind corrected pace for OTW outings # @login_required() def workout_unsubscribe_view(request, id=0): w = get_workout(id) if w.privacy == 'hidden' and w.user.user != request.user: # pragma: no cover return HttpResponseForbidden("Permission error") comments = WorkoutComment.objects.filter(workout=w, user=request.user).order_by("created") for c in comments: c.notification = False c.save() form = WorkoutCommentForm() successmessage = 'You have been unsubscribed from new comment notifications for this workout' messages.info(request, successmessage) return render(request, 'workout_comments.html', {'workout': w, 'teams': get_my_teams(request.user), 'comments': comments, 'form': form, }) # list of comments to a workout @login_required() def workout_comment_view(request, id=0): w = get_workout(id) if w.privacy == 'hidden' and w.user.user != request.user: # pragma: no cover return HttpResponseForbidden("Permission error") comments = WorkoutComment.objects.filter(workout=w).order_by("created") # ok we're permitted if request.method == 'POST': r = w.user form = WorkoutCommentForm(request.POST) if form.is_valid(): cd = form.cleaned_data comment = cd['comment'] comment = bleach.clean(comment) try: # pragma: no cover if isinstance(comment, unicode): comment = comment.encode('utf8') elif isinstance(comment, str): comment = comment.decode('utf8') except: pass notification = cd['notification'] c = WorkoutComment(workout=w, user=request.user, comment=comment, notification=notification) c.save() url = reverse('workout_comment_view', kwargs={ 'id': id, }) message = '{name} says: {comment}'.format( name=request.user.first_name, comment=comment, url=url, ) if request.user != r.user: # pragma: no cover a_messages.info(r.user, message.encode('ascii', 'ignore')) _ = myqueue(queuehigh, handle_sendemailnewcomment, r.user.first_name, r.user.last_name, r.user.email, request.user.first_name, request.user.last_name, comment, w.name, w.id, emailbounced=r.emailbounced ) commenters = {oc.user for oc in comments if oc.notification} for u in commenters: # pragma: no cover a_messages.info(u, message) if u != request.user and u != r.user: ocr = Rower.objects.get(user=u) _ = myqueue(queue, handle_sendemailnewresponse, u.first_name, u.last_name, u.email, request.user.first_name, request.user.last_name, comment, w.name, w.id, c.id, emailbounced=ocr.emailbounced ) url = reverse('workout_comment_view', kwargs={ 'id': id}) return HttpResponseRedirect(url) form = WorkoutCommentForm() g = GraphImage.objects.filter(workout=w).order_by("-creationdatetime") for i in g: # pragma: no cover try: width, height = Image.open(i.filename).size i.width = width i.height = height i.save() except: pass rower = getrower(request.user) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('workout_comment_view', kwargs={'id': id}), 'name': 'Comments' } ] return render(request, 'workout_comments.html', {'workout': w, 'rower': rower, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'teams': get_my_teams(request.user), 'graphs': g, 'comments': comments, 'form': form, }) # The basic edit page @login_required() @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_edit_view(request, id=0, message="", successmessage=""): request.session['referer'] = absolute(request)['PATH'] row = get_workoutuser(id, request) if request.user.rower.rowerplan == 'basic' and 'speedcoach2' in row.workoutsource: # pragma: no cover data = read_data(['wash'], ids=[encoder.decode_hex(id)]) try: if data['wash'].std() != 0: url = reverse('paidplans_view') messages.info( request, 'Some Empower Oarlock data are only available to users with a paid plan'.format( u=url) ) except: pass if request.user.rower.rowerplan == 'basic' and 'nklinklogbook' in row.workoutsource: # pragma: no cover data = read_data(['wash'], ids=[encoder.decode_hex(id)]) try: if data['wash'].std() != 0: url = reverse('paidplans_view') messages.info( request, 'Some Empower Oarlock data are only available to users with a paid plan'.format( u=url) ) except: pass form = WorkoutForm(instance=row) if request.method == 'POST': # Form was submitted form = WorkoutForm(request.POST, instance=row) if form.is_valid(): # Get values from form name = form.cleaned_data['name'] date = form.cleaned_data['date'] starttime = form.cleaned_data['starttime'] workouttype = form.cleaned_data['workouttype'] weightcategory = form.cleaned_data['weightcategory'] adaptiveclass = form.cleaned_data['adaptiveclass'] duration = form.cleaned_data['duration'] distance = form.cleaned_data['distance'] private = form.cleaned_data['private'] notes = form.cleaned_data['notes'] newdragfactor = form.cleaned_data['dragfactor'] thetimezone = form.cleaned_data['timezone'] is_commute = form.cleaned_data['is_commute'] is_race = form.cleaned_data['is_race'] subtype = form.cleaned_data['sub_type'] try: rpe = form.cleaned_data['rpe'] if not rpe: # pragma: no cover rpe = -1 except KeyError: # pragma: no cover rpe = -1 ps = form.cleaned_data.get('plannedsession', None) boattype = request.POST.get('boattype', Workout.objects.get( id=encoder.decode_hex(id)).boattype) privacy = request.POST.get('privacy', Workout.objects.get( id=encoder.decode_hex(id)).privacy) rankingpiece = form.cleaned_data.get( 'rankingpiece', Workout.objects.get(id=row.id).rankingpiece) duplicate = form.cleaned_data.get( 'duplicate', Workout.objects.get(id=row.id).duplicate) seatnumber = form.cleaned_data.get('seatnumber', 1) boatname = form.cleaned_data.get('boatname', '') empowerside = form.cleaned_data.get('empowerside','port') if is_race: subtype = "Race" elif is_commute: subtype = "Commute" if private: privacy = 'hidden' else: # pragma: no cover privacy = 'visible' try: startdatetime = datetime.datetime.combine( date, starttime ) except TypeError: # pragma: no cover startdatetime = datetime.datetime.combine( date, datetime.datetime.min.time() ) try: startdatetime = pytz.timezone(thetimezone).localize( startdatetime ) except UnknownTimeZoneError: # pragma: no cover pass try: # aware object can be in any timezone _ = startdatetime.astimezone(pytz.utc) except (ValueError, TypeError): # pragma: no cover startdatetime = timezone.make_aware(startdatetime) try: startdatetime = startdatetime.astimezone( pytz.timezone(thetimezone)) except UnknownTimeZoneError: # pragma: no cover thetimezone = 'UTC' timechanged = (startdatetime != row.startdatetime) row.name = name row.date = date row.starttime = starttime row.startdatetime = startdatetime row.workouttype = workouttype row.weightcategory = weightcategory row.adaptiveclass = adaptiveclass row.notes = notes row.rpe = rpe row.duration = duration row.distance = distance row.boattype = boattype row.duplicate = duplicate row.privacy = privacy row.rankingpiece = rankingpiece row.timezone = thetimezone row.plannedsession = ps row.boatname = boatname row.empowerside = empowerside row.seatnumber = seatnumber row.is_commute = is_commute row.is_race = is_race row.sub_type = subtype dragchanged = False if newdragfactor != row.dragfactor: # pragma: no cover row.dragfactor = newdragfactor dragchanged = True try: row.save() except IntegrityError: # pragma: no cover pass if row.uploadedtointervals is not None and row.user.intervals_auto_export: _ = myqueue(queuehigh, handle_intervals_updateworkout, row ) if ps: # pragma: no cover add_workouts_plannedsession([row], ps, row.user) # change data in csv file datachanged = (dragchanged or timechanged) if datachanged: r = rdata(csvfile=row.csvfilename) if dragchanged: try: # pragma: no cover r.change_drag(newdragfactor) except AttributeError: # pragma: no cover pass if r == 0: # pragma: no cover return HttpResponse("Error: CSV Data File Not Found") r.rowdatetime = startdatetime r.write_csv(row.csvfilename, gzip=True) dataprep.update_strokedata(encoder.decode_hex(id), r.df) successmessage = "Changes saved" messages.info(request, successmessage) else: form = WorkoutForm(instance=row) g = GraphImage.objects.filter(workout=row).order_by("-creationdatetime") for i in g: # pragma: no cover try: width, height = Image.open(i.filename).size i.width = width i.height = height i.save() except: pass videos = VideoAnalysis.objects.filter(workout=row) # create interactive plot f1 = row.csvfilename u = row.user.user r = getrower(u) rowdata = rdata(csvfile=f1) hascoordinates = 1 courses = [] suggested_courses = [] has_latlon, lat_mean, lon_mean = dataprep.workout_has_latlon(row.id) mapscript = "" mapdiv = "" if has_latlon: try: mapscript, mapdiv = leaflet_chart( rowdata.df[' latitude'], rowdata.df[' longitude'], row.name) except KeyError: # pragma: no cover pass records = VirtualRaceResult.objects.filter( workoutid=row.id, userid=row.user.id, coursecompleted=True) if records.count() > 0: # pragma: no cover courses = list(set([record.course for record in records])) suggested_courses = getnearestcourses([lat_mean, lon_mean], GeoCourse.objects.all(), whatisnear=25, strict=True) suggested_courses = list(set(courses) ^ set(suggested_courses)) s2 = [] for c in suggested_courses: if pass_start(rowdata.df, c): s2.append(c) suggested_courses = s2 breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(row.id)), 'name': row.name }, { 'url': reverse('workout_edit_view', kwargs={'id': encoder.encode_hex(row.id)}), 'name': 'Edit' } ] if row.workouttype in mytypes.otetypes: indoorraces = get_indoorraces(row) else: indoorraces = [] r = row.user # render page return render(request, 'workout_form.html', {'form': form, 'workout': row, 'teams': get_my_teams(request.user), 'graphs': g, 'videos': videos, 'breadcrumbs': breadcrumbs, 'rower': r, 'indoorraces': indoorraces, 'active': 'nav-workouts', 'mapscript': mapscript, 'mapdiv': mapdiv, 'rower': r, 'courses': courses, 'suggested_courses': suggested_courses, }) @login_required() def workout_map_view(request, id=0): request.session['referer'] = absolute(request)['PATH'] w = get_workout(id) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('workout_map_view', kwargs={'id': id}), 'name': 'Map' } ] # create interactive plot f1 = w.csvfilename u = w.user.user r = getrower(u) rowdata = rdata(csvfile=f1) hascoordinates = 1 if rowdata != 0: try: latitude = rowdata.df[' latitude'] if not latitude.std(): # pragma: no cover hascoordinates = 0 except (KeyError, AttributeError): hascoordinates = 0 else: # pragma: no cover hascoordinates = 0 if hascoordinates: mapscript, mapdiv = leaflet_chart(rowdata.df[' latitude'], rowdata.df[' longitude'], w.name) else: mapscript = "" mapdiv = "" mayedit = 0 if not request.user.is_anonymous: r = getrower(request.user) if request.user == w.user.user: mayedit = 1 records = VirtualRaceResult.objects.filter( workoutid=w.id, userid=w.user.id, coursecompleted=True ) courses = [] if records.count() > 0: courses = list(set([record.course for record in records])) has_latlon, lat_mean, lon_mean = dataprep.workout_has_latlon(w.id) suggested_courses = getnearestcourses([lat_mean, lon_mean], GeoCourse.objects.all(), whatisnear=25, strict=True) suggested_courses = list(set(courses) ^ set(suggested_courses)) s2 = [] for c in suggested_courses: if pass_start(rowdata.df, c): s2.append(c) suggested_courses = s2 return render(request, 'map_view.html', {'mapscript': mapscript, 'workout': w, 'rower': r, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'mapdiv': mapdiv, 'mayedit': mayedit, 'courses': courses, 'suggested_courses': suggested_courses, }) # Image upload @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_uploadimage_view(request, id): # pragma: no cover is_ajax = request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest' r = getrower(request.user) w = get_workoutuser(id, request) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': w.name }, { 'url': reverse('workout_uploadimage_view', kwargs={'id': id}), 'name': 'Upload Image' } ] images = GraphImage.objects.filter(workout=w) if images.count() >= 6: # pragma: no cover message = "You have reached the maximum number of static images for this workout" messages.error(request, message) url = reverse(r.defaultlandingpage, kwargs={ 'id': id, }) return HttpResponseRedirect(url) if request.method == 'POST': form = ImageForm(request.POST, request.FILES) if form.is_valid(): f = form.cleaned_data['file'] if f is not None: filename, path_and_filename = handle_uploaded_image(f) try: width, height = Image.open(path_and_filename).size except: message = "Not a valid image" messages.error(request, message) os.remove(path_and_filename) url = reverse('workout_uploadimage_view', kwargs={'id': id}) if is_ajax: return JSONResponse({'result': 0, 'url': 0}) else: return HttpResponseRedirect(url) i = GraphImage(workout=w, creationdatetime=timezone.now(), filename=path_and_filename, width=width, height=height) i.save() url = reverse(r.defaultlandingpage, kwargs={'id': id}) if is_ajax: return JSONResponse({'result': 1, 'url': url}) else: return HttpResponseRedirect(url) else: messages.error( request, 'Something went wrong - no file attached') url = reverse('workout_uploadimage_view', kwargs={'id': id}) if is_ajax: return JSONResponse({'result': 0, 'url': 0}) else: return HttpResponseRedirect(url) else: return HttpResponse("Form is not valid") else: if not is_ajax: form = ImageForm() return render(request, 'image_form.html', {'form': form, 'rower': r, 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, 'teams': get_my_teams(request.user), 'workout': w, }) else: return {'result': 0} # Generic chart creation @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_add_chart_view(request, id, plotnr=1): w = get_workoutuser(id, request) r = getrower(request.user) plotnr = int(plotnr) f1 = w.csvfilename[6:-4] timestr = strftime("%Y%m%d-%H%M%S") imagename = f1+timestr+'.png' u = w.user.user r = getrower(u) title = w.name res, jobid = uploads.make_plot( r, w, f1, w.csvfilename, 'timeplot', title, plotnr=plotnr, imagename=imagename ) if res == 0: # pragma: no cover messages.error(request, jobid) else: try: request.session['async_tasks'] += [(jobid, 'make_plot')] except KeyError: request.session['async_tasks'] = [(jobid, 'make_plot')] url = reverse(r.defaultlandingpage, kwargs={ 'id': encoder.encode_hex(w.id)}) return HttpResponseRedirect(url) @login_required @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_toggle_ranking(request, id=0): is_ajax = request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest' row = get_workout_by_opaqueid(request, id) row.rankingpiece = not row.rankingpiece row.save() if is_ajax: # pragma: no cover response = JSONResponse({'result': row.rankingpiece}, content_type='application/json') return response else: url = reverse('workouts_view') response = HttpResponseRedirect(url) return response # simple POST API for files on local (e.g. in mailbox) @csrf_exempt def workout_upload_api(request): if request.method != 'POST': # pragma: no cover message = {'status': 'false', 'message': 'this view cannot be accessed through GET'} return JSONResponse(status=403, data=message) # test if JSON try: json_data = json.loads(request.body) secret = json_data['secret'] post_data = json_data except: q = request.POST post_data = {k: q.getlist(k) if len( q.getlist(k)) > 1 else v for k, v in q.items()} # only allow local host hostt = request.get_host().split(':') if hostt[0] not in ['localhost', '127.0.0.1', 'dev.rowsandall.com', 'rowsandall.com','testserver']: message = {'status': 'false', 'message': 'permission denied for host '+hostt[0]} return JSONResponse(status=403, data=message) # check credentials here try: secret = post_data['secret'] except KeyError: dologging('own_api.log','Missing credentials') message = {'status': 'false', 'message': 'missing credentials'} return JSONResponse(status=400, data=message) if secret != settings.UPLOAD_SERVICE_SECRET: message = {'status': 'false', 'message': 'invalid credentials'} return JSONResponse(status=403, data=message) form = DocumentsForm(post_data) optionsform = TeamUploadOptionsForm(post_data) rowerform = TeamInviteForm(post_data) rowerform.fields.pop('email') try: fstr = post_data['file'] nn, ext = os.path.splitext(fstr) if ext == '.gz': # pragma: no cover nn, ext2 = os.path.splitext(nn) ext = ext2+ext f1 = uuid.uuid4().hex[:10]+'-'+time.strftime("%Y%m%d-%H%M%S")+ext f2 = 'media/'+f1 copyfile(fstr, f2) except KeyError: dologging('own_api.log','no filename given') message = {'status': 'false', 'message': 'no filename given'} return JSONResponse(status=400, data=message) except FileNotFoundError: dologging('own_api.log','could not find file') message = {'status': 'false', 'message': 'could not find file'} return JSONResponse(status=400, data=message) # sync related IDs sporttracksid = post_data.get('sporttracksid','') intervalsid = post_data.get('intervalsid','') c2id = post_data.get('c2id', '') garminid = post_data.get('garminid','') workoutid = post_data.get('id','') startdatetime = post_data.get('startdatetime', '') timezone = post_data.get('timezone','') oarlockfirmware = post_data.get('oarlockfirmware', None) inboard = post_data.get('inboard', None) oarlength = post_data.get('oarlength', None) useImpeller = post_data.get('useImpeller', False) seatnumber = post_data.get('seatNumber', 1) boatname = post_data.get('boatName','') portStarboard = post_data.get('portStarboard', 1) empowerside = 'port' stravaid = post_data.get('stravaid','') if portStarboard == 1: empowerside = 'starboard' totalDistance = post_data.get('totalDistance', None) elapsedTime = post_data.get('elapsedTime', None) summary = post_data.get('summary', None) timezone = post_data.get('timezone', 'UTC') s = 'Posting c2id {c2id} to Rowsandall. Startdatetime {startdatetime}, time zone {timezone}'.format( c2id=c2id, startdatetime=startdatetime, timezone=timezone, ) dologging('c2_log.log', s) r = None if form.is_valid(): t = form.cleaned_data['title'] t = re.sub('\r',' ',t) t = re.sub('\n',' ',t) boattype = form.cleaned_data['boattype'] workouttype = form.cleaned_data['workouttype'] try: rpe = form.cleaned_data['rpe'] try: rpe = int(rpe) except ValueError: # pragma: no cover rpe = 0 except KeyError: # pragma: no cover rpe = -1 if rowerform.is_valid(): u = rowerform.cleaned_data['user'] r = getrower(u) if 'useremail' in post_data: us = User.objects.filter(email=post_data['useremail']) if len(us): # pragma: no cover u = us[0] r = getrower(u) else: r = None for rwr in Rower.objects.all(): if rwr.emailalternatives is not None: if post_data['useremail'] in rwr.emailalternatives: r = rwr break if r is not None and r.emailalternatives is not None: if post_data['useremail'] not in r.emailalternatives: # pragma: no cover dologging('own_api.log','could not find user by email') message = {'status': 'false', 'message': 'could not find user'} return JSONResponse(status=400, data=message) if r is None: # pragma: no cover dologging('own_api.log','invalid user') message = {'status': 'false', 'message': 'invalid user'} return JSONResponse(status=400, data=message) notes = form.cleaned_data['notes'] if optionsform.is_valid(): make_plot = optionsform.cleaned_data['make_plot'] plottype = optionsform.cleaned_data['plottype'] makeprivate = optionsform.cleaned_data['makeprivate'] else: # pragma: no cover dologging('own_api.log','invalid options form') dologging('own_api.log',json.dumps(optionsform.errors)) message = optionsform.errors return JSONResponse(status=400, data=message) if r is None: # pragma: no cover dologging('own_api.log','r is None') message = {'status': 'false', 'message': 'something went wrong'} return JSONResponse(status=400, data=message) id, message, f2 = dataprep.new_workout_from_file( r, f2, workouttype=workouttype, workoutsource=None, boattype=boattype, makeprivate=makeprivate, title=t, rpe=rpe, notes=notes, uploadoptions=post_data, startdatetime=startdatetime, timezone=timezone, oarlockfirmware=oarlockfirmware, inboard=inboard, oarlength=oarlength, impeller=useImpeller, workoutid=workoutid, empowerside=empowerside, boatname=boatname, seatnumber=seatnumber, ) if id == 0: # pragma: no cover if message is not None: message = {'status': 'false', 'message': 'unable to process file: '+message} else: message = {'status': 'false', 'message': 'unable to process file'} return JSONResponse(status=400, data=message) if id == -1: # pragma: no cover message = {'status': 'true', 'message': message} return JSONResponse(status=200, data=message) w = Workout.objects.get(id=id) if timezone is not None: # pragma: no cover w.startdatetime = w.startdatetime.astimezone( pytz.timezone(timezone)) w.workoutdate = w.startdatetime.strftime('%Y-%m-%d') w.starttime = w.startdatetime.strftime('%H:%M:%S') w.timezone = timezone dologging('debuglog.log', 'Start Date Time set to {s}'.format( s=w.startdatetime)) dologging('debuglog.log', 'Workout Date set to {s}'.format(s=w.workoutdate)) dologging('debuglog.log', 'Time zone set to {zone}'.format(zone=w.timezone)) w.save() if make_plot: # pragma: no cover res, jobid = uploads.make_plot(r, w, f1, f2, plottype, t) elif r.staticchartonupload != 'None': # pragma: no cover plottype = r.staticchartonupload res, jobid = uploads.make_plot(r, w, f1, f2, plottype, t) if inboard: # pragma: no cover w.inboard = inboard w.save() if oarlength: # pragma: no cover w.oarlength = oarlength w.save() if totalDistance: # pragma: no cover w.distance = totalDistance w.save() if elapsedTime: # pragma: no cover w.duration = totaltime_sec_to_string(elapsedTime) if summary: # pragma: no cover w.summary = summary w.save() uploads.do_sync(w, post_data, quick=True) else: # pragma: no cover # form invalid dologging('own_api.log','Invalid DocumentsForm') dologging('own_api.log',json.dumps(form.errors)) if fstr: os.remove(fstr) message = form.errors return JSONResponse(status=400, data=message) message = {'status': 'true', 'id': w.id} statuscode = 200 if fstr: try: os.remove(fstr) except FileNotFoundError: # pragma: no cover message = {'status': 'true', 'id': w.id, 'message': 'Error deleting temporary file'} statuscode = 500 if r.getemailnotifications and not r.emailbounced: # pragma: no cover link = settings.SITE_URL+reverse( r.defaultlandingpage, kwargs={ 'id': encoder.encode_hex(w.id), } ) _ = send_confirm(r.user, t, link, '') return JSONResponse(status=statuscode, data=message) # This is the main view for processing uploaded files @login_required() def workout_upload_view(request, uploadoptions={ 'makeprivate': False, 'make_plot': False, 'upload_to_C2': False, 'plottype': 'timeplot', 'landingpage': 'workout_edit_view', }, docformoptions={ 'workouttype': 'rower', }, raceid=0): is_ajax = request.META.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest' if settings.TESTING: is_ajax = False r = getrower(request.user) if r.imports_are_private: uploadoptions['makeprivate'] = True breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': reverse('workout_upload_view'), 'name': 'Upload' } ] if 'uploadoptions' in request.session: uploadoptions = request.session['uploadoptions'] try: _ = uploadoptions['landingpage'] except KeyError: # pragma: no cover uploadoptions['landingpage'] = r.defaultlandingpage else: request.session['uploadoptions'] = uploadoptions if 'docformoptions' in request.session: docformoptions = request.session['docformoptions'] else: request.session['docformoptions'] = docformoptions makeprivate = uploadoptions.get('makeprivate', False) make_plot = uploadoptions.get('make_plot', False) workouttype = uploadoptions.get('WorkoutType', 'rower') boattype = docformoptions.get('boattype', '1x') try: rpe = docformoptions['rpe'] try: # pragma: no cover rpe = int(rpe) except ValueError: # pragma: no cover rpe = 0 if not rpe: # pragma: no cover rpe = -1 except KeyError: rpe = -1 notes = docformoptions.get('notes', '') workoutsource = uploadoptions.get('workoutsource', None) plottype = uploadoptions.get('plottype', 'timeplot') landingpage = uploadoptions.get('landingpage', r.defaultlandingpage) upload_to_c2 = uploadoptions.get('upload_to_C2', False) upload_to_strava = uploadoptions.get('upload_to_Strava', False) upload_to_st = uploadoptions.get('upload_to_SportTracks', False) upload_to_tp = uploadoptions.get('upload_to_TrainingPeaks', False) upload_to_intervals = uploadoptions.get('upload_to_Intervals', False) response = {} if request.method == 'POST': form = DocumentsForm(request.POST, request.FILES) optionsform = UploadOptionsForm(request.POST, request=request) if form.is_valid(): # f = request.FILES['file'] f = form.cleaned_data['file'] if f is not None: res = handle_uploaded_file(f) else: # pragma: no cover messages.error(request, "Something went wrong - no file attached") url = reverse('workout_upload_view') if is_ajax: return JSONResponse({'result': 0, 'url': 0}) else: return HttpResponseRedirect(url) t = form.cleaned_data['title'] workouttype = form.cleaned_data['workouttype'] boattype = form.cleaned_data['boattype'] try: rpe = form.cleaned_data['rpe'] try: rpe = int(rpe) except ValueError: rpe = 0 except KeyError: # pragma: no cover rpe = -1 request.session['docformoptions'] = { 'workouttype': workouttype, 'boattype': boattype, } notes = form.cleaned_data['notes'] offline = form.cleaned_data['offline'] registrationid = 0 if optionsform.is_valid(): make_plot = optionsform.cleaned_data['make_plot'] plottype = optionsform.cleaned_data['plottype'] upload_to_c2 = optionsform.cleaned_data['upload_to_C2'] upload_to_strava = optionsform.cleaned_data['upload_to_Strava'] upload_to_st = optionsform.cleaned_data['upload_to_SportTracks'] upload_to_tp = optionsform.cleaned_data['upload_to_TrainingPeaks'] upload_to_intervals = optionsform.cleaned_data['upload_to_Intervals'] makeprivate = optionsform.cleaned_data['makeprivate'] landingpage = optionsform.cleaned_data['landingpage'] raceid = optionsform.cleaned_data['raceid'] try: registrationid = optionsform.cleaned_data['submitrace'] except KeyError: registrationid = 0 uploadoptions = { 'makeprivate': makeprivate, 'make_plot': make_plot, 'plottype': plottype, 'upload_to_C2': upload_to_c2, 'upload_to_Strava': upload_to_strava, 'upload_to_SportTracks': upload_to_st, 'upload_to_TrainingPeaks': upload_to_tp, 'upload_to_Intervals': upload_to_intervals, 'landingpage': landingpage, 'boattype': boattype, 'rpe': rpe, 'workouttype': workouttype, } request.session['uploadoptions'] = uploadoptions f1 = res[0] # file name f2 = res[1] # file name incl media directory if not offline: id, message, f2 = dataprep.new_workout_from_file( r, f2, workouttype=workouttype, workoutsource=workoutsource, boattype=boattype, rpe=rpe, makeprivate=makeprivate, title=t, notes=notes, ) else: uploadoptions['secret'] = settings.UPLOAD_SERVICE_SECRET uploadoptions['user'] = r.user.id uploadoptions['title'] = t uploadoptions['file'] = f2 url = settings.UPLOAD_SERVICE_URL _ = myqueue(queuehigh, handle_request_post, url, uploadoptions ) messages.info( request, "The file was too large to process in real time." " It will be processed in a background process." " You will receive an email when it is ready") url = reverse('workout_upload_view') if is_ajax: # pragma: no cover return JSONResponse({'result': 1, 'url': url}) else: response = HttpResponseRedirect(url) return response if not id: # pragma: no cover messages.error(request, message) url = reverse('workout_upload_view') if is_ajax: # pragma: no cover return JSONResponse({'result': 0, 'url': url}) else: response = HttpResponseRedirect(url) return response elif id == -1: # pragma: no cover message = 'The zip archive will be processed in the background." \ " The files in the archive will only be uploaded without the extra actions." \ " You will receive email when the workouts are ready.' messages.info(request, message) url = reverse('workout_upload_view') if is_ajax: return JSONResponse({'result': 1, 'url': url}) else: response = HttpResponseRedirect(url) return response else: if message: # pragma: no cover messages.error(request, message) w = Workout.objects.get(id=id) url = reverse('workout_edit_view', kwargs={ 'id': encoder.encode_hex(w.id), }) if is_ajax: # pragma: no cover response = {'result': 1, 'url': url} else: response = HttpResponseRedirect(url) r = getrower(request.user) if (make_plot): # pragma: no cover res, jobid = uploads.make_plot(r, w, f1, f2, plottype, t) if res == 0: messages.error(request, jobid) else: try: request.session['async_tasks'] += [ (jobid, 'make_plot')] except KeyError: request.session['async_tasks'] = [(jobid, 'make_plot')] elif r.staticchartonupload is not None: plottype = r.staticchartonupload res, jobid = uploads.make_plot(r, w, f1, f2, plottype, t) # upload to C2 if (upload_to_c2): # pragma: no cover try: c2integration = C2Integration(request.user) id = c2integration.workout_export(w) except NoTokenError: id = 0 message = "Something went wrong with the Concept2 sync" messages.error(request, message) if (upload_to_strava): # pragma: no cover strava_integration = StravaIntegration(request.user) try: id = strava_integration.workout_export(w) except NoTokenError: id = 0 message = "Please connect to Strava first" messages.error(request, message) if (upload_to_st): # pragma: no cover st_integration = SportTracksIntegration(request.user) try: id = st_integration.workout_export(w) except NoTokenError: message = "Please connect to SportTracks first" id = 0 messages.error(request, message) if (upload_to_tp): # pragma: no cover tp_integration = TPIntegration(request.user) try: id = tp_integration.workout_export(w) except NoTokenError: message = "Please connect to TrainingPeaks first" messages.error(request, message) if (upload_to_intervals): intervals_integration = IntervalsIntegration(request.user) try: id = intervals_integration.workout_export(w) except NoTokenError: message = "Please connect to Intervals.icu first" messages.error(request, message) if int(registrationid) < 0: # pragma: no cover race = VirtualRace.objects.get(id=-int(registrationid)) if race.sessiontype == 'race': result, comments, errors, jobid = add_workout_race( [w], race, r, doregister=True, ) if result: messages.info( request, "We have submitted your workout to the race") for c in comments: messages.info(request, c) for er in errors: messages.error(request, er) elif race.sessiontype == 'indoorrace': result, comments, errors, jobid = add_workout_indoorrace( [w], race, r, doregister=True, ) if result: messages.info( request, "We have submitted your workout to the race") for c in comments: messages.info(request, c) for er in errors: messages.error(request, er) elif race.sessiontype in ['fastest_time', 'fastest_distance']: result, comments, errors, jobid = add_workout_fastestrace( [w], race, r, doregister=True, ) if result: messages.info( request, "We have submitted your workout to the race") for c in comments: messages.info(request, c) for er in errors: messages.error(request, er) if int(registrationid) > 0: # pragma: no cover races = VirtualRace.objects.filter( registration_closure__gt=timezone.now() ) if raceid != 0: races = VirtualRace.objects.filter( registration_closure__gt=timezone.now(), id=raceid, ) registrations = IndoorVirtualRaceResult.objects.filter( race__in=races, id=registrationid, userid=r.id, ) registrations2 = VirtualRaceResult.objects.filter( race__in=races, id=registrationid, userid=r.id, ) if int(registrationid) in [r.id for r in registrations]: # pragma: no cover # indoor race registrations = registrations.filter(id=registrationid) if registrations: race = registrations[0].race if race.sessiontype == 'indoorrace': result, comments, errors, jobid = add_workout_indoorrace( [w], race, r, recordid=registrations[0].id ) elif race.sessiontype in ['fastest_time', 'fastest_distance']: result, comments, errors, jobid = add_workout_fastestrace( [w], race, r, recordid=registrations[0].id ) if result: messages.info( request, "We have submitted your workout to the race") for c in comments: messages.info(request, c) for er in errors: messages.error(request, er) if int(registrationid) in [r.id for r in registrations2]: # pragma: no cover # race registrations = registrations2.filter(id=registrationid) if registrations: race = registrations[0].race if race.sessiontype == 'race': result, comments, errors, jobid = add_workout_race( [w], race, r, recordid=registrations[0].id ) elif race.sessiontype in ['fastest_time', 'fastest_distance']: result, comments, errors, jobid = add_workout_fastestrace( [w], race, r, recordid=registrations[0].id ) if result: messages.info( request, "We have submitted your workout to the race") for c in comments: messages.info(request, c) for er in errors: messages.error(request, er) if registrationid != 0: # pragma: no cover try: url = reverse('virtualevent_view', kwargs={ 'id': race.id, }) except UnboundLocalError: if landingpage != 'workout_upload_view': url = reverse(landingpage, kwargs={ 'id': encoder.encode_hex(w.id), }) else: # pragma: no cover url = reverse(landingpage) elif landingpage != 'workout_upload_view': # pragma: no cover url = reverse(landingpage, kwargs={ 'id': encoder.encode_hex(w.id), }) else: # pragma: no cover url = reverse(landingpage) if is_ajax: # pragma: no cover response = {'result': 1, 'url': url} else: response = HttpResponseRedirect(url) else: if not is_ajax: # pragma: no cover response = render(request, 'document_form.html', {'form': form, 'teams': get_my_teams(request.user), 'optionsform': optionsform, }) if is_ajax: # pragma: no cover return JSONResponse(response) else: return response else: if not is_ajax: form = DocumentsForm(initial=docformoptions) optionsform = UploadOptionsForm(initial=uploadoptions, request=request, raceid=raceid) return render(request, 'document_form.html', {'form': form, 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, 'teams': get_my_teams(request.user), 'optionsform': optionsform, }) else: # pragma: no cover return {'result': 0} # This is the main view for processing uploaded files @user_passes_test(ispromember, login_url="/rowers/paidplans", redirect_field_name=None, message="This functionality requires a Coach plan or higher") def team_workout_upload_view(request, userid=0, message="", successmessage="", uploadoptions={ 'make_plot': False, 'plottype': 'timeplot', }): if 'uploadoptions' in request.session: uploadoptions = request.session['uploadoptions'] else: request.session['uploadoptions'] = uploadoptions breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': reverse('team_workout_upload_view'), 'name': 'Team Upload' } ] make_plot = uploadoptions['make_plot'] plottype = uploadoptions['plottype'] r = getrower(request.user) if request.method == 'POST': form = DocumentsForm(request.POST, request.FILES) optionsform = TeamUploadOptionsForm(request.POST) rowerform = TeamInviteForm(request.POST) rowerform.fields.pop('email') rowers = Rower.objects.filter( coachinggroups__in=[r.mycoachgroup] ).distinct() rowerform.fields['user'].queryset = User.objects.filter( rower__in=rowers).distinct() rowerform.fields['user'].required = True if form.is_valid() and rowerform.is_valid(): f = request.FILES.get('file', False) if f: res = handle_uploaded_file(f) else: # pragma: no cover messages.error(request, 'No file attached') response = render(request, 'team_document_form.html', {'form': form, 'teams': get_my_teams(request.user), 'optionsform': optionsform, 'rowerform': rowerform, }) return response t = form.cleaned_data['title'] offline = form.cleaned_data['offline'] boattype = form.cleaned_data['boattype'] workouttype = form.cleaned_data['workouttype'] if rowerform.is_valid(): u = rowerform.cleaned_data['user'] r = getrower(u) if not can_add_workout_member(request.user, r): # pragma: no cover message = 'Please select a rower' messages.error(request, message) messages.info(request, successmessage) response = render(request, 'team_document_form.html', {'form': form, 'teams': get_my_teams(request.user), 'optionsform': optionsform, 'rowerform': rowerform, }) return response workouttype = form.cleaned_data['workouttype'] if optionsform.is_valid(): make_plot = optionsform.cleaned_data['make_plot'] plottype = optionsform.cleaned_data['plottype'] uploadoptions = { 'makeprivate': False, 'make_plot': make_plot, 'plottype': plottype, 'upload_to_C2': False, } request.session['uploadoptions'] = uploadoptions f1 = res[0] # file name f2 = res[1] # file name incl media directory if not offline: id, message, f2 = dataprep.new_workout_from_file( r, f2, workouttype=workouttype, boattype=boattype, makeprivate=False, title=t, notes='' ) else: # pragma: no cover _ = myqueue( queuehigh, handle_zip_file, r.user.email, t, f2, emailbounced=r.emailbounced ) messages.info( request, "The file was too large to process in real time." " It will be processed in a background process." " The user will receive an email when it is ready" ) url = reverse('team_workout_upload_view') response = HttpResponseRedirect(url) return response if not id: # pragma: no cover messages.error(request, message) url = reverse('team_workout_upload_view') response = HttpResponseRedirect(url) return response elif id == -1: # pragma: no cover message = 'The zip archive will be processed in the background." \ " The files in the archive will only be uploaded without the extra actions." \ " You will receive email when the workouts are ready.' messages.info(request, message) url = reverse('team_workout_upload_view') response = HttpResponseRedirect(url) return response else: successmessage = "The workout was added to the user's account" messages.info(request, successmessage) url = reverse('team_workout_upload_view') response = HttpResponseRedirect(url) w = Workout.objects.get(id=id) r = getrower(request.user) if (make_plot): # pragma: no cover id, jobid = uploads.make_plot(r, w, f1, f2, plottype, t) elif r.staticchartonupload: plottype = r.staticchartonupload id, jobid = uploads.make_plot(r, w, f1, f2, plottype, t) else: response = render(request, 'team_document_form.html', {'form': form, 'teams': get_my_teams(request.user), 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, 'optionsform': optionsform, 'rowerform': rowerform, }) return response else: form = DocumentsForm() optionsform = TeamUploadOptionsForm(initial=uploadoptions) rowerform = TeamInviteForm(userid=userid) rowerform.fields.pop('email') rowers = Rower.objects.filter( coachinggroups__in=[r.mycoachgroup] ).distinct() rowerform.fields['user'].queryset = User.objects.filter( rower__in=rowers).distinct() return render(request, 'team_document_form.html', {'form': form, # 'teams':get_my_teams(request.user), 'optionsform': optionsform, 'active': 'nav-workouts', 'breadcrumbs': breadcrumbs, # 'rower':r, 'rowerform': rowerform, }) # A page with all the recent graphs (searchable on workout name) @login_required() def list_videos(request, userid=0): r = getrequestrower(request, userid=userid) workouts = Workout.objects.filter(user=r).order_by("-date", "-starttime") query = request.GET.get('q') if query: # pragma: no cover query_list = query.split() if query: query_list = query.split() workouts = workouts.filter( reduce(operator.and_, (Q(name__icontains=q) for q in query_list)) | reduce(operator.and_, (Q(notes__icontains=q) for q in query_list)) ) searchform = SearchForm(initial={'q': query}) else: searchform = SearchForm() g = VideoAnalysis.objects.filter( workout__in=workouts).order_by("-workout__date") paginator = Paginator(g, 8) page = request.GET.get('page', 1) try: g = paginator.page(page) except EmptyPage: # pragma: no cover g = paginator.page(paginator.num_pages) return render(request, 'list_videos.html', {'analyses': g, 'searchform': searchform, 'active': 'nav-analysis', 'rower': r, 'teams': get_my_teams(request.user), }) @login_required() def graphs_view(request, userid=0): request.session['referer'] = reverse('graphs_view') r = getrequestrower(request, userid=userid) workouts = Workout.objects.filter(user=r).order_by("-date", "-starttime") query = request.GET.get('q') if query: # pragma: no cover query_list = query.split() workouts = workouts.filter( reduce(operator.and_, (Q(name__icontains=q) for q in query_list)) | reduce(operator.and_, (Q(notes__icontains=q) for q in query_list)) ) searchform = SearchForm(initial={'q': query}) else: searchform = SearchForm() g = GraphImage.objects.filter( workout__in=workouts).order_by("-creationdatetime") paginator = Paginator(g, 8) page = request.GET.get('page') try: g = paginator.page(page) except PageNotAnInteger: g = paginator.page(1) except EmptyPage: # pragma: no cover g = paginator.page(paginator.num_pages) return render(request, 'list_graphs.html', {'graphs': g, 'searchform': searchform, 'active': 'nav-workouts', 'teams': get_my_teams(request.user), 'rower': r, }) # Show the chart (png image) def graph_show_view(request, id): try: g = GraphImage.objects.get(id=id) try: # pragma: no cover width, height = Image.open(g.filename).size g.width = width g.height = height g.save() except: pass w = Workout.objects.get(id=g.workout.id) r = Rower.objects.get(id=w.user.id) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(w.id)), 'name': w.name }, { 'url': reverse('graph_show_view', kwargs={'id': id}), 'name': 'Chart' } ] return render(request, 'show_graph.html', {'graph': g, 'teams': get_my_teams(request.user), 'workout': w, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'rower': r, }) except GraphImage.DoesNotExist: # pragma: no cover raise Http404("This graph doesn't exist") except Workout.DoesNotExist: # pragma: no cover raise Http404("This workout doesn't exist") # Restore original stroke data and summary @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_summary_restore_view(request, id, message="", successmessage=""): row = get_workout_by_opaqueid(request, id) # still here - this is a workout we may edit f1 = row.csvfilename u = row.user.user r = getrower(u) powerperc = 100*np.array([r.pw_ut2, r.pw_ut1, r.pw_at, r.pw_tr, r.pw_an])/r.ftp ftp = float(r.ftp) if row.workouttype in mytypes.otwtypes: ftp = ftp*(100.-r.otwslack)/100. rr = rrower(hrmax=r.max, hrut2=r.ut2, hrut1=r.ut1, hrat=r.at, hrtr=r.tr, hran=r.an, ftp=ftp, powerperc=powerperc, powerzones=r.powerzones, hrzones=r.hrzones) rowdata = rdata(csvfile=f1, rower=rr) if rowdata == 0: # pragma: no cover raise Http404("Error: CSV Data File Not Found") rowdata.restoreintervaldata() rowdata.write_csv(f1, gzip=True) dataprep.update_strokedata(encoder.decode_hex(id), rowdata.df) intervalstats = rowdata.allstats() row.summary = intervalstats row.save() messages.info(request, 'Original Interval Data Restored') url = reverse('workout_summary_edit_view', kwargs={ 'id': encoder.encode_hex(row.id), } ) return HttpResponseRedirect(url) # Split a workout @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) def workout_split_view(request, id=0): row = get_workout_by_opaqueid(request, id) r = row.user breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, id), 'name': row.name }, { 'url': reverse('workout_split_view', kwargs={'id': id}), 'name': 'Chart' } ] if request.method == 'POST': form = WorkoutSplitForm(request.POST) if form.is_valid(): splittime = form.cleaned_data['splittime'] splitsecond = splittime.hour*3600 splitsecond += splittime.minute*60 splitsecond += splittime.second splitsecond += splittime.microsecond/1.e6 splitmode = form.cleaned_data['splitmode'] try: ids, mesgs = dataprep.split_workout( r, row, splitsecond, splitmode ) for message in mesgs: messages.info(request, message) except IndexError: # pragma: no cover messages.error(request, "Something went wrong in Split") if request.user == r: # pragma: no cover url = reverse('workouts_view') else: mgrids = [team.id for team in Team.objects.filter( manager=request.user)] rwrids = [team.id for team in r.team.all()] teamids = list(set(mgrids) & set(rwrids)) if len(teamids) > 0: # pragma: no cover teamid = teamids[0] url = reverse('workouts_view', kwargs={ 'teamid': int(teamid), } ) else: url = reverse('workouts_view') rowname = row.name try: if isinstance(rowname, unicode): # pragma: no cover rowname = rowname.encode('utf8') elif isinstance(rowname, str): # pragma: no cover rowname = rowname.decode('utf8') except: pass qdict = {'q': rowname} url += '?'+urllib.parse.urlencode(qdict) return HttpResponseRedirect(url) form = WorkoutSplitForm() # create interactive plot try: res = interactive_chart(encoder.decode_hex(id), promember=1) script = res[0] div = res[1] except ValueError: # pragma: no cover pass return render(request, 'splitworkout.html', {'form': form, 'rower': r, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'workout': row, 'thediv': script, 'thescript': div, }) # Fuse two workouts @user_passes_test(ispromember, login_url="/rowers/paidplans", message="This functionality requires a Pro plan or higher." " If you are already a Pro user, please log in to access this functionality", redirect_field_name=None) def workout_fusion_view(request, id1=0, id2=1): try: id1 = encoder.decode_hex(id1) id2 = encoder.decode_hex(id2) except: # pragma: no cover pass r = getrower(request.user) try: w1 = Workout.objects.get(id=id1) w2 = Workout.objects.get(id=id2) r = w1.user if (is_workout_user(request.user, w1) is False) or \ (is_workout_user(request.user, w2) is False): # pragma: no cover raise PermissionDenied("You are not allowed to use these workouts") except Workout.DoesNotExist: # pragma: no cover raise Http404("One of the workouts doesn't exist") if request.method == 'POST': form = FusionMetricChoiceForm(request.POST, instance=w2) if form.is_valid(): cd = form.cleaned_data columns = cd['columns'] timeoffset = cd['offset'] posneg = cd['posneg'] if posneg == 'neg': # pragma: no cover timeoffset = -timeoffset # Create DataFrame df, forceunit = dataprep.datafusion(id1, id2, columns, timeoffset) idnew, message = dataprep.new_workout_from_df(r, df, title='Fused data', parent=w1, forceunit=forceunit) if message is not None: # pragma: no cover messages.error(request, message) else: messages.info(request, message) url = reverse('workout_edit_view', kwargs={ 'id': encoder.encode_hex(idnew), }) return HttpResponseRedirect(url) else: form = FusionMetricChoiceForm(instance=w2) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(w1.id)), 'name': encoder.encode_hex(w1.id) }, { 'url': reverse('workout_fusion_list', kwargs={'id': encoder.encode_hex(id1)}), 'name': 'Sensor Fusion' }, { 'url': reverse('workout_fusion_view', kwargs={ 'id1': encoder.encode_hex(id1), 'id2': encoder.encode_hex(id2) }), 'name': encoder.encode_hex(w2.id) } ] return render(request, 'fusion.html', {'form': form, 'teams': get_my_teams(request.user), 'workout': w1, 'rower': r, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'workout1': w1, 'workout2': w2, }) # See attached courses / attaching courses @login_required() @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_submit_course_view(request, id, courseid): row = get_workout_by_opaqueid(request, id) r = getrower(request.user) try: course = GeoCourse.objects.get(id=courseid) except GeoCourse.DoesNotExist: url = reverse('workout_edit_view', kwargs={'id': encoder.encode_hex(row.id)}) return HttpResponseRedirect(url) # got a course records = VirtualRaceResult.objects.filter( userid = r.id, course=course, workoutid=row.id ) if records: record = records[0] else: # create record record = VirtualRaceResult( userid=r.id, username=r.user.first_name+' '+r.user.last_name, workoutid=row.id, weightcategory=r.weightcategory, adaptiveclass=r.adaptiveclass, course=course, distance=course.distance, boatclass=row.workouttype, boattype=row.boattype, sex=r.sex, age=calculate_age(r.birthdate), ) record.save() job = myqueue( queuehigh, handle_check_race_course, row.csvfilename, row.id, course.id, record.id, r.user.email, r.user.first_name, summary=True, successemail=True, ) try: request.session['async_tasks'] += [ (job.id, 'check_race_course')] except KeyError: # pragma: no cover request.session['async_tasks'] = [ (job.id, 'check_race_course')] messages.info(request, 'We are checking your time on the course in the background." \ " You will receive an email when the check is complete." \ " You can check the status here') url = reverse('workout_edit_view', kwargs={'id': encoder.encode_hex(row.id)}) return HttpResponseRedirect(url) @login_required() @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_course_view(request, id): row = get_workout_by_opaqueid(request, id) r = getrower(request.user) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(row.id)), 'name': row.name }, { 'url': reverse('workout_course_view', kwargs={'id': id}), 'name': 'Measured Courses' } ] courses = [] courseselectform = CourseSelectForm() has_latlon, lat_mean, lon_mean = dataprep.workout_has_latlon(row.id) if has_latlon: courses = getnearestcourses([lat_mean, lon_mean], GeoCourse.objects.all(), whatisnear=25, strict=True) courseselectform = CourseSelectForm(choices=courses) if request.method == 'POST': courseselectform = CourseSelectForm(request.POST, choices=courses) if courseselectform.is_valid(): course = courseselectform.cleaned_data['course'] # get or create a record records = VirtualRaceResult.objects.filter( userid=r.id, course=course, workoutid=row.id ) if records: record = records[0] else: # pragma: no cover # create record record = VirtualRaceResult( userid=r.id, username=r.user.first_name+' '+r.user.last_name, workoutid=row.id, weightcategory=r.weightcategory, adaptiveclass=r.adaptiveclass, course=course, distance=course.distance, boatclass=row.workouttype, boattype=row.boattype, sex=r.sex, age=calculate_age(r.birthdate), ) record.save() job = myqueue( queuehigh, handle_check_race_course, row.csvfilename, row.id, course.id, record.id, r.user.email, r.user.first_name, summary=True, successemail=True, ) try: request.session['async_tasks'] += [ (job.id, 'check_race_course')] except KeyError: # pragma: no cover request.session['async_tasks'] = [ (job.id, 'check_race_course')] messages.info(request, 'We are checking your time on the course in the background." \ " You will receive an email when the check is complete." \ " You can check the status here') # get results records = VirtualRaceResult.objects.filter( course__isnull=False, workoutid=row.id, coursecompleted=True).order_by("duration", "-distance") return render(request, 'workout_courses.html', {'workout': row, 'rower': r, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'teams': get_my_teams(request.user), 'courses': courses, 'courseselectform': courseselectform, 'records': records, }) # Edit the splits/summary @login_required() @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_summary_edit_view(request, id, message="", successmessage="" ): row = get_workout_by_opaqueid(request, id) r = getrower(request.user) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(request, encoder.encode_hex(row.id)), 'name': row.name }, { 'url': reverse('workout_summary_edit_view', kwargs={'id': id}), 'name': 'Edit Intervals' } ] s = "" # still here - this is a workout we may edit f1 = row.csvfilename u = row.user.user r = getrower(u) powerperc = 100*np.array([r.pw_ut2, r.pw_ut1, r.pw_at, r.pw_tr, r.pw_an])/r.ftp ftp = float(r.ftp) if row.workouttype in mytypes.otwtypes: ftp = ftp*(100.-r.otwslack)/100. rr = rrower(hrmax=r.max, hrut2=r.ut2, hrut1=r.ut1, hrat=r.at, hrtr=r.tr, hran=r.an, ftp=ftp, powerperc=powerperc, powerzones=r.powerzones, hrzones=r.hrzones) rowdata = rdata(csvfile=f1, rower=rr) if rowdata == 0: # pragma: no cover return HttpResponse("Error: CSV Data File Not Found") intervalstats = row.summary try: itime, idist, itype = rowdata.intervalstats_values() except TypeError: # pragma: no cover return HttpResponse("Error: CSV Data File Not Found") nrintervals = len(idist) activeminutesmax = int(rowdata.duration/60.) activeminutesmin = 0 maxminutes = activeminutesmax savebutton = 'nosavebutton' formvalues = {} form = SummaryStringForm() tss = row.rscore normp = row.normp normv = row.normv normw = row.normw try: normspm = rowdata.df[' Cadence (stokes/min)'].mean() except KeyError: normspm = 0 if tss == -1: tss, normp = dataprep.workout_rscore(row) normv, normw = dataprep.workout_normv(row, pp=8.0) work = int(normw) power = int(normp) try: pace_secs = int(500./normv) except (OverflowError, ZeroDivisionError): pace_secs = 140. try: avpace = datetime.timedelta(seconds=int(500./normv)) except (OverflowError, ZeroDivisionError): avpace = datetime.timedelta(seconds=130) try: normspm = int(normspm) except ValueError: # pragma: no cover normspm = 18 try: normw = int(normw) except ValueError: # pragma: no cover normw = 100 try: normp = int(normp) except ValueError: # pragma: no cover normp = 100 data = { 'power': normp, 'pace': avpace, 'selector': 'power', 'work': normw, 'spm': normspm, 'activeminutesmin': 0, 'activeminutesmax': activeminutesmax, } powerorpace = 'power' if normp == 0: data['selector'] = 'pace' powerorpace = 'pace' # looking for courses courses = [] courseselectform = CourseSelectForm() has_latlon, lat_mean, lon_mean = dataprep.workout_has_latlon(row.id) if has_latlon: courses = getnearestcourses([lat_mean, lon_mean], GeoCourse.objects.all(), whatisnear=25, strict=True) courseselectform = CourseSelectForm(choices=courses) powerupdateform = PowerIntervalUpdateForm(initial=data) if request.method == 'POST' and "course" in request.POST: courseselectform = CourseSelectForm(request.POST, choices=courses) if courseselectform.is_valid(): course = courseselectform.cleaned_data['course'] # get or create a record records = VirtualRaceResult.objects.filter( userid=r.id, course=course, workoutid=row.id ) if records: # pragma: no cover record = records[0] else: # create record record = VirtualRaceResult( userid=r.id, username=r.user.first_name+' '+r.user.last_name, workoutid=row.id, weightcategory=r.weightcategory, adaptiveclass=r.adaptiveclass, course=course, distance=course.distance, boatclass=row.workouttype, boattype=row.boattype, sex=r.sex, age=calculate_age(r.birthdate), ) record.save() job = myqueue( queuehigh, handle_check_race_course, row.csvfilename, row.id, course.id, record.id, r.user.email, r.user.first_name, summary=True, successemail=True, ) try: request.session['async_tasks'] += [ (job.id, 'check_race_course')] except KeyError: request.session['async_tasks'] = [ (job.id, 'check_race_course')] messages.info(request, 'We are checking your time on the course in the background." \ " You will receive an email when the check is complete." \ " You can check the status here') vals = None # feeling lucky / ruptures if request.method == 'POST' and "ruptures" in request.POST: try: df = pd.DataFrame({ 'spm': rowdata.df[' Cadence (stokes/min)'], 'power': rowdata.df[' Power (watts)'], 'v': rowdata.df[' AverageBoatSpeed (m/s)'] }) algo = rpt.Pelt(model="rbf").fit(df.values) result = algo.predict(pen=10) df['time'] = rowdata.df['TimeStamp (sec)'].values try: timeprev = int(df['time'].values[0]) timenext = int(df['time'].values[result[0]]) s = '{delta}sec'.format(delta=timenext-timeprev) except IndexError: # pragma: no cover s = '0sec' for i in range(len(result)-1): timeprev = int(df['time'].values[result[i]-1]) timenext = int(df['time'].values[result[i+1]-1]) interval = '+{delta}sec'.format(delta=timenext-timeprev) s += interval try: rowdata.updateinterval_string(s) except: # pragma: no cover messages.error(request, "Nope, you were not lucky") intervalstats = rowdata.allstats() itime, idist, itype = rowdata.intervalstats_values() nrintervals = len(idist) savebutton = 'savestringform' intervalString = s form = SummaryStringForm(initial={'intervalstring': intervalString}) except KeyError: messages.error(request, "Nope, you were not lucky") # We have submitted the mini language interpreter if request.method == 'POST' and "intervalstring" in request.POST: form = SummaryStringForm(request.POST) if form.is_valid(): cd = form.cleaned_data s = cd["intervalstring"] try: rowdata.updateinterval_string(s) except: # pragma: no cover messages.error(request, 'Parsing error') intervalstats = rowdata.allstats() itime, idist, itype = rowdata.intervalstats_values() nrintervals = len(idist) savebutton = 'savestringform' powerupdateform = PowerIntervalUpdateForm(initial=data) # we are saving the results obtained from the split by power/pace interpreter elif request.method == 'POST' and "savepowerpaceform" in request.POST: powerorpace = request.POST.get('powerorpace', 'pace') value_pace = request.POST.get('value_pace', avpace) value_power = request.POST.get('value_power', int(normp)) value_work = request.POST.get('value_work', int(normw)) value_spm = request.POST.get('value_spm', int(normspm)) activeminutesmin = request.POST.get('activeminutesmin', 0) try: activeminutesmax = request.POST['activeminutesmax'] except: # pragma: no cover pass try: activesecondsmin = 60.*float(activeminutesmin) activesecondsmax = 60.*float(activeminutesmax) except ValueError: # pragma: no cover activesecondsmin = 0 activeminutesmin = 0 activeminutesmax = maxminutes activesecondsmax = rowdata.duration if abs(rowdata.duration-activesecondsmax) < 60.: # pragma: no cover activesecondsmax = rowdata.duration if powerorpace == 'power': try: power = int(value_power) except ValueError: # pragma: no cover int(normp) elif powerorpace == 'pace': # pragma: no cover try: pace_secs = float(value_pace) except ValueError: try: pace_secs = float(value_pace.replace(',', '.')) except ValueError: pace_secs = int(500./normv) elif powerorpace == 'work': # pragma: no cover try: work = int(value_work) except ValueError: work = int(normw) elif powerorpace == 'spm': # pragma: no cover try: spm = int(value_spm) except ValueError: spm = int(normspm) if powerorpace == 'power' and power is not None: try: vals, units, typ = rowdata.updateinterval_metric( ' Power (watts)', power, mode='larger', debug=False, smoothwindow=15., activewindow=[activesecondsmin, activesecondsmax], ) except: # pragma: no cover messages.error(request, 'Error updating power') elif powerorpace == 'pace': # pragma: no cover try: velo = 500./pace_secs vals, units, typ = rowdata.updateinterval_metric( ' AverageBoatSpeed (m/s)', velo, mode='larger', debug=False, smoothwindow=15., activewindow=[activesecondsmin, activesecondsmax], ) except: messages.error(request, 'Error updating pace') elif powerorpace == 'work': # pragma: no cover try: vals, units, typ = rowdata.updateinterval_metric( 'driveenergy', work, mode='larger', debug=False, smoothwindow=15., activewindow=[activesecondsmin, activesecondsmax], ) except: messages.error(request, 'Error updating Work per Stroke') elif powerorpace == 'spm': # pragma: no cover try: vals, units, typ = rowdata.updateinterval_metric( ' Cadence (stokes/min)', spm, mode='larger', debug=False, smoothwindow=2., activewindow=[activesecondsmin, activesecondsmax],) except: messages.error(request, 'Error updating SPM') intervalString = '' if vals is not None: intervalString = intervals_to_string(vals, units, typ) intervalstats = rowdata.allstats() itime, idist, itype = rowdata.intervalstats_values() nrintervals = len(idist) row.summary = intervalstats row.save() rowdata.write_csv(f1, gzip=True) dataprep.update_strokedata(encoder.decode_hex(id), rowdata.df) messages.info(request, "Updated interval data saved") data = { 'power': power, 'pace': datetime.timedelta(seconds=int(pace_secs)), 'work': work, 'selector': powerorpace, 'spm': int(normspm), 'activeminutesmin': activeminutesmin, 'activeminutesmax': activeminutesmax, } form = SummaryStringForm(initial={'intervalstring': intervalString}) powerupdateform = PowerIntervalUpdateForm(initial=data) savebutton = 'savepowerpaceform' formvalues = { 'powerorpace': powerorpace, 'value_power': power, 'value_pace': pace_secs, 'value_work': work, } # we are saving the results obtained from the mini language interpreter elif request.method == 'POST' and "savestringform" in request.POST: s = request.POST["savestringform"] try: rowdata.updateinterval_string(s) except (ParseException, err): # pragma: no cover messages.error(request, 'Parsing error in column '+str(err.col)) intervalstats = rowdata.allstats() itime, idist, itype = rowdata.intervalstats_values() nrintervals = len(idist) row.summary = intervalstats if s: try: row.notes = u'{n} \n {s}'.format( n=row.notes, s=s ) except TypeError: # pragma: no cover pass row.save() rowdata.write_csv(f1, gzip=True) dataprep.update_strokedata(encoder.decode_hex(id), rowdata.df) messages.info(request, "Updated interval data saved") data = {'intervalstring': s} form = SummaryStringForm(initial=data) powerupdateform = PowerIntervalUpdateForm(initial={ 'power': int(normp), 'pace': avpace, 'selector': 'power', 'work': int(normw), 'spm': int(normspm), 'activeminutesmin': 0, 'activeminutesmax': activeminutesmax, }) savebutton = 'savestringform' # we are saving the results obtained from the power update form elif request.method == 'POST' and 'selector' in request.POST: powerupdateform = PowerIntervalUpdateForm(request.POST) if powerupdateform.is_valid(): cd = powerupdateform.cleaned_data powerorpace = cd['selector'] power = cd['power'] pace = cd['pace'] work = cd['work'] spm = cd['spm'] activeminutesmin = cd['activeminutesmin'] activeminutesmax = cd['activeminutesmax'] activesecondsmin = 60.*activeminutesmin activesecondsmax = 60.*activeminutesmax if abs(rowdata.duration-activesecondsmax) < 60.: # pragma: no cover activesecondsmax = rowdata.duration try: pace_secs = pace.seconds+pace.microseconds/1.0e6 except AttributeError: # pragma: no cover pace_secs = 120. if powerorpace == 'power' and power is not None: vals, units, typ = rowdata.updateinterval_metric(' Power (watts)', power, mode='larger', debug=False, smoothwindow=15, activewindow=[ activesecondsmin, activesecondsmax], ) elif powerorpace == 'pace': # pragma: no cover try: velo = 500./pace_secs vals, units, typ = rowdata.updateinterval_metric(' AverageBoatSpeed (m/s)', velo, mode='larger', debug=False, smoothwindow=15, activewindow=[ activesecondsmin, activesecondsmax], ) except: messages.error(request, 'Error updating pace') elif powerorpace == 'work': # pragma: no cover try: vals, units, typ = rowdata.updateinterval_metric( 'driveenergy', work, mode='larger', debug=False, smoothwindow=15., activewindow=[activesecondsmin, activesecondsmax],) except: messages.error(request, 'Error updating Work per Stroke') elif powerorpace == 'spm': # pragma: no cover try: vals, units, typ = rowdata.updateinterval_metric(' Cadence (stokes/min)', spm, mode='larger', debug=False, smoothwindow=2., activewindow=[ activesecondsmin, activesecondsmax], ) except: messages.error(request, 'Error updating SPM') intervalString = '' if vals is not None: intervalString = intervals_to_string(vals, units, typ) intervalstats = rowdata.allstats() itime, idist, itype = rowdata.intervalstats_values() nrintervals = len(idist) savebutton = 'savepowerpaceform' formvalues = { 'powerorpace': powerorpace, 'value_power': power, 'value_pace': pace_secs, 'value_work': work, 'value_spm': spm, 'activeminutesmin': activeminutesmin, 'activeminutesmax': activeminutesmax, } powerupdateform = PowerIntervalUpdateForm(initial=cd) form = SummaryStringForm( initial={'intervalstring': intervalString}) # create interactive plot try: intervaldata = { 'itime': itime, 'idist': idist, 'itype': itype, 'selector': powerorpace, 'normp': normp, 'normv': normv, } res = interactive_chart(encoder.decode_hex( id), promember=1, intervaldata=intervaldata) script = res[0] div = res[1] except ValueError: # pragma: no cover script = '' div = '' # render page return render(request, 'summary_edit.html', {'form': form, 'activeminutesmax': activeminutesmax, 'activeminutesmin': activeminutesmin, 'maxminutes': maxminutes, 'powerupdateform': powerupdateform, 'workout': row, 'rower': r, 'breadcrumbs': breadcrumbs, 'active': 'nav-workouts', 'teams': get_my_teams(request.user), 'intervalstats': intervalstats, 'nrintervals': nrintervals, 'interactiveplot': script, 'the_div': div, 'intervalstring': s, 'savebutton': savebutton, 'formvalues': formvalues, 'courses': courses, 'courseselectform': courseselectform, }) @login_required() @permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True) def workout_split_by_interval_view(request, id): row = get_workout_by_opaqueid(request, id) r = getrower(request.user) url = reverse("workout_summary_edit_view", kwargs={"id": id}) _ = myqueue( queuehigh, handle_split_workout_by_intervals, row.id, ) messages.info(request,"New workouts are created in the background. They will show up in the workouts list soon.") return HttpResponseRedirect(url) class VideoDelete(DeleteView): login_required = True model = VideoAnalysis template_name = 'video_delete_confirm.html' # extra parameters def get_context_data(self, **kwargs): context = super(VideoDelete, self).get_context_data(**kwargs) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page( self.request, encoder.encode_hex(self.object.workout.id)), 'name': self.object.workout.name }, { 'url': reverse('workout_video_view', kwargs={'id': encoder.encode_hex(self.object.id)}), 'name': 'Video Analysis' }, {'url': reverse('video_delete', kwargs={'pk': str(self.object.pk)}), 'name': 'Delete' } ] context['active'] = 'nav-workouts' context['rower'] = getrower(self.request.user) context['breadcrumbs'] = breadcrumbs return context def get_success_url(self): w = self.object.workout try: w = Workout.objects.get(id=w.id) except Workout.DoesNotExist: # pragma: no cover return reverse('workouts_view') return reverse('workout_edit_view', kwargs={'id': encoder.encode_hex(w.id)}) def get_object(self, *args, **kwargs): obj = super(VideoDelete, self).get_object(*args, **kwargs) if not is_coach_user(self.request.user, obj.workout.user.user): # pragma: no cover raise PermissionDenied( 'You are not allowed to delete this analysis') return obj class GraphDelete(DeleteView): login_required = True model = GraphImage template_name = 'graphimage_delete_confirm.html' # extra parameters def get_context_data(self, **kwargs): context = super(GraphDelete, self).get_context_data(**kwargs) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page( self.request, encoder.encode_hex(self.object.workout.id)), 'name': self.object.workout.name }, { 'url': reverse('graph_show_view', kwargs={'id': self.object.pk}), 'name': 'Chart' }, {'url': reverse('graph_delete', kwargs={'pk': str(self.object.pk)}), 'name': 'Delete' } ] context['active'] = 'nav-workouts' context['rower'] = getrower(self.request.user) context['breadcrumbs'] = breadcrumbs return context def get_success_url(self): w = self.object.workout try: w = Workout.objects.get(id=w.id) except Workout.DoesNotExist: # pragma: no cover return reverse('workouts_view') return reverse('workout_edit_view', kwargs={'id': encoder.encode_hex(w.id)}) def get_object(self, *args, **kwargs): obj = super(GraphDelete, self).get_object(*args, **kwargs) if not is_workout_user(self.request.user, obj.workout): # pragma: no cover raise PermissionDenied('You are not allowed to delete this chart') return obj class WorkoutDelete(PermissionRequiredMixin, DeleteView): login_required = True model = Workout template_name = 'workout_delete_confirm.html' permission_required = 'workout.change_workout' # extra parameters def get_context_data(self, **kwargs): context = super(WorkoutDelete, self).get_context_data(**kwargs) breadcrumbs = [ { 'url': '/rowers/list-workouts/', 'name': 'Workouts' }, { 'url': get_workout_default_page(self.request, encoder.encode_hex(self.object.id)), 'name': encoder.encode_hex(self.object.id) }, {'url': reverse('workout_delete', kwargs={'pk': str(self.object.pk)}), 'name': 'Delete' } ] mayedit = 0 promember = 0 if not self.request.user.is_anonymous: result = self.request.user.is_authenticated and ispromember( self.request.user) if result: promember = 1 if self.request.user == self.object.user.user: mayedit = 1 context['active'] = 'nav-workouts' context['rower'] = getrower(self.request.user) context['breadcrumbs'] = breadcrumbs context['workout'] = self.object context['mayedit'] = mayedit context['promember'] = promember return context def get_success_url(self): return reverse('workouts_view') def get_object(self, *args, **kwargs): try: workout_pk = self.kwargs['pk'] except KeyError: workout_pk = self.kwargs['id'] try: obj = Workout.objects.get(pk=workout_pk) except (ValueError, Workout.DoesNotExist): workout_pk = encoder.decode_hex(workout_pk) try: obj = Workout.objects.get(pk=workout_pk) except Workout.DoesNotExist: # pragma: no cover raise Http404("One of the workouts doesn't exist") # obj = super(WorkoutDelete, self).get_object(*args, **kwargs) return obj