Private
Public Access
1
0
Files
rowsandall/rowers/dataflow.py

640 lines
20 KiB
Python

from rowers.celery import app
from rowers.utils import myqueue
import zipfile
import os
from rowingdata import get_file_type
import django_rq
from shutil import copyfile
from time import strftime
import numpy as np
from scipy.signal import find_peaks, savgol_filter
import pandas as pd
import datetime
import math
# Allow Django ORM access from worker/async contexts.
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
from YamJam import yamjam
# Machine-local configuration (holds the Django settings module name).
CFG = yamjam()['rowsandallapp']
try:
    os.environ.setdefault("DJANGO_SETTINGS_MODULE",CFG['settings_name'])
except KeyError: # pragma: no cover
    # Fall back to the default settings module when the yamjam config
    # does not declare one.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE","rowsandall_app.settings")
from django.core.wsgi import get_wsgi_application
# Bootstrap Django so the model/form imports below work inside the worker.
application = get_wsgi_application()
# RQ queues used to fan out follow-up jobs.
# NOTE(review): 'queuehigh' is bound to the 'default' queue, not a dedicated
# high-priority queue — confirm whether this is intentional.
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('default')
from django.conf import settings
from django.urls import reverse
from django.utils import timezone as tz
from rowers.forms import DocumentsForm, TeamUploadOptionsForm
from rowers.models import TeamInviteForm, Workout, User, Rower, Team
from rowers.opaque import encoder
from rowers import uploads
from rowingdata import rower as rrower
from rowers.dataroutines import rdata, get_startdate_time_zone, df_resample, checkduplicates, dataplep
from rowers.mytypes import otetypes, otwtypes
from rowers.utils import totaltime_sec_to_string
from rowers.dataprep import check_marker, checkbreakthrough, update_wps
from rowers.emails import send_confirm
from rowers.tasks import handle_sendemail_unrecognized, handle_sendemail_breakthrough, handle_sendemail_hard, handle_calctrimp
from uuid import uuid4
def getrower(user):
    """Resolve *user* (a User instance or a raw user id) to a Rower.

    Returns None for missing or anonymous users.  A Rower row is created
    and saved on first access for a known user.
    """
    try:
        anonymous = user is None or user.is_anonymous
    except AttributeError:  # pragma: no cover
        # *user* is a raw id, not a User instance — look it up first.
        anonymous = User.objects.get(id=user).is_anonymous
    if anonymous:
        return None
    try:
        rower = Rower.objects.get(user=user)
    except Rower.DoesNotExist:  # pragma: no cover
        rower = Rower(user=user)
        rower.save()
    return rower
def generate_job_id():
    """Return a fresh random job identifier (canonical UUID4 text)."""
    job_uuid = uuid4()
    return '{0}'.format(job_uuid)
def valid_uploadoptions(uploadoptions):
    """Return True when the upload request is well-formed.

    Checks the shared service secret, that the referenced file exists on
    disk, and that the document, team-options and rower forms all
    validate against *uploadoptions*.
    """
    if uploadoptions.get('secret', '') != settings.UPLOAD_SERVICE_SECRET:
        return False
    file_path = uploadoptions.get('file', None)
    if file_path is None:
        return False
    # check if file can be found
    if not os.path.isfile(file_path):
        return False
    rowerform = TeamInviteForm(uploadoptions)
    rowerform.fields.pop('email')  # we don't need email here
    forms = (
        DocumentsForm(uploadoptions),
        TeamUploadOptionsForm(uploadoptions),
        rowerform,
    )
    return all(f.is_valid() for f in forms)
def is_zipfile(file_path):
    """Return True when *file_path* is detected as a zip archive.

    Bug fix: ``get_file_type`` returns a plain string everywhere else in
    this module (see the string comparisons in ``is_invalid_file`` and
    ``process_single_file``), so the original ``fileformat[0] == 'zip'``
    compared the *first character* of that string against 'zip' and was
    always False.  We accept both a plain 'zip' string and, defensively,
    a sequence whose first element is 'zip'.
    """
    fileformat = get_file_type(file_path)
    if isinstance(fileformat, str):
        return fileformat == 'zip'
    return bool(fileformat) and fileformat[0] == 'zip'
def is_invalid_file(file_path, rower=None):
    """Validate the detected format of *file_path*.

    Returns an ``(is_valid, message)`` tuple: ``(True, '')`` when the
    file can be processed, otherwise ``(False, reason)``.  (The name is
    historical — the first element means "is valid"; kept for caller
    compatibility.)

    ``rower`` is an optional, backward-compatible addition: when given
    and the format is unrecognized, a copy of the file is queued for a
    support email referencing the rower's address.  The original body
    referenced the undefined names ``f2`` and ``r`` in that branch and
    raised NameError before returning.
    """
    fileformat = get_file_type(file_path)
    # Rejected formats mapped to their user-facing messages.
    rejections = {
        "imageformat": "Image files are not supported for upload.",
        "json": "JSON files are not supported for upload.",
        "c2log": "Concept2 log files are not supported for upload.",
        "nostrokes": "No stroke data found in the file.",
        "kml": "KML files are not supported for upload.",
        "notgzip": "The gzip file appears to be corrupted.",
        "rowprolog": "RowPro logbook summary files are not supported for upload.",
        "gpx": "GPX files are not supported for upload.",
    }
    if fileformat in rejections:
        return False, rejections[fileformat]
    if fileformat == "unknown":
        filename, extension = os.path.splitext(file_path)
        if extension == '.gz':
            # Keep the inner extension as well, e.g. '.csv.gz'.
            # (The original stripped the inner extension *before* reading
            # it, so it was lost.)
            filename, inner_ext = os.path.splitext(filename)
            extension = inner_ext + extension
        if rower is not None:
            # Keep a renamed copy and queue a support email about the
            # unrecognized format.
            copy_path = filename + 'a' + extension
            copyfile(file_path, copy_path)
            _ = myqueue(queuehigh,
                        handle_sendemail_unrecognized,
                        copy_path,
                        rower.user.email)
        return False, "The file format is not recognized or supported."
    return True, ""
def upload_handler(uploadoptions, filename):
    """Validate an upload and queue it for processing.

    Zip archives are fanned out via ``unzip_and_process``; everything
    else goes straight to ``process_single_file``.  Returns a status
    dict with "status", "job_id" and "message" keys.
    """
    def _error(message):
        # Error responses carry no job id.
        return {"status": "error", "job_id": None, "message": message}

    if not valid_uploadoptions(uploadoptions):
        return _error("Invalid upload options or file not found.")

    is_valid, message = is_invalid_file(filename)
    if not is_valid:
        os.remove(filename)
        return _error(message)

    job_id = generate_job_id()
    if is_zipfile(filename):
        _ = myqueue(
            queuehigh,
            unzip_and_process,
            filename,
            uploadoptions,
            job_id)
        return {
            "status": "processing",
            "job_id": job_id,
            "message": "Your zip file is being processed. You will be notified when it is complete."
        }

    _ = myqueue(
        queuehigh,
        process_single_file,
        filename,
        uploadoptions,
        job_id)
    return {
        "status": "processing",
        "job_id": job_id,
        "message": "Your file is being processed. You will be notified when it is complete."
    }
@app.task
def unzip_and_process(zip_filepath, uploadoptions, parent_job_id, debug=False, **kwargs):
    """Extract every file in the archive and queue each for processing.

    Fixes two title bugs in the original:
    * ``" Part {id+1}".format(id=id)`` raised KeyError — ``id+1`` is not
      a valid replacement-field name; the part number is now computed
      before formatting.
    * the title was mutated in place each iteration, so part 3 became
      "X Part 2 Part 3"; each part's title is now derived from the
      original base title.
    """
    base_title = uploadoptions.get('title', '')
    with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
        for index, member in enumerate(zip_ref.namelist()):
            datafile = zip_ref.extract(member, path='media/')
            if index > 0:
                # First file keeps the base title; later files get a
                # " Part N" suffix (N is 1-based).
                uploadoptions['title'] = "{base} Part {n}".format(
                    base=base_title, n=index + 1)
            uploadoptions['file'] = datafile
            job_id = generate_job_id()
            _ = myqueue(
                queuehigh,
                process_single_file,
                datafile,
                uploadoptions,
                job_id)
    return {
        "status": "completed",
        "job_id": parent_job_id,
        "message": "All files from the zip have been processed."
    }
def get_rower_from_uploadoptions(uploadoptions):
    """Identify the Rower an upload belongs to, or None.

    Resolution order: the validated 'user' form field; then a User whose
    primary email matches uploadoptions['useremail']; then any Rower
    listing that address among its email alternatives.

    Bug fix: when the form had no 'user' field AND no 'useremail' was
    supplied, the original fell through to ``return r`` with ``r`` never
    assigned, raising UnboundLocalError — it now returns None.
    """
    rowerform = TeamInviteForm(uploadoptions)
    if not rowerform.is_valid():
        return None
    try:
        u = rowerform.cleaned_data['user']
    except KeyError:
        pass
    else:
        return getrower(u)
    email = uploadoptions.get('useremail')
    if email is None:
        return None
    us = User.objects.filter(email=email)
    if len(us):
        return getrower(us[0])
    # Fall back to scanning alternative addresses.
    for rwr in Rower.objects.all():
        if rwr.emailalternatives is not None and email in rwr.emailalternatives:
            return rwr
    return None
def check_and_fix_samplerate(row, file_path):
    """Downsample high-rate stroke data and rewrite *file_path*.

    When the mean sampling interval is below one second the dataframe is
    resampled, the old CSV is deleted and a new gzipped CSV is written
    in its place.  Returns the (possibly replaced) row object and the
    file path.
    """
    # implement sample rate check and fix here
    # Mean sampling interval in seconds.
    dtavg = row.df['TimeStamp (sec)'].diff().mean()
    if dtavg < 1:
        newdf = df_resample(row.df)
        # Remove the stale file before writing the resampled copy.
        try:
            os.remove(file_path)
        except Exception:
            pass
        # NOTE(review): ``rrdata`` is not defined or imported anywhere in
        # this module (only ``rdata`` and ``rrower`` are) — this line
        # raises NameError as written; presumably ``rdata(df=newdf)`` was
        # intended.  TODO confirm and fix.
        row = rrdata(df=newdf)
        row.write_csv(file_path, gzip=True)
    return row, file_path
def is_water_rowing(df):
    """Return True when the ' latitude' column carries real GPS data.

    A constant or all-zero latitude trace means there was no GPS fix,
    i.e. an indoor (erg) session.  Bug fix: the original returned None
    instead of False on the negative path; both are falsy so returning a
    proper bool is backward compatible for every boolean use.
    """
    lat = df[' latitude']
    return bool(lat.mean() != 0 and lat.std() != 0)
def remove_negative_power_peaks(row):
    """Replace downward power spikes with the preceding sample's value.

    The power trace is inverted so that negative spikes become peaks for
    ``scipy.signal.find_peaks``; each detected spike is overwritten with
    the sample just before it.  Two passes are run, matching the
    original (a second pass catches spikes exposed by the first).

    Fixes over the original: chained assignment
    (``df[col][idx] = ...``, a SettingWithCopy hazard) is replaced with
    an explicit positional write via ``iloc`` — the original's bare
    ``[]`` indexing only behaved positionally because the dataframe
    carries a default RangeIndex (assumed here; peak indices from
    find_peaks are positional).
    """
    col = ' Power (watts)'
    for _pass in range(2):
        inverted = -row.df[col].to_numpy()
        # height=0 keeps only spikes that actually dip below zero power.
        neg_peaks, _props = find_peaks(inverted, height=0)
        if len(neg_peaks) == 0:
            continue
        # find_peaks never reports index 0, so neg_peaks - 1 is safe.
        replacements = row.df[col].to_numpy()[neg_peaks - 1]
        row.df.iloc[neg_peaks, row.df.columns.get_loc(col)] = replacements
    return row
def do_smooth(row, f2):
    """Smooth the pace trace with a Savitzky-Golay filter and rewrite *f2*.

    The pace column is converted to velocity, smoothed over a ~20 s
    window (when enough samples exist), and converted back.  The raw
    velocity is preserved once in an 'originalvelo' column.

    Bug fix: the original wrote the gzipped CSV and then immediately
    deleted it, destroying the freshly written file (the sibling
    ``check_and_fix_samplerate`` removes *before* writing — that order
    is used here).  Also replaces the deprecated
    ``fillna(method='ffill')`` with ``Series.ffill()``.
    """
    # implement smoothing here if needed
    pace = row.df[' Stroke500mPace (sec/500m)'].values
    velo = 500. / pace
    # Mean sampling interval; the window covers roughly 20 seconds.
    interval = row.df['TimeStamp (sec)'].diff().mean()
    if interval != 0 and not np.isnan(interval):
        windowsize = 2 * (int(10. / (interval))) + 1
    else:  # pragma: no cover
        windowsize = 1
    if 'originalvelo' not in row.df:
        row.df['originalvelo'] = velo
    # savgol_filter needs window > polyorder (3) and <= series length.
    if 3 < windowsize < len(velo):
        smoothed_velo = savgol_filter(velo, windowsize, 3)
    else:  # pragma: no cover
        smoothed_velo = velo
    cleaned = pd.Series(smoothed_velo, dtype='float')
    cleaned = cleaned.replace([-np.inf, np.inf], np.nan).ffill()
    row.df[' Stroke500mPace (sec/500m)'] = 500. / abs(cleaned)
    row.df = row.df.fillna(0)
    # Remove the stale file BEFORE rewriting it.
    try:
        os.remove(f2)
    except OSError:
        pass
    row.write_csv(f2, gzip=True)
    return row
def update_workout_attributes(w, row, file_path, uploadoptions,
                              startdatetime='',
                              timezone='', forceunit='lbs'):
    """Populate and save Workout *w* from stroke data and upload options.

    Derives start date/time/timezone, totals, HR stats, privacy and
    boat/oar metadata, then writes every field to *w* and saves it.
    Returns the saved workout.

    Bug fixes vs. the original:
    * ``summary`` was only assigned when uploadoptions carried no
      summary, so a supplied summary raised NameError at the
      ``w.summary = summary`` assignment below.
    * ``stravaid`` was fetched twice with different defaults (0, then
      ''), so a *missing* key yielded '' which compared unequal to 0 and
      tagged every upload as a Strava workout.
    * the AttributeError fallback referenced ``pendulum`` (never
      imported → NameError) and then repeated the failing addition; it
      now parses the value with pandas and retries.
    """
    # calculate start date/time in the rower's timezone
    startdatetime, startdate, starttime, timezone_str, partofday = get_startdate_time_zone(
        w.user, row, startdatetime=startdatetime, timezone=timezone
    )
    boattype = uploadoptions.get('boattype', '1x')
    workoutsource = 'unknown'
    rpe = uploadoptions.get('rpe', 0)
    notes = uploadoptions.get('notes', '')
    inboard = uploadoptions.get('inboard', 0.88)
    oarlength = uploadoptions.get('oarlength', 2.89)
    useImpeller = uploadoptions.get('useImpeller', False)
    seatnumber = uploadoptions.get('seatNumber', 1)
    boatname = uploadoptions.get('boatName', '')
    portStarboard = uploadoptions.get('portStarboard', 1)
    empowerside = 'port'
    if portStarboard == 1:
        empowerside = 'starboard'
    # A real Strava id marks the workout as Strava-sourced.
    stravaid = uploadoptions.get('stravaid', 0)
    if stravaid not in (0, '', None):
        workoutsource = 'strava'
    workouttype = uploadoptions.get('workouttype', 'rower')
    title = uploadoptions.get('title', '')
    if title is None or title == '':
        # No title supplied — synthesize one (e.g. "Morning rower").
        # NOTE(review): source indentation was flattened; the partofday
        # override is assumed to apply only to the default title.
        title = 'Workout'
        if partofday is not None:
            title = '{partofday} {workouttype}'.format(
                partofday=partofday,
                workouttype=workouttype,
            )
    averagehr = row.df[' HRCur (bpm)'].mean()
    maxhr = row.df[' HRCur (bpm)'].max()
    totaldist = uploadoptions.get('distance', 0)
    if totaldist == 0:
        totaldist = row.df['cum_dist'].max()
    totaltime = uploadoptions.get('duration', 0)
    if totaltime == 0:
        totaltime = row.df['TimeStamp (sec)'].max() - row.df['TimeStamp (sec)'].min()
        try:
            # Include any elapsed time recorded before the first sample.
            totaltime = totaltime + row.df.loc[:, ' ElapsedTime (sec)'].iloc[0]
        except KeyError:
            pass
    if np.isnan(totaltime):
        totaltime = 0
    summary = uploadoptions.get('summary', '')
    if summary == '':
        summary = row.allstats()
    if uploadoptions.get('makeprivate', False):
        privacy = 'hidden'
    elif workoutsource != 'strava':
        privacy = 'visible'
    else:
        privacy = 'hidden'
    # checking for NaN values before writing to the model
    totaldist = np.nan_to_num(totaldist)
    maxhr = np.nan_to_num(maxhr)
    averagehr = np.nan_to_num(averagehr)
    dragfactor = 0
    if workouttype in otetypes:
        dragfactor = row.dragfactor
    delta = datetime.timedelta(seconds=totaltime)
    try:
        workoutenddatetime = startdatetime + delta
    except AttributeError:
        # startdatetime was not a real timestamp — parse it and retry.
        startdatetime = pd.to_datetime(str(startdatetime))
        workoutenddatetime = startdatetime + delta
    # check for duplicate start times and duration
    duplicate = checkduplicates(
        w.user, startdate, startdatetime, workoutenddatetime)
    # clip over-long titles to the model's 140-char field
    if title is not None and len(title) > 140:  # pragma: no cover
        title = title[0:140]
    timezone_str = str(startdatetime.tzinfo)
    duration = totaltime_sec_to_string(totaltime)
    # implement workout attribute updates here
    w.name = title
    w.date = startdate
    w.workouttype = workouttype
    w.boattype = boattype
    w.dragfactor = dragfactor
    w.duration = duration
    w.distance = totaldist
    w.weightcategory = w.user.weightcategory
    w.adaptiveclass = w.user.adaptiveclass
    w.starttime = starttime
    w.duplicate = duplicate
    w.workoutsource = workoutsource
    # Always False here; ranking qualification is decided elsewhere.
    w.rankingpiece = False
    w.forceunit = forceunit
    w.rpe = rpe
    w.csvfilename = file_path
    w.notes = notes
    w.summary = summary
    w.maxhr = maxhr
    w.averagehr = averagehr
    w.startdatetime = startdatetime
    w.inboard = inboard
    w.oarlength = oarlength
    w.seatnumber = seatnumber
    w.boatname = boatname
    w.empowerside = empowerside
    w.timezone = timezone_str
    w.privacy = privacy
    w.impeller = useImpeller
    w.save()
    return w
def send_upload_confirmation_email(rower, workout):
    """Email the rower a link to the freshly uploaded workout.

    Skipped when the rower has notifications off or a bounced address.
    """
    # implement email sending here
    if not rower.getemailnotifications or rower.emailbounced:  # pragma: no cover
        return
    workout_url = settings.SITE_URL + reverse(
        rower.defaultlandingpage,
        kwargs={'id': encoder.encode_hex(workout.id)},
    )
    _ = send_confirm(rower.user, workout.name, workout_url, '')
def update_running_wps(r, w, row):
    """Update the rower's 42-day running watts-per-stroke averages.

    Blends this workout's mean drive energy into the erg average
    (``running_wps_erg``) or the on-water average (``running_wps``)
    depending on workout type, weighting by the number of non-duplicate
    workouts of that type in the last 42 days.  Duplicates are skipped.

    Bug fix: the on-water branch blended the new sample with
    ``r.running_wps_erg`` (copy/paste from the erg branch) instead of
    ``r.running_wps``.
    """
    # implement wps update here
    if not w.duplicate and w.workouttype in otetypes:
        cntr = Workout.objects.filter(user=r, workouttype__in=otetypes,
                                      startdatetime__gt=tz.now() - tz.timedelta(days=42),
                                      duplicate=False).count()
        new_value = (cntr * r.running_wps_erg + row.df['driveenergy'].mean()) / (cntr + 1.0)
        if not (math.isnan(new_value) or math.isinf(new_value) or new_value == 0):
            r.running_wps_erg = new_value
        elif not (math.isnan(r.running_wps_erg) or math.isinf(r.running_wps_erg) or r.running_wps_erg == 0):
            pass  # keep the existing valid value
        else:
            r.running_wps_erg = 600.  # sane default when nothing valid exists
        r.save()
    if not w.duplicate and w.workouttype in otwtypes:
        cntr = Workout.objects.filter(user=r, workouttype__in=otwtypes,
                                      startdatetime__gt=tz.now() - tz.timedelta(days=42),
                                      duplicate=False).count()
        try:
            # was: cntr * r.running_wps_erg — the erg value, a copy/paste bug
            new_value = (cntr * r.running_wps + row.df['driveenergy'].mean()) / (cntr + 1.0)
        except TypeError:
            new_value = r.running_wps
        if not (math.isnan(new_value) or math.isinf(new_value) or new_value == 0):
            r.running_wps = new_value
        elif not (math.isnan(r.running_wps) or math.isinf(r.running_wps) or r.running_wps == 0):
            pass  # keep the existing valid value
        else:
            r.running_wps = 400.  # sane default when nothing valid exists
        r.save()
@app.task
def process_single_file(file_path, uploadoptions, job_id, debug=False, **kwargs):
    """Celery task: ingest one workout file end-to-end.

    Copies the file into media/ under a unique name, resolves the rower,
    normalizes the stroke data (sample rate, negative power peaks,
    smoothing, optional power recalculation), creates or updates the
    Workout record, stores stroke data, sends a confirmation email and
    queues follow-up jobs (TRIMP calculation, plots, service syncing).

    Returns True on success, or a {"status": "error", ...} dict on
    failure.
    """
    # copy file to a unique name in media folder
    f2 = file_path
    try:
        nn, ext = os.path.splitext(f2)
        if ext == '.gz':
            # keep the inner extension too, e.g. '.csv.gz'
            nn, ext2 = os.path.splitext(nn)
            ext = ext2 + ext
        f1 = uuid4().hex[:10]+'-'+strftime('%Y%m%d-%H%M%S')+ext
        f2 = 'media/'+f1
        copyfile(file_path, f2)
    except FileNotFoundError:
        return {
            "status": "error",
            "job_id": job_id,
            "message": "File not found during processing."
        }
    # determine the user
    r = get_rower_from_uploadoptions(uploadoptions)
    if r is None:
        os.remove(f2)
        return {
            "status": "error",
            "job_id": job_id,
            "message": "Rower not found for the provided upload options."
        }
    try:
        fileformat = get_file_type(f2)
    except Exception as e:
        os.remove(f2)
        return {
            "status": "error",
            "job_id": job_id,
            "message": "Error determining file format: {error}".format(error=str(e))
        }
    # Get fileformat from fit & tcx
    # NOTE(review): get_workouttype_from_fit / get_title_from_fit /
    # get_notes_from_fit are not defined or imported in this module —
    # this branch raises NameError as written.  Confirm where they live.
    if fileformat == 'fit':
        workouttype = get_workouttype_from_fit(f2)
        uploadoptions['workouttype'] = workouttype
        new_title = get_title_from_fit(f2)
        if new_title:
            uploadoptions['title'] = new_title
        new_notes = get_notes_from_fit(f2)
        if new_notes:
            uploadoptions['notes'] = new_notes
    # handle non-Painsled
    # NOTE(review): handle_nonpainsled is also undefined here; its
    # summary/oarlength/inboard/impeller results are never used below.
    if fileformat != 'csv':
        f2, summary, oarlength, inboard, fileformat, impeller = handle_nonpainsled(
            f2,
            fileformat,
        )
        if not f2:
            return {
                "status": "error",
                "job_id": job_id,
                "message": "Error processing non-Painsled file."
            }
    # create raw row data object using the rower's power and HR zones
    powerperc = 100 * np.array([r.pw_ut2,
                                r.pw_ut1,
                                r.pw_at,
                                r.pw_tr, r.pw_an]) / r.ftp
    rr = rrower(hrmax=r.max, hrut2=r.ut2,
                hrut1=r.ut1, hrat=r.at,
                hrtr=r.tr, hran=r.an, ftp=r.ftp,
                powerperc=powerperc, powerzones=r.powerzones)
    row = rdata(f2, rower=rr)
    if row.df.empty:
        os.remove(f2)
        return {
            "status": "error",
            "job_id": job_id,
            "message": "No valid data found in the uploaded file."
        }
    # NOTE(review): unreachable as written — if rdata() had returned 0,
    # the row.df access above would already have raised AttributeError.
    if row == 0:
        os.remove(f2)
        return {
            "status": "error",
            "job_id": job_id,
            "message": "Error creating row data from the file."
        }
    # check and fix sample rate
    row, f2 = check_and_fix_samplerate(row, f2)
    # change rower type to water if GPS data is present
    if is_water_rowing(row.df):
        uploadoptions['workouttype'] = 'water'
    # remove negative power peaks
    row = remove_negative_power_peaks(row)
    # optional auto smoothing
    row = do_smooth(row, f2)
    # recalculate power data
    # NOTE(review): uploadoptions['workouttype'] raises KeyError when the
    # key was never set (it is only assigned for fit/water above or by
    # the caller) — consider .get().
    if uploadoptions['workouttype'] in otetypes:
        try:
            if r.erg_recalculatepower:
                row.erg_recalculatepower()
                row.write_csv(f2, gzip=True)
        except Exception as e:
            pass
    workoutid = uploadoptions.get('workoutid', None)
    if workoutid is not None:
        try:
            w = Workout.objects.get(id=workoutid)
        except Workout.DoesNotExist:
            # Stale id — fall back to creating a fresh workout.
            w = Workout(user=r, duration='00:00:00')
            w.save()
    else:
        w = Workout(user=r, duration='00:00:00')
        w.save()
    # set workout attributes from uploadoptions and calculated values
    w = update_workout_attributes(w, row, f2, uploadoptions)
    # add teams
    if w.privacy == 'visible':
        ts = Team.objects.filter(rower=r
                                 )
        for t in ts:
            w.team.add(t)
    # put stroke data in file store through "dataplep"
    # NOTE(review): rrdata_pl and pl are undefined in this module
    # (polars conversion?); the bare except silently turns this into a
    # no-op, so row is left unchanged.  Confirm intent.
    try:
        row = rrdata_pl(df=pl.form_pandas(row.df))
    except:
        pass
    _ = dataplep(row.df, id=w.id, bands=True,
                 barchart=True, otwpower=True, empower=True, inboard=w.inboard)
    # send confirmation email
    send_upload_confirmation_email(r, w)
    # check for breakthroughs
    isbreakthrough, ishard = checkbreakthrough(w, r)
    _ = check_marker(w)
    _ = update_wps(r, otwtypes)
    _ = update_wps(r, otetypes)
    # update running_wps
    update_running_wps(r, w, row)
    # calculate TRIMP
    if w.workouttype in otwtypes:
        wps_avg = r.median_wps
    elif w.workouttype in otetypes:
        wps_avg = r.median_wps_erg
    else:
        wps_avg = 0
    _ = myqueue(queuehigh, handle_calctrimp, w.id, f2,
                r.ftp, r.sex, r.hrftp, r.max, r.rest, wps_avg)
    # make plots
    # NOTE(review): uploadoptions['make_plot'] raises KeyError when the
    # key is absent, and plottype is undefined (NameError) in the first
    # branch — it is only assigned in the elif below.  TODO confirm the
    # intended plottype source for explicit make_plot requests.
    if uploadoptions['make_plot']:
        res, jobid = uploads.make_plot(r, w, f1, f2, plottype, w.name)
    elif r.staticchartonupload != 'None':  # pragma: no cover
        plottype = r.staticchartonupload
        res, jobid = uploads.make_plot(r, w, f1, f2, plottype, w.name)
    # sync workouts to connected services
    uploads.do_sync(w, uploadoptions, quick=True)
    return True