Private
Public Access
1
0
Files
rowsandall/rowers/integrations/strava.py
2023-10-27 16:48:30 +02:00

354 lines
12 KiB
Python

from .integrations import SyncIntegration, NoTokenError, create_or_update_syncrecord, get_known_ids
from rowers.models import User, Rower, Workout, TombStone
from rowingdata import rowingdata
from rowers import mytypes
from rowers.tasks import handle_strava_sync, fetch_strava_workout
from stravalib.exc import ActivityUploadFailed, TimeoutExceeded
from rowers.rower_rules import is_workout_user, ispromember
from rowers.utils import get_strava_stream, custom_exception_handler
from rowers.utils import myqueue, dologging
#from rowers.imports import *
import urllib
import gzip
import time
import requests
import arrow
import datetime
import os
from uuid import uuid4
from django.utils import timezone
from datetime import timedelta
from rowsandall_app.settings import (
STRAVA_CLIENT_ID, STRAVA_REDIRECT_URI, STRAVA_CLIENT_SECRET,
SITE_URL
)
import django_rq
# Job queues obtained from django_rq, one per named queue
# ('default', 'low', 'high').
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('high')
# Sent as 'verify_token' when the push subscription is created
# (see strava_establish_push).
webhookverification = "kudos_to_rowing"
# Sent as 'callback_url' when the push subscription is created.
webhooklink = SITE_URL+'/rowers/strava/webhooks/'
# Default headers; stored under the 'headers' key of
# StravaIntegration.oauth_data.
headers = {'Accept': 'application/json',
'Api-Key': STRAVA_CLIENT_ID,
'Content-Type': 'application/json',
'user-agent': 'sanderroosendaal'}
from json.decoder import JSONDecodeError
from rowers.dataprep import columndict
def strava_establish_push():  # pragma: no cover
    """Create a Strava push (webhook) subscription for this application.

    Posts the client credentials together with the callback URL and the
    verification token as form data.

    Returns:
        The HTTP status code of Strava's response.
    """
    subscription_request = dict(
        client_id=STRAVA_CLIENT_ID,
        client_secret=STRAVA_CLIENT_SECRET,
        callback_url=webhooklink,
        verify_token=webhookverification,
    )
    return requests.post(
        "https://www.strava.com/api/v3/push_subscriptions",
        data=subscription_request,
    ).status_code
def strava_list_push():  # pragma: no cover
    """List the ids of the push subscriptions registered for this application.

    Returns:
        A list with the ``id`` of every subscription in the response, or an
        empty list when Strava does not answer with HTTP 200.
    """
    credentials = dict(
        client_id=STRAVA_CLIENT_ID,
        client_secret=STRAVA_CLIENT_SECRET,
    )
    reply = requests.get(
        "https://www.strava.com/api/v3/push_subscriptions",
        params=credentials,
    )
    # Guard clause: anything but a plain success yields no subscriptions.
    if reply.status_code != 200:
        return []
    return [subscription['id'] for subscription in reply.json()]
def strava_push_delete(id):  # pragma: no cover
    """Delete the push subscription with the given id.

    Args:
        id: Identifier of the subscription, interpolated into the URL.

    Returns:
        The HTTP status code of Strava's response.
    """
    endpoint = "https://www.strava.com/api/v3/push_subscriptions/{id}".format(
        id=id)
    credentials = dict(
        client_id=STRAVA_CLIENT_ID,
        client_secret=STRAVA_CLIENT_SECRET,
    )
    # NOTE(review): the credentials travel in a JSON body here, whereas the
    # list call sends them as query parameters -- confirm against the Strava
    # webhook documentation that DELETE accepts a body.
    return requests.delete(endpoint, json=credentials).status_code
class StravaIntegration(SyncIntegration):
    """Strava flavour of ``SyncIntegration``.

    Holds the Strava OAuth configuration and implements:

    * export of a workout as a (gzipped) TCX file, uploaded by the queued
      ``handle_strava_sync`` task;
    * import of a single activity through the queued
      ``fetch_strava_workout`` task;
    * listing of the athlete's activities;
    * lookup and storage of the Strava athlete id on the rower.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base integration and set the Strava OAuth settings."""
        super(StravaIntegration, self).__init__(*args, **kwargs)
        self.oauth_data = {
            'client_id': STRAVA_CLIENT_ID,
            'client_secret': STRAVA_CLIENT_SECRET,
            'redirect_uri': STRAVA_REDIRECT_URI,
            # NOTE(review): the key is spelled 'autorization_uri' (sic). It is
            # a runtime key, presumably read under this spelling elsewhere;
            # confirm every consumer before renaming it.
            'autorization_uri': "https://www.strava.com/oauth/authorize",
            'content_type': 'application/json',
            'tokenname': 'stravatoken',
            'refreshtokenname': 'stravarefreshtoken',
            'expirydatename': 'stravatokenexpirydate',
            'bearer_auth': True,
            'base_url': "https://www.strava.com/oauth/token",
            'grant_type': 'refresh_token',
            'headers': headers,
            'scope': 'activity:write,activity:read_all',
        }

    @staticmethod
    def _bearer_headers(token):
        """Return the request headers for a bearer-authenticated API call."""
        return {
            'Authorization': str('Bearer ' + token),
            'user-agent': 'sanderroosendaal',
            'Content-Type': 'application/json',
        }

    @staticmethod
    def _token_expiring(rower):
        """Return True if the rower's token has no expiry date or expires
        within the next 3599 seconds."""
        expiry = rower.stravatokenexpirydate
        return expiry is None or timezone.now() + timedelta(seconds=3599) > expiry

    def get_token(self, code, *args, **kwargs):
        """Delegate the token exchange for ``code`` to the base class."""
        return super(StravaIntegration, self).get_token(code, *args, **kwargs)

    def get_name(self):
        """Return the display name of the integration."""
        return "Strava"

    def get_shortname(self):
        """Return the short, lower-case name of the integration."""
        return "strava"

    def open(self, *args, **kwargs):
        """Obtain a token through the base class and make sure the athlete id
        is known.

        Returns:
            Whatever the base class ``open`` returns.
        """
        dologging('strava_log.log', 'Getting token for user {id}'.format(id=self.rower.id))
        token = super(StravaIntegration, self).open(*args, **kwargs)
        # An owner id of 0 means the athlete id has not been stored yet.
        if self.rower.strava_owner_id == 0:
            _ = self.set_strava_athlete_id()
        return token

    # createworkoutdata
    def createworkoutdata(self, w, *args, **kwargs) -> str:
        """Export workout ``w`` to a TCX file next to its CSV file.

        Args:
            w: Workout providing ``csvfilename``, ``id``, ``notes`` and
                ``workoutsource``.
            **kwargs: ``dozip`` (default True) gzips the TCX file and removes
                the uncompressed copy.

        Returns:
            The path of the written file ('.tcx' or '.tcx.gz'), or '' when
            the workout data cannot be read or the TCX file is missing.
        """
        dozip = kwargs.get('dozip', True)
        filename = w.csvfilename

        try:
            row = rowingdata(csvfile=filename)
        except IOError:  # pragma: no cover
            # The CSV file is unreadable: rebuild it from the stored data.
            # Imported here because the module only imports ``columndict``
            # from rowers.dataprep, so the bare name ``dataprep`` is not
            # otherwise in scope.
            from rowers import dataprep
            data = dataprep.read_df_sql(w.id)
            try:
                datalength = len(data)
            except (TypeError, AttributeError):
                # len(None) raises TypeError; treat "no data" as empty.
                datalength = 0

            if datalength == 0:
                return ''

            data.rename(columns=columndict, inplace=True)
            _ = data.to_csv(w.csvfilename+'.gz',
                            index_label='index',
                            compression='gzip')
            try:
                row = rowingdata(csvfile=filename)
            except IOError:
                return ''

        # Assumes the CSV file name carries a 4-character extension.
        tcxfilename = filename[:-4]+'.tcx'
        try:
            newnotes = w.notes+'\n from '+w.workoutsource+' via rowsandall.com'
        except TypeError:
            # Raised when the notes are not a string (e.g. None).
            newnotes = 'from '+w.workoutsource+' via rowsandall.com'

        row.exporttotcx(tcxfilename, notes=newnotes)

        if not dozip:
            return tcxfilename

        gzfilename = tcxfilename+'.gz'
        try:
            # ``open`` here is the builtin: method bodies do not see the
            # class-level ``open`` method.
            with open(tcxfilename, 'rb') as inF:
                s = inF.read()
            with gzip.GzipFile(gzfilename, 'wb') as outF:
                outF.write(s)
            try:
                os.remove(tcxfilename)
            except OSError:  # pragma: no cover
                # Best effort: a leftover uncompressed file is harmless.
                # (WindowsError, used previously, only exists on Windows.)
                pass
        except FileNotFoundError:
            return ''

        return gzfilename

    # workout_export
    def workout_export(self, workout, *args, **kwargs) -> int:
        """Queue the upload of ``workout`` to Strava.

        Returns:
            1 when the upload job was queued, 0 when no token could be
            obtained, the workout does not belong to the user, or the export
            file could not be created.

        Raises:
            NoTokenError: if the rower has no Strava token after ``open``.
        """
        try:
            _ = self.open()
        except NoTokenError:
            return 0

        if (self.rower.stravatoken == '') or (self.rower.stravatoken is None):
            raise NoTokenError("Your hovercraft is full of eels")

        if not is_workout_user(self.user, workout):
            return 0

        tcxfile = self.createworkoutdata(workout)
        if not tcxfile:
            return 0

        activity_type = self.rower.stravaexportas
        if activity_type == 'match':
            # 'match' maps our workout type onto a Strava activity type,
            # falling back to 'Rowing' for unmapped types.
            try:
                activity_type = mytypes.stravamapping[workout.workouttype]
            except KeyError:
                activity_type = 'Rowing'

        _ = myqueue(queue,
                    handle_strava_sync,
                    self.rower.stravatoken,
                    workout.id,
                    tcxfile, workout.name, activity_type,
                    workout.notes)

        dologging('strava_export_log.log', 'Exporting as {t} from {w}'.format(
            t=activity_type, w=workout.workouttype))

        return 1

    # get_workouts
    def get_workouts(self, *args, **kwargs) -> int:
        """Not implemented for Strava; returns ``NotImplemented``."""
        return NotImplemented

    # get_workout
    def get_workout(self, id, *args, **kwargs) -> int:
        """Queue the import of the Strava activity ``id``.

        Returns:
            The id of the queued job, or 0 when no token could be obtained.
        """
        try:
            _ = self.open()
        except NoTokenError:
            # An except clause must name the exception class; naming an
            # instance raises TypeError instead of handling the error.
            return 0

        # Called for its side effect; the returned record is not needed here.
        _ = create_or_update_syncrecord(self.rower, None, stravaid=id)

        csvfilename = 'media/{code}_{id}.csv'.format(
            code=uuid4().hex[:16], id=id)
        job = myqueue(queue,
                      fetch_strava_workout,
                      self.rower.stravatoken,
                      self.oauth_data,
                      id,
                      csvfilename,
                      self.user.id,
                      )
        return job.id

    # get_workout_list
    def get_workout_list(self, *args, **kwargs) -> list:
        """Fetch the athlete's activities from Strava.

        Also reconciles workouts whose upload is marked as failed
        (``uploadedtostrava == -1``): when an activity has the same start
        time and duration, its Strava id is stored on the workout.

        Args:
            **kwargs: ``limit_n`` (default 0) is sent as ``per_page`` when
                non-zero.

        Returns:
            A list of dicts with keys ``id``, ``distance``, ``duration``,
            ``starttime``, ``rowtype``, ``source``, ``name`` and ``new``;
            empty when Strava does not answer with HTTP 200.

        Raises:
            NoTokenError: if the token is missing, has no expiry date, or
                expires within the next 3599 seconds.
        """
        limit_n = kwargs.get('limit_n', 0)

        if (self.rower.stravatoken == '') or (self.rower.stravatoken is None):  # pragma: no cover
            raise NoTokenError("Error")
        elif self._token_expiring(self.rower):  # pragma: no cover
            raise NoTokenError("Error")

        # ready to fetch. Hurray
        url = "https://www.strava.com/api/v3/athlete/activities"
        if limit_n == 0:
            params = {}
        else:  # pragma: no cover
            params = {'per_page': limit_n}

        res = requests.get(
            url,
            headers=self._bearer_headers(self.rower.stravatoken),
            params=params,
        )
        if res.status_code != 200:  # pragma: no cover
            return []

        # Decode the response body once and reuse it below.
        activities = res.json()
        rower = self.rower

        wfailed = Workout.objects.filter(user=rower, uploadedtostrava=-1)
        for w in wfailed:  # pragma: no cover
            for item in activities:
                if arrow.get(item['start_date']) != arrow.get(w.startdatetime):
                    continue
                elapsed_td = datetime.timedelta(
                    seconds=int(item['elapsed_time']))
                try:
                    elapsed_time = datetime.datetime.strptime(
                        str(elapsed_td),
                        "%H:%M:%S"
                    )
                except ValueError:
                    # str(timedelta) for 24 hours or more ("1 day, ...") does
                    # not match the format; such an activity cannot be
                    # compared, so skip it rather than abort the listing.
                    continue
                # Compare the trailing 7 characters of both durations.
                if str(elapsed_time)[-7:] == str(w.duration)[-7:]:
                    w.uploadedtostrava = int(item['id'])
                    w.save()

        knownstravaids = get_known_ids(self.rower, 'stravaid')
        workouts = []
        for item in activities:
            activity_id = item['id']
            workouts.append({
                'id': activity_id,
                'distance': int(float(item['distance'])),
                'duration': str(datetime.timedelta(
                    seconds=int(float(item['elapsed_time'])))),
                'starttime': item['start_date'],
                'rowtype': item['type'],
                'source': None,
                'name': item['name'],
                # Flag activities that have not been seen before.
                'new': '' if activity_id in knownstravaids else 'NEW',  # pragma: no cover
            })

        return workouts

    # make_authorization_url
    def make_authorization_url(self, *args, **kwargs):
        """Return the URL where the user authorizes this application."""
        # Import the submodule explicitly: a bare ``import urllib`` does not
        # guarantee that ``urllib.parse`` is loaded.
        from urllib.parse import urlencode

        params = {"client_id": STRAVA_CLIENT_ID,
                  "response_type": "code",
                  "redirect_uri": STRAVA_REDIRECT_URI,
                  "scope": "activity:write,activity:read_all"}

        return "https://www.strava.com/oauth/authorize?" + urlencode(params)

    # token_refresh
    def token_refresh(self, *args, **kwargs):
        """Delegate the token refresh to the base class."""
        return super(StravaIntegration, self).token_refresh(*args, **kwargs)

    def set_strava_athlete_id(self, *args, **kwargs):
        """Look up the athlete id on Strava and store it on the rower.

        Returns:
            The athlete id on success, 0 when Strava does not answer with
            HTTP 200, or the result of ``custom_exception_handler(401, ...)``
            when the rower has no token.
        """
        r = self.rower
        if (r.stravatoken == '') or (r.stravatoken is None):  # pragma: no cover
            s = "Token doesn't exist. Need to authorize"
            return custom_exception_handler(401, s)
        elif self._token_expiring(r):
            # NOTE(review): ``open`` calls this method again while
            # strava_owner_id is 0; this relies on ``open`` leaving a token
            # that is no longer about to expire -- confirm.
            _ = self.open()

        url = "https://www.strava.com/api/v3/athlete"
        response = requests.get(
            url, headers=self._bearer_headers(r.stravatoken), params={})

        if response.status_code == 200:  # pragma: no cover
            # Decode the response body once.
            athlete_id = response.json()['id']
            r.strava_owner_id = athlete_id
            r.save()
            return athlete_id

        return 0