From 34c77c684bcb051da8e488c0a5cb673ef3f26e9e Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Thu, 14 Jul 2022 17:56:32 +0200 Subject: [PATCH] merged dataprep and dataroutines --- rowers/database.py | 4 + rowers/dataprep.py | 1955 ++++++++++++++++++++++++++++++++++++- rowers/dataroutines.py | 2080 ---------------------------------------- rowers/tasks.py | 3 +- 4 files changed, 1956 insertions(+), 2086 deletions(-) delete mode 100644 rowers/dataroutines.py diff --git a/rowers/database.py b/rowers/database.py index 7032795b..324965b7 100644 --- a/rowers/database.py +++ b/rowers/database.py @@ -16,3 +16,7 @@ database_url = 'mysql://{user}:{password}@{host}:{port}/{database_name}'.format( if settings.DEBUG or user == '': database_url = 'sqlite:///db.sqlite3' + +#database_name_dev = DEV_DATABASES['default']['NAME'] + +database_url_debug = database_url diff --git a/rowers/dataprep.py b/rowers/dataprep.py index 471b3e97..3fdfd95d 100644 --- a/rowers/dataprep.py +++ b/rowers/dataprep.py @@ -2,7 +2,8 @@ from rowers.metrics import axes, calc_trimp, rowingmetrics, dtypes, metricsgroup from rowers.utils import lbstoN, myqueue, wavg, dologging from rowers.mytypes import otwtypes, otetypes, rowtypes import glob -import rowingdata.tcxtools as tcxtools +from rowingdata import tcxtools + from rowers.utils import totaltime_sec_to_string from rowers.datautils import p0 from scipy import optimize @@ -10,13 +11,15 @@ from rowers.utils import calculate_age import datetime from scipy.signal import savgol_filter from rowers.opaque import encoder -from rowers.database import * +from rowers.database import database_url, database_url_debug from rowers import mytypes from rowsandall_app.settings import SITE_URL import django_rq from timezonefinder import TimezoneFinder -import rowers.datautils as datautils -import rowers.utils as utils +from rowers import datautils + +from rowers import utils + import sys import sqlalchemy as sa from sqlalchemy import create_engine @@ -75,11 +78,1953 
@@ import yaml import shutil from shutil import copyfile +from rowingdata import make_cumvalues from rowingdata import ( get_file_type, get_empower_rigging, get_empower_firmware ) -from rowers.dataroutines import * +# All the data preparation, data cleaning and data mangling should +# be defined here + +from django.utils.timezone import get_current_timezone + + +thetimezone = get_current_timezone() + +allowedcolumns = [key for key, value in strokedatafields.items()] + + +# mapping the DB column names to the CSV file column names +columndict = { + 'time': 'TimeStamp (sec)', + 'hr': ' HRCur (bpm)', + 'velo': ' AverageBoatSpeed (m/s)', + 'pace': ' Stroke500mPace (sec/500m)', + 'spm': ' Cadence (stokes/min)', + 'power': ' Power (watts)', + 'averageforce': ' AverageDriveForce (lbs)', + 'drivelength': ' DriveLength (meters)', + 'peakforce': ' PeakDriveForce (lbs)', + 'distance': ' Horizontal (meters)', + 'catch': 'catch', + 'finish': 'finish', + 'peakforceangle': 'peakforceangle', + 'wash': 'wash', + 'slip': 'slip', + 'workoutstate': ' WorkoutState', + 'cumdist': 'cum_dist', +} + + +def get_video_data(w, groups=['basic'], mode='water'): + modes = [mode, 'both', 'basic'] + columns = ['time', 'velo', 'spm'] + columns += [name for name, d in rowingmetrics if d['group'] + in groups and d['mode'] in modes] + columns = list(set(columns)) + df = getsmallrowdata_db(columns, ids=[w.id], + workstrokesonly=False, doclean=False, compute=False) + + df['time'] = (df['time']-df['time'].min())/1000. + + df.sort_values(by='time', inplace=True) + + df.set_index(pd.to_timedelta(df['time'], unit='s'), inplace=True) + df2 = df.resample('1s').first().fillna(method='ffill') + df2['time'] = df2.index.total_seconds() + + if 'pace' in columns: + df2['pace'] = df2['pace']/1000. 
+ p = df2['pace'] + p = p.apply(lambda x: timedeltaconv(x)) + p = nicepaceformat(p) + df2['pace'] = p + + df2['time'] = (df2['time']-df2['time'].min()) + + df2 = df2.round(decimals=2) + + boatspeed = (100*df2['velo']).astype(int)/100. + + try: + coordinates = get_latlon_time(w.id) + except KeyError: # pragma: no cover + nulseries = df['time']*0 + coordinates = pd.DataFrame({ + 'time': df['time'], + 'latitude': nulseries, + 'longitude': nulseries, + }) + + coordinates.set_index(pd.to_timedelta( + coordinates['time'], unit='s'), inplace=True) + coordinates = coordinates.resample('1s').mean().interpolate() + coordinates['time'] = coordinates['time']-coordinates['time'].min() + latitude = coordinates['latitude'] + longitude = coordinates['longitude'] + + # bundle data + data = { + 'boatspeed': boatspeed.values.tolist(), + 'latitude': latitude.values.tolist(), + 'longitude': longitude.values.tolist(), + } + + metrics = {} + + for c in columns: + if c != 'time': + try: + if dict(rowingmetrics)[c]['numtype'] == 'integer': # pragma: no cover + data[c] = df2[c].astype(int).tolist() + else: + sigfigs = dict(rowingmetrics)[c]['sigfigs'] + if c != 'pace': + da = ((10**sigfigs)*df2[c]).astype(int)/(10**sigfigs) + else: + da = df2[c] + data[c] = da.values.tolist() + metrics[c] = { + 'name': dict(rowingmetrics)[c]['verbose_name'], + 'metric': c, + 'unit': '' + } + except KeyError: # pragma: no cover + pass + + metrics['boatspeed'] = metrics.pop('velo') + # metrics['workperstroke'] = metrics.pop('driveenergy') + metrics = collections.OrderedDict(sorted(metrics.items())) + + maxtime = coordinates['time'].max() + + return data, metrics, maxtime + + +def polarization_index(df, rower): + df['dt'] = df['time'].diff()/6.e4 + # remove rest (spm<15) + df.dropna(axis=0, inplace=True) + df['dt'] = df['dt'].clip(upper=4, lower=0) + + masklow = (df['power'] > 0) & (df['power'] < int(rower.pw_at)) + maskmid = (df['power'] >= rower.pw_at) & (df['power'] < int(rower.pw_an)) + maskhigh = 
(df['power'] > rower.pw_an) + + time_low_pw = df.loc[masklow, 'dt'].sum() + time_mid_pw = df.loc[maskmid, 'dt'].sum() + time_high_pw = df.loc[maskhigh, 'dt'].sum() + + frac_low = time_low_pw/(time_low_pw+time_mid_pw+time_high_pw) + frac_mid = time_mid_pw/(time_low_pw+time_mid_pw+time_high_pw) + frac_high = time_high_pw/(time_low_pw+time_mid_pw+time_high_pw) + + index = math.log10(frac_high*100.*frac_low/frac_mid) + + return index + + +def get_latlon(id): + try: + w = Workout.objects.get(id=id) + except Workout.DoesNotExist: # pragma: no cover + return False + + rowdata = rdata(w.csvfilename) + + if rowdata.df.empty: # pragma: no cover + return [pd.Series([], dtype='float'), pd.Series([], dtype='float')] + + try: + try: + latitude = rowdata.df.loc[:, ' latitude'] + longitude = rowdata.df.loc[:, ' longitude'] + except KeyError: + latitude = 0 * rowdata.df.loc[:, 'TimeStamp (sec)'] + longitude = 0 * rowdata.df.loc[:, 'TimeStamp (sec)'] + return [latitude, longitude] + except AttributeError: # pragma: no cover + return [pd.Series([], dtype='float'), pd.Series([], dtype='float')] + + return [pd.Series([], dtype='float'), pd.Series([], dtype='float')] # pragma: no cover + + +def get_latlon_time(id): + try: + w = Workout.objects.get(id=id) + except Workout.DoesNotExist: # pragma: no cover + return False + + rowdata = rdata(w.csvfilename) + + if rowdata.df.empty: # pragma: no cover + return [pd.Series([], dtype='float'), pd.Series([], dtype='float')] + + try: + try: + _ = rowdata.df.loc[:, ' latitude'] + _ = rowdata.df.loc[:, ' longitude'] + except KeyError: # pragma: no cover + rowdata.df['latitude'] = 0 * rowdata.df.loc[:, 'TimeStamp (sec)'] + rowdata.df['longitude'] = 0 * rowdata.df.loc[:, 'TimeStamp (sec)'] + except AttributeError: # pragma: no cover + return pd.DataFrame() + + df = pd.DataFrame({ + 'time': rowdata.df['TimeStamp (sec)']-rowdata.df['TimeStamp (sec)'].min(), + 'latitude': rowdata.df[' latitude'], + 'longitude': rowdata.df[' longitude'] + }) + + return df 
+ + +def workout_has_latlon(id): + latitude, longitude = get_latlon(id) + latmean = latitude.mean() + lonmean = longitude.mean() + + if latmean == 0 and lonmean == 0: + return False, latmean, lonmean + + if latitude.std() > 0 and longitude.std() > 0: + return True, latmean, lonmean + + return False, latmean, lonmean + + + +def get_workouts(ids, userid): # pragma: no cover + goodids = [] + for id in ids: + w = Workout.objects.get(id=id) + if int(w.user.user.id) == int(userid): + goodids.append(id) + + return [Workout.objects.get(id=id) for id in goodids] + + +def filter_df(datadf, fieldname, value, largerthan=True): + + try: + _ = datadf[fieldname] + except KeyError: + return datadf + + try: + if largerthan: + mask = datadf[fieldname] < value + else: + mask = datadf[fieldname] >= value + + datadf.loc[mask, fieldname] = np.nan + except TypeError: + pass + + return datadf + +# joins workouts + + + +def df_resample(datadf): + # time stamps must be in seconds + timestamps = datadf['TimeStamp (sec)'].astype('int') + datadf['timestamps'] = timestamps + newdf = datadf.groupby(['timestamps']).mean() + return newdf + + + +def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, + ignoreadvanced=False): + # clean data remove zeros and negative values + + try: + _ = datadf['workoutid'].unique() + except KeyError: + datadf['workoutid'] = 0 + + before = {} + for workoutid in datadf['workoutid'].unique(): + before[workoutid] = len(datadf[datadf['workoutid'] == workoutid]) + + data_orig = datadf.copy() + + # bring metrics which have negative values to positive domain + if len(datadf) == 0: + return datadf + try: + datadf['catch'] = -datadf['catch'] + except (KeyError, TypeError): + pass + + try: + datadf['peakforceangle'] = datadf['peakforceangle'] + 1000 + except (KeyError, TypeError): + pass + + try: + datadf['hr'] = datadf['hr'] + 10 + except (KeyError, TypeError): + pass + + # protect 0 spm values from being nulled + try: + datadf['spm'] = datadf['spm'] + 1.0 + except 
(KeyError, TypeError): + pass + + # protect 0 workoutstate values from being nulled + try: + datadf['workoutstate'] = datadf['workoutstate'] + 1 + except (KeyError, TypeError): + pass + + try: + datadf = datadf.clip(lower=0) + except TypeError: + pass + + # protect advanced metrics columns + advancedcols = [ + 'rhythm', + 'power', + 'drivelength', + 'forceratio', + 'drivespeed', + 'driveenergy', + 'catch', + 'finish', + 'averageforce', + 'peakforce', + 'slip', + 'wash', + 'peakforceangle', + 'effectiveangle', + ] + + datadf.replace(to_replace=0, value=np.nan, inplace=True) + # datadf = datadf.map_partitions(lambda df:df.replace(to_replace=0,value=np.nan)) + + # bring spm back to real values + try: + datadf['spm'] = datadf['spm'] - 1 + except (TypeError, KeyError): + pass + + # bring workoutstate back to real values + try: + datadf['workoutstate'] = datadf['workoutstate'] - 1 + except (TypeError, KeyError): + pass + + # return from positive domain to negative + try: + datadf['catch'] = -datadf['catch'] + except (KeyError, TypeError): + pass + + try: + datadf['peakforceangle'] = datadf['peakforceangle'] - 1000 + except (KeyError, TypeError): + pass + + try: + datadf['hr'] = datadf['hr'] - 10 + except (KeyError, TypeError): + pass + + # clean data for useful ranges per column + if not ignorehr: + try: + mask = datadf['hr'] < 30 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): # pragma: no cover + pass + + try: + mask = datadf['spm'] < 0 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['efficiency'] > 200. + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['spm'] < 10 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['pace'] / 1000. > 300. + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['efficiency'] < 0. 
+ datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['pace'] / 1000. < 60. + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['power'] > 5000 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['spm'] > 120 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['wash'] < 1 + datadf.loc[mask, 'wash'] = np.nan + except (KeyError, TypeError): + pass + + # try to guess ignoreadvanced + if not ignoreadvanced: + for metric in advancedcols: + try: + sum = datadf[metric].std() + if sum == 0 or np.isnan(sum): + ignoreadvanced = True + except KeyError: + pass + + if not ignoreadvanced: + try: + mask = datadf['rhythm'] < 0 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['rhythm'] > 70 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['power'] < 20 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['drivelength'] < 0.5 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['forceratio'] < 0.2 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['forceratio'] > 1.0 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['drivespeed'] < 0.5 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['drivespeed'] > 4 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['driveenergy'] > 2000 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['driveenergy'] < 100 + datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + try: + mask = datadf['catch'] > -30. 
+ datadf.mask(mask, inplace=True) + except (KeyError, TypeError): + pass + + # workoutstateswork = [1, 4, 5, 8, 9, 6, 7] + workoutstatesrest = [3] + # workoutstatetransition = [0, 2, 10, 11, 12, 13] + + if workstrokesonly == 'True' or workstrokesonly is True: + try: + datadf = datadf[~datadf['workoutstate'].isin(workoutstatesrest)] + except: + pass + + after = {} + for workoutid in data_orig['workoutid'].unique(): + after[workoutid] = len( + datadf[datadf['workoutid'] == workoutid].dropna()) + ratio = float(after[workoutid])/float(before[workoutid]) + if ratio < 0.01 or after[workoutid] < 2: + return data_orig + + return datadf + + +def getpartofday(row, r): + workoutstartdatetime = row.rowdatetime + try: # pragma: no cover + latavg = row.df[' latitude'].mean() + lonavg = row.df[' longitude'].mean() + + tf = TimezoneFinder() + try: + timezone_str = tf.timezone_at(lng=lonavg, lat=latavg) + except (ValueError, OverflowError): # pragma: no cover + timezone_str = 'UTC' + if timezone_str is None: # pragma: no cover + timezone_str = tf.closest_timezone_at(lng=lonavg, + lat=latavg) + if timezone_str is None: + timezone_str = r.defaulttimezone + try: + workoutstartdatetime = pytz.timezone(timezone_str).localize( + row.rowdatetime + ) + except ValueError: + workoutstartdatetime = row.rowdatetime + except KeyError: + timezone_str = r.defaulttimezone + workoutstartdatetime = row.rowdatetime + + h = workoutstartdatetime.astimezone(pytz.timezone(timezone_str)).hour + + if h < 12: # pragma: no cover + return "Morning" + elif h < 18: # pragma: no cover + return "Afternoon" + elif h < 22: # pragma: no cover + return "Evening" + else: # pragma: no cover + return "Night" + + return None # pragma: no cover + + +def getstatsfields(): + fielddict = {name: d['verbose_name'] for name, d in rowingmetrics} + +# fielddict.pop('ergpace') +# fielddict.pop('hr_an') +# fielddict.pop('hr_tr') +# fielddict.pop('hr_at') +# fielddict.pop('hr_ut2') +# fielddict.pop('hr_ut1') + fielddict.pop('time') 
+ fielddict.pop('distance') +# fielddict.pop('nowindpace') +# fielddict.pop('fnowindpace') +# fielddict.pop('fergpace') +# fielddict.pop('equivergpower') +# fielddict.pop('workoutstate') +# fielddict.pop('fpace') +# fielddict.pop('pace') +# fielddict.pop('id') +# fielddict.pop('ftime') +# fielddict.pop('x_right') +# fielddict.pop('hr_max') +# fielddict.pop('hr_bottom') + fielddict.pop('cumdist') + + try: + fieldlist = [field for field, value in fielddict.iteritems()] + except AttributeError: + fieldlist = [field for field, value in fielddict.items()] + + return fieldlist, fielddict + + +# A string representation for time deltas +def niceformat(values): + out = [] + for v in values: + formattedv = strfdelta(v) + out.append(formattedv) + + return out + +# A nice printable format for time delta values + + +def strfdelta(tdelta): + try: + minutes, seconds = divmod(tdelta.seconds, 60) + tenths = int(tdelta.microseconds / 1e5) + except AttributeError: # pragma: no cover + minutes, seconds = divmod(tdelta.view(np.int64), 60e9) + seconds, rest = divmod(seconds, 1e9) + tenths = int(rest / 1e8) + res = "{minutes:0>2}:{seconds:0>2}.{tenths:0>1}".format( + minutes=minutes, + seconds=seconds, + tenths=tenths, + ) + + return res + + +def timedelta_to_seconds(tdelta): # pragma: no cover + return 60.*tdelta.minute+tdelta.second + + +# A nice printable format for pace values + + +def nicepaceformat(values): + out = [] + for v in values: + formattedv = strfdelta(v) + out.append(formattedv) + + return out + +# Convert seconds to a Time Delta value, replacing NaN with a 5:50 pace + + +def timedeltaconv(x): + if np.isfinite(x) and x != 0 and x > 0 and x < 175000: + dt = datetime.timedelta(seconds=x) + else: + dt = datetime.timedelta(seconds=350.) 
+ + return dt + + +def paceformatsecs(values): + out = [] + for v in values: + td = timedeltaconv(v) + formattedv = strfdelta(td) + out.append(formattedv) + + return out + + +def update_c2id_sql(id, c2id): + workout = Workout.objects.get(id=id) + workout.uploadedtoc2 = c2id + workout.save() + + return 1 + + +def getcpdata_sql(rower_id, table='cpdata'): + engine = create_engine(database_url, echo=False) + query = sa.text('SELECT * from {table} WHERE user={rower_id};'.format( + rower_id=rower_id, + table=table, + )) + + _ = engine.raw_connection() + df = pd.read_sql_query(query, engine) + + return df + + +def deletecpdata_sql(rower_id, table='cpdata'): # pragma: no cover + engine = create_engine(database_url, echo=False) + query = sa.text('DELETE from {table} WHERE user={rower_id};'.format( + rower_id=rower_id, + table=table, + )) + with engine.connect() as conn, conn.begin(): + try: + _ = conn.execute(query) + except Exception as e: + print(Exception, e) + print("Database locked") + conn.close() + engine.dispose() + + +def updatecpdata_sql(rower_id, delta, cp, table='cpdata', distance=pd.Series([], dtype='float'), + debug=False): # pragma: no cover + deletecpdata_sql(rower_id) + df = pd.DataFrame( + { + 'delta': delta, + 'cp': cp, + 'user': rower_id + } + ) + + if not distance.empty: + df['distance'] = distance + + engine = create_engine(database_url, echo=False) + with engine.connect() as conn, conn.begin(): + df.to_sql(table, engine, if_exists='append', index=False) + conn.close() + engine.dispose() + + + +def get_workoutsummaries(userid, startdate): # pragma: no cover + u = User.objects.get(id=userid) + r = u.rower + df = workout_summary_to_df(r, startdate=startdate) + df.drop(['Stroke Data TCX', 'Stroke Data CSV'], axis=1, inplace=True) + df = df.sort_values('date', ascending=False) + + return df + + + + + + + + +def checkduplicates(r, workoutdate, workoutstartdatetime, workoutenddatetime): + duplicate = False + ws = Workout.objects.filter(user=r, 
date=workoutdate, duplicate=False).exclude( + startdatetime__gt=workoutenddatetime + ) + + ws2 = [] + + for ww in ws: + t = ww.duration + delta = datetime.timedelta( + hours=t.hour, minutes=t.minute, seconds=t.second) + enddatetime = ww.startdatetime+delta + if enddatetime > workoutstartdatetime: + ws2.append(ww) + + if len(ws2) != 0: + duplicate = True + return duplicate + + return duplicate + + + +parsers = { + 'kinomap': KinoMapParser, + 'xls': ExcelTemplate, + 'rp': RowProParser, + 'tcx': TCXParser, + 'mystery': MysteryParser, + 'ritmotime': RitmoTimeParser, + 'quiske': QuiskeParser, + 'rowperfect3': RowPerfectParser, + 'coxmate': CoxMateParser, + 'bcmike': BoatCoachAdvancedParser, + 'boatcoach': BoatCoachParser, + 'boatcoachotw': BoatCoachOTWParser, + 'painsleddesktop': painsledDesktopParser, + 'speedcoach': speedcoachParser, + 'speedcoach2': SpeedCoach2Parser, + 'ergstick': ErgStickParser, + 'fit': FITParser, + 'ergdata': ErgDataParser, + 'humon': HumonParser, + 'eth': ETHParser, + 'nklinklogbook': NKLiNKLogbookParser, + 'hero': HeroParser, + 'smartrow': SmartRowParser, +} + + +def get_startdate_time_zone(r, row, startdatetime=None): + if startdatetime is not None and startdatetime != '': + try: + timezone_str = pendulum.instance(startdatetime).timezone.name + except ValueError: # pragma: no cover + timezone_str = 'Ect/GMT' + elif startdatetime == '': + startdatetime = row.rowdatetime + else: + startdatetime = row.rowdatetime + + try: + _ = startdatetime.tzinfo + except AttributeError: # pragma: no cover + startdatetime = row.rowdatetime + + partofday = getpartofday(row, r) + + if startdatetime.tzinfo is None or str(startdatetime.tzinfo) in ['tzutc()', 'Ect/GMT']: + timezone_str = 'UTC' + try: + startdatetime = timezone.make_aware(startdatetime) + except ValueError: # pragma: no cover + pass + + try: + latavg = row.df[' latitude'].mean() + lonavg = row.df[' longitude'].mean() + + tf = TimezoneFinder() + if row.df[' latitude'].std() != 0: + try: + timezone_str 
= tf.timezone_at(lng=lonavg, lat=latavg) + except (ValueError, OverflowError): # pragma: no cover + timezone_str = 'UTC' + if timezone_str is None: # pragma: no cover + timezone_str = tf.closest_timezone_at(lng=lonavg, + lat=latavg) + if timezone_str is None: # pragma: no cover + timezone_str = r.defaulttimezone + else: + timezone_str = r.defaulttimezone + try: + startdatetime = pytz.timezone(timezone_str).localize( + row.rowdatetime + ) + except ValueError: # pragma: no cover + startdatetime = startdatetime.astimezone( + pytz.timezone(timezone_str) + ) + except KeyError: # pragma: no cover + timezone_str = r.defaulttimezone + else: + timezone_str = str(startdatetime.tzinfo) + + startdatetime = startdatetime.astimezone(pytz.timezone(timezone_str)) + + startdate = startdatetime.strftime('%Y-%m-%d') + starttime = startdatetime.strftime('%H:%M:%S') + + if timezone_str == 'tzutc()': + timezone_str = 'UTC' # pragma: no cover + + return startdatetime, startdate, starttime, timezone_str, partofday + + +def parsenonpainsled(fileformat, f2, summary, startdatetime='', empowerfirmware=None, inboard=None, oarlength=None): + try: + if fileformat == 'nklinklogbook' and empowerfirmware is not None: # pragma: no cover + if inboard is not None and oarlength is not None: + row = NKLiNKLogbookParser( + f2, firmware=empowerfirmware, inboard=inboard, oarlength=oarlength) + else: + row = NKLiNKLogbookParser(f2) + else: + row = parsers[fileformat](f2) + if startdatetime != '': # pragma: no cover + row.rowdatetime = arrow.get(startdatetime).datetime + hasrecognized = True + except (KeyError, IndexError, ValueError): # pragma: no cover + hasrecognized = False + return None, hasrecognized, '', 'unknown' + + s = 'Parsenonpainsled, start date time = {startdatetime}'.format( + startdatetime=startdatetime, + ) + dologging('debuglog.log', s) + + # handle speed coach GPS 2 + if (fileformat == 'speedcoach2'): + oarlength, inboard = get_empower_rigging(f2) + empowerfirmware = 
get_empower_firmware(f2) + if empowerfirmware != '': + fileformat = fileformat+'v'+str(empowerfirmware) + else: # pragma: no cover + fileformat = 'speedcoach2v0' + try: + summary = row.allstats() + except ZeroDivisionError: # pragma: no cover + summary = '' + else: + fileformat = fileformat+'v'+str(empowerfirmware) + + # handle FIT + if (fileformat == 'fit'): # pragma: no cover + try: + s = fitsummarydata(f2) + s.setsummary() + summary = s.summarytext + except: + pass + hasrecognized = True + + return row, hasrecognized, summary, fileformat + + +def handle_nonpainsled(f2, fileformat, summary='', startdatetime='', empowerfirmware=None, impeller=False): + oarlength = 2.89 + inboard = 0.88 + hasrecognized = False + + row, hasrecognized, summary, fileformat = parsenonpainsled(fileformat, f2, summary, startdatetime=startdatetime, + empowerfirmware=empowerfirmware) + + # Handle c2log + if (fileformat == 'c2log' or fileformat == 'rowprolog'): # pragma: no cover + return (0, '', 0, 0, '', impeller) + + if not hasrecognized: # pragma: no cover + return (0, '', 0, 0, '', impeller) + + f_to_be_deleted = f2 + # should delete file + f2 = f2[:-4] + 'o.csv' + + row2 = rrdata(df=row.df) + + if 'speedcoach2' in fileformat or 'nklinklogbook' in fileformat: + # impeller consistency + impellerdata, consistent, ratio = row.impellerconsistent(threshold=0.3) + + if impellerdata and consistent: + impeller = True + if impellerdata and not consistent: + row2.use_gpsdata() + if impeller: + row2.use_impellerdata() + + row2.write_csv(f2, gzip=True) + + # os.remove(f2) + try: + os.remove(f_to_be_deleted) + except: # pragma: no cover + try: + os.remove(f_to_be_deleted + '.gz') + except: + pass + + return (f2, summary, oarlength, inboard, fileformat, impeller) + +# Create new workout from file and store it in the database +# This routine should be used everywhere in views.py + + +def get_workouttype_from_fit(filename, workouttype='water'): + try: + fitfile = FitFile(filename, check_crc=False) + 
except FitHeaderError: # pragma: no cover + return workouttype + + records = fitfile.messages + fittype = 'rowing' + for record in records: + if record.name in ['sport', 'lap']: + try: + fittype = record.get_values()['sport'].lower() + except (KeyError, AttributeError): # pragma: no cover + return 'water' + try: + workouttype = mytypes.fitmappinginv[fittype] + except KeyError: # pragma: no cover + return workouttype + + return workouttype + + +def get_workouttype_from_tcx(filename, workouttype='water'): + tcxtype = 'rowing' + if workouttype in mytypes.otwtypes: + return workouttype + try: # pragma: no cover + d = tcxtools.tcx_getdict(filename) + try: + tcxtype = d['Activities']['Activity']['@Sport'].lower() + if tcxtype == 'other': + tcxtype = 'rowing' + except KeyError: + return workouttype + + except TypeError: # pragma: no cover + pass + + try: # pragma: no cover + workouttype = mytypes.garminmappinginv[tcxtype.upper()] + except KeyError: # pragma: no cover + return workouttype + + return workouttype # pragma: no cover + + + + +# Create new workout from data frame and store it in the database +# This routine should be used everywhere in views.py and mailprocessing.py +# Currently there is code duplication + + +# A wrapper around the rowingdata class, with some error catching + + +def rdata(file, rower=rrower()): + try: + res = rrdata(csvfile=file, rower=rower) + except (IOError, IndexError): # pragma: no cover + try: + res = rrdata(csvfile=file + '.gz', rower=rower) + except (IOError, IndexError): + res = rrdata() + except: + res = rrdata() + except EOFError: # pragma: no cover + res = rrdata() + except: # pragma: no cover + res = rrdata() + + return res + +# Remove all stroke data for workout ID from database + + +def delete_strokedata(id, debug=False): + dirname = 'media/strokedata_{id}.parquet.gz'.format(id=id) + try: + shutil.rmtree(dirname) + except OSError: + try: + os.remove(dirname) + except FileNotFoundError: + pass + except FileNotFoundError: # pragma: 
no cover + pass + +# Replace stroke data in DB with data from CSV file + + +def update_strokedata(id, df, debug=False): + delete_strokedata(id, debug=debug) + _ = dataprep(df, id=id, bands=True, barchart=True, otwpower=True) + +# Test that all data are of a numerical time + + +def testdata(time, distance, pace, spm): # pragma: no cover + t1 = np.issubdtype(time, np.number) + t2 = np.issubdtype(distance, np.number) + t3 = np.issubdtype(pace, np.number) + t4 = np.issubdtype(spm, np.number) + + return t1 and t2 and t3 and t4 + +# Get data from DB for one workout (fetches all data). If data +# is not in DB, read from CSV file (and create DB entry) + + +def getrowdata_db(id=0, doclean=False, convertnewtons=True, + checkefficiency=True): + data = read_df_sql(id) + try: + data['deltat'] = data['time'].diff() + except KeyError: # pragma: no cover + data = pd.DataFrame() + + if data.empty: + rowdata, row = getrowdata(id=id) + if not rowdata.empty: # pragma: no cover + data = dataprep(rowdata.df, id=id, bands=True, + barchart=True, otwpower=True) + else: + data = pd.DataFrame() # returning empty dataframe + else: + row = Workout.objects.get(id=id) + + if checkefficiency is True and not data.empty: + try: + if data['efficiency'].mean() == 0 and data['power'].mean() != 0: # pragma: no cover + data = add_efficiency(id=id) + except KeyError: # pragma: no cover + data = add_efficiency(id=id) + + if doclean: # pragma: no cover + data = clean_df_stats(data, ignorehr=True) + + return data, row + +# Fetch a subset of the data from the DB + + +def getsmallrowdata_db(columns, ids=[], doclean=True, workstrokesonly=True, compute=True, + debug=False): + # prepmultipledata(ids) + + if ids: + csvfilenames = [ + 'media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids] + else: + return pd.DataFrame() + + data = [] + columns = [c for c in columns if c != 'None'] + columns = list(set(columns)) + + if len(ids) > 1: + for id, f in zip(ids, csvfilenames): + try: + df = pd.read_parquet(f, 
columns=columns) + data.append(df) + except (OSError, ArrowInvalid, IndexError): # pragma: no cover + rowdata, row = getrowdata(id=id) + if rowdata and len(rowdata.df): + _ = dataprep(rowdata.df, id=id, + bands=True, otwpower=True, barchart=True) + df = pd.read_parquet(f, columns=columns) + data.append(df) + + try: + df = pd.concat(data, axis=0) + except ValueError: # pragma: no cover + return pd.DataFrame() + # df = dd.concat(data,axis=0) + + else: + try: + df = pd.read_parquet(csvfilenames[0], columns=columns) + rowdata, row = getrowdata(id=ids[0]) + except (OSError, ArrowInvalid, IndexError): + rowdata, row = getrowdata(id=ids[0]) + if rowdata and len(rowdata.df): # pragma: no cover + data = dataprep( + rowdata.df, id=ids[0], bands=True, otwpower=True, barchart=True) + df = pd.read_parquet(csvfilenames[0], columns=columns) + # df = dd.read_parquet(csvfilenames[0], + # column=columns,engine='pyarrow', + # ) + + # df = df.loc[:,~df.columns.duplicated()] + else: + df = pd.DataFrame() + + if compute and len(df): + data = df.copy() + if doclean: + data = clean_df_stats(data, ignorehr=True, + workstrokesonly=workstrokesonly) + data.dropna(axis=1, how='all', inplace=True) + data.dropna(axis=0, how='any', inplace=True) + return data + + return df + +# Fetch both the workout and the workout stroke data (from CSV file) + + +def getrowdata(id=0): + + # check if valid ID exists (workout exists) + try: + row = Workout.objects.get(id=id) + except Workout.DoesNotExist: # pragma: no cover + return rrdata(), None + + f1 = row.csvfilename + + # get user + + r = row.user + + rr = rrower(hrmax=r.max, hrut2=r.ut2, + hrut1=r.ut1, hrat=r.at, + hrtr=r.tr, hran=r.an, ftp=r.ftp) + + rowdata = rdata(f1, rower=rr) + + return rowdata, row + +# Checks if all rows for a list of workout IDs have entries in the +# stroke_data table. 
If this is not the case, it creates the stroke +# data +# In theory, this should never yield any work, but it's a good +# safety net for programming errors elsewhere in the app +# Also used heavily when I moved from CSV file only to CSV+Stroke data + + +def prepmultipledata(ids, verbose=False): # pragma: no cover + filenames = glob.glob('media/*.parquet') + ids = [ + id for id in ids if 'media/strokedata_{id}.parquet.gz'.format(id=id) not in filenames] + + for id in ids: + rowdata, row = getrowdata(id=id) + if verbose: + print(id) + if rowdata and len(rowdata.df): + _ = dataprep(rowdata.df, id=id, bands=True, + barchart=True, otwpower=True) + return ids + +# Read a set of columns for a set of workout ids, returns data as a +# pandas dataframe + + +def read_cols_df_sql(ids, columns, convertnewtons=True): + # drop columns that are not in offical list + # axx = [ax[0] for ax in axes] + + extracols = [] + + columns = list(columns) + ['distance', 'spm', 'workoutid'] + columns = [x for x in columns if x != 'None'] + columns = list(set(columns)) + ids = [int(id) for id in ids] + + df = pd.DataFrame() + + if len(ids) == 0: # pragma: no cover + return pd.DataFrame(), extracols + elif len(ids) == 1: # pragma: no cover + try: + filename = 'media/strokedata_{id}.parquet.gz'.format(id=ids[0]) + df = pd.read_parquet(filename, columns=columns) + except OSError: + rowdata, row = getrowdata(id=ids[0]) + if rowdata and len(rowdata.df): + _ = dataprep(rowdata.df, + id=ids[0], bands=True, otwpower=True, barchart=True) + df = pd.read_parquet(filename, columns=columns) + else: + data = [] + filenames = [ + 'media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids] + for id, f in zip(ids, filenames): + try: + df = pd.read_parquet(f, columns=columns) + data.append(df) + except (OSError, IndexError, ArrowInvalid): + rowdata, row = getrowdata(id=id) + if rowdata and len(rowdata.df): # pragma: no cover + _ = dataprep(rowdata.df, id=id, + bands=True, otwpower=True, barchart=True) + df = 
pd.read_parquet(f, columns=columns) + data.append(df) + + try: + df = pd.concat(data, axis=0) + except ValueError: # pragma: no cover + return pd.DataFrame(), extracols + + df = df.fillna(value=0) + + if 'peakforce' in columns: + funits = ((w.id, w.forceunit) + for w in Workout.objects.filter(id__in=ids)) + for id, u in funits: + if u == 'lbs': + mask = df['workoutid'] == id + df.loc[mask, 'peakforce'] = df.loc[mask, 'peakforce'] * lbstoN + if 'averageforce' in columns: + funits = ((w.id, w.forceunit) + for w in Workout.objects.filter(id__in=ids)) + for id, u in funits: + if u == 'lbs': + mask = df['workoutid'] == id + df.loc[mask, 'averageforce'] = df.loc[mask, + 'averageforce'] * lbstoN + + return df, extracols + + + +# Read stroke data from the DB for a Workout ID. Returns a pandas dataframe + + +def read_df_sql(id): + try: + f = 'media/strokedata_{id}.parquet.gz'.format(id=id) + df = pd.read_parquet(f) + except (OSError, ArrowInvalid, IndexError): # pragma: no cover + rowdata, row = getrowdata(id=id) + if rowdata and len(rowdata.df): + data = dataprep(rowdata.df, id=id, bands=True, + otwpower=True, barchart=True) + try: + df = pd.read_parquet(f) + except OSError: + df = data + else: + df = pd.DataFrame() + + df = df.fillna(value=0) + + return df + + +# data fusion + + +def datafusion(id1, id2, columns, offset): + df1, w1 = getrowdata_db(id=id1) + df1 = df1.drop([ # 'cumdist', + 'hr_ut2', + 'hr_ut1', + 'hr_at', + 'hr_tr', + 'hr_an', + 'hr_max', + 'ftime', + 'fpace', + 'workoutid', + 'id'], + 1, errors='ignore') + + # Add coordinates to DataFrame + latitude, longitude = get_latlon(id1) + + df1[' latitude'] = latitude + df1[' longitude'] = longitude + + df2 = getsmallrowdata_db(['time'] + columns, ids=[id2], doclean=False) + + forceunit = 'N' + + offsetmillisecs = offset.seconds * 1000 + offset.microseconds / 1000. 
+ offsetmillisecs += offset.days * (3600 * 24 * 1000) + df2['time'] = df2['time'] + offsetmillisecs + + keep1 = {c: c for c in set(df1.columns)} + + for c in columns: + keep1.pop(c) + + for c in df1.columns: + if c not in keep1: + df1 = df1.drop(c, 1, errors='ignore') + + df = pd.concat([df1, df2], ignore_index=True) + df = df.sort_values(['time']) + df = df.interpolate(method='linear', axis=0, limit_direction='both', + limit=10) + df.fillna(method='bfill', inplace=True) + + # Some new stuff to try out + df = df.groupby('time', axis=0).mean() + df['time'] = df.index + df.reset_index(drop=True, inplace=True) + + df['time'] = df['time'] / 1000. + df['pace'] = df['pace'] / 1000. + df['cum_dist'] = df['cumdist'] + + return df, forceunit + + +def fix_newtons(id=0, limit=3000): # pragma: no cover + # rowdata,row = getrowdata_db(id=id,doclean=False,convertnewtons=False) + rowdata = getsmallrowdata_db(['peakforce'], ids=[id], doclean=False) + try: + peakforce = rowdata['peakforce'] + if peakforce.mean() > limit: + w = Workout.objects.get(id=id) + + rowdata = rdata(w.csvfilename) + if rowdata and len(rowdata.df): + update_strokedata(w.id, rowdata.df) + except KeyError: + pass + + +def remove_invalid_columns(df): # pragma: no cover + for c in df.columns: + if c not in allowedcolumns: + df.drop(labels=c, axis=1, inplace=True) + + return df + + +def add_efficiency(id=0): # pragma: no cover + rowdata, row = getrowdata_db(id=id, + doclean=False, + convertnewtons=False, + checkefficiency=False) + power = rowdata['power'] + pace = rowdata['pace'] / 1.0e3 + velo = 500. / pace + ergpw = 2.8 * velo**3 + efficiency = 100. 
* ergpw / power + + efficiency = efficiency.replace([-np.inf, np.inf], np.nan) + efficiency.fillna(method='ffill') + rowdata['efficiency'] = efficiency + + rowdata = remove_invalid_columns(rowdata) + rowdata = rowdata.replace([-np.inf, np.inf], np.nan) + rowdata = rowdata.fillna(method='ffill') + + delete_strokedata(id) + + if id != 0: + rowdata['workoutid'] = id + filename = 'media/strokedata_{id}.parquet.gz'.format(id=id) + df = dd.from_pandas(rowdata, npartitions=1) + df.to_parquet(filename, engine='fastparquet', compression='GZIP') + + return rowdata + +# This is the main routine. +# it reindexes, sorts, filters, and smooths the data, then +# saves it to the stroke_data table in the database +# Takes a rowingdata object's DataFrame as input + + +def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True, + empower=True, inboard=0.88, forceunit='lbs', debug=False): + + if rowdatadf.empty: + return 0 + + t = rowdatadf.loc[:, 'TimeStamp (sec)'] + t = pd.Series(t - rowdatadf.loc[:, 'TimeStamp (sec)'].iloc[0]) + + row_index = rowdatadf.loc[:, ' Stroke500mPace (sec/500m)'] > 3000 + rowdatadf.loc[row_index, ' Stroke500mPace (sec/500m)'] = 3000. 
+ + p = rowdatadf.loc[:, ' Stroke500mPace (sec/500m)'] + try: + velo = rowdatadf.loc[:, ' AverageBoatSpeed (m/s)'] + except KeyError: # pragma: no cover + velo = 500./p + + hr = rowdatadf.loc[:, ' HRCur (bpm)'] + spm = rowdatadf.loc[:, ' Cadence (stokes/min)'] + cumdist = rowdatadf.loc[:, 'cum_dist'] + power = rowdatadf.loc[:, ' Power (watts)'] + averageforce = rowdatadf.loc[:, ' AverageDriveForce (lbs)'] + drivelength = rowdatadf.loc[:, ' DriveLength (meters)'] + try: + workoutstate = rowdatadf.loc[:, ' WorkoutState'] + except KeyError: # pragma: no cover + workoutstate = 0 * hr + + peakforce = rowdatadf.loc[:, ' PeakDriveForce (lbs)'] + + forceratio = averageforce / peakforce + forceratio = forceratio.fillna(value=0) + + try: + drivetime = rowdatadf.loc[:, ' DriveTime (ms)'] + recoverytime = rowdatadf.loc[:, ' StrokeRecoveryTime (ms)'] + rhythm = 100. * drivetime / (recoverytime + drivetime) + rhythm = rhythm.fillna(value=0) + except: # pragma: no cover + rhythm = 0.0 * forceratio + + f = rowdatadf['TimeStamp (sec)'].diff().mean() + if f != 0 and not np.isinf(f): + try: + windowsize = 2 * (int(10. 
/ (f))) + 1 + except ValueError: # pragma: no cover + windowsize = 1 + else: + windowsize = 1 + if windowsize <= 3: + windowsize = 5 + + if windowsize > 3 and windowsize < len(hr): + spm = savgol_filter(spm, windowsize, 3) + hr = savgol_filter(hr, windowsize, 3) + drivelength = savgol_filter(drivelength, windowsize, 3) + forceratio = savgol_filter(forceratio, windowsize, 3) + + try: + t2 = t.fillna(method='ffill').apply(lambda x: timedeltaconv(x)) + except TypeError: # pragma: no cover + t2 = 0 * t + + p2 = p.fillna(method='ffill').apply(lambda x: timedeltaconv(x)) + + try: + drivespeed = drivelength / rowdatadf[' DriveTime (ms)'] * 1.0e3 + except TypeError: # pragma: no cover + drivespeed = 0.0 * rowdatadf['TimeStamp (sec)'] + + drivespeed = drivespeed.fillna(value=0) + + try: + driveenergy = rowdatadf['driveenergy'] + except KeyError: # pragma: no cover + if forceunit == 'lbs': + driveenergy = drivelength * averageforce * lbstoN + else: + driveenergy = drivelength * averageforce + + if forceunit == 'lbs': + averageforce *= lbstoN + peakforce *= lbstoN + + powerhr = 60.*power/hr + powerhr = powerhr.fillna(value=0) + + if driveenergy.mean() == 0 and driveenergy.std() == 0: + driveenergy = 0*driveenergy+100 + + distance = rowdatadf.loc[:, 'cum_dist'] + velo = 500. / p + + distanceperstroke = 60. 
* velo / spm + + data = DataFrame( + dict( + time=t * 1e3, + hr=hr, + pace=p * 1e3, + spm=spm, + velo=velo, + cumdist=cumdist, + ftime=niceformat(t2), + fpace=nicepaceformat(p2), + driveenergy=driveenergy, + power=power, + workoutstate=workoutstate, + averageforce=averageforce, + drivelength=drivelength, + peakforce=peakforce, + forceratio=forceratio, + distance=distance, + drivespeed=drivespeed, + rhythm=rhythm, + distanceperstroke=distanceperstroke, + # powerhr=powerhr, + ) + ) + + if bands: + # HR bands + data['hr_ut2'] = rowdatadf.loc[:, 'hr_ut2'] + data['hr_ut1'] = rowdatadf.loc[:, 'hr_ut1'] + data['hr_at'] = rowdatadf.loc[:, 'hr_at'] + data['hr_tr'] = rowdatadf.loc[:, 'hr_tr'] + data['hr_an'] = rowdatadf.loc[:, 'hr_an'] + data['hr_max'] = rowdatadf.loc[:, 'hr_max'] + data['hr_bottom'] = 0.0 * data['hr'] + + try: + _ = rowdatadf.loc[:, ' ElapsedTime (sec)'] + except KeyError: # pragma: no cover + rowdatadf[' ElapsedTime (sec)'] = rowdatadf['TimeStamp (sec)'] + + if empower: + try: + wash = rowdatadf.loc[:, 'wash'] + except KeyError: + wash = 0 * power + + try: + catch = rowdatadf.loc[:, 'catch'] + except KeyError: + catch = 0 * power + + try: + finish = rowdatadf.loc[:, 'finish'] + except KeyError: + finish = 0 * power + + try: + peakforceangle = rowdatadf.loc[:, 'peakforceangle'] + except KeyError: + peakforceangle = 0 * power + + if data['driveenergy'].mean() == 0: # pragma: no cover + try: + driveenergy = rowdatadf.loc[:, 'driveenergy'] + except KeyError: + driveenergy = power * 60 / spm + else: + driveenergy = data['driveenergy'] + + arclength = (inboard - 0.05) * (np.radians(finish) - np.radians(catch)) + if arclength.mean() > 0: + drivelength = arclength + elif drivelength.mean() == 0: + drivelength = driveenergy / (averageforce * 4.44822) + + try: + slip = rowdatadf.loc[:, 'slip'] + except KeyError: + slip = 0 * power + + try: + totalangle = finish - catch + effectiveangle = finish - wash - catch - slip + except ValueError: # pragma: no cover + 
totalangle = 0 * power + effectiveangle = 0 * power + + if windowsize > 3 and windowsize < len(slip): + try: + wash = savgol_filter(wash, windowsize, 3) + except TypeError: # pragma: no cover + pass + try: + slip = savgol_filter(slip, windowsize, 3) + except TypeError: # pragma: no cover + pass + try: + catch = savgol_filter(catch, windowsize, 3) + except TypeError: # pragma: no cover + pass + try: + finish = savgol_filter(finish, windowsize, 3) + except TypeError: # pragma: no cover + pass + try: + peakforceangle = savgol_filter(peakforceangle, windowsize, 3) + except TypeError: # pragma: no cover + pass + try: + driveenergy = savgol_filter(driveenergy, windowsize, 3) + except TypeError: # pragma: no cover + pass + try: + drivelength = savgol_filter(drivelength, windowsize, 3) + except TypeError: # pragma: no cover + pass + try: + totalangle = savgol_filter(totalangle, windowsize, 3) + except TypeError: # pragma: no cover + pass + try: + effectiveangle = savgol_filter(effectiveangle, windowsize, 3) + except TypeError: # pragma: no cover + pass + + velo = 500. / p + + ergpw = 2.8 * velo**3 + efficiency = 100. * ergpw / power + + efficiency = efficiency.replace([-np.inf, np.inf], np.nan) + efficiency.fillna(method='ffill') + + try: + data['wash'] = wash + data['catch'] = catch + data['slip'] = slip + data['finish'] = finish + data['peakforceangle'] = peakforceangle + data['driveenergy'] = driveenergy + data['drivelength'] = drivelength + data['totalangle'] = totalangle + data['effectiveangle'] = effectiveangle + data['efficiency'] = efficiency + except ValueError: # pragma: no cover + pass + + if otwpower: + try: + nowindpace = rowdatadf.loc[:, 'nowindpace'] + except KeyError: + nowindpace = p + try: + equivergpower = rowdatadf.loc[:, 'equivergpower'] + except KeyError: + equivergpower = 0 * p + 50. + + nowindpace2 = nowindpace.apply(lambda x: timedeltaconv(x)) + ergvelo = (equivergpower / 2.8)**(1. / 3.) + + ergpace = 500. 
/ ergvelo + ergpace[ergpace == np.inf] = 240. + ergpace2 = ergpace.apply(lambda x: timedeltaconv(x)) + + data['ergpace'] = ergpace * 1e3 + data['nowindpace'] = nowindpace * 1e3 + data['equivergpower'] = equivergpower + data['fergpace'] = nicepaceformat(ergpace2) + data['fnowindpace'] = nicepaceformat(nowindpace2) + + data = data.replace([-np.inf, np.inf], np.nan) + data = data.fillna(method='ffill') + + # write data if id given + if id != 0: + data['workoutid'] = id + data.fillna(0, inplace=True) + for k, v in dtypes.items(): + try: + data[k] = data[k].astype(v) + except KeyError: # pragma: no cover + pass + + filename = 'media/strokedata_{id}.parquet.gz'.format(id=id) + df = dd.from_pandas(data, npartitions=1) + df.to_parquet(filename, engine='fastparquet', compression='GZIP') + + return data + + + +def delete_agegroup_db(age, sex, weightcategory, debug=False): + if debug: # pragma: no cover + engine = create_engine(database_url_debug, echo=False) + else: # pragma: no cover + engine = create_engine(database_url, echo=False) + + query = sa.text("DELETE from {table} WHERE age='{age}' and weightcategory='{weightcategory}' and sex='{sex}';".format( + sex=sex, + age=age, + weightcategory=weightcategory, + table='calcagegrouprecords' + )) + with engine.connect() as conn, conn.begin(): + _ = conn.execute(query) + conn.close() + engine.dispose() + + + + + +def update_agegroup_db(age, sex, weightcategory, wcdurations, wcpower, + debug=False): + + delete_agegroup_db(age, sex, weightcategory, debug=debug) + + wcdurations = [None if type(y) is float and np.isnan( + y) else y for y in wcdurations] + wcpower = [None if type(y) is float and np.isnan(y) + else y for y in wcpower] + + df = pd.DataFrame( + { + 'duration': wcdurations, + 'power': wcpower, + } + ) + + df['sex'] = sex + df['age'] = age + df['weightcategory'] = weightcategory + df.replace([np.inf, -np.inf], np.nan, inplace=True) + df.dropna(axis=0, inplace=True) + + if debug: # pragma: no cover # pragma: no cover + 
engine = create_engine(database_url_debug, echo=False) + else: + engine = create_engine(database_url, echo=False) + + table = 'calcagegrouprecords' + with engine.connect() as conn, conn.begin(): + df.to_sql(table, engine, if_exists='append', index=False) + conn.close() + engine.dispose() + + + +def add_c2_stroke_data_db(strokedata, workoutid, starttimeunix, csvfilename, + debug=False, workouttype='rower'): + + res = make_cumvalues(0.1*strokedata['t']) + cum_time = res[0] + lapidx = res[1] + + unixtime = cum_time+starttimeunix + # unixtime[0] = starttimeunix + seconds = 0.1*strokedata.loc[:, 't'] + + nr_rows = len(unixtime) + + try: # pragma: no cover + latcoord = strokedata.loc[:, 'lat'] + loncoord = strokedata.loc[:, 'lon'] + except: + latcoord = np.zeros(nr_rows) + loncoord = np.zeros(nr_rows) + + try: + strokelength = strokedata.loc[:, 'strokelength'] + except: + strokelength = np.zeros(nr_rows) + + dist2 = 0.1*strokedata.loc[:, 'd'] + + try: + spm = strokedata.loc[:, 'spm'] + except KeyError: # pragma: no cover + spm = 0*dist2 + + try: + hr = strokedata.loc[:, 'hr'] + except KeyError: # pragma: no cover + hr = 0*spm + + pace = strokedata.loc[:, 'p']/10. 
+ pace = np.clip(pace, 0, 1e4) + pace = pace.replace(0, 300) + + velo = 500./pace + power = 2.8*velo**3 + if workouttype == 'bike': # pragma: no cover + velo = 1000./pace + + # save csv + # Create data frame with all necessary data to write to csv + df = pd.DataFrame({'TimeStamp (sec)': unixtime, + ' Horizontal (meters)': dist2, + ' Cadence (stokes/min)': spm, + ' HRCur (bpm)': hr, + ' longitude': loncoord, + ' latitude': latcoord, + ' Stroke500mPace (sec/500m)': pace, + ' Power (watts)': power, + ' DragFactor': np.zeros(nr_rows), + ' DriveLength (meters)': np.zeros(nr_rows), + ' StrokeDistance (meters)': strokelength, + ' DriveTime (ms)': np.zeros(nr_rows), + ' StrokeRecoveryTime (ms)': np.zeros(nr_rows), + ' AverageDriveForce (lbs)': np.zeros(nr_rows), + ' PeakDriveForce (lbs)': np.zeros(nr_rows), + ' lapIdx': lapidx, + ' WorkoutState': 4, + ' ElapsedTime (sec)': seconds, + 'cum_dist': dist2 + }) + + df.sort_values(by='TimeStamp (sec)', ascending=True) + + # Create CSV file name and save data to CSV file + + res = df.to_csv(csvfilename, index_label='index', + compression='gzip') + + + data = dataprep(df, id=workoutid, bands=False, debug=debug) + + return data + +# Creates C2 stroke data +def create_c2_stroke_data_db( + distance, duration, workouttype, + workoutid, starttimeunix, csvfilename, debug=False): # pragma: no cover + + nr_strokes = int(distance/10.) + + totalseconds = duration.hour*3600. + totalseconds += duration.minute*60. 
+ totalseconds += duration.second + totalseconds += duration.microsecond/1.e6 + + try: + spm = 60.*nr_strokes/totalseconds + except ZeroDivisionError: + spm = 20*np.zeros(nr_strokes) + + try: + _ = totalseconds/float(nr_strokes) + except ZeroDivisionError: + return 0 + + elapsed = np.arange(nr_strokes)*totalseconds/(float(nr_strokes-1)) + + d = np.arange(nr_strokes)*distance/(float(nr_strokes-1)) + + unixtime = starttimeunix + elapsed + + pace = 500.*totalseconds/distance + + if workouttype in ['rower', 'slides', 'dynamic']: + try: + velo = distance/totalseconds + except ZeroDivisionError: + velo = 0 + power = 2.8*velo**3 + else: + power = 0 + + df = pd.DataFrame({ + 'TimeStamp (sec)': unixtime, + ' Horizontal (meters)': d, + ' Cadence (stokes/min)': spm, + ' Stroke500mPace (sec/500m)': pace, + ' ElapsedTime (sec)': elapsed, + ' Power (watts)': power, + ' HRCur (bpm)': np.zeros(nr_strokes), + ' longitude': np.zeros(nr_strokes), + ' latitude': np.zeros(nr_strokes), + ' DragFactor': np.zeros(nr_strokes), + ' DriveLength (meters)': np.zeros(nr_strokes), + ' StrokeDistance (meters)': np.zeros(nr_strokes), + ' DriveTime (ms)': np.zeros(nr_strokes), + ' StrokeRecoveryTime (ms)': np.zeros(nr_strokes), + ' AverageDriveForce (lbs)': np.zeros(nr_strokes), + ' PeakDriveForce (lbs)': np.zeros(nr_strokes), + ' lapIdx': np.zeros(nr_strokes), + 'cum_dist': d + }) + + df[' ElapsedTime (sec)'] = df['TimeStamp (sec)'] + + _ = df.to_csv(csvfilename, index_label='index', compression='gzip') + + data = dataprep(df, id=workoutid, bands=False, debug=debug) + + return data + + +def update_empower(id, inboard, oarlength, boattype, df, f1, debug=False): # pragma: no cover + + corr_factor = 1.0 + if 'x' in boattype: + # sweep + a = 0.06 + b = 0.275 + else: + # scull + a = 0.15 + b = 0.275 + + corr_factor = empower_bug_correction(oarlength, inboard, a, b) + + success = False + + try: + df['power empower old'] = df[' Power (watts)'] + df[' Power (watts)'] = df[' Power (watts)'] * corr_factor + 
df['driveenergy empower old'] = df['driveenergy'] + df['driveenergy'] = df['driveenergy'] * corr_factor + success = True + except KeyError: + pass + + if success: + delete_strokedata(id, debug=debug) + if debug: # pragma: no cover + print("updated ", id) + print("correction ", corr_factor) + else: + if debug: # pragma: no cover + print("not updated ", id) + + _ = dataprep(df, id=id, bands=True, barchart=True, otwpower=True, debug=debug) + + row = rrdata(df=df) + row.write_csv(f1, gzip=True) + + return success + from rowers.tasks import ( handle_sendemail_unrecognized, handle_setcp, diff --git a/rowers/dataroutines.py b/rowers/dataroutines.py deleted file mode 100644 index ead26e09..00000000 --- a/rowers/dataroutines.py +++ /dev/null @@ -1,2080 +0,0 @@ -from rowers.metrics import axes, calc_trimp, rowingmetrics, dtypes, metricsgroups -from rowers.utils import lbstoN, wavg, dologging -from rowers.mytypes import otwtypes, otetypes, rowtypes -import glob -import rowingdata.tcxtools as tcxtools -from rowers.utils import totaltime_sec_to_string -from rowers.datautils import p0 -from scipy import optimize -from rowers.utils import calculate_age -import datetime -from scipy.signal import savgol_filter -from rowers.opaque import encoder -from rowers.database import * -from rowers import mytypes -from rowsandall_app.settings import SITE_URL -import django_rq -from timezonefinder import TimezoneFinder -import rowers.datautils as datautils -import rowers.utils as utils -import sys -import sqlalchemy as sa -from sqlalchemy import create_engine -from django.conf import settings -import math -from fitparse.base import FitHeaderError -from fitparse import FitFile -import itertools -import numpy as np -import pandas as pd -from zipfile import BadZipFile -import zipfile -import os -from rowers.models import strokedatafields - -from rowingdata import ( - KinoMapParser, - ExcelTemplate, - TCXParser, - MysteryParser, - RowProParser, - RitmoTimeParser, - QuiskeParser, - 
RowPerfectParser, - CoxMateParser, - BoatCoachParser, - BoatCoachOTWParser, - BoatCoachAdvancedParser, - painsledDesktopParser, - speedcoachParser, - SpeedCoach2Parser, - ErgStickParser, - FITParser, - ErgDataParser, - HumonParser, - ETHParser, - NKLiNKLogbookParser, - HeroParser, - SmartRowParser,) - -from rowingdata import make_cumvalues - -# All the data preparation, data cleaning and data mangling should -# be defined here -from rowers.models import ( - Workout, Team, CalcAgePerformance, C2WorldClassAgePerformance, - User -) - -import pytz -import collections -import pendulum -from rowingdata import rowingdata as rrdata - -from rowingdata import rower as rrower - -import yaml -import shutil -from shutil import copyfile - -from rowingdata import ( - get_file_type, get_empower_rigging, get_empower_firmware -) - - -from pandas import DataFrame, Series -import dask.dataframe as dd -from dask.delayed import delayed -import pyarrow.parquet as pq -import pyarrow as pa - -from pyarrow.lib import ArrowInvalid - -from django.utils import timezone -from django.utils.timezone import get_current_timezone -from django.urls import reverse -import requests - -from django.core.exceptions import ValidationError - -from time import strftime -import arrow - -thetimezone = get_current_timezone() - -allowedcolumns = [key for key, value in strokedatafields.items()] - -from rowsandall_app.settings_dev import use_sqlite -from rowsandall_app.settings_dev import DATABASES as DEV_DATABASES - -try: - user = settings.DATABASES['default']['USER'] -except KeyError: # pragma: no cover - user = '' -try: - password = settings.DATABASES['default']['PASSWORD'] -except KeyError: # pragma: no cover - password = '' - -try: - database_name = settings.DATABASES['default']['NAME'] -except KeyError: # pragma: no cover - database_name = '' -try: - host = settings.DATABASES['default']['HOST'] -except KeyError: # pragma: no cover - host = '' -try: - port = settings.DATABASES['default']['PORT'] -except 
KeyError: # pragma: no cover - port = '' - -database_url = 'mysql://{user}:{password}@{host}:{port}/{database_name}'.format( - user=user, - password=password, - database_name=database_name, - host=host, - port=port, -) - -database_name_dev = DEV_DATABASES['default']['NAME'] - - - -if use_sqlite: - database_url_debug = 'sqlite:///'+database_name_dev - database_url = database_url_debug - -database_url_debug = database_url - - -# mapping the DB column names to the CSV file column names -columndict = { - 'time': 'TimeStamp (sec)', - 'hr': ' HRCur (bpm)', - 'velo': ' AverageBoatSpeed (m/s)', - 'pace': ' Stroke500mPace (sec/500m)', - 'spm': ' Cadence (stokes/min)', - 'power': ' Power (watts)', - 'averageforce': ' AverageDriveForce (lbs)', - 'drivelength': ' DriveLength (meters)', - 'peakforce': ' PeakDriveForce (lbs)', - 'distance': ' Horizontal (meters)', - 'catch': 'catch', - 'finish': 'finish', - 'peakforceangle': 'peakforceangle', - 'wash': 'wash', - 'slip': 'slip', - 'workoutstate': ' WorkoutState', - 'cumdist': 'cum_dist', -} - - -def get_video_data(w, groups=['basic'], mode='water'): - modes = [mode, 'both', 'basic'] - columns = ['time', 'velo', 'spm'] - columns += [name for name, d in rowingmetrics if d['group'] - in groups and d['mode'] in modes] - columns = list(set(columns)) - df = getsmallrowdata_db(columns, ids=[w.id], - workstrokesonly=False, doclean=False, compute=False) - - df['time'] = (df['time']-df['time'].min())/1000. - - df.sort_values(by='time', inplace=True) - - df.set_index(pd.to_timedelta(df['time'], unit='s'), inplace=True) - df2 = df.resample('1s').first().fillna(method='ffill') - df2['time'] = df2.index.total_seconds() - - if 'pace' in columns: - df2['pace'] = df2['pace']/1000. - p = df2['pace'] - p = p.apply(lambda x: timedeltaconv(x)) - p = nicepaceformat(p) - df2['pace'] = p - - df2['time'] = (df2['time']-df2['time'].min()) - - df2 = df2.round(decimals=2) - - boatspeed = (100*df2['velo']).astype(int)/100. 
- - try: - coordinates = get_latlon_time(w.id) - except KeyError: # pragma: no cover - nulseries = df['time']*0 - coordinates = pd.DataFrame({ - 'time': df['time'], - 'latitude': nulseries, - 'longitude': nulseries, - }) - - coordinates.set_index(pd.to_timedelta( - coordinates['time'], unit='s'), inplace=True) - coordinates = coordinates.resample('1s').mean().interpolate() - coordinates['time'] = coordinates['time']-coordinates['time'].min() - latitude = coordinates['latitude'] - longitude = coordinates['longitude'] - - # bundle data - data = { - 'boatspeed': boatspeed.values.tolist(), - 'latitude': latitude.values.tolist(), - 'longitude': longitude.values.tolist(), - } - - metrics = {} - - for c in columns: - if c != 'time': - try: - if dict(rowingmetrics)[c]['numtype'] == 'integer': # pragma: no cover - data[c] = df2[c].astype(int).tolist() - else: - sigfigs = dict(rowingmetrics)[c]['sigfigs'] - if (c != 'pace'): - da = ((10**sigfigs)*df2[c]).astype(int)/(10**sigfigs) - else: - da = df2[c] - data[c] = da.values.tolist() - metrics[c] = { - 'name': dict(rowingmetrics)[c]['verbose_name'], - 'metric': c, - 'unit': '' - } - except KeyError: # pragma: no cover - pass - - metrics['boatspeed'] = metrics.pop('velo') - # metrics['workperstroke'] = metrics.pop('driveenergy') - metrics = collections.OrderedDict(sorted(metrics.items())) - - maxtime = coordinates['time'].max() - - return data, metrics, maxtime - - -def polarization_index(df, rower): - df['dt'] = df['time'].diff()/6.e4 - # remove rest (spm<15) - df.dropna(axis=0, inplace=True) - df['dt'] = df['dt'].clip(upper=4, lower=0) - - masklow = (df['power'] > 0) & (df['power'] < int(rower.pw_at)) - maskmid = (df['power'] >= rower.pw_at) & (df['power'] < int(rower.pw_an)) - maskhigh = (df['power'] > rower.pw_an) - - time_low_pw = df.loc[masklow, 'dt'].sum() - time_mid_pw = df.loc[maskmid, 'dt'].sum() - time_high_pw = df.loc[maskhigh, 'dt'].sum() - - frac_low = time_low_pw/(time_low_pw+time_mid_pw+time_high_pw) - frac_mid 
= time_mid_pw/(time_low_pw+time_mid_pw+time_high_pw) - frac_high = time_high_pw/(time_low_pw+time_mid_pw+time_high_pw) - - index = math.log10(frac_high*100.*frac_low/frac_mid) - - return index - - -def get_latlon(id): - try: - w = Workout.objects.get(id=id) - except Workout.DoesNotExist: # pragma: no cover - return False - - rowdata = rdata(w.csvfilename) - - if rowdata.df.empty: # pragma: no cover - return [pd.Series([], dtype='float'), pd.Series([], dtype='float')] - - try: - try: - latitude = rowdata.df.loc[:, ' latitude'] - longitude = rowdata.df.loc[:, ' longitude'] - except KeyError: - latitude = 0 * rowdata.df.loc[:, 'TimeStamp (sec)'] - longitude = 0 * rowdata.df.loc[:, 'TimeStamp (sec)'] - return [latitude, longitude] - except AttributeError: # pragma: no cover - return [pd.Series([], dtype='float'), pd.Series([], dtype='float')] - - return [pd.Series([], dtype='float'), pd.Series([], dtype='float')] # pragma: no cover - - -def get_latlon_time(id): - try: - w = Workout.objects.get(id=id) - except Workout.DoesNotExist: # pragma: no cover - return False - - rowdata = rdata(w.csvfilename) - - if rowdata.df.empty: # pragma: no cover - return [pd.Series([], dtype='float'), pd.Series([], dtype='float')] - - try: - try: - _ = rowdata.df.loc[:, ' latitude'] - _ = rowdata.df.loc[:, ' longitude'] - except KeyError: # pragma: no cover - rowdata.df['latitude'] = 0 * rowdata.df.loc[:, 'TimeStamp (sec)'] - rowdata.df['longitude'] = 0 * rowdata.df.loc[:, 'TimeStamp (sec)'] - except AttributeError: # pragma: no cover - return pd.DataFrame() - - df = pd.DataFrame({ - 'time': rowdata.df['TimeStamp (sec)']-rowdata.df['TimeStamp (sec)'].min(), - 'latitude': rowdata.df[' latitude'], - 'longitude': rowdata.df[' longitude'] - }) - - return df - - -def workout_has_latlon(id): - latitude, longitude = get_latlon(id) - latmean = latitude.mean() - lonmean = longitude.mean() - - if latmean == 0 and lonmean == 0: - return False, latmean, lonmean - - if latitude.std() > 0 and 
longitude.std() > 0: - return True, latmean, lonmean - - return False, latmean, lonmean - - - -def get_workouts(ids, userid): # pragma: no cover - goodids = [] - for id in ids: - w = Workout.objects.get(id=id) - if int(w.user.user.id) == int(userid): - goodids.append(id) - - return [Workout.objects.get(id=id) for id in goodids] - - -def filter_df(datadf, fieldname, value, largerthan=True): - - try: - _ = datadf[fieldname] - except KeyError: - return datadf - - try: - if largerthan: - mask = datadf[fieldname] < value - else: - mask = datadf[fieldname] >= value - - datadf.loc[mask, fieldname] = np.nan - except TypeError: - pass - - return datadf - -# joins workouts - - - -def df_resample(datadf): - # time stamps must be in seconds - timestamps = datadf['TimeStamp (sec)'].astype('int') - datadf['timestamps'] = timestamps - newdf = datadf.groupby(['timestamps']).mean() - return newdf - - - -def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, - ignoreadvanced=False): - # clean data remove zeros and negative values - - try: - _ = datadf['workoutid'].unique() - except KeyError: - datadf['workoutid'] = 0 - - before = {} - for workoutid in datadf['workoutid'].unique(): - before[workoutid] = len(datadf[datadf['workoutid'] == workoutid]) - - data_orig = datadf.copy() - - # bring metrics which have negative values to positive domain - if len(datadf) == 0: - return datadf - try: - datadf['catch'] = -datadf['catch'] - except (KeyError, TypeError): - pass - - try: - datadf['peakforceangle'] = datadf['peakforceangle'] + 1000 - except (KeyError, TypeError): - pass - - try: - datadf['hr'] = datadf['hr'] + 10 - except (KeyError, TypeError): - pass - - # protect 0 spm values from being nulled - try: - datadf['spm'] = datadf['spm'] + 1.0 - except (KeyError, TypeError): - pass - - # protect 0 workoutstate values from being nulled - try: - datadf['workoutstate'] = datadf['workoutstate'] + 1 - except (KeyError, TypeError): - pass - - try: - datadf = datadf.clip(lower=0) - 
except TypeError: - pass - - # protect advanced metrics columns - advancedcols = [ - 'rhythm', - 'power', - 'drivelength', - 'forceratio', - 'drivespeed', - 'driveenergy', - 'catch', - 'finish', - 'averageforce', - 'peakforce', - 'slip', - 'wash', - 'peakforceangle', - 'effectiveangle', - ] - - datadf.replace(to_replace=0, value=np.nan, inplace=True) - # datadf = datadf.map_partitions(lambda df:df.replace(to_replace=0,value=np.nan)) - - # bring spm back to real values - try: - datadf['spm'] = datadf['spm'] - 1 - except (TypeError, KeyError): - pass - - # bring workoutstate back to real values - try: - datadf['workoutstate'] = datadf['workoutstate'] - 1 - except (TypeError, KeyError): - pass - - # return from positive domain to negative - try: - datadf['catch'] = -datadf['catch'] - except (KeyError, TypeError): - pass - - try: - datadf['peakforceangle'] = datadf['peakforceangle'] - 1000 - except (KeyError, TypeError): - pass - - try: - datadf['hr'] = datadf['hr'] - 10 - except (KeyError, TypeError): - pass - - # clean data for useful ranges per column - if not ignorehr: - try: - mask = datadf['hr'] < 30 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): # pragma: no cover - pass - - try: - mask = datadf['spm'] < 0 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['efficiency'] > 200. - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['spm'] < 10 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['pace'] / 1000. > 300. - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['efficiency'] < 0. - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['pace'] / 1000. < 60. 
- datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['power'] > 5000 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['spm'] > 120 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['wash'] < 1 - datadf.loc[mask, 'wash'] = np.nan - except (KeyError, TypeError): - pass - - # try to guess ignoreadvanced - if not ignoreadvanced: - for metric in advancedcols: - try: - sum = datadf[metric].std() - if sum == 0 or np.isnan(sum): - ignoreadvanced = True - except KeyError: - pass - - if not ignoreadvanced: - try: - mask = datadf['rhythm'] < 0 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['rhythm'] > 70 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['power'] < 20 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['drivelength'] < 0.5 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['forceratio'] < 0.2 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['forceratio'] > 1.0 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['drivespeed'] < 0.5 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['drivespeed'] > 4 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['driveenergy'] > 2000 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['driveenergy'] < 100 - datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - try: - mask = datadf['catch'] > -30. 
- datadf.mask(mask, inplace=True) - except (KeyError, TypeError): - pass - - # workoutstateswork = [1, 4, 5, 8, 9, 6, 7] - workoutstatesrest = [3] - # workoutstatetransition = [0, 2, 10, 11, 12, 13] - - if workstrokesonly == 'True' or workstrokesonly is True: - try: - datadf = datadf[~datadf['workoutstate'].isin(workoutstatesrest)] - except: - pass - - after = {} - for workoutid in data_orig['workoutid'].unique(): - after[workoutid] = len( - datadf[datadf['workoutid'] == workoutid].dropna()) - ratio = float(after[workoutid])/float(before[workoutid]) - if ratio < 0.01 or after[workoutid] < 2: - return data_orig - - return datadf - - -def getpartofday(row, r): - workoutstartdatetime = row.rowdatetime - try: # pragma: no cover - latavg = row.df[' latitude'].mean() - lonavg = row.df[' longitude'].mean() - - tf = TimezoneFinder() - try: - timezone_str = tf.timezone_at(lng=lonavg, lat=latavg) - except (ValueError, OverflowError): # pragma: no cover - timezone_str = 'UTC' - if timezone_str is None: # pragma: no cover - timezone_str = tf.closest_timezone_at(lng=lonavg, - lat=latavg) - if timezone_str is None: - timezone_str = r.defaulttimezone - try: - workoutstartdatetime = pytz.timezone(timezone_str).localize( - row.rowdatetime - ) - except ValueError: - workoutstartdatetime = row.rowdatetime - except KeyError: - timezone_str = r.defaulttimezone - workoutstartdatetime = row.rowdatetime - - h = workoutstartdatetime.astimezone(pytz.timezone(timezone_str)).hour - - if h < 12: # pragma: no cover - return "Morning" - elif h < 18: # pragma: no cover - return "Afternoon" - elif h < 22: # pragma: no cover - return "Evening" - else: # pragma: no cover - return "Night" - - return None # pragma: no cover - - -def getstatsfields(): - fielddict = {name: d['verbose_name'] for name, d in rowingmetrics} - -# fielddict.pop('ergpace') -# fielddict.pop('hr_an') -# fielddict.pop('hr_tr') -# fielddict.pop('hr_at') -# fielddict.pop('hr_ut2') -# fielddict.pop('hr_ut1') - fielddict.pop('time') 
- fielddict.pop('distance') -# fielddict.pop('nowindpace') -# fielddict.pop('fnowindpace') -# fielddict.pop('fergpace') -# fielddict.pop('equivergpower') -# fielddict.pop('workoutstate') -# fielddict.pop('fpace') -# fielddict.pop('pace') -# fielddict.pop('id') -# fielddict.pop('ftime') -# fielddict.pop('x_right') -# fielddict.pop('hr_max') -# fielddict.pop('hr_bottom') - fielddict.pop('cumdist') - - try: - fieldlist = [field for field, value in fielddict.iteritems()] - except AttributeError: - fieldlist = [field for field, value in fielddict.items()] - - return fieldlist, fielddict - - -# A string representation for time deltas -def niceformat(values): - out = [] - for v in values: - formattedv = strfdelta(v) - out.append(formattedv) - - return out - -# A nice printable format for time delta values - - -def strfdelta(tdelta): - try: - minutes, seconds = divmod(tdelta.seconds, 60) - tenths = int(tdelta.microseconds / 1e5) - except AttributeError: # pragma: no cover - minutes, seconds = divmod(tdelta.view(np.int64), 60e9) - seconds, rest = divmod(seconds, 1e9) - tenths = int(rest / 1e8) - res = "{minutes:0>2}:{seconds:0>2}.{tenths:0>1}".format( - minutes=minutes, - seconds=seconds, - tenths=tenths, - ) - - return res - - -def timedelta_to_seconds(tdelta): # pragma: no cover - return 60.*tdelta.minute+tdelta.second - - -# A nice printable format for pace values - - -def nicepaceformat(values): - out = [] - for v in values: - formattedv = strfdelta(v) - out.append(formattedv) - - return out - -# Convert seconds to a Time Delta value, replacing NaN with a 5:50 pace - - -def timedeltaconv(x): - if np.isfinite(x) and x != 0 and x > 0 and x < 175000: - dt = datetime.timedelta(seconds=x) - else: - dt = datetime.timedelta(seconds=350.) 
- - return dt - - -def paceformatsecs(values): - out = [] - for v in values: - td = timedeltaconv(v) - formattedv = strfdelta(td) - out.append(formattedv) - - return out - - -def update_c2id_sql(id, c2id): - workout = Workout.objects.get(id=id) - workout.uploadedtoc2 = c2id - workout.save() - - return 1 - - -def getcpdata_sql(rower_id, table='cpdata'): - engine = create_engine(database_url, echo=False) - query = sa.text('SELECT * from {table} WHERE user={rower_id};'.format( - rower_id=rower_id, - table=table, - )) - - _ = engine.raw_connection() - df = pd.read_sql_query(query, engine) - - return df - - -def deletecpdata_sql(rower_id, table='cpdata'): # pragma: no cover - engine = create_engine(database_url, echo=False) - query = sa.text('DELETE from {table} WHERE user={rower_id};'.format( - rower_id=rower_id, - table=table, - )) - with engine.connect() as conn, conn.begin(): - try: - _ = conn.execute(query) - except Exception as e: - print(Exception, e) - print("Database locked") - conn.close() - engine.dispose() - - -def updatecpdata_sql(rower_id, delta, cp, table='cpdata', distance=pd.Series([], dtype='float'), - debug=False): # pragma: no cover - deletecpdata_sql(rower_id) - df = pd.DataFrame( - { - 'delta': delta, - 'cp': cp, - 'user': rower_id - } - ) - - if not distance.empty: - df['distance'] = distance - - engine = create_engine(database_url, echo=False) - with engine.connect() as conn, conn.begin(): - df.to_sql(table, engine, if_exists='append', index=False) - conn.close() - engine.dispose() - - - -def get_workoutsummaries(userid, startdate): # pragma: no cover - u = User.objects.get(id=userid) - r = u.rower - df = workout_summary_to_df(r, startdate=startdate) - df.drop(['Stroke Data TCX', 'Stroke Data CSV'], axis=1, inplace=True) - df = df.sort_values('date', ascending=False) - - return df - - - - - - - - -def checkduplicates(r, workoutdate, workoutstartdatetime, workoutenddatetime): - duplicate = False - ws = Workout.objects.filter(user=r, 
date=workoutdate, duplicate=False).exclude( - startdatetime__gt=workoutenddatetime - ) - - ws2 = [] - - for ww in ws: - t = ww.duration - delta = datetime.timedelta( - hours=t.hour, minutes=t.minute, seconds=t.second) - enddatetime = ww.startdatetime+delta - if enddatetime > workoutstartdatetime: - ws2.append(ww) - - if (len(ws2) != 0): - duplicate = True - return duplicate - - return duplicate - - - -parsers = { - 'kinomap': KinoMapParser, - 'xls': ExcelTemplate, - 'rp': RowProParser, - 'tcx': TCXParser, - 'mystery': MysteryParser, - 'ritmotime': RitmoTimeParser, - 'quiske': QuiskeParser, - 'rowperfect3': RowPerfectParser, - 'coxmate': CoxMateParser, - 'bcmike': BoatCoachAdvancedParser, - 'boatcoach': BoatCoachParser, - 'boatcoachotw': BoatCoachOTWParser, - 'painsleddesktop': painsledDesktopParser, - 'speedcoach': speedcoachParser, - 'speedcoach2': SpeedCoach2Parser, - 'ergstick': ErgStickParser, - 'fit': FITParser, - 'ergdata': ErgDataParser, - 'humon': HumonParser, - 'eth': ETHParser, - 'nklinklogbook': NKLiNKLogbookParser, - 'hero': HeroParser, - 'smartrow': SmartRowParser, -} - - -def get_startdate_time_zone(r, row, startdatetime=None): - if startdatetime is not None and startdatetime != '': - try: - timezone_str = pendulum.instance(startdatetime).timezone.name - except ValueError: # pragma: no cover - timezone_str = 'Ect/GMT' - elif startdatetime == '': - startdatetime = row.rowdatetime - else: - startdatetime = row.rowdatetime - - try: - _ = startdatetime.tzinfo - except AttributeError: # pragma: no cover - startdatetime = row.rowdatetime - - partofday = getpartofday(row, r) - - if startdatetime.tzinfo is None or str(startdatetime.tzinfo) in ['tzutc()', 'Ect/GMT']: - timezone_str = 'UTC' - try: - startdatetime = timezone.make_aware(startdatetime) - except ValueError: # pragma: no cover - pass - - try: - latavg = row.df[' latitude'].mean() - lonavg = row.df[' longitude'].mean() - - tf = TimezoneFinder() - if row.df[' latitude'].std() != 0: - try: - 
timezone_str = tf.timezone_at(lng=lonavg, lat=latavg) - except (ValueError, OverflowError): # pragma: no cover - timezone_str = 'UTC' - if timezone_str is None: # pragma: no cover - timezone_str = tf.closest_timezone_at(lng=lonavg, - lat=latavg) - if timezone_str is None: # pragma: no cover - timezone_str = r.defaulttimezone - else: - timezone_str = r.defaulttimezone - try: - startdatetime = pytz.timezone(timezone_str).localize( - row.rowdatetime - ) - except ValueError: # pragma: no cover - startdatetime = startdatetime.astimezone( - pytz.timezone(timezone_str) - ) - except KeyError: # pragma: no cover - timezone_str = r.defaulttimezone - else: - timezone_str = str(startdatetime.tzinfo) - - startdatetime = startdatetime.astimezone(pytz.timezone(timezone_str)) - - startdate = startdatetime.strftime('%Y-%m-%d') - starttime = startdatetime.strftime('%H:%M:%S') - - if timezone_str == 'tzutc()': - timezone_str = 'UTC' # pragma: no cover - - return startdatetime, startdate, starttime, timezone_str, partofday - - -def parsenonpainsled(fileformat, f2, summary, startdatetime='', empowerfirmware=None, inboard=None, oarlength=None): - try: - if fileformat == 'nklinklogbook' and empowerfirmware is not None: # pragma: no cover - if inboard is not None and oarlength is not None: - row = NKLiNKLogbookParser( - f2, firmware=empowerfirmware, inboard=inboard, oarlength=oarlength) - else: - row = NKLiNKLogbookParser(f2) - else: - row = parsers[fileformat](f2) - if startdatetime != '': # pragma: no cover - row.rowdatetime = arrow.get(startdatetime).datetime - hasrecognized = True - except (KeyError, IndexError, ValueError): # pragma: no cover - hasrecognized = False - return None, hasrecognized, '', 'unknown' - - s = 'Parsenonpainsled, start date time = {startdatetime}'.format( - startdatetime=startdatetime, - ) - dologging('debuglog.log', s) - - # handle speed coach GPS 2 - if (fileformat == 'speedcoach2'): - oarlength, inboard = get_empower_rigging(f2) - empowerfirmware = 
get_empower_firmware(f2) - if empowerfirmware != '': - fileformat = fileformat+'v'+str(empowerfirmware) - else: # pragma: no cover - fileformat = 'speedcoach2v0' - try: - summary = row.allstats() - except ZeroDivisionError: # pragma: no cover - summary = '' - else: - fileformat = fileformat+'v'+str(empowerfirmware) - - # handle FIT - if (fileformat == 'fit'): # pragma: no cover - try: - s = fitsummarydata(f2) - s.setsummary() - summary = s.summarytext - except: - pass - hasrecognized = True - - return row, hasrecognized, summary, fileformat - - -def handle_nonpainsled(f2, fileformat, summary='', startdatetime='', empowerfirmware=None, impeller=False): - oarlength = 2.89 - inboard = 0.88 - hasrecognized = False - - row, hasrecognized, summary, fileformat = parsenonpainsled(fileformat, f2, summary, startdatetime=startdatetime, - empowerfirmware=empowerfirmware) - - # Handle c2log - if (fileformat == 'c2log' or fileformat == 'rowprolog'): # pragma: no cover - return (0, '', 0, 0, '', impeller) - - if not hasrecognized: # pragma: no cover - return (0, '', 0, 0, '', impeller) - - f_to_be_deleted = f2 - # should delete file - f2 = f2[:-4] + 'o.csv' - - row2 = rrdata(df=row.df) - - if 'speedcoach2' in fileformat or 'nklinklogbook' in fileformat: - # impeller consistency - impellerdata, consistent, ratio = row.impellerconsistent(threshold=0.3) - - if impellerdata and consistent: - impeller = True - if impellerdata and not consistent: - row2.use_gpsdata() - if impeller: - row2.use_impellerdata() - - row2.write_csv(f2, gzip=True) - - # os.remove(f2) - try: - os.remove(f_to_be_deleted) - except: # pragma: no cover - try: - os.remove(f_to_be_deleted + '.gz') - except: - pass - - return (f2, summary, oarlength, inboard, fileformat, impeller) - -# Create new workout from file and store it in the database -# This routine should be used everywhere in views.py - - -def get_workouttype_from_fit(filename, workouttype='water'): - try: - fitfile = FitFile(filename, check_crc=False) - 
except FitHeaderError: # pragma: no cover - return workouttype - - records = fitfile.messages - fittype = 'rowing' - for record in records: - if record.name in ['sport', 'lap']: - try: - fittype = record.get_values()['sport'].lower() - except (KeyError, AttributeError): # pragma: no cover - return 'water' - try: - workouttype = mytypes.fitmappinginv[fittype] - except KeyError: # pragma: no cover - return workouttype - - return workouttype - - -def get_workouttype_from_tcx(filename, workouttype='water'): - tcxtype = 'rowing' - if workouttype in mytypes.otwtypes: - return workouttype - try: # pragma: no cover - d = tcxtools.tcx_getdict(filename) - try: - tcxtype = d['Activities']['Activity']['@Sport'].lower() - if tcxtype == 'other': - tcxtype = 'rowing' - except KeyError: - return workouttype - - except TypeError: # pragma: no cover - pass - - try: # pragma: no cover - workouttype = mytypes.garminmappinginv[tcxtype.upper()] - except KeyError: # pragma: no cover - return workouttype - - return workouttype # pragma: no cover - - - - -# Create new workout from data frame and store it in the database -# This routine should be used everywhere in views.py and mailprocessing.py -# Currently there is code duplication - - -# A wrapper around the rowingdata class, with some error catching - - -def rdata(file, rower=rrower()): - try: - res = rrdata(csvfile=file, rower=rower) - except (IOError, IndexError): # pragma: no cover - try: - res = rrdata(csvfile=file + '.gz', rower=rower) - except (IOError, IndexError): - res = rrdata() - except: - res = rrdata() - except EOFError: # pragma: no cover - res = rrdata() - except: # pragma: no cover - res = rrdata() - - return res - -# Remove all stroke data for workout ID from database - - -def delete_strokedata(id, debug=False): - dirname = 'media/strokedata_{id}.parquet.gz'.format(id=id) - try: - shutil.rmtree(dirname) - except OSError: - try: - os.remove(dirname) - except FileNotFoundError: - pass - except FileNotFoundError: # pragma: 
no cover - pass - -# Replace stroke data in DB with data from CSV file - - -def update_strokedata(id, df, debug=False): - delete_strokedata(id, debug=debug) - _ = dataprep(df, id=id, bands=True, barchart=True, otwpower=True) - -# Test that all data are of a numerical time - - -def testdata(time, distance, pace, spm): # pragma: no cover - t1 = np.issubdtype(time, np.number) - t2 = np.issubdtype(distance, np.number) - t3 = np.issubdtype(pace, np.number) - t4 = np.issubdtype(spm, np.number) - - return t1 and t2 and t3 and t4 - -# Get data from DB for one workout (fetches all data). If data -# is not in DB, read from CSV file (and create DB entry) - - -def getrowdata_db(id=0, doclean=False, convertnewtons=True, - checkefficiency=True): - data = read_df_sql(id) - try: - data['deltat'] = data['time'].diff() - except KeyError: # pragma: no cover - data = pd.DataFrame() - - if data.empty: - rowdata, row = getrowdata(id=id) - if not rowdata.empty: # pragma: no cover - data = dataprep(rowdata.df, id=id, bands=True, - barchart=True, otwpower=True) - else: - data = pd.DataFrame() # returning empty dataframe - else: - row = Workout.objects.get(id=id) - - if checkefficiency is True and not data.empty: - try: - if data['efficiency'].mean() == 0 and data['power'].mean() != 0: # pragma: no cover - data = add_efficiency(id=id) - except KeyError: # pragma: no cover - data = add_efficiency(id=id) - - if doclean: # pragma: no cover - data = clean_df_stats(data, ignorehr=True) - - return data, row - -# Fetch a subset of the data from the DB - - -def getsmallrowdata_db(columns, ids=[], doclean=True, workstrokesonly=True, compute=True, - debug=False): - # prepmultipledata(ids) - - if ids: - csvfilenames = [ - 'media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids] - else: - return pd.DataFrame() - - data = [] - columns = [c for c in columns if c != 'None'] - columns = list(set(columns)) - - if len(ids) > 1: - for id, f in zip(ids, csvfilenames): - try: - df = pd.read_parquet(f, 
columns=columns) - data.append(df) - except (OSError, ArrowInvalid, IndexError): # pragma: no cover - rowdata, row = getrowdata(id=id) - if rowdata and len(rowdata.df): - _ = dataprep(rowdata.df, id=id, - bands=True, otwpower=True, barchart=True) - df = pd.read_parquet(f, columns=columns) - data.append(df) - - try: - df = pd.concat(data, axis=0) - except ValueError: # pragma: no cover - return pd.DataFrame() - # df = dd.concat(data,axis=0) - - else: - try: - df = pd.read_parquet(csvfilenames[0], columns=columns) - rowdata, row = getrowdata(id=ids[0]) - except (OSError, ArrowInvalid, IndexError): - rowdata, row = getrowdata(id=ids[0]) - if rowdata and len(rowdata.df): # pragma: no cover - data = dataprep( - rowdata.df, id=ids[0], bands=True, otwpower=True, barchart=True) - df = pd.read_parquet(csvfilenames[0], columns=columns) - # df = dd.read_parquet(csvfilenames[0], - # column=columns,engine='pyarrow', - # ) - - # df = df.loc[:,~df.columns.duplicated()] - else: - df = pd.DataFrame() - - if compute and len(df): - data = df.copy() - if doclean: - data = clean_df_stats(data, ignorehr=True, - workstrokesonly=workstrokesonly) - data.dropna(axis=1, how='all', inplace=True) - data.dropna(axis=0, how='any', inplace=True) - return data - - return df - -# Fetch both the workout and the workout stroke data (from CSV file) - - -def getrowdata(id=0): - - # check if valid ID exists (workout exists) - try: - row = Workout.objects.get(id=id) - except Workout.DoesNotExist: # pragma: no cover - return rrdata(), None - - f1 = row.csvfilename - - # get user - - r = row.user - - rr = rrower(hrmax=r.max, hrut2=r.ut2, - hrut1=r.ut1, hrat=r.at, - hrtr=r.tr, hran=r.an, ftp=r.ftp) - - rowdata = rdata(f1, rower=rr) - - return rowdata, row - -# Checks if all rows for a list of workout IDs have entries in the -# stroke_data table. 
If this is not the case, it creates the stroke -# data -# In theory, this should never yield any work, but it's a good -# safety net for programming errors elsewhere in the app -# Also used heavily when I moved from CSV file only to CSV+Stroke data - - -def prepmultipledata(ids, verbose=False): # pragma: no cover - filenames = glob.glob('media/*.parquet') - ids = [ - id for id in ids if 'media/strokedata_{id}.parquet.gz'.format(id=id) not in filenames] - - for id in ids: - rowdata, row = getrowdata(id=id) - if verbose: - print(id) - if rowdata and len(rowdata.df): - _ = dataprep(rowdata.df, id=id, bands=True, - barchart=True, otwpower=True) - return ids - -# Read a set of columns for a set of workout ids, returns data as a -# pandas dataframe - - -def read_cols_df_sql(ids, columns, convertnewtons=True): - # drop columns that are not in offical list - # axx = [ax[0] for ax in axes] - - extracols = [] - - columns = list(columns) + ['distance', 'spm', 'workoutid'] - columns = [x for x in columns if x != 'None'] - columns = list(set(columns)) - ids = [int(id) for id in ids] - - df = pd.DataFrame() - - if len(ids) == 0: # pragma: no cover - return pd.DataFrame(), extracols - elif len(ids) == 1: # pragma: no cover - try: - filename = 'media/strokedata_{id}.parquet.gz'.format(id=ids[0]) - df = pd.read_parquet(filename, columns=columns) - except OSError: - rowdata, row = getrowdata(id=ids[0]) - if rowdata and len(rowdata.df): - _ = dataprep(rowdata.df, - id=ids[0], bands=True, otwpower=True, barchart=True) - df = pd.read_parquet(filename, columns=columns) - else: - data = [] - filenames = [ - 'media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids] - for id, f in zip(ids, filenames): - try: - df = pd.read_parquet(f, columns=columns) - data.append(df) - except (OSError, IndexError, ArrowInvalid): - rowdata, row = getrowdata(id=id) - if rowdata and len(rowdata.df): # pragma: no cover - _ = dataprep(rowdata.df, id=id, - bands=True, otwpower=True, barchart=True) - df = 
pd.read_parquet(f, columns=columns) - data.append(df) - - try: - df = pd.concat(data, axis=0) - except ValueError: # pragma: no cover - return pd.DataFrame(), extracols - - df = df.fillna(value=0) - - if 'peakforce' in columns: - funits = ((w.id, w.forceunit) - for w in Workout.objects.filter(id__in=ids)) - for id, u in funits: - if u == 'lbs': - mask = df['workoutid'] == id - df.loc[mask, 'peakforce'] = df.loc[mask, 'peakforce'] * lbstoN - if 'averageforce' in columns: - funits = ((w.id, w.forceunit) - for w in Workout.objects.filter(id__in=ids)) - for id, u in funits: - if u == 'lbs': - mask = df['workoutid'] == id - df.loc[mask, 'averageforce'] = df.loc[mask, - 'averageforce'] * lbstoN - - return df, extracols - - - -# Read stroke data from the DB for a Workout ID. Returns a pandas dataframe - - -def read_df_sql(id): - try: - f = 'media/strokedata_{id}.parquet.gz'.format(id=id) - df = pd.read_parquet(f) - except (OSError, ArrowInvalid, IndexError): # pragma: no cover - rowdata, row = getrowdata(id=id) - if rowdata and len(rowdata.df): - data = dataprep(rowdata.df, id=id, bands=True, - otwpower=True, barchart=True) - try: - df = pd.read_parquet(f) - except OSError: - df = data - else: - df = pd.DataFrame() - - df = df.fillna(value=0) - - return df - - -# data fusion - - -def datafusion(id1, id2, columns, offset): - df1, w1 = getrowdata_db(id=id1) - df1 = df1.drop([ # 'cumdist', - 'hr_ut2', - 'hr_ut1', - 'hr_at', - 'hr_tr', - 'hr_an', - 'hr_max', - 'ftime', - 'fpace', - 'workoutid', - 'id'], - 1, errors='ignore') - - # Add coordinates to DataFrame - latitude, longitude = get_latlon(id1) - - df1[' latitude'] = latitude - df1[' longitude'] = longitude - - df2 = getsmallrowdata_db(['time'] + columns, ids=[id2], doclean=False) - - forceunit = 'N' - - offsetmillisecs = offset.seconds * 1000 + offset.microseconds / 1000. 
- offsetmillisecs += offset.days * (3600 * 24 * 1000) - df2['time'] = df2['time'] + offsetmillisecs - - keep1 = {c: c for c in set(df1.columns)} - - for c in columns: - keep1.pop(c) - - for c in df1.columns: - if c not in keep1: - df1 = df1.drop(c, 1, errors='ignore') - - df = pd.concat([df1, df2], ignore_index=True) - df = df.sort_values(['time']) - df = df.interpolate(method='linear', axis=0, limit_direction='both', - limit=10) - df.fillna(method='bfill', inplace=True) - - # Some new stuff to try out - df = df.groupby('time', axis=0).mean() - df['time'] = df.index - df.reset_index(drop=True, inplace=True) - - df['time'] = df['time'] / 1000. - df['pace'] = df['pace'] / 1000. - df['cum_dist'] = df['cumdist'] - - return df, forceunit - - -def fix_newtons(id=0, limit=3000): # pragma: no cover - # rowdata,row = getrowdata_db(id=id,doclean=False,convertnewtons=False) - rowdata = getsmallrowdata_db(['peakforce'], ids=[id], doclean=False) - try: - peakforce = rowdata['peakforce'] - if peakforce.mean() > limit: - w = Workout.objects.get(id=id) - - rowdata = rdata(w.csvfilename) - if rowdata and len(rowdata.df): - update_strokedata(w.id, rowdata.df) - except KeyError: - pass - - -def remove_invalid_columns(df): # pragma: no cover - for c in df.columns: - if c not in allowedcolumns: - df.drop(labels=c, axis=1, inplace=True) - - return df - - -def add_efficiency(id=0): # pragma: no cover - rowdata, row = getrowdata_db(id=id, - doclean=False, - convertnewtons=False, - checkefficiency=False) - power = rowdata['power'] - pace = rowdata['pace'] / 1.0e3 - velo = 500. / pace - ergpw = 2.8 * velo**3 - efficiency = 100. 
* ergpw / power - - efficiency = efficiency.replace([-np.inf, np.inf], np.nan) - efficiency.fillna(method='ffill') - rowdata['efficiency'] = efficiency - - rowdata = remove_invalid_columns(rowdata) - rowdata = rowdata.replace([-np.inf, np.inf], np.nan) - rowdata = rowdata.fillna(method='ffill') - - delete_strokedata(id) - - if id != 0: - rowdata['workoutid'] = id - filename = 'media/strokedata_{id}.parquet.gz'.format(id=id) - df = dd.from_pandas(rowdata, npartitions=1) - df.to_parquet(filename, engine='fastparquet', compression='GZIP') - - return rowdata - -# This is the main routine. -# it reindexes, sorts, filters, and smooths the data, then -# saves it to the stroke_data table in the database -# Takes a rowingdata object's DataFrame as input - - -def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True, - empower=True, inboard=0.88, forceunit='lbs', debug=False): - - if rowdatadf.empty: - return 0 - - t = rowdatadf.loc[:, 'TimeStamp (sec)'] - t = pd.Series(t - rowdatadf.loc[:, 'TimeStamp (sec)'].iloc[0]) - - row_index = rowdatadf.loc[:, ' Stroke500mPace (sec/500m)'] > 3000 - rowdatadf.loc[row_index, ' Stroke500mPace (sec/500m)'] = 3000. 
- - p = rowdatadf.loc[:, ' Stroke500mPace (sec/500m)'] - try: - velo = rowdatadf.loc[:, ' AverageBoatSpeed (m/s)'] - except KeyError: # pragma: no cover - velo = 500./p - - hr = rowdatadf.loc[:, ' HRCur (bpm)'] - spm = rowdatadf.loc[:, ' Cadence (stokes/min)'] - cumdist = rowdatadf.loc[:, 'cum_dist'] - power = rowdatadf.loc[:, ' Power (watts)'] - averageforce = rowdatadf.loc[:, ' AverageDriveForce (lbs)'] - drivelength = rowdatadf.loc[:, ' DriveLength (meters)'] - try: - workoutstate = rowdatadf.loc[:, ' WorkoutState'] - except KeyError: # pragma: no cover - workoutstate = 0 * hr - - peakforce = rowdatadf.loc[:, ' PeakDriveForce (lbs)'] - - forceratio = averageforce / peakforce - forceratio = forceratio.fillna(value=0) - - try: - drivetime = rowdatadf.loc[:, ' DriveTime (ms)'] - recoverytime = rowdatadf.loc[:, ' StrokeRecoveryTime (ms)'] - rhythm = 100. * drivetime / (recoverytime + drivetime) - rhythm = rhythm.fillna(value=0) - except: # pragma: no cover - rhythm = 0.0 * forceratio - - f = rowdatadf['TimeStamp (sec)'].diff().mean() - if f != 0 and not np.isinf(f): - try: - windowsize = 2 * (int(10. 
/ (f))) + 1 - except ValueError: # pragma: no cover - windowsize = 1 - else: - windowsize = 1 - if windowsize <= 3: - windowsize = 5 - - if windowsize > 3 and windowsize < len(hr): - spm = savgol_filter(spm, windowsize, 3) - hr = savgol_filter(hr, windowsize, 3) - drivelength = savgol_filter(drivelength, windowsize, 3) - forceratio = savgol_filter(forceratio, windowsize, 3) - - try: - t2 = t.fillna(method='ffill').apply(lambda x: timedeltaconv(x)) - except TypeError: # pragma: no cover - t2 = 0 * t - - p2 = p.fillna(method='ffill').apply(lambda x: timedeltaconv(x)) - - try: - drivespeed = drivelength / rowdatadf[' DriveTime (ms)'] * 1.0e3 - except TypeError: # pragma: no cover - drivespeed = 0.0 * rowdatadf['TimeStamp (sec)'] - - drivespeed = drivespeed.fillna(value=0) - - try: - driveenergy = rowdatadf['driveenergy'] - except KeyError: # pragma: no cover - if forceunit == 'lbs': - driveenergy = drivelength * averageforce * lbstoN - else: - driveenergy = drivelength * averageforce - - if forceunit == 'lbs': - averageforce *= lbstoN - peakforce *= lbstoN - - powerhr = 60.*power/hr - powerhr = powerhr.fillna(value=0) - - if driveenergy.mean() == 0 and driveenergy.std() == 0: - driveenergy = 0*driveenergy+100 - - distance = rowdatadf.loc[:, 'cum_dist'] - velo = 500. / p - - distanceperstroke = 60. 
* velo / spm - - data = DataFrame( - dict( - time=t * 1e3, - hr=hr, - pace=p * 1e3, - spm=spm, - velo=velo, - cumdist=cumdist, - ftime=niceformat(t2), - fpace=nicepaceformat(p2), - driveenergy=driveenergy, - power=power, - workoutstate=workoutstate, - averageforce=averageforce, - drivelength=drivelength, - peakforce=peakforce, - forceratio=forceratio, - distance=distance, - drivespeed=drivespeed, - rhythm=rhythm, - distanceperstroke=distanceperstroke, - # powerhr=powerhr, - ) - ) - - if bands: - # HR bands - data['hr_ut2'] = rowdatadf.loc[:, 'hr_ut2'] - data['hr_ut1'] = rowdatadf.loc[:, 'hr_ut1'] - data['hr_at'] = rowdatadf.loc[:, 'hr_at'] - data['hr_tr'] = rowdatadf.loc[:, 'hr_tr'] - data['hr_an'] = rowdatadf.loc[:, 'hr_an'] - data['hr_max'] = rowdatadf.loc[:, 'hr_max'] - data['hr_bottom'] = 0.0 * data['hr'] - - try: - _ = rowdatadf.loc[:, ' ElapsedTime (sec)'] - except KeyError: # pragma: no cover - rowdatadf[' ElapsedTime (sec)'] = rowdatadf['TimeStamp (sec)'] - - if empower: - try: - wash = rowdatadf.loc[:, 'wash'] - except KeyError: - wash = 0 * power - - try: - catch = rowdatadf.loc[:, 'catch'] - except KeyError: - catch = 0 * power - - try: - finish = rowdatadf.loc[:, 'finish'] - except KeyError: - finish = 0 * power - - try: - peakforceangle = rowdatadf.loc[:, 'peakforceangle'] - except KeyError: - peakforceangle = 0 * power - - if data['driveenergy'].mean() == 0: # pragma: no cover - try: - driveenergy = rowdatadf.loc[:, 'driveenergy'] - except KeyError: - driveenergy = power * 60 / spm - else: - driveenergy = data['driveenergy'] - - arclength = (inboard - 0.05) * (np.radians(finish) - np.radians(catch)) - if arclength.mean() > 0: - drivelength = arclength - elif drivelength.mean() == 0: - drivelength = driveenergy / (averageforce * 4.44822) - - try: - slip = rowdatadf.loc[:, 'slip'] - except KeyError: - slip = 0 * power - - try: - totalangle = finish - catch - effectiveangle = finish - wash - catch - slip - except ValueError: # pragma: no cover - 
totalangle = 0 * power - effectiveangle = 0 * power - - if windowsize > 3 and windowsize < len(slip): - try: - wash = savgol_filter(wash, windowsize, 3) - except TypeError: # pragma: no cover - pass - try: - slip = savgol_filter(slip, windowsize, 3) - except TypeError: # pragma: no cover - pass - try: - catch = savgol_filter(catch, windowsize, 3) - except TypeError: # pragma: no cover - pass - try: - finish = savgol_filter(finish, windowsize, 3) - except TypeError: # pragma: no cover - pass - try: - peakforceangle = savgol_filter(peakforceangle, windowsize, 3) - except TypeError: # pragma: no cover - pass - try: - driveenergy = savgol_filter(driveenergy, windowsize, 3) - except TypeError: # pragma: no cover - pass - try: - drivelength = savgol_filter(drivelength, windowsize, 3) - except TypeError: # pragma: no cover - pass - try: - totalangle = savgol_filter(totalangle, windowsize, 3) - except TypeError: # pragma: no cover - pass - try: - effectiveangle = savgol_filter(effectiveangle, windowsize, 3) - except TypeError: # pragma: no cover - pass - - velo = 500. / p - - ergpw = 2.8 * velo**3 - efficiency = 100. * ergpw / power - - efficiency = efficiency.replace([-np.inf, np.inf], np.nan) - efficiency.fillna(method='ffill') - - try: - data['wash'] = wash - data['catch'] = catch - data['slip'] = slip - data['finish'] = finish - data['peakforceangle'] = peakforceangle - data['driveenergy'] = driveenergy - data['drivelength'] = drivelength - data['totalangle'] = totalangle - data['effectiveangle'] = effectiveangle - data['efficiency'] = efficiency - except ValueError: # pragma: no cover - pass - - if otwpower: - try: - nowindpace = rowdatadf.loc[:, 'nowindpace'] - except KeyError: - nowindpace = p - try: - equivergpower = rowdatadf.loc[:, 'equivergpower'] - except KeyError: - equivergpower = 0 * p + 50. - - nowindpace2 = nowindpace.apply(lambda x: timedeltaconv(x)) - ergvelo = (equivergpower / 2.8)**(1. / 3.) - - ergpace = 500. 
/ ergvelo - ergpace[ergpace == np.inf] = 240. - ergpace2 = ergpace.apply(lambda x: timedeltaconv(x)) - - data['ergpace'] = ergpace * 1e3 - data['nowindpace'] = nowindpace * 1e3 - data['equivergpower'] = equivergpower - data['fergpace'] = nicepaceformat(ergpace2) - data['fnowindpace'] = nicepaceformat(nowindpace2) - - data = data.replace([-np.inf, np.inf], np.nan) - data = data.fillna(method='ffill') - - # write data if id given - if id != 0: - data['workoutid'] = id - data.fillna(0, inplace=True) - for k, v in dtypes.items(): - try: - data[k] = data[k].astype(v) - except KeyError: # pragma: no cover - pass - - filename = 'media/strokedata_{id}.parquet.gz'.format(id=id) - df = dd.from_pandas(data, npartitions=1) - df.to_parquet(filename, engine='fastparquet', compression='GZIP') - - return data - - - -def delete_agegroup_db(age, sex, weightcategory, debug=False): - if debug: # pragma: no cover - engine = create_engine(database_url_debug, echo=False) - else: # pragma: no cover - engine = create_engine(database_url, echo=False) - - query = sa.text("DELETE from {table} WHERE age='{age}' and weightcategory='{weightcategory}' and sex='{sex}';".format( - sex=sex, - age=age, - weightcategory=weightcategory, - table='calcagegrouprecords' - )) - with engine.connect() as conn, conn.begin(): - _ = conn.execute(query) - conn.close() - engine.dispose() - - - - - -def update_agegroup_db(age, sex, weightcategory, wcdurations, wcpower, - debug=False): - - delete_agegroup_db(age, sex, weightcategory, debug=debug) - - wcdurations = [None if type(y) is float and np.isnan( - y) else y for y in wcdurations] - wcpower = [None if type(y) is float and np.isnan(y) - else y for y in wcpower] - - df = pd.DataFrame( - { - 'duration': wcdurations, - 'power': wcpower, - } - ) - - df['sex'] = sex - df['age'] = age - df['weightcategory'] = weightcategory - df.replace([np.inf, -np.inf], np.nan, inplace=True) - df.dropna(axis=0, inplace=True) - - if debug: # pragma: no cover # pragma: no cover - 
def update_agegroup_db(age, sex, weightcategory, wcdurations, wcpower,
                       debug=False):
    """Replace the stored age-group records of one age/sex/weight class.

    The existing rows are removed with ``delete_agegroup_db`` and the given
    duration/power pairs are appended to the ``calcagegrouprecords`` table.
    Rows that contain NaN, None or +/-inf are dropped before writing.

    Args:
        age: value stored in the ``age`` column of every new row.
        sex: value stored in the ``sex`` column of every new row.
        weightcategory: value stored in the ``weightcategory`` column.
        wcdurations: sequence of values for the ``duration`` column.
        wcpower: sequence of values for the ``power`` column; must have the
            same length as ``wcdurations``.
        debug: when true, connect through ``database_url_debug`` instead of
            ``database_url``.

    Returns:
        None.
    """
    delete_agegroup_db(age, sex, weightcategory, debug=debug)

    def _nan_to_none(values):
        # isinstance() also catches float subclasses such as numpy.float64,
        # which the former ``type(y) is float`` test let through.
        return [
            None if isinstance(y, float) and np.isnan(y) else y
            for y in values
        ]

    df = pd.DataFrame(
        {
            'duration': _nan_to_none(wcdurations),
            'power': _nan_to_none(wcpower),
        }
    )

    df['sex'] = sex
    df['age'] = age
    df['weightcategory'] = weightcategory
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(axis=0, inplace=True)

    url = database_url_debug if debug else database_url
    engine = create_engine(url, echo=False)

    table = 'calcagegrouprecords'
    try:
        with engine.connect() as conn, conn.begin():
            # Write through the connection that owns the transaction; the
            # original passed ``engine`` here, leaving ``conn`` unused.
            df.to_sql(table, conn, if_exists='append', index=False)
    finally:
        # Release the pool even when the insert raised.
        engine.dispose()
def add_c2_stroke_data_db(strokedata, workoutid, starttimeunix, csvfilename,
                          debug=False, workouttype='rower'):
    """Convert a C2 stroke-data frame to the CSV layout and store it.

    Builds a data frame with the CSV column names used by this module,
    writes it gzip-compressed to ``csvfilename`` and passes it to
    ``dataprep`` for the given workout.

    Args:
        strokedata: DataFrame with the columns ``t``, ``d`` and ``p``
            (scaled here by 0.1, 0.1 and 1/10 respectively). The columns
            ``spm``, ``hr``, ``lat``, ``lon`` and ``strokelength`` are
            optional and default to zeros.
        workoutid: id forwarded to ``dataprep``.
        starttimeunix: offset added to the cumulative time to obtain the
            ``TimeStamp (sec)`` column.
        csvfilename: path of the gzip-compressed CSV file to write.
        debug: forwarded to ``dataprep``.
        workouttype: workout type string, see the review note on ``'bike'``.

    Returns:
        Whatever ``dataprep`` returns for the constructed frame.
    """
    res = make_cumvalues(0.1*strokedata['t'])
    cum_time = res[0]
    lapidx = res[1]

    unixtime = cum_time+starttimeunix
    seconds = 0.1*strokedata.loc[:, 't']

    nr_rows = len(unixtime)

    # Optional columns: only a missing column is an expected failure, so
    # catch KeyError rather than everything.
    try:  # pragma: no cover
        latcoord = strokedata.loc[:, 'lat']
        loncoord = strokedata.loc[:, 'lon']
    except KeyError:
        latcoord = np.zeros(nr_rows)
        loncoord = np.zeros(nr_rows)

    try:
        strokelength = strokedata.loc[:, 'strokelength']
    except KeyError:
        strokelength = np.zeros(nr_rows)

    dist2 = 0.1*strokedata.loc[:, 'd']

    try:
        spm = strokedata.loc[:, 'spm']
    except KeyError:  # pragma: no cover
        spm = 0*dist2

    try:
        hr = strokedata.loc[:, 'hr']
    except KeyError:  # pragma: no cover
        hr = 0*spm

    pace = strokedata.loc[:, 'p']/10.
    pace = np.clip(pace, 0, 1e4)
    # A pace of zero would divide by zero below; substitute 300.
    pace = pace.replace(0, 300)

    velo = 500./pace
    power = 2.8*velo**3
    if workouttype == 'bike':  # pragma: no cover
        # NOTE(review): velo is not used after this point, so this branch
        # has no effect on the output - confirm whether power was meant to
        # be recomputed for bike workouts.
        velo = 1000./pace

    # Create data frame with all necessary data to write to csv
    df = pd.DataFrame({'TimeStamp (sec)': unixtime,
                       ' Horizontal (meters)': dist2,
                       ' Cadence (stokes/min)': spm,
                       ' HRCur (bpm)': hr,
                       ' longitude': loncoord,
                       ' latitude': latcoord,
                       ' Stroke500mPace (sec/500m)': pace,
                       ' Power (watts)': power,
                       ' DragFactor': np.zeros(nr_rows),
                       ' DriveLength (meters)': np.zeros(nr_rows),
                       ' StrokeDistance (meters)': strokelength,
                       ' DriveTime (ms)': np.zeros(nr_rows),
                       ' StrokeRecoveryTime (ms)': np.zeros(nr_rows),
                       ' AverageDriveForce (lbs)': np.zeros(nr_rows),
                       ' PeakDriveForce (lbs)': np.zeros(nr_rows),
                       ' lapIdx': lapidx,
                       ' WorkoutState': 4,
                       ' ElapsedTime (sec)': seconds,
                       'cum_dist': dist2
                       })

    # sort_values returns a new frame; the original discarded it, so the
    # sort never happened. A stable sort keeps rows with equal timestamps
    # in their incoming order.
    df = df.sort_values(by='TimeStamp (sec)', ascending=True,
                        kind='mergesort')

    # Save data to the gzip-compressed CSV file
    df.to_csv(csvfilename, index_label='index', compression='gzip')

    data = dataprep(df, id=workoutid, bands=False, debug=debug)

    return data
# Creates C2 stroke data
def create_c2_stroke_data_db(
        distance, duration, workouttype,
        workoutid, starttimeunix, csvfilename, debug=False):  # pragma: no cover
    """Synthesize evenly spaced stroke data for a workout and store it.

    One stroke is generated per 10 units of ``distance``. Time and distance
    are spread linearly over the strokes, pace and stroke rate are constant.
    The frame is written gzip-compressed to ``csvfilename`` and passed to
    ``dataprep``.

    Args:
        distance: total distance of the workout.
        duration: object with ``hour``, ``minute``, ``second`` and
            ``microsecond`` attributes giving the total duration.
        workouttype: power is derived from the average speed only for
            ``'rower'``, ``'slides'`` and ``'dynamic'``; otherwise it is 0.
        workoutid: id forwarded to ``dataprep``.
        starttimeunix: offset added to the elapsed time to obtain the
            ``TimeStamp (sec)`` column.
        csvfilename: path of the gzip-compressed CSV file to write.
        debug: forwarded to ``dataprep``.

    Returns:
        0 when ``distance`` yields no strokes, otherwise whatever
        ``dataprep`` returns.
    """
    nr_strokes = int(distance/10.)

    # Nothing to generate (the original detected this through a
    # ZeroDivisionError on a throwaway division).
    if nr_strokes == 0:
        return 0

    totalseconds = duration.hour*3600.
    totalseconds += duration.minute*60.
    totalseconds += duration.second
    totalseconds += duration.microsecond/1.e6

    try:
        spm = 60.*nr_strokes/totalseconds
    except ZeroDivisionError:
        # NOTE(review): 20*np.zeros(...) is an all-zero array - confirm
        # whether a constant rate of 20 was intended.
        spm = 20*np.zeros(nr_strokes)

    # Number of intervals between strokes. With a single stroke the original
    # divided by zero and produced NaN time/distance; use one interval so
    # that the lone stroke sits at t=0, d=0.
    intervals = float(max(nr_strokes-1, 1))

    elapsed = np.arange(nr_strokes)*totalseconds/intervals

    d = np.arange(nr_strokes)*distance/intervals

    unixtime = starttimeunix + elapsed

    pace = 500.*totalseconds/distance

    if workouttype in ['rower', 'slides', 'dynamic']:
        try:
            velo = distance/totalseconds
        except ZeroDivisionError:
            velo = 0
        power = 2.8*velo**3
    else:
        power = 0

    df = pd.DataFrame({
        'TimeStamp (sec)': unixtime,
        ' Horizontal (meters)': d,
        ' Cadence (stokes/min)': spm,
        ' Stroke500mPace (sec/500m)': pace,
        ' ElapsedTime (sec)': elapsed,
        ' Power (watts)': power,
        ' HRCur (bpm)': np.zeros(nr_strokes),
        ' longitude': np.zeros(nr_strokes),
        ' latitude': np.zeros(nr_strokes),
        ' DragFactor': np.zeros(nr_strokes),
        ' DriveLength (meters)': np.zeros(nr_strokes),
        ' StrokeDistance (meters)': np.zeros(nr_strokes),
        ' DriveTime (ms)': np.zeros(nr_strokes),
        ' StrokeRecoveryTime (ms)': np.zeros(nr_strokes),
        ' AverageDriveForce (lbs)': np.zeros(nr_strokes),
        ' PeakDriveForce (lbs)': np.zeros(nr_strokes),
        ' lapIdx': np.zeros(nr_strokes),
        'cum_dist': d
    })

    # NOTE(review): this overwrites the elapsed-time column with the
    # absolute timestamps; kept as in the original - confirm it is intended.
    df[' ElapsedTime (sec)'] = df['TimeStamp (sec)']

    df.to_csv(csvfilename, index_label='index', compression='gzip')

    data = dataprep(df, id=workoutid, bands=False, debug=debug)

    return data
def update_empower(id, inboard, oarlength, boattype, df, f1, debug=False):  # pragma: no cover
    """Apply the empower correction factor to a workout's power data.

    The factor comes from ``empower_bug_correction`` and is multiplied into
    the `` Power (watts)`` and ``driveenergy`` columns of ``df``; the
    previous values are kept in ``power empower old`` and
    ``driveenergy empower old``. The frame is then run through ``dataprep``
    and written to ``f1`` as gzip-compressed CSV.

    Args:
        id: workout id used for ``delete_strokedata`` and ``dataprep``.
        inboard: forwarded to ``empower_bug_correction``.
        oarlength: forwarded to ``empower_bug_correction``.
        boattype: string; whether it contains ``'x'`` selects the constant.
        df: DataFrame that is modified in place.
        f1: target file name for the CSV export.
        debug: forwarded to the helpers; also enables progress prints.

    Returns:
        True when both columns were present and corrected, False otherwise.
    """
    # NOTE(review): an 'x' in a boat type usually denotes sculling, yet the
    # original comments labelled the 'x' branch "sweep" and the other
    # "scull" - confirm which constant belongs to which rig.
    a = 0.06 if 'x' in boattype else 0.15
    b = 0.275

    corr_factor = empower_bug_correction(oarlength, inboard, a, b)

    try:
        df['power empower old'] = df[' Power (watts)']
        df[' Power (watts)'] = df[' Power (watts)'] * corr_factor
        df['driveenergy empower old'] = df['driveenergy']
        df['driveenergy'] = df['driveenergy'] * corr_factor
    except KeyError:
        # A column is missing; columns assigned before the failure stay
        # modified, exactly as before.
        success = False
    else:
        success = True

    if success:
        # Drop the stored stroke data before it is regenerated below.
        delete_strokedata(id, debug=debug)

    if debug:
        if success:
            print("updated ", id)
            print("correction ", corr_factor)
        else:
            print("not updated ", id)

    # The data is re-processed and exported whether or not the correction
    # could be applied.
    dataprep(df, id=id, bands=True, barchart=True, otwpower=True, debug=debug)

    rrdata(df=df).write_csv(f1, gzip=True)

    return success