Private
Public Access
1
0
Files
rowsandall/rowers/nkstuff.py
2021-03-31 07:41:00 +02:00

143 lines
4.5 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import unicode_literals, absolute_import
import time
from time import strftime
import requests
#https://oauth-stage.nkrowlink.com/oauth/authorizegrant_type=authorization_code&response_type=code&client_id=rowsandall-staging&scope=read&state=fc8fc3d8-ce0a-443e-838a-1c06fb5317c6&redirect_uri=https%3A%2F%2Fdunav.ngrok.io%2Fnk_callback%2F
#https://oauth-stage.nkrowlink.com/oauth/authorize?grant_type=authorization_code&response_type=code&client_id=rowsandall-staging&scope=read&state=1234&redirect_uri=https%3A%2F%2Fdev.rowsandall.com%2Fnk_callback
from requests_oauthlib import OAuth2Session
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('low')
from rowers.rower_rules import is_workout_user, ispromember
from iso8601 import ParseError
from rowers.utils import myqueue
import rowers.mytypes as mytypes
import gzip
from rowsandall_app.settings import (
NK_CLIENT_ID, NK_REDIRECT_URI, NK_CLIENT_SECRET,
SITE_URL, NK_API_LOCATION
)
try:
from json.decoder import JSONDecodeError
except ImportError:
JSONDecodeError = ValueError
from rowers.imports import *
oauth_data = {
'client_id': NK_CLIENT_ID,
'client_secret': NK_CLIENT_SECRET,
'redirect_uri': NK_REDIRECT_URI,
'autorization_uri': "https://oauth-stage.nkrowlink.com/oauth/authorize",
'content_type': 'application/json',
'tokenname': 'nktoken',
'refreshtokenname': 'nkrefreshtoken',
'expirydatename': 'nktokenexpirydate',
'bearer_auth': True,
'base_url': "https://oauth-stage.nkrowlink.com/oauth/token",
'scope':'read',
}
def get_token(code):
#client_id = oauth_data['client_id']
#client_secret = oauth_data['client_secret']
#base_uri = oauth_data['base_url']
#callbackuri = 'https://'+callbackuri[9:]
#print(callbackuri)
#nk = OAuth2Session(client_id)
#token = nk.fetch_token(base_uri,client_secret=client_secret,authorization_response=callbackuri)
#print(token)
#return [0,0,0]
return imports_get_token(code, oauth_data)
def nk_open(user):
t = time.localtime()
timestamp = time.strftime('%b-%d-%Y_%H%M', t)
token = imports_open(user,oauth_data)
return token
def do_refresh_token(refreshtoken):
return imports_do_refresh_token(refreshtoken, oauth_data)
def rower_nk_token_refresh(user):
r = Rower.objects.get(user=user)
res = do_refresh_token(r.nkrefreshtoken)
access_token = res[0]
expires_in = res[1]
refresh_token = res[2]
expirydatetime = timezone.now()+timedelta(seconds=expires_in)
r.nktoken = access_token
r.nktokenexpirydate = expirydatetime
r.nkrefreshtoken = refresh_token
r.save()
return r.nktoken
def make_authorization_url(request):
return imports_make_authorization_url(oauth_data)
def get_nk_workout_list(user,fake=False):
r = Rower.objects.get(user=user)
if (r.nktoken == '') or (r.nktoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (r.nktokenexpirydate is None or timezone.now()+timedelta(seconds=3599)>r.nktokenexpirydate):
s = "Token expired. Needs to refresh."
return custom_exception_handler(401,s)
else:
# ready to fetch. Hurray
authorizationstring = str('Bearer ' + r.nktoken)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = NK_API_LOCATION+"api/v1/sessions"
params = {} # start / end time
s = requests.get(url,headers=headers,params=params)
return s
def get_nk_workout_strokes(user,nkid):
r = Rower.objects.get(user=user)
if (r.nktoken == '') or (r.nktoken is None):
return custom_exception_handler(401,s)
s = "Token doesn't exist. Need to authorize"
elif (timezone.now()>r.tokenexpirydate):
s = "Token expired. Needs to refresh."
return custom_exception_handler(401,s)
else:
# ready to fetch. Hurray
authorizationstring = str('Bearer ' + r.nktoken)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://log.concept2.com/api/users/me/results/"+str(nkid)+"/strokes"
s = requests.get(url,headers=headers)
return s
#
#def get_workout(user,nkid):