Private
Public Access
1
0
Files
rowsandall/rowers/ownapistuff.py
2019-02-25 16:47:50 +01:00

288 lines
8.5 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import unicode_literals, absolute_import
# Interactions with Rowsandall.com API. Not fully complete.
# Python
import oauth2 as oauth
import cgi
import requests
import requests.auth
import json
from django.utils import timezone
from datetime import datetime
import numpy as np
from dateutil import parser
import time
import math
from math import sin,cos,atan2,sqrt
import urllib
import rowers.c2stuff as c2stuff
# Django
from django.shortcuts import render_to_response
from django.http import HttpResponseRedirect, HttpResponse,JsonResponse
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.contrib.auth.decorators import login_required
# Project
# from .models import Profile
from rowingdata import rowingdata
import pandas as pd
from rowers.models import Rower,Workout
from rowsandall_app.settings import C2_CLIENT_ID, C2_REDIRECT_URI, C2_CLIENT_SECRET, STRAVA_CLIENT_ID, STRAVA_REDIRECT_URI, STRAVA_CLIENT_SECRET, SPORTTRACKS_CLIENT_SECRET, SPORTTRACKS_CLIENT_ID, SPORTTRACKS_REDIRECT_URI
# OAuth2 client credentials and callback URL used by the helpers in this
# module when talking to the token/authorize endpoints on localhost:8000.
# NOTE(review): the client secret is hard-coded in source; presumably a
# development-only value -- confirm it is never used against production.
TEST_CLIENT_ID = "1"
TEST_CLIENT_SECRET = "aapnootmies"
TEST_REDIRECT_URI = "http://localhost:8000/rowers/test_callback"
def custom_exception_handler(exc,message):
    """Build a 401 response carrying *message* plus a JSON error payload.

    The response body is *message* as given.  A JSON document of the form
    ``{"errors": [{"code": str(exc), "detail": message}]}`` is attached to
    the response object as its ``json`` attribute.

    Args:
        exc: error code or exception; only its ``str()`` form is used.
        message: human-readable description of the failure.

    Returns:
        An ``HttpResponse`` whose ``status_code`` is always 401.
    """
    error_entry = {"code": str(exc), "detail": message}
    payload = json.dumps({"errors": [error_entry]})

    res = HttpResponse(message)
    res.status_code = 401
    res.json = payload
    return res
def do_refresh_token(refreshtoken):
    """Exchange a refresh token for a new access token.

    Posts a JSON-encoded ``refresh_token`` grant, built from the module-level
    test client credentials, to the token endpoint on localhost.

    Args:
        refreshtoken: the refresh token currently held for the user.

    Returns:
        A list ``[access_token, expires_in, refresh_token]``.  When the
        response carries no ``refresh_token``, the one passed in is returned
        unchanged.

    Raises:
        KeyError: if the response lacks ``access_token`` or ``expires_in``.
    """
    post_data = {
        "grant_type": "refresh_token",
        "client_secret": TEST_CLIENT_SECRET,
        "client_id": TEST_CLIENT_ID,
        "refresh_token": refreshtoken,
    }
    headers = {
        'user-agent': 'sanderroosendaal',
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    # NOTE(review): get_token() posts to ".../o/token/" (with trailing slash);
    # confirm which form the URL configuration actually serves.
    url = "http://localhost:8000/rowers/o/token"

    response = requests.post(url, data=json.dumps(post_data), headers=headers)
    token_json = response.json()

    thetoken = token_json['access_token']
    expires_in = token_json['expires_in']
    # The server may omit a new refresh token; keep using the old one then.
    refresh_token = token_json.get('refresh_token', refreshtoken)

    return [thetoken, expires_in, refresh_token]
def get_token(code):
    """Exchange an authorization code for access and refresh tokens.

    Posts a JSON-encoded ``authorization_code`` grant, built from the
    module-level test client settings, to the token endpoint on localhost.

    Args:
        code: the authorization code received on the OAuth callback.

    Returns:
        A list ``[access_token, expires_in, refresh_token]``.

    Raises:
        KeyError: if the response lacks any of the three token fields.
    """
    post_data = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": TEST_REDIRECT_URI,
        "client_secret": TEST_CLIENT_SECRET,
        # Sent as a JSON number, exactly as the previous literal ``1`` was.
        "client_id": int(TEST_CLIENT_ID),
    }
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    url = "http://localhost:8000/rowers/o/token/"

    response = requests.post(url, data=json.dumps(post_data), headers=headers)
    token_json = response.json()

    thetoken = token_json['access_token']
    expires_in = token_json['expires_in']
    refresh_token = token_json['refresh_token']

    return [thetoken, expires_in, refresh_token]
def make_authorization_url(request):
    """Redirect to the OAuth2 authorization page of the local test server.

    Builds the authorize URL from the module-level test client settings,
    requesting the ``write`` scope with a freshly generated ``state`` value.

    Args:
        request: the incoming request (currently unused).

    Returns:
        An ``HttpResponseRedirect`` pointing at the authorization URL.
    """
    from uuid import uuid4
    try:
        from urllib.parse import urlencode  # Python 3
    except ImportError:
        from urllib import urlencode  # Python 2

    # Random string for the state parameter.
    # NOTE(review): the state is meant to be saved and checked on the
    # callback to prevent XSRF, but nothing here stores it -- confirm whether
    # it should be kept (e.g. in the session) and verified later.
    state = str(uuid4())
    params = {
        "client_id": TEST_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": TEST_REDIRECT_URI,
        "scope": "write",
        "state": state,
    }

    # The "?" separates the path from the query string; without it the
    # parameters were glued straight onto the path.
    url = "http://localhost:8000/rowers/o/authorize?" + urlencode(params)

    return HttpResponseRedirect(url)
def rower_ownapi_token_refresh(user):
    """Refresh the stored own-API token of *user* and return the new one.

    Looks up the ``Rower`` belonging to *user*, trades its stored refresh
    token for a new token set via ``do_refresh_token`` and saves the new
    access token, expiry time and refresh token on the rower.

    Args:
        user: the user whose ``Rower`` record holds the tokens.

    Returns:
        The new access token.
    """
    # Imported locally: the module only imports ``datetime`` from datetime.
    from datetime import timedelta

    r = Rower.objects.get(user=user)
    access_token, expires_in, refresh_token = do_refresh_token(
        r.ownapirefreshtoken
    )

    r.ownapitoken = access_token
    # Store the expiry on the attribute that get_ownapi_workout_list() and
    # get_ownapi_workout() compare against.
    r.ownapitokenexpirydate = timezone.now() + timedelta(seconds=expires_in)
    r.ownapirefreshtoken = refresh_token
    r.save()

    return r.ownapitoken
def get_ownapi_workout_list(user):
    """Fetch the list of fitness activities for *user*.

    Args:
        user: the user whose ``Rower`` record holds the API token.

    Returns:
        The ``requests`` response of the GET call, or the 401 response built
        by ``custom_exception_handler`` when the rower has no token or the
        token has expired.
    """
    r = Rower.objects.get(user=user)

    token = r.ownapitoken
    if token is None or token == '':
        return custom_exception_handler(
            401, "Token doesn't exist. Need to authorize"
        )

    if timezone.now() > r.ownapitokenexpirydate:
        return custom_exception_handler(
            401, "Token expired. Needs to refresh."
        )

    # Token present and still valid: go and fetch.
    request_headers = {
        'Authorization': str('Bearer ' + token),
        'user-agent': 'sanderroosendaal',
        'Content-Type': 'application/json',
    }
    return requests.get(
        "https://api.ownapi.mobi/api/v2/fitnessActivities",
        headers=request_headers,
    )
def get_ownapi_workout(user,ownapiid):
    """Fetch a single fitness activity for *user*.

    Args:
        user: the user whose ``Rower`` record holds the API token.
        ownapiid: identifier of the activity; appended to the endpoint URL.

    Returns:
        The ``requests`` response of the GET call, or the 401 response built
        by ``custom_exception_handler`` when the rower has no token or the
        token has expired.
    """
    r = Rower.objects.get(user=user)

    if (r.ownapitoken == '') or (r.ownapitoken is None):
        # The message must be assigned before it is used; previously the
        # return came first and raised UnboundLocalError.
        s = "Token doesn't exist. Need to authorize"
        return custom_exception_handler(401,s)

    if timezone.now() > r.ownapitokenexpirydate:
        s = "Token expired. Needs to refresh."
        return custom_exception_handler(401,s)

    # Token present and still valid: go and fetch.
    authorizationstring = str('Bearer ' + r.ownapitoken)
    headers = {
        'Authorization': authorizationstring,
        'user-agent': 'sanderroosendaal',
        'Content-Type': 'application/json',
    }
    url = "https://api.ownapi.mobi/api/v2/fitnessActivities/"+str(ownapiid)

    return requests.get(url,headers=headers)
def _interleave_samples(times, values):
    """Return ``[t0, v0, t1, v1, ...]`` built from two parallel sequences."""
    flat = []
    for stamp, value in zip(times, values):
        flat.append(stamp)
        flat.append(value)
    return flat


def createownapiworkoutdata(w):
    """Build the upload payload for workout *w* from its CSV file.

    Loads ``w.csvfilename`` through ``rowingdata`` and turns the time,
    distance, cadence and heart-rate columns into flat, time-interleaved
    lists (``[t0, v0, t1, v1, ...]``).  Location and power series are added
    only when the corresponding columns exist in the data.

    Args:
        w: workout object; ``csvfilename``, ``name``, ``startdatetime``,
            ``distance`` and ``notes`` are read from it.

    Returns:
        A dict with the keys ``type``, ``name``, ``start_time``,
        ``total_distance``, ``duration``, ``notes``, ``avg_heartrate``,
        ``max_heartrate``, ``distance``, ``cadence`` and ``heartrate``, plus
        ``location`` and/or ``power`` when that data is available.
    """
    row = rowingdata(w.csvfilename)
    df = row.df

    heartrate = df[' HRCur (bpm)']
    averagehr = int(heartrate.mean())
    maxhr = int(heartrate.max())

    # Plain column access replaces the ``.ix`` indexer, which newer pandas
    # versions no longer provide.
    # NOTE(review): ten times the first timestamp is subtracted (original
    # comment: "adding diff, trying to see if this is valid").  A time axis
    # relative to the start would subtract it once -- confirm the intent
    # before changing; behaviour is kept as it was.
    timestamps = df['TimeStamp (sec)'].values
    t = timestamps - 10*timestamps[0]
    t[0] = t[1]
    t = t.astype(int)

    # astype() yields a fresh array, so patching the first sample does not
    # depend on the column buffer being writable.
    d = df['cum_dist'].values.astype(int)
    d[0] = d[1]

    spm = np.array(df[' Cadence (stokes/min)'].astype(int))
    spm[0] = spm[1]

    hr = heartrate.astype(int).values

    try:
        lat = df[' latitude'].values
        lon = df[' longitude'].values
    except KeyError:
        haslatlon = False
    else:
        haslatlon = True

    try:
        power = df[' Power (watts)'].values
    except KeyError:
        haspower = False
    else:
        haspower = True

    data = {
        "type": "Rowing",
        "name": w.name,
        "start_time": w.startdatetime.isoformat(),
        "total_distance": int(w.distance),
        "duration": int(max(t)),
        "notes": w.notes,
        "avg_heartrate": averagehr,
        "max_heartrate": maxhr,
    }

    if haslatlon:
        # Each location sample is a [lat, lon] pair following its time stamp.
        data["location"] = _interleave_samples(
            t, ([la, lo] for la, lo in zip(lat, lon))
        )

    data["distance"] = _interleave_samples(t, d)
    data["cadence"] = _interleave_samples(t, spm)
    data["heartrate"] = _interleave_samples(t, hr)

    if haspower:
        data['power'] = _interleave_samples(t, power)

    return data
def getidfromresponse(response):
    """Extract the numeric id from the first URI in *response*.

    Parses ``response.text`` as JSON, takes the first entry of its ``uris``
    list and converts the eight characters that precede the final five
    characters of that URI to an integer.

    Args:
        response: object with a ``text`` attribute holding a JSON document
            that has a non-empty ``uris`` list.

    Returns:
        The id as an ``int``.

    Raises:
        ValueError: if the selected characters are not a valid integer.
    """
    payload = json.loads(response.text)
    uri = payload['uris'][0]

    # presumably the URI ends in a 5-character suffix such as ".json" after
    # an 8-character id -- TODO confirm against the API's URI format
    id_end = len(uri) - 5
    id_start = id_end - 8

    return int(uri[id_start:id_end])