from .integrations import SyncIntegration, NoTokenError, create_or_update_syncrecord, get_known_ids
from rowers.models import Rower, User, Workout, TombStone

from rowingdata import rowingdata

from rowers import mytypes

from rowers.rower_rules import is_workout_user, ispromember
from rowers.utils import myqueue, dologging, custom_exception_handler
from rowers.tasks import handle_intervals_getworkout

import urllib
import gzip
import requests
import arrow
import datetime
import os
from uuid import uuid4
from django.utils import timezone
from datetime import timedelta

import rowers.dataprep as dataprep
from rowsandall_app.settings import (
    INTERVALS_CLIENT_ID, INTERVALS_REDIRECT_URI, INTERVALS_CLIENT_SECRET,
    SITE_URL
)

import django_rq

# RQ queues, one per priority level, each with a one hour job timeout.
queue = django_rq.get_queue('default', default_timeout=3600)
queuelow = django_rq.get_queue('low', default_timeout=3600)
queuehigh = django_rq.get_queue('high', default_timeout=3600)


def seconds_to_duration(seconds):
    """Format a number of seconds as a duration string.

    Returns "H:MM:SS" when the value is at least one hour, otherwise "M:SS"
    (the leading field is never zero-padded). Fractional seconds are
    truncated. ``None`` and negative values are formatted as "0:00" instead
    of raising or producing a wrapped-around value.
    """
    if seconds is None or seconds < 0:
        seconds = 0

    hours = seconds // 3600
    minutes = (seconds % 3600) // 60
    remaining_seconds = seconds % 60

    if hours > 0:
        return f"{int(hours)}:{int(minutes):02}:{int(remaining_seconds):02}"

    return f"{int(minutes)}:{int(remaining_seconds):02}"


# Default JSON headers, stored in the integration's oauth_data.
headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json'
}

intervals_authorize_url = 'https://intervals.icu/oauth/authorize?'
intervals_token_url = 'https://intervals.icu/api/oauth/token'


class IntervalsIntegration(SyncIntegration):
    """Sync integration that exports workouts to and lists/imports workouts
    from intervals.icu through its HTTP API."""

    # Seconds to wait for intervals.icu before giving up on a request, so a
    # stalled connection cannot block the caller indefinitely.
    _request_timeout = 60

    def __init__(self, *args, **kwargs):
        """Initialise the base integration and set the intervals.icu OAuth
        configuration in ``self.oauth_data``."""
        super().__init__(*args, **kwargs)
        self.oauth_data = {
            'client_id': INTERVALS_CLIENT_ID,
            'client_secret': INTERVALS_CLIENT_SECRET,
            'redirect_uri': INTERVALS_REDIRECT_URI,
            'authorization_uri': intervals_authorize_url,
            'content_type': 'application/json',
            'tokenname': 'intervals_token',
            'expirydatename': 'intervals_exp',
            'refreshtokenname': 'intervals_r',
            'bearer_auth': True,
            'base_url': 'https://intervals.icu/api/v1/',
            'grant_type': 'refresh_token',
            'headers': headers,
            'scope': 'ACTIVITY:WRITE, LIBRARY:READ',
        }

    def get_token(self, code, *args, **kwargs):
        """Exchange an OAuth authorization ``code`` for an access token.

        Returns ``[access_token, athlete, '']`` on success and
        ``[0, "Failed to get token. ", 0]`` when the request fails, the
        server answers with a non-2xx status, or the response lacks the
        expected fields.
        """
        failure = [0, "Failed to get token. ", 0]
        post_data = {
            'client_id': str(self.oauth_data['client_id']),
            'client_secret': self.oauth_data['client_secret'],
            'code': code,
        }

        try:
            response = requests.post(
                intervals_token_url,
                data=post_data,
                timeout=self._request_timeout,
            )
        except requests.RequestException as e:
            dologging('intervals.icu.log', str(e))
            return failure

        if response.status_code not in [200, 201]:
            dologging('intervals.icu.log', response.text)
            return failure

        try:
            token_json = response.json()
            return [token_json['access_token'], token_json['athlete'], '']
        except (ValueError, KeyError, TypeError):
            # Do not log the body here: it may contain the access token.
            dologging('intervals.icu.log',
                      'Token response is missing expected fields')
            return failure

    def get_name(self):
        """Return the display name of this integration."""
        return 'Intervals'

    def get_shortname(self):
        """Return the short identifier of this integration."""
        return 'intervals'

    def open(self, *args, **kwargs):
        """Return the token obtained from the base class ``open``."""
        return super().open(*args, **kwargs)

    @staticmethod
    def _parse_date(value, default):
        """Return ``value`` parsed by arrow as a datetime, or ``default``
        when it is empty or cannot be parsed."""
        if not value:
            return default
        try:
            return arrow.get(value).datetime
        except Exception:
            # Best effort: any unparsable input falls back to the default.
            return default

    @staticmethod
    def _remove_quietly(path):
        """Delete ``path``, ignoring the error if it cannot be removed."""
        try:
            os.remove(path)
        except OSError:
            pass

    def _load_rowingdata(self, w, filename):
        """Load ``filename`` as rowingdata; if the file cannot be read, try
        to rebuild the CSV from the stored data frame. Returns ``None`` when
        no data could be loaded."""
        try:
            return rowingdata(csvfile=filename)
        except IOError:  # pragma: no cover
            pass

        data = dataprep.read_df_sql(w.id)
        try:
            datalength = len(data)
        except (AttributeError, TypeError):
            datalength = 0

        # NOTE(review): the original referenced a bare `columndict` that is
        # not defined in this module; presumably it lives in rowers.dataprep
        # - confirm. Without it the CSV cannot be rebuilt.
        columndict = getattr(dataprep, 'columndict', None)
        if not datalength or columndict is None:
            return None

        data.rename(columns=columndict, inplace=True)
        data.to_csv(w.csvfilename + '.gz', index_label='index',
                    compression='gzip')
        try:
            return rowingdata(csvfile=filename)
        except IOError:  # pragma: no cover
            return None

    def createworkoutdata(self, w, *args, **kwargs) -> str:
        """Write workout ``w`` to a TCX file and return the file name.

        With ``dozip=True`` (the default) the TCX file is gzipped, the plain
        file is removed and the ``.gz`` name is returned. Returns ``''`` when
        the workout data cannot be loaded or the TCX file is missing.
        """
        dozip = kwargs.get('dozip', True)

        w_resampled = None
        filename = w.csvfilename
        # resample if wanted by user, not tested
        if w.user.intervals_resample_to_1s:
            _, resampled_id, _ = dataprep.resample(
                w.id, w.user, w, overwrite=False
            )
            w_resampled = Workout.objects.get(id=resampled_id)
            filename = w_resampled.csvfilename

        try:
            row = self._load_rowingdata(w, filename)
        finally:
            # The resampled workout is only a temporary copy; remove it on
            # every path, but never delete the source workout itself.
            if w_resampled is not None and w_resampled.id != w.id:
                w_resampled.delete()

        if row is None:
            return ''

        attribution = 'from' + w.workoutsource + ' via rowsandall.com'
        try:
            newnotes = w.notes + '\n ' + attribution
        except TypeError:
            # w.notes is not a string (e.g. None)
            newnotes = attribution

        tcxfilename = w.csvfilename[:-4] + '.tcx'
        row.exporttotcx(tcxfilename, notes=newnotes,
                        sport=mytypes.intervalsmapping[w.workouttype])

        if not dozip:
            return tcxfilename

        gzfilename = tcxfilename + '.gz'
        try:
            with open(tcxfilename, 'rb') as tcxfile:
                payload = tcxfile.read()
            with gzip.GzipFile(gzfilename, 'wb') as gzfile:
                gzfile.write(payload)
        except FileNotFoundError:
            return ''

        self._remove_quietly(tcxfilename)
        return gzfilename

    def _upload_activity(self, workout, filename, token):
        """Upload ``filename`` as a new activity and set its type. Returns
        the activity id, or 0 on failure."""
        auth_headers = {
            'Authorization': str('Bearer ' + token),
        }
        params = {
            'name': workout.name,
            'description': workout.notes,
        }
        url = "https://intervals.icu/api/v1/athlete/{athleteid}/activities".format(athleteid=0)

        try:
            with open(filename, 'rb') as f:
                response = requests.post(
                    url, params=params, headers=auth_headers,
                    files={'file': f}, timeout=self._request_timeout,
                )
        except requests.RequestException as e:
            dologging('intervals.icu.log', str(e))
            return 0

        if response.status_code not in [200, 201]:
            dologging('intervals.icu.log', response.reason)
            return 0

        try:
            activity_id = response.json()['id']
        except (ValueError, KeyError, TypeError):
            dologging('intervals.icu.log', response.text)
            return 0

        # set workout type to workouttype
        url = "https://intervals.icu/api/v1/activity/{activityid}".format(activityid=activity_id)
        thetype = mytypes.intervalsmapping[workout.workouttype]
        try:
            response = requests.put(url, headers=auth_headers,
                                    json={'type': thetype},
                                    timeout=self._request_timeout)
        except requests.RequestException as e:
            dologging('intervals.icu.log', str(e))
            return 0

        if response.status_code not in [200, 201]:
            dologging('intervals.icu.log', response.reason)
            return 0

        workout.uploadedtointervals = activity_id
        workout.save()

        dologging('intervals.icu.log', "Exported workout {id}".format(id=workout.id))
        return activity_id

    def workout_export(self, workout, *args, **kwargs) -> str:
        """Export ``workout`` to intervals.icu.

        Returns the id of the created activity, which is also stored in
        ``workout.uploadedtointervals``. Returns 0 when the export file could
        not be created or any request fails. The temporary export file is
        removed whether or not the upload succeeds.
        """
        token = self.open()
        dologging('intervals.icu.log', "Exporting workout {id}".format(id=workout.id))

        filename = self.createworkoutdata(workout)
        if not filename:
            return 0

        try:
            return self._upload_activity(workout, filename, token)
        finally:
            self._remove_quietly(filename)

    def get_workout_list(self, *args, **kwargs) -> list:
        """Return the activities between ``startdate`` and ``enddate``.

        Both keyword arguments are optional and may be anything arrow can
        parse; they default to 30 days ago and tomorrow. Each entry is a dict
        with the keys ``id``, ``distance``, ``duration``, ``starttime``,
        ``rowtype``, ``source``, ``name`` and ``new``, where ``new`` is
        ``'NEW'`` for ids not in the rower's known ids and ``''`` otherwise.
        Returns an empty list when the request fails.
        """
        now = timezone.now()
        startdate = self._parse_date(kwargs.get("startdate", ""),
                                     now - timedelta(days=30))
        enddate = self._parse_date(kwargs.get("enddate", ""),
                                   now + timedelta(days=1))

        url = (
            self.oauth_data['base_url'] + 'athlete/0/activities?'
            + 'oldest=' + startdate.strftime('%Y-%m-%d')
            + '&newest=' + enddate.strftime('%Y-%m-%d')
        )
        request_headers = {
            'accept': '*/*',
            'authorization': 'Bearer ' + self.open(),
        }

        try:
            response = requests.get(url, headers=request_headers,
                                    timeout=self._request_timeout)
        except requests.RequestException as e:
            dologging('intervals.icu.log', str(e))
            return []

        if response.status_code != 200:
            dologging('intervals.icu.log', response.text)
            return []

        try:
            data = response.json()
        except ValueError:
            dologging('intervals.icu.log', response.text)
            return []
        if not isinstance(data, list):
            return []

        known_interval_ids = get_known_ids(self.rower, 'intervalsid')
        workouts = []
        for item in data:
            # Tolerate activities with missing or null fields so that one
            # incomplete entry does not break the whole listing.
            if not isinstance(item, dict) or 'id' not in item:
                continue
            activity_id = item['id']
            workouts.append({
                'id': activity_id,
                'distance': item.get('distance') or 0,
                'duration': seconds_to_duration(item.get('moving_time') or 0),
                'starttime': item.get('start_date', ''),
                'rowtype': item.get('type', ''),
                'source': '',
                'name': item.get('name', ''),
                'new': '' if activity_id in known_interval_ids else 'NEW',
            })

        return workouts

    def get_workout(self, id, *args, **kwargs) -> int:
        """Record a sync entry for activity ``id`` and queue
        ``handle_intervals_getworkout`` for it on the high queue.
        Always returns 1."""
        _ = self.open()
        create_or_update_syncrecord(self.rower, None, intervalsid=id)

        _ = myqueue(queuehigh,
                    handle_intervals_getworkout,
                    self.rower,
                    self.rower.intervals_token,
                    id)

        return 1

    def get_workouts(self, *args, **kwargs):
        """Queue an import for every activity marked ``'NEW'`` between
        ``startdate`` and ``enddate`` (default: 7 days ago until tomorrow)
        and return the number of imports queued."""
        now = timezone.now()
        # The keys are the strings "startdate" / "enddate"; the original
        # used the datetime variables as keys, so the arguments were ignored.
        startdate = self._parse_date(kwargs.get("startdate", ""),
                                     now - timedelta(days=7))
        enddate = self._parse_date(kwargs.get("enddate", ""),
                                   now + timedelta(days=1))

        count = 0
        workouts = self.get_workout_list(startdate=startdate, enddate=enddate)
        for workout in workouts:
            if workout['new'] == 'NEW':
                self.get_workout(workout['id'])
                count += 1

        return count

    def make_authorization_url(self, *args, **kwargs):
        """Return the authorization URL built by the base class."""
        return super().make_authorization_url(*args, **kwargs)

    def token_refresh(self, *args, **kwargs):
        """Refresh the token using the base class implementation."""
        return super().token_refresh(*args, **kwargs)