Private
Public Access
1
0
Files
rowsandall/rowers/integrations/intervals.py

93 lines
3.0 KiB
Python

from .integrations import SyncIntegration, NoTokenError, create_or_update_syncrecord, get_known_ids
from rowers.models import Rower, User, Workout, TombStone
from rowingdata import rowingdata
from rowers import mytypes
from rowers.rower_rules import is_workout_user, ispromember
from rowers.utils import myqueue, dologging, custom_exception_handler
import urllib
import gzip
import requests
import arrow
import datetime
import os
from uuid import uuid4
from django.utils import timezone
from datetime import timedelta
from rowsandall_app.settings import (
INTERVALS_CLIENT_ID, INTERVALS_REDIRECT_URI, INTERVALS_CLIENT_SECRET, SITE_URL
)
import django_rq
# Handles to the three RQ queues used for background jobs. Each is fetched
# with default_timeout=3600 (presumably seconds -- confirm against django_rq).
queue, queuelow, queuehigh = (
    django_rq.get_queue(queue_name, default_timeout=3600)
    for queue_name in ('default', 'low', 'high')
)
# Default HTTP headers: both the request body and the expected response are
# JSON.
headers = {
    header_name: 'application/json'
    for header_name in ('Content-Type', 'Accept')
}

# intervals.icu OAuth endpoints. The authorize URL ends with '?' so that a
# query string can be appended directly to it.
intervals_authorize_url = 'https://intervals.icu/oauth/authorize?'
intervals_token_url = 'https://intervals.icu/oauth/token'
class IntervalsIntegration(SyncIntegration):
    """Sync integration for intervals.icu, built on ``SyncIntegration``.

    Only the OAuth plumbing (token retrieval/refresh and the authorization
    URL) is wired up, by delegating to the base class.  The workout
    import/export hooks are placeholders that return ``NotImplemented``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base integration and set the OAuth configuration.

        All positional and keyword arguments are forwarded unchanged to
        ``SyncIntegration.__init__``.
        """
        super().__init__(*args, **kwargs)
        # OAuth settings for intervals.icu; presumably consumed by the
        # SyncIntegration token helpers -- confirm against the base class.
        self.oauth_data = {
            'client_id': INTERVALS_CLIENT_ID,
            'client_secret': INTERVALS_CLIENT_SECRET,
            'redirect_uri': INTERVALS_REDIRECT_URI,
            'authorization_uri': intervals_authorize_url,
            'content_type': 'application/json',
            'tokenname': 'intervals_token',
            'expirydatename': 'intervals_exp',
            'refreshtokenname': 'intervals_r',
            'bearer_auth': True,
            'base_uri': 'https://intervals.icu/api/v1/',
            'grant_type': 'refresh_token',
            # Copy so that a change made through this instance cannot leak
            # into the module-level default headers shared by all instances.
            'headers': dict(headers),
            'scope': 'ACTIVITY:WRITE',
        }

    def get_token(self, code, *args, **kwargs):
        """Delegate to ``SyncIntegration.get_token`` and return its result."""
        return super().get_token(code, *args, **kwargs)

    def get_name(self):
        """Return the display name of this integration."""
        return 'Intervals'

    def get_shortname(self):
        """Return the short identifier of this integration."""
        return 'intervals'

    def open(self, *args, **kwargs):
        """Log the token lookup, then delegate to ``SyncIntegration.open``.

        Returns whatever the base-class ``open`` returns.
        """
        dologging(
            'intervals.icu.log',
            "Getting token for user {id}".format(id=self.rower.id),
        )
        # The bound form of super() is required here: the one-argument
        # ``super(IntervalsIntegration)`` yields an unbound super object on
        # which looking up ``open`` raises AttributeError.
        return super().open(*args, **kwargs)

    def createworkoutdata(self, w, *args, **kwargs) -> str:
        """Placeholder: not implemented; returns ``NotImplemented``."""
        return NotImplemented

    def workout_export(self, workout, *args, **kwargs) -> str:
        """Placeholder: not implemented; returns ``NotImplemented``."""
        return NotImplemented

    def get_workouts(self, *args, **kwargs) -> int:
        """Placeholder: not implemented; returns ``NotImplemented``.

        Takes ``self`` like the sibling methods; the first parameter used to
        be named ``workout`` and silently received the instance.
        """
        return NotImplemented

    def get_workout(self, id, *args, **kwargs) -> int:
        """Placeholder: not implemented; returns ``NotImplemented``."""
        return NotImplemented

    def get_workout_list(self, *args, **kwargs) -> list:
        """Placeholder: not implemented; returns ``NotImplemented``."""
        return NotImplemented

    def make_authorization_url(self, *args, **kwargs):
        """Delegate to ``SyncIntegration.make_authorization_url``."""
        return super().make_authorization_url(*args, **kwargs)

    def token_refresh(self, *args, **kwargs):
        """Delegate to ``SyncIntegration.token_refresh``."""
        return super().token_refresh(*args, **kwargs)