1063 lines
35 KiB
Python
1063 lines
35 KiB
Python
from rowsandall_app.settings import (
|
|
NK_OAUTH_LOCATION, ROJABO_OAUTH_LOCATION,
|
|
TP_OAUTH_LOCATION,
|
|
)
|
|
|
|
from rowers.views.statements import *
|
|
from rowers.plannedsessions import get_dates_timeperiod
|
|
from rowers.tasks import fetch_strava_workout
|
|
|
|
|
|
import rowers.integrations.strava as strava
|
|
from rowers.integrations import importsources
|
|
from rowers.utils import NoTokenError
|
|
|
|
import numpy
|
|
|
|
|
|
def default(o):  # pragma: no cover
    """Convert a NumPy integer scalar to a built-in ``int``.

    Presumably intended as a ``default=`` hook for JSON serialization --
    confirm against callers.

    Args:
        o: The object to convert.

    Returns:
        ``int(o)`` when ``o`` is a NumPy integer scalar of any width.

    Raises:
        TypeError: If ``o`` is not a NumPy integer scalar.
    """
    # numpy.integer is the common base of int8..int64 and the unsigned
    # variants, so numpy.int64 (the only type handled before) is still covered.
    if isinstance(o, numpy.integer):
        return int(o)
    raise TypeError(
        "Object of type {name} is not supported".format(name=type(o).__name__)
    )
|
|
|
|
def workout_export_view(request, id=0, source='c2'):
    """Export one workout to an external service, then redirect.

    Args:
        request: The incoming HTTP request.
        id: Opaque workout id, resolved with ``get_workout_by_opaqueid``.
        source: Key into ``importsources`` selecting the integration.

    Returns:
        A redirect to the integration's authorization URL when no token is
        available, to the workouts list when ``source`` is unknown, and
        otherwise to the rower's default landing page for the workout.
    """
    r = getrower(request.user)
    w = get_workout_by_opaqueid(request, id)

    try:
        integration = importsources[source](request.user)
    except KeyError:
        # Unknown integration key: fall back to the workouts list, the same
        # way rower_integration_token_refresh does, instead of a server error.
        return HttpResponseRedirect(reverse('workouts_view'))

    try:
        # The return value is not used; the original rebound the ``id``
        # parameter to it, which only obscured the code.
        integration.workout_export(w)
    except NoTokenError:
        return HttpResponseRedirect(integration.make_authorization_url())

    messages.info(
        request,
        "Your workout will be synchronized to {name} in the background".format(
            name=integration.get_name()
        )
    )

    url = reverse(
        r.defaultlandingpage,
        kwargs={'id': encoder.encode_hex(w.id)},
    )
    return HttpResponseRedirect(url)
|
|
|
|
|
|
# ROJABO authorization
|
|
def rower_rojabo_authorize(request):  # pragma: no cover
    """Redirect the browser to the Rojabo OAuth authorization endpoint.

    The query string carries only ``client_id`` and ``redirect_uri``.

    NOTE(review): the previous version also built a random ``state`` and a
    ``scope`` value but never sent them; they were dead locals and have been
    removed. Confirm whether Rojabo expects either parameter.
    """
    params = {
        "client_id": ROJABO_CLIENT_ID,
        "redirect_uri": ROJABO_REDIRECT_URI,
    }
    url = ROJABO_OAUTH_LOCATION + 'oauth/authorize?' + urllib.parse.urlencode(params)
    return HttpResponseRedirect(url)
|
|
|
|
@login_required()
def rower_integration_authorize(request, source='c2'):
    """Send the user to the authorization page of the chosen integration.

    ``source`` selects the integration class from ``importsources``; the
    class is instantiated for the requesting user and asked for its
    authorization URL.
    """
    return HttpResponseRedirect(
        importsources[source](request.user).make_authorization_url()
    )
|
|
|
|
|
|
|
|
@login_required()
def rower_garmin_authorize(request):  # pragma: no cover
    """Start Garmin authorization and remember the owner key and secret.

    The key and secret returned by ``garmin_stuff.garmin_authorize`` are kept
    in the session so the callback view can read them back.
    """
    target_url, owner_key, owner_secret = garmin_stuff.garmin_authorize()
    session_values = (
        ('garmin_owner_key', owner_key),
        ('garmin_owner_secret', owner_secret),
    )
    for session_key, value in session_values:
        request.session[session_key] = value
    return HttpResponseRedirect(target_url)
|
|
|
|
|
|
# Polar Authorization
|
|
@login_required()
def rower_polar_authorize(request):  # pragma: no cover
    """Redirect the user to the Polar Flow OAuth2 authorization page.

    The URL, and the parameters used to build it, are written to
    ``polar.log`` before redirecting.
    """
    # redirect_uri and scope are intentionally not sent (they were commented
    # out in the parameter list).
    query = {
        "client_id": POLAR_CLIENT_ID,
        "response_type": "code",
        "state": str(uuid4()),
    }
    target = "https://flow.polar.com/oauth2/authorization?" + urllib.parse.urlencode(query)

    for entry in ('Authorizing', target, query):
        dologging('polar.log', entry)

    return HttpResponseRedirect(target)
|
|
|
|
@login_required()
def rower_integration_token_refresh(request, source='c2'):
    """Refresh the stored tokens of one integration and go to the workouts list.

    An unknown ``source`` redirects straight to the workouts list. A
    ``NoTokenError`` during the refresh is reported to the user as an error
    message; success is reported as an info message.
    """
    try:
        integration = importsources[source](request.user)
    except KeyError:
        return HttpResponseRedirect(reverse('workouts_view'))

    try:
        integration.token_refresh()
    except NoTokenError:
        messages.error(
            request,
            "Something went wrong refreshing your access tokens to {name}. Please reauthorize".format(
                name=integration.get_name()
            ),
        )
    else:
        messages.info(request, "Tokens refreshed. Good to go")

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|
|
|
|
# Concept2 Callback
|
|
@login_required()
def rower_process_callback(request):
    """Handle the Concept2 OAuth callback and store the received tokens.

    Exchanges the ``code`` query parameter for tokens through the ``c2``
    integration, saves them on the rower and redirects to the export
    settings page. Failures are reported through the messages framework and
    end on the same page.
    """
    c2 = importsources['c2'](request.user)
    try:
        token_data = c2.get_token(request.GET['code'])
    except (MultiValueDictKeyError, NoTokenError):  # pragma: no cover
        messages.error(
            request,
            "The resource owner or authorization server denied the request",
        )
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    if token_data[0] == 0:  # pragma: no cover
        # On failure the second element carries the error text.
        failure = token_data[1]
        failure += ' Contact info@rowsandall.com if this behavior persists.'
        messages.error(request, failure)
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    lifetime_seconds = token_data[1]
    refresh = token_data[2]
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    rower = getrower(request.user)
    rower.c2token = token_data[0]
    rower.tokenexpirydate = expires_at
    rower.c2refreshtoken = refresh
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# dummy
|
|
@login_required()
def rower_process_twittercallback(request):  # pragma: no cover
    """Placeholder callback that answers with the text ``dummy``.

    A Django view has to return a response object; the earlier version
    returned the bare string, which the request machinery cannot serve.
    """
    return HttpResponse("dummy")
|
|
|
|
# Process Polar Callback
|
|
|
|
|
|
@login_required()
def rower_process_polarcallback(request):
    """Handle the OAuth callback from Polar and store the received token.

    Flow:
      1. An ``error`` query parameter aborts with that error shown to the user.
      2. A missing ``code`` aborts with an error and goes to the workouts list.
      3. The code is exchanged through ``polarstuff.get_token`` and the token,
         its expiry and the Polar user id are saved on the rower.
      4. The user is registered through ``polarstuff.register_user`` and the
         returned ``polar-user-id`` is compared with the id from step 3.
    """
    reported_error = request.GET.get('error', 'no error')
    dologging('polar.log', 'Callback: {error}'.format(error=reported_error))
    if reported_error != 'no error':  # pragma: no cover
        messages.error(request, reported_error)
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    try:
        code = request.GET['code']
        dologging('polar.log', code)
    except MultiValueDictKeyError:  # pragma: no cover
        try:
            failure = request.GET['error']
        except MultiValueDictKeyError:
            failure = "access error"
        messages.error(request, failure)
        return HttpResponseRedirect(reverse('workouts_view'))

    access_token, expires_in, user_id = polarstuff.get_token(code)
    expires_at = timezone.now() + datetime.timedelta(seconds=expires_in)

    rower = getrower(request.user)
    rower.polartoken = access_token
    rower.polartokenexpirydate = expires_at
    rower.polaruserid = user_id
    rower.save()

    if not user_id:  # pragma: no cover
        messages.error(request, 'Polar Flow Authorization Failed')
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    polar_user_data = polarstuff.register_user(request.user, access_token)
    try:
        registered_id = polar_user_data['polar-user-id']
    except KeyError:  # pragma: no cover
        registered_id = 0

    if registered_id == user_id:
        messages.info(
            request,
            "Tokens stored. Good to go. Please check your import/export settings",
        )
    else:  # pragma: no cover
        # A mismatch produces two messages, in this order.
        messages.error(request, 'Polar User ID error')
        messages.error(
            request, "Please contact support@rowsandall.com for help.")

    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# process Garmin callback
|
|
@login_required()
def rower_process_garmincallback(request):  # pragma: no cover
    """Complete the Garmin authorization and store the tokens on the rower.

    The owner key and secret are read from the session. If either is
    missing, a fresh pair is requested with ``garmin_stuff.garmin_authorize``.
    """
    rower = getrower(request.user)
    callback_url = request.build_absolute_uri()

    session = request.session
    try:
        owner_key = session['garmin_owner_key']
        owner_secret = session['garmin_owner_secret']
    except KeyError:
        _, owner_key, owner_secret = garmin_stuff.garmin_authorize()

    tokens = garmin_stuff.garmin_processcallback(
        callback_url, owner_key, owner_secret)
    rower.garmintoken, rower.garminrefreshtoken = tokens
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
# Process Rojabo callback
|
|
@login_required()
def rower_process_rojabocallback(request):  # pragma: no cover
    """Handle the Rojabo OAuth callback and store the received tokens.

    Reads the ``code`` query parameter, exchanges it through
    ``rojabo_stuff.get_token`` and saves the access token, refresh token and
    expiry time on the rower. Every outcome redirects to the export settings
    page with a message for the user.
    """
    settings_url = reverse('rower_exportsettings_view')

    # Index access (not .get) so that a missing code really raises. The
    # previous ``request.GET.get('code', None)`` never raised, which left the
    # denial branch unreachable and passed ``None`` on to get_token.
    try:
        code = request.GET['code']
    except MultiValueDictKeyError:
        message = "The resource owner or authorization server denied the request"
        messages.error(request, message)
        return HttpResponseRedirect(settings_url)

    res = rojabo_stuff.get_token(code)

    access_token = res[0]
    if access_token == 0:
        # On failure the second element carries the error text.
        message = res[1]
        message += ' Contact support@rowsandall.com if this behavior persists'
        messages.error(request, message)
        return HttpResponseRedirect(settings_url)

    expires_in = res[1]
    refresh_token = res[2]
    expirydatetime = timezone.now() + datetime.timedelta(seconds=expires_in)

    r = getrower(request.user)
    r.rojabo_token = access_token
    r.rojabo_tokenexpirydate = expirydatetime
    r.rojabo_refreshtoken = refresh_token
    r.save()

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request, successmessage)
    return HttpResponseRedirect(settings_url)
|
|
|
|
# Process NK Callback
|
|
@login_required()
def rower_process_nkcallback(request):  # pragma: no cover
    """Handle the NK OAuth callback and store the received tokens.

    Exchanges the ``code`` query parameter through the ``nk`` integration and
    saves the access token, refresh token and expiry time on the rower. Every
    outcome redirects to the export settings page with a message.
    """
    settings_url = reverse('rower_exportsettings_view')

    try:
        # Index access so that a missing code raises; ``.get('code', None)``
        # never did, which left this handler unreachable.
        code = request.GET['code']
        nk_integration = importsources['nk'](request.user)
        res = nk_integration.get_token(code)
    except (MultiValueDictKeyError, NoTokenError):
        message = "The resource owner or authorization server denied the request"
        messages.error(request, message)
        return HttpResponseRedirect(settings_url)

    # Keep the whole result: the failure branch below reads ``res[1]``, which
    # used to be an undefined name (NameError) because the tuple was
    # unpacked without ever binding ``res``.
    access_token = res[0]
    if access_token == 0:
        message = '{reason} Contact support@rowsandall.com if this behavior persists'.format(
            reason=res[1]
        )
        messages.error(request, message)
        return HttpResponseRedirect(settings_url)

    expires_in = res[1]
    refresh_token = res[2]
    expirydatetime = timezone.now() + datetime.timedelta(seconds=expires_in)

    r = getrower(request.user)
    r.nktoken = access_token
    r.nktokenexpirydate = expirydatetime
    r.nkrefreshtoken = refresh_token
    r.save()

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request, successmessage)
    return HttpResponseRedirect(settings_url)
|
|
|
|
|
|
|
|
@login_required()
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
@permission_required('rower.is_not_freecoach', fn=get_user_by_userid, raise_exception=True)
def workout_import_view(request, source='c2'):
    """List the workouts available on an integration and import selected ones.

    GET renders ``list_import.html`` with the remote workouts in the chosen
    date range. POST may carry a new date range and/or a list of
    ``workoutid`` values; when ids are present each one is requested through
    ``integration.get_workout`` and the user is sent to the workouts list.

    Args:
        request: The incoming HTTP request.
        source: Key into ``importsources`` selecting the integration.
    """
    startdate, enddate = get_dates_timeperiod(
        request, defaulttimeperiod='last30')
    startdate = startdate.date()
    enddate = enddate.date()

    r = getrequestrower(request)
    integration = importsources[source](request.user)

    try:
        _ = integration.open()
    except NoTokenError:  # pragma: no cover
        # Authorize the integration actually requested. This used to be a
        # hard-coded redirect to the NK authorization page for every source.
        return HttpResponseRedirect(integration.make_authorization_url())

    if request.method == 'POST':  # pragma: no cover
        dateform = DateRangeForm(request.POST)
        if dateform.is_valid():
            startdate = dateform.cleaned_data['startdate']
            # The submitted end date gets one extra day added.
            enddate = dateform.cleaned_data['enddate'] + \
                datetime.timedelta(days=1)
    else:
        dateform = DateRangeForm(initial={
            'startdate': startdate,
            'enddate': enddate,
        })

    if enddate < startdate:  # pragma: no cover
        startdate, enddate = enddate, startdate

    request.session['startdate'] = startdate.strftime('%Y-%m-%d')
    request.session['enddate'] = enddate.strftime('%Y-%m-%d')

    # Range limits are passed both as millisecond timestamp strings and as
    # date objects.
    before = str(int(arrow.get(enddate).timestamp() * 1000))
    after = str(int(arrow.get(startdate).timestamp() * 1000))

    workouts = integration.get_workout_list(
        before=before, after=after, startdate=startdate, enddate=enddate)

    if request.method == 'POST':  # pragma: no cover
        try:
            tdict = dict(request.POST.lists())
            external_ids = [int(value) for value in tdict['workoutid']]
            for external_id in external_ids:
                _ = integration.get_workout(external_id)
            messages.info(
                request,
                ('Your {source} workouts will be imported in the background.'
                 ' It may take a few minutes before they appear.').format(
                     source=integration.get_name()))
            return HttpResponseRedirect(reverse('workouts_view'))
        except KeyError:
            # No ``workoutid`` in the form (date-range-only submit): fall
            # through and render the list again.
            pass

    breadcrumbs = [
        {
            'url': '/rowers/list-workouts/',
            'name': 'Workouts'
        },
        {
            'url': reverse('workout_import_view', kwargs={'source': source}),
            'name': integration.get_name()
        },
    ]

    checknew = request.GET.get('selectallnew', False)

    return render(request, 'list_import.html',
                  {
                      'workouts': workouts,
                      'rower': r,
                      'dateform': dateform,
                      'active': 'nav-workouts',
                      'breadcrumbs': breadcrumbs,
                      'teams': get_my_teams(request.user),
                      'checknew': checknew,
                      'startdate': startdate,
                      'enddate': enddate,
                      'integration': integration.get_name()
                  })
|
|
|
|
|
|
|
|
|
|
@login_required()
def rower_process_stravacallback(request):
    """Handle the Strava OAuth callback and store the received tokens.

    Both ``code`` and ``scope`` must be present in the query string;
    otherwise the ``error`` parameter (or a generic text) is shown and the
    user lands on the workouts list. After a successful token exchange the
    tokens are saved on the rower and the Strava athlete id is set.
    """
    strava_integration = importsources['strava'](request.user)

    try:
        code = request.GET['code']
        _ = request.GET['scope']
    except MultiValueDictKeyError:  # pragma: no cover
        try:
            failure = request.GET['error']
        except MultiValueDictKeyError:  # pragma: no cover
            failure = "access error"
        messages.error(request, failure)
        return HttpResponseRedirect(reverse('workouts_view'))

    token_data = strava_integration.get_token(code)

    if not token_data[0]:  # pragma: no cover
        messages.error(
            request, "Something went wrong with the Strava authorization")
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    access_token = token_data[0]
    lifetime_seconds = token_data[1]
    refresh = token_data[2]
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    rower = getrower(request.user)
    rower.stravatoken = access_token
    rower.stravatokenexpirydate = expires_at
    rower.stravarefreshtoken = refresh
    rower.save()

    _ = strava_integration.set_strava_athlete_id()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
|
|
# Process SportTracks callback
|
|
@login_required()
def rower_process_sporttrackscallback(request):
    """Handle the SportTracks OAuth callback.

    Exchanges the ``code`` query parameter through the ``sporttracks``
    integration and redirects to the export settings page. A missing code is
    reported to the user with a generic error message.
    """
    try:
        code = request.GET['code']
    # Only a missing key is expected here; the former bare ``except:`` would
    # also have hidden unrelated errors.
    except MultiValueDictKeyError:  # pragma: no cover
        messages.error(request, "Sorry, something went wrong.")
        url = reverse('rower_exportsettings_view')
        return HttpResponseRedirect(url)

    st_integration = importsources['sporttracks'](request.user)
    # The return value is not used in this view.
    st_integration.get_token(code)

    successmessage = "Tokens stored. Good to go. Please check your import/export settings"
    messages.info(request, successmessage)
    url = reverse('rower_exportsettings_view')
    return HttpResponseRedirect(url)
|
|
|
|
|
|
# Process RP3 callback
|
|
@login_required()
def rower_process_rp3callback(request):  # pragma: no cover
    """Handle the RP3 OAuth callback and store the received tokens.

    Without a ``code`` parameter the user gets a generic error, plus the
    ``error`` parameter when one was sent, and lands on the export settings
    page. Otherwise the code is exchanged through the ``rp3`` integration and
    the tokens and expiry time are saved on the rower.
    """
    try:
        code = request.GET['code']
    except MultiValueDictKeyError:
        messages.error(request, "There was an error with the callback")
        if 'error' in request.GET:
            messages.error(request, request.GET['error'])
        return HttpResponseRedirect(reverse('rower_exportsettings_view'))

    rp3_integration = importsources['rp3'](request.user)
    access_token, lifetime_seconds, refresh = rp3_integration.get_token(code)
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    rower = getrower(request.user)
    rower.rp3token = access_token
    rower.rp3tokenexpirydate = expires_at
    rower.rp3refreshtoken = refresh
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse('rower_exportsettings_view'))
|
|
|
|
# Process TrainingPeaks callback
|
|
|
|
|
|
@login_required()
def rower_process_tpcallback(request):
    """Handle the TrainingPeaks OAuth callback and store the received tokens.

    A request without ``code`` produces a generic error message, followed by
    the ``error`` parameter if the request carries one. With a code, the
    ``trainingpeaks`` integration exchanges it for tokens, which are saved on
    the rower together with their expiry time.
    """
    settings_view = 'rower_exportsettings_view'

    try:
        code = request.GET['code']
    except MultiValueDictKeyError:  # pragma: no cover
        messages.error(request, "There was an error with the callback")
        if 'error' in request.GET:
            messages.error(request, request.GET['error'])
        return HttpResponseRedirect(reverse(settings_view))

    tp_integration = importsources['trainingpeaks'](request.user)
    access_token, lifetime_seconds, refresh = tp_integration.get_token(code)
    expires_at = timezone.now() + datetime.timedelta(seconds=lifetime_seconds)

    rower = getrower(request.user)
    rower.tptoken = access_token
    rower.tptokenexpirydate = expires_at
    rower.tprefreshtoken = refresh
    rower.save()

    messages.info(
        request,
        "Tokens stored. Good to go. Please check your import/export settings",
    )
    return HttpResponseRedirect(reverse(settings_view))
|
|
|
|
|
|
# Process Own API callback - for API testing purposes
|
|
@login_required()
def rower_process_testcallback(request):  # pragma: no cover
    """Show the access and refresh tokens obtained from the site's own API.

    The ``code`` query parameter is exchanged through
    ``ownapistuff.get_token``; the first and third elements of the result are
    returned as plain text.
    """
    tokens = ownapistuff.get_token(request.GET['code'])
    access_token, refresh_token = tokens[0], tokens[2]

    body = "Access Token:\n" + access_token
    body = body + "\n\nRefresh Token:\n" + refresh_token

    return HttpResponse(body)
|
|
|
|
|
|
|
|
# The page where you select which Rojabo session to import
|
|
def _rojabo_session_steps(session):
    """Collect the converted warm-up, primary and cool-down steps of a session.

    Each section is optional: a section whose keys are missing is skipped.
    Later sections are numbered from the count of steps gathered so far.
    """
    steps = []
    try:
        steps = steps + rojabo_stuff.stepsconvert(
            session['warm_up']['steps'],
            warmup=True
        )
    except KeyError:
        pass
    try:
        steps = steps + rojabo_stuff.stepsconvert(
            session['primary']['steps'],
            startid=len(steps)
        )
    except KeyError:
        pass
    try:
        steps = steps + rojabo_stuff.stepsconvert(
            session['cool_down']['steps'],
            cooldown=True,
            startid=len(steps))
    except KeyError:
        pass
    return steps


@login_required()
@user_passes_test(ispromember, login_url="/rowers/paidplans/",
                  message="This functionality requires a Pro plan or higher",
                  redirect_field_name=None)
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
def workout_rojaboimport_view(request, message="", userid=0):
    """List Rojabo training sessions and import selected ones as planned sessions.

    GET renders ``rojabo_list_import.html`` with every session returned by
    ``rojabo_stuff.get_rojabo_workout_list``, marking those whose id is not
    yet stored on one of the user's ``PlannedSession`` objects as ``NEW``.
    POST creates a ``PlannedSession`` for each submitted ``sessionid`` and
    then renders the same list.

    Args:
        request: The incoming HTTP request.
        message: Not used by this view; kept for interface compatibility.
        userid: Id of the rower to act for; only the requesting user's own
            account is accepted.
    """
    r = getrequestrower(request, userid=userid)
    if r.user != request.user:
        messages.error(
            request, 'You can only access your own workouts on Rojabo, not those of your athletes')
        url = reverse('workout_rojaboimport_view',
                      kwargs={'userid': request.user.id})
        return HttpResponseRedirect(url)

    try:
        _ = rojabo_open(request.user)
    except NoTokenError:  # pragma: no cover
        return HttpResponseRedirect("/rowers/me/rojaboauthorize/")

    res = rojabo_stuff.get_rojabo_workout_list(request.user)

    if res.status_code != 200:  # pragma: no cover
        if res.status_code == 401:
            r = getrower(request.user)
            if (r.rojabo_token == '') or (r.rojabo_token is None):
                # No token stored at all: the user has to authorize first.
                return HttpResponseRedirect("/rowers/me/rojaboauthorize/")
        messages.error(request, "Something went wrong in workout_rojaboimport_view")
        return HttpResponseRedirect(reverse('workouts_view'))

    r = getrower(request.user)
    # Parse the response body once; it is needed for both import and listing.
    remote_items = res.json()

    if request.method == "POST":
        try:
            tdict = dict(request.POST.lists())
            rojaboids = [int(value) for value in tdict['sessionid']]
            alldata = {
                item['training_session']['id']: item['training_session']
                for item in remote_items
            }
        except KeyError:
            # Nothing selected (or unexpected payload shape): import nothing.
            rojaboids = []
            alldata = {}

        for rojaboid in rojaboids:
            try:
                item = alldata[rojaboid]
                comment = 'ROJABO {name}, SPM: {spm}. Points: {points}'.format(
                    name=item['workout'],
                    points=item['points'],
                    spm=item['stroke'],
                )
                preferreddate = datetime.datetime.strptime(
                    item['training_date'], "%Y-%m-%d")
                ps = PlannedSession(
                    name=item['workout'],
                    comment=comment,
                    startdate=preferreddate,
                    enddate=preferreddate,
                    preferreddate=preferreddate,
                    sessionsport='rower',
                    manager=request.user,
                    rojabo_id=rojaboid,
                )
                # Save first so that the relations below can be attached.
                ps.save()
                ps.rower.add(r)
                ps.tags.add('ROJABO')
                ps.save()

                steps = _rojabo_session_steps(item)
                if steps:
                    ps.steps = {
                        'name': '',
                        'sport': 'rowing',
                        'filename': '',
                        'steps': steps,
                    }
                    ps.save()

                messages.info(request, 'Saved planned session {id}'.format(id=ps.id))
            except KeyError:
                # Unknown id or incomplete session data: skip this session.
                continue

    # Computed after the import so freshly saved sessions are not flagged NEW.
    known_ids = {
        planned.rojabo_id
        for planned in PlannedSession.objects.filter(manager=request.user)
    }

    sessions = []
    for item in remote_items:
        session = item['training_session']
        sessions.append({
            'id': session['id'],
            'name': session['workout'],
            'spm': session['stroke'],
            'new': '' if session['id'] in known_ids else 'NEW',
            'date': session['training_date'],
            'points': session['points'],
        })

    breadcrumbs = [
        {
            'url': '/rowers/list-workouts/',
            'name': 'Workouts'
        },
        {
            'url': reverse('workout_rojaboimport_view'),
            'name': 'Rojabo'
        },
    ]

    checknew = request.GET.get('selectallnew', False)

    return render(request, 'rojabo_list_import.html',
                  {'sessions': sessions,
                   'rower': r,
                   'active': 'nav-plans',
                   'breadcrumbs': breadcrumbs,
                   'teams': get_my_teams(request.user),
                   'checknew': checknew,
                   })
|
|
|
|
|
|
|
|
# for Strava webhook request validation
|
|
|
|
|
|
def _strava_webhook_rower(strava_owner):
    """Return the rower linked to a Strava owner id, or ``None`` if none exists.

    When several rowers share the id, the first one is used and the
    situation is logged.
    """
    try:
        return Rower.objects.get(strava_owner_id=strava_owner)
    except Rower.DoesNotExist:  # pragma: no cover
        dologging('strava_webhooks.log', 'Rower not found')
        return None
    except MultipleObjectsReturned:  # pragma: no cover
        dologging(
            'strava_webhooks.log',
            'Multiple rowers found for strava ID {id}'.format(id=strava_owner))
        return Rower.objects.filter(strava_owner_id=strava_owner)[0]


@csrf_exempt
def strava_webhook_view(request):
    """Endpoint for Strava webhook validation and activity events.

    GET answers the subscription handshake: it echoes ``hub.challenge`` when
    ``hub.verify_token`` matches ``strava.webhookverification`` and answers
    403 otherwise.

    Any other method is treated as an event. The body is logged and parsed
    as JSON. For ``activity`` objects:
      * ``create`` requests an import when no local workout carries that
        Strava id and the rower has ``strava_auto_import`` set;
      * ``delete`` removes the rower's matching workouts when
        ``strava_auto_delete`` is set;
      * ``update`` copies a changed title and, with ``strava_auto_import``
        set, a changed type onto the matching workouts.

    Events answer 200, except for a body that is not valid JSON (400).
    """
    if request.method == 'GET':
        challenge = request.GET.get('hub.challenge')
        verificationtoken = request.GET.get('hub.verify_token')
        if verificationtoken != strava.webhookverification:  # pragma: no cover
            return HttpResponse(status=403)
        return JSONResponse({"hub.challenge": challenge})

    dologging("strava_webhooks.log", request.body)

    try:
        data = json.loads(request.body)
    except ValueError:  # pragma: no cover
        # Unauthenticated endpoint: reject junk instead of failing with a 500.
        dologging("strava_webhooks.log", "Invalid JSON payload")
        return HttpResponse(status=400)

    try:
        aspect_type = data['aspect_type']
        object_type = data['object_type']
        strava_owner = data['owner_id']
        _ = data['event_time']
    except (KeyError, TypeError):  # pragma: no cover
        dologging("strava_webhooks.log", "KeyError line 1330")
        return HttpResponse(status=200)

    if object_type != 'activity':
        return HttpResponse(status=200)

    if aspect_type == 'create':
        try:
            stravaid = data['object_id']
        except KeyError:  # pragma: no cover
            dologging('strava_webhooks.log', 'KeyError line 1105')
            return HttpResponse(status=200)

        r = _strava_webhook_rower(strava_owner)
        if r is None:  # pragma: no cover
            return HttpResponse(status=200)

        ws = Workout.objects.filter(uploadedtostrava=stravaid)
        if not ws.exists() and r.strava_auto_import:
            strava_integration = importsources['strava'](r.user)
            jobid = strava_integration.get_workout(stravaid)
            if jobid == 0:  # pragma: no cover
                dologging('strava_webhooks.log',
                          'Strava strava_open yielded NoTokenError')
        else:  # pragma: no cover
            dologging('strava_webhooks.log', 'Workouts already existing')
            for w in ws:
                dologging('strava_webhooks.log', str(w))

    elif aspect_type == 'delete':
        try:
            stravaid = data['object_id']
        except KeyError:  # pragma: no cover
            dologging('strava_webhooks.log', 'KeyError line 1132')
            # Stop here: continuing would hit an undefined ``stravaid``.
            return HttpResponse(status=200)

        # filter() never raises DoesNotExist, so an empty result is the only
        # "not found" case to handle.
        ws = Workout.objects.filter(uploadedtostrava=stravaid)
        if not ws.exists():
            return HttpResponse(status=200)

        r = _strava_webhook_rower(strava_owner)
        if r is None:  # pragma: no cover
            return HttpResponse(status=200)

        if r.strava_auto_delete:  # pragma: no cover
            for w in ws:
                # Only delete workouts that belong to this rower.
                if w.user == r:
                    w.delete()

    elif aspect_type == 'update':
        try:
            updates = data['updates']
            stravaid = data['object_id']
        except KeyError:  # pragma: no cover
            dologging("strava_webhooks.log", "KeyError line 1391")
            return HttpResponse(status=200)

        ws = Workout.objects.filter(uploadedtostrava=stravaid)
        if not ws.exists():  # pragma: no cover
            return HttpResponse(status=200)

        r = _strava_webhook_rower(strava_owner)
        if r is None:  # pragma: no cover
            return HttpResponse(status=200)

        for key, value in updates.items():
            if key == 'title':
                for w in ws:
                    w.name = value
                    w.save()
            elif key == 'type' and r.strava_auto_import:
                # Resolve the type once; it is the same for every workout.
                try:
                    workouttype = mytypes.stravamappinginv[value]
                except KeyError:  # pragma: no cover
                    dologging("strava_webhooks.log", "Workout type not found")
                    continue
                for w in ws:
                    w.workouttype = workouttype
                    w.save()

    return HttpResponse(status=200)
|
|
|
|
# For push notifications from Garmin
|
|
|
|
|
|
@csrf_exempt
def garmin_summaries_view(request):  # pragma: no cover
    """Receive Garmin activity summaries pushed by POST.

    The raw body is logged to ``garminlog.log``, parsed as JSON and its
    ``activities`` entry is handed to
    ``garmin_stuff.garmin_workouts_from_summaries``. Every request is
    answered with status 200.
    """
    if request.method == 'POST':
        dologging("garminlog.log", request.body)
        payload = json.loads(request.body)
        garmin_stuff.garmin_workouts_from_summaries(payload['activities'])

    return HttpResponse(status=200)
|
|
|
|
|
|
@csrf_exempt
def garmin_newfiles_ping(request):  # pragma: no cover
    """Receive Garmin "new activity file" notifications pushed by POST.

    For each entry in ``activityFiles`` the rower is looked up by the entry's
    access token and the file is requested through
    ``garmin_stuff.get_garmin_file``. Entries with missing keys or an unknown
    token are skipped. Every request is answered with status 200.
    """
    if request.method == 'POST':
        payload = json.loads(request.body)
        for entry in payload['activityFiles']:
            try:
                rower = Rower.objects.get(garmintoken=entry['userAccessToken'])
                callback_url = entry['callbackURL']
                start_seconds = entry['startTimeInSeconds']
                file_type = entry['fileType']
                garmin_stuff.get_garmin_file(
                    rower, callback_url, start_seconds, file_type)
            except (Rower.DoesNotExist, KeyError):
                continue

    return HttpResponse(status=200)  # pragma: no cover
|
|
|
|
|
|
@csrf_exempt
def garmin_deregistration_view(request):
    """Receive Garmin deregistration notifications pushed by POST.

    For each entry in ``deregistrations`` the rower holding the given access
    token gets that token cleared. Entries without a token, or with a token
    no rower holds, are skipped. Every request is answered with status 200.
    """
    if request.method == 'POST':
        payload = json.loads(request.body)
        for entry in payload['deregistrations']:
            try:
                rower = Rower.objects.get(garmintoken=entry['userAccessToken'])
                rower.garmintoken = ''
                rower.save()
            except (Rower.DoesNotExist, KeyError):  # pragma: no cover
                continue

    return HttpResponse(status=200)
|
|
|
|
|
|
@csrf_exempt
def garmin_details_view(request):
    """Receive Garmin activity details pushed by POST.

    The JSON body is handed to ``garmin_stuff.garmin_workouts_from_details``.
    Every request is answered with status 200.
    """
    if request.method == 'POST':
        garmin_stuff.garmin_workouts_from_details(json.loads(request.body))

    return HttpResponse(status=200)
|
|
|
|
|
|
# the page where you select which Polar workout to Import
|
|
@login_required()
@permission_required('rower.is_coach', fn=get_user_by_userid, raise_exception=True)
@permission_required('rower.is_not_freecoach', fn=get_user_by_userid, raise_exception=True)
def workout_polarimport_view(request, userid=0):  # pragma: no cover
    """Render the list of Polar exercises that can be imported.

    A result from ``polarstuff.get_polar_workouts`` with ``status_code`` 401
    sends the user back to the workouts list with an error message.

    Args:
        request: The incoming HTTP request.
        userid: Not read by the view body; presumably consumed by the
            permission decorators -- confirm.
    """
    exercises = polarstuff.get_polar_workouts(request.user)

    try:
        status_code = exercises.status_code
    # Only a missing attribute is expected; the former bare ``except:`` also
    # swallowed any error raised while building the 401 redirect.
    except AttributeError:
        # NOTE(review): kept from the original -- a result without
        # ``status_code`` is replaced by an empty list, so nothing is listed
        # in that case. Confirm this is intended.
        exercises = []
    else:
        if status_code == 401:
            messages.error(
                request, 'Not authorized. You need to connect to Polar first')
            return HttpResponseRedirect(reverse('workouts_view'))

    workouts = []
    for exercise in exercises:
        try:
            distance = exercise['distance']
        except KeyError:
            # No distance reported for this exercise: show it as 0.
            distance = 0

        workouts.append({
            'id': exercise['id'],
            'distance': distance,
            'duration': isodate.parse_duration(exercise['duration']),
            'starttime': exercise['start-time'],
            'type': exercise['sport'],
            'transactionid': exercise['transaction-id'],
        })

    breadcrumbs = [
        {
            'url': '/rowers/list-workouts/',
            'name': 'Workouts'
        },
        {
            'url': reverse('workout_polarimport_view'),
            'name': 'Polar'
        },
    ]

    r = getrower(request.user)

    return render(request, 'polar_list_import.html',
                  {
                      'workouts': workouts,
                      'active': 'nav-workouts',
                      'rower': r,
                      'breadcrumbs': breadcrumbs,
                      'teams': get_my_teams(request.user),
                  })
|
|
|
|
# Maps an integration key to the name of the URL pattern that starts its
# authorization flow. Most integrations share the generic authorize view.
_GENERIC_AUTHORIZE_VIEW = 'rower_integration_authorize'

importauthorizeviews = {
    'c2': _GENERIC_AUTHORIZE_VIEW,
    'strava': _GENERIC_AUTHORIZE_VIEW,
    'polar': 'rower_polar_authorize',
    'ownapi': 'workout_view',
    'sporttracks': _GENERIC_AUTHORIZE_VIEW,
    'trainingpeaks': _GENERIC_AUTHORIZE_VIEW,
    'nk': _GENERIC_AUTHORIZE_VIEW,
    'rp3': _GENERIC_AUTHORIZE_VIEW,
}
|
|
|
|
|
|
@login_required()
def workout_getimportview(request, externalid, source='c2', do_async=True):
    """Request the import of one external workout, then show the import list.

    Args:
        request: The incoming HTTP request.
        externalid: Id of the workout on the external service.
        source: Key into ``importsources`` selecting the integration.
        do_async: Not used by this view; kept for interface compatibility.

    Returns:
        A redirect to the workouts list when the integration cannot be
        created, to the integration's authorization view when no token is
        available, and otherwise to the import list for ``source``.
    """
    try:
        integration = importsources[source](request.user)
    except (TypeError, NotImplementedError, KeyError):
        # A view must return a response; the earlier code returned the URL
        # string itself.
        return HttpResponseRedirect(reverse("workouts_view"))

    # For NK the date range stored in the session is passed along.
    extra = {}
    if 'startdate' in request.session and source == 'nk':  # pragma: no cover
        extra = {
            'startdate': request.session.get('startdate'),
            'enddate': request.session.get('enddate'),
        }

    try:
        result = integration.get_workout(externalid, **extra)
    except NoTokenError:
        viewname = importauthorizeviews.get(source, 'rower_integration_authorize')
        # Only the generic authorize view takes a ``source`` argument; the
        # dedicated views are defined without one.
        kwargs = {'source': source} if viewname == 'rower_integration_authorize' else {}
        return HttpResponseRedirect(reverse(viewname, kwargs=kwargs))

    if result:  # pragma: no cover
        messages.info(
            request, "Your workout will be imported in the background")
    else:  # pragma: no cover
        messages.error(request, 'Error getting the workout')

    # NOTE(review): the NK path used to redirect through ``importlistviews``,
    # which is not defined in the visible part of this module, and skipped the
    # messages above. It now shares the generic import list with every other
    # source -- confirm.
    url = reverse("workout_import_view", kwargs={'source': source})
    return HttpResponseRedirect(url)
|
|
|
|
|
|
# Imports all new workouts from SportTracks
|
|
@login_required()
def workout_getsporttracksworkout_all(request):
    """Request the import of the user's SportTracks workouts.

    Reports through the messages framework whether the import was started or
    the account is not connected, then redirects to the workouts list.
    """
    sporttracks = importsources['sporttracks'](request.user)
    try:
        sporttracks.get_workouts()
    except NoTokenError:
        messages.error(request, "You have to connect to SportTracks first")
    else:
        messages.info(request, "Your SportTracks workouts will be imported in the background")

    return HttpResponseRedirect(reverse('workouts_view'))
|
|
|