Private
Public Access
1
0
Files
rowsandall/rowers/uploads.py
2023-07-11 08:53:04 +02:00

289 lines
8.4 KiB
Python

from rowers.mytypes import workouttypes, boattypes, otwtypes, workoutsources, workouttypes_ordered
from rowers.rower_rules import is_promember
from rowers.integrations import *
from rowers.utils import (
geo_distance, serialize_list, deserialize_list, uniqify,
str2bool, range_to_color_hex, absolute, myqueue, NoTokenError
)
# for actions related to uploads
from django.conf import settings
from django.utils import timezone, translation
from rowers.tasks import (
handle_sendemail_unrecognized, handle_sendemailnewcomment,
handle_sendemailnewresponse, handle_updatedps,
handle_makeplot, handle_otwsetpower, handle_sendemailtcx,
handle_sendemailcsv
)
from rowers.models import GraphImage, SyncRecord
from rowers.rower_rules import ispromember
from rowers.utils import dologging
from PIL import Image
import numpy as np
import yaml
import argparse
import yamllint
from subprocess import call
import re
import sys
import time
from verbalexpressions import VerEx
import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('high')
try:
from cStringIO import StringIO
except:
from io import StringIO
sources = [s for s, name in workoutsources]
def make_plot(r, w, f1, f2, plottype, title, imagename='', plotnr=0):
if imagename == '':
imagename = f1[:-4]+'.png'
fullpathimagename = 'static/plots/'+imagename
powerperc = 100*np.array([r.pw_ut2,
r.pw_ut1,
r.pw_at,
r.pw_tr, r.pw_an])/r.ftp
ftp = float(r.ftp)
if w.workouttype in otwtypes:
ftp = ftp*(100.-r.otwslack)/100.
hrpwrdata = {
'hrmax': r.max,
'hrut2': r.ut2,
'hrut1': r.ut1,
'hrat': r.at,
'hrtr': r.tr,
'hran': r.an,
'ftp': ftp,
'powerperc': serialize_list(powerperc),
'powerzones': serialize_list(r.powerzones),
'hrzones': serialize_list(r.hrzones),
}
# make plot - asynchronous task
plotnrs = {
'timeplot': 1,
'distanceplot': 2,
'pieplot': 3,
'None': 0,
}
axis = r.staticgrids
if axis is None or axis.lower() == 'none': # pragma: no cover
gridtrue = False
axis = 'both'
else:
gridtrue = True
if plotnr == 0:
plotnr = plotnrs[plottype]
if plotnr == 0:
return 0, 0
if w.workouttype in otwtypes:
plotnr = plotnr+3
otwrange = [r.fastpaceotw.total_seconds(), r.slowpaceotw.total_seconds()]
oterange = [r.fastpaceerg.total_seconds(), r.slowpaceerg.total_seconds()]
job = myqueue(queuehigh, handle_makeplot, f1, f2,
title, hrpwrdata,
plotnr, imagename, gridtrue=gridtrue, axis=axis,
otwrange=otwrange, oterange=oterange)
try:
width, height = Image.open(fullpathimagename).size
except:
width = 1200
height = 600
imgs = GraphImage.objects.filter(workout=w)
if len(imgs) < 7:
i = GraphImage(workout=w,
creationdatetime=timezone.now(),
filename=fullpathimagename,
width=width, height=height)
i.save()
else: # pragma: no cover
return 0, 'You have reached the maximum number of static images for this workout. Delete an image first'
return i.id, job.id
def do_sync(w, options, quick=False):
do_strava_export = w.user.strava_auto_export
try:
do_strava_export = options['upload_to_Strava'] or do_strava_export
except KeyError:
pass
try:
if options['stravaid'] != 0 and options['stravaid'] != '': # pragma: no cover
w.uploadedtostrava = options['stravaid']
# upload_to_strava = False
do_strava_export = False
w.save()
except KeyError:
pass
try:
if options['nkid'] != 0 and options['nkid'] != '': # pragma: no cover
w.uploadedtonk = options['nkid']
w.save()
record = SyncRecord(workout=w,nkid=options['nkid'])
record.save()
except KeyError:
pass
try:
if options['inboard'] != 0 and options['inboard'] != '': # pragma: no cover
w.inboard = options['inboard']
except KeyError:
pass
try:
if options['oarlength'] != 0 and options['oarlength'] != '': # pragma: no cover
w.oarlength = options['oarlength']
except KeyError:
pass
try:
if options['garminid'] != 0 and options['garminid'] != '': # pragma: no cover
w.uploadedtogarmin = options['garminid']
w.save()
except KeyError:
pass
do_c2_export = w.user.c2_auto_export
try:
do_c2_export = options['upload_to_C2'] or do_c2_export
except KeyError:
pass
try:
if options['c2id'] != 0 and options['c2id'] != '': # pragma: no cover
w.uploadedtoc2 = options['c2id']
# upload_to_c2 = False
do_c2_export = False
w.save()
record = SyncRecord(workout=w,c2id=options['c2id'])
record.save()
except KeyError:
pass
try:
if options['rp3id'] != 0 and options['rp3id'] != '': # pragma: no cover
w.uploadedtorp3 = options['rp3id']
w.save()
except KeyError:
pass
if w.duplicate:
return 0
if do_c2_export: # pragma: no cover
dologging('c2_log.log','Exporting workout to C2 for user {user}'.format(user=w.user.user.id))
c2_integration = C2Integration(w.user.user)
try:
id = c2_integration.workout_export(w)
dologging('c2_log.log','C2 upload succeeded')
except NoTokenError:
id = 0
message = "Something went wrong with the Concept2 sync"
dologging('c2_log.log','C2 no token error')
except: # pragma: no cover
dologging('c2_log.log','Error C2')
pass
if do_strava_export: # pragma: no cover
strava_integration = StravaIntegration(w.user.user)
try:
id = strava_integration.workout_export(w)
dologging(
'strava_export_log.log',
'exporting workout {id} as {type}'.format(
id=w.id,
type=w.workouttype,
)
)
except NoTokenError: # pragma: no cover
id = 0
message = "Please connect to Strava first"
except:
e = sys.exc_info()[0]
t = time.localtime()
timestamp = time.strftime('%b-%d-%Y_%H%M', t)
with open('stravalog.log', 'a') as f:
f.write('\n')
f.write(timestamp)
f.write(str(e))
do_st_export = w.user.sporttracks_auto_export
sporttracksid = options.get('sporttracksid','')
if sporttracksid != 0 and sporttracksid != '':
w.uploadedtosporttracks = sporttracksid
w.save()
do_st_export = False
try: # pragma: no cover
upload_to_st = options['upload_to_SportTracks'] or do_st_export
do_st_export = upload_to_st
except KeyError:
upload_to_st = False
if do_st_export: # pragma: no cover
try:
st_integration = SportTracksIntegration(w.user.user)
id = st_integration.workout_export(w)
dologging('st_export.log',
'exported workout {wid} for user {uid}'.format(
wid = w.id,
uid = w.user.user.id,
)
)
except NoTokenError:
dologging('st_export.log','No Token Error')
return 0
do_tp_export = w.user.trainingpeaks_auto_export
try:
upload_to_tp = options['upload_to_TrainingPeaks'] or do_tp_export
do_tp_export = upload_to_tp
except KeyError:
upload_to_st = False
if do_tp_export:
try:
tp_integration = TPIntegration(w.user.user)
id = tp_integration.workout_export(w)
dologging('tp_export.log',
'exported workout {wid} for user {uid}'.format(
wid = w.id,
uid = w.user.user.id,
)
)
except NoTokenError:
dologging('tp_export.log','No Token Error')
return 0
return 1