Private
Public Access
1
0
Files
rowsandall/rowers/integrations/integrations.py

271 lines
9.4 KiB
Python

from abc import ABCMeta, ABC, abstractmethod
from importlib import import_module
from rowers.models import Rower, User, create_or_update_syncrecord
from rowers.utils import NoTokenError,dologging
import requests
from django.utils import timezone
from datetime import timedelta
import arrow
import urllib
from uuid import uuid4
import json
class SyncIntegration(metaclass=ABCMeta):
oauth_data = {
'tokenname':'token',
'expirydatename':'exp',
'refreshtokenname':'r',
'redirect_uri': 'r',
'client_secret': 's',
'base_uri': 's'
}
user = User()
rower = Rower()
def __init__(self, *args, **kwargs):
user = args[0]
self.user = user
self.rower = user.rower
@classmethod
def __subclasshook__(cls, subclass):
return (hasattr(subclass, 'get_token') and
callable(subclass.get_token) or
NotImplemented)
@abstractmethod
def get_name(self):
raise NotImplementedError
@abstractmethod
def get_shortname(self):
raise NotImplementedError
@abstractmethod
def createworkoutdata(self, w, *args, **kwargs):
return None
@abstractmethod
def workout_export(self, workout, *args, **kwargs) -> str:
pass
@abstractmethod
def get_workouts(self, *args, **kwargs) -> int:
pass
@abstractmethod
def get_workout(self, id, *args, **kwargs) -> int:
return 0
# need to unify workout list
@abstractmethod
def get_workout_list(self, *args, **kwargs) -> list:
return []
@abstractmethod
def make_authorization_url(self, *args, **kwargs) -> str: # pragma: no cover
# Generate a random string for the state parameter
# Save it for use later to prevent xsrf attacks
state = str(uuid4())
params = {"client_id": self.oauth_data['client_id'],
"response_type": "code",
"redirect_uri": self.oauth_data['redirect_uri'],
"scope": self.oauth_data['scope'],
"state": state}
url = self.oauth_data['authorization_uri']+urllib.parse.urlencode(params)
return url
@abstractmethod
def get_token(self, code, *args, **kwargs) -> (str, int, str):
logfile = kwargs.get('logfile',None)
redirect_uri = self.oauth_data['redirect_uri']
client_secret = self.oauth_data['client_secret']
client_id = self.oauth_data['client_id']
base_uri = self.oauth_data['base_url']
post_data = {"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_secret": client_secret,
"client_id": client_id,
}
try:
headers = self.oauth_data['headers']
except KeyError:
headers = {'Accept': 'application/json',
'Api-Key': client_id,
'Content-Type': 'application/json',
'user-agent': 'sanderroosendaal'}
if 'grant_type' in self.oauth_data:
if self.oauth_data['grant_type']:
post_data['grant_type'] = self.oauth_data['grant_type']
if 'strava' in self.oauth_data['autorization_uri']:
post_data['grant_type'] = "authorization_code"
if 'json' in self.oauth_data['content_type']:
response = requests.post(
base_uri,
data=json.dumps(post_data),
headers=headers)
else: # pragma: no cover
response = requests.post(
base_uri,
data=post_data,
headers=headers, verify=False)
if response.status_code == 200 or response.status_code == 201:
token_json = response.json()
try:
thetoken = token_json['access_token']
except KeyError: # pragma: no cover
if logfile:
s = json.dumps(token_json)
dologging(logfile,s)
raise NoTokenError("Failed to obtain token")
try:
refresh_token = token_json['refresh_token']
except KeyError: # pragma: no cover
refresh_token = ''
try:
expires_in = token_json['expires_in']
except KeyError: # pragma: no cover
try:
expires_at = arrow.get(token_json['expires_at']).timestamp()
expires_in = expires_at - arrow.now().timestamp()
except KeyError: # pragma: no cover
expires_in = 0
try:
expires_in = int(expires_in)
except (ValueError, TypeError): # pragma: no cover
expires_in = 0
else: # pragma: no cover
if logfile:
dologging(logfile,response.text)
raise NoTokenError("Failed to obtain token")
return [thetoken, expires_in, refresh_token]
@abstractmethod
def open(self, *args, **kwargs) -> str:
token = getattr(self.rower, self.oauth_data['tokenname'])
try:
tokenexpirydate = getattr(self.rower, self.oauth_data['expirydatename'])
except (TypeError, AttributeError, KeyError): # pragma: no cover
tokenexpirydate = None
if (token == '') or (token is None):
raise NoTokenError("User has no token")
else:
tokenname = self.oauth_data['tokenname']
refreshtokenname = self.oauth_data['refreshtokenname']
expirydatename = self.oauth_data['expirydatename']
if tokenexpirydate and timezone.now()+timedelta(seconds=60) > tokenexpirydate:
token = self.token_refresh()
elif tokenexpirydate is None and expirydatename is not None and 'strava' in expirydatename: # pragma: no cover
token = self.token_refresh()
return token
def do_refresh_token(self, *args, **kwargs) -> (str, int, str):
refreshtoken = getattr(self.rower, self.oauth_data['refreshtokenname'])
access_token = kwargs.get('access_token','')
post_data = {"grant_type": "refresh_token",
"client_secret": self.oauth_data['client_secret'],
"client_id": self.oauth_data['client_id'],
"refresh_token": refreshtoken,
}
headers = {'user-agent': 'sanderroosendaal',
'Accept': 'application/json',
'Content-Type': self.oauth_data['content_type']}
# for Strava
if 'grant_type' in self.oauth_data:
if self.oauth_data['grant_type']:
post_data['grant_type'] = self.oauth_data['grant_type']
if self.oauth_data['bearer_auth']:
headers['authorization'] = 'Bearer %s' % access_token
baseurl = self.oauth_data['base_url']
if 'json' in self.oauth_data['content_type']:
try:
response = requests.post(baseurl,
data=json.dumps(post_data),
headers=headers, verify=False)
except: # pragma: no cover
raise NoTokenError("Failed to get token")
else:
try:
response = requests.post(baseurl,
data=post_data,
headers=headers, verify=False,
)
except: # pragma: no cover
raise NoTokenError("Failed to get token")
if response.status_code == 200 or response.status_code == 201:
token_json = response.json()
else: # pragma: no cover
raise NoTokenError("User has no token")
try:
thetoken = token_json['access_token']
except KeyError: # pragma: no cover
raise NoTokenError("User has no token")
try:
expires_in = token_json['expires_in']
except KeyError:
try:
expires_at = arrow.get(token_json['expires_at']).timestamp()
expires_in = expires_at - arrow.now().timestamp()
except KeyError: # pragma: no cover
expires_in = 0
try:
refresh_token = token_json['refresh_token']
except KeyError: # pragma: no cover
refresh_token = refreshtoken
try:
expires_in = int(expires_in)
except (TypeError, ValueError): # pragma: no cover
expires_in = 0
return [thetoken, expires_in, refresh_token]
@abstractmethod
def token_refresh(self, *args, **kwargs) -> str:
refreshtoken = getattr(self.rower, self.oauth_data['refreshtokenname'])
if not refreshtoken:
refreshtoken = getattr(self.rower, self.oauth_data['tokenname'])
access_token, expires_in, refresh_token = self.do_refresh_token()
expirydatetime = timezone.now()+timedelta(seconds=expires_in)
setattr(self.rower, self.oauth_data['tokenname'], access_token)
if self.oauth_data['expirydatename'] is not None:
setattr(self.rower, self.oauth_data['expirydatename'], expirydatetime)
if self.oauth_data['refreshtokenname'] is not None:
setattr(self.rower, self.oauth_data['refreshtokenname'], refresh_token)
self.rower.save()
return access_token