Private
Public Access
1
0
Files
rowsandall/rowers/integrations/polar.py
Sander Roosendaal 88c0e90dd8 fix polar
2023-02-22 07:24:46 +01:00

522 lines
18 KiB
Python

from rowers.rower_rules import ispromember
from .integrations import SyncIntegration
from rowers.models import User, Rower, Workout
from rowsandall_app.settings import (
POLAR_CLIENT_ID, POLAR_REDIRECT_URI, POLAR_CLIENT_SECRET, UPLOAD_SERVICE_URL
)
import urllib
import requests
from rowers.utils import dologging, myqueue, NoTokenError
from django.utils import timezone
from uuid import uuid4
import base64
from rowers.tasks import handle_request_post
from json.decoder import JSONDecodeError
from rowers.opaque import encoder
import rowers.mytypes as mytypes
from django.conf import settings
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('high')
baseurl = 'https://polaraccesslink.com/v3'
class PolarIntegration(SyncIntegration):
def __init__(self, *args, **kwargs):
if args[0] is not None:
super(PolarIntegration, self).__init__(*args, **kwargs)
def get_notifications(self):
url = baseurl+'/notifications'
# state = str(uuid4())
auth_string = '{id}:{secret}'.format(
id=POLAR_CLIENT_ID,
secret=POLAR_CLIENT_SECRET
)
try:
headers = {'Authorization': 'Basic %s' % base64.b64encode(auth_string)}
except TypeError:
headers = {'Authorization': 'Basic %s' % base64.b64encode(
bytes(auth_string, 'utf-8')).decode('utf-8')}
try:
response = requests.get(url, headers=headers)
except ConnectionError: # pragma: no cover
response = {
'status_code': 400,
}
available_data = []
try:
if response.status_code == 200:
available_data = response.json()['available-user-data']
dologging('polar.log', available_data)
else: # pragma: no cover
dologging('polar.log', response.status_code)
dologging('polar.log', response.text)
except AttributeError: # pragma: no cover
try:
dologging('polar.log', response.text)
except AttributeError:
pass
pass
return available_data
def get_name(self):
return "Polar Flow"
def get_shortname(self):
raise "polar"
def createworkoutdata(self, w, *args, **kwargs):
raise NotImplementedError
def workout_export(self, workout, *args, **kwargs) -> str:
raise NotImplementedError
def revoke_access(self): # pragma: no cover
user = self.user
headers = {
'Authorization': 'Bearer {token}'.format(token=user.rower.polartoken)
}
response = requests.delete('https://www.polaraccesslink.com/v3/users/{userid}'.format(
userid=user.rower.polaruserid
), headers=headers)
dologging('polar.log', response.text)
dologging('polar.log', response.reason)
return 1
def get_polar_workouts(self, user):
r = Rower.objects.get(user=user)
exercise_list = []
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return []
elif (timezone.now() > r.polartokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh"
dologging('polar.log', s)
return []
authorizationstring = str('Bearer ' + r.polartoken)
headers = {'Authorization': authorizationstring,
'Accept': 'application/json'}
headers2 = {
'Authorization': authorizationstring,
}
url = baseurl+'/users/{userid}/exercise-transactions'.format(
userid=r.polaruserid
)
response = requests.post(url, headers=headers)
dologging('polar.log', url)
dologging('polar.log', authorizationstring)
dologging('polar.log', str(response.status_code))
if response.status_code == 201:
transactionid = response.json()['transaction-id']
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid=transactionid,
userid=r.polaruserid
)
dologging('polar.log', url)
response = requests.get(url, headers=headers)
if response.status_code == 200:
exerciseurls = response.json()['exercises']
dologging('polar.log', exerciseurls)
for exerciseurl in exerciseurls:
response = requests.get(exerciseurl, headers=headers)
if response.status_code == 200:
exercise_dict = response.json()
tcxuri = exerciseurl+'/tcx'
response = requests.get(tcxuri, headers=headers2)
if response.status_code == 200:
filename = 'media/mailbox_attachments/{code}_{id}.tcx'.format(
id=exercise_dict['id'],
code=uuid4().hex[:16]
)
dologging('polar.log', filename)
with open(filename, 'wb') as fop:
fop.write(response.content)
workouttype = 'other'
try:
workouttype = mytypes.polaraccesslink_sports[
exercise_dict['detailed-sport-info']]
except KeyError: # pragma: no cover
dologging(
'polar.log', exercise_dict['detailed-sport-info'])
dologging('polar.log', workouttype)
try:
workouttype = mytypes.polarmappinginv[exercise_dict['sport'].lower(
)]
except KeyError:
dologging('polar.log', workouttype)
pass
dologging('polar.log', workouttype)
# post file to upload api
# TODO: add workouttype
uploadoptions = {
'title': '',
'workouttype': workouttype,
'boattype': '1x',
'user': user.id,
'secret': settings.UPLOAD_SERVICE_SECRET,
'file': filename,
'title': '',
}
url = settings.UPLOAD_SERVICE_URL
dologging('polar.log', uploadoptions)
dologging('polar.log', url)
_ = myqueue(
queuehigh,
handle_request_post,
url,
uploadoptions
)
dologging('polar.log', response.status_code)
if response.status_code != 200: # pragma: no cover
try:
dologging('polar.log', response.text)
except:
pass
try:
dologging('polar.log', response.json())
except:
pass
exercise_dict['filename'] = filename
else: # pragma: no cover
exercise_dict['filename'] = ''
exercise_list.append(exercise_dict)
dologging('polar.log', str(exercise_dict))
# commit transaction
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid=transactionid,
userid=r.polaruserid
)
requests.put(url, headers=headers)
dologging(
'polar.log', 'Committed transation at {url}'.format(url=url))
return exercise_list
def get_workouts(self, *args, **kwargs) -> int:
available_data = self.get_notifications()
polaruserid = self.rower.polaruserid
for record in available_data:
dologging('polar.log', str(record))
if record['data-type'] == 'EXERCISE':
try:
r = Rower.objects.get(polaruserid=record['user-id'])
u = r.user
if r.polar_auto_import and ispromember(u):
exercise_list = self.get_polar_workouts(u)
dologging('polar.log', exercise_list)
elif record['user-id'] == polaruserid:
exercise_list = self.get_polar_workouts(u)
except Rower.DoesNotExist: # pragma: no cover
pass
return 1
def register_user(self, token, *args, **kwargs):
try:
_ = self.open()
except NoTokenError:
pass
authorizationstring = 'Bearer {token}'.format(token=token)
headers = {
'Content-Type': 'application/xml',
'Authorization': authorizationstring,
'Accept': 'application/json'
}
payload = {
"member-id": encoder.encode_hex(self.user.id)
}
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'Bearer {token}'.format(token=token)
}
dologging('polar.log', 'Registering user')
response = requests.post(
'https://www.polaraccesslink.com/v3/users',
json=payload,
headers=headers
)
if response.status_code not in [200, 201]: # pragma: no cover
# dologging('polar.log',url)
dologging('polar.log', headers)
dologging('polar.log', payload)
dologging('polar.log', response.status_code)
dologging('polar.log', response.content)
try:
dologging('polar.log', response.reason)
dologging('polar.log', response.text)
except KeyError:
pass
try:
jsondata = response.json()
if jsondata['error']['error_type'] == 'user_already_registered':
return jsondata
except:
pass
return {}
polar_user_data = response.json()
return polar_user_data
def get_polar_user_info(self, physical=False): # pragma: no cover
r = self.rower
_ = self.open()
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Authorization': authorizationstring,
'Accept': 'application/json'
}
if not physical:
url = baseurl+'/users/{userid}'.format(
userid=r.polaruserid
)
else:
url = 'https://www.polaraccesslink.com/v3/users/{userid}/physical-information-transactions/'.format(
userid=r.polaruserid
)
if physical:
response = requests.post(url, headers=headers)
else:
response = requests.get(url, headers=headers)
return response
def get_workout(self, id, transaction_id, *args, **kwargs) -> int:
r = self.rower
_ = self.open()
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Authorization': authorizationstring,
'Accept': 'application/json'
}
url = baseurl+'/users/{userid}/exercise-transactions'.format(
userid=r.polaruserid
)
response = requests.post(url, headers=headers)
if response.status_code == 201:
transactionid = response.json()['transaction-id']
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid=transactionid,
userid=r.polaruserid
)
response = requests.get(url, headers=headers)
if response.status_code == 200:
exerciseurls = response.json()['exercises']
for exerciseurl in exerciseurls:
response = requests.get(exerciseurl, headers=headers)
if response.status_code == 200:
exercise_dict = response.json()
thisid = exercise_dict['id']
if thisid == id:
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}' \
'/exercises/{exerciseid}/tcx'.format(
userid=r.polaruserid,
transactionid=transactionid,
exerciseid=id)
authorizationstring = str('Bearer ' + r.polartoken)
headers2 = {
'Authorization': authorizationstring,
}
response = requests.get(url, headers=headers2)
if response.status_code == 200:
result = response.content
# commit transaction
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid=transactionid,
userid=r.polaruserid
)
response = requests.put(url, headers=headers)
dologging(
'polar.log', 'Committing transaction on {url}'.format(url=url))
else: # pragma: no cover
result = None
return result
def get_workout_list(self, *args, **kwargs) -> list:
exercises = self.get_polar_workouts(self.user)
workouts = []
try:
a = exercises.status_code
return []
except:
return []
for exercise in exercises:
try:
d = exercise['distance']
except KeyError:
d = 0
i = exercise['id']
transactionid = exercise['transaction-id']
starttime = exercise['start-time']
rowtype = exercise['sport']
durationstring = exercise['duration']
duration = isodate.parse_duration(durationstring)
keys = ['id', 'distance', 'duration',
'starttime', 'rowtype', 'source', 'name', 'new']
values = [i, d, duration, starttime, rowtype, transactionid, '', '']
res = dict(zip(keys, values))
workouts.append(res)
return workouts
def make_authorization_url(self, *args, **kwargs) -> str: # pragma: no cover
state = str(uuid4())
params = {"client_id": POLAR_CLIENT_ID,
"response_type": "code",
# "redirect_uri": POLAR_REDIRECT_URI,
"state": state,
# "scope":"accesslink.read_all"
}
url = "https://flow.polar.com/oauth2/authorization?" + \
urllib.parse.urlencode(params)
dologging('polar.log', 'Authorizing')
dologging('polar.log', url)
dologging('polar.log', params)
return url
def get_token(self, code, *args, **kwargs) -> (str, int, str):
post_data = {"grant_type": "authorization_code",
"code": code,
# "redirect_uri": POLAR_REDIRECT_URI,
}
auth_string = '{id}:{secret}'.format(
id=POLAR_CLIENT_ID,
secret=POLAR_CLIENT_SECRET
)
try:
headers = {'Authorization': 'Basic %s' % base64.b64encode(auth_string)}
except TypeError:
headers = {'Authorization': 'Basic %s' % base64.b64encode(
bytes(auth_string, 'utf-8')).decode('utf-8')}
dologging('polar.log', 'Getting token')
dologging('polar.log', post_data)
dologging('polar.log', auth_string)
response = requests.post("https://polarremote.com/v2/oauth2/token",
data=post_data,
headers=headers)
if response.status_code != 200: # pragma: no cover
dologging('polar.log', 'Getting token, got:')
dologging('polar.log', response.status_code)
dologging('polar.log', response.reason)
dologging('polar.log', response.text)
try:
token_json = response.json()
thetoken = token_json['access_token']
expires_in = token_json['expires_in']
user_id = token_json['x_user_id']
dologging('polar.log', response.status_code)
try:
dologging('polar.log', response.text)
except AttributeError:
pass
dologging('polar.log', token_json)
except (KeyError, JSONDecodeError) as e: # pragma: no cover
dologging('polar.log', e)
try:
dologging('polar.log', response.text)
except AttributeError:
pass
thetoken = 0
expires_in = 0
user_id = 0
return [thetoken, expires_in, user_id]
def open(self, *args, **kwargs) -> str:
r = self.rower
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
raise NoTokenError(s)
elif (timezone.now() > r.polartokenexpirydate):
s = "Token expired. Needs to refresh"
raise NoTokenError(s)
token = self.rower.polartoken
return token
def token_refresh(self, *args, **kwargs) -> str:
raise NotImplementedError