Private
Public Access
1
0
Files
rowsandall/rowers/integrations/integrations.py
Sander Roosendaal def98b9a85 fix tets
2023-02-11 17:33:31 +01:00

253 lines
8.7 KiB
Python

from abc import ABCMeta, ABC, abstractmethod
from importlib import import_module
from rowers.models import Rower, User
from rowers.utils import NoTokenError
import requests
from django.utils import timezone
from datetime import timedelta
import arrow
import urllib
from uuid import uuid4
import json
class SyncIntegration(metaclass=ABCMeta):
    """Abstract base class for OAuth-backed workout sync integrations.

    The class stores the Django ``user`` and its ``rower`` and reads all
    provider-specific settings from the ``oauth_data`` dict.  It provides
    default implementations for building the authorization URL, exchanging
    an authorization code for a token, and refreshing an expired token.
    Several of these defaults are still marked abstract, so concrete
    subclasses must override them (and may delegate back via ``super()``).
    """

    # Placeholder configuration.  The methods below also read keys that are
    # NOT present in this default: 'client_id', 'scope', 'autorization_uri',
    # 'base_url', 'content_type', 'bearer_auth' and optionally 'headers' and
    # 'grant_type'.  Note the default defines 'base_uri' while the code reads
    # 'base_url' -- presumably subclasses replace this dict entirely;
    # TODO confirm against the concrete integrations.
    oauth_data = {
        'tokenname':'token',
        'expirydatename':'exp',
        'refreshtokenname':'r',
        'redirect_uri': 'r',
        'client_secret': 's',
        'base_uri': 's'
    }
    # Class-level placeholders; both are replaced per instance in __init__.
    user = User()
    rower = Rower()

    def __init__(self, *args, **kwargs):
        """Bind the integration to a user.

        The user is taken from the SECOND positional argument (``args[1]``).
        NOTE(review): presumably subclasses forward ``self`` explicitly as
        ``args[0]`` when calling this initializer -- confirm with callers.
        """
        user = args[1]
        self.user = user
        self.rower = user.rower

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat any class with a callable ``get_token`` as a subclass."""
        if callable(getattr(subclass, 'get_token', None)):
            return True
        return NotImplemented

    # NOTE(review): the five stubs below are declared without ``self``.  When
    # called on an instance, the first declared parameter (``w``, ``workout``,
    # ``id``) receives the instance.  Signatures are left untouched so that
    # existing overrides and ``super()`` calls keep working.
    @abstractmethod
    def createworkoutdata(w, *args, **kwargs):
        """Interface stub; the base implementation returns None."""
        return None

    @abstractmethod
    def workout_export(workout, *args, **kwargs) -> str:
        """Interface stub; the base implementation does nothing."""
        pass

    @abstractmethod
    def get_workouts(*args, **kwargs) -> int:
        """Interface stub; the base implementation does nothing."""
        pass

    @abstractmethod
    def get_workout(id) -> int:
        """Interface stub; the base implementation returns 0."""
        return 0

    # need to unify workout list
    @abstractmethod
    def get_workout_list(*args, **kwargs) -> list:
        """Interface stub; the base implementation returns an empty list."""
        return []

    @abstractmethod
    def make_authorization_url(self, *args, **kwargs) -> str:  # pragma: no cover
        """Build the URL the user is sent to for granting OAuth access.

        Returns:
            The configured authorization URI followed by the URL-encoded
            ``client_id``, ``response_type``, ``redirect_uri``, ``scope``
            and a freshly generated ``state`` value.
        """
        # Generate a random string for the state parameter
        # Save it for use later to prevent xsrf attacks
        # NOTE(review): the state is only embedded in the URL here; nothing
        # in this method stores it for later comparison.
        state = str(uuid4())
        params = {
            "client_id": self.oauth_data['client_id'],
            "response_type": "code",
            "redirect_uri": self.oauth_data['redirect_uri'],
            "scope": self.oauth_data['scope'],
            "state": state,
        }
        # The key spelling 'autorization_uri' matches the one get_token reads.
        authorization_uri = self.oauth_data['autorization_uri']
        # Only add a separator when the configured URI does not already end
        # with one, so URIs that include a trailing '?' keep working.
        if authorization_uri.endswith(('?', '&')):
            separator = ''
        elif '?' in authorization_uri:
            separator = '&'
        else:
            separator = '?'
        return authorization_uri + separator + urllib.parse.urlencode(params)

    @abstractmethod
    def get_token(self, code, *args, **kwargs) -> list:
        """Exchange an authorization code for tokens.

        Args:
            code: The authorization code received from the provider.

        Returns:
            A list ``[access_token, expires_in, refresh_token]`` where
            ``expires_in`` is an int (0 when missing or not numeric) and
            ``refresh_token`` is ``''`` when the response has none.

        Raises:
            NoTokenError: If the response status is not 200/201, the body is
                not valid JSON, or it contains no ``access_token``.
        """
        redirect_uri = self.oauth_data['redirect_uri']
        client_secret = self.oauth_data['client_secret']
        client_id = self.oauth_data['client_id']
        base_uri = self.oauth_data['base_url']

        post_data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": redirect_uri,
            "client_secret": client_secret,
            "client_id": client_id,
        }

        # Provider-specific headers win; otherwise fall back to JSON defaults.
        default_headers = {
            'Accept': 'application/json',
            'Api-Key': client_id,
            'Content-Type': 'application/json',
            'user-agent': 'sanderroosendaal',
        }
        headers = self.oauth_data.get('headers', default_headers)

        grant_type = self.oauth_data.get('grant_type')
        if grant_type:
            post_data['grant_type'] = grant_type
        # Strava always gets the plain authorization-code grant here, even if
        # a different 'grant_type' is configured.
        if 'strava' in self.oauth_data['autorization_uri']:
            post_data['grant_type'] = "authorization_code"

        if 'json' in self.oauth_data['content_type']:
            response = requests.post(
                base_uri,
                data=json.dumps(post_data),
                headers=headers)
        else:  # pragma: no cover
            # NOTE(review): verify=False disables TLS certificate checks while
            # the client secret is being sent.  Kept to avoid changing
            # deployed behaviour; consider removing it.
            response = requests.post(
                base_uri,
                data=post_data,
                headers=headers, verify=False)

        if response.status_code not in (200, 201):  # pragma: no cover
            raise NoTokenError("Failed to obtain token")

        try:
            token_json = response.json()
        except ValueError as exc:  # pragma: no cover
            # A success status with a non-JSON body carries no usable token.
            raise NoTokenError("Failed to obtain token") from exc

        try:
            thetoken = token_json['access_token']
        except KeyError:  # pragma: no cover
            raise NoTokenError("Failed to obtain token")

        refresh_token = token_json.get('refresh_token', '')
        try:
            expires_in = int(token_json.get('expires_in', 0))
        except (ValueError, TypeError):  # pragma: no cover
            expires_in = 0

        return [thetoken, expires_in, refresh_token]

    @abstractmethod
    def open(self, *args, **kwargs) -> str:
        """Return a usable access token for the rower, refreshing if needed.

        The token is refreshed when its stored expiry date is less than 60
        seconds away, or when no expiry date is stored and the configured
        expiry attribute name contains ``'strava'``.

        Raises:
            NoTokenError: If the rower has no (or an empty) access token.
        """
        token = getattr(self.rower, self.oauth_data['tokenname'])
        try:
            tokenexpirydate = getattr(self.rower, self.oauth_data['expirydatename'])
        except (TypeError, AttributeError, KeyError):  # pragma: no cover
            tokenexpirydate = None

        if token == '' or token is None:
            raise NoTokenError("User has no token")

        # .get(): a missing 'expirydatename' is already tolerated above.
        expirydatename = self.oauth_data.get('expirydatename')
        if tokenexpirydate and timezone.now() + timedelta(seconds=60) > tokenexpirydate:
            token = self.token_refresh()
        elif tokenexpirydate is None and expirydatename is not None and 'strava' in expirydatename:  # pragma: no cover
            token = self.token_refresh()

        return token

    def do_refresh_token(self, *args, **kwargs) -> list:
        """Request a new access token from the provider's token endpoint.

        Keyword Args:
            access_token: Value placed in the ``Bearer`` authorization header
                when ``oauth_data['bearer_auth']`` is truthy (default ``''``).
            refresh_token: Optional value to send instead of the refresh
                token stored on the rower.

        Returns:
            A list ``[access_token, expires_in, refresh_token]``.  When the
            response carries no refresh token, the one stored on the rower is
            returned unchanged.

        Raises:
            NoTokenError: If the request fails, the status is not 200/201,
                the body is not valid JSON, or it has no ``access_token``.
        """
        stored_refresh_token = getattr(self.rower, self.oauth_data['refreshtokenname'])
        refreshtoken = kwargs.get('refresh_token') or stored_refresh_token
        access_token = kwargs.get('access_token', '')

        post_data = {
            "grant_type": "refresh_token",
            "client_secret": self.oauth_data['client_secret'],
            "client_id": self.oauth_data['client_id'],
            "refresh_token": refreshtoken,
        }
        headers = {
            'user-agent': 'sanderroosendaal',
            'Accept': 'application/json',
            'Content-Type': self.oauth_data['content_type'],
        }

        # for Strava
        grant_type = self.oauth_data.get('grant_type')
        if grant_type:
            post_data['grant_type'] = grant_type

        if self.oauth_data.get('bearer_auth'):
            headers['authorization'] = 'Bearer %s' % access_token

        baseurl = self.oauth_data['base_url']

        if 'json' in self.oauth_data['content_type']:
            payload = json.dumps(post_data)
        else:
            payload = post_data

        # NOTE(review): verify=False disables TLS certificate checks while
        # secrets are being sent.  Kept to avoid changing deployed behaviour;
        # consider removing it.
        try:
            response = requests.post(baseurl,
                                     data=payload,
                                     headers=headers, verify=False)
        except Exception as exc:  # pragma: no cover
            # Exception (not a bare except) so KeyboardInterrupt/SystemExit
            # still propagate; the original cause is chained for debugging.
            raise NoTokenError("Failed to get token") from exc

        if response.status_code not in (200, 201):  # pragma: no cover
            raise NoTokenError("User has no token")

        try:
            token_json = response.json()
        except ValueError as exc:  # pragma: no cover
            raise NoTokenError("Failed to get token") from exc

        try:
            thetoken = token_json['access_token']
        except KeyError:  # pragma: no cover
            raise NoTokenError("User has no token")

        try:
            expires_in = token_json['expires_in']
        except KeyError:
            # Fall back to an absolute 'expires_at' and convert it into a
            # number of seconds from now.
            # NOTE(review): timestamp() is called as a method here, which
            # presumably requires a recent arrow release -- confirm.
            try:
                expires_at = arrow.get(token_json['expires_at']).timestamp()
                expires_in = expires_at - arrow.now().timestamp()
            except KeyError:  # pragma: no cover
                expires_in = 0

        try:
            refresh_token = token_json['refresh_token']
        except KeyError:  # pragma: no cover
            refresh_token = stored_refresh_token

        try:
            expires_in = int(expires_in)
        except (TypeError, ValueError):  # pragma: no cover
            expires_in = 0

        return [thetoken, expires_in, refresh_token]

    @abstractmethod
    def token_refresh(self, *args, **kwargs) -> str:
        """Refresh the rower's tokens, persist them and return the new token.

        When the rower has no stored refresh token, the current access token
        is sent in its place.

        Returns:
            The new access token.

        Raises:
            NoTokenError: Propagated from ``do_refresh_token``.
        """
        stored_refresh_token = getattr(self.rower, self.oauth_data['refreshtokenname'])
        if stored_refresh_token:
            access_token, expires_in, refresh_token = self.do_refresh_token()
        else:
            # Previously this fallback was computed but never handed to
            # do_refresh_token, so an empty refresh token was always sent.
            fallback_token = getattr(self.rower, self.oauth_data['tokenname'])
            access_token, expires_in, refresh_token = self.do_refresh_token(
                refresh_token=fallback_token)

        expirydatetime = timezone.now() + timedelta(seconds=expires_in)

        setattr(self.rower, self.oauth_data['tokenname'], access_token)
        if self.oauth_data['expirydatename'] is not None:
            setattr(self.rower, self.oauth_data['expirydatename'], expirydatetime)
        if self.oauth_data['refreshtokenname'] is not None:
            setattr(self.rower, self.oauth_data['refreshtokenname'], refresh_token)
        self.rower.save()

        return access_token