2252 lines
72 KiB
Python
2252 lines
72 KiB
Python
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
from __future__ import unicode_literals
|
|
|
|
from rowers.views.statements import *
|
|
|
|
import numpy
|
|
|
|
def default(o): # pragma: no cover
    """Fallback serializer for ``json.dumps(..., default=default)``.

    Converts numpy values, which the ``json`` module cannot encode on its
    own, into plain Python equivalents.

    Args:
        o: the object ``json`` could not serialize.

    Returns:
        ``int`` for any numpy integer scalar, ``float`` for any numpy
        floating scalar, ``bool`` for ``numpy.bool_`` and a (nested) list
        for a ``numpy.ndarray``.

    Raises:
        TypeError: for every other type, as the ``json`` protocol requires.
    """
    if isinstance(o, numpy.integer):
        return int(o)
    if isinstance(o, numpy.floating):
        return float(o)
    if isinstance(o, numpy.bool_):
        return bool(o)
    if isinstance(o, numpy.ndarray):
        return o.tolist()
    raise TypeError(
        "Object of type %s is not JSON serializable" % type(o).__name__
    )
|
|
|
|
|
|
# Send workout to TP
|
|
@permission_required('workout.change_workout',fn=get_workout_by_opaqueid,raise_exception=True)
def workout_tp_upload_view(request,id=0):
    """Upload one workout to TrainingPeaks and redirect to the landing page.

    Args:
        request: the incoming Django request.
        id: opaque workout id, resolved with ``get_workout_by_opaqueid``.

    Returns:
        A redirect to the TrainingPeaks authorization page when ``tp_open``
        raises ``NoTokenError``; otherwise a redirect to the workout owner's
        ``defaultlandingpage`` with a success or error message queued.
    """
    r = getrower(request.user)

    try:
        # Called for its NoTokenError check; the upload below uses r.tptoken.
        tp_open(r.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/tpauthorize/")

    # ready to upload. Hurray
    w = get_workout_by_opaqueid(request,id)
    r = w.user

    tcxfile = tpstuff.createtpworkoutdata(w)
    if tcxfile:
        res,reason,status_code,_headers = tpstuff.uploadactivity(
            r.tptoken,tcxfile,
            name=w.name
        )
        if res == 0: # pragma: no cover
            message = "Upload to TrainingPeaks failed with status code "+str(status_code)+": "+reason
            messages.error(request,message)
        else: # res != 0
            w.uploadedtotp = res
            w.save()
            messages.info(request,'Uploaded to TrainingPeaks')

        # Best-effort cleanup of the temporary export file. OSError is the
        # portable spelling; WindowsError only exists on Windows.
        try:
            os.remove(tcxfile)
        except OSError: # pragma: no cover
            pass
    else: # pragma: no cover # no tcxfile
        w.uploadedtotp = -1
        w.save()
        messages.error(request,"Upload to TrainingPeaks failed")

    url = reverse(r.defaultlandingpage,
                  kwargs = {
                      'id':encoder.encode_hex(w.id),
                  })

    return HttpResponseRedirect(url)
|
|
|
|
|
|
# Send workout to Strava
|
|
# abundance of error logging here because there were/are some bugs
|
|
@permission_required('workout.change_workout',fn=get_workout_by_opaqueid,raise_exception=True)
def workout_strava_upload_view(request,id=0):
    """Upload one workout to Strava and redirect to the landing page.

    Args:
        request: the incoming Django request.
        id: opaque workout id, resolved with ``get_workout_by_opaqueid``.

    Returns:
        A redirect to the Strava authorization page when no usable token is
        available; otherwise a redirect to the workout owner's
        ``defaultlandingpage`` with a success or error message queued.
    """
    r = getrower(request.user)

    try:
        # Called for its NoTokenError check; the upload uses r.stravatoken.
        strava_open(request.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/stravaauthorize/")

    if (r.stravatoken == '') or (r.stravatoken is None): # pragma: no cover
        # Token doesn't exist. Need to authorize
        return HttpResponseRedirect("/rowers/me/stravaauthorize/")

    # ready to upload. Hurray
    w = get_workout_by_opaqueid(request,id)
    r = w.user

    def landing_redirect():
        # Every exit path of the upload returns to the owner's landing page.
        url = reverse(r.defaultlandingpage,
                      kwargs = {
                          'id':encoder.encode_hex(w.id),
                      })
        return HttpResponseRedirect(url)

    def remove_tcxfile(path):
        # Best-effort cleanup. OSError is the portable spelling; the
        # WindowsError name only exists on Windows.
        try:
            os.remove(path)
        except OSError: # pragma: no cover
            pass

    tcxfile = None
    try:
        tcxfile,tcxmessg = stravastuff.createstravaworkoutdata(w)
        if not tcxfile: # pragma: no cover
            messages.error(request,"Strava Data error "+tcxmessg)
            w.uploadedtostrava = -1
            w.save()
            return landing_redirect()

        try:
            newnotes = w.notes+'\n from '+w.workoutsource+' via rowsandall.com'
        except TypeError:
            # w.notes is not a string (e.g. None)
            newnotes = 'from '+w.workoutsource+' via rowsandall.com'

        # Rowing types follow the rower's export preference unless it is
        # 'match'; everything else uses the type mapping, defaulting to 'Ride'.
        if w.workouttype in mytypes.rowtypes and r.stravaexportas != 'match':
            activity_type = r.stravaexportas
        else:
            try:
                activity_type = mytypes.stravamapping[w.workouttype]
            except KeyError: # pragma: no cover
                activity_type = 'Ride'

        with open(tcxfile,'rb') as f:
            res,mes = stravastuff.handle_stravaexport(
                f,w.name,
                r.stravatoken,
                description=newnotes,
                activity_type=activity_type,quick=False)

        # The file is closed at this point, so it can be removed safely.
        if res == 0: # pragma: no cover
            messages.error(request,mes)
            w.uploadedtostrava = -1
            w.save()
            remove_tcxfile(tcxfile)
            return landing_redirect()

        try:
            w.uploadedtostrava = res
            w.save()
            remove_tcxfile(tcxfile)
            messages.info(request,mes)
        except Exception: # pragma: no cover
            errorstring = str(sys.exc_info()[0])
            with open("media/stravaerrors.log","a") as errorlog:
                timestr = strftime("%Y%m%d-%H%M%S")
                errorlog.write(timestr+errorstring+"\r\n")
                errorlog.write("views.py line 826\r\n")
            messages.error(request,'Error: '+errorstring)
    except ActivityUploadFailed as e: # pragma: no cover
        messages.error(request,"Strava Upload error: %s" % e)
        w.uploadedtostrava = -1
        w.save()
        if tcxfile:
            remove_tcxfile(tcxfile)

    return landing_redirect()
|
|
|
|
# Upload workout to Concept2 logbook
|
|
@login_required()
def workout_c2_upload_view(request,id=0):
    """Upload one workout to the Concept2 logbook and redirect to the landing page.

    Args:
        request: the incoming Django request.
        id: workout id, resolved with ``get_workout``.

    Returns:
        A redirect to the Concept2 authorization page when the upload raises
        ``NoTokenError``; otherwise a redirect to the workout owner's
        ``defaultlandingpage``.
    """
    workout = get_workout(id)
    owner = workout.user

    try:
        message,c2id = c2stuff.workout_c2_upload(request.user,workout)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/c2authorize/")

    # A non-positive Concept2 id marks a failed upload.
    if message:
        if c2id <= 0: # pragma: no cover
            messages.error(request,message)
        else:
            messages.info(request,message)

    landing = reverse(
        owner.defaultlandingpage,
        kwargs={'id':encoder.encode_hex(workout.id)},
    )
    return HttpResponseRedirect(landing)
|
|
|
|
# Upload workout to RunKeeper
|
|
@permission_required('workout.change_workout',fn=get_workout_by_opaqueid,raise_exception=True)
def workout_runkeeper_upload_view(request,id=0):
    """Upload one workout to RunKeeper.

    Args:
        request: the incoming Django request.
        id: workout id, resolved with ``get_workout``.

    Returns:
        A redirect to the RunKeeper authorization page when ``runkeeper_open``
        raises ``NoTokenError``, to the workout edit view after a successful
        upload, and to the workout owner's ``defaultlandingpage`` otherwise.
    """
    w = get_workout(id)
    r = w.user

    try:
        thetoken = runkeeper_open(r.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/runkeeperauthorize/")

    # ready to upload. Hurray
    data = runkeeperstuff.createrunkeeperworkoutdata(w)
    if not data: # pragma: no cover
        messages.error(request,"Data error")
        url = reverse(r.defaultlandingpage,
                      kwargs = {
                          'id':id,
                      })
        return HttpResponseRedirect(url)

    # No hand-written Content-Length: requests computes it from the body,
    # so the former placeholder value 'nnn' was never meaningful.
    headers = {'Authorization': str('Bearer ' + thetoken),
               'user-agent': 'sanderroosendaal',
               'Content-Type': 'application/vnd.com.runkeeper.NewFitnessActivity+json'}

    url = "https://api.runkeeper.com/fitnessActivities"
    response = requests.post(url,headers=headers,data=json.dumps(data,default=default))

    # check for duplicate error first
    if response.status_code == 409: # pragma: no cover
        messages.error(request,"Duplicate error")
        w.uploadedtorunkeeper = -1
        w.save()
    elif response.status_code in (200, 201):
        w.uploadedtorunkeeper = runkeeperstuff.getidfromresponse(response)
        w.save()
        url = reverse('workout_edit_view',
                      kwargs={'id':encoder.encode_hex(w.id)})

        return HttpResponseRedirect(url)
    else: # pragma: no cover
        message = "Something went wrong in workout_runkeeper_upload_view: %s - %s" % (response.reason,response.text)
        messages.error(request,message)

    url = reverse(r.defaultlandingpage,
                  kwargs = {
                      'id':encoder.encode_hex(w.id),
                  }) # pragma: no cover

    return HttpResponseRedirect(url) # pragma: no cover
|
|
|
|
# Upload workout to Underarmour
|
|
@permission_required('workout.change_workout',fn=get_workout_by_opaqueid,raise_exception=True)
def workout_underarmour_upload_view(request,id=0):
    """Upload one workout to Under Armour.

    Args:
        request: the incoming Django request.
        id: workout id, resolved with ``get_workout``.

    Returns:
        A redirect to the Under Armour authorization page when
        ``underarmour_open`` raises ``NoTokenError``, to the workout edit
        view after a successful upload, and to the workout owner's
        ``defaultlandingpage`` otherwise.
    """
    workout = get_workout(id)
    owner = workout.user

    try:
        token = underarmour_open(owner.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/underarmourauthorize/")

    payload = underarmourstuff.createunderarmourworkoutdata(workout)
    if not payload: # pragma: no cover
        messages.error(request,"Data error")
        return HttpResponseRedirect(
            reverse(owner.defaultlandingpage,
                    kwargs={'id':encoder.encode_hex(workout.id)})
        )

    reply = requests.post(
        "https://api.ua.com/v7.1/workout/",
        headers={
            'Authorization': str('Bearer ' + token),
            'Api-Key': UNDERARMOUR_CLIENT_KEY,
            'user-agent': 'sanderroosendaal',
            'Content-Type': 'application/json',
        },
        data=json.dumps(payload,default=default),
    )
    status = reply.status_code

    # Success: store the remote id and go to the edit view.
    if status in (200, 201):
        workout.uploadedtounderarmour = underarmourstuff.getidfromresponse(reply)
        workout.save()
        return HttpResponseRedirect(
            reverse('workout_edit_view',kwargs={'id':encoder.encode_hex(workout.id)})
        )

    if status == 409: # pragma: no cover
        messages.error(request,"Duplicate error")
        workout.uploadedtounderarmour = -1
        workout.save()
    else: # pragma: no cover
        messages.error(
            request,
            "Something went wrong in workout_underarmour_upload_view: %s " % reply.reason,
        )

    landing = reverse(owner.defaultlandingpage,
                      kwargs={'id':encoder.encode_hex(workout.id)}) # pragma: no cover
    return HttpResponseRedirect(landing) # pragma: no cover
|
|
|
|
# Upload workout to SportTracks
|
|
@permission_required('workout.change_workout',fn=get_workout_by_opaqueid)
def workout_sporttracks_upload_view(request,id=0):
    """Upload one workout to SportTracks.

    Args:
        request: the incoming Django request.
        id: workout id, resolved with ``get_workout``.

    Returns:
        A redirect to the SportTracks authorization page when
        ``sporttracks_open`` raises ``NoTokenError``, to the workout edit
        view after a successful upload, and to the workout owner's
        ``defaultlandingpage`` otherwise.
    """
    workout = get_workout(id)
    owner = workout.user

    try:
        token = sporttracks_open(owner.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/sporttracksauthorize/")

    payload = sporttracksstuff.createsporttracksworkoutdata(workout)
    if not payload: # pragma: no cover
        messages.error(request,"Data error")
        return HttpResponseRedirect(
            reverse(owner.defaultlandingpage,
                    kwargs={'id':encoder.encode_hex(workout.id)})
        )

    reply = requests.post(
        "https://api.sporttracks.mobi/api/v2/fitnessActivities.json",
        headers={
            'Authorization': str('Bearer ' + token),
            'user-agent': 'sanderroosendaal',
            'Content-Type': 'application/json',
        },
        data=json.dumps(payload,default=default),
    )
    status = reply.status_code

    # Success: store the remote id and go to the edit view.
    if status in (200, 201):
        # The parsed body is not used; the call is kept so a non-JSON reply
        # still raises here, as it did before.
        reply.json()
        workout.uploadedtosporttracks = sporttracksstuff.getidfromresponse(reply)
        workout.save()
        messages.info(request,"Upload to SportTracks was successful")
        return HttpResponseRedirect(
            reverse('workout_edit_view',kwargs={'id':encoder.encode_hex(workout.id)})
        )

    if status == 409: # pragma: no cover
        messages.error(request,"Duplicate error")
        workout.uploadedtosporttracks = -1
        workout.save()
    else: # pragma: no cover
        messages.error(
            request,
            "Something went wrong in workout_sporttracks_upload_view: %s" % reply.reason,
        )

    landing = reverse(owner.defaultlandingpage,
                      kwargs={'id':encoder.encode_hex(workout.id)}) # pragma: no cover
    return HttpResponseRedirect(landing) # pragma: no cover
|
|
|
|
|
|
# NK Logbook authorization
|
|
@login_required()
def rower_nk_authorize(request): # pragma: no cover
    """Redirect the browser to the NK OAuth authorization endpoint.

    The query string carries the client id, redirect URI, a ``read`` scope
    and a freshly generated random ``state`` value.
    """
    query = urllib.parse.urlencode({
        "grant_type": "authorization_code",
        "response_type": "code",
        "client_id": NK_CLIENT_ID,
        "scope": "read",
        "state": str(uuid4()),
        "redirect_uri": NK_REDIRECT_URI,
    })
    return HttpResponseRedirect(
        "https://oauth-stage.nkrowlink.com/oauth/authorize?" + query
    )
|
|
|
|
|
|
|
|
# Concept2 authorization
|
|
@login_required()
def rower_c2_authorize(request): # pragma: no cover
    """Redirect the browser to the Concept2 logbook OAuth authorization page.

    The scope is appended to the URL as-is (not URL-encoded) after the
    encoded client id, response type and redirect URI.
    """
    # NOTE(review): a random state value is generated but is neither sent
    # to Concept2 nor stored, so it provides no XSRF protection as written.
    state = str(uuid4())

    query = urllib.parse.urlencode({
        "client_id": C2_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": C2_REDIRECT_URI,
    })
    target = (
        "http://log.concept2.com/oauth/authorize?" + query
        + "&scope=" + "user:read,results:write"
    )
    return HttpResponseRedirect(target)
|
|
|
|
# Garmin authorization
|
|
@login_required()
def rower_garmin_authorize(request): # pragma: no cover
    """Start the Garmin authorization flow.

    Stores the owner key and secret returned by
    ``garmin_stuff.garmin_authorize`` in the session, then redirects to the
    authorization URL.
    """
    target, owner_key, owner_secret = garmin_stuff.garmin_authorize()

    session = request.session
    session['garmin_owner_key'] = owner_key
    session['garmin_owner_secret'] = owner_secret

    return HttpResponseRedirect(target)
|
|
|
|
# Strava Authorization
|
|
@login_required()
def rower_strava_authorize(request): # pragma: no cover
    """Redirect the browser to Strava's OAuth authorization page.

    Requests the ``activity:write`` and ``activity:read_all`` scopes.
    """
    # NOTE(review): a random state value is generated but is neither sent
    # to Strava nor stored, so it provides no XSRF protection as written.
    state = str(uuid4())

    query = urllib.parse.urlencode({
        "client_id": STRAVA_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": STRAVA_REDIRECT_URI,
        "scope": "activity:write,activity:read_all",
    })
    return HttpResponseRedirect(
        "https://www.strava.com/oauth/authorize?" + query
    )
|
|
|
|
# Polar Authorization
|
|
@login_required()
def rower_polar_authorize(request): # pragma: no cover
    """Redirect the browser to Polar Flow's OAuth2 authorization page.

    The query string carries the client id, redirect URI and a freshly
    generated random ``state`` value; no scope is requested.
    """
    query = urllib.parse.urlencode({
        "client_id": POLAR_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": POLAR_REDIRECT_URI,
        "state": str(uuid4()),
    })
    return HttpResponseRedirect(
        "https://flow.polar.com/oauth2/authorization?" + query
    )
|
|
|
|
|
|
|
|
# Runkeeper authorization
|
|
@login_required()
def rower_runkeeper_authorize(request): # pragma: no cover
    """Redirect the browser to RunKeeper's authorization page.

    The query string carries the client id, a freshly generated random
    ``state`` value and the redirect URI.
    """
    query = urllib.parse.urlencode({
        "client_id": RUNKEEPER_CLIENT_ID,
        "response_type": "code",
        "state": str(uuid4()),
        "redirect_uri": RUNKEEPER_REDIRECT_URI,
    })
    return HttpResponseRedirect(
        "https://runkeeper.com/apps/authorize?" + query
    )
|
|
|
|
# SportTracks Authorization
|
|
@login_required()
def rower_sporttracks_authorize(request): # pragma: no cover
    """Redirect the browser to SportTracks' OAuth2 authorization page.

    The query string carries the client id, a freshly generated random
    ``state`` value and the redirect URI.
    """
    query = urllib.parse.urlencode({
        "client_id": SPORTTRACKS_CLIENT_ID,
        "response_type": "code",
        "state": str(uuid4()),
        "redirect_uri": SPORTTRACKS_REDIRECT_URI,
    })
    return HttpResponseRedirect(
        "https://api.sporttracks.mobi/oauth2/authorize?" + query
    )
|
|
|
|
# Underarmour Authorization
|
|
@login_required()
def rower_underarmour_authorize(request): # pragma: no cover
    """Redirect the browser to the Under Armour (MapMyFitness) authorization page.

    The client key and redirect URI are inserted into the URL as-is, without
    URL-encoding.
    """
    # NOTE(review): a random state value is generated but is neither sent
    # to the provider nor stored, so it provides no XSRF protection as written.
    state = str(uuid4())

    template = (
        'https://www.mapmyfitness.com/v7.1/oauth2/authorize/?'
        'client_id={0}&response_type=code&redirect_uri={1}'
    )
    target = template.format(UNDERARMOUR_CLIENT_KEY, UNDERARMOUR_REDIRECT_URI)

    return HttpResponseRedirect(target)
|
|
|
|
# RP3 Authorization
|
|
@login_required()
def rower_rp3_authorize(request): # pragma: no cover
    """Redirect the browser to the RP3 OAuth authorization page."""
    # NOTE(review): a random state value is generated but is neither sent
    # to RP3 nor stored, so it provides no XSRF protection as written.
    state = str(uuid4())

    query = urllib.parse.urlencode({
        "client_id": RP3_CLIENT_KEY,
        "response_type": "code",
        "redirect_uri": RP3_REDIRECT_URI,
    })
    return HttpResponseRedirect(
        "https://rp3rowing-app.com/oauth/authorize/?" + query
    )
|
|
|
|
# TrainingPeaks Authorization
|
|
@login_required()
def rower_tp_authorize(request): # pragma: no cover
    """Redirect the browser to the TrainingPeaks OAuth authorization page.

    Requests the ``file:write`` scope.
    """
    # NOTE(review): a random state value is generated but is neither sent
    # to TrainingPeaks nor stored, so it provides no XSRF protection as written.
    state = str(uuid4())

    query = urllib.parse.urlencode({
        "client_id": TP_CLIENT_KEY,
        "response_type": "code",
        "redirect_uri": TP_REDIRECT_URI,
        "scope": "file:write",
    })
    return HttpResponseRedirect(
        "https://oauth.trainingpeaks.com/oauth/authorize/?" + query
    )
|
|
|
|
|
|
# Concept2 token refresh. URL for manual refresh. Not visible to users
|
|
@login_required()
def rower_c2_token_refresh(request):
    """Manually refresh the current rower's Concept2 tokens.

    On success the new access token, refresh token and expiry date are saved
    on the rower; on failure an error message is queued instead. Either way
    the response is a redirect to ``workouts_view``.
    """
    r = getrower(request.user)
    res = c2stuff.do_refresh_token(r.c2refreshtoken)

    # A None access token signals that the refresh failed.
    if res[0] is not None:
        access_token = res[0]
        expires_in = res[1]
        refresh_token = res[2]
        expirydatetime = timezone.now()+datetime.timedelta(seconds=expires_in)

        r.c2token = access_token
        r.tokenexpirydate = expirydatetime
        r.c2refreshtoken = refresh_token

        r.save()

        messages.info(request,"Tokens refreshed. Good to go")
    else: # pragma: no cover
        messages.error(request,"Something went wrong (refreshing tokens). Please reauthorize:")

    url = reverse('workouts_view')

    return HttpResponseRedirect(url)
|
|
|
|
|
|
# Underarmour token refresh. URL for manual refresh. Not visible to users
|
|
@login_required()
def rower_underarmour_token_refresh(request):
    """Manually refresh the current rower's Under Armour tokens.

    Stores the new access token, refresh token and expiry date on the rower,
    queues a confirmation message and redirects to ``workouts_view``.
    """
    current = getrower(request.user)
    tokens = underarmourstuff.do_refresh_token(
        current.underarmourrefreshtoken,
        current.underarmourtoken
    )
    new_access = tokens[0]
    lifetime = tokens[1]
    new_refresh = tokens[2]
    expires_at = timezone.now()+datetime.timedelta(seconds=lifetime)

    rower = getrower(request.user)
    rower.underarmourtoken = new_access
    rower.underarmourtokenexpirydate = expires_at
    rower.underarmourrefreshtoken = new_refresh
    rower.save()

    messages.info(request,"Tokens refreshed. Good to go")

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
|
|
# TrainingPeaks token refresh. URL for manual refresh. Not visible to users
|
|
@login_required()
def rower_tp_token_refresh(request):
    """Manually refresh the current rower's TrainingPeaks tokens.

    Stores the new access token, refresh token and expiry date on the rower,
    queues a confirmation message and redirects to ``workouts_view``.
    """
    current = getrower(request.user)
    tokens = tpstuff.do_refresh_token(
        current.tprefreshtoken,
    )
    new_access = tokens[0]
    lifetime = tokens[1]
    new_refresh = tokens[2]
    expires_at = timezone.now()+datetime.timedelta(seconds=lifetime)

    rower = getrower(request.user)
    rower.tptoken = new_access
    rower.tptokenexpirydate = expires_at
    rower.tprefreshtoken = new_refresh
    rower.save()

    messages.info(request,"Tokens refreshed. Good to go")

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
|
|
|
|
# SportTracks token refresh. URL for manual refresh. Not visible to users
|
|
@login_required()
def rower_sporttracks_token_refresh(request):
    """Manually refresh the current rower's SportTracks tokens.

    Stores the new access token, refresh token and expiry date on the rower,
    queues a confirmation message and redirects to ``workouts_view``.
    """
    current = getrower(request.user)
    tokens = sporttracksstuff.do_refresh_token(
        current.sporttracksrefreshtoken,
    )
    new_access = tokens[0]
    lifetime = tokens[1]
    new_refresh = tokens[2]
    expires_at = timezone.now()+datetime.timedelta(seconds=lifetime)

    rower = getrower(request.user)
    rower.sporttrackstoken = new_access
    rower.sporttrackstokenexpirydate = expires_at
    rower.sporttracksrefreshtoken = new_refresh
    rower.save()

    messages.info(request,"Tokens refreshed. Good to go")

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
|
|
# Concept2 Callback
|
|
@login_required()
def rower_process_callback(request):
    """Handle the Concept2 OAuth callback and store the received tokens.

    Every outcome redirects to ``rower_exportsettings_view``: with an error
    message when the ``code`` parameter is missing or the token exchange
    reports an access token of 0, and with a confirmation once the tokens
    and expiry date are saved on the rower.
    """
    try:
        res = c2stuff.get_token(request.GET['code'])
    except MultiValueDictKeyError: # pragma: no cover
        messages.error(
            request,
            "The resource owner or authorization server denied the request",
        )
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    access_token = res[0]
    if access_token == 0: # pragma: no cover
        # On failure the second element carries the error text.
        failure = res[1]
        failure += ' Contact info@rowsandall.com if this behavior persists.'
        messages.error(request,failure)
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    lifetime = res[1]
    new_refresh = res[2]
    expires_at = timezone.now()+datetime.timedelta(seconds=lifetime)

    rower = getrower(request.user)
    rower.c2token = access_token
    rower.tokenexpirydate = expires_at
    rower.c2refreshtoken = new_refresh
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
|
|
# dummy
|
|
@login_required()
def rower_process_twittercallback(request): # pragma: no cover
    """Placeholder Twitter callback view.

    Returns an ``HttpResponse`` with the body ``dummy``. A Django view has
    to return a response object; the bare string returned previously would
    fail once the response is processed.
    """
    # Local import so the block does not depend on what the star import
    # at the top of the file happens to export.
    from django.http import HttpResponse

    return HttpResponse("dummy")
|
|
|
|
# Process Polar Callback
|
|
@login_required()
def rower_process_polarcallback(request): # pragma: no cover
    """Handle the Polar OAuth callback and store the received token.

    Without a ``code`` parameter the ``error`` parameter (or "access error")
    is shown and the user is sent to ``workouts_view``. Otherwise the access
    token, its expiry date and the Polar user id are saved on the rower and
    the user is sent to ``rower_exportsettings_view``.
    """
    params = request.GET
    try:
        code = params['code']
    except MultiValueDictKeyError:
        try:
            problem = params['error']
        except MultiValueDictKeyError:
            problem = "access error"

        messages.error(request,problem)
        return HttpResponseRedirect(reverse('workouts_view'))

    access_token, lifetime, polar_user_id = polarstuff.get_token(code)
    expires_at = timezone.now()+datetime.timedelta(seconds=lifetime)

    rower = getrower(request.user)
    rower.polartoken = access_token
    rower.polartokenexpirydate = expires_at
    rower.polaruserid = polar_user_id
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# process Garmin callback
|
|
@login_required()
def rower_process_garmincallback(request): # pragma: no cover
    """Handle the Garmin callback and store the received tokens.

    Uses the owner key and secret kept in the session by the authorize view;
    when either is missing, a new pair is requested from
    ``garmin_stuff.garmin_authorize``. Redirects to
    ``rower_exportsettings_view`` once the tokens are saved on the rower.
    """
    rower = getrower(request.user)
    callback_url = request.build_absolute_uri()

    session = request.session
    try:
        key = session['garmin_owner_key']
        secret = session['garmin_owner_secret']
    except KeyError:
        _authorization_url, key, secret = garmin_stuff.garmin_authorize()

    token, refresh_token = garmin_stuff.garmin_processcallback(
        callback_url, key, secret
    )
    rower.garmintoken = token
    rower.garminrefreshtoken = refresh_token
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
# Process NK Callback
|
|
@login_required()
def rower_process_nkcallback(request): # pragma: no cover
    """Handle the NK OAuth callback and store the received tokens.

    Every outcome redirects to ``rower_exportsettings_view``: with an error
    message when the ``code`` parameter is missing or the token exchange
    reports an access token of 0, and with a confirmation once the tokens,
    expiry date and NK owner id are saved on the rower.
    """
    # QueryDict.get never raises, so a missing code has to be detected
    # explicitly rather than with an exception handler.
    code = request.GET.get('code')
    if code is None:
        message = "The resource owner or authorization server denied the request"
        messages.error(request,message)

        url = reverse('rower_exportsettings_view')
        return HttpResponseRedirect(url)

    res = nkstuff.get_token(code)

    access_token = res[0]
    if access_token == 0:
        # On failure the second element carries the error text.
        message = res[1]
        message += ' Contact support@rowsandall.com if this behavior persists'
        messages.error(request,message)

        url = reverse('rower_exportsettings_view')

        return HttpResponseRedirect(url)

    expires_in = res[1]
    refresh_token = res[2]
    nk_owner_id = res[3]
    expirydatetime = timezone.now()+datetime.timedelta(seconds=expires_in)

    r = getrower(request.user)
    r.nktoken = access_token
    r.nktokenexpirydate = expirydatetime
    r.nkrefreshtoken = refresh_token
    r.nk_owner_id = nk_owner_id

    r.save()

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request,successmessage)
    url = reverse('rower_exportsettings_view')
    return HttpResponseRedirect(url)
|
|
|
|
@login_required()
def workout_getnkworkout_all(request):
    """Start an asynchronous import of all NK workouts for the request's rower.

    Returns:
        A redirect to the NK authorization page when ``nk_open`` raises
        ``NoTokenError``; otherwise a redirect to ``workouts_view`` with a
        message reporting whether the import was started.
    """
    try:
        # Called for its NoTokenError check only.
        nk_open(request.user)
    except NoTokenError: # pragma: no cover
        # Use the authorization path, as workout_nkimport_view does; the
        # former target was a URL name, which resolves as a relative path.
        return HttpResponseRedirect("/rowers/me/nkauthorize/")

    r = getrequestrower(request)

    result = nkstuff.get_nk_workouts(r,do_async=True)

    if result:
        messages.info(request,"Your NK workouts will be imported in the coming few minutes")
    else: # pragma: no cover
        messages.error(request,"Your NK workouts import failed")

    url = reverse('workouts_view')
    return HttpResponseRedirect(url)
|
|
|
|
@login_required()
@permission_required('rower.is_coach',fn=get_user_by_userid, raise_exception=True)
def workout_nkimport_view(request,userid=0,after=0,before=0):
    """List the NK workouts available for import.

    Args:
        request: the incoming Django request.
        userid: id of the rower to list for, passed to ``getrequestrower``.
        after: passed through to ``nkstuff.get_nk_workout_list``.
        before: passed through to ``nkstuff.get_nk_workout_list``.

    Returns:
        A redirect to the NK authorization page when no token is available,
        a redirect to ``workouts_view`` with an error message when the NK
        API call fails, and otherwise the rendered ``nk_list_import.html``
        page. Workouts whose id is not yet known are flagged ``NEW``.
    """
    r = getrequestrower(request,userid=userid)
    try:
        # Called for its NoTokenError check only.
        nk_open(request.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/nkauthorize/")

    res = nkstuff.get_nk_workout_list(request.user,before=before,after=after)

    if (res.status_code != 200): # pragma: no cover
        if (res.status_code == 401):
            r = getrower(request.user)
            # Check the NK token here (the Strava token was tested before).
            if (r.nktoken == '') or (r.nktoken is None):
                return HttpResponseRedirect("/rowers/me/nkauthorize/")
        message = "Something went wrong in workout_nkimport_view"
        messages.error(request,message)
        url = reverse('workouts_view')
        return HttpResponseRedirect(url)

    # Parse the response body once and reuse it.
    items = res.json()

    # Ids that count as "known": already imported, deleted (tombstones), or
    # parked in the optional nkblocked.json file.
    imported = [w.uploadedtonk for w in Workout.objects.filter(user=r)]
    tombstones = [t.uploadedtonk for t in TombStone.objects.filter(user=r)]
    parkedids = []
    try:
        with open('nkblocked.json','r') as nkblocked:
            jsondata = json.load(nkblocked)
            parkedids = jsondata['ids']
    except FileNotFoundError: # pragma: no cover
        pass

    # A set gives constant-time membership tests in the loop below.
    knownnkids = set(imported) | set(tombstones) | set(parkedids)

    workouts = []

    for item in items:
        d = int(float(item['totalDistanceGps'])) # could also be Impeller
        i = item['id']
        n = item['name']
        if i in knownnkids:
            nnn = ''
        else: # pragma: no cover
            nnn = 'NEW'
        # elapsedTime is divided by 1000 before being used as seconds.
        ttot = str(datetime.timedelta(seconds=int(float(item['elapsedTime'])/1000.)))
        s = arrow.get(item['startTime'],tzinfo=r.defaulttimezone).format(arrow.FORMAT_RFC850)
        workouts.append({
            'id': i,
            'distance': d,
            'duration': ttot,
            'starttime': s,
            'name': n,
            'new': nnn,
        })

    # Show the list in the reverse of the order the API returned it.
    workouts.reverse()

    breadcrumbs = [
        {
            'url':'/rowers/list-workouts/',
            'name':'Workouts'
        },
        {
            'url':reverse('workout_nkimport_view'),
            'name':'NK'
        },
    ]

    return render(request,'nk_list_import.html',
                  {
                      'workouts':workouts,
                      'rower':r,
                      'active':'nav-workouts',
                      'breadcrumbs':breadcrumbs,
                      'teams':get_my_teams(request.user)
                  })
|
|
|
|
# Process Strava Callback
|
|
@login_required()
def rower_process_stravacallback(request):
    """Handle the Strava OAuth callback and store the received tokens.

    Without both ``code`` and ``scope`` parameters the ``error`` parameter
    (or "access error") is shown and the user is sent to ``workouts_view``.
    Otherwise the user is sent to ``rower_exportsettings_view``, with the
    tokens saved on the rower when the exchange returned an access token.
    """
    params = request.GET
    try:
        code = params['code']
        # The scope value is not used, but the callback must include it.
        params['scope']
    except MultiValueDictKeyError:# pragma: no cover
        try:
            problem = params['error']
        except MultiValueDictKeyError:# pragma: no cover
            problem = "access error"

        messages.error(request,problem)
        return HttpResponseRedirect(reverse('workouts_view'))

    res = stravastuff.get_token(code)

    if not res[0]:# pragma: no cover
        messages.error(request,"Something went wrong with the Strava authorization")
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    access_token = res[0]
    lifetime = res[1]
    new_refresh = res[2]
    expires_at = timezone.now()+datetime.timedelta(seconds=lifetime)

    rower = getrower(request.user)
    rower.stravatoken = access_token
    rower.stravatokenexpirydate = expires_at
    rower.stravarefreshtoken = new_refresh
    rower.save()

    # Return value is not used.
    stravastuff.set_strava_athlete_id(rower.user)

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# Process Runkeeper callback
|
|
@login_required()
def rower_process_runkeepercallback(request):
    """Handle the RunKeeper OAuth callback and store the received token.

    A missing ``code`` parameter produces an error message and a redirect to
    ``rower_exportsettings_view`` instead of an unhandled exception. A token
    exchange that reports an access token of 0 redirects to
    ``workouts_view``; success redirects to ``rower_exportsettings_view``.
    """
    code = request.GET.get('code')
    if code is None: # pragma: no cover
        # The provider redirected back without a code (e.g. access denied).
        messages.error(request,"There was an error with the callback")
        errormessage = request.GET.get('error')
        if errormessage:
            messages.error(request,errormessage)
        url = reverse('rower_exportsettings_view')
        return HttpResponseRedirect(url)

    res = runkeeperstuff.get_token(code)
    access_token = res[0]

    if access_token == 0:# pragma: no cover
        messages.error(request,"Something went wrong importing the token")
        url = reverse('workouts_view')

        return HttpResponseRedirect(url)

    r = getrower(request.user)
    r.runkeepertoken = access_token

    r.save()

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request,successmessage)
    url = reverse('rower_exportsettings_view')
    return HttpResponseRedirect(url)
|
|
|
|
|
|
|
|
# Process SportTracks callback
|
|
@login_required()
def rower_process_sporttrackscallback(request):
    """Handle the SportTracks OAuth callback and store the received tokens.

    A missing ``code`` parameter produces an error message. Otherwise the
    access token, refresh token and expiry date are saved on the rower.
    Both outcomes redirect to ``rower_exportsettings_view``.
    """
    # An explicit check replaces the former bare ``except:``, which would
    # also have hidden unrelated errors.
    code = request.GET.get('code')
    if code is None:# pragma: no cover
        messages.error(request,"Sorry, something went wrong.")
        url = reverse('rower_exportsettings_view')
        return HttpResponseRedirect(url)

    res = sporttracksstuff.get_token(code)

    access_token = res[0]
    expires_in = res[1]
    refresh_token = res[2]

    expirydatetime = timezone.now()+datetime.timedelta(seconds=expires_in)

    r = getrower(request.user)
    r.sporttrackstoken = access_token
    r.sporttrackstokenexpirydate = expirydatetime
    r.sporttracksrefreshtoken = refresh_token

    r.save()

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request,successmessage)
    url = reverse('rower_exportsettings_view')
    return HttpResponseRedirect(url)
|
|
|
|
|
|
|
|
# Process Underarmour callback
|
|
@login_required()
def rower_process_underarmourcallback(request):
    """Handle the Under Armour OAuth callback and store the received tokens.

    A missing ``code`` parameter produces an error message instead of an
    unhandled exception. Otherwise the access token, refresh token and
    expiry date are saved on the rower. Both outcomes redirect to
    ``rower_exportsettings_view``.
    """
    code = request.GET.get('code')
    if code is None: # pragma: no cover
        # The provider redirected back without a code (e.g. access denied).
        messages.error(request,"There was an error with the callback")
        errormessage = request.GET.get('error')
        if errormessage:
            messages.error(request,errormessage)
        url = reverse('rower_exportsettings_view')
        return HttpResponseRedirect(url)

    res = underarmourstuff.get_token(code)

    access_token = res[0]
    expires_in = res[1]
    refresh_token = res[2]
    expirydatetime = timezone.now()+datetime.timedelta(seconds=expires_in)

    r = getrower(request.user)
    r.underarmourtoken = access_token
    r.underarmourtokenexpirydate = expirydatetime
    r.underarmourrefreshtoken = refresh_token

    r.save()

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request,successmessage)
    url = reverse('rower_exportsettings_view')
    return HttpResponseRedirect(url)
|
|
|
|
# Process RP3 callback
|
|
@login_required()
def rower_process_rp3callback(request): # pragma: no cover
    """Handle the RP3 OAuth callback and store the received tokens.

    A missing ``code`` parameter produces a generic error message plus the
    ``error`` parameter when present. Otherwise the access token, refresh
    token and expiry date are saved on the rower. Both outcomes redirect to
    ``rower_exportsettings_view``.
    """
    params = request.GET
    try:
        code = params['code']
    except MultiValueDictKeyError:
        messages.error(request,"There was an error with the callback")
        try:
            messages.error(request,params['error'])
        except MultiValueDictKeyError:
            pass
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    tokens = rp3stuff.get_token(code)

    new_access = tokens[0]
    lifetime = tokens[1]
    new_refresh = tokens[2]
    expires_at = timezone.now()+datetime.timedelta(seconds=lifetime)

    rower = getrower(request.user)
    rower.rp3token = new_access
    rower.rp3tokenexpirydate = expires_at
    rower.rp3refreshtoken = new_refresh
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
# Process TrainingPeaks callback
@login_required()
def rower_process_tpcallback(request):
    """Store the TrainingPeaks OAuth tokens delivered to the callback URL.

    When the ``code`` query parameter is missing, an error message (plus the
    ``error`` query parameter, if present) is shown instead. Redirects to
    the export settings page in every case.
    """
    params = request.GET

    if 'code' not in params: # pragma: no cover
        messages.error(request,"There was an error with the callback")
        if 'error' in params:
            messages.error(request,params['error'])
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    tokens = tpstuff.get_token(params['code'])
    access_token = tokens[0]
    lifetime = tokens[1]
    refresh_token = tokens[2]

    rower = getrower(request.user)
    rower.tptoken = access_token
    rower.tptokenexpirydate = timezone.now()+datetime.timedelta(seconds=lifetime)
    rower.tprefreshtoken = refresh_token
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings"
    )

    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
|
|
# Process Own API callback - for API testing purposes
@login_required()
def rower_process_testcallback(request): # pragma: no cover
    """Exchange the callback ``code`` via ownapistuff and show the tokens.

    Nothing is stored; the access and refresh tokens are returned as the
    body of a plain HttpResponse.
    """
    tokens = ownapistuff.get_token(request.GET['code'])
    access_token = tokens[0]
    lifetime = tokens[1]
    refresh_token = tokens[2]

    # computed as in the other callbacks, but not shown in the response
    expirydatetime = timezone.now()+datetime.timedelta(seconds=lifetime)

    text = "Access Token:\n" + access_token + "\n\nRefresh Token:\n" + refresh_token

    return HttpResponse(text)
|
|
|
|
@login_required()
@permission_required('rower.is_coach',fn=get_user_by_userid,raise_exception=True)
def workout_rp3import_view(request,userid=0):
    """Show the RP3 workouts that can be imported.

    Fetches the workout list of the requesting user from RP3 and marks each
    entry as 'NEW' unless its id is already stored in ``uploadedtorp3`` on
    one of the workouts of the rower selected through ``userid``.
    Redirects to the RP3 authorization page when no usable token exists and
    to the workouts list on any other failure.
    """
    r = getrequestrower(request,userid=userid)

    try:
        rp3stuff.rp3_open(request.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect(reverse('rower_rp3_authorize'))

    res = rp3stuff.get_rp3_workout_list(request.user)

    if res.status_code != 200: # pragma: no cover
        if res.status_code == 401:
            # The RP3 token is what matters here; this branch used to test
            # the Strava token and redirect to the Strava authorization.
            if not getrower(request.user).rp3token:
                return HttpResponseRedirect(reverse('rower_rp3_authorize'))
        messages.error(request,"Something went wrong in workout_rp3import_view")
        return HttpResponseRedirect(reverse('workouts_view'))

    workouts_list = pd.json_normalize(res.json()['data']['workouts'])

    knownrp3ids = uniqify([
        w.uploadedtorp3 for w in Workout.objects.filter(user=r)
    ])

    # Columns are only read per row, so an empty workout list (a frame
    # without an 'id' column) yields an empty table instead of a KeyError.
    workouts = []
    for _, row in workouts_list.iterrows():
        rp3id = row['id']
        workouts.append({
            'id':rp3id,
            'starttime':row['executed_at'],
            'new':'' if rp3id in knownrp3ids else 'NEW',
        })

    breadcrumbs = [
        {
            'url':'/rowers/list-workouts/',
            'name':'Workouts'
        },
        {
            'url':reverse('workout_rp3import_view'),
            'name':'RP3'
        },
    ]

    return render(request,'rp3_list_import.html',
                  {
                      'workouts':workouts,
                      'rower':r,
                      'active':'nav-workouts',
                      'breadcrumbs':breadcrumbs,
                      'teams':get_my_teams(request.user)
                  })
|
|
|
|
|
|
# The page where you select which Strava workout to import
@login_required()
@permission_required('rower.is_coach',fn=get_user_by_userid,raise_exception=True)
def workout_stravaimport_view(request,message="",userid=0):
    """Show the Strava activities that can be imported.

    Fetches the activity list of the requesting user from Strava. Workouts
    whose Strava upload is flagged as failed (``uploadedtostrava == -1``)
    are linked to the activity with the same start time and duration.
    Each activity is then listed, marked 'NEW' unless its id is already
    known on one of the user's workouts.
    """
    # Called before any Strava request, as in the original flow; the listing
    # itself is built for the rower of request.user.
    getrequestrower(request,userid=userid)

    try:
        strava_open(request.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/stravaauthorize/")

    res = stravastuff.get_strava_workout_list(request.user)

    if res.status_code != 200: # pragma: no cover
        if res.status_code == 401:
            if not getrower(request.user).stravatoken:
                # Token doesn't exist. Need to authorize
                return HttpResponseRedirect("/rowers/me/stravaauthorize/")
        message = "Something went wrong in workout_stravaimport_view"
        messages.error(request,message)
        return HttpResponseRedirect(reverse('workouts_view'))

    r = getrower(request.user)
    activities = res.json()  # parse the response body once

    # Repair workouts marked as failed uploads that do exist on Strava
    for w in Workout.objects.filter(user=r,uploadedtostrava=-1): # pragma: no cover
        for item in activities:
            if arrow.get(item['start_date']) != arrow.get(w.startdatetime):
                continue
            elapsed_td = datetime.timedelta(seconds=int(item['elapsed_time']))
            try:
                elapsed_time = datetime.datetime.strptime(
                    str(elapsed_td),
                    "%H:%M:%S"
                )
            except ValueError:
                # a duration of 24 hours or more prints as "N day(s), ..."
                # and cannot be parsed with this format
                continue
            if str(elapsed_time)[-7:] == str(w.duration)[-7:]:
                w.uploadedtostrava = int(item['id'])
                w.save()

    knownstravaids = uniqify([
        w.uploadedtostrava for w in Workout.objects.filter(user=r)
    ])

    workouts = []
    for item in activities:
        stravaid = item['id']
        workouts.append({
            'id':stravaid,
            'distance':int(float(item['distance'])),
            'duration':str(datetime.timedelta(seconds=int(float(item['elapsed_time'])))),
            'starttime':item['start_date'],
            'type':item['type'],
            'name':item['name'],
            'new':'' if stravaid in knownstravaids else 'NEW',
        })

    breadcrumbs = [
        {
            'url':'/rowers/list-workouts/',
            'name':'Workouts'
        },
        {
            'url':reverse('workout_stravaimport_view'),
            'name':'Strava'
        },
    ]

    return render(request,'strava_list_import.html',
                  {'workouts':workouts,
                   'rower':r,
                   'active':'nav-workouts',
                   'breadcrumbs':breadcrumbs,
                   'teams':get_my_teams(request.user),
                  })
|
|
|
|
def _log_strava_webhook(timestamp,text):
    """Append one timestamped entry to strava_webhooks.log."""
    with open('strava_webhooks.log','a') as f:
        f.write('\n')
        f.write(timestamp)
        f.write(' ')
        f.write(text)


# for Strava webhook request validation
@csrf_exempt
def strava_webhook_view(request):
    """Handle Strava webhook validation (GET) and activity events.

    GET: echoes ``hub.challenge`` when ``hub.verify_token`` matches
    ``stravastuff.webhookverification``, otherwise answers 403.

    Other methods: the body is logged and parsed as JSON. For events on
    activities, 'create' queues an import, 'delete' removes the matching
    workouts of the owner and 'update' copies title/type changes, each
    subject to the rower's ``strava_auto_import`` / ``strava_auto_delete``
    settings. Every handled outcome answers with status 200.
    """
    if request.method == 'GET':
        challenge = request.GET.get('hub.challenge')
        verificationtoken = request.GET.get('hub.verify_token')
        if verificationtoken != stravastuff.webhookverification: # pragma: no cover
            return HttpResponse(status=403)
        data = {"hub.challenge":challenge}
        return JSONResponse(data)

    # logging
    timestamp = time.strftime('%b-%d-%Y_%H%M', time.localtime())
    _log_strava_webhook(timestamp,str(request.body))

    data = json.loads(request.body)

    # The required fields are read only inside this try block, so that an
    # incomplete payload is logged and acknowledged instead of raising.
    try:
        aspect_type = data['aspect_type']
        object_type = data['object_type']
        strava_owner = data['owner_id']
        starttimeunix = data['event_time']  # required, value not used so far
    except KeyError: # pragma: no cover
        _log_strava_webhook(timestamp,'KeyError line 1038')
        return HttpResponse(status=200)

    if object_type != 'activity':
        return HttpResponse(status=200)

    if aspect_type == 'create':
        try:
            stravaid = data['object_id']
        except KeyError: # pragma: no cover
            _log_strava_webhook(timestamp,'KeyError line 1051')
            return HttpResponse(status=200)

        try:
            r = Rower.objects.get(strava_owner_id=strava_owner)
        except Rower.DoesNotExist: # pragma: no cover
            _log_strava_webhook(timestamp,'Rower not found')
            return HttpResponse(status=200)

        ws = Workout.objects.filter(uploadedtostrava=stravaid)
        if ws.count()==0 and r.strava_auto_import:
            job = stravastuff.async_get_workout(r.user,stravaid)
            if job == 0: # pragma: no cover
                _log_strava_webhook(
                    timestamp,'Strava strava_open yielded NoTokenError')
        else: # pragma: no cover
            _log_strava_webhook(
                timestamp,
                'Workouts already existing:\n '
                + ''.join('{w} \n'.format(w=str(w)) for w in ws)
            )

    elif aspect_type == 'delete':
        try:
            stravaid = data['object_id']
        except KeyError: # pragma: no cover
            _log_strava_webhook(timestamp,'KeyError line 10576')
            # without an object id there is nothing to look up; falling
            # through here used to hit an undefined stravaid
            return HttpResponse(status=200)

        ws = Workout.objects.filter(uploadedtostrava=stravaid)
        if ws.count() == 0:
            return HttpResponse(status=200)

        try: # pragma: no cover
            r = Rower.objects.get(strava_owner_id=strava_owner)
        except Rower.DoesNotExist: # pragma: no cover
            _log_strava_webhook(timestamp,'Rower not found')
            return HttpResponse(status=200)

        if r.strava_auto_delete: # pragma: no cover
            for w in ws:
                if w.user == r:
                    w.delete()

    elif aspect_type == 'update':
        try:
            updates = data['updates']
            stravaid = data['object_id']
        except KeyError: # pragma: no cover
            _log_strava_webhook(timestamp,'KeyError line 10576')
            return HttpResponse(status=200)

        ws = Workout.objects.filter(uploadedtostrava=stravaid)
        if ws.count() == 0: # pragma: no cover
            return HttpResponse(status=200)

        try:
            r = Rower.objects.get(strava_owner_id=strava_owner)
        except Rower.DoesNotExist: # pragma: no cover
            _log_strava_webhook(timestamp,'Rower not found')
            return HttpResponse(status=200)

        if r.strava_auto_import:
            for key, value in updates.items():
                for w in ws:
                    if key == 'title':
                        w.name = value
                        w.save()
                    if key == 'type':
                        try:
                            w.workouttype = mytypes.stravamappinginv[value]
                            w.save()
                        except KeyError: # pragma: no cover
                            _log_strava_webhook(
                                timestamp,
                                'Workout type not found: '+str(value))
                            return HttpResponse(status=200)

    return HttpResponse(status=200)
|
|
|
|
# For push notifications from Garmin
@csrf_exempt
def garmin_summaries_view(request): # pragma: no cover
    """Log a Garmin summaries push and hand its activities to garmin_stuff.

    Non-POST requests are acknowledged without any processing. The response
    status is 200 whatever the processing result is.
    """
    if request.method == 'POST':
        timestamp = time.strftime('%b-%d-%Y_%H%M', time.localtime())
        with open('garminlog.log','a') as logfile:
            for piece in ('\n', timestamp, ' ', str(request.body), '\n'):
                logfile.write(piece)

        # POST request
        payload = json.loads(request.body)
        garmin_stuff.garmin_workouts_from_summaries(payload['activities'])

    return HttpResponse(status=200)
|
|
|
|
@csrf_exempt
def garmin_newfiles_ping(request): # pragma: no cover
    """Request the download of every activity file announced by Garmin.

    Entries with missing fields or with a token that matches no rower are
    skipped. The response status is always 200.
    """
    # computed on every request, currently not used further
    timestamp = time.strftime('%b-%d-%Y_%H%M', time.localtime())

    if request.method != 'POST':
        return HttpResponse(status=200)

    payload = json.loads(request.body)
    payload['activityFiles']  # fails early when the key is absent

    for entry in payload['activityFiles']:
        try:
            rower = Rower.objects.get(garmintoken=entry['userAccessToken'])
            callback_url = entry['callbackURL']
            start_seconds = entry['startTimeInSeconds']
            file_type = entry['fileType']
            garmin_stuff.get_garmin_file(
                rower,callback_url,start_seconds,file_type)
        except (Rower.DoesNotExist, KeyError):
            continue

    return HttpResponse(status=200) # pragma: no cover
|
|
|
|
@csrf_exempt
def garmin_deregistration_view(request):
    """Clear the Garmin token of every rower listed in a deregistration push.

    Entries without a token, or with a token that matches no rower, are
    skipped. The response status is always 200.
    """
    if request.method == 'POST':
        payload = json.loads(request.body)
        for entry in payload['deregistrations']:
            try:
                rower = Rower.objects.get(garmintoken=entry['userAccessToken'])
                rower.garmintoken = ''
                rower.save()
            except (Rower.DoesNotExist, KeyError): # pragma: no cover
                continue

    return HttpResponse(status=200)
|
|
|
|
@csrf_exempt
def garmin_details_view(request):
    """Pass the JSON body of a Garmin details push to garmin_stuff.

    Non-POST requests are acknowledged without any processing. The response
    status is always 200.
    """
    if request.method == 'POST':
        # computed on every POST, currently not used further
        timestamp = time.strftime('%b-%d-%Y_%H%M', time.localtime())

        # POST request
        garmin_stuff.garmin_workouts_from_details(json.loads(request.body))

    return HttpResponse(status=200)
|
|
|
|
|
|
# The page where you select which RunKeeper workout to import
@login_required()
@permission_required('rower.is_coach',fn=get_user_by_userid,raise_exception=True)
def workout_runkeeperimport_view(request,message="",userid=0):
    """Show the RunKeeper workouts of the requesting user for import.

    A 401 response combined with an empty RunKeeper token redirects to the
    RunKeeper authorization page; any other failure shows an error and
    returns the raw response (DEBUG) or redirects to the workouts list.
    """
    response = runkeeperstuff.get_runkeeper_workout_list(request.user)

    if response.status_code != 200:
        if response.status_code == 401:
            token = getrower(request.user).runkeepertoken
            if token == '' or token is None:
                # Token doesn't exist. Need to authorize
                return HttpResponseRedirect("/rowers/me/runkeeperauthorize/")
        messages.error(request,"Something went wrong in workout_runkeeperimport_view") # pragma: no cover

        if settings.DEBUG: # pragma: no cover
            return HttpResponse(response)
        return HttpResponseRedirect(reverse('workouts_view')) # pragma: no cover

    workouts = [
        {
            'id':getidfromuri(entry['uri']),
            'distance':int(float(entry['total_distance'])),
            'duration':str(datetime.timedelta(seconds=int(float(entry['duration'])))),
            'starttime':entry['start_time'],
            'type':entry['type'],
        }
        for entry in response.json()['items']
    ]

    breadcrumbs = [
        {
            'url':'/rowers/list-workouts/',
            'name':'Workouts'
        },
        {
            'url':reverse('workout_runkeeperimport_view'),
            'name':'Runkeeper'
        }
    ]

    context = {
        'workouts':workouts,
        'rower':getrower(request.user),
        'active':'nav-workouts',
        'breadcrumbs':breadcrumbs,
        'teams':get_my_teams(request.user),
    }

    return render(request,'runkeeper_list_import.html',context)
|
|
|
|
# The page where you select which Under Armour workout to import
@login_required()
@permission_required('rower.is_coach',fn=get_user_by_userid,raise_exception=True)
def workout_underarmourimport_view(request,message="",userid=0):
    """Show the Under Armour workouts of the requesting user for import.

    Any non-200 answer from Under Armour redirects to the Under Armour
    authorization page. Missing distance or active-time aggregates are
    listed as 0.
    """
    res = underarmourstuff.get_underarmour_workout_list(request.user)
    if res.status_code != 200:
        return HttpResponseRedirect("/rowers/me/underarmourauthorize/")

    workouts = []
    for item in res.json()['_embedded']['workouts']:
        # get_idfromuri returns the workout id together with its type
        uaid,workouttype = underarmourstuff.get_idfromuri(
            request.user,item['_links'])
        item['name']  # kept as a required field; not shown in the table

        try:
            distance = item['aggregates']['distance_total']
        except KeyError: # pragma: no cover
            distance = 0
        try:
            duration = item['aggregates']['active_time_total']
        except KeyError:
            duration = 0

        workouts.append({
            'id':uaid,
            'distance':distance,
            'duration':duration,
            'starttime':item['start_datetime'],
            'type':workouttype,
        })

    rower = getrower(request.user)

    # The second crumb used to point at the Concept2 import page.
    breadcrumbs = [
        {
            'url':'/rowers/list-workouts/',
            'name':'Workouts'
        },
        {
            'url':reverse('workout_underarmourimport_view'),
            'name':'Underarmour'
        },
    ]

    return render(request,'underarmour_list_import.html',
                  {'workouts':workouts,
                   'breadcrumbs':breadcrumbs,
                   'rower':rower,
                   'active':'nav-workouts',
                   'teams':get_my_teams(request.user),
                  })
|
|
|
|
# the page where you select which Polar workout to Import
@login_required()
@permission_required('rower.is_coach',fn=get_user_by_userid,raise_exception=True)
def workout_polarimport_view(request,userid=0): # pragma: no cover
    """Show the Polar exercises of the requesting user for import.

    ``polarstuff.get_polar_workouts`` is iterated as a collection of
    exercise dicts. When it instead returns an object carrying
    ``status_code == 401``, the user is told to connect to Polar and is
    redirected to the workouts list.
    """
    exercises = polarstuff.get_polar_workouts(request.user)

    # Only the absence of a status_code attribute is tolerated; a bare
    # except here used to hide any error raised while redirecting.
    if getattr(exercises,'status_code',None) == 401:
        messages.error(request,'Not authorized. You need to connect to Polar first')
        return HttpResponseRedirect(reverse('workouts_view'))

    workouts = []
    for exercise in exercises:
        try:
            distance = exercise['distance']
        except KeyError:
            distance = 0

        workouts.append({
            'id':exercise['id'],
            'distance':distance,
            'duration':isodate.parse_duration(exercise['duration']),
            'starttime':exercise['start-time'],
            'type':exercise['sport'],
            'transactionid':exercise['transaction-id'],
        })

    breadcrumbs = [
        {
            'url':'/rowers/list-workouts/',
            'name':'Workouts'
        },
        {
            'url':reverse('workout_polarimport_view'),
            'name':'Polar'
        },
    ]

    r = getrower(request.user)

    return render(request, 'polar_list_import.html',
                  {
                      'workouts':workouts,
                      'active':'nav-workouts',
                      'rower':r,
                      'breadcrumbs':breadcrumbs,
                      'teams':get_my_teams(request.user),
                  })
|
|
|
|
|
|
|
|
|
|
# The page where you select which SportTracks workout to import
@login_required()
@permission_required('rower.is_coach',fn=get_user_by_userid,raise_exception=True)
def workout_sporttracksimport_view(request,message="",userid=0):
    """Show the SportTracks workouts of the requesting user for import.

    On a 401 answer the user is sent to the SportTracks authorization page
    when no token is stored, otherwise to the token refresh page. Any other
    failure shows an error and returns the raw response (DEBUG) or redirects
    to the workouts list. Listed workouts are marked 'NEW' unless their id
    is already stored in ``uploadedtosporttracks`` on one of the user's
    workouts.
    """
    res = sporttracksstuff.get_sporttracks_workout_list(request.user)

    if res.status_code != 200:
        if res.status_code == 401:
            if not getrower(request.user).sporttrackstoken:
                # Token doesn't exist. Need to authorize
                return HttpResponseRedirect("/rowers/me/sporttracksauthorize/")
            return HttpResponseRedirect("/rowers/me/sporttracksrefresh/") # pragma: no cover
        message = "Something went wrong in workout_sporttracksimport_view" # pragma: no cover
        messages.error(request,message) # pragma: no cover
        if settings.DEBUG: # pragma: no cover
            return HttpResponse(res)
        return HttpResponseRedirect(reverse('workouts_view')) # pragma: no cover

    r = getrower(request.user)
    knownstids = uniqify([
        w.uploadedtosporttracks for w in Workout.objects.filter(user=r)
    ])

    workouts = []
    for item in res.json()['items']:
        stid = int(getidfromuri(item['uri']))
        workouts.append({
            'id':stid,
            'distance':int(float(item['total_distance'])),
            'duration':str(datetime.timedelta(seconds=int(float(item['duration'])))),
            'starttime':item['start_time'],
            'type':item['type'],
            'name':item['name'],
            'new':'' if stid in knownstids else 'NEW',
        })

    breadcrumbs = [
        {
            'url':'/rowers/list-workouts/',
            'name':'Workouts'
        },
        {
            'url':reverse('workout_sporttracksimport_view'),
            'name':'SportTracks'
        },
    ]

    return render(request,'sporttracks_list_import.html',
                  {'workouts':workouts,
                   'breadcrumbs':breadcrumbs,
                   'active':'nav-workouts',
                   'rower':r,
                   'teams':get_my_teams(request.user),
                  })
|
|
|
|
# List of workouts on Concept2 logbook. This view only used for debugging
@login_required()
def c2listdebug_view(request,page=1,message=""): # pragma: no cover
    """Render one page of the user's Concept2 logbook without filtering.

    Redirects to the Concept2 authorization page when no token is
    available. On a failed list request an error is shown and the raw
    response (DEBUG) or a redirect to the workouts list is returned.
    """
    try:
        c2_open(request.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/c2authorize/")

    # looked up as before; the rower is not passed to the template
    rower = getrower(request.user)

    response = c2stuff.get_c2_workout_list(request.user,page=page)

    if response.status_code != 200:
        messages.error(
            request,
            "Something went wrong in workout_c2import_view (C2 token renewal)"
        )
        if settings.DEBUG:
            return HttpResponse(response)
        return HttpResponseRedirect(reverse('workouts_view'))

    workouts = [
        {
            'id':entry['id'],
            'distance':entry['distance'],
            'duration':entry['time_formatted'],
            'starttime':entry['date'],
            'rowtype':entry['type'],
            'source':entry['source'],
            'comment':entry['comments'],
        }
        for entry in response.json()['data']
    ]

    context = {
        'workouts':workouts,
        'teams':get_my_teams(request.user),
    }

    return render(request,'c2_list_import2.html',context)
|
|
|
|
# Import all unknown workouts available on Concept2 logbook
@login_required()
def workout_getc2workout_all(request,page=1,message=""): # pragma: no cover
    """Start an asynchronous import of the rower's Concept2 workouts.

    Redirects to the Concept2 authorization page when no token is
    available, otherwise reports whether the import was started and
    redirects to the workouts list.
    """
    try:
        c2_open(request.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/c2authorize/")

    rower = getrequestrower(request)

    if c2stuff.get_c2_workouts(rower,do_async=True):
        messages.info(request,'Your C2 workouts will be imported in the coming few minutes')
    else:
        messages.error(request,'Your C2 workouts import failed')

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
@login_required()
def workout_getrp3workout_all(request): # pragma: no cover
    """Start an asynchronous import of the rower's RP3 workouts.

    Redirects to the RP3 authorization page when no token is available,
    otherwise reports whether the import was started and redirects to the
    workouts list.
    """
    try:
        rp3_open(request.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/rp3authorize/")

    rower = getrequestrower(request)

    if rp3stuff.get_rp3_workouts(rower,do_async=True):
        messages.info(request,'Your RP3 workouts will be imported in the coming few minutes')
    else:
        messages.error(request,'Your RP3 workouts import failed')

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
# List of workouts available on Concept2 logbook - for import
@login_required()
@permission_required('rower.is_coach',fn=get_user_by_userid,raise_exception=True)
def workout_c2import_view(request,page=1,userid=0,message=""):
    """Show one page of the Concept2 logbook for import.

    Logbook entries are marked 'NEW' unless their id is already known. An id
    counts as known when it is stored on a workout or tombstone of the rower
    selected through ``userid``, or when it is listed under ``ids`` in
    ``c2blocked.json``.
    """
    selected_rower = getrequestrower(request,userid=userid)

    try:
        c2_open(request.user)
    except NoTokenError: # pragma: no cover
        return HttpResponseRedirect("/rowers/me/c2authorize/")

    res = c2stuff.get_c2_workout_list(request.user,page=page)

    if res.status_code != 200: # pragma: no cover
        message = "Something went wrong in workout_c2import_view (C2 token refresh)"
        messages.error(request,message)
        return HttpResponseRedirect(reverse('workouts_view'))

    knownc2ids = uniqify([
        w.uploadedtoc2 for w in Workout.objects.filter(user=selected_rower)
    ])
    tombstones = [
        stone.uploadedtoc2
        for stone in TombStone.objects.filter(user=selected_rower)
    ]
    parkedids = []
    try:
        with open('c2blocked.json','r') as c2blocked:
            parkedids = json.load(c2blocked)['ids']
    except FileNotFoundError: # pragma: no cover
        pass

    knownc2ids = uniqify(knownc2ids+tombstones+parkedids)

    workouts = []
    for item in res.json()['data']:
        c2id = item['id']
        workouts.append({
            'id':c2id,
            'distance':item['distance'],
            'duration':item['time_formatted'],
            'starttime':item['date'],
            'rowtype':item['type'],
            'source':item['source'],
            'comment':item['comments'],
            'new':'' if c2id in knownc2ids else 'NEW',
        })

    breadcrumbs = [
        {
            'url':'/rowers/list-workouts/',
            'name':'Workouts'
        },
        {
            'url':reverse('workout_c2import_view'),
            'name':'Concept2'
        },
        {
            'url':reverse('workout_c2import_view',kwargs={'page':page}),
            'name':'Page '+str(page)
        }
    ]

    # The template receives the rower of the requesting user, as before.
    r = getrower(request.user)

    return render(request,
                  'c2_list_import2.html',
                  {'workouts':workouts,
                   'rower':r,
                   'active':'nav-workouts',
                   'breadcrumbs':breadcrumbs,
                   'teams':get_my_teams(request.user),
                   'page':page,
                  })
|
|
|
|
# Maps the ``source`` argument of workout_getimportview to the module that
# implements get_workout / add_workout_from_data for that service.
importsources = {
    'c2': c2stuff,
    'strava': stravastuff,
    'polar': polarstuff,
    'ownapi': ownapistuff,
    'runkeeper': runkeeperstuff,
    'sporttracks': sporttracksstuff,
    'trainingpeaks': tpstuff,
    'underarmour': underarmourstuff,
    'nk': nkstuff,
}
|
|
|
|
@login_required()
def workout_getrp3importview(request,externalid):
    """Queue a background import of one RP3 workout.

    The optional ``startdatetime`` query parameter is forwarded to the
    queued job. Redirects to the RP3 import list afterwards.
    """
    rower = getrequestrower(request)
    rp3token = rp3stuff.rp3_open(rower.user)

    myqueue(
        queuehigh,
        handle_rp3_async_workout,
        rower.user.id,
        rp3token,
        externalid,
        request.GET.get('startdatetime'),
        20,
    )

    messages.info(request,'The workout will be imported in the background')

    return HttpResponseRedirect(reverse('workout_rp3import_view'))
|
|
|
|
def _import_c2_without_strokedata(request,data,workouttype): # pragma: no cover
    """Create a workout with synthetic stroke data from a C2 summary."""
    startdatetime = iso8601.parse_date(data['date'])
    # data['time'] is divided by ten before being handed to the
    # seconds-to-string helper
    totaltime = data['time']/10.
    duration = dataprep.totaltime_sec_to_string(totaltime)
    duration = datetime.datetime.strptime(duration,'%H:%M:%S.%f').time()

    try:
        notes = data['comments']
        name = notes[:40]
    except (KeyError,TypeError):
        # No usable comment: fall back to a generated text. This branch used
        # to store the text in an unused variable and then read ``notes``,
        # which is unbound when the key is missing.
        notes = 'C2 Import Workout from {startdatetime}'.format(startdatetime=startdatetime)
        name = notes

    r = getrower(request.user)

    workoutid, message = dataprep.create_row_df(r,
                                                data['distance'],
                                                duration,
                                                startdatetime,
                                                workouttype=workouttype)

    w = Workout.objects.get(id=workoutid)
    w.uploadedtoc2 = data['id']
    w.name = name
    w.notes = notes
    w.workouttype = workouttype
    w.save()

    message = "This workout does not have any stroke data associated with it. We created synthetic stroke data."
    messages.info(request,message)
    url = reverse(r.defaultlandingpage,
                  kwargs = {
                      'id':encoder.encode_hex(w.id),
                  })

    return HttpResponseRedirect(url)


@login_required()
def workout_getimportview(request,externalid,source = 'c2',do_async=False):
    """Import one workout from the external service named by ``source``.

    Fetches summary and stroke data through ``importsources[source]`` and
    creates the workout. When the stroke data is empty, a workout with
    synthetic stroke data is created from the summary instead. Split or
    interval data found under ``data['workout']`` is written into the
    summary and the stroke file. Finally the external id is stored on the
    workout and the user is redirected to the rower's default landing page
    for it. Failures redirect to the workouts list with an error message.
    """
    data,strokedata = importsources[source].get_workout(
        request.user,externalid,do_async=do_async)

    if do_async: # pragma: no cover
        messages.info(request,"Your workout will be imported in the background")
        return HttpResponseRedirect(reverse('workouts_view'))

    if not data: # pragma: no cover
        messages.error(request,"No strokedata received")
        return HttpResponseRedirect(reverse('workouts_view'))

    try:
        workouttype = mytypes.c2mappinginv[data['type']]
    except KeyError:
        workouttype = 'rower'

    # Now works only for C2
    # A data frame compared with 0 raises ValueError in a boolean context;
    # that case means real stroke data is present and is ignored here.
    try:
        if strokedata == 0:
            messages.error(request,'An error occurred importing the workout from Concept2')
            return HttpResponseRedirect(reverse('workouts_view'))
    except ValueError:
        pass

    if strokedata.empty: # pragma: no cover
        # reuse the workout type resolved above, including its fallback
        return _import_c2_without_strokedata(request,data,workouttype)

    # strokedata not empty - continue
    workoutid,message = importsources[source].add_workout_from_data(
        request.user,
        externalid,data,
        strokedata,
        source=source,
        workoutsource=source)

    w = get_workout(encoder.encode_hex(workoutid))

    splitdata = False
    if 'workout' in data: # pragma: no cover
        if 'splits' in data['workout']:
            splitdata = data['workout']['splits']
        elif 'intervals' in data['workout']:
            splitdata = data['workout']['intervals']

    # splitdata (only for C2)
    if splitdata: # pragma: no cover
        w.summary,sa,results = c2stuff.summaryfromsplitdata(splitdata,data,w.csvfilename,workouttype=workouttype)
        w.save()

        from rowingdata.trainingparser import getlist
        # set stroke data in CSV file
        if sa:
            values = getlist(sa)
            units = getlist(sa,sel='unit')
            types = getlist(sa,sel='type')

            rowdata = rdata(csvfile=w.csvfilename)
            if rowdata:
                rowdata.updateintervaldata(values,
                                           units,types,results)

                rowdata.write_csv(w.csvfilename,gzip=True)
                dataprep.update_strokedata(w.id,rowdata.df)

    # Workout attribute that records the external id, per source; sources
    # without an entry leave the workout untouched.
    uploadedfields = {
        'strava':'uploadedtostrava',
        'c2':'uploadedtoc2',
        'polar':'uploadedtopolar',
        'runkeeper':'uploadedtorunkeeper',
        'sporttracks':'uploadedtosporttracks',
        'trainingpeaks':'uploadedtotp',
        'underarmour':'uploadedtounderarmour',
    }
    if source in uploadedfields:
        setattr(w,uploadedfields[source],externalid)

    w.save()

    if message: # pragma: no cover
        messages.error(request,message)

    r = getrower(request.user)

    url = reverse(r.defaultlandingpage,
                  kwargs = {
                      'id':encoder.encode_hex(w.id)
                  })

    return HttpResponseRedirect(url)
|
|
|
|
|
|
|
|
|
|
|
|
# Imports all new workouts from SportTracks
|
|
@login_required()
def workout_getsporttracksworkout_all(request):
    """Import every SportTracks workout that is not yet stored for this user.

    Requests the user's SportTracks workout list, drops the ids already
    recorded in ``Workout.uploadedtosporttracks`` for the user's rower and
    imports each remaining workout. Failures (of the list request or of a
    single import) are reported through the Django messages framework.

    Always answers with a redirect to ``workouts_view``.
    """
    res = sporttracksstuff.get_sporttracks_workout_list(request.user)
    if res.status_code != 200:
        # This case used to be ignored silently; let the user know that
        # nothing was imported instead of redirecting without feedback.
        messages.error(request, "Couldn't import SportTracks workouts")
        return HttpResponseRedirect(reverse('workouts_view'))

    r = getrower(request.user)
    stids = [int(getidfromuri(item['uri'])) for item in res.json()['items']]
    knownstids = uniqify([
        w.uploadedtosporttracks for w in Workout.objects.filter(user=r)
    ])
    newids = [stid for stid in stids if stid not in knownstids]

    for sporttracksid in newids:
        data, strokedata = sporttracksstuff.get_workout(
            request.user, sporttracksid)

        workoutid, message = sporttracksstuff.add_workout_from_data(
            request.user, sporttracksid, data, strokedata
        )
        # An id of 0 is treated as a failed import; the accompanying
        # message is shown to the user and the loop carries on.
        if workoutid == 0:  # pragma: no cover
            messages.error(request, message)
        else:
            w = Workout.objects.get(id=workoutid)
            w.uploadedtosporttracks = sporttracksid
            w.save()

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
# Imports all new workouts from Strava
|
|
@login_required()
def workout_getstravaworkout_all(request):
    """Trigger the import of the user's Strava workouts and report the outcome.

    Calls ``stravastuff.get_strava_workouts`` for the rower belonging to the
    requesting user. A return value of 1 is treated as success and produces
    an info message; anything else produces an error message.

    Always answers with a redirect to ``workouts_view``.
    """
    rower = getrower(request.user)
    outcome = stravastuff.get_strava_workouts(rower)

    if outcome != 1:
        messages.error(request, "Couldn't import Strava workouts ")
    else:  # pragma: no cover
        messages.info(
            request,
            "Your workouts are being imported and should appear on the site in the next 15 minutes"
        )

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
# Imports the next new (not yet known) workout from Strava
|
|
@login_required()
def workout_getstravaworkout_next(request):  # pragma: no cover
    """Start the import of the first Strava workout not yet stored for this user.

    Requests the user's Strava workout list, drops the ids already recorded
    in ``Workout.uploadedtostrava`` for the user's rower and hands the first
    remaining id to ``stravastuff.create_async_workout``.

    Always answers with a redirect to ``workouts_view``; a failed list
    request or an empty set of new workouts is reported through the Django
    messages framework.
    """
    r = Rower.objects.get(user=request.user)
    url = reverse('workouts_view')

    res = stravastuff.get_strava_workout_list(r.user)
    if res.status_code != 200:
        # A view has to return an HttpResponse; the former ``return 0``
        # made Django fail instead of informing the user.
        messages.error(request, "Couldn't import Strava workouts")
        return HttpResponseRedirect(url)

    # Parse the response body once and reuse it for both structures.
    items = res.json()
    stravaids = [int(item['id']) for item in items]
    alldata = {item['id']: item for item in items}

    knownstravaids = uniqify([
        w.uploadedtostrava for w in Workout.objects.filter(user=r)
    ])
    newids = [
        stravaid for stravaid in stravaids
        if stravaid not in knownstravaids
    ]

    if not newids:
        # Nothing left to import; indexing newids[0] would raise IndexError.
        messages.info(request, "No new Strava workouts to import")
        return HttpResponseRedirect(url)

    # The original passed ``stravaid`` here, which is the comprehension's
    # loop variable: undefined on Python 3 and the last listed id (not
    # necessarily a new one) on Python 2. The first new id is what is meant.
    theid = newids[0]
    stravastuff.create_async_workout(alldata, r.user, theid, debug=True)

    return HttpResponseRedirect(url)
|