Private
Public Access
1
0
Files
rowsandall/rowers/underarmourstuff.py
2017-09-24 17:45:59 +02:00

459 lines
13 KiB
Python

# All the functionality needed to connect to Runkeeper
# Python
import oauth2 as oauth
import cgi
import pytz
import requests
import requests.auth
import json
from django.utils import timezone
from datetime import datetime
import numpy as np
from dateutil import parser
import time
import math
from math import sin,cos,atan2,sqrt
import os,sys
import urllib
# Django
from django.shortcuts import render_to_response
from django.http import HttpResponseRedirect, HttpResponse,JsonResponse
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.contrib.auth.decorators import login_required
# Project
# from .models import Profile
from rowingdata import rowingdata
import pandas as pd
from rowers.models import Rower,Workout,checkworkoutuser
from rowsandall_app.settings import (
C2_CLIENT_ID, C2_REDIRECT_URI, C2_CLIENT_SECRET,
STRAVA_CLIENT_ID, STRAVA_REDIRECT_URI, STRAVA_CLIENT_SECRET,
UNDERARMOUR_CLIENT_ID, UNDERARMOUR_CLIENT_SECRET,
UNDERARMOUR_REDIRECT_URI,UNDERARMOUR_CLIENT_KEY,
)
# Custom error class - to raise a NoTokenError
class UnderArmourNoTokenError(Exception):
def __init__(self,value):
self.value=value
def __str__(self):
return repr(self.value)
# Exponentially weighted moving average
# Used for data smoothing of the jagged data obtained by Strava
# See bitbucket issue 72
def ewmovingaverage(interval,window_size):
# Experimental code using Exponential Weighted moving average
try:
intervaldf = pd.DataFrame({'v':interval})
idf_ewma1 = intervaldf.ewm(span=window_size)
idf_ewma2 = intervaldf[::-1].ewm(span=window_size)
i_ewma1 = idf_ewma1.mean().ix[:,'v']
i_ewma2 = idf_ewma2.mean().ix[:,'v']
interval2 = np.vstack((i_ewma1,i_ewma2[::-1]))
interval2 = np.mean( interval2, axis=0) # average
except ValueError:
interval2 = interval
return interval2
from utils import geo_distance
# Custom exception handler, returns a 401 HTTP message
# with exception details in the json data
def custom_exception_handler(exc,message):
response = {
"errors": [
{
"code": str(exc),
"detail": message,
}
]
}
res = HttpResponse(message)
res.status_code = 401
res.json = json.dumps(response)
return res
# Checks if user has UnderArmour token, renews them if they are expired
def underarmour_open(user):
r = Rower.objects.get(user=user)
if (r.underarmourtoken == '') or (r.underarmourtoken is None):
s = "Token doesn't exist. Need to authorize"
raise UnderArmourNoTokenError("User has no token")
else:
if (timezone.now()>r.underarmourtokenexpirydate):
thetoken = rower_underarmour_token_refresh(user)
else:
thetoken = r.underarmourtoken
return thetoken
# Refresh ST token using refresh token
def do_refresh_token(refreshtoken,access_token):
client_auth = requests.auth.HTTPBasicAuth(UNDERARMOUR_CLIENT_KEY, UNDERARMOUR_CLIENT_SECRET)
post_data = {"grant_type": "refresh_token",
"client_secret": UNDERARMOUR_CLIENT_SECRET,
"client_id":UNDERARMOUR_CLIENT_KEY,
"refresh_token": refreshtoken,
}
headers = {'user-agent': 'sanderroosendaal',
"Api-Key":UNDERARMOUR_CLIENT_KEY,
'Accept': 'application/json',
'Content-Type': 'application/x-www-form-urlencoded',
'authorization': 'Bearer %s' % access_token}
url = "https://api.ua.com/v7.1/oauth2/access_token/"
response = requests.post(url,
data=post_data,
headers=headers)
token_json = response.json()
thetoken = token_json['access_token']
expires_in = token_json['expires_in']
try:
refresh_token = token_json['refresh_token']
except KeyError:
refresh_token = refreshtoken
return [thetoken,expires_in,refresh_token]
# Exchange access code for long-lived access token
def get_token(code):
client_auth = requests.auth.HTTPBasicAuth(UNDERARMOUR_CLIENT_KEY, UNDERARMOUR_CLIENT_SECRET)
post_data = {
"grant_type": "authorization_code",
"code": code,
"client_secret": UNDERARMOUR_CLIENT_SECRET,
"client_id":UNDERARMOUR_CLIENT_KEY,
}
headers = {
'user-agent': 'sanderroosendaal',
"Api-Key":UNDERARMOUR_CLIENT_KEY,
}
response = requests.post("https://api.ua.com/v7.1/oauth2/access_token/",
data=post_data,
headers=headers)
try:
token_json = response.json()
thetoken = token_json['access_token']
expires_in = token_json['expires_in']
refresh_token = token_json['refresh_token']
except KeyError:
thetoken = 0
expires_in = 30
refresh_token = ''
return thetoken,expires_in,refresh_token
# Make authorization URL including random string
def make_authorization_url(request):
# Generate a random string for the state parameter
# Save it for use later to prevent xsrf attacks
from uuid import uuid4
state = str(uuid4())
params = {"client_id": UNDERARMOUR_CLIENT_KEY,
"response_type": "code",
"redirect_uri": UNDERARMOUR_REDIRECT_URI,
}
url = "https://www.mapmyfitness.com/v7.1/oauth2/uacf/authorize/" +urllib.urlencode(params)
return HttpResponseRedirect(url)
# Get list of workouts available on Underarmour
def get_underarmour_workout_list(user):
r = Rower.objects.get(user=user)
if (r.underarmourtoken == '') or (r.underarmourtoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
else:
# ready to fetch. Hurray
authorizationstring = str('Bearer ' + r.underarmourtoken)
headers = {'Authorization': authorizationstring,
'Api-Key': UNDERARMOUR_CLIENT_KEY,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://api.ua.com/v7.1/workout/?user="+str(get_userid(r.underarmourtoken))
s = requests.get(url,headers=headers)
return s
# Get workout summary data by Underarmour ID
def get_underarmour_workout(user,underarmourid):
r = Rower.objects.get(user=user)
if (r.underarmourtoken == '') or (r.underarmourtoken is None):
return custom_exception_handler(401,s)
s = "Token doesn't exist. Need to authorize"
else:
# ready to fetch. Hurray
authorizationstring = str('Bearer ' + r.underarmourtoken)
headers = {'Authorization': authorizationstring,
'Api-Key': UNDERARMOUR_CLIENT_KEY,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://api.ua.com/v7.1/workout/"+str(underarmourid)+"/?field_set=time_series"
s = requests.get(url,headers=headers)
return s
# Create Workout Data for upload to Underarmour
def createunderarmourworkoutdata(w):
filename = w.csvfilename
try:
row = rowingdata(filename)
except:
return 0
averagehr = int(row.df[' HRCur (bpm)'].mean())
maxhr = int(row.df[' HRCur (bpm)'].max())
duration = w.duration.hour*3600
duration += w.duration.minute*60
duration += w.duration.second
duration += +1.0e-6*w.duration.microsecond
name = w.name
try:
notes = w.notes+'\n from '+w.workoutsource+' via rowsandall.com'
except TypeError:
notes = 'from '+w.workoutsource+' via rowsandall.com'
# adding diff, trying to see if this is valid
#t = row.df.ix[:,'TimeStamp (sec)'].values-10*row.df.ix[0,'TimeStamp (sec)']
t = row.df.ix[:,'TimeStamp (sec)'].values-row.df.ix[0,'TimeStamp (sec)']
t[0] = t[1]
d = row.df.ix[:,'cum_dist'].values
d[0] = d[1]
t = t.astype(int)
d = d.astype(int)
spm = row.df[' Cadence (stokes/min)'].astype(int)
spm[0] = spm[1]
hr = row.df[' HRCur (bpm)'].astype(int)
haslatlon=1
try:
lat = row.df[' latitude'].values
lon = row.df[' longitude'].values
if not lat.std() and not lon.std():
haslatlon = 0
except KeyError:
haslatlon = 0
# path data
if haslatlon:
locdata = []
for e in zip(t,lat,lon):
point = {
'lat':e[1],
'lng':e[2],
'elevation':0,
}
locdata.append([e[0],point])
hrdata = []
for e in zip(t,hr):
point = [e[0],
e[1]
]
hrdata.append(point)
distancedata = []
for e in zip(t,d):
point = [e[0],
e[1]
]
distancedata.append(point)
spmdata = []
for e in zip(t,spm):
spmdata.append([e[0],e[1]])
st = w.startdatetime.astimezone(pytz.timezone(w.timezone))
start_time = st.isoformat()
timeseries = {
"distance": distancedata,
"heartrate": hrdata,
"cadence": spmdata,
}
aggregrates = {
"elapsed_time_total": int(duration),
"distance_total": int(max(d)),
"heartrate_avg": averagehr,
"heart_rate_min": int(min(hr)),
"heart_rate_max": int(max(hr)),
}
# if haslatlon:
# timeseries["position"] = locdata
data = {
"name": name,
"start_datetime": start_time,
"time_series": timeseries,
"start_locale_timezone": "Etc/UTC",
"activity_type": "/v7.1/activity_type/128/",
"notes": notes,
}
return data
# Obtain Underarmour Workout ID and activity type
def get_idfromuri(user,links):
id = links['self'][0]['id']
typeid = links['activity_type'][0]['id']
typename = get_typefromid(typeid,user)
return id,typename
def getidfromresponse(response):
t = json.loads(response.text)
links = t["_links"]
id = links["self"][0]["id"]
return int(id)
def refresh_ua_actlist(user):
r = Rower.objects.get(user=user)
authorizationstring = str('Bearer ' + r.underarmourtoken)
headers = {'Authorization': authorizationstring,
'Api-Key': UNDERARMOUR_CLIENT_KEY,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://api.ua.com/v7.1/activity_type/"
response = requests.get(url,headers=headers)
me_json = response.json()
types = me_json["_embedded"]["activity_types"]
w = {int(t["_links"]["self"][0]["id"]):t["name"] for t in types}
wdf = pd.Series(w,name='Name')
wdf.to_csv('static/rigging/ua2.csv',index_label='id',header=True)
return w
try:
activities = pd.read_csv('static/rigging/ua2.csv',index_col='id')
actdict = activities.to_dict()['Name']
except:
actdict = {}
def get_typefromid(typeid,user):
r = Rower.objects.get(user=user)
try:
res = actdict[int(typeid)]
except KeyError:
authorizationstring = str('Bearer ' + r.underarmourtoken)
headers = {'Authorization': authorizationstring,
'Api-Key': UNDERARMOUR_CLIENT_KEY,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://api.ua.com/v7.1/activity_type/"+str(typeid)
response = requests.get(url,headers=headers)
me_json = response.json()
try:
res = me_json['name']
except KeyError:
res = 0
return res
# Get user id, having access token
# Handy for checking if the API access is working
def get_userid(access_token):
authorizationstring = str('Bearer ' + access_token)
headers = {'Authorization': authorizationstring,
'Api-Key': UNDERARMOUR_CLIENT_KEY,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://api.ua.com/v7.1/user/self"
response = requests.get(url,headers=headers)
me_json = response.json()
try:
res = me_json['id']
except KeyError:
res = 0
return res
def workout_ua_upload(user,w):
message = "Uploading to MapMyFitness"
uaid = 0
r = w.user
thetoken = underarmour_open(r.user)
# ready to upload. Hurray
if (checkworkoutuser(user,w)):
data = createunderarmourworkoutdata(w)
# return HttpResponse(json.dumps(data))
if not data:
message = "Data error"
uaid = 0
return message, uaid
authorizationstring = str('Bearer ' + thetoken)
headers = {'Authorization': authorizationstring,
'Api-Key': UNDERARMOUR_CLIENT_KEY,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json',
}
url = "https://api.ua.com/v7.1/workout/"
response = requests.post(url,headers=headers,data=json.dumps(data))
# check for duplicate error first
if (response.status_code == 409 ):
message = "Duplicate error"
w.uploadedtounderarmour = -1
uaid = -1
w.save()
elif (response.status_code == 201 or response.status_code==200):
uaid = getidfromresponse(response)
w.uploadedtounderarmour = uaid
w.save()
return 'Successfully synchronized with MapMyFitness',uaid
else:
s = response
message = "Something went wrong in workout_underarmour_upload_view: %s - %s" % (s.reason,s.text)
uaid = 0
return message, uaid
else:
message = "You are not authorized to upload this workout"
uaid = 0
return message, uaid
return message, uaid