253 lines
8.7 KiB
Python
253 lines
8.7 KiB
Python
import json
import urllib
import urllib.parse
from abc import ABCMeta, ABC, abstractmethod
from datetime import timedelta
from importlib import import_module
from uuid import uuid4

import arrow
import requests
from django.utils import timezone

from rowers.models import Rower, User
from rowers.utils import NoTokenError
|
|
|
|
|
|
|
|
class SyncIntegration(metaclass=ABCMeta):
    """Abstract base class for OAuth-backed workout sync integrations.

    The methods below read their configuration from ``self.oauth_data``.
    Keys that are looked up somewhere in this class:

    * ``client_id``, ``client_secret``, ``redirect_uri``, ``scope``
    * ``autorization_uri`` (sic) -- base of the authorization URL
    * ``base_url`` -- token endpoint that is POSTed to
    * ``content_type`` -- a value containing ``json`` selects a JSON body
    * ``bearer_auth`` -- when truthy, a ``Bearer`` header is sent on refresh
    * ``tokenname``, ``refreshtokenname``, ``expirydatename`` -- names of the
      attributes on ``self.rower`` holding the token, the refresh token and
      the expiry datetime
    * ``headers`` (optional) and ``grant_type`` (optional) overrides
    """

    # Placeholder configuration.
    # NOTE(review): the methods read 'base_url' (not 'base_uri'), 'client_id',
    # 'content_type', ... which are absent here; presumably every concrete
    # integration replaces this dict -- confirm.
    oauth_data = {
        'tokenname': 'token',
        'expirydatename': 'exp',
        'refreshtokenname': 'r',
        'redirect_uri': 'r',
        'client_secret': 's',
        'base_uri': 's'
    }

    # Class-level defaults; __init__ shadows both on the instance.
    user = User()
    rower = Rower()

    def __init__(self, *args, **kwargs):
        """Bind the integration to a user and to ``user.rower``.

        The user is read from ``args[1]``, i.e. the SECOND positional
        argument after ``self``.
        """
        # NOTE(review): args[1] (not args[0]) -- presumably callers/subclasses
        # pass an extra leading positional argument; confirm before changing.
        user = args[1]
        self.user = user
        self.rower = user.rower

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat any class exposing a callable ``get_token`` as a subclass.

        Returns ``True`` in that case and ``NotImplemented`` otherwise, so the
        regular ABC subclass machinery is used as the fallback.
        """
        if callable(getattr(subclass, 'get_token', None)):
            return True
        return NotImplemented

    # NOTE(review): the four abstract methods below declare no explicit
    # ``self``; when invoked on an instance the first named parameter (or
    # ``args[0]``) receives the instance. Signatures are left untouched so
    # existing overrides and super() calls keep working.

    @abstractmethod
    def createworkoutdata(w, *args, **kwargs):
        """Abstract hook; the base implementation returns ``None``."""
        return None

    @abstractmethod
    def workout_export(workout, *args, **kwargs) -> str:
        """Abstract hook; the base implementation does nothing."""
        pass

    @abstractmethod
    def get_workouts(*args, **kwargs) -> int:
        """Abstract hook; the base implementation does nothing."""
        pass

    @abstractmethod
    def get_workout(id) -> int:
        """Abstract hook; the base implementation returns ``0``."""
        return 0

    # need to unify workout list
    @abstractmethod
    def get_workout_list(*args, **kwargs) -> list:
        """Abstract hook; the base implementation returns an empty list."""
        return []

    @abstractmethod
    def make_authorization_url(self, *args, **kwargs) -> str:  # pragma: no cover
        """Build the URL the user is sent to in order to authorize access.

        Returns:
            ``oauth_data['autorization_uri']`` directly followed by the
            URL-encoded query parameters (no separator is inserted here).
        """
        # Random value for the OAuth ``state`` parameter (xsrf protection).
        # NOTE(review): the state is only embedded in the URL; nothing in this
        # method stores it for later comparison.
        state = str(uuid4())

        params = {
            "client_id": self.oauth_data['client_id'],
            "response_type": "code",
            "redirect_uri": self.oauth_data['redirect_uri'],
            "scope": self.oauth_data['scope'],
            "state": state,
        }

        # 'autorization_uri' (sic) is the spelling get_token() reads as well.
        return self.oauth_data['autorization_uri'] + urllib.parse.urlencode(params)

    @abstractmethod
    def get_token(self, code, *args, **kwargs) -> (str, int, str):
        """Exchange an authorization ``code`` for an access token.

        Args:
            code: authorization code to send to the token endpoint.

        Returns:
            ``[access_token, expires_in, refresh_token]``. ``expires_in`` is an
            int and falls back to ``0`` when missing or not convertible;
            ``refresh_token`` falls back to ``''`` when missing.

        Raises:
            NoTokenError: the endpoint did not answer 200/201, or the response
                carries no ``access_token``.
        """
        redirect_uri = self.oauth_data['redirect_uri']
        client_secret = self.oauth_data['client_secret']
        client_id = self.oauth_data['client_id']
        base_uri = self.oauth_data['base_url']

        post_data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,
            "client_secret": client_secret,
            "client_id": client_id,
        }

        # Integration-specific headers win; otherwise use the JSON defaults.
        try:
            headers = self.oauth_data['headers']
        except KeyError:
            headers = {
                'Accept': 'application/json',
                'Api-Key': client_id,
                'Content-Type': 'application/json',
                'user-agent': 'sanderroosendaal',
            }

        # A truthy configured grant_type overrides the default ...
        if self.oauth_data.get('grant_type'):
            post_data['grant_type'] = self.oauth_data['grant_type']
        # ... except for Strava, which is always forced back to the default.
        if 'strava' in self.oauth_data['autorization_uri']:
            post_data['grant_type'] = "authorization_code"

        if 'json' in self.oauth_data['content_type']:
            response = requests.post(
                base_uri,
                data=json.dumps(post_data),
                headers=headers)
        else:  # pragma: no cover
            # NOTE(review): verify=False disables TLS certificate checking on
            # a request that carries the client secret; kept as-is to preserve
            # existing behaviour.
            response = requests.post(
                base_uri,
                data=post_data,
                headers=headers, verify=False)

        if response.status_code not in (200, 201):  # pragma: no cover
            raise NoTokenError("Failed to obtain token")

        token_json = response.json()
        try:
            thetoken = token_json['access_token']
        except KeyError as exc:  # pragma: no cover
            raise NoTokenError("Failed to obtain token") from exc

        refresh_token = token_json.get('refresh_token', '')
        try:
            expires_in = int(token_json.get('expires_in', 0))
        except (ValueError, TypeError):  # pragma: no cover
            expires_in = 0

        return [thetoken, expires_in, refresh_token]

    @abstractmethod
    def open(self, *args, **kwargs) -> str:
        """Return a usable access token for ``self.rower``.

        The stored token is refreshed through ``token_refresh()`` when its
        expiry datetime is less than 60 seconds away, or when no expiry is
        known and the expiry attribute name contains ``'strava'``.

        Raises:
            NoTokenError: the rower has no stored token (``None`` or ``''``).
        """
        token = getattr(self.rower, self.oauth_data['tokenname'])

        try:
            tokenexpirydate = getattr(self.rower, self.oauth_data['expirydatename'])
        except (TypeError, AttributeError, KeyError):  # pragma: no cover
            tokenexpirydate = None

        if token is None or token == '':
            raise NoTokenError("User has no token")

        expirydatename = self.oauth_data['expirydatename']
        if tokenexpirydate and timezone.now() + timedelta(seconds=60) > tokenexpirydate:
            token = self.token_refresh()
        elif tokenexpirydate is None and expirydatename is not None and 'strava' in expirydatename:  # pragma: no cover
            token = self.token_refresh()

        return token

    def do_refresh_token(self, *args, **kwargs) -> (str, int, str):
        """Request a fresh access token using the rower's refresh token.

        Keyword Args:
            access_token: token placed in the ``Bearer`` authorization header
                when ``oauth_data['bearer_auth']`` is truthy (default ``''``).

        Returns:
            ``[access_token, expires_in, refresh_token]``. ``expires_in`` comes
            from ``expires_in``, else is derived from ``expires_at``, else is
            ``0``. ``refresh_token`` falls back to the one that was sent when
            the response does not include a new one.

        Raises:
            NoTokenError: the request failed, the endpoint did not answer
                200/201, or the response carries no ``access_token``.
        """
        refreshtoken = getattr(self.rower, self.oauth_data['refreshtokenname'])
        access_token = kwargs.get('access_token', '')

        post_data = {
            "grant_type": "refresh_token",
            "client_secret": self.oauth_data['client_secret'],
            "client_id": self.oauth_data['client_id'],
            "refresh_token": refreshtoken,
        }
        headers = {
            'user-agent': 'sanderroosendaal',
            'Accept': 'application/json',
            'Content-Type': self.oauth_data['content_type'],
        }

        # for Strava
        if self.oauth_data.get('grant_type'):
            post_data['grant_type'] = self.oauth_data['grant_type']

        if self.oauth_data['bearer_auth']:
            headers['authorization'] = 'Bearer %s' % access_token

        baseurl = self.oauth_data['base_url']
        as_json = 'json' in self.oauth_data['content_type']

        # NOTE(review): verify=False disables TLS certificate checking on a
        # request that carries the client secret; kept as-is to preserve
        # existing behaviour.
        try:
            payload = json.dumps(post_data) if as_json else post_data
            response = requests.post(baseurl,
                                     data=payload,
                                     headers=headers, verify=False)
        except Exception as exc:  # pragma: no cover
            # Any failure while building or sending the request is reported
            # uniformly; KeyboardInterrupt/SystemExit are no longer swallowed.
            raise NoTokenError("Failed to get token") from exc

        if response.status_code not in (200, 201):  # pragma: no cover
            raise NoTokenError("User has no token")
        token_json = response.json()

        try:
            thetoken = token_json['access_token']
        except KeyError as exc:  # pragma: no cover
            raise NoTokenError("User has no token") from exc

        try:
            expires_in = token_json['expires_in']
        except KeyError:
            # Fall back to an absolute 'expires_at' and convert it into a
            # number of seconds from now.
            try:
                expires_at = arrow.get(token_json['expires_at']).timestamp()
                expires_in = expires_at - arrow.now().timestamp()
            except KeyError:  # pragma: no cover
                expires_in = 0

        try:
            refresh_token = token_json['refresh_token']
        except KeyError:  # pragma: no cover
            refresh_token = refreshtoken

        try:
            expires_in = int(expires_in)
        except (TypeError, ValueError):  # pragma: no cover
            expires_in = 0

        return [thetoken, expires_in, refresh_token]

    @abstractmethod
    def token_refresh(self, *args, **kwargs) -> str:
        """Refresh the rower's token, store the result and return it.

        Calls ``do_refresh_token()``, writes the new access token (and, when
        their attribute names are configured, the expiry datetime and refresh
        token) onto ``self.rower`` and saves it.

        Returns:
            The new access token.
        """
        access_token, expires_in, refresh_token = self.do_refresh_token()
        expirydatetime = timezone.now() + timedelta(seconds=expires_in)

        setattr(self.rower, self.oauth_data['tokenname'], access_token)
        if self.oauth_data['expirydatename'] is not None:
            setattr(self.rower, self.oauth_data['expirydatename'], expirydatetime)
        if self.oauth_data['refreshtokenname'] is not None:
            setattr(self.rower, self.oauth_data['refreshtokenname'], refresh_token)

        self.rower.save()

        return access_token
|
|
|
|
|