Private
Public Access
1
0
Files
rowsandall/rowers/integrations/intervals.py

314 lines
10 KiB
Python

from .integrations import SyncIntegration, NoTokenError, create_or_update_syncrecord, get_known_ids
from rowers.models import Rower, User, Workout, TombStone
from rowingdata import rowingdata
from rowers import mytypes
from rowers.rower_rules import is_workout_user, ispromember
from rowers.utils import myqueue, dologging, custom_exception_handler
from rowers.tasks import handle_intervals_getworkout
import urllib
import gzip
import requests
import arrow
import datetime
import os
from uuid import uuid4
from django.utils import timezone
from datetime import timedelta
import rowers.dataprep as dataprep
from rowsandall_app.settings import (
INTERVALS_CLIENT_ID, INTERVALS_REDIRECT_URI, INTERVALS_CLIENT_SECRET, SITE_URL
)
import django_rq
queue = django_rq.get_queue('default', default_timeout=3600)
queuelow = django_rq.get_queue('low', default_timeout=3600)
queuehigh = django_rq.get_queue('high', default_timeout=3600)
def seconds_to_duration(seconds):
hours = seconds // 3600
minutes = (seconds % 3600) // 60
remaining_seconds = seconds % 60
# Format as "H:MM:SS" or "MM:SS" if no hours
if hours > 0:
return f"{int(hours)}:{int(minutes):02}:{int(remaining_seconds):02}"
else:
return f"{int(minutes)}:{int(remaining_seconds):02}"
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
intervals_authorize_url = 'https://intervals.icu/oauth/authorize?'
intervals_token_url = 'https://intervals.icu/api/oauth/token'
class IntervalsIntegration(SyncIntegration):
def __init__(self, *args, **kwargs):
super(IntervalsIntegration, self).__init__(*args, **kwargs)
self.oauth_data = {
'client_id': INTERVALS_CLIENT_ID,
'client_secret': INTERVALS_CLIENT_SECRET,
'redirect_uri': INTERVALS_REDIRECT_URI,
'authorization_uri': intervals_authorize_url,
'content_type': 'application/json',
'tokenname': 'intervals_token',
'expirydatename': 'intervals_exp',
'refreshtokenname': 'intervals_r',
'bearer_auth': True,
'base_url': 'https://intervals.icu/api/v1/',
'grant_type': 'refresh_token',
'headers': headers,
'scope': 'ACTIVITY:WRITE, LIBRARY:READ',
}
def get_token(self, code, *args, **kwargs):
post_data = {
'client_id': str(self.oauth_data['client_id']),
'client_secret': self.oauth_data['client_secret'],
'code': code,
}
response = requests.post(
intervals_token_url,
data=post_data,
)
if response.status_code not in [200, 201]:
dologging('intervals.icu.log',response.text)
return [0,"Failed to get token. ",0]
token_json = response.json()
access_token = token_json['access_token']
athlete = token_json['athlete']
return [access_token, athlete, '']
def get_name(self):
return 'Intervals'
def get_shortname(self):
return 'intervals'
def open(self, *args, **kwargs):
# dologging('intervals.icu.log', "Getting token for user {id}".format(id=self.rower.id))
token = super(IntervalsIntegration, self).open(*args, **kwargs)
return token
def createworkoutdata(self, w, *args, **kwargs) -> str:
dozip = kwargs.get('dozip', True)
# resample if wanted by user, not tested
if w.user.intervals_resample_to_1s:
datadf, id, msgs = dataprep.resample(
w.id, w.user, w, overwrite=False
)
w_resampled = Workout.objects.get(id=id)
filename = w_resampled.csvfilename
else:
w_resampled = None
filename = w.csvfilename
try:
row = rowingdata(csvfile=filename)
except IOError: # pragma: no cover
data = dataprep.read_df_sql(w.id)
try:
datalength = len(data)
except AttributeError:
datalength = 0
if datalength == 0:
data.rename(columns=columndict, inplace=True)
_ = data.to_csv(w.csvfilename+'.gz', index_label='index', compression='gzip')
try:
row = rowingdata(csvfile=filename)
except IOError: # pragma: no cover
return '' # pragma: no cover
else:
return ''
tcxfilename = w.csvfilename[:-4] + '.tcx'
try:
newnotes = w.notes + '\n from'+w.workoutsource+' via rowsandall.com'
except TypeError:
newnotes = 'from'+w.workoutsource+' via rowsandall.com'
if w.user.intervals_resample_to_1s and w_resampled:
w_resampled.delete()
row.exporttotcx(tcxfilename, notes=newnotes, sport=mytypes.intervalsmapping[w.workouttype])
if dozip:
gzfilename = tcxfilename + '.gz'
try:
with open(tcxfilename, 'rb') as inF:
s = inF.read()
with gzip.GzipFile(gzfilename, 'wb') as outF:
outF.write(s)
try:
os.remove(tcxfilename)
except WindowsError: # pragma: no cover
pass
except FileNotFoundError:
return ''
return gzfilename
return tcxfilename
def workout_export(self, workout, *args, **kwargs) -> str:
token = self.open()
dologging('intervals.icu.log', "Exporting workout {id}".format(id=workout.id))
filename = self.createworkoutdata(workout)
if not filename:
return 0
params = {
'name': workout.name,
'description': workout.notes,
}
authorizationstring = str('Bearer ' + token)
# headers with authorization string and content type multipart/form-data
headers = {
'Authorization': authorizationstring,
}
url = "https://intervals.icu/api/v1/athlete/{athleteid}/activities".format(athleteid=0)
with open(filename, 'rb') as f:
files = {'file': f}
response = requests.post(url, params=params, headers=headers, files=files)
if response.status_code not in [200, 201]:
dologging('intervals.icu.log', response.reason)
return 0
id = response.json()['id']
# set workout type to workouttype
url = "https://intervals.icu/api/v1/activity/{activityid}".format(activityid=id)
thetype = mytypes.intervalsmapping[workout.workouttype]
response = requests.put(url, headers=headers, json={'type': thetype})
if response.status_code not in [200, 201]:
return 0
workout.uploadedtointervals = id
workout.save()
os.remove(filename)
dologging('intervals.icu.log', "Exported workout {id}".format(id=workout.id))
return id
def get_workout_list(self, *args, **kwargs) -> int:
url = self.oauth_data['base_url'] + 'athlete/0/activities?'
startdate = timezone.now() - timedelta(days=30)
enddate = timezone.now() + timedelta(days=1)
startdatestring = kwargs.get("startdate","")
enddatestring = kwargs.get("enddate","")
try:
startdate = arrow.get(startdatestring).datetime
except:
pass
try:
enddate = arrow.get(enddatestring).datetime
except:
pass
url += 'oldest=' + startdate.strftime('%Y-%m-%d') + '&newest=' + enddate.strftime('%Y-%m-%d')
headers = {
'accept': '*/*',
'authorization': 'Bearer ' + self.open(),
}
response = requests.get(url, headers=headers)
if response.status_code != 200:
dologging('intervals.icu.log', response.text)
return []
data = response.json()
known_interval_ids = get_known_ids(self.rower, 'intervalsid')
workouts = []
for item in data:
i = item['id']
r = item['type']
d = item['distance']
ttot = seconds_to_duration(item['moving_time'])
s = item['start_date']
s2 = ''
c = item['name']
if i in known_interval_ids:
nnn = ''
else:
nnn = 'NEW'
keys = ['id','distance','duration','starttime',
'rowtype','source','name','new']
values = [i, d, ttot, s, r, s2, c, nnn]
ress = dict(zip(keys, values))
workouts.append(ress)
return workouts
def get_workout(self, id, *args, **kwargs) -> int:
_ = self.open()
r = self.rower
record = create_or_update_syncrecord(r, None, intervalsid=id)
_ = myqueue(queuehigh,
handle_intervals_getworkout,
self.rower,
self.rower.intervals_token,
id)
return 1
def get_workouts(self, *args, **kwargs):
startdate = timezone.now() - timedelta(days=7)
enddate = timezone.now() + timedelta(days=1)
startdatestring = kwargs.get(startdate,"")
enddatestring = kwargs.get(enddate,"")
try:
startdate = arrow.get(startdatestring).datetime
except:
pass
try:
enddate = arrow.get(enddatestring).datetime
except:
pass
count = 0
workouts = self.get_workout_list(startdate=startdate, enddate=enddate)
for workout in workouts:
if workout['new'] == 'NEW':
self.get_workout(workout['id'])
count +=1
return count
def make_authorization_url(self, *args, **kwargs):
return super(IntervalsIntegration, self).make_authorization_url(*args, **kwargs)
def token_refresh(self, *args, **kwargs):
return super(IntervalsIntegration, self).token_refresh(*args, **kwargs)