from abc import ABCMeta, ABC, abstractmethod
from datetime import timedelta
from importlib import import_module
import json
import urllib
# ``import urllib`` alone does not guarantee that the ``urllib.parse``
# submodule is loaded; import it explicitly because
# make_authorization_url() calls urllib.parse.urlencode().
import urllib.parse
from uuid import uuid4

import arrow
from django.utils import timezone
import requests

from rowers.models import Rower, User, create_or_update_syncrecord, get_known_ids
from rowers.utils import NoTokenError, dologging


class SyncIntegration(metaclass=ABCMeta):
    """Abstract base class for OAuth-based workout sync integrations.

    Subclasses supply ``oauth_data`` (attribute names on the rower object,
    client credentials and endpoint URLs) and implement the abstract
    methods.  Several abstract methods (``make_authorization_url``,
    ``get_token``, ``open``, ``token_refresh``) carry a default body that
    reads its settings from ``oauth_data``.
    """

    # Placeholder configuration.  The methods below also read the keys
    # 'client_id', 'scope', 'authorization_uri', 'base_url', 'content_type'
    # and 'bearer_auth', and optionally 'headers' and 'grant_type'.
    oauth_data = {
        'tokenname': 'token',
        'expirydatename': 'exp',
        'refreshtokenname': 'r',
        'redirect_uri': 'r',
        'client_secret': 's',
        'base_uri': 's'
    }

    # Class-level placeholders; __init__ replaces both on every instance.
    user = User()
    rower = Rower()

    def __init__(self, *args, **kwargs):
        """Bind the integration to a user.

        The first positional argument is the user object; its ``rower``
        attribute is stored as ``self.rower``.
        """
        user = args[0]
        self.user = user
        self.rower = user.rower

    @classmethod
    def __subclasshook__(cls, subclass):
        """Treat any class with a callable ``get_token`` as a subclass."""
        if hasattr(subclass, 'get_token') and callable(subclass.get_token):
            return True
        return NotImplemented

    @abstractmethod
    def get_name(self):
        """Abstract; the base implementation raises NotImplementedError."""
        raise NotImplementedError

    @abstractmethod
    def get_shortname(self):
        """Abstract; the base implementation raises NotImplementedError."""
        raise NotImplementedError

    @abstractmethod
    def createworkoutdata(self, w, *args, **kwargs):
        """Abstract hook; the base implementation returns None."""
        return None

    @abstractmethod
    def workout_export(self, workout, *args, **kwargs) -> str:
        """Abstract hook; the base implementation does nothing."""

    @abstractmethod
    def get_workouts(self, *args, **kwargs) -> int:
        """Abstract hook; the base implementation does nothing."""

    @abstractmethod
    def get_workout(self, id, *args, **kwargs) -> int:
        """Abstract hook; the base implementation returns 0."""
        return 0

    # need to unify workout list
    @abstractmethod
    def get_workout_list(self, *args, **kwargs) -> list:
        """Abstract hook; the base implementation returns an empty list."""
        return []

    @abstractmethod
    def make_authorization_url(self, *args, **kwargs) -> str:  # pragma: no cover
        """Build the OAuth authorization URL from ``oauth_data``.

        Returns ``oauth_data['authorization_uri']`` with the URL-encoded
        query parameters (client id, response type, redirect URI, scope
        and a random ``state``) appended.
        """
        # Generate a random string for the state parameter
        # Save it for use later to prevent xsrf attacks
        # NOTE(review): the state value is generated here but not stored
        # anywhere in this method.
        state = str(uuid4())

        params = {"client_id": self.oauth_data['client_id'],
                  "response_type": "code",
                  "redirect_uri": self.oauth_data['redirect_uri'],
                  "scope": self.oauth_data['scope'],
                  "state": state}

        return self.oauth_data['authorization_uri'] + urllib.parse.urlencode(params)

    @staticmethod
    def _expires_in_seconds(token_json) -> int:
        """Return the token lifetime in whole seconds from a token response.

        Uses ``expires_in`` when present, otherwise derives it from
        ``expires_at``; returns 0 when neither key exists or the value
        cannot be converted to an int.
        """
        try:
            expires_in = token_json['expires_in']
        except KeyError:
            try:
                expires_at = arrow.get(token_json['expires_at']).timestamp()
                expires_in = expires_at - arrow.now().timestamp()
            except KeyError:  # pragma: no cover
                expires_in = 0
        try:
            return int(expires_in)
        except (TypeError, ValueError):  # pragma: no cover
            return 0

    @abstractmethod
    def get_token(self, code, *args, **kwargs) -> list:
        """Exchange an authorization code for an access token.

        Args:
            code: the authorization code to send to the token endpoint.
            **kwargs: ``logfile`` (optional) is passed to ``dologging``
                when the token request fails.

        Returns:
            ``[access_token, expires_in, refresh_token]``; ``refresh_token``
            is ``''`` when the response carries none.

        Raises:
            NoTokenError: the endpoint did not answer 200/201 or the
                response has no ``access_token``.
        """
        logfile = kwargs.get('logfile', None)
        client_id = self.oauth_data['client_id']
        base_uri = self.oauth_data['base_url']

        post_data = {"grant_type": "authorization_code",
                     "code": code,
                     "redirect_uri": self.oauth_data['redirect_uri'],
                     "client_secret": self.oauth_data['client_secret'],
                     "client_id": client_id,
                     }

        try:
            headers = self.oauth_data['headers']
        except KeyError:
            headers = {'Accept': 'application/json',
                       'Api-Key': client_id,
                       'Content-Type': 'application/json',
                       'user-agent': 'sanderroosendaal'}

        # A configured, non-empty grant_type overrides the default ...
        if self.oauth_data.get('grant_type'):
            post_data['grant_type'] = self.oauth_data['grant_type']
        # ... except for Strava, which is always sent "authorization_code".
        if 'strava' in self.oauth_data['authorization_uri']:
            post_data['grant_type'] = "authorization_code"

        if 'json' in self.oauth_data['content_type']:
            response = requests.post(
                base_uri,
                data=json.dumps(post_data),
                headers=headers)
        else:  # pragma: no cover
            # NOTE(review): verify=False disables TLS certificate
            # verification for this request; kept for backward
            # compatibility - confirm whether it is still required.
            response = requests.post(
                base_uri,
                data=post_data,
                headers=headers, verify=False)

        if response.status_code not in (200, 201):  # pragma: no cover
            if logfile:
                dologging(logfile, response.text)
            raise NoTokenError("Failed to obtain token")

        token_json = response.json()
        try:
            thetoken = token_json['access_token']
        except KeyError:  # pragma: no cover
            if logfile:
                s = json.dumps(token_json)
                dologging(logfile, s)
            raise NoTokenError("Failed to obtain token")

        refresh_token = token_json.get('refresh_token', '')
        expires_in = self._expires_in_seconds(token_json)

        return [thetoken, expires_in, refresh_token]

    @abstractmethod
    def open(self, *args, **kwargs) -> str:
        """Return a usable access token for the rower, refreshing if needed.

        The token is refreshed when its stored expiry date is less than
        60 seconds away, or when no expiry date is stored and the expiry
        attribute name contains ``'strava'``.

        Raises:
            NoTokenError: the rower has no token (empty string or None).
        """
        token = getattr(self.rower, self.oauth_data['tokenname'])
        # .get() so that a missing 'expirydatename' key is tolerated
        # everywhere in this method, not only in the lookup below.
        expirydatename = self.oauth_data.get('expirydatename')

        try:
            tokenexpirydate = getattr(self.rower, self.oauth_data['expirydatename'])
        except (TypeError, AttributeError, KeyError):  # pragma: no cover
            tokenexpirydate = None

        if (token == '') or (token is None):
            raise NoTokenError("User has no token")

        if tokenexpirydate and timezone.now() + timedelta(seconds=60) > tokenexpirydate:
            token = self.token_refresh()
        elif (tokenexpirydate is None and expirydatename is not None
              and 'strava' in expirydatename):  # pragma: no cover
            token = self.token_refresh()

        return token

    def do_refresh_token(self, *args, **kwargs) -> list:
        """Request a new access token using the rower's refresh token.

        Args:
            **kwargs: ``access_token`` (optional, default ``''``) is sent
                as a Bearer authorization header when
                ``oauth_data['bearer_auth']`` is truthy.

        Returns:
            ``[access_token, expires_in, refresh_token]``; the rower's
            current refresh token is returned when the response carries
            no new one.

        Raises:
            NoTokenError: the request failed, the endpoint did not answer
                200/201, or the response has no ``access_token``.
        """
        refreshtoken = getattr(self.rower, self.oauth_data['refreshtokenname'])
        access_token = kwargs.get('access_token', '')

        post_data = {"grant_type": "refresh_token",
                     "client_secret": self.oauth_data['client_secret'],
                     "client_id": self.oauth_data['client_id'],
                     "refresh_token": refreshtoken,
                     }
        headers = {'user-agent': 'sanderroosendaal',
                   'Accept': 'application/json',
                   'Content-Type': self.oauth_data['content_type']}

        # for Strava
        if self.oauth_data.get('grant_type'):
            post_data['grant_type'] = self.oauth_data['grant_type']

        if self.oauth_data['bearer_auth']:
            headers['authorization'] = 'Bearer %s' % access_token

        baseurl = self.oauth_data['base_url']
        send_json = 'json' in self.oauth_data['content_type']

        try:
            payload = json.dumps(post_data) if send_json else post_data
            # NOTE(review): verify=False disables TLS certificate
            # verification for this request; kept for backward
            # compatibility - confirm whether it is still required.
            response = requests.post(baseurl,
                                     data=payload,
                                     headers=headers,
                                     verify=False)
        except Exception as err:  # pragma: no cover
            # Not a bare ``except:`` so that KeyboardInterrupt and
            # SystemExit are no longer converted into NoTokenError.
            raise NoTokenError("Failed to get token") from err

        if response.status_code not in (200, 201):  # pragma: no cover
            raise NoTokenError("User has no token")
        token_json = response.json()

        try:
            thetoken = token_json['access_token']
        except KeyError:  # pragma: no cover
            raise NoTokenError("User has no token")

        expires_in = self._expires_in_seconds(token_json)
        refresh_token = token_json.get('refresh_token', refreshtoken)

        return [thetoken, expires_in, refresh_token]

    @abstractmethod
    def token_refresh(self, *args, **kwargs) -> str:
        """Refresh the rower's token, store the result and return it.

        Calls ``do_refresh_token()``, writes the new access token, expiry
        date and refresh token onto the rower (each under the attribute
        name given in ``oauth_data``), saves the rower and returns the
        new access token.
        """
        # NOTE(review): do_refresh_token() is called without access_token,
        # so when oauth_data['bearer_auth'] is truthy the Bearer header
        # value is empty unless a subclass overrides this call.
        access_token, expires_in, refresh_token = self.do_refresh_token()
        expirydatetime = timezone.now() + timedelta(seconds=expires_in)

        setattr(self.rower, self.oauth_data['tokenname'], access_token)

        expirydatename = self.oauth_data.get('expirydatename')
        if expirydatename is not None:
            setattr(self.rower, expirydatename, expirydatetime)

        refreshtokenname = self.oauth_data['refreshtokenname']
        if refreshtokenname is not None:
            setattr(self.rower, refreshtokenname, refresh_token)

        self.rower.save()
        return access_token