1835 lines
59 KiB
Python
1835 lines
59 KiB
Python
from rowsandall_app.settings import NK_OAUTH_LOCATION
|
|
|
|
from rowers.views.statements import *
|
|
from rowers.plannedsessions import get_dates_timeperiod
|
|
from rowers.tasks import fetch_strava_workout
|
|
|
|
import numpy
|
|
|
|
|
|
def default(o):  # pragma: no cover
    """Convert a NumPy integer scalar to a built-in ``int``.

    The signature matches what a ``default=`` serialisation hook expects
    (presumably ``json.dumps`` -- callers are outside this view of the file).

    Any ``numpy.integer`` subtype is accepted (``int64``, ``int32``,
    ``uint8``, ...), not only ``numpy.int64``.

    Raises:
        TypeError: if ``o`` is not a NumPy integer scalar.
    """
    if isinstance(o, numpy.integer):
        return int(o)
    raise TypeError(
        "Object of type {name} is not JSON serializable".format(
            name=type(o).__name__))
|
|
|
|
|
|
# Send workout to TP
|
|
@permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True)
def workout_tp_upload_view(request, id=0):
    """Upload one workout to TrainingPeaks, then redirect to the landing page.

    Args:
        request: the incoming request; ``request.user`` must have a
            TrainingPeaks token, otherwise the user is redirected to the
            TrainingPeaks authorization page.
        id: opaque workout id, resolved with ``get_workout_by_opaqueid``.

    Returns:
        An ``HttpResponseRedirect`` to the workout owner's
        ``defaultlandingpage`` (or to the authorization page when no token
        is available).
    """
    r = getrower(request.user)
    try:
        _ = tp_open(r.user)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/tpauthorize/")

    # ready to upload. Hurray
    w = get_workout_by_opaqueid(request, id)
    # From here on ``r`` is the workout owner, whose token is used.
    r = w.user

    tcxfile = tpstuff.createtpworkoutdata(w)
    if tcxfile:
        res, reason, status_code, headers = tpstuff.uploadactivity(
            r.tptoken, tcxfile,
            name=w.name
        )
        if res == 0:  # pragma: no cover
            message = "Upload to TrainingPeaks failed with status code " + \
                str(status_code)+": "+reason
            # Best-effort cleanup. ``WindowsError`` only exists on Windows
            # (where it is an alias of ``OSError``), so catching it on any
            # other platform turned a failed delete into a NameError.
            try:
                os.remove(tcxfile)
            except OSError:
                pass

            messages.error(request, message)

        else:  # res != 0
            w.uploadedtotp = res
            w.save()
            os.remove(tcxfile)
            messages.info(request, 'Uploaded to TrainingPeaks')

    else:  # pragma: no cover  # no tcxfile
        message = "Upload to TrainingPeaks failed"
        w.uploadedtotp = -1
        w.save()
        messages.error(request, message)

    url = reverse(r.defaultlandingpage,
                  kwargs={
                      'id': encoder.encode_hex(w.id),
                  })

    return HttpResponseRedirect(url)
|
|
|
|
|
|
# Send workout to Strava
|
|
# abundance of error logging here because there were/are some bugs
|
|
@permission_required('workout.change_workout', fn=get_workout_by_opaqueid, raise_exception=True)
def workout_strava_upload_view(request, id=0):
    """Start a Strava upload for one workout and redirect to the landing page.

    The upload helper is called with ``asynchron=True``; its two return
    values are unpacked but not used by this view.
    """
    rower = getrower(request.user)
    workout = get_workout_by_opaqueid(request, id)

    _comment, _result = stravastuff.workout_strava_upload(
        rower.user, workout, asynchron=True)

    messages.info(
        request, 'Your workout will be synchronized to Strava in the background')

    landing_url = reverse(
        rower.defaultlandingpage,
        kwargs={'id': encoder.encode_hex(workout.id)},
    )
    return HttpResponseRedirect(landing_url)
|
|
|
|
# Upload workout to Concept2 logbook
|
|
|
|
|
|
@login_required()
def workout_c2_upload_view(request, id=0):
    """Start a Concept2 logbook upload for one workout.

    Logs the workout start time, timezone and owner id to ``c2_log.log``,
    triggers the upload with ``asynchron=True`` and redirects to the workout
    owner's landing page. A missing Concept2 token redirects to the
    Concept2 authorization page instead.
    """
    # ready to upload. Hurray
    workout = get_workout(id)
    owner = workout.user

    log_line = 'C2 Upload Workout starttime {startdatetime} timezone {timezone} user id {userid}'.format(
        startdatetime=workout.startdatetime,
        timezone=workout.timezone,
        userid=workout.user.user.id
    )
    dologging('c2_log.log', log_line)

    try:
        # Both return values are unpacked but not used further.
        _message, _c2id = c2stuff.workout_c2_upload(
            request.user, workout, asynchron=True)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/c2authorize/")

    messages.info(
        request, 'Your workout will be synchronized to the Concept2 Logbook in the background')

    landing_url = reverse(
        owner.defaultlandingpage,
        kwargs={'id': encoder.encode_hex(workout.id)},
    )
    return HttpResponseRedirect(landing_url)
|
|
|
|
|
|
# Upload workout to SportTracks
|
|
@permission_required('workout.change_workout', fn=get_workout_by_opaqueid)
def workout_sporttracks_upload_view(request, id=0):
    """Start a SportTracks upload for one workout and redirect to the landing page.

    The upload is triggered with ``asynchron=True`` on behalf of the
    workout's owner; the helper's return values are not used.
    """
    # ready to upload. Hurray
    workout = get_workout(id)
    owner = workout.user

    _message, _res = sporttracksstuff.workout_sporttracks_upload(
        owner.user, workout, asynchron=True)

    messages.info(
        request, 'Your workout will be synchronized with SportTracks in the background')

    landing_url = reverse(
        owner.defaultlandingpage,
        kwargs={'id': encoder.encode_hex(workout.id)},
    )  # pragma: no cover

    return HttpResponseRedirect(landing_url)  # pragma: no cover
|
|
|
|
|
|
# NK Logbook authorization
|
|
@login_required()
def rower_nk_authorize(request):  # pragma: no cover
    """Redirect the user to the NK OAuth authorization endpoint.

    A fresh random ``state`` value is sent with the request; it is not
    stored anywhere by this view.
    """
    query = urllib.parse.urlencode({
        "grant_type": "authorization_code",
        "response_type": "code",
        "client_id": NK_CLIENT_ID,
        "scope": "read",
        "state": str(uuid4()),
        "redirect_uri": NK_REDIRECT_URI,
    })
    authorize_url = NK_OAUTH_LOCATION + "/oauth/authorize?" + query
    return HttpResponseRedirect(authorize_url)
|
|
|
|
|
|
# Concept2 authorization
|
|
@login_required()
def rower_c2_authorize(request):  # pragma: no cover
    """Redirect the user to the Concept2 logbook OAuth authorization page.

    No ``state`` parameter is sent. The scope is appended to the URL as-is,
    outside of ``urlencode``, so its ``:`` and ``,`` are not percent-encoded.
    """
    query = urllib.parse.urlencode({
        "client_id": C2_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": C2_REDIRECT_URI,
    })
    authorize_url = "".join([
        "http://log.concept2.com/oauth/authorize?",
        query,
        "&scope=",
        "user:read,results:write",
    ])
    return HttpResponseRedirect(authorize_url)
|
|
|
|
# Garmin authorization
|
|
|
|
|
|
@login_required()
def rower_garmin_authorize(request):  # pragma: no cover
    """Start Garmin authorization and redirect to Garmin's authorization URL.

    The key and secret returned by ``garmin_stuff.garmin_authorize`` are
    kept in the session so the callback view can read them back.
    """
    target_url, owner_key, owner_secret = garmin_stuff.garmin_authorize()
    request.session['garmin_owner_key'] = owner_key
    request.session['garmin_owner_secret'] = owner_secret
    return HttpResponseRedirect(target_url)
|
|
|
|
# Strava Authorization
|
|
|
|
|
|
@login_required()
def rower_strava_authorize(request):  # pragma: no cover
    """Redirect the user to the Strava OAuth authorization page.

    Requests the ``activity:write`` and ``activity:read_all`` scopes.
    No ``state`` parameter is sent.
    """
    query = urllib.parse.urlencode({
        "client_id": STRAVA_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": STRAVA_REDIRECT_URI,
        "scope": "activity:write,activity:read_all",
    })
    return HttpResponseRedirect(
        "https://www.strava.com/oauth/authorize?" + query)
|
|
|
|
# Polar Authorization
|
|
|
|
|
|
@login_required()
def rower_polar_authorize(request):  # pragma: no cover
    """Redirect the user to the Polar Flow OAuth authorization page.

    Only ``client_id``, ``response_type`` and a random ``state`` are sent;
    no redirect URI or scope is included. The URL and parameters are
    written to ``polar.log``.
    """
    query_params = {
        "client_id": POLAR_CLIENT_ID,
        "response_type": "code",
        "state": str(uuid4()),
    }
    authorize_url = "https://flow.polar.com/oauth2/authorization?" + \
        urllib.parse.urlencode(query_params)

    for entry in ('Authorizing', authorize_url, query_params):
        dologging('polar.log', entry)

    return HttpResponseRedirect(authorize_url)
|
|
|
|
|
|
# SportTracks Authorization
|
|
@login_required()
def rower_sporttracks_authorize(request):  # pragma: no cover
    """Redirect the user to the SportTracks OAuth authorization page.

    A random ``state`` value is generated and sent; this view does not
    store it.
    """
    query = urllib.parse.urlencode({
        "client_id": SPORTTRACKS_CLIENT_ID,
        "response_type": "code",
        "state": str(uuid4()),
        "redirect_uri": SPORTTRACKS_REDIRECT_URI,
    })
    return HttpResponseRedirect(
        "https://api.sporttracks.mobi/oauth2/authorize?" + query)
|
|
|
|
|
|
# RP3 Authorization
|
|
@login_required()
def rower_rp3_authorize(request):  # pragma: no cover
    """Redirect the user to the RP3 OAuth authorization page.

    No ``state`` parameter is sent.
    """
    query = urllib.parse.urlencode({
        "client_id": RP3_CLIENT_KEY,
        "response_type": "code",
        "redirect_uri": RP3_REDIRECT_URI,
    })
    return HttpResponseRedirect(
        "https://rp3rowing-app.com/oauth/authorize/?" + query)
|
|
|
|
# TrainingPeaks Authorization
|
|
|
|
|
|
@login_required()
def rower_tp_authorize(request):  # pragma: no cover
    """Redirect the user to the TrainingPeaks OAuth authorization page.

    Requests the ``file:write`` scope. No ``state`` parameter is sent.
    """
    query = urllib.parse.urlencode({
        "client_id": TP_CLIENT_KEY,
        "response_type": "code",
        "redirect_uri": TP_REDIRECT_URI,
        "scope": "file:write",
    })
    return HttpResponseRedirect(
        "https://oauth.trainingpeaks.com/oauth/authorize/?" + query)
|
|
|
|
|
|
# Concept2 token refresh. URL for manual refresh. Not visible to users
|
|
@login_required()
def rower_c2_token_refresh(request):
    """Refresh the current user's Concept2 tokens and go to the workouts list.

    ``c2stuff.do_refresh_token`` is indexed as
    ``(access_token, expires_in, refresh_token)``. A ``None`` access token
    is treated as failure and reported with an error message.
    """
    rower = getrower(request.user)
    token_info = c2stuff.do_refresh_token(rower.c2refreshtoken)

    if token_info[0] is None:  # pragma: no cover
        messages.error(
            request,
            "Something went wrong (refreshing tokens). Please reauthorize:")
    else:
        new_access = token_info[0]
        lifetime_seconds = token_info[1]
        new_refresh = token_info[2]
        expires_at = timezone.now() + \
            datetime.timedelta(seconds=lifetime_seconds)

        # The rower is fetched again before the new tokens are stored.
        rower = getrower(request.user)
        rower.c2token = new_access
        rower.tokenexpirydate = expires_at
        rower.c2refreshtoken = new_refresh
        rower.save()

        messages.info(request, "Tokens refreshed. Good to go")

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
# TrainingPeaks token refresh. URL for manual refresh. Not visible to users
|
|
@login_required()
def rower_tp_token_refresh(request):
    """Refresh the current user's TrainingPeaks tokens and go to the workouts list.

    ``tpstuff.do_refresh_token`` is indexed as
    ``(access_token, expires_in, refresh_token)``. No failure check is done
    on the result.
    """
    rower = getrower(request.user)
    token_info = tpstuff.do_refresh_token(
        rower.tprefreshtoken,
    )
    new_access = token_info[0]
    lifetime_seconds = token_info[1]
    new_refresh = token_info[2]
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    # The rower is fetched again before the new tokens are stored.
    rower = getrower(request.user)
    rower.tptoken = new_access
    rower.tptokenexpirydate = expires_at
    rower.tprefreshtoken = new_refresh
    rower.save()

    messages.info(request, "Tokens refreshed. Good to go")

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
# SportTracks token refresh. URL for manual refresh. Not visible to users
|
|
@login_required()
def rower_sporttracks_token_refresh(request):
    """Refresh the current user's SportTracks tokens and go to the workouts list.

    ``sporttracksstuff.do_refresh_token`` is indexed as
    ``(access_token, expires_in, refresh_token)``. No failure check is done
    on the result.
    """
    rower = getrower(request.user)
    token_info = sporttracksstuff.do_refresh_token(
        rower.sporttracksrefreshtoken,
    )
    new_access = token_info[0]
    lifetime_seconds = token_info[1]
    new_refresh = token_info[2]
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    # The rower is fetched again before the new tokens are stored.
    rower = getrower(request.user)
    rower.sporttrackstoken = new_access
    rower.sporttrackstokenexpirydate = expires_at
    rower.sporttracksrefreshtoken = new_refresh
    rower.save()

    messages.info(request, "Tokens refreshed. Good to go")

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
# Concept2 Callback
|
|
@login_required()
def rower_process_callback(request):
    """Handle the Concept2 OAuth callback and store the returned tokens.

    A missing ``code`` query parameter, or an access token equal to ``0``,
    is reported as an error. In the latter case element 1 of the token
    result is used as the message text. Every path ends with a redirect
    to the export settings page.
    """
    try:
        auth_code = request.GET['code']
        token_info = c2stuff.get_token(auth_code)
    except MultiValueDictKeyError:  # pragma: no cover
        messages.error(
            request,
            "The resource owner or authorization server denied the request")
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    new_access = token_info[0]
    if new_access == 0:  # pragma: no cover
        failure_text = token_info[1]
        failure_text += ' Contact info@rowsandall.com if this behavior persists.'
        messages.error(request, failure_text)
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    lifetime_seconds = token_info[1]
    new_refresh = token_info[2]
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    rower = getrower(request.user)
    rower.c2token = new_access
    rower.tokenexpirydate = expires_at
    rower.c2refreshtoken = new_refresh
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings")
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# dummy
|
|
@login_required()
def rower_process_twittercallback(request):  # pragma: no cover
    """Placeholder Twitter callback; responds with the text ``dummy``.

    A Django view has to return an ``HttpResponse``; returning a bare
    string makes the request handler raise. The placeholder text is
    therefore wrapped in a response object.
    """
    return HttpResponse("dummy")
|
|
|
|
# Process Polar Callback
|
|
|
|
|
|
@login_required()
def rower_process_polarcallback(request):
    """Handle the Polar OAuth callback, store the token and register the user.

    Flow:
      1. An ``error`` query parameter aborts with that error message.
      2. A missing ``code`` aborts with an access error.
      3. The token, expiry date and Polar user id are saved on the rower.
      4. The user is registered with Polar, and the id in the registration
         response must equal the id from the token response.
    """
    callback_error = request.GET.get('error', 'no error')
    dologging('polar.log', 'Callback: {error}'.format(error=callback_error))
    if callback_error != 'no error':  # pragma: no cover
        messages.error(request, callback_error)
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    try:
        auth_code = request.GET['code']
    except MultiValueDictKeyError:  # pragma: no cover
        try:
            failure_text = request.GET['error']
        except MultiValueDictKeyError:
            failure_text = "access error"

        messages.error(request, failure_text)
        return HttpResponseRedirect(reverse('workouts_view'))
    else:
        dologging('polar.log', auth_code)

    new_access, lifetime_seconds, polar_id = polarstuff.get_token(auth_code)
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    # The token data is stored before the user id is validated below.
    rower = getrower(request.user)
    rower.polartoken = new_access
    rower.polartokenexpirydate = expires_at
    rower.polaruserid = polar_id
    rower.save()

    if not polar_id:  # pragma: no cover
        messages.error(request, 'Polar Flow Authorization Failed')
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    registration = polarstuff.register_user(request.user, new_access)

    try:
        registered_id = registration['polar-user-id']
    except KeyError:  # pragma: no cover
        registered_id = 0

    if registered_id == polar_id:
        messages.info(
            request,
            "Tokens stored. Good to go. Please check your import/export settings")
    else:  # pragma: no cover
        messages.error(request, 'Polar User ID error')
        messages.error(
            request, "Please contact support@rowsandall.com for help.")

    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# process Garmin callback
|
|
@login_required()
def rower_process_garmincallback(request):  # pragma: no cover
    """Handle the Garmin callback and store the resulting tokens.

    The owner key and secret are read from the session. If either is
    missing, a new pair is obtained from ``garmin_stuff.garmin_authorize``.
    """
    rower = getrower(request.user)
    callback_url = request.build_absolute_uri()

    try:
        owner_key = request.session['garmin_owner_key']
        owner_secret = request.session['garmin_owner_secret']
    except KeyError:
        _authorization_url, owner_key, owner_secret = \
            garmin_stuff.garmin_authorize()

    token, refresh_token = garmin_stuff.garmin_processcallback(
        callback_url, owner_key, owner_secret)

    rower.garmintoken = token
    rower.garminrefreshtoken = refresh_token
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings")
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
# Process NK Callback
|
|
|
|
|
|
@login_required()
def rower_process_nkcallback(request):  # pragma: no cover
    """Handle the NK OAuth callback and store the returned tokens.

    A callback without a ``code`` query parameter is reported as a denied
    request. An access token equal to ``0`` is reported with element 1 of
    the token result as the message. Otherwise the access token, expiry
    date, refresh token and NK owner id are saved on the rower. Every path
    redirects to the export settings page.
    """
    try:
        # Index the QueryDict instead of calling ``.get``: ``.get`` never
        # raises, which left the handler below unreachable and sent
        # ``None`` to ``nkstuff.get_token`` when the code was missing.
        code = request.GET['code']
        res = nkstuff.get_token(code)
    except MultiValueDictKeyError:
        message = "The resource owner or authorization server denied the request"
        messages.error(request, message)

        url = reverse('rower_exportsettings_view')
        return HttpResponseRedirect(url)

    access_token = res[0]
    if access_token == 0:
        message = res[1]
        message += ' Contact support@rowsandall.com if this behavior persists'
        messages.error(request, message)

        url = reverse('rower_exportsettings_view')

        return HttpResponseRedirect(url)

    expires_in = res[1]
    refresh_token = res[2]
    nk_owner_id = res[3]
    expirydatetime = timezone.now()+datetime.timedelta(seconds=expires_in)

    r = getrower(request.user)
    r.nktoken = access_token
    r.nktokenexpirydate = expirydatetime
    r.nkrefreshtoken = refresh_token
    r.nk_owner_id = nk_owner_id

    r.save()

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request, successmessage)
    url = reverse('rower_exportsettings_view')
    return HttpResponseRedirect(url)
|
|
|
|
|
|
@login_required()
def workout_getnkworkout_all(request, startdatestring='', enddatestring=''):
    """Request an import of all NK workouts in a date range.

    The range comes from ``get_dates_timeperiod``. Both ends are converted
    to millisecond timestamp strings and passed to
    ``nkstuff.get_nk_workouts`` with ``do_async=True``.

    Returns:
        A redirect to the workouts list, or to the NK authorization page
        when the user has no NK token.
    """
    startdate, enddate = get_dates_timeperiod(
        request, startdatestring=startdatestring, enddatestring=enddatestring)
    startdate = startdate.date()
    enddate = enddate.date()

    # Milliseconds since the epoch, as strings.
    before = str(int(arrow.get(enddate).timestamp()*1000))
    after = str(int(arrow.get(startdate).timestamp()*1000))

    try:
        _ = nk_open(request.user)
    except NoTokenError:  # pragma: no cover
        # The bare string "rower_nk_authorize" was used as a URL here, so it
        # resolved relative to the current path. Use the absolute
        # authorization path that workout_nkimport_view redirects to.
        return HttpResponseRedirect("/rowers/me/nkauthorize/")

    r = getrequestrower(request)

    result = nkstuff.get_nk_workouts(
        r, do_async=True, before=before, after=after)

    if result:
        messages.info(
            request, "Your NK workouts will be imported in the coming few minutes")
    else:  # pragma: no cover
        messages.error(request, "Your NK workouts import failed")

    url = reverse('workouts_view')
    return HttpResponseRedirect(url)
|
|
|
|
|
|
@login_required()
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
@permission_required('rower.is_not_freecoach', fn=get_user_by_userid, raise_exception=True)
def workout_nkimport_view(request, userid=0, after=0, before=0):
    """List NK Logbook workouts for a date range and queue selected imports.

    GET renders ``nk_list_import.html`` with the workouts found between the
    start and end date (default period ``last30``). POST reads the date
    range from ``DateRangeForm`` and, when ``workoutid`` values are posted,
    queues one ``handle_nk_async_workout`` job per id and redirects to the
    workouts list.

    Args:
        request: the incoming request.
        userid: id of the rower to act for; it must resolve to the
            requesting user, otherwise the view redirects to itself for
            ``request.user``.
        after, before: accepted for compatibility; the values passed in are
            ignored and recomputed from the date range.
    """
    startdate, enddate = get_dates_timeperiod(
        request, defaulttimeperiod='last30')
    startdate = startdate.date()
    enddate = enddate.date()
    r = getrequestrower(request, userid=userid)
    if r.user != request.user:  # pragma: no cover
        messages.error(
            request, 'You can only access your own workouts on the NK Logbook, not those of your athletes')
        url = reverse('workout_nkimport_view', kwargs={
            'userid': request.user.id})
        return HttpResponseRedirect(url)

    try:
        _ = nk_open(request.user)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/nkauthorize/")

    if request.method == 'POST':  # pragma: no cover
        dateform = DateRangeForm(request.POST)
        if dateform.is_valid():
            startdate = dateform.cleaned_data['startdate']
            # One day is added to the submitted end date.
            enddate = dateform.cleaned_data['enddate'] + \
                datetime.timedelta(days=1)
    else:
        dateform = DateRangeForm(initial={
            'startdate': startdate,
            'enddate': enddate,
        })

    if enddate < startdate:  # pragma: no cover
        startdate, enddate = enddate, startdate

    request.session['startdate'] = startdate.strftime('%Y-%m-%d')
    request.session['enddate'] = enddate.strftime('%Y-%m-%d')

    # Milliseconds since the epoch, as strings.
    before = str(int(arrow.get(enddate).timestamp()*1000))
    after = str(int(arrow.get(startdate).timestamp()*1000))

    res = nkstuff.get_nk_workout_list(request.user, before=before, after=after)

    if res.status_code != 200:  # pragma: no cover
        if res.status_code == 401:
            r = getrower(request.user)
            # This used to test the Strava token, so a rower holding a
            # Strava token but no NK token was never sent to NK
            # authorization.
            if (r.nktoken == '') or (r.nktoken is None):
                return HttpResponseRedirect("/rowers/me/nkauthorize/")
        message = "Something went wrong in workout_nkimport_view"
        messages.error(request, message)
        url = reverse('workouts_view')
        return HttpResponseRedirect(url)

    # Parse the response body once and reuse it below.
    nkdata = res.json()

    # Ids already imported, deleted (tombstones) or parked in
    # nkblocked.json are not flagged as NEW.
    knownnkids = uniqify([
        w.uploadedtonk for w in Workout.objects.filter(user=r)
    ])
    tombstones = [
        t.uploadedtonk for t in TombStone.objects.filter(user=r)
    ]
    parkedids = []
    try:
        with open('nkblocked.json', 'r') as nkblocked:
            jsondata = json.load(nkblocked)
            parkedids = jsondata['ids']
    except FileNotFoundError:  # pragma: no cover
        pass

    knownnkids = uniqify(knownnkids+tombstones+parkedids)
    workouts = []

    for item in nkdata:
        d = int(float(item['totalDistanceGps']))  # could also be Impeller
        i = item['id']
        n = item['name']
        if i in knownnkids:
            nnn = ''
        else:  # pragma: no cover
            nnn = 'NEW'
        # elapsedTime is divided by 1000 to obtain seconds.
        ttot = str(datetime.timedelta(
            seconds=int(float(item['elapsedTime'])/1000.)))
        s = arrow.get(item['startTime'], tzinfo=r.defaulttimezone).format(
            arrow.FORMAT_RFC850)
        workouts.append({
            'id': i,
            'distance': d,
            'duration': ttot,
            'starttime': s,
            'name': n,
            'new': nnn,
        })

    # The list is shown in reverse response order.
    workouts = workouts[::-1]

    if request.method == 'POST':  # pragma: no cover
        try:
            tdict = dict(request.POST.lists())
            ids = tdict['workoutid']
            nkids = [int(id) for id in ids]
            alldata = {item['id']: item for item in nkdata}
            for counter, nkid in enumerate(nkids):
                _ = myqueue(
                    queue,
                    handle_nk_async_workout,
                    alldata,
                    r.user.id,
                    r.nktoken,
                    nkid,
                    counter,
                    r.defaulttimezone
                )
            messages.info(
                request,
                'Your NK logbook workouts will be imported in the background.'
                ' It may take a few minutes before it appears.')
            url = reverse('workouts_view')
            return HttpResponseRedirect(url)
        except KeyError:
            # No 'workoutid' in the POST (e.g. only the date form was
            # submitted): fall through and render the list.
            pass

    breadcrumbs = [
        {
            'url': '/rowers/list-workouts/',
            'name': 'Workouts'
        },
        {
            'url': reverse('workout_nkimport_view'),
            'name': 'NK Logbook'
        },
    ]

    checknew = request.GET.get('selectallnew', False)

    return render(request, 'nk_list_import.html',
                  {
                      'workouts': workouts,
                      'rower': r,
                      'dateform': dateform,
                      'startdate': startdate,
                      'enddate': enddate,
                      'active': 'nav-workouts',
                      'breadcrumbs': breadcrumbs,
                      'teams': get_my_teams(request.user),
                      'checknew': checknew,
                  })
|
|
|
|
# Process Strava Callback
|
|
|
|
|
|
@login_required()
def rower_process_stravacallback(request):
    """Handle the Strava OAuth callback and store the returned tokens.

    Both ``code`` and ``scope`` must be present in the query string;
    otherwise the ``error`` parameter (or a generic text) is shown and the
    user is sent to the workouts list. After a successful token exchange
    the tokens are saved and the Strava athlete id is set for the user.
    """
    try:
        auth_code = request.GET['code']
        _ = request.GET['scope']
    except MultiValueDictKeyError:  # pragma: no cover
        try:
            failure_text = request.GET['error']
        except MultiValueDictKeyError:  # pragma: no cover
            failure_text = "access error"

        messages.error(request, failure_text)
        return HttpResponseRedirect(reverse('workouts_view'))

    token_info = stravastuff.get_token(auth_code)

    if not token_info[0]:  # pragma: no cover
        messages.error(
            request, "Something went wrong with the Strava authorization")
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    new_access = token_info[0]
    lifetime_seconds = token_info[1]
    new_refresh = token_info[2]
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    rower = getrower(request.user)
    rower.stravatoken = new_access
    rower.stravatokenexpirydate = expires_at
    rower.stravarefreshtoken = new_refresh
    rower.save()
    _ = stravastuff.set_strava_athlete_id(rower.user)

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings")
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# Process SportTracks callback
|
|
@login_required()
def rower_process_sporttrackscallback(request):
    """Handle the SportTracks OAuth callback and store the returned tokens.

    A callback without a ``code`` query parameter is reported as an error.
    Otherwise the token result is indexed as
    ``(access_token, expires_in, refresh_token)`` and saved on the rower.
    Both paths redirect to the export settings page.
    """
    try:
        code = request.GET['code']
    # Catch only the missing-parameter error, as the sibling callbacks do.
    # A bare ``except:`` also swallowed unrelated failures such as
    # KeyboardInterrupt and SystemExit.
    except MultiValueDictKeyError:  # pragma: no cover
        messages.error(request, "Sorry, something went wrong.")
        url = reverse('rower_exportsettings_view')
        return HttpResponseRedirect(url)

    res = sporttracksstuff.get_token(code)

    access_token = res[0]
    expires_in = res[1]
    refresh_token = res[2]

    expirydatetime = timezone.now()+datetime.timedelta(seconds=expires_in)

    r = getrower(request.user)
    r.sporttrackstoken = access_token
    r.sporttrackstokenexpirydate = expirydatetime
    r.sporttracksrefreshtoken = refresh_token

    r.save()

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request, successmessage)
    url = reverse('rower_exportsettings_view')
    return HttpResponseRedirect(url)
|
|
|
|
|
|
# Process RP3 callback
|
|
@login_required()
def rower_process_rp3callback(request):  # pragma: no cover
    """Handle the RP3 OAuth callback and store the returned tokens.

    When ``code`` is missing, a generic error is shown, followed by the
    ``error`` query parameter if there is one. Every path redirects to the
    export settings page.
    """
    try:
        auth_code = request.GET['code']
    except MultiValueDictKeyError:
        messages.error(request, "There was an error with the callback")
        try:
            provider_error = request.GET['error']
            messages.error(request, provider_error)
        except MultiValueDictKeyError:
            pass
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    token_info = rp3stuff.get_token(auth_code)

    new_access = token_info[0]
    lifetime_seconds = token_info[1]
    new_refresh = token_info[2]
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    rower = getrower(request.user)
    rower.rp3token = new_access
    rower.rp3tokenexpirydate = expires_at
    rower.rp3refreshtoken = new_refresh
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings")
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
# Process TrainingPeaks callback
|
|
|
|
|
|
@login_required()
def rower_process_tpcallback(request):
    """Handle the TrainingPeaks OAuth callback and store the returned tokens.

    When ``code`` is missing, a generic error is shown, followed by the
    ``error`` query parameter if there is one. Every path redirects to the
    export settings page.
    """
    try:
        auth_code = request.GET['code']
    except MultiValueDictKeyError:  # pragma: no cover
        messages.error(request, "There was an error with the callback")
        try:
            provider_error = request.GET['error']
            messages.error(request, provider_error)
        except MultiValueDictKeyError:
            pass
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    token_info = tpstuff.get_token(auth_code)

    new_access = token_info[0]
    lifetime_seconds = token_info[1]
    new_refresh = token_info[2]
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    rower = getrower(request.user)
    rower.tptoken = new_access
    rower.tptokenexpirydate = expires_at
    rower.tprefreshtoken = new_refresh
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings")
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# Process Own API callback - for API testing purposes
|
|
@login_required()
def rower_process_testcallback(request):  # pragma: no cover
    """Exchange the callback ``code`` with the site's own API and show the tokens.

    For API testing: responds with the access token (element 0 of the token
    result) and the refresh token (element 2) as plain text.
    """
    token_info = ownapistuff.get_token(request.GET['code'])

    shown_access = token_info[0]
    shown_refresh = token_info[2]

    body = "Access Token:\n" + shown_access
    body = body + "\n\nRefresh Token:\n" + shown_refresh

    return HttpResponse(body)
|
|
|
|
|
|
@login_required()
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
def workout_rp3import_view(request, userid=0):
    """List the RP3 workouts of the requesting user for import.

    Renders ``rp3_list_import.html`` with one entry per RP3 workout: its
    id, start time (``executed_at``) and a ``NEW`` flag when no workout of
    the rower already carries that RP3 id.

    Returns:
        The rendered list; a redirect to RP3 authorization when no token is
        available; or a redirect to the workouts list when RP3 answers with
        a non-200 status.
    """
    r = getrequestrower(request, userid=userid)

    try:
        _ = rp3stuff.rp3_open(request.user)
    except NoTokenError:  # pragma: no cover
        url = reverse('rower_rp3_authorize')
        return HttpResponseRedirect(url)

    res = rp3stuff.get_rp3_workout_list(request.user)

    if res.status_code != 200:  # pragma: no cover
        if res.status_code == 401:
            r = getrower(request.user)
            # This branch used to test the Strava token and redirect to
            # Strava authorization. An RP3 401 has to be resolved by
            # (re)authorizing RP3.
            if (r.rp3token == '') or (r.rp3token is None):
                return HttpResponseRedirect(reverse('rower_rp3_authorize'))
        message = "Something went wrong in workout_rp3import_view"
        messages.error(request, message)
        url = reverse('workouts_view')
        return HttpResponseRedirect(url)

    workouts_list = pd.json_normalize(res.json()['data']['workouts'])

    knownrp3ids = uniqify([
        w.uploadedtorp3 for w in Workout.objects.filter(user=r)
    ])

    workouts = []

    for key, data in workouts_list.iterrows():
        try:
            i = data['id']
        except KeyError:  # pragma: no cover
            i = 0
        if i in knownrp3ids:  # pragma: no cover
            nnn = ''
        else:
            nnn = 'NEW'

        try:
            s = data['executed_at']
        except KeyError:  # pragma: no cover
            s = ''

        # A separate dict per row; the HTTP response ``res`` is no longer
        # rebound inside the loop.
        workouts.append({'id': i, 'starttime': s, 'new': nnn})

    breadcrumbs = [
        {
            'url': '/rowers/list-workouts/',
            'name': 'Workouts'
        },
        {
            'url': reverse('workout_rp3import_view'),
            'name': 'RP3'
        },
    ]

    return render(request, 'rp3_list_import.html',
                  {
                      'workouts': workouts,
                      'rower': r,
                      'active': 'nav-workouts',
                      'breadcrumbs': breadcrumbs,
                      'teams': get_my_teams(request.user)
                  })
|
|
|
|
|
|
# The page where you select which Strava workout to import
|
|
@login_required()
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
@permission_required('rower.is_not_freecoach', fn=get_user_by_userid, raise_exception=True)
def workout_stravaimport_view(request, message="", userid=0):
    """List the user's Strava activities and queue selected ones for import.

    GET renders ``strava_list_import.html``. POST with ``workoutid`` values
    queues one ``fetch_strava_workout`` job per id and redirects to the
    workouts list. Before listing, workouts whose Strava upload is marked
    failed (``uploadedtostrava == -1``) are matched against the Strava
    activities by start time and duration, and get the Strava id assigned
    when both match.

    Args:
        request: the incoming request.
        message: accepted for compatibility; overwritten on the error path.
        userid: id of the rower to act for; it must resolve to the
            requesting user.
    """
    r = getrequestrower(request, userid=userid)
    if r.user != request.user:
        # The message used to say "NK Logbook", copied from the NK view.
        messages.error(
            request, 'You can only access your own workouts on Strava, not those of your athletes')
        url = reverse('workout_stravaimport_view',
                      kwargs={'userid': request.user.id})
        return HttpResponseRedirect(url)

    try:
        _ = strava_open(request.user)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/stravaauthorize/")

    res = stravastuff.get_strava_workout_list(request.user)

    if res.status_code != 200:  # pragma: no cover
        if res.status_code == 401:
            r = getrower(request.user)
            if (r.stravatoken == '') or (r.stravatoken is None):
                return HttpResponseRedirect("/rowers/me/stravaauthorize/")
        message = "Something went wrong in workout_stravaimport_view"
        messages.error(request, message)
        url = reverse('workouts_view')
        return HttpResponseRedirect(url)

    rower = getrower(request.user)
    # Parse the response body once and reuse it below.
    activities = res.json()

    stravadata = [{
        'id': int(item['id']),
        'elapsed_time': item['elapsed_time'],
        'start_date': item['start_date'],
    } for item in activities]

    wfailed = Workout.objects.filter(user=rower, uploadedtostrava=-1)

    for w in wfailed:  # pragma: no cover
        for item in stravadata:
            elapsed_time = item['elapsed_time']
            start_date = item['start_date']
            stravaid = item['id']
            if arrow.get(start_date) == arrow.get(w.startdatetime):
                elapsed_td = datetime.timedelta(seconds=int(elapsed_time))
                elapsed_time = datetime.datetime.strptime(
                    str(elapsed_td),
                    "%H:%M:%S"
                )
                # Compare the last 7 characters of both string forms.
                if str(elapsed_time)[-7:] == str(w.duration)[-7:]:
                    w.uploadedtostrava = int(stravaid)
                    w.save()

    knownstravaids = uniqify([
        w.uploadedtostrava for w in Workout.objects.filter(user=rower)
    ])

    workouts = []
    for item in activities:
        d = int(float(item['distance']))
        i = item['id']
        if i in knownstravaids:  # pragma: no cover
            nnn = ''
        else:
            nnn = 'NEW'
        n = item['name']
        ttot = str(datetime.timedelta(
            seconds=int(float(item['elapsed_time']))))
        s = item['start_date']
        # The activity type gets its own name; it used to overwrite the
        # rower variable ``r``.
        activitytype = item['type']
        workouts.append({
            'id': i,
            'distance': d,
            'duration': ttot,
            'starttime': s,
            'type': activitytype,
            'name': n,
            'new': nnn,
        })

    if request.method == "POST":
        try:  # pragma: no cover
            tdict = dict(request.POST.lists())
            ids = tdict['workoutid']
            stravaids = [int(id) for id in ids]
            for stravaid in stravaids:
                csvfilename = 'media/{code}_{stravaid}.csv'.format(
                    code=uuid4().hex[:16], stravaid=stravaid)
                _ = myqueue(
                    queue,
                    fetch_strava_workout,
                    rower.stravatoken,
                    stravastuff.oauth_data,
                    stravaid,
                    csvfilename,
                    rower.user.id
                )
            # done, redirect to workouts list
            messages.info(request,
                          'Your Strava workouts will be imported in the background.'
                          ' It may take a few minutes before it appears.')
            url = reverse('workouts_view')
            return HttpResponseRedirect(url)
        except KeyError:  # pragma: no cover
            # No 'workoutid' posted: fall through and render the list.
            pass

    breadcrumbs = [
        {
            'url': '/rowers/list-workouts/',
            'name': 'Workouts'
        },
        {
            'url': reverse('workout_stravaimport_view'),
            'name': 'Strava'
        },
    ]

    checknew = request.GET.get('selectallnew', False)

    return render(request, 'strava_list_import.html',
                  {'workouts': workouts,
                   'rower': rower,
                   'active': 'nav-workouts',
                   'breadcrumbs': breadcrumbs,
                   'teams': get_my_teams(request.user),
                   'checknew': checknew,
                   })
|
|
|
|
def _log_strava_webhook(timestamp, message):
    """Append one entry to ``strava_webhooks.log``.

    The entry layout is: newline, timestamp, a space, then the message.
    """
    with open('strava_webhooks.log', 'a') as f:
        f.write('\n')
        f.write(timestamp)
        f.write(' ')
        f.write(message)


def _rower_for_strava_owner(strava_owner, log_duplicates=False):
    """Return the rower whose ``strava_owner_id`` matches, or ``None``.

    When several rowers carry the same id, the first one returned by the
    query is used; with ``log_duplicates`` that situation is also written to
    ``strava_webhooks.log`` through ``dologging``.
    """
    try:
        return Rower.objects.get(strava_owner_id=strava_owner)
    except Rower.DoesNotExist:  # pragma: no cover
        return None
    except MultipleObjectsReturned:  # pragma: no cover
        if log_duplicates:
            s = 'Multiple rowers found for strava ID {id}'.format(
                id=strava_owner)
            dologging('strava_webhooks.log', s)
        rs = Rower.objects.filter(strava_owner_id=strava_owner)
        return rs[0]


def _strava_activity_created(data, strava_owner):
    """Handle an activity ``create`` event by queueing an import if wanted."""
    try:
        stravaid = data['object_id']
    except KeyError:  # pragma: no cover
        dologging('strava_webhooks.log', 'KeyError line 1105')
        return

    r = _rower_for_strava_owner(strava_owner, log_duplicates=True)
    if r is None:  # pragma: no cover
        dologging('strava_webhooks.log', 'Rower not found')
        return

    ws = Workout.objects.filter(uploadedtostrava=stravaid)
    if ws.count() == 0 and r.strava_auto_import:
        job = stravastuff.async_get_workout(r.user, stravaid)
        if job == 0:  # pragma: no cover
            dologging('strava_webhooks.log',
                      'Strava strava_open yielded NoTokenError')
    else:  # pragma: no cover
        # Also reached when nothing exists yet but auto import is off.
        dologging('strava_webhooks.log', 'Workouts already existing')
        for w in ws:
            dologging('strava_webhooks.log', str(w))


def _strava_activity_deleted(data, strava_owner):
    """Handle an activity ``delete`` event for rowers with auto delete on."""
    try:
        stravaid = data['object_id']
    except KeyError:  # pragma: no cover
        dologging('strava_webhooks.log', 'KeyError line 1132')
        # Nothing can be looked up without an id. Execution used to continue
        # here and fail on the unbound ``stravaid``.
        return

    ws = Workout.objects.filter(uploadedtostrava=stravaid)
    if ws.count() == 0:
        return

    r = _rower_for_strava_owner(strava_owner)
    if r is None:  # pragma: no cover
        dologging('strava_webhooks.log', 'Rower not found')
        return

    if r.strava_auto_delete:  # pragma: no cover
        for w in ws:
            # Only remove workouts that belong to the notifying owner.
            if w.user == r:
                w.delete()


def _strava_activity_updated(data, strava_owner, timestamp):
    """Handle an activity ``update`` event (title and type changes)."""
    try:
        updates = data['updates']
        stravaid = data['object_id']
    except KeyError:  # pragma: no cover
        _log_strava_webhook(timestamp, 'KeyError line 10576')
        return

    ws = Workout.objects.filter(uploadedtostrava=stravaid)
    if ws.count() == 0:  # pragma: no cover
        return

    r = _rower_for_strava_owner(strava_owner)
    if r is None:  # pragma: no cover
        _log_strava_webhook(timestamp, 'Rower not found')
        return

    for key, value in updates.items():
        for w in ws:
            if key == 'title':
                w.name = value
                w.save()
            if key == 'type' and r.strava_auto_import:
                try:
                    w.workouttype = mytypes.stravamappinginv[value]
                    w.save()
                except KeyError:  # pragma: no cover
                    _log_strava_webhook(
                        timestamp, 'Workout type not found: '+str(value))
                    # An unmapped type ends processing of this event.
                    return


# for Strava webhook request validation


@csrf_exempt
def strava_webhook_view(request):
    """Answer Strava webhook validation requests and process activity events.

    GET requests echo ``hub.challenge`` back as JSON when ``hub.verify_token``
    equals ``stravastuff.webhookverification``; otherwise 403 is returned.

    Any other request is treated as an event notification: the raw body is
    appended to ``strava_webhooks.log`` and, for ``object_type`` ``activity``,
    the ``create`` / ``delete`` / ``update`` aspect is dispatched to its
    handler. A body that is not valid JSON is answered with 400; every other
    notification is answered with 200, including incomplete payloads and
    unknown owners.
    """
    if request.method == 'GET':
        challenge = request.GET.get('hub.challenge')
        verificationtoken = request.GET.get('hub.verify_token')
        if verificationtoken != stravastuff.webhookverification:  # pragma: no cover
            return HttpResponse(status=403)
        data = {"hub.challenge": challenge}
        return JSONResponse(data)

    # logging
    timestamp = time.strftime('%b-%d-%Y_%H%M', time.localtime())
    _log_strava_webhook(timestamp, str(request.body))

    try:
        data = json.loads(request.body)
    except ValueError:  # pragma: no cover
        # The endpoint is public; reject garbage instead of raising.
        return HttpResponse(status=400)

    try:
        aspect_type = data['aspect_type']
        object_type = data['object_type']
        strava_owner = data['owner_id']
        _ = data['event_time']
    except KeyError:  # pragma: no cover
        _log_strava_webhook(timestamp, 'KeyError line 1038')
        return HttpResponse(status=200)

    if object_type == 'activity':
        if aspect_type == 'create':
            _strava_activity_created(data, strava_owner)
        elif aspect_type == 'delete':
            _strava_activity_deleted(data, strava_owner)
        elif aspect_type == 'update':
            _strava_activity_updated(data, strava_owner, timestamp)

    return HttpResponse(status=200)
|
|
|
|
# For push notifications from Garmin
|
|
|
|
|
|
@csrf_exempt
def garmin_summaries_view(request):  # pragma: no cover
    """Receive Garmin activity-summary pushes.

    Non-POST requests are acknowledged with HTTP 200 and nothing else
    happens. For a POST, the raw body is appended to ``garminlog.log``, the
    ``activities`` entry of the JSON payload is handed to
    ``garmin_stuff.garmin_workouts_from_summaries``, and HTTP 200 is returned
    regardless of what that call reports.
    """
    if request.method != 'POST':
        return HttpResponse(status=200)

    timestamp = time.strftime('%b-%d-%Y_%H%M', time.localtime())
    with open('garminlog.log', 'a') as f:
        f.write('\n')
        f.write(timestamp)
        f.write(' ')
        f.write(str(request.body))
        f.write('\n')

    # POST request
    data = json.loads(request.body)
    # The outcome is not inspected: both outcomes were answered identically.
    garmin_stuff.garmin_workouts_from_summaries(data['activities'])

    return HttpResponse(status=200)
|
|
|
|
|
|
@csrf_exempt
def garmin_newfiles_ping(request):  # pragma: no cover
    """Receive Garmin "new activity file" pings and fetch each file.

    For every entry in the payload's ``activityFiles`` list, the rower with a
    matching ``garmintoken`` is looked up and
    ``garmin_stuff.get_garmin_file`` is called with the entry's callback URL,
    start time and file type. Entries with missing keys or an unknown token
    are skipped. The response is always HTTP 200.
    """
    if request.method == 'POST':
        payload = json.loads(request.body)
        for entry in payload['activityFiles']:
            try:
                rower = Rower.objects.get(garmintoken=entry['userAccessToken'])
                _ = garmin_stuff.get_garmin_file(
                    rower,
                    entry['callbackURL'],
                    entry['startTimeInSeconds'],
                    entry['fileType'],
                )
            except (KeyError, Rower.DoesNotExist):
                # Incomplete entry or unknown token: move on to the next one.
                continue

    return HttpResponse(status=200)  # pragma: no cover
|
|
|
|
|
|
@csrf_exempt
def garmin_deregistration_view(request):
    """Clear the stored Garmin token of every rower Garmin deregisters.

    For each entry in the payload's ``deregistrations`` list, the rower with
    a matching ``garmintoken`` gets that field reset to an empty string.
    Entries without ``userAccessToken`` or with an unknown token are skipped.
    The response is always HTTP 200.
    """
    if request.method == 'POST':
        payload = json.loads(request.body)
        for entry in payload['deregistrations']:
            try:
                rower = Rower.objects.get(garmintoken=entry['userAccessToken'])
                rower.garmintoken = ''
                rower.save()
            except (KeyError, Rower.DoesNotExist):  # pragma: no cover
                continue

    return HttpResponse(status=200)
|
|
|
|
|
|
@csrf_exempt
def garmin_details_view(request):
    """Pass a Garmin activity-details push on to ``garmin_stuff``.

    The JSON body of a POST is handed to
    ``garmin_stuff.garmin_workouts_from_details``; other methods are ignored.
    The response is always HTTP 200.
    """
    if request.method == 'POST':
        payload = json.loads(request.body)
        _ = garmin_stuff.garmin_workouts_from_details(payload)

    return HttpResponse(status=200)
|
|
|
|
|
|
# the page where you select which Polar workout to Import
@login_required()
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
@permission_required('rower.is_not_freecoach', fn=get_user_by_userid, raise_exception=True)
def workout_polarimport_view(request, userid=0):  # pragma: no cover
    """Render the list of Polar exercises that can be imported.

    The exercises come from ``polarstuff.get_polar_workouts``. A result that
    carries ``status_code`` 401 leads to an error message and a redirect to
    the workouts list. Each exercise is reduced to the fields the
    ``polar_list_import.html`` template receives.
    """
    exercises = polarstuff.get_polar_workouts(request.user)

    # NOTE(review): any exception raised in this check - including the
    # attribute lookup failing on a result without ``status_code`` - replaces
    # ``exercises`` with an empty list. Confirm against
    # ``get_polar_workouts`` that this is intended.
    try:
        if exercises.status_code == 401:
            messages.error(
                request, 'Not authorized. You need to connect to Polar first')
            return HttpResponseRedirect(reverse('workouts_view'))
    except:  # pragma: no cover
        exercises = []

    workouts = []
    for exercise in exercises:
        # Distance is optional in the exercise record.
        try:
            distance = exercise['distance']
        except KeyError:
            distance = 0

        exercise_id = exercise['id']
        transaction_id = exercise['transaction-id']
        start_time = exercise['start-time']
        sport = exercise['sport']
        duration = isodate.parse_duration(exercise['duration'])
        workouts.append({
            'id': exercise_id,
            'distance': distance,
            'duration': duration,
            'starttime': start_time,
            'type': sport,
            'transactionid': transaction_id,
        })

    breadcrumbs = [
        {'url': '/rowers/list-workouts/', 'name': 'Workouts'},
        {'url': reverse('workout_polarimport_view'), 'name': 'Polar'},
    ]

    context = {
        'workouts': workouts,
        'active': 'nav-workouts',
        'rower': getrower(request.user),
        'breadcrumbs': breadcrumbs,
        'teams': get_my_teams(request.user),
    }
    return render(request, 'polar_list_import.html', context)
|
|
|
|
|
|
# The page where you select which SportTracks workout to import
@login_required()
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
@permission_required('rower.is_not_freecoach', fn=get_user_by_userid, raise_exception=True)
def workout_sporttracksimport_view(request, message="", userid=0):
    """Render the list of SportTracks workouts that can be imported.

    Only the logged-in user's own SportTracks account can be listed; a
    request for another rower is redirected back to the user's own page with
    an error message. A 401 from SportTracks redirects to the authorize page
    when no token is stored and to the refresh page otherwise. Any other
    non-200 answer shows an error and, outside ``settings.DEBUG``, redirects
    to the workouts list.

    Each listed workout is flagged ``'NEW'`` unless its id is already stored
    in ``uploadedtosporttracks`` on one of the rower's workouts.
    """
    r = getrequestrower(request, userid=userid)
    if r.user != request.user:
        messages.error(
            request, 'You can only access your own workouts on SportTracks, not those of your athletes')
        url = reverse('workout_sporttracksimport_view',
                      kwargs={'userid': request.user.id})
        return HttpResponseRedirect(url)

    res = sporttracksstuff.get_sporttracks_workout_list(request.user)
    if res.status_code != 200:
        if res.status_code == 401:
            if (r.sporttrackstoken == '') or (r.sporttrackstoken is None):
                # Token doesn't exist. Need to authorize
                return HttpResponseRedirect("/rowers/me/sporttracksauthorize/")
            else:  # pragma: no cover
                return HttpResponseRedirect("/rowers/me/sporttracksrefresh/")
        message = "Something went wrong in workout_sporttracksimport_view"  # pragma: no cover
        messages.error(request, message)  # pragma: no cover
        if settings.DEBUG:  # pragma: no cover
            return HttpResponse(res)
        else:  # pragma: no cover
            url = reverse('workouts_view')
            return HttpResponseRedirect(url)

    knownstids = uniqify([
        w.uploadedtosporttracks for w in Workout.objects.filter(user=r)
    ])

    workouts = []
    for item in res.json()['items']:
        stid = int(getidfromuri(item['uri']))
        workouts.append({
            'id': stid,
            'distance': int(float(item['total_distance'])),
            'duration': str(datetime.timedelta(
                seconds=int(float(item['duration'])))),
            'starttime': item['start_time'],
            'type': item['type'],
            'name': item['name'],
            'new': '' if stid in knownstids else 'NEW',
        })

    breadcrumbs = [
        {
            'url': '/rowers/list-workouts/',
            'name': 'Workouts'
        },
        {
            'url': reverse('workout_sporttracksimport_view'),
            'name': 'SportTracks'
        },
    ]

    return render(request, 'sporttracks_list_import.html',
                  {'workouts': workouts,
                   'breadcrumbs': breadcrumbs,
                   'active': 'nav-workouts',
                   'rower': r,
                   'teams': get_my_teams(request.user),
                   })
|
|
|
|
# List of workouts on Concept2 logbook. This view only used for debugging
|
|
|
|
|
|
@login_required()
def c2listdebug_view(request, page=1, message=""):  # pragma: no cover
    """Debug listing of one page of the user's Concept2 logbook.

    Without a Concept2 token the user is sent to the authorize page. A
    non-200 answer from the logbook shows an error and either returns the raw
    response (``settings.DEBUG``) or redirects to the workouts list.
    Otherwise the entries are rendered with ``c2_list_import2.html``.
    """
    try:
        _ = c2_open(request.user)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/c2authorize/")

    # Result unused; the call is kept exactly as it was.
    _ = getrower(request.user)

    res = c2stuff.get_c2_workout_list(request.user, page=page)

    if res.status_code != 200:
        messages.error(
            request,
            "Something went wrong in workout_c2import_view (C2 token renewal)")
        if settings.DEBUG:
            return HttpResponse(res)
        return HttpResponseRedirect(reverse('workouts_view'))

    workouts = [
        {
            'id': item['id'],
            'distance': item['distance'],
            'duration': item['time_formatted'],
            'starttime': item['date'],
            'rowtype': item['type'],
            'source': item['source'],
            'comment': item['comments'],
        }
        for item in res.json()['data']
    ]

    context = {
        'workouts': workouts,
        'teams': get_my_teams(request.user),
    }
    return render(request, 'c2_list_import2.html', context)
|
|
|
|
# Import all unknown workouts available on Concept2 logbook
|
|
|
|
|
|
@login_required()
def workout_getc2workout_all(request, page=1, message=""):  # pragma: no cover
    """Start a background import of the Concept2 workouts on ``page``.

    Without a Concept2 token the user is redirected to the authorize page.
    Otherwise ``c2stuff.get_c2_workouts`` is called with ``do_async=True``, a
    message reports whether it succeeded, and the user is redirected to the
    workouts list.
    """
    try:
        _ = c2_open(request.user)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/c2authorize/")

    rower = getrequestrower(request)

    if not c2stuff.get_c2_workouts(rower, page=page, do_async=True):
        messages.error(request, 'Your C2 workouts import failed')
    else:
        messages.info(
            request, 'Your C2 workouts will be imported in the coming few minutes')

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
@login_required()
def workout_getrp3workout_all(request):  # pragma: no cover
    """Start a background import of the user's RP3 workouts.

    Without an RP3 token the user is redirected to the authorize page.
    Otherwise ``rp3stuff.get_rp3_workouts`` is called with ``do_async=True``,
    a message reports whether it succeeded, and the user is redirected to the
    workouts list.
    """
    try:
        _ = rp3_open(request.user)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/rp3authorize/")

    rower = getrequestrower(request)

    if not rp3stuff.get_rp3_workouts(rower, do_async=True):
        messages.error(request, 'Your RP3 workouts import failed')
    else:
        messages.info(
            request, 'Your RP3 workouts will be imported in the coming few minutes')

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
# List of workouts available on Concept2 logbook - for import
|
|
|
|
|
|
@login_required()
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
@permission_required('rower.is_not_freecoach', fn=get_user_by_userid, raise_exception=True)
def workout_c2import_view(request, page=1, userid=0, message=""):
    """List one page of the user's Concept2 logbook and queue selected imports.

    Only the logged-in user's own logbook can be listed; a request for
    another rower is redirected back to the user's own page with an error.
    Without a Concept2 token the user is sent to the authorize page, and a
    non-200 answer from the logbook redirects to the workouts list.

    A POST carrying ``workoutid`` values queues one
    ``handle_c2_async_workout`` job per selected id and redirects to the
    workouts list. Otherwise the page is rendered; an entry is flagged
    ``'NEW'`` unless its id is known from the rower's workouts, the rower's
    tombstones, or the ``ids`` list in ``c2blocked.json``.
    """
    rower = getrequestrower(request, userid=userid)
    if rower.user != request.user:
        messages.error(
            request, 'You can only access your own workouts on the Concept2 logbook, not those of your athletes')
        url = reverse('workout_c2import_view', kwargs={
            'userid': request.user.id})
        return HttpResponseRedirect(url)

    try:
        _ = c2_open(request.user)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/c2authorize/")

    res = c2stuff.get_c2_workout_list(request.user, page=page)

    if res.status_code != 200:  # pragma: no cover
        message = "Something went wrong in workout_c2import_view (C2 token refresh)"
        messages.error(request, message)
        url = reverse('workouts_view')
        return HttpResponseRedirect(url)

    # Parse the response body once; it is needed for both paths below.
    c2data = res.json()['data']

    if request.method == "POST":
        selected = request.POST.getlist('workoutid')
        if selected:  # pragma: no cover
            # Convert everything up front so a bad id queues nothing.
            c2ids = [int(value) for value in selected]
            alldata = {item['id']: item for item in c2data}
            for counter, c2id in enumerate(c2ids):
                _ = myqueue(
                    queue,
                    handle_c2_async_workout,
                    alldata,
                    rower.user.id,
                    rower.c2token,
                    c2id,
                    counter,
                    rower.defaulttimezone
                )
            # done, redirect to workouts list
            messages.info(
                request,
                'Your Concept2 workouts will be imported in the background.'
                ' It may take a few minutes before it appears.')
            url = reverse('workouts_view')
            return HttpResponseRedirect(url)
        # A POST without a selection falls through to the listing.

    knownc2ids = [
        w.uploadedtoc2 for w in Workout.objects.filter(user=rower)
    ]
    tombstones = [
        t.uploadedtoc2 for t in TombStone.objects.filter(user=rower)
    ]
    try:
        with open('c2blocked.json', 'r') as c2blocked:
            parkedids = json.load(c2blocked)['ids']
    except (OSError, ValueError, KeyError, TypeError):  # pragma: no cover
        # Best effort: a missing or malformed block list blocks nothing.
        parkedids = []

    knownc2ids = uniqify(knownc2ids+tombstones+parkedids)

    workouts = [
        {
            'id': item['id'],
            'distance': item['distance'],
            'duration': item['time_formatted'],
            'starttime': item['date'],
            'rowtype': item['type'],
            'source': item['source'],
            'comment': item['comments'],
            'new': '' if item['id'] in knownc2ids else 'NEW',
        }
        for item in c2data
    ]

    breadcrumbs = [
        {
            'url': '/rowers/list-workouts/',
            'name': 'Workouts'
        },
        {
            'url': reverse('workout_c2import_view'),
            'name': 'Concept2'
        },
        {
            'url': reverse('workout_c2import_view', kwargs={'page': page}),
            'name': 'Page '+str(page)
        }
    ]

    checknew = request.GET.get('selectallnew', False)

    return render(request,
                  'c2_list_import2.html',
                  {'workouts': workouts,
                   'rower': rower,
                   'active': 'nav-workouts',
                   'breadcrumbs': breadcrumbs,
                   'teams': get_my_teams(request.user),
                   'page': page,
                   'checknew': checknew,
                   })
|
|
|
|
|
|
# One row per external service:
# (key, import-list view name, authorize view name, integration module)
_import_services = (
    ('c2', 'workout_c2import_view', 'rower_c2_authorize', c2stuff),
    ('strava', 'workout_stravaimport_view', 'rower_strava_authorize',
     stravastuff),
    ('polar', 'workout_polarimport_view', 'rower_polar_authorize',
     polarstuff),
    ('ownapi', 'workout_view', 'workout_view', ownapistuff),
    ('sporttracks', 'workout_sporttracksimport_view',
     'rower_sporttracks_authorize', sporttracksstuff),
    ('trainingpeaks', 'workout_view', 'rower_tp_authorize', tpstuff),
    ('nk', 'workout_nkimport_view', 'rower_nk_authorize', nkstuff),
)

# service key -> name of the view listing importable workouts
importlistviews = {
    key: listview for key, listview, _authview, _module in _import_services
}

# service key -> name of the view that starts authorization
importauthorizeviews = {
    key: authview for key, _listview, authview, _module in _import_services
}

# service key -> module providing ``get_workout`` for that service
importsources = {
    key: module for key, _listview, _authview, module in _import_services
}
|
|
|
|
|
|
@login_required()
@permission_required('rower.is_not_freecoach', fn=get_user_by_userid, raise_exception=True)
def workout_getrp3importview(request, externalid):
    """Queue the background import of one RP3 workout.

    Only the logged-in user's own RP3 account can be used; a request for
    another rower is redirected to the user's own import page with an error.
    Without an RP3 token the user is sent to the authorize page. Otherwise a
    ``handle_rp3_async_workout`` job is queued on ``queuehigh`` with the
    ``startdatetime`` query parameter, and the user is redirected to the RP3
    import list.
    """
    r = getrequestrower(request)
    if r.user != request.user:  # pragma: no cover
        messages.error(
            request, 'You can only access your own workouts on RP3, not those of your athletes')
        url = reverse('workout_rp3import_view', kwargs={
            'userid': request.user.id})
        return HttpResponseRedirect(url)

    try:
        token = rp3stuff.rp3_open(r.user)
    except NoTokenError:  # pragma: no cover
        # Same handling as workout_getrp3workout_all.
        return HttpResponseRedirect("/rowers/me/rp3authorize/")

    startdatetime = request.GET.get('startdatetime')

    _ = myqueue(queuehigh,
                handle_rp3_async_workout,
                r.user.id,
                token,
                externalid,
                startdatetime,
                20,
                )

    messages.info(request, 'The workout will be imported in the background')

    url = reverse('workout_rp3import_view')
    return HttpResponseRedirect(url)
|
|
|
|
|
|
@login_required()
def workout_getimportview(request, externalid, source='c2', do_async=True):
    """Import one workout from the external service named by ``source``.

    The service module is looked up in ``importsources`` and its
    ``get_workout`` is called for ``externalid``. For ``source == 'nk'`` with
    a ``startdate`` in the session, the session's ``startdate`` and
    ``enddate`` are passed along and no result message is shown. A
    ``NoTokenError`` redirects to the service's authorize view; otherwise the
    user is redirected to the service's import list.
    """
    with_session_dates = 'startdate' in request.session and source == 'nk'

    extra = {}
    if with_session_dates:  # pragma: no cover
        extra['startdate'] = request.session.get('startdate')
        extra['enddate'] = request.session.get('enddate')

    try:
        result = importsources[source].get_workout(
            request.user, externalid, do_async=do_async, **extra)
    except NoTokenError:
        return HttpResponseRedirect(reverse(importauthorizeviews[source]))

    if not with_session_dates:
        if result:  # pragma: no cover
            messages.info(
                request, "Your workout will be imported in the background")
            # this should return to the respective import list page
        else:  # pragma: no cover
            messages.error(request, 'Error getting the workout')

    return HttpResponseRedirect(reverse(importlistviews[source]))
|
|
|
|
|
|
# Imports all new workouts from SportTracks
@login_required()
def workout_getsporttracksworkout_all(request):
    """Import every SportTracks workout that is not yet known locally.

    When the SportTracks list request succeeds, each listed id that is not
    stored in ``uploadedtosporttracks`` on one of the rower's workouts is
    fetched with ``sporttracksstuff.get_workout``; the resulting workout is
    tagged with its SportTracks id, or an error message is added when the
    fetch returns 0. The user is always redirected to the workouts list.
    """
    destination = reverse('workouts_view')

    res = sporttracksstuff.get_sporttracks_workout_list(request.user)
    if res.status_code != 200:
        return HttpResponseRedirect(destination)

    rower = getrower(request.user)
    stids = [int(getidfromuri(item['uri']))
             for item in res.json()['items']]
    knownstids = uniqify([
        w.uploadedtosporttracks for w in Workout.objects.filter(user=rower)
    ])

    for sporttracksid in stids:
        if sporttracksid in knownstids:
            continue

        workoutid = sporttracksstuff.get_workout(request.user, sporttracksid)
        if workoutid == 0:  # pragma: no cover
            messages.error(
                request, "Something went wrong with workout {id}".format(id=sporttracksid))
            continue

        workout = Workout.objects.get(id=workoutid)
        workout.uploadedtosporttracks = sporttracksid
        workout.save()

    return HttpResponseRedirect(destination)
|
|
|
|
|
|
# Imports the next not yet imported workout from Strava
@login_required()
def workout_getstravaworkout_next(request):  # pragma: no cover
    """Queue the import of one Strava workout that is not yet known locally.

    The user's Strava workout list is fetched; the first listed id that is
    not stored in ``uploadedtostrava`` on one of the rower's workouts is
    handed to ``stravastuff.create_async_workout``. The user is then
    redirected to the workouts list, with an error message when the list
    request did not return 200.

    NOTE(review): the previous body referenced an undefined ``stravaid``, so
    which workout counts as "next" is an assumption (first unknown id in the
    order Strava returns them) - confirm.
    """
    r = Rower.objects.get(user=request.user)
    url = reverse('workouts_view')

    res = stravastuff.get_strava_workout_list(r.user)
    if res.status_code != 200:
        # A Django view has to return a response object, not 0.
        messages.error(request, 'Could not get your workouts from Strava')
        return HttpResponseRedirect(url)

    alldata = {item['id']: item for item in res.json()}

    knownstravaids = uniqify([
        w.uploadedtostrava for w in Workout.objects.filter(user=r)
    ])
    newids = [stravaid for stravaid in alldata
              if stravaid not in knownstravaids]

    if newids:
        _ = stravastuff.create_async_workout(
            alldata, r.user, newids[0], debug=True)

    return HttpResponseRedirect(url)
|