Private
Public Access
1
0
Files
rowsandall/rowers/garmin_stuff.py
Sander Roosendaal 34c1f83b70 bug fix
2020-07-05 14:39:06 +02:00

162 lines
6.0 KiB
Python

from rowers.imports import *
import datetime
import requests
from requests_oauthlib import OAuth1,OAuth1Session
from requests import Request, Session
import rowers.mytypes as mytypes
from rowers.mytypes import otwtypes
from rowers.rower_rules import is_workout_user,ispromember
from iso8601 import ParseError
import numpy
import json
from json.decoder import JSONDecodeError
from rowsandall_app.settings import (
GARMIN_CLIENT_KEY, GARMIN_REDIRECT_URI, GARMIN_CLIENT_SECRET
)
from rowers.tasks import handle_c2_import_stroke_data, handle_c2_sync
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('low')
from rowers.utils import myqueue
from rowers.models import C2WorldClassAgePerformance,Rower,Workout,TombStone
from django.core.exceptions import PermissionDenied
from rowers.utils import custom_exception_handler,NoTokenError
oauth_data = {
'client_id': GARMIN_CLIENT_KEY,
'client_secret': GARMIN_CLIENT_SECRET,
'redirect_uri': GARMIN_REDIRECT_URI,
'authorization_uri': "https://connectapi.garmin.com/oauth-service/oauth/request_token",
'content_type': 'application/x-www-form-urlencoded',
'tokenname': 'garmintoken',
'refreshtokenname': 'garminrefreshtoken',
'expirydatename': 'garmintokenexpirydate',
'bearer_auth': True,
'base_url': "https://connect.garmin.com/oauthConfirm",
'scope':'write',
'headers': 'Authorization: OAuth oauth_version="1.0"'
}
def garmin_authorize():
redirect_uri = oauth_data['redirect_uri']
client_secret = oauth_data['client_secret']
client_id = oauth_data['client_id']
base_uri = oauth_data['base_url']
garmin = OAuth1Session(oauth_data['client_id'],
client_secret=oauth_data['client_secret'],
)
fetch_response = garmin.fetch_request_token(oauth_data['authorization_uri'])
resource_owner_key = fetch_response.get('oauth_token')
resource_owner_secret = fetch_response.get('oauth_token_secret')
authorization_url = garmin.authorization_url(base_uri)
return authorization_url,resource_owner_key,resource_owner_secret
def garmin_processcallback(redirect_response,resource_owner_key,resource_owner_secret):
garmin = OAuth1Session(oauth_data['client_id'],
client_secret=oauth_data['client_secret'],
)
oauth_response = garmin.parse_authorization_response(redirect_response)
verifier = oauth_response.get('oauth_verifier')
token = oauth_response.get('oauth_token')
access_token_url = 'https://connectapi.garmin.com/oauth-service/oauth/access_token'
# Using OAuth1Session
garmin = OAuth1Session(oauth_data['client_id'],
client_secret=oauth_data['client_secret'],
resource_owner_key=resource_owner_key,
resource_owner_secret=resource_owner_secret,
verifier=verifier,)
oauth_tokens = garmin.fetch_access_token(access_token_url)
garmintoken = oauth_tokens.get('oauth_token')
garminrefreshtoken = oauth_tokens.get('oauth_token_secret')
return garmintoken,garminrefreshtoken
def garmin_open(user):
r = Rower.objects.get(user=user)
token = Rower.garmintoken
if (token == '') or (token is None):
raise NoTokenError("User has no garmin token")
return token
def get_garmin_workout_list(user):
r = Rower.objects.get(user=user)
if (r.garmintoken == '') or (r.stravatoken is None):
s = "Token doesn't exist. Need to authorize"
return custom_exception_handler(401,s)
garmin = OAuth1Session(oauth_data['client_id'],
client_secret=oauth_data['client_secret'],
resource_owner_key=r.garmintoken,
resource_owner_secret=r.garminrefreshtoken,
)
url = 'https://healthapi.garmin.com/wellness-api/rest/activities?uploadStartTimeInSeconds=1593113760&uploadEndTimeInSeconds=1593279360'
result = garmin.get(url)
return result
def garmin_workouts_from_summaries(activities):
for activity in activities:
garmintoken = activity['userAccessToken']
try:
r = Rower.objects.get(garmintoken=garmintoken)
starttime = activity['startTimeInSeconds']
startdatetime = arrow.get(starttime)
durationseconds = activity['durationInSeconds']
duration = dataprep.totaltime_sec_to_string(durationseconds)
activitytype = activity['activityType']
name = 'Imported from Garmin'
date = startdatetime.date()
try:
distance = activity['durationInMeters']
except KeyError:
distance = 0
try:
averagehr = activity['averageHeartRateInBeatsPerMinute']
maxhr = activity['maxHeartRateInBeatsPerMinute']
except KeyError:
averagehr = 0
maxhr = 0
uploadedtogarmin = activity['summaryId']
try:
w = Workout.objects.get(uploadedtogarmin=uploadedtogarmin)
except Workout.DoesNotExist:
newcsvfile='media/garmin{code}_{importid}.csv'
w = Workout(user=r,csvfilename=newcsvfile)
w.startdatetime = datetime.datetime(
year=startdatetime.year,
month=startdatetime.month,
day=startdatetime.day,
hour=startdatetime.hour,
minute=startdatetime.minute,
second=startdatetime.second,
tzinfo=startdatetime.tzinfo)
w.starttime = startdatetime.time()
w.duration = duration
try:
w.workouttype = mytypes.garminmappinginv[activitytype]
except KeyError:
w.workouttype = 'other'
w.name = name
w.date = date
w.save()
except Rower.DoesNotExist:
pass
return 1