Private
Public Access
1
0
Files
rowsandall/rowers/ownapistuff.py
Sander Roosendaal a547e851be more pep
2022-02-17 15:16:27 +01:00

288 lines
8.5 KiB
Python

# Interactions with Rowsandall.com API. Not fully complete.
# Python
import oauth2 as oauth
import cgi
import requests
import requests.auth
import json
from django.utils import timezone
from datetime import datetime, timedelta
import numpy as np
from dateutil import parser
import time
import math
from math import sin, cos, atan2, sqrt
import urllib
import rowers.c2stuff as c2stuff
# Django
from django.http import HttpResponseRedirect, HttpResponse, JsonResponse
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.contrib.auth.decorators import login_required
# Project
# from .models import Profile
from rowingdata import rowingdata
import pandas as pd
from rowers.models import Rower, Workout
from rowsandall_app.settings import (
C2_CLIENT_ID, C2_REDIRECT_URI, C2_CLIENT_SECRET,
STRAVA_CLIENT_ID, STRAVA_REDIRECT_URI, STRAVA_CLIENT_SECRET,
SPORTTRACKS_CLIENT_SECRET, SPORTTRACKS_CLIENT_ID, SPORTTRACKS_REDIRECT_URI)
# OAuth2 client id, client secret and callback URL used by the helpers in
# this module when talking to a server on localhost:8000.
# NOTE(review): the client secret is hard-coded in source; presumably it is a
# throwaway development credential -- confirm it is never valid in production.
TEST_CLIENT_ID = "1"
TEST_CLIENT_SECRET = "aapnootmies"
TEST_REDIRECT_URI = "http://localhost:8000/rowers/test_callback"
def custom_exception_handler(exc, message):  # pragma: no cover
    """Wrap an error in an ``HttpResponse`` carrying status 401.

    The response body is *message*.  A JSON string describing the error
    (``str(exc)`` as the code, *message* as the detail) is attached to the
    response object as its ``json`` attribute.  The status code is always
    401, whatever *exc* is.
    """
    error_entry = {
        "code": str(exc),
        "detail": message,
    }
    payload = {"errors": [error_entry]}

    res = HttpResponse(message)
    res.status_code = 401
    res.json = json.dumps(payload)
    return res
def do_refresh_token(refreshtoken):  # pragma: no cover
    """Exchange *refreshtoken* for a new access token on the local server.

    Posts a JSON-encoded ``refresh_token`` grant, authenticated with the
    module's test client id and secret, to the token endpoint on
    localhost:8000.

    Returns a list ``[access_token, expires_in, refresh_token]``.  When the
    server's answer carries no new refresh token, the one passed in is
    returned unchanged.

    Raises ``KeyError`` if the answer lacks ``access_token`` or
    ``expires_in``.
    """
    post_data = {
        "grant_type": "refresh_token",
        "client_secret": TEST_CLIENT_SECRET,
        "client_id": TEST_CLIENT_ID,
        "refresh_token": refreshtoken,
    }
    headers = {
        'user-agent': 'sanderroosendaal',
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    # Trailing slash added so this matches the token URL used by get_token().
    # A POST to the slash-less form cannot be transparently redirected by
    # Django's APPEND_SLASH handling without losing the request body.
    url = "http://localhost:8000/rowers/o/token/"
    response = requests.post(url,
                             data=json.dumps(post_data),
                             headers=headers)
    token_json = response.json()
    thetoken = token_json['access_token']
    expires_in = token_json['expires_in']
    # The server may omit a new refresh token; keep using the current one.
    refresh_token = token_json.get('refresh_token', refreshtoken)
    return [thetoken, expires_in, refresh_token]
def get_token(code):  # pragma: no cover
    """Trade an authorization *code* for tokens on the local server.

    Posts a JSON-encoded ``authorization_code`` grant to the token endpoint
    on localhost:8000 and returns the list
    ``[access_token, expires_in, refresh_token]`` taken from the answer.

    Raises ``KeyError`` if any of those three keys is missing.
    """
    grant = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": "http://localhost:8000/rowers/test_callback",
        "client_secret": "aapnootmies",
        "client_id": 1,
    }
    request_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    answer = requests.post(
        "http://localhost:8000/rowers/o/token/",
        data=json.dumps(grant),
        headers=request_headers,
    ).json()
    return [
        answer['access_token'],
        answer['expires_in'],
        answer['refresh_token'],
    ]
def make_authorization_url(request):  # pragma: no cover
    """Redirect to the local server's OAuth2 authorization page.

    Builds the authorization URL with the module's test client id and
    redirect URI, the ``write`` scope, and a fresh random ``state`` value,
    and returns an ``HttpResponseRedirect`` to it.  *request* is not used.
    """
    # Imported here so the function does not depend on some other module
    # having loaded the ``urllib.parse`` submodule first.
    from urllib.parse import urlencode
    from uuid import uuid4

    # Random string for the state parameter.
    # NOTE(review): the value is not stored anywhere in this function, so
    # the callback cannot compare it -- confirm whether it should be kept
    # (e.g. in the session) for XSRF protection.
    state = str(uuid4())
    params = {
        "client_id": TEST_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": TEST_REDIRECT_URI,
        "scope": "write",
        "state": state,
    }
    # The "?" separates the path from the query string; without it the
    # parameters are glued onto the path and the URL cannot be routed.
    url = "http://localhost:8000/rowers/o/authorize?" + urlencode(params)
    return HttpResponseRedirect(url)
def rower_ownapi_token_refresh(user):  # pragma: no cover
    """Refresh the own-API tokens stored on the ``Rower`` of *user*.

    Calls ``do_refresh_token`` with the rower's stored refresh token, then
    saves the new access token, its expiry time (now + ``expires_in``
    seconds) and the refresh token on the rower.

    Returns the new access token.
    """
    r = Rower.objects.get(user=user)
    access_token, expires_in, refresh_token = do_refresh_token(
        r.ownapirefreshtoken)

    r.ownapitoken = access_token
    # The expiry goes into ``ownapitokenexpirydate`` because that is the
    # attribute get_ownapi_workout_list() and get_ownapi_workout() compare
    # against; it was previously written to ``tokenexpirydate``, which those
    # readers never look at.
    r.ownapitokenexpirydate = timezone.now() + timedelta(seconds=expires_in)
    r.ownapirefreshtoken = refresh_token
    r.save()
    return r.ownapitoken
def get_ownapi_workout_list(user):  # pragma: no cover
    """Fetch the list of fitness activities for the ``Rower`` of *user*.

    Returns the ``requests`` response of the GET call.  If the rower has no
    token, or the stored expiry time has passed, the 401 response built by
    ``custom_exception_handler`` is returned instead and no request is made.
    """
    rower = Rower.objects.get(user=user)

    if rower.ownapitoken is None or rower.ownapitoken == '':
        return custom_exception_handler(
            401, "Token doesn't exist. Need to authorize")

    if timezone.now() > rower.ownapitokenexpirydate:
        return custom_exception_handler(
            401, "Token expired. Needs to refresh.")

    # Token present and still valid: perform the request.
    request_headers = {
        'Authorization': str('Bearer ' + rower.ownapitoken),
        'user-agent': 'sanderroosendaal',
        'Content-Type': 'application/json',
    }
    return requests.get(
        "https://api.ownapi.mobi/api/v2/fitnessActivities",
        headers=request_headers,
    )
def get_ownapi_workout(user, ownapiid):  # pragma: no cover
    """Fetch the fitness activity *ownapiid* for the ``Rower`` of *user*.

    Returns the ``requests`` response of the GET call.  If the rower has no
    token, or the stored expiry time has passed, the 401 response built by
    ``custom_exception_handler`` is returned instead and no request is made.
    """
    rower = Rower.objects.get(user=user)

    if rower.ownapitoken is None or rower.ownapitoken == '':
        return custom_exception_handler(
            401, "Token doesn't exist. Need to authorize")

    if timezone.now() > rower.ownapitokenexpirydate:
        return custom_exception_handler(
            401, "Token expired. Needs to refresh.")

    # Token present and still valid: perform the request.
    request_headers = {
        'Authorization': str('Bearer ' + rower.ownapitoken),
        'user-agent': 'sanderroosendaal',
        'Content-Type': 'application/json',
    }
    activity_url = (
        "https://api.ownapi.mobi/api/v2/fitnessActivities/" + str(ownapiid)
    )
    return requests.get(activity_url, headers=request_headers)
def _interleave(times, values):
    """Return ``[t0, v0, t1, v1, ...]`` built from two parallel sequences."""
    flat = []
    for stamp, value in zip(times, values):
        flat.extend((stamp, value))
    return flat


def createownapiworkoutdata(w):  # pragma: no cover
    """Build the upload payload (a plain dict) for workout *w*.

    Reads the stroke data from ``w.csvfilename`` through ``rowingdata`` and
    returns a dict with the workout's name, start time, total distance,
    duration, notes, average and maximum heart rate, plus flat time series
    of the form ``[t0, v0, t1, v1, ...]`` for distance, cadence and heart
    rate.  A ``location`` series (values are ``[lat, lon]`` pairs) is added
    only when both coordinate columns exist, and a ``power`` series only
    when the power column exists.

    Raises ``IndexError`` when the file holds fewer than two samples.
    """
    row = rowingdata(csvfile=w.csvfilename)
    df = row.df

    heartrate = df[' HRCur (bpm)']
    averagehr = int(heartrate.mean())
    maxhr = int(heartrate.max())

    # ``DataFrame.ix`` was removed in pandas 1.0, so the columns are read
    # with plain column access and the first sample with ``iloc``.
    # Every series is converted with ``tolist()`` so the payload holds
    # native Python numbers; ``json.dumps`` rejects NumPy integer scalars.
    timestamps = df['TimeStamp (sec)']
    # NOTE(review): only the subtracted first timestamp is multiplied by 10
    # (original comment: "adding diff, trying to see if this is valid").
    # Kept as found -- confirm whether plain elapsed seconds were intended.
    t = (timestamps.values - 10 * timestamps.iloc[0]).astype(int).tolist()
    t[0] = t[1]

    d = df['cum_dist'].values.astype(int).tolist()
    d[0] = d[1]

    spm = df[' Cadence (stokes/min)'].astype(int).tolist()
    spm[0] = spm[1]

    hr = heartrate.astype(int).tolist()

    try:
        lat = df[' latitude'].tolist()
        lon = df[' longitude'].tolist()
    except KeyError:
        locdata = None
    else:
        locdata = _interleave(t, ([la, lo] for la, lo in zip(lat, lon)))

    try:
        power = df[' Power (watts)'].tolist()
    except KeyError:
        power = None

    data = {
        "type": "Rowing",
        "name": w.name,
        "start_time": w.startdatetime.isoformat(),
        "total_distance": int(w.distance),
        "duration": int(max(t)),
        "notes": w.notes,
        "avg_heartrate": averagehr,
        "max_heartrate": maxhr,
    }
    if locdata is not None:
        data["location"] = locdata
    data["distance"] = _interleave(t, d)
    data["cadence"] = _interleave(t, spm)
    data["heartrate"] = _interleave(t, hr)
    if power is not None:
        data["power"] = _interleave(t, power)

    return data
def getidfromresponse(response):  # pragma: no cover
    """Extract the integer id from the first URI in *response*.

    ``response.text`` is parsed as JSON; the first entry of its ``uris``
    list is sliced from 13 characters before the end up to 5 characters
    before the end, and that slice is converted to ``int``.
    Presumably the URI ends in an 8-digit id followed by a 5-character
    suffix -- TODO confirm against the API.

    Raises ``ValueError`` if the slice is not a valid integer.
    """
    payload = json.loads(response.text)
    first_uri = payload['uris'][0]
    uri_length = len(first_uri)
    # Explicit offsets from the length, so short URIs slice exactly as before.
    start = uri_length - 13
    stop = uri_length - 5
    return int(first_uri[start:stop])