Private
Public Access
1
0
Files
rowsandall/rowers/polarstuff.py
2022-01-22 15:02:37 +01:00

516 lines
17 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import unicode_literals, absolute_import
# All the functionality needed to connect to Strava
# Python
import oauth2 as oauth
import cgi
import requests
import requests.auth
import json
from django.utils import timezone
from datetime import datetime
import numpy as np
from dateutil import parser
import time
import math
from math import sin,cos,atan2,sqrt
import os,sys
import gzip
import base64
import yaml
from uuid import uuid4
from requests import ConnectionError
from json.decoder import JSONDecodeError
# Django
from django.http import HttpResponseRedirect, HttpResponse,JsonResponse
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.contrib.auth.decorators import login_required
from django.urls import reverse, reverse_lazy
from rowers.utils import myqueue
from rowers.opaque import encoder
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('high')
# Project
# from .models import Profile
from rowingdata import rowingdata
import pandas as pd
from rowers.models import Rower,Workout
from rowers.tasks import handle_request_post
import rowers.dataprep as dataprep
from rowers.dataprep import columndict
from io import StringIO
import stravalib
from stravalib.exc import ActivityUploadFailed,TimeoutExceeded
from rowsandall_app.settings import (
POLAR_CLIENT_ID, POLAR_REDIRECT_URI, POLAR_CLIENT_SECRET,UPLOAD_SERVICE_URL
)
from rowers.utils import dologging
#baseurl = 'https://polaraccesslink.com/v3-example'
baseurl = 'https://polaraccesslink.com/v3'
from rowers.utils import NoTokenError, custom_exception_handler
import rowers.mytypes as mytypes
# Exchange access code for long-lived access token
def get_token(code):
post_data = {"grant_type": "authorization_code",
"code": code,
#"redirect_uri": POLAR_REDIRECT_URI,
}
auth_string = '{id}:{secret}'.format(
id= POLAR_CLIENT_ID,
secret=POLAR_CLIENT_SECRET
)
try:
headers = { 'Authorization': 'Basic %s' % base64.b64encode(auth_string) }
except TypeError:
headers = { 'Authorization': 'Basic %s' % base64.b64encode(
bytes(auth_string,'utf-8')).decode('utf-8') }
dologging('polar.log','Getting token')
dologging('polar.log',post_data)
dologging('polar.log',auth_string)
response = requests.post("https://polarremote.com/v2/oauth2/token",
data=post_data,
headers=headers)
if response.status_code != 200: # pragma: no cover
dologging('polar.log','Getting token, got:')
dologging('polar.log',response.status_code)
dologging('polar.log',response.reason)
dologging('polar.log',response.text)
try:
token_json = response.json()
thetoken = token_json['access_token']
expires_in = token_json['expires_in']
user_id = token_json['x_user_id']
dologging('polar.log',response.status_code)
try:
dologging('polar.log',response.text)
except AttributeError:
pass
dologging('polar.log',token_json)
except (KeyError,JSONDecodeError) as e: # pragma: no cover
dologging('polar.log',e)
try:
dologging('polar.log',response.text)
except AttributeError:
pass
thetoken = 0
expires_in = 0
user_id = 0
return [thetoken,expires_in,user_id]
# Make authorization URL including random string
def make_authorization_url(): # pragma: no cover
# Generate a random string for the state parameter
# Save it for use later to prevent xsrf attacks
state = str(uuid4())
params = {"client_id": POLAR_CLIENT_ID,
"response_type": "code",
"redirect_uri": POLAR_REDIRECT_URI,
"scope":"write"}
import urllib
url = "https://flow.polar.com/oauth2/authorization" +urllib.parse.urlencode(params)
return HttpResponseRedirect(url)
def revoke_access(user): # pragma: no cover
headers = {
'Authorization': 'Bearer {token}'.format(token=user.rower.polartoken)
}
response = requests.delete('https://www.polaraccesslink.com/v3/users/{userid}'.format(
userid = user.rower.polaruserid
), headers = headers)
dologging('polar.log',response.text)
dologging('polar.log',response.reason)
return 1
def get_polar_notifications():
url = baseurl+'/notifications'
state = str(uuid4())
auth_string = '{id}:{secret}'.format(
id= POLAR_CLIENT_ID,
secret=POLAR_CLIENT_SECRET
)
try:
headers = { 'Authorization': 'Basic %s' % base64.b64encode(auth_string) }
except TypeError:
headers = { 'Authorization': 'Basic %s' % base64.b64encode(
bytes(auth_string,'utf-8')).decode('utf-8') }
try:
response = requests.get(url, headers=headers)
except ConnectionError: # pragma: no cover
response = {
'status_code':400,
}
available_data = []
try:
if response.status_code == 200:
available_data = response.json()['available-user-data']
dologging('polar.log',available_data)
else: # pragma: no cover
dologging('polar.log',response.status_code)
dologging('polar.log',response.text)
except AttributeError: # pragma: no cover
try:
dologging('polar.log',response.text)
except AttributeError:
pass
pass
return available_data
from rowers.rower_rules import ispromember
def get_all_new_workouts(available_data,testing=False):
for record in available_data:
dologging('polar.log',str(record))
if testing: # pragma: no cover
print(record)
if record['data-type'] == 'EXERCISE':
try:
r = Rower.objects.get(polaruserid=record['user-id'])
u = r.user
if r.polar_auto_import and ispromember(u):
exercise_list = get_polar_workouts(u)
dologging('polar.log',exercise_list)
if testing: # pragma: no cover
print(exercise_list)
except Rower.DoesNotExist: # pragma: no cover
pass
return 1
def get_polar_workouts(user):
r = Rower.objects.get(user=user)
exercise_list = []
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh"
dologging('polar.log',s)
return custom_exception_handler(401,s)
else:
authorizationstring = str('Bearer ' + r.polartoken)
headers = {'Authorization':authorizationstring,
'Accept': 'application/json'}
headers2 = {
'Authorization':authorizationstring,
}
url = baseurl+'/users/{userid}/exercise-transactions'.format(
userid = r.polaruserid
)
response = requests.post(url, headers=headers)
dologging('polar.log',url)
dologging('polar.log',authorizationstring)
dologging('polar.log',str(response.status_code))
if response.status_code == 201:
transactionid = response.json()['transaction-id']
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid = transactionid,
userid = r.polaruserid
)
dologging('polar.log',url)
response = requests.get(url, headers=headers)
if response.status_code == 200:
exerciseurls = response.json()['exercises']
dologging('polar.log',exerciseurls)
for exerciseurl in exerciseurls:
response = requests.get(exerciseurl,headers=headers)
if response.status_code == 200:
exercise_dict = response.json()
tcxuri = exerciseurl+'/tcx'
response = requests.get(tcxuri,headers=headers2)
if response.status_code == 200:
filename = 'media/mailbox_attachments/{code}_{id}.tcx'.format(
id = exercise_dict['id'],
code = uuid4().hex[:16]
)
dologging('polar.log',filename)
with open(filename,'wb') as fop:
fop.write(response.content)
workouttype = 'other'
try:
workouttype = mytypes.polaraccesslink_sports[exercise_dict['detailed-sport-info']]
except KeyError: # pragma: no cover
dologging('polar.log',exercise_dict['detailed-sport-info'])
dologging('polar.log',workouttype)
try:
workouttype = mytypes.polarmappinginv[exercise_dict['sport'].lower()]
except KeyError:
dologging('polar.log',workouttype)
pass
dologging('polar.log',workouttype)
# post file to upload api
# TODO: add workouttype
uploadoptions = {
'title':'',
'workouttype':workouttype,
'boattype':'1x',
'user':user.id,
'secret':settings.UPLOAD_SERVICE_SECRET,
'file':filename,
'title': '',
}
#session = requests.session()
#newHeaders = {'Content-type': 'application/json', 'Accept': 'text/plain'}
#session.headers.update(newHeaders)
url = settings.UPLOAD_SERVICE_URL
dologging('polar.log',uploadoptions)
dologging('polar.log',url)
#response = session.post(url,json=uploadoptions)
job = myqueue(
queuehigh,
handle_request_post,
url,
uploadoptions
)
dologging('polar.log',response.status_code)
if response.status_code != 200: # pragma: no cover
try:
dologging('polar.log',response.text)
except:
pass
try:
dologging('polar.log', response.json())
except:
pass
exercise_dict['filename'] = filename
else: # pragma: no cover
exercise_dict['filename'] = ''
exercise_list.append(exercise_dict)
dologging('polar.log',str(exercise_dict))
# commit transaction
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid = transactionid,
userid = r.polaruserid
)
requests.put(url, headers=headers)
dologging('polar.log','Committed transation at {url}'.format(url=url))
return exercise_list
def register_user(user, token):
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None): # pragma: no cover
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh"
return custom_exception_handler(401,s)
authorizationstring = 'Bearer {token}'.format(token=token)
headers = {
'Content-Type': 'application/xml',
'Authorization':authorizationstring,
'Accept': 'application/json'
}
payload = {
"member-id": encoder.encode_hex(user.id)
}
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'Bearer {token}'.format(token=token)
}
dologging('polar.log','Registering user')
response = requests.post(
'https://www.polaraccesslink.com/v3/users',
json = payload,
headers = headers
)
#url = baseurl+'/users'
#response = requests.post(url,params=params,headers=headers)
if response.status_code not in [200,201]: # pragma: no cover
#dologging('polar.log',url)
dologging('polar.log',headers)
dologging('polar.log',payload)
dologging('polar.log',response.status_code)
dologging('polar.log',response.content)
try:
dologging('polar.log',response.reason)
dologging('polar.log',response.text)
except KeyError:
pass
return {}
polar_user_data = response.json()
return polar_user_data
def get_polar_user_info(user,physical=False): # pragma: no cover
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate):
s = "Token expired. Needs to refresh"
return custom_exception_handler(401,s)
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Authorization':authorizationstring,
'Accept': 'application/json'
}
params = {
'user-id': r.polaruserid
}
if not physical:
url = baseurl+'/users/{userid}'.format(
userid = r.polaruserid
)
else:
url = 'https://www.polaraccesslink.com/v3/users/{userid}/physical-information-transactions/'.format(
userid = r.polaruserid
)
if physical:
response = requests.post(url, headers=headers)
else:
response = requests.get(url, headers=headers)
return response
def get_polar_workout(user,id,transactionid):
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None): # pragma: no cover
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh"
return custom_exception_handler(401,s)
else:
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Authorization':authorizationstring,
'Accept': 'application/json'
}
url = baseurl+'/users/{userid}/exercise-transactions'.format(
userid = r.polaruserid
)
response = requests.post(url, headers=headers)
if response.status_code == 201:
transactionid = response.json()['transaction-id']
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid = transactionid,
userid = r.polaruserid
)
response = requests.get(url, headers=headers)
if response.status_code == 200:
exerciseurls = response.json()['exercises']
for exerciseurl in exerciseurls:
response = requests.get(exerciseurl,headers=headers)
if response.status_code == 200:
exercise_dict = response.json()
thisid = exercise_dict['id']
if thisid == id:
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}/exercises/{exerciseid}/tcx'.format(
userid = r.polaruserid,
transactionid = transactionid,
exerciseid = id
)
authorizationstring = str('Bearer ' + r.polartoken)
headers2 = {
'Authorization':authorizationstring,
}
response = requests.get(url,headers = headers2)
if response.status_code == 200:
result = response.content
# commit transaction
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid = transactionid,
userid = r.polaruserid
)
response = requests.put(url,headers=headers)
dologging('polar.log','Committing transaction on {url}'.format(url=url))
else: # pragma: no cover
result = None
return result
return None # pragma: no cover