Private
Public Access
1
0

removing mailprocessing file

This commit is contained in:
Sander Roosendaal
2021-12-17 10:32:53 +01:00
parent 27a560c5a7
commit e1931065f9
14 changed files with 85 additions and 816 deletions

View File

@@ -2028,8 +2028,7 @@ def handle_nonpainsled(f2, fileformat, summary='',startdatetime='',empowerfirmwa
return (f2, summary, oarlength, inboard, fileformat, impeller)
# Create new workout from file and store it in the database
# This routine should be used everywhere in views.py and mailprocessing.py
# Currently there is code duplication
# This routine should be used everywhere in views.py
def get_workouttype_from_fit(filename,workouttype='water'):
try:

View File

@@ -120,3 +120,21 @@ def send_template_email(from_email,to_email,subject,
return 0
return res
def send_confirm(user, name, link, options): # pragma: no cover
d = {
'first_name':user.first_name,
'name':name,
'link':link,
}
fullemail = user.email
subject = 'New Workout Added: '+name
res = send_template_email('Rowsandall <info@rowsandall.com>',
[fullemail],
subject,'confirmemail.html',
d
)
return 1

View File

@@ -2429,7 +2429,7 @@ def get_map_script_course(
longend,
scoordinates,
course,
):
): # pragma: no cover
latmean,lonmean,coordinates = course_coord_center(course)
lat_min, lat_max, long_min, long_max = course_coord_maxmin(course)
@@ -2742,7 +2742,7 @@ def leaflet_chart(lat,lon,name="",raceresult=0):
longend,
scoordinates,
)
else:
else: # pragma: no cover
record = VirtualRaceResult.objects.get(id=raceresult)
course = record.course
script = get_map_script_course(
@@ -7010,7 +7010,7 @@ def get_zones_report(rower,startdate,enddate,trainingzones='hr',date_agg='week',
def interactive_zoneschart(rower,data,startdate,enddate,trainingzones='hr',date_agg='week',
yaxis='time'):
if startdate >= enddate:
if startdate >= enddate: # pragma: no cover
st = startdate
startdate = enddate
enddate = st

View File

@@ -1,184 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
# Processes emails sent to workouts@rowsandall.com
""" Processes emails sent to workouts@rowsandall.com """
import shutil
import time
from django.conf import settings
from rowers.tasks import (
handle_sendemail_unrecognized,
handle_sendemail_unrecognizedowner
)
from rowers.models import User, Rower, RowerForm
from django.core.mail import EmailMessage
from rowingdata import rower as rrower
from rowingdata import rowingdata as rrdata
from rowingdata import get_file_type
import zipfile
import os
import rowers.dataprep as dataprep
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('default')
# Sends a confirmation with a link to the workout
from rowers.emails import send_template_email
def send_confirm(user, name, link, options): # pragma: no cover
d = {
'first_name':user.first_name,
'name':name,
'link':link,
}
fullemail = user.email
subject = 'New Workout Added: '+name
res = send_template_email('Rowsandall <info@rowsandall.com>',
[fullemail],
subject,'confirmemail.html',
d
)
return 1
# Reads a "rowingdata" object, plus some error protections
def rdata(file, rower=rrower()):
""" Reads rowingdata data or returns 0 on Error """
try:
result = rrdata(csvfile=file, rower=rower)
except IOError: # pragma: no cover
try:
result = rrdata(csvfile=file + '.gz', rower=rower)
except IOError:
result = 0
except TypeError: # pragma: no cover
try:
result = rrdata(csvfile=file)
except IOError:
try:
result = rrdata(csvfile=file+'.gz', rower=rower)
except IOError:
result = 0
return result
def make_new_workout_from_email(rower, datafile, name, cntr=0,testing=False):
""" This one is used in processemail """
workouttype = 'rower'
impeller = False
try: # pragma: no cover
datafilename = datafile.name
fileformat = get_file_type('media/' + datafilename)
raise ValueError
except IOError: # pragma: no cover
datafilename = datafile.name + '.gz'
fileformat = get_file_type('media/' + datafilename)
except AttributeError:
datafilename = datafile
fileformat = get_file_type('media/' + datafile)
if len(fileformat) == 3 and fileformat[0] == 'zip': # pragma: no cover
with zipfile.ZipFile('media/' + datafilename) as zip_file:
datafilename = zip_file.extract(
zip_file.namelist()[0],
path='media/')[6:]
fileformat = fileformat[2]
f,e = os.path.splitext(datafilename)
if fileformat == 'unknown' and 'txt' not in e:
fcopy = "media/"+datafilename
if not testing: # pragma: no cover
if settings.CELERY:
res = handle_sendemail_unrecognized.delay(
fcopy,
rower.user.email
)
res = handle_sendemail_unrecognizedowner.delay(
rower.user.email,
rower.user.first_name
)
else:
res = queuehigh.enqueue(handle_sendemail_unrecognized,
fcopy,
rower.user.email)
res = queuehigh.enqueue(handle_sendemail_unrecognizedowner,
rower.user.email,
rower.user.first_name
)
return 0
summary = ''
# handle non-Painsled
if fileformat == 'att':
return 0
if fileformat != 'csv':
filename_mediadir, summary, oarlength, inboard,fileformat,impeller = dataprep.handle_nonpainsled(
'media/' + datafilename, fileformat, summary)
if not filename_mediadir: # pragma: no cover
return 0
else:
filename_mediadir = 'media/' + datafilename
inboard = 0.88
oarlength = 2.89
row = rdata(filename_mediadir)
if row == 0: # pragma: no cover
return 0
# change filename
if datafilename[:5] != 'media':
timestr = time.strftime("%Y%m%d-%H%M%S")
datafilename = 'media/' + timestr + str(cntr) + 'o.csv'
try:
avglat = row.df[' latitude'].mean()
avglon = row.df[' longitude'].mean()
if avglat != 0 or avglon != 0:
workouttype = 'water'
except KeyError:
pass
row.write_csv(datafilename, gzip=True)
dosummary = (fileformat != 'fit' and 'speedcoach2' not in fileformat)
dosummary = dosummary or summary == ''
if name == '': # pragma: no cover
name = 'Workout from Background Queue'
id, message = dataprep.save_workout_database(
datafilename, rower,
workouttype=workouttype,
dosummary=dosummary,
summary=summary,
inboard=inboard,
oarlength=oarlength,
title=name,
dosmooth=rower.dosmooth,
workoutsource=fileformat,
notes='',impeller=impeller,
)
return id

View File

@@ -29,7 +29,7 @@ from rowingdata import rower as rrower
from rowingdata import rowingdata as rrdata
import rowers.uploads as uploads
from rowers.mailprocessing import make_new_workout_from_email, send_confirm
import rowers.polarstuff as polarstuff
import rowers.c2stuff as c2stuff
import rowers.rp3stuff as rp3stuff
@@ -60,102 +60,6 @@ def rdata(file_obj, rower=rrower()): # pragma: no cover
return result
def processattachment(rower, fileobj, title, uploadoptions,testing=False):
try:
filename = fileobj.name
# filename = os.path.abspath(fileobj.name)
except AttributeError:
filename = fileobj[6:]
# test if file exists and is not empty
try:
with io.open('media/'+filename,'rb') as fop:
line = fop.readline()
except (IOError, UnicodeEncodeError): # pragma: no cover
return 0
# set user
if rower.user.is_staff and 'username' in uploadoptions:
users = User.objects.filter(username=uploadoptions['username'])
if len(users)==1:
therower = users[0].rower
elif uploadoptions['username'] == '': # pragma: no cover
therower = rower
else: # pragma: no cover
return 0
else:
therower = rower
uploadoptions['secret'] = settings.UPLOAD_SERVICE_SECRET
uploadoptions['user'] = therower.user.id
uploadoptions['file'] = 'media/'+filename
uploadoptions['title'] = title
url = settings.UPLOAD_SERVICE_URL
if not testing: # pragma: no cover
response = requests.post(url,data=uploadoptions)
# print("Upload response status code",response.status_code, response.json())
if response.status_code == 200:
response_json = response.json()
workoutid = [int(response_json['id'])]
else:
workoutid = [0]
# this is ugly and needs to be done better
if testing:
workoutid = [
make_new_workout_from_email(therower, filename, title,testing=testing)
]
if workoutid[0] and uploadoptions and not 'error' in uploadoptions:
workout = Workout.objects.get(id=workoutid[0])
uploads.make_private(workout, uploadoptions)
uploads.set_workouttype(workout, uploadoptions)
uploads.do_sync(workout, uploadoptions)
if 'raceid' in uploadoptions and workoutid[0] and rower.user.is_staff:
if testing and workoutid[0]:
w = Workout.objects.get(id = workoutid[0])
w.startdatetime = timezone.now()
w.date = timezone.now().date()
w.save()
try:
race = VirtualRace.objects.get(id=uploadoptions['raceid'])
if race.manager == rower.user:
result = email_submit_race(therower,race,workoutid[0])
except VirtualRace.DoesNotExist: # pragma: no cover
pass
return workoutid
def get_from_address(message):
from_address = message.from_address[0].lower()
if message.encoded: # pragma: no cover
body = message.text.splitlines()
else:
body = message.get_body().splitlines()
try:
first_line = body[0].lower()
except IndexError: # pragma: no cover
first_line = ''
try:
first_line = first_line.decode('utf-8')
except AttributeError: # pragma: no cover
pass
if "quiske" in first_line: # pragma: no cover
match = re.search(r'[\w\.-]+@[\w\.-]+', first_line)
return match.group(0)
return from_address
class Command(BaseCommand):

View File

@@ -1145,10 +1145,10 @@ def update_indoorvirtualrace(ps,cd):
ps.timezone = timezone_str
if ps.sessiontype == 'fastest_distance':
if ps.sessiontype == 'fastest_distance': # pragma: no cover
ps.approximate_distance = ps.sessionvalue
if ps.course is not None:
if ps.course is not None: # pragma: no cover
ps.approximate_distance = ps.course.distance
ps.save()
@@ -1204,7 +1204,7 @@ def update_virtualrace(ps,cd):
ps.timezone = timezone_str
if ps.sessiontype == 'fastest_distance':
if ps.sessiontype == 'fastest_distance': # pragma: no cover
ps.approximate_distance = ps.sessionvalue
if ps.course is not None:
@@ -1403,129 +1403,6 @@ def race_can_withdraw(r,race):
return True
def email_submit_race(r,race,workoutid):
try:
w = Workout.objects.get(id=workoutid)
except Workout.DoesNotExist: # pragma: no cover
return 0
if race.sessionmode == 'time':
wduration = timefield_to_seconds_duration(w.duration)
delta = wduration - (60.*race.sessionvalue)
if delta > -2 and delta < 2:
w.duration = totaltime_sec_to_string(60.*race.sessionvalue)
w.save()
elif race.sessionmode == 'distance': # pragma: no cover
delta = w.distance - race.sessionvalue
if delta > -5 and delta < 5:
w.distance = race.sessionvalue
w.save()
if race_can_register(r,race):
teamname = ''
weightcategory = w.weightcategory
sex = r.sex
if sex == 'not specified':
sex = 'male'
if not r.birthdate: # pragma: no cover
return 0
age = calculate_age(r.birthdate)
adaptiveclass = r.adaptiveclass
boatclass = w.workouttype
record = IndoorVirtualRaceResult(
userid = r.id,
teamname=teamname,
race=race,
username = u'{f} {l}'.format(
f = r.user.first_name,
l = r.user.last_name
),
weightcategory=weightcategory,
adaptiveclass=adaptiveclass,
duration=dt.time(0,0),
boatclass=boatclass,
coursecompleted=False,
sex=sex,
age=age
)
record.save()
result = add_rower_race(r,race)
otherrecords = IndoorVirtualRaceResult.objects.filter(
race = race)
for otherrecord in otherrecords:
otheruser = Rower.objects.get(id=otherrecord.userid)
othername = otheruser.user.first_name+' '+otheruser.user.last_name
registeredname = r.user.first_name+' '+r.user.last_name
if otherrecord.emailnotifications:
job = myqueue(
queue,
handle_sendemail_raceregistration,
otheruser.user.email, othername,
registeredname,
race.name,
race.id
)
if race_can_submit(r,race):
records = IndoorVirtualRaceResult.objects.filter(
userid = r.id,
race=race
)
if not records: # pragma: no cover
return 0
record = records[0]
workouts = Workout.objects.filter(id=w.id)
result,comments,errors,jobid = add_workout_indoorrace(
workouts,race,r,recordid=record.id
)
if result:
otherrecords = IndoorVirtualRaceResult.objects.filter(
race = race)
for otherrecord in otherrecords:
otheruser = Rower.objects.get(id=otherrecord.userid)
othername = otheruser.user.first_name+' '+otheruser.user.last_name
registeredname = r.user.first_name+' '+r.user.last_name
if otherrecord.emailnotifications:
job = myqueue(
queue,
handle_sendemail_racesubmission,
otheruser.user.email, othername,
registeredname,
race.name,
race.id
)
return 1
else:
return 0 # pragma: no cover
else: # pragma: no cover
return 0
return 0 # pragma: no cover
def race_can_register(r,race):
if race.sessiontype in ['race']:

View File

@@ -586,6 +586,10 @@ class ChallengesTest(TestCase):
'course': self.ThyroBaantje.id
}
form = CourseSelectForm(formdata)
self.assertTrue(form.is_valid())
response = self.c.post(url, formdata, follow=True)
self.assertEqual(response.status_code, 200)

View File

@@ -132,6 +132,46 @@ class ViewTest(TestCase):
except (FileNotFoundError,OSError):
pass
@patch('rowers.dataprep.create_engine')
@patch('rowers.dataprep.getsmallrowdata_db',side_effect=mocked_getsmallrowdata_db)
def test_upload_view_sled_bg(self, mocked_sqlalchemy,mocked_getsmallrowdata_db):
login = self.c.login(username='john',password='koeinsloot')
self.assertTrue(login)
filename = 'rowers/tests/testdata/testdata.csv'
f = open(filename,'rb')
file_data = {'file': f}
form_data = {
'title':'test',
'workouttype':'rower',
'boattype':'1x',
'notes':'aap noot mies',
'rpe':4,
'make_plot':False,
'rpe':6,
'upload_to_c2':False,
'plottype':'timeplot',
'landingpage':'workout_edit_view',
'raceid':0,
'file': f,
'offline': True,
}
request = RequestFactory()
request.user = self.u
form = DocumentsForm(form_data,file_data)
optionsform = UploadOptionsForm(form_data,request=request)
self.assertTrue(optionsform.is_valid())
response = self.c.post('/rowers/workout/upload/', form_data, follow=True)
self.assertRedirects(response, expected_url=reverse('workout_upload_view'),
status_code=302,target_status_code=200)
f.close()

View File

@@ -48,360 +48,11 @@ from rowers.utils import (
str2bool,range_to_color_hex,absolute,myqueue,NoTokenError
)
def cleanbody(body):
try:
body = body.decode('utf-8')
except AttributeError: # pragma: no cover
pass
regex = r".*---\n([\s\S]*?)\.\.\..*"
matches = re.finditer(regex,body)
for m in matches:
if m != None:
body = m.group(0)
return body
sources = [s for s,name in workoutsources]
def matchsource(line):
results = []
for s in sources:
testert = '^source.*(%s)' % s
tester = re.compile(testert)
if tester.match(line.lower()): # pragma: no cover
return tester.match(line.lower()).group(1)
# currently only matches one chart
def matchchart(line):
results = []
testert = '^((chart)|(plot))'
tester2t = testert+'(.*)(dist)'
tester3t = testert+'(.*)(time)'
tester4t = testert+'(.*)(pie)'
tester = re.compile(testert)
tester2 = re.compile(tester2t)
tester3 = re.compile(tester3t)
tester4 = re.compile(tester4t)
if tester.match(line.lower()): # pragma: no cover
if tester2.match(line.lower()):
return 'distanceplot'
if tester3.match(line.lower()):
return 'timeplot'
if tester3.match(line.lower()):
return 'pieplot'
def matchuser(line):
testert = '^(user)'
tester = re.compile(testert)
if tester.match(line.lower()):
words = line.split()
return words[1]
return None
def matchrace(line):
testert = '^(race)'
tester = re.compile(testert)
if tester.match(line.lower()):
words = line.split()
try:
return int(words[1])
except: # pragma: no cover
return None
return None
def matchsync(line):
results = []
tester = '((sync)|(synchronization)|(export))'
tester2 = tester+'(.*)((c2)|(concept2)|(logbook))'
tester3 = tester+'(.*)((tp)|(trainingpeaks))'
tester4 = tester+'(.*)(strava)'
tester5 = tester+'(.*)((st)|(sporttracks))'
tester = re.compile(tester)
if tester.match(line.lower()): # pragma: no cover
testers = [
('upload_to_C2',re.compile(tester2)),
('upload_totp',re.compile(tester3)),
('upload_to_Strava',re.compile(tester4)),
('upload_to_SportTracks',re.compile(tester5)),
('upload_to_MapMyFitness',re.compile(tester7)),
]
for t in testers:
if t[1].match(line.lower()):
results.append(t[0])
return results
def getstravaid(uploadoptions,body):
stravaid = 0
tester = re.compile('^(stravaid)(.*?)(\d+)')
for line in body.splitlines():
if tester.match(line.lower()): # pragma: no cover
stravaid = tester.match(line.lower()).group(3)
uploadoptions['stravaid'] = int(stravaid)
return uploadoptions
def gettypeoptions_body2(uploadoptions,body):
tester = re.compile('^(workout)')
testerb = re.compile('^(boat)')
for line in body.splitlines():
if tester.match(line.lower()):
for typ,verb in workouttypes_ordered.items():
str1 = '^(workout)(.*)({a})'.format(
a = typ.lower()
)
testert = re.compile(str1)
if testert.match(line.lower()):
uploadoptions['workouttype'] = typ
break
if testerb.match(line.lower()):
for typ,verb in boattypes:
str1 = '^(boat)(.*)({a})'.format(
a = typ.replace('+','\+')
)
testert = re.compile(str1)
if testert.match(line.lower()):
uploadoptions['boattype'] = typ
break
return uploadoptions
def getprivateoptions_body2(uploadoptions,body):
tester = re.compile('^(priva)')
for line in body.splitlines():
if tester.match(line.lower()): # pragma: no cover
v = True
negs = ['false','False','None','no']
for neg in negs:
tstr = re.compile('^(.*)'+neg)
if tstr.match(line.lower()):
v = False
uploadoptions['makeprivate'] = v
return uploadoptions
def getworkoutsources(uploadoptions,body):
for line in body.splitlines():
workoutsource = matchsource(line)
if workoutsource: # pragma: no cover
uploadoptions['workoutsource'] = workoutsource
return uploadoptions
def getplotoptions_body2(uploadoptions,body):
for line in body.splitlines():
chart = matchchart(line)
if chart: # pragma: no cover
uploadoptions['make_plot'] = True
uploadoptions['plottype'] = chart
return uploadoptions
def getuseroptions_body2(uploadoptions,body):
for line in body.splitlines():
user = matchuser(line)
if user:
uploadoptions['username'] = user
return uploadoptions
def getraceoptions_body2(uploadoptions,body):
for line in body.splitlines():
raceid = matchrace(line)
if raceid:
uploadoptions['raceid'] = raceid
return uploadoptions
def getsyncoptions_body2(uploadoptions,body):
result = []
for line in body.splitlines():
result = result+matchsync(line)
result = list(set(result))
for r in result: # pragma: no cover
uploadoptions[r] = True
return uploadoptions
def getsyncoptions(uploadoptions,values): # pragma: no cover
try:
value = values.lower()
values = [values]
except AttributeError:
pass
for v in values:
try:
v = v.lower()
if v in ['c2','concept2','logbook']:
uploadoptions['upload_to_C2'] = True
if v in ['tp','trainingpeaks']:
uploadoptions['upload_totp'] = True
if v in ['strava']:
uploadoptions['upload_to_Strava'] = True
if v in ['st','sporttracks']:
uploadoptions['upload_to_SportTracks'] = True
except AttributeError:
pass
return uploadoptions
def getplotoptions(uploadoptions,value): # pragma: no cover
try:
v = value.lower()
if v in ['pieplot','timeplot','distanceplot']:
uploadoptions['make_plot'] = True
uploadoptions['plottype'] = v
elif 'pie' in v:
uploadoptions['make_plot'] = True
uploadoptions['plottype'] = 'pieplot'
elif 'distance' in v:
uploadoptions['make_plot'] = True
uploadoptions['plottype'] = 'distanceplot'
elif 'time' in v:
uploadoptions['make_plot'] = True
uploadoptions['plottype'] = 'timeplot'
except TypeError:
pass
return uploadoptions
def gettype(uploadoptions,value,key): # pragma: no cover
workouttype = 'rower'
for typ,verb in workouttypes_ordered.items():
if value == typ:
workouttype = typ
break
if value == verb:
workouttype = typ
break
uploadoptions[key] = workouttype
return uploadoptions
def getboattype(uploadoptions,value,key): # pragma: no cover
boattype = '1x'
for type,verb in boattypes:
if value == type:
boattype = type
if value == verb:
boattype = type
uploadoptions[key] = boattype
return uploadoptions
def getuser(uploadoptions,value,key): # pragma: no cover
uploadoptions['username'] = value
return uploadoptions
def getrace(uploadoptions,value,key): # pragma: no cover
try:
raceid = int(value)
uploadoptions['raceid'] = raceid
except:
pass
return uploadoptions
def getsource(uploadoptions,value,key): # pragma: no cover
workoutsource = 'unknown'
for type,verb in workoutsources:
if value == type:
workoutsource = type
if value == verb:
workoutsource = type
uploadoptions[key] = workoutsource
return uploadoptions
def getboolean(uploadoptions,value,key): # pragma: no cover
b = True
if not value:
b = False
if value in [False,'false','False',None,'no']:
b = False
uploadoptions[key] = b
return uploadoptions
def upload_options(body):
uploadoptions = {
'boattype':'1x',
'workouttype': 'rower',
}
body = cleanbody(body)
try:
yml = (yaml.safe_load(body))
if yml and 'fromuploadform' in yml: # pragma: no cover
return yml
try:
for key, value in yml.iteritems(): # pragma: no cover
lowkey = key.lower()
if lowkey == 'sync' or lowkey == 'synchronization' or lowkey == 'export':
uploadoptions = getsyncoptions(uploadoptions,value)
if lowkey == 'chart' or lowkey == 'static' or lowkey == 'plot':
uploadoptions = getplotoptions(uploadoptions,value)
if 'priva' in lowkey:
uploadoptions = getboolean(uploadoptions,value,'makeprivate')
if 'workout' in lowkey:
uploadoptions = gettype(uploadoptions,value,'workouttype')
if 'boat' in lowkey:
uploadoptions = getboattype(uploadoptions,value,'boattype')
if 'source' in lowkey:
uploadoptions = getsource(uploadoptions,value,'workoutsource')
if 'username' in lowkey:
uploadoptions = getuser(uploadoptions,value,'username')
if 'raceid' in lowkey:
uploadoptions = getraceid(uploadoptions,value,'raceid')
except AttributeError:
#pass
raise yaml.YAMLError
except yaml.YAMLError as exc:
try:
uploadoptions = getplotoptions_body2(uploadoptions,body)
uploadoptions = getsyncoptions_body2(uploadoptions,body)
uploadoptions = getprivateoptions_body2(uploadoptions,body)
uploadoptions = gettypeoptions_body2(uploadoptions,body)
uploadoptions = getstravaid(uploadoptions,body)
uploadoptions = getworkoutsources(uploadoptions,body)
uploadoptions = getuseroptions_body2(uploadoptions,body)
uploadoptions = getraceoptions_body2(uploadoptions,body)
except IOError: # pragma: no cover
pm = exc.problem_mark
strpm = str(pm)
pbm = "Your email has an issue on line {} at position {}. The error is: ".format(
pm.line+1,
pm.column+1,
)+strpm
return {'error':pbm}
if uploadoptions == {}: # pragma: no cover
uploadoptions['message'] = 'No parsing issue. No valid commands detected'
return uploadoptions
def make_plot(r,w,f1,f2,plottype,title,imagename='',plotnr=0):
if imagename == '':
@@ -489,33 +140,7 @@ import rowers.tpstuff as tpstuff
from rowers.rower_rules import is_promember
def set_workouttype(w,options):
try:
w.workouttype = options['workouttype']
w.save()
except KeyError: # pragma: no cover
pass
try:
w.boattype = options['boattype']
w.save()
except KeyError: # pragma: no cover
pass
return 1
def set_workoutsource(w,options): # pragma: no cover
try:
w.workoutsource = options['workoutsource']
w.save()
except KeyError: # pragma: no cover
pass
def make_private(w,options): # pragma: no cover
if 'makeprivate' in options and options['makeprivate']:
w.privacy = 'hidden'
w.save()
return 1
def do_sync(w,options, quick=False):
@@ -587,7 +212,7 @@ def do_sync(w,options, quick=False):
if w.duplicate:
return 0
if do_c2_export:
if do_c2_export: # pragma: no cover
try:
message,id = c2stuff.workout_c2_upload(w.user.user,w,asynchron=True)
except NoTokenError:
@@ -628,7 +253,7 @@ def do_sync(w,options, quick=False):
except KeyError:
upload_to_tp = False
if do_tp_export:
if do_tp_export: # pragma: no cover
try:
message,id = sporttracksstuff.workout_sporttracks_upload(
w.user.user,w,asynchron=True,

View File

@@ -5,7 +5,6 @@ from __future__ import unicode_literals
from rowers.views.statements import *
from rowers.tasks import handle_calctrimp
from rowers.mailprocessing import send_confirm
from rowers.opaque import encoder
import sys

View File

@@ -637,7 +637,7 @@ def workout_nkimport_view(request,userid=0,after=0,before=0):
workouts = workouts[::-1]
if request.method == 'POST':
if request.method == 'POST': # pragma: no cover
try:
tdict = dict(request.POST.lists())
ids = tdict['workoutid']
@@ -1053,7 +1053,7 @@ def workout_stravaimport_view(request,message="",userid=0):
messages.info(request,'Your Strava workouts will be imported in the background. It may take a few minutes before it appears.'.format(stravaid=stravaid))
url = reverse('workouts_view')
return HttpResponseRedirect(url)
except KeyError:
except KeyError: # pragma: no cover
pass
breadcrumbs = [
@@ -1133,7 +1133,7 @@ def strava_webhook_view(request):
except Rower.DoesNotExist: # pragma: no cover
dologging('strava_webhooks.log','Rower not found')
return HttpResponse(status=200)
except MultipleObjectsReturned:
except MultipleObjectsReturned: # pragma: no cover
s = 'Multiple rowers found for strava ID {id}'.format(id=strava_owner)
dologging('strava_webhooks.log',s)
rs = Rower.objects.filter(strava_owner_id=strava_owner)
@@ -1164,7 +1164,7 @@ def strava_webhook_view(request):
except Rower.DoesNotExist: # pragma: no cover
dologging('strava_webhooks.log','Rower not found')
return HttpResponse(status=200)
except MultipleObjectsReturned:
except MultipleObjectsReturned: # pragma: no cover
rs = Rower.objects.filter(strava_owner_id=strava_owner)
r = rs[0]
if r.strava_auto_delete: # pragma: no cover
@@ -1619,7 +1619,7 @@ def workout_c2import_view(request,page=1,userid=0,message=""):
messages.info(request,'Your Concept2 workouts will be imported in the background. It may take a few minutes before it appears.'.format(c2id=c2id))
url = reverse('workouts_view')
return HttpResponseRedirect(url)
except KeyError:
except KeyError: # pragma: no cover
pass
@@ -1768,18 +1768,6 @@ def workout_getsporttracksworkout_all(request):
return HttpResponseRedirect(url)
# Imports all new workouts from SportTracks
@login_required()
def workout_getstravaworkout_all(request):
r = getrower(request.user)
res = stravastuff.get_strava_workouts(r)
if res == 1: # pragma: no cover
messages.info(request,"Your workouts are being imported and should appear on the site in the next 15 minutes")
else:
messages.error(request,"Couldn't import Strava workouts ")
url = reverse('workouts_view')
return HttpResponseRedirect(url)
# Imports all new workouts from SportTracks

View File

@@ -234,14 +234,14 @@ def course_view(request,id=0):
if request.user.is_authenticated:
notsharing = Rower.objects.filter(share_course_results=False).exclude(id=r.id)
else:
else: # pragma: no cover
notsharing = Rower.objects.filter(share_course_results=False)
notsharing_ids = [o.user.id for o in notsharing]
records = records.exclude(userid__in=notsharing_ids)
if 'onlyme' in request.GET:
if 'onlyme' in request.GET: # pragma: no cover
onlyme = request.GET.get('onlyme',False)
if onlyme == 'true':
onlyme = True
@@ -253,7 +253,7 @@ def course_view(request,id=0):
form = RaceResultFilterForm(records=records,groups=False)
if request.method == 'POST':
if request.method == 'POST': # pragma: no cover
form = RaceResultFilterForm(request.POST,records=records,groups=False)
if form.is_valid():
cd = form.cleaned_data

View File

@@ -243,7 +243,7 @@ import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from rowers.emails import send_template_email,htmlstrip
from rowers.emails import send_template_email,htmlstrip, send_confirm
from rowers.alerts import *
from pytz import timezone as tz,utc
@@ -703,7 +703,7 @@ from rest_framework.permissions import IsAuthenticated
from rowers.permissions import IsOwnerOrNot, IsCompetitorOrNot
import rowers.plots as plots
import rowers.mailprocessing as mailprocessing
from io import BytesIO
from scipy.special import lambertw

View File

@@ -10,7 +10,6 @@ from rowers.views.statements import *
import rowers.teams as teams
import rowers.mytypes as mytypes
import numpy
from rowers.mailprocessing import send_confirm
import rowers.uploads as uploads
import rowers.utils as utils
from rowers.utils import intervals_to_string
@@ -1738,7 +1737,7 @@ def course_compare_view(request,id=0):
'teamid':0
}
)
if request.method == 'POST' and 'workouts' in request.POST:
if request.method == 'POST' and 'workouts' in request.POST: # pragma: no cover
form = WorkoutMultipleCompareForm(request.POST)
form.fields["workouts"].queryset = Workout.objects.filter(id__in=workoutids)
chartform = ChartParamChoiceForm(request.POST)
@@ -1752,7 +1751,7 @@ def course_compare_view(request,id=0):
teamid = chartform.cleaned_data['teamid']
ids = [int(w.id) for w in workouts]
request.session['ids'] = ids
elif request.method == 'POST':
elif request.method == 'POST': # pragma: no cover
form = WorkoutMultipleCompareForm()
form.fields["workouts"].queryset = Workout.objects.filter(id__in=workoutids)
request.session['ids'] = workoutids
@@ -1924,7 +1923,7 @@ def virtualevent_compare_view(request,id=0):
'teamid':0
}
)
if request.method == 'POST' and 'workouts' in request.POST:
if request.method == 'POST' and 'workouts' in request.POST: # pragma: no cover
form = WorkoutMultipleCompareForm(request.POST)
form.fields["workouts"].queryset = Workout.objects.filter(id__in=workoutids)
chartform = ChartParamChoiceForm(request.POST)