Private
Public Access
1
0
Files
rowsandall/rowers/polarstuff.py
2022-01-16 16:13:50 +01:00

402 lines
14 KiB
Python

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import unicode_literals, absolute_import
# All the functionality needed to connect to Strava
# Python
import oauth2 as oauth
import cgi
import requests
import requests.auth
import json
from django.utils import timezone
from datetime import datetime
import numpy as np
from dateutil import parser
import time
import math
from math import sin,cos,atan2,sqrt
import os,sys
import gzip
import base64
import yaml
from uuid import uuid4
from requests import ConnectionError
# Django
from django.http import HttpResponseRedirect, HttpResponse,JsonResponse
from django.conf import settings
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.models import User
from django.contrib.auth.decorators import login_required
from django.urls import reverse, reverse_lazy
from rowers.utils import myqueue
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('high')
# Project
# from .models import Profile
from rowingdata import rowingdata
import pandas as pd
from rowers.models import Rower,Workout
from rowers.tasks import handle_request_post
import rowers.dataprep as dataprep
from rowers.dataprep import columndict
from io import StringIO
import stravalib
from stravalib.exc import ActivityUploadFailed,TimeoutExceeded
from rowsandall_app.settings import (
POLAR_CLIENT_ID, POLAR_REDIRECT_URI, POLAR_CLIENT_SECRET,UPLOAD_SERVICE_URL
)
from rowers.utils import dologging
#baseurl = 'https://polaraccesslink.com/v3-example'
baseurl = 'https://polaraccesslink.com/v3'
from rowers.utils import NoTokenError, custom_exception_handler
import rowers.mytypes as mytypes
# Exchange access code for long-lived access token
def get_token(code): # pragma: no cover
post_data = {"grant_type": "authorization_code",
"code": code,
"redirect_uri": POLAR_REDIRECT_URI,
}
auth_string = '{id}:{secret}'.format(
id= POLAR_CLIENT_ID,
secret=POLAR_CLIENT_SECRET
)
try:
headers = { 'Authorization': 'Basic %s' % base64.b64encode(auth_string) }
except TypeError:
headers = { 'Authorization': 'Basic %s' % base64.b64encode(
bytes(auth_string,'utf-8')).decode('utf-8') }
response = requests.post("https://polarremote.com/v2/oauth2/token",
data=post_data,
headers=headers)
try:
token_json = response.json()
thetoken = token_json['access_token']
expires_in = token_json['expires_in']
user_id = token_json['x_user_id']
except (KeyError,JSONDecodeError):
thetoken = 0
expires_in = 0
user_id = 0
return [thetoken,expires_in,user_id]
# Make authorization URL including random string
def make_authorization_url(): # pragma: no cover
# Generate a random string for the state parameter
# Save it for use later to prevent xsrf attacks
state = str(uuid4())
params = {"client_id": POLAR_CLIENT_ID,
"response_type": "code",
"redirect_uri": POLAR_REDIRECT_URI,
"scope":"write"}
import urllib
url = "https://flow.polar.com/oauth2/authorization" +urllib.parse.urlencode(params)
return HttpResponseRedirect(url)
def get_polar_notifications(): # pragma: no cover
url = baseurl+'/notifications'
state = str(uuid4())
auth_string = '{id}:{secret}'.format(
id= POLAR_CLIENT_ID,
secret=POLAR_CLIENT_SECRET
)
try:
headers = { 'Authorization': 'Basic %s' % base64.b64encode(auth_string) }
except TypeError:
headers = { 'Authorization': 'Basic %s' % base64.b64encode(
bytes(auth_string,'utf-8')).decode('utf-8') }
try:
response = requests.get(url, headers=headers)
except ConnectionError: # pragma: no cover
response = {
'status_code':400,
}
available_data = []
try:
if response.status_code == 200:
available_data = response.json()['available-user-data']
dologging('polar.log',available_data)
else:
dologging('polar.log',response.status_code)
dologging('polar.log',reponse.text)
except AttributeError:
dologging('polar.log',response.text)
pass
return available_data
from rowers.rower_rules import ispromember
def get_all_new_workouts(available_data,testing=False):
for record in available_data: # pragma: no cover
dologging('polar.log',str(record))
if testing:
print(record)
if record['data-type'] == 'EXERCISE':
try:
r = Rower.objects.get(polaruserid=record['user-id'])
u = r.user
if r.polar_auto_import and ispromember(u):
exercise_list = get_polar_workouts(u)
dologging('polar.log',exercise_list)
if testing:
print(exercise_list)
except Rower.DoesNotExist:
pass
return 1 # pragma: no cover
def get_polar_workouts(user):
r = Rower.objects.get(user=user)
exercise_list = []
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh"
dologging('polar.log',s)
return custom_exception_handler(401,s)
else: # pragma: no cover
authorizationstring = str('Bearer ' + r.polartoken)
headers = {'Authorization':authorizationstring,
'Accept': 'application/json'}
headers2 = {
'Authorization':authorizationstring,
}
url = baseurl+'/users/{userid}/exercise-transactions'.format(
userid = r.polaruserid
)
response = requests.post(url, headers=headers)
dologging('polar.log',url)
dologging('polar.log',authorizationstring)
dologging('polar.log',str(response.status_code))
if response.status_code == 201:
transactionid = response.json()['transaction-id']
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid = transactionid,
userid = r.polaruserid
)
dologging('polar.log',url)
response = requests.get(url, headers=headers)
if response.status_code == 200:
exerciseurls = response.json()['exercises']
dologging('polar.log',exerciseurls)
for exerciseurl in exerciseurls:
response = requests.get(exerciseurl,headers=headers)
if response.status_code == 200:
exercise_dict = response.json()
tcxuri = exerciseurl+'/tcx'
response = requests.get(tcxuri,headers=headers2)
if response.status_code == 200:
filename = 'media/mailbox_attachments/{code}_{id}.tcx'.format(
id = exercise_dict['id'],
code = uuid4().hex[:16]
)
dologging('polar.log',filename)
with open(filename,'wb') as fop:
fop.write(response.content)
workouttype = 'water'
try:
workouttype = mytypes.polarmappinginv[exercise_dict['detailed-sport-info'].lower()]
except KeyError:
try:
workouttype = mytypes.polarmappinginv[exercise_dict['sport'].lower()]
except KeyError:
pass
# post file to upload api
# TODO: add workouttype
uploadoptions = {
'title':'',
'workouttype':workouttype,
'boattype':'1x',
'user':user.id,
'secret':settings.UPLOAD_SERVICE_SECRET,
'file':filename,
'title': '',
}
#session = requests.session()
#newHeaders = {'Content-type': 'application/json', 'Accept': 'text/plain'}
#session.headers.update(newHeaders)
url = settings.UPLOAD_SERVICE_URL
dologging('polar.log',uploadoptions)
dologging('polar.log',url)
#response = session.post(url,json=uploadoptions)
job = myqueue(
queuehigh,
handle_request_post,
url,
uploadoptions
)
dologging('polar.log',response.status_code)
if response.status_code != 200:
try:
dologging('polar.log',response.text)
except:
pass
try:
dologging('polar.log', response.json())
except:
pass
exercise_dict['filename'] = filename
else:
exercise_dict['filename'] = ''
exercise_list.append(exercise_dict)
dologging('polar.log',str(exercise_dict))
# commit transaction
#requests.put(url, headers=headers)
return exercise_list # pragma: no cover
def get_polar_user_info(user,physical=False): # pragma: no cover
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate):
s = "Token expired. Needs to refresh"
return custom_exception_handler(401,s)
else:
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Authorization':authorizationstring,
'Accept': 'application/json'
}
params = {
'user-id': r.polaruserid
}
if not physical:
url = baseurl+'/users/{userid}'.format(
userid = r.polaruserid
)
else:
url = 'https://www.polaraccesslink.com/v3/users/{userid}/physical-information-transactions/'.format(
userid = r.polaruserid
)
if physical:
response = requests.post(url, headers=headers)
else:
response = requests.get(url, headers=headers)
return response
def get_polar_workout(user,id,transactionid): # pragma: no cover
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate):
s = "Token expired. Needs to refresh"
return custom_exception_handler(401,s)
else:
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Authorization':authorizationstring,
'Accept': 'application/json'
}
url = baseurl+'/users/{userid}/exercise-transactions'.format(
userid = r.polaruserid
)
response = requests.post(url, headers=headers)
if response.status_code == 201:
transactionid = response.json()['transaction-id']
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid = transactionid,
userid = r.polaruserid
)
response = requests.get(url, headers=headers)
if response.status_code == 200:
exerciseurls = response.json()['exercises']
for exerciseurl in exerciseurls:
response = requests.get(exerciseurl,headers=headers)
if response.status_code == 200:
exercise_dict = response.json()
thisid = exercise_dict['id']
if thisid == id:
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}/exercises/{exerciseid}/tcx'.format(
userid = r.polaruserid,
transactionid = transactionid,
exerciseid = id
)
response = requests.get(url,headers = headers2)
if response.status_code == 200:
result = response.text
# commit transaction
response = requests.put(url,headers=headers)
else:
result = None
return result
return None