Private
Public Access
1
0

next step polar

This commit is contained in:
Sander Roosendaal
2022-01-21 15:41:00 +01:00
parent f9dbdfefca
commit ae6e2f55b0
5 changed files with 131 additions and 45 deletions

View File

@@ -93,7 +93,7 @@ def get_token(code):
data=post_data,
headers=headers)
if response.status_code != 200:
if response.status_code != 200: # pragma: no cover
dologging('polar.log','Getting token, got:')
dologging('polar.log',response.status_code)
dologging('polar.log',response.text)
@@ -109,7 +109,7 @@ def get_token(code):
except AttributeError:
pass
dologging('polar.log',token_json)
except (KeyError,JSONDecodeError) as e:
except (KeyError,JSONDecodeError) as e: # pragma: no cover
dologging('polar.log',e)
try:
dologging('polar.log',response.text)
@@ -122,7 +122,7 @@ def get_token(code):
return [thetoken,expires_in,user_id]
# Make authorization URL including random string
def make_authorization_url():
def make_authorization_url(): # pragma: no cover
# Generate a random string for the state parameter
# Save it for use later to prevent xsrf attacks
state = str(uuid4())
@@ -137,7 +137,7 @@ def make_authorization_url():
return HttpResponseRedirect(url)
def revoke_access(user):
def revoke_access(user): # pragma: no cover
headers = {
'Authorization': 'Bearer {token}'.format(token=user.rower.polartoken)
}
@@ -167,7 +167,7 @@ def get_polar_notifications():
try:
response = requests.get(url, headers=headers)
except ConnectionError:
except ConnectionError: # pragma: no cover
response = {
'status_code':400,
}
@@ -178,10 +178,10 @@ def get_polar_notifications():
if response.status_code == 200:
available_data = response.json()['available-user-data']
dologging('polar.log',available_data)
else:
else: # pragma: no cover
dologging('polar.log',response.status_code)
dologging('polar.log',response.text)
except AttributeError:
except AttributeError: # pragma: no cover
try:
dologging('polar.log',response.text)
except AttributeError:
@@ -195,7 +195,7 @@ from rowers.rower_rules import ispromember
def get_all_new_workouts(available_data,testing=False):
for record in available_data:
dologging('polar.log',str(record))
if testing:
if testing: # pragma: no cover
print(record)
if record['data-type'] == 'EXERCISE':
try:
@@ -204,9 +204,9 @@ def get_all_new_workouts(available_data,testing=False):
if r.polar_auto_import and ispromember(u):
exercise_list = get_polar_workouts(u)
dologging('polar.log',exercise_list)
if testing:
if testing: # pragma: no cover
print(exercise_list)
except Rower.DoesNotExist:
except Rower.DoesNotExist: # pragma: no cover
pass
return 1
@@ -220,7 +220,7 @@ def get_polar_workouts(user):
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate):
elif (timezone.now()>r.polartokenexpirydate): # pragma: no cover
s = "Token expired. Needs to refresh"
dologging('polar.log',s)
return custom_exception_handler(401,s)
@@ -278,7 +278,7 @@ def get_polar_workouts(user):
workouttype = 'other'
try:
workouttype = mytypes.polaraccesslink_sports[exercise_dict['detailed-sport-info']]
except KeyError:
except KeyError: # pragma: no cover
dologging('polar.log',exercise_dict['detailed-sport-info'])
dologging('polar.log',workouttype)
try:
@@ -319,7 +319,7 @@ def get_polar_workouts(user):
)
dologging('polar.log',response.status_code)
if response.status_code != 200:
if response.status_code != 200: # pragma: no cover
try:
dologging('polar.log',response.text)
except:
@@ -330,7 +330,7 @@ def get_polar_workouts(user):
pass
exercise_dict['filename'] = filename
else:
else: # pragma: no cover
exercise_dict['filename'] = ''
exercise_list.append(exercise_dict)
@@ -346,6 +346,39 @@ def get_polar_workouts(user):
return exercise_list
def register_user(user):
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
elif (timezone.now()>r.polartokenexpirydate):
s = "Token expired. Needs to refresh"
return custom_exception_handler(401,s)
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Content-Type': 'application/xml',
'Authorization':authorizationstring,
'Accept': 'application/json'
}
params = {
'member-id': str(user.id)
}
url = baseurl+'/users'
dologging('polar.log','Registering user')
response = requests.post(url,params=params,headers=headers)
if response.status_code != 200:
dologging('polar.log',response.status_code)
return {}
polar_user_data = response.json()
return polar_user_data
def get_polar_user_info(user,physical=False):
r = Rower.objects.get(user=user)
if (r.polartoken == '') or (r.polartoken is None):
@@ -354,34 +387,34 @@ def get_polar_user_info(user,physical=False):
elif (timezone.now()>r.polartokenexpirydate):
s = "Token expired. Needs to refresh"
return custom_exception_handler(401,s)
else:
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Authorization':authorizationstring,
'Accept': 'application/json'
authorizationstring = str('Bearer ' + r.polartoken)
headers = {
'Authorization':authorizationstring,
'Accept': 'application/json'
}
params = {
'user-id': r.polaruserid
}
params = {
'user-id': r.polaruserid
}
if not physical:
url = baseurl+'/users/{userid}'.format(
userid = r.polaruserid
if not physical:
url = baseurl+'/users/{userid}'.format(
userid = r.polaruserid
)
else:
url = 'https://www.polaraccesslink.com/v3/users/{userid}/physical-information-transactions/'.format(
userid = r.polaruserid
)
else:
url = 'https://www.polaraccesslink.com/v3/users/{userid}/physical-information-transactions/'.format(
userid = r.polaruserid
)
if physical:
response = requests.post(url, headers=headers)
else:
response = requests.get(url, headers=headers)
if physical:
response = requests.post(url, headers=headers)
else:
response = requests.get(url, headers=headers)
return response
return response
def get_polar_workout(user,id,transactionid):
@@ -429,11 +462,15 @@ def get_polar_workout(user,id,transactionid):
transactionid = transactionid,
exerciseid = id
)
authorizationstring = str('Bearer ' + r.polartoken)
headers2 = {
'Authorization':authorizationstring,
}
response = requests.get(url,headers = headers2)
if response.status_code == 200:
result = response.text
result = response.content
# commit transaction
url = baseurl+'/users/{userid}/exercise-transactions/{transactionid}'.format(
transactionid = transactionid,
@@ -441,9 +478,9 @@ def get_polar_workout(user,id,transactionid):
)
response = requests.put(url,headers=headers)
dologging('polar.log','Committing transaction on {url}'.format(url=url))
else:
else: # pragma: no cover
result = None
return result
return None
return None # pragma: no cover