Private
Public Access
1
0
Files
rowsandall/rowers/integrations/intervals.py
2024-12-11 19:23:44 +01:00

299 lines
9.3 KiB
Python

from .integrations import SyncIntegration, NoTokenError, create_or_update_syncrecord, get_known_ids
from rowers.models import Rower, User, Workout, TombStone
from rowingdata import rowingdata
from rowers import mytypes
from rowers.rower_rules import is_workout_user, ispromember
from rowers.utils import myqueue, dologging, custom_exception_handler
from rowers.tasks import handle_intervals_getworkout
import urllib
import gzip
import requests
import arrow
import datetime
import os
from uuid import uuid4
from django.utils import timezone
from datetime import timedelta
from rowsandall_app.settings import (
INTERVALS_CLIENT_ID, INTERVALS_REDIRECT_URI, INTERVALS_CLIENT_SECRET, SITE_URL
)
import django_rq
queue = django_rq.get_queue('default', default_timeout=3600)
queuelow = django_rq.get_queue('low', default_timeout=3600)
queuehigh = django_rq.get_queue('high', default_timeout=3600)
def seconds_to_duration(seconds):
hours = seconds // 3600
minutes = (seconds % 3600) // 60
remaining_seconds = seconds % 60
# Format as "H:MM:SS" or "MM:SS" if no hours
if hours > 0:
return f"{int(hours)}:{int(minutes):02}:{int(remaining_seconds):02}"
else:
return f"{int(minutes)}:{int(remaining_seconds):02}"
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
intervals_authorize_url = 'https://intervals.icu/oauth/authorize?'
intervals_token_url = 'https://intervals.icu/api/oauth/token'
class IntervalsIntegration(SyncIntegration):
def __init__(self, *args, **kwargs):
super(IntervalsIntegration, self).__init__(*args, **kwargs)
self.oauth_data = {
'client_id': INTERVALS_CLIENT_ID,
'client_secret': INTERVALS_CLIENT_SECRET,
'redirect_uri': INTERVALS_REDIRECT_URI,
'authorization_uri': intervals_authorize_url,
'content_type': 'application/json',
'tokenname': 'intervals_token',
'expirydatename': 'intervals_exp',
'refreshtokenname': 'intervals_r',
'bearer_auth': True,
'base_url': 'https://intervals.icu/api/v1/',
'grant_type': 'refresh_token',
'headers': headers,
'scope': 'ACTIVITY:WRITE, LIBRARY:READ',
}
def get_token(self, code, *args, **kwargs):
post_data = {
'client_id': str(self.oauth_data['client_id']),
'client_secret': self.oauth_data['client_secret'],
'code': code,
}
response = requests.post(
intervals_token_url,
data=post_data,
)
if response.status_code not in [200, 201]:
dologging('intervals.icu.log',response.text)
return [0,"Failed to get token. ",0]
token_json = response.json()
access_token = token_json['access_token']
athlete = token_json['athlete']
return [access_token, athlete, '']
def get_name(self):
return 'Intervals'
def get_shortname(self):
return 'intervals'
def open(self, *args, **kwargs):
# dologging('intervals.icu.log', "Getting token for user {id}".format(id=self.rower.id))
token = super(IntervalsIntegration, self).open(*args, **kwargs)
return token
def createworkoutdata(self, w, *args, **kwargs) -> str:
dozip = kwargs.get('dozip', True)
filename = w.csvfilename
try:
row = rowingdata(csvfile=filename)
except IOError: # pragma: no cover
data = dataprep.read_df_sql(w.id)
try:
datalength = len(data)
except AttributeError:
datalength = 0
if datalength == 0:
data.rename(columns=columndict, inplace=True)
_ = data.to_csv(w.csvfilename+'.gz', index_label='index', compression='gzip')
try:
row = rowingdata(csvfile=filename)
except IOError: # pragma: no cover
return '' # pragma: no cover
else:
return ''
tcxfilename = w.csvfilename[:-4] + '.tcx'
try:
newnotes = w.notes + '\n from'+w.workoutsource+' via rowsandall.com'
except TypeError:
newnotes = 'from'+w.workoutsource+' via rowsandall.com'
row.exporttotcx(tcxfilename, notes=newnotes, sport=mytypes.intervalsmapping[w.workouttype])
if dozip:
gzfilename = tcxfilename + '.gz'
try:
with open(tcxfilename, 'rb') as inF:
s = inF.read()
with gzip.GzipFile(gzfilename, 'wb') as outF:
outF.write(s)
try:
os.remove(tcxfilename)
except WindowsError: # pragma: no cover
pass
except FileNotFoundError:
return ''
return gzfilename
return tcxfilename
def workout_export(self, workout, *args, **kwargs) -> str:
token = self.open()
filename = self.createworkoutdata(workout)
if not filename:
return 0
params = {
'name': workout.name,
'description': workout.notes,
}
authorizationstring = str('Bearer ' + token)
# headers with authorization string and content type multipart/form-data
headers = {
'Authorization': authorizationstring,
}
url = "https://intervals.icu/api/v1/athlete/{athleteid}/activities".format(athleteid=0)
with open(filename, 'rb') as f:
files = {'file': f}
response = requests.post(url, params=params, headers=headers, files=files)
if response.status_code not in [200, 201]:
dologging('intervals.icu.log', response.reason)
return 0
id = response.json()['id']
# set workout type to workouttype
url = "https://intervals.icu/api/v1/activity/{activityid}".format(activityid=id)
thetype = mytypes.intervalsmapping[workout.workouttype]
response = requests.put(url, headers=headers, json={'type': thetype})
if response.status_code not in [200, 201]:
return 0
workout.uploadedtointervals = id
workout.save()
os.remove(filename)
return id
def get_workout_list(self, *args, **kwargs) -> int:
url = self.oauth_data['base_url'] + 'athlete/0/activities?'
startdate = timezone.now() - timedelta(days=30)
enddate = timezone.now() + timedelta(days=1)
startdatestring = kwargs.get("startdate","")
enddatestring = kwargs.get("enddate","")
try:
startdate = arrow.get(startdatestring).datetime
except:
pass
try:
enddate = arrow.get(enddatestring).datetime
except:
pass
url += 'oldest=' + startdate.strftime('%Y-%m-%d') + '&newest=' + enddate.strftime('%Y-%m-%d')
headers = {
'accept': '*/*',
'authorization': 'Bearer ' + self.open(),
}
response = requests.get(url, headers=headers)
if response.status_code != 200:
dologging('intervals.icu.log', response.text)
return []
data = response.json()
known_interval_ids = get_known_ids(self.rower, 'intervalsid')
workouts = []
for item in data:
i = item['id']
r = item['type']
d = item['distance']
ttot = seconds_to_duration(item['moving_time'])
s = item['start_date']
s2 = ''
c = item['name']
if i in known_interval_ids:
nnn = ''
else:
nnn = 'NEW'
keys = ['id','distance','duration','starttime',
'rowtype','source','name','new']
values = [i, d, ttot, s, r, s2, c, nnn]
ress = dict(zip(keys, values))
workouts.append(ress)
return workouts
def get_workout(self, id, *args, **kwargs) -> int:
_ = self.open()
r = self.rower
record = create_or_update_syncrecord(r, None, intervalsid=id)
_ = myqueue(queuehigh,
handle_intervals_getworkout,
self.rower,
self.rower.intervals_token,
id)
return 1
def get_workouts(self, *args, **kwargs):
startdate = timezone.now() - timedelta(days=7)
enddate = timezone.now() + timedelta(days=1)
startdatestring = kwargs.get(startdate,"")
enddatestring = kwargs.get(enddate,"")
try:
startdate = arrow.get(startdatestring).datetime
except:
pass
try:
enddate = arrow.get(enddatestring).datetime
except:
pass
count = 0
workouts = self.get_workout_list(startdate=startdate, enddate=enddate)
for workout in workouts:
if workout['new'] == 'NEW':
self.get_workout(workout['id'])
count +=1
return count
def make_authorization_url(self, *args, **kwargs):
return super(IntervalsIntegration, self).make_authorization_url(*args, **kwargs)
def token_refresh(self, *args, **kwargs):
return super(IntervalsIntegration, self).token_refresh(*args, **kwargs)