Private
Public Access
1
0

st half implemented

This commit is contained in:
Sander Roosendaal
2023-02-13 20:12:42 +01:00
parent 9975dfd897
commit 410722a990
10 changed files with 327 additions and 176 deletions

View File

@@ -1,3 +1,4 @@
from .c2 import C2Integration
from .strava import StravaIntegration
from .nk import NKIntegration
from .sporttracks import SportTracksIntegration

View File

@@ -56,7 +56,7 @@ def c2wc(weightclass):
class C2Integration(SyncIntegration):
def __init__(self, *args, **kwargs):
super(C2Integration, self).__init__(self, *args, **kwargs)
super(C2Integration, self).__init__(*args, **kwargs)
self.oauth_data = {
'client_id': C2_CLIENT_ID,
'client_secret': C2_CLIENT_SECRET,

View File

@@ -26,7 +26,7 @@ class SyncIntegration(metaclass=ABCMeta):
rower = Rower()
def __init__(self, *args, **kwargs):
user = args[1]
user = args[0]
self.user = user
self.rower = user.rower

View File

@@ -1,8 +1,6 @@
from .integrations import SyncIntegration, NoTokenError
from rowers.models import User, Rower, Workout, TombStone
from rowingdata import rowingdata
from rowers import mytypes
from rowers.nkimportutils import *
from rowers.tasks import handle_nk_async_workout
@@ -31,7 +29,7 @@ queuehigh = django_rq.get_queue('low')
class NKIntegration(SyncIntegration):
def __init__(self, *args, **kwargs):
super(NKIntegration, self).__init__(self, *args, **kwargs)
super(NKIntegration, self).__init__(*args, **kwargs)
self.oauth_data = {
'client_id': NK_CLIENT_ID,
'client_secret': NK_CLIENT_SECRET,
@@ -310,7 +308,3 @@ class NKIntegration(SyncIntegration):
return r.nktoken
# just as a quick test during development
u = User.objects.get(id=1)
nk_integration_1 = NKIntegration(u)

View File

@@ -0,0 +1,143 @@
from .integrations import SyncIntegration, NoTokenError
from rowers.models import User, Rower, Workout, TombStone
from rowers.tasks import handle_sporttracks_sync
from rowers.rower_rules import is_workout_user
import rowers.mytypes as mytypes
from rowsandall_app.settings import (
SPORTTRACKS_CLIENT_SECRET, SPORTTRACKS_CLIENT_ID,
SPORTTRACKS_REDIRECT_URI
)
import re
import numpy
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('high')
from rowers.utils import myqueue, dologging
class SportTracksIntegration(SyncIntegration):
def __init__(self, *args, **kwargs):
super(SportTracksIntegration, self).__init__(*args, **kwargs)
self.oauth_data = {
'client_id': SPORTTRACKS_CLIENT_ID,
'client_secret': SPORTTRACKS_CLIENT_SECRET,
'redirect_uri': SPORTTRACKS_REDIRECT_URI,
'autorization_uri': "https://api.sporttracks.mobi/oauth2/authorize",
'content_type': 'application/json',
'tokenname': 'sporttrackstoken',
'refreshtokenname': 'sporttracksrefreshtoken',
'expirydatename': 'sporttrackstokenexpirydate',
'bearer_auth': False,
'base_url': "https://api.sporttracks.mobi/oauth2/token",
'scope': 'write',
}
def open(self, *args, **kwargs) -> str:
return super(SportTracksIntegration, self).open(*args, **kwargs)
def createworkoutdata(self, w, *args, **kwargs):
return None
def workout_export(self, workout, *args, **kwargs) -> str:
pass
def get_workouts(self, *args, **kwargs) -> int:
pass
def get_workout(self, id) -> int:
_ = self.open()
r = self.rower
authorizationstring = str('Bearer ' + r.sporttrackstoken)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://api.sporttracks.mobi/api/v2/fitnessActivities/" + \
str(sporttracksid)
s = requests.get(url, headers=headers)
data = s.json()
strokedata = pd.DataFrame.from_dict({
key: pd.Series(value, dtype='object') for key, value in data.items()
})
id = myqueue(
queue,
handle_sporttracks_workout_from_data,
self.user,
sporttracksid, data,
strokedata,
'sporttracks',
'sporttracks'
)
return id
def get_workout_list(self, *args, **kwargs) -> list:
_ = self.open()
r = self.rower
authorizationstring = str('Bearer ' + r.sporttrackstoken)
headers = {'Authorization': authorizationstring,
'user-agent': 'sanderroosendaal',
'Content-Type': 'application/json'}
url = "https://api.sporttracks.mobi/api/v2/fitnessActivities"
res = requests.get(url, headers=headers)
if (res.status_code != 200):
s = "Token doesn't exist. Need to authorize"
raise NoTokenError(s)
workouts = []
knownstids = uniqify([
w.uploadedtosporttracks for w in Workout.objects.filter(user=r)
])
for item in res.json()['items']:
d = int(float(item['total_distance']))
i = int(getidfromuri(item['uri']))
if i in knownstids: # pragma: no cover
nnn = ''
else:
nnn = 'NEW'
n = item['name']
ttot = str(datetime.timedelta(seconds=int(float(item['duration']))))
s = item['start_time']
r = item['type']
keys = ['id', 'distance', 'duration',
'starttime', 'rowtype', 'source', 'name', 'new']
values = [i, d, ttot, s, r, None, n, nnn]
res = dict(zip(keys, values))
workouts.append(res)
return workouts
def make_authorization_url(self, *args, **kwargs) -> str: # pragma: no cover
return super(SportTracksIntegration, self).make_authorization_url(*args, **kwargs)
def get_token(self, code, *args, **kwargs) -> (str, int, str):
return ""
def token_refresh(self, *args, **kwargs) -> str:
return super(SportTracksIntegration, self).token_refresh(*args, **kwargs)
# just as a quick test during development
u = User.objects.get(id=1)
nk_integration_1 = SportTracksIntegration(u)

View File

@@ -79,7 +79,7 @@ def strava_push_delete(id): # pragma: no cover
class StravaIntegration(SyncIntegration):
def __init__(self, *args, **kwargs):
super(StravaIntegration, self).__init__(self, *args, **kwargs)
super(StravaIntegration, self).__init__(*args, **kwargs)
self.oauth_data = {
'client_id': STRAVA_CLIENT_ID,
'client_secret': STRAVA_CLIENT_SECRET,