# All the functionality to connect to SportTracks # Python import oauth2 as oauth import cgi import pytz import requests import requests.auth import json from django.utils import timezone from datetime import datetime from datetime import timedelta import arrow import numpy as np from dateutil import parser import time from time import strftime import rowers.dataprep as dataprep import math from math import sin, cos, atan2, sqrt import os import sys import urllib import iso8601 from uuid import uuid4 # Django from django.http import HttpResponseRedirect, HttpResponse, JsonResponse from django.conf import settings from django.contrib.auth import authenticate, login, logout from django.contrib.auth.models import User from django.contrib.auth.decorators import login_required # Project # from .models import Profile from rowingdata import rowingdata, make_cumvalues import pandas as pd from rowers.models import Rower, Workout, TombStone import rowers.mytypes as mytypes from rowsandall_app.settings import ( C2_CLIENT_ID, C2_REDIRECT_URI, C2_CLIENT_SECRET, STRAVA_CLIENT_ID, STRAVA_REDIRECT_URI, STRAVA_CLIENT_SECRET, SPORTTRACKS_CLIENT_SECRET, SPORTTRACKS_CLIENT_ID, SPORTTRACKS_REDIRECT_URI ) from rowers.utils import ( NoTokenError, custom_exception_handler, ewmovingaverage, geo_distance, uniqify ) # Splits SportTracks data which is one long sequence of # [t,[lat,lon],t2,[lat2,lon2] ...] # to [t,t2,t3, ...], [[lat,long],[lat2,long2],... 
def splitstdata(lijst):
    """Split an interleaved SportTracks sequence into two arrays.

    The input is one flat sequence ``[t, [lat, lon], t2, [lat2, lon2], ...]``.

    Args:
        lijst: Sliceable sequence alternating time values and position
            entries.

    Returns:
        A two-element list ``[times, positions]`` of numpy arrays. A
        trailing element without a partner is dropped.
    """
    # Strided slices avoid re-copying the remainder of the list for every
    # pair (which made the previous loop quadratic in the input length).
    stop = 2 * (len(lijst) // 2)
    return [np.array(lijst[0:stop:2]), np.array(lijst[1:stop:2])]


# covered in integrations
def imports_open(user, oauth_data):
    """Return a usable access token for ``user``, refreshing it if needed.

    Args:
        user: The user whose ``Rower`` record holds the tokens.
        oauth_data: Mapping describing the integration. ``'tokenname'`` is
            required; ``'refreshtokenname'`` and ``'expirydatename'`` are
            read when present.

    Returns:
        The stored access token, or the freshly refreshed one.

    Raises:
        NoTokenError: If the rower has no token stored (or a refresh fails).
    """
    r = Rower.objects.get(user=user)
    token = getattr(r, oauth_data['tokenname'])
    try:
        tokenexpirydate = getattr(r, oauth_data['expirydatename'])
    except (TypeError, AttributeError, KeyError):  # pragma: no cover
        tokenexpirydate = None

    if token is None or token == '':
        raise NoTokenError("User has no token")

    tokenname = oauth_data['tokenname']
    # .get() keeps this consistent with the tolerant lookup above: a missing
    # expiry/refresh name must not raise KeyError here.
    refreshtokenname = oauth_data.get('refreshtokenname')
    expirydatename = oauth_data.get('expirydatename')

    # Refresh when the token expires within the next 60 seconds ...
    expiring = bool(
        tokenexpirydate
        and timezone.now() + timedelta(seconds=60) > tokenexpirydate
    )
    # ... or when no expiry date is stored for an integration whose expiry
    # field name contains 'strava'.
    strava_without_expiry = (
        tokenexpirydate is None
        and expirydatename is not None
        and 'strava' in expirydatename
    )

    if expiring or strava_without_expiry:
        token = imports_token_refresh(
            user,
            tokenname,
            refreshtokenname,
            expirydatename,
            oauth_data,
        )

    return token


# covered in integrations
# Refresh token using refresh token
def imports_do_refresh_token(refreshtoken, oauth_data, access_token=''):
    """Exchange a refresh token for a new access token.

    Args:
        refreshtoken: The refresh token to send to the token endpoint.
        oauth_data: Mapping with ``'client_id'``, ``'client_secret'``,
            ``'content_type'``, ``'bearer_auth'`` and ``'base_url'``; an
            optional truthy ``'grant_type'`` overrides ``"refresh_token"``.
        access_token: Sent as a Bearer authorization header when
            ``oauth_data['bearer_auth']`` is truthy.

    Returns:
        ``[access_token, expires_in, refresh_token]``. ``expires_in`` is an
        int (0 when it cannot be determined); ``refresh_token`` falls back to
        the one passed in when the response carries none.

    Raises:
        NoTokenError: If the request fails, the status is not 200/201, the
            body is not JSON, or the response has no ``access_token``.
    """
    post_data = {
        "grant_type": "refresh_token",
        "client_secret": oauth_data['client_secret'],
        "client_id": oauth_data['client_id'],
        "refresh_token": refreshtoken,
    }
    headers = {
        'user-agent': 'sanderroosendaal',
        'Accept': 'application/json',
        'Content-Type': oauth_data['content_type'],
    }

    # for Strava
    if oauth_data.get('grant_type'):
        post_data['grant_type'] = oauth_data['grant_type']

    if oauth_data['bearer_auth']:
        headers['authorization'] = 'Bearer %s' % access_token

    baseurl = oauth_data['base_url']
    send_json = 'json' in oauth_data['content_type']

    try:
        payload = json.dumps(post_data) if send_json else post_data
        # NOTE(review): verify=False disables TLS certificate checking while
        # client secrets and tokens are on the wire; kept as-is to preserve
        # behaviour, but confirm it is really required.
        response = requests.post(
            baseurl, data=payload, headers=headers, verify=False
        )
    except Exception as err:  # pragma: no cover
        raise NoTokenError("Failed to get token") from err

    if response.status_code not in (200, 201):  # pragma: no cover
        raise NoTokenError("User has no token")

    try:
        token_json = response.json()
    except ValueError as err:  # pragma: no cover
        # A success status with a non-JSON body is still "no token".
        raise NoTokenError("User has no token") from err

    try:
        thetoken = token_json['access_token']
    except KeyError:  # pragma: no cover
        raise NoTokenError("User has no token")

    # Prefer a relative lifetime; otherwise derive it from 'expires_at'.
    try:
        expires_in = token_json['expires_in']
    except KeyError:
        try:
            expires_at = arrow.get(token_json['expires_at']).timestamp()
            expires_in = expires_at - arrow.now().timestamp()
        except KeyError:  # pragma: no cover
            expires_in = 0

    try:
        refresh_token = token_json['refresh_token']
    except KeyError:  # pragma: no cover
        refresh_token = refreshtoken

    try:
        expires_in = int(expires_in)
    except (TypeError, ValueError):  # pragma: no cover
        expires_in = 0

    return [thetoken, expires_in, refresh_token]


# Exchange ST access code for long-lived ST access token
# implemented in integrations
def imports_get_token(code, oauth_data):
    """Exchange an OAuth authorization code for an access token.

    Args:
        code: The authorization code to exchange.
        oauth_data: Mapping with ``'redirect_uri'``, ``'client_secret'``,
            ``'client_id'``, ``'base_url'``, ``'autorization_uri'`` and
            ``'content_type'``; optional ``'headers'`` and ``'grant_type'``.

    Returns:
        ``[access_token, expires_in, refresh_token]`` on success,
        ``[0, 0, 0]`` when the response has no ``access_token``, and
        ``[0, response.text, 0]`` when the status is not 200/201.
    """
    redirect_uri = oauth_data['redirect_uri']
    client_secret = oauth_data['client_secret']
    client_id = oauth_data['client_id']
    base_uri = oauth_data['base_url']

    post_data = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": redirect_uri,
        "client_secret": client_secret,
        "client_id": client_id,
    }

    try:
        headers = oauth_data['headers']
    except KeyError:
        headers = {
            'Accept': 'application/json',
            'Api-Key': client_id,
            'Content-Type': 'application/json',
            'user-agent': 'sanderroosendaal',
        }

    if oauth_data.get('grant_type'):
        post_data['grant_type'] = oauth_data['grant_type']

    # The key spelling 'autorization_uri' is what the oauth_data mappings
    # are read with here; it is runtime data, not a typo to fix locally.
    if 'strava' in oauth_data['autorization_uri']:
        post_data['grant_type'] = "authorization_code"

    # NOTE(review): only the form-encoded branch disables certificate
    # verification; the JSON branch verifies. Confirm whether verify=False
    # is still needed.
    if 'json' in oauth_data['content_type']:
        response = requests.post(
            base_uri, data=json.dumps(post_data), headers=headers)
    else:  # pragma: no cover
        response = requests.post(
            base_uri, data=post_data, headers=headers, verify=False)

    if response.status_code not in (200, 201):  # pragma: no cover
        return [0, response.text, 0]

    token_json = response.json()

    try:
        thetoken = token_json['access_token']
    except KeyError:  # pragma: no cover
        return [0, 0, 0]

    try:
        refresh_token = token_json['refresh_token']
    except KeyError:  # pragma: no cover
        refresh_token = ''

    try:
        expires_in = token_json['expires_in']
    except KeyError:  # pragma: no cover
        expires_in = 0

    try:
        expires_in = int(expires_in)
    except (ValueError, TypeError):  # pragma: no cover
        expires_in = 0

    return [thetoken, expires_in, refresh_token]


# Make authorization URL including random string
def imports_make_authorization_url(oauth_data):  # pragma: no cover
    """Build the provider authorization URL and redirect to it.

    Args:
        oauth_data: Mapping with ``'client_id'``, ``'redirect_uri'``,
            ``'scope'`` and the authorization endpoint under
            ``'authorizaton_uri'`` or ``'autorization_uri'``.

    Returns:
        An ``HttpResponseRedirect`` to the authorization URL.

    Raises:
        KeyError: If neither authorization endpoint key is present.
    """
    # Imported locally: the module only does ``import urllib``, which does
    # not guarantee that the ``urllib.parse`` submodule is loaded.
    from urllib.parse import urlencode

    # Generate a random string for the state parameter.
    # NOTE(review): the state is not stored anywhere in this function, so it
    # cannot be validated on the callback unless the caller persists it.
    state = str(uuid4())

    params = {
        "client_id": oauth_data['client_id'],
        "response_type": "code",
        "redirect_uri": oauth_data['redirect_uri'],
        "scope": oauth_data['scope'],
        "state": state,
    }

    # Accept the key this function historically read as well as the spelling
    # that imports_get_token reads from the same mapping.
    try:
        base = oauth_data['authorizaton_uri']
    except KeyError:
        base = oauth_data['autorization_uri']

    # Make sure the query string is properly separated from the endpoint.
    if '?' not in base:
        base += '?'
    elif not base.endswith(('?', '&')):
        base += '&'

    url = base + urlencode(params)

    return HttpResponseRedirect(url)


# This is token refresh. Looks for tokens in our database, then refreshes
def imports_token_refresh(user, tokenname, refreshtokenname,
                          expirydatename, oauth_data):
    """Refresh the stored token for ``user`` and persist the result.

    Args:
        user: The user whose ``Rower`` record is updated.
        tokenname: Attribute name of the access token on the rower.
        refreshtokenname: Attribute name of the refresh token, or ``None``.
        expirydatename: Attribute name of the expiry datetime, or ``None``.
        oauth_data: Passed through to ``imports_do_refresh_token``.

    Returns:
        The new access token.

    Raises:
        NoTokenError: Propagated from ``imports_do_refresh_token``.
    """
    r = Rower.objects.get(user=user)

    # A None refreshtokenname is accepted further down, so it must not be
    # handed to getattr() here (that raises TypeError).
    refreshtoken = None
    if refreshtokenname is not None:
        refreshtoken = getattr(r, refreshtokenname)

    # for Strava transition
    if not refreshtoken:  # pragma: no cover
        refreshtoken = getattr(r, tokenname)

    access_token, expires_in, refresh_token = imports_do_refresh_token(
        refreshtoken, oauth_data
    )

    expirydatetime = timezone.now() + timedelta(seconds=expires_in)

    setattr(r, tokenname, access_token)
    if expirydatename is not None:
        setattr(r, expirydatename, expirydatetime)
    if refreshtokenname is not None:
        setattr(r, refreshtokenname, refresh_token)

    r.save()

    return access_token