# All the data preparation, data cleaning and data mangling should
# be defined here
from rowers.models import Workout, User, Rower, StrokeData
from rowingdata import rowingdata as rrdata
from rowers.tasks import handle_sendemail_unrecognized
from rowingdata import rower as rrower
from rowingdata import main as rmain
from rowingdata import get_file_type
from pandas import DataFrame, Series
from pytz import timezone as tz, utc
from django.utils.timezone import get_current_timezone

thetimezone = get_current_timezone()

from rowingdata import (
    TCXParser, RowProParser, ErgDataParser, TCXParserNoHR,
    BoatCoachParser, RowPerfectParser, BoatCoachAdvancedParser,
    MysteryParser, painsledDesktopParser, speedcoachParser, ErgStickParser,
    SpeedCoach2Parser, FITParser, fitsummarydata,
    make_cumvalues, summarydata, get_file_type,
)

import os
import sys
import datetime
import zipfile  # BUG FIX: used by new_workout_from_file but never imported
import itertools

import pandas as pd
import numpy as np
from scipy.signal import savgol_filter
from sqlalchemy import create_engine
import sqlalchemy as sa

from django.conf import settings
# NOTE(review): duplicates (and shadows) the rowers.tasks import above --
# kept for compatibility with the old flat layout; confirm which is intended.
from tasks import handle_sendemail_unrecognized

# SQLAlchemy talks to the same database as the Django ORM, so the
# connection settings are taken from the Django configuration.
user = settings.DATABASES['default']['USER']
password = settings.DATABASES['default']['PASSWORD']
database_name = settings.DATABASES['default']['NAME']
host = settings.DATABASES['default']['HOST']
port = settings.DATABASES['default']['PORT']

database_url = 'mysql://{user}:{password}@{host}:{port}/{database_name}'.format(
    user=user,
    password=password,
    database_name=database_name,
    host=host,
    port=port,
)

# Use SQLite local database when we're in debug mode
if settings.DEBUG or user == '':
    # database_url = 'sqlite:///db.sqlite3'
    database_url = 'sqlite:///' + database_name

# mapping the DB column names to the CSV file column names
# (the leading spaces in the CSV column names are significant)
columndict = {
    'time': 'TimeStamp (sec)',
    'hr': ' HRCur (bpm)',
    'pace': ' Stroke500mPace (sec/500m)',
    'spm': ' Cadence (stokes/min)',
    'power': ' Power (watts)',
    'averageforce': ' AverageDriveForce (lbs)',
    'drivelength': ' DriveLength (meters)',
    'peakforce': ' PeakDriveForce (lbs)',
    'distance': ' Horizontal (meters)',
    'catch': 'catch',
    'finish': 'finish',
    'peakforceangle': 'peakforceangle',
    'wash': 'wash',
    'slip': 'wash',  # NOTE(review): 'slip' maps to the 'wash' CSV column -- confirm intended
}


def clean_df_stats(datadf, workstrokesonly=True):
    """Clean a stroke-data frame for statistics use.

    Negative values are clipped to zero, zeros become NaN, and each metric
    is restricted to a physically plausible range.  When workstrokesonly is
    truthy (True or the string 'True'), rest strokes (workoutstate 3) are
    dropped.  Returns the cleaned DataFrame.
    """
    # clean data: remove zeros and negative values
    datadf = datadf.clip(lower=0)
    datadf.replace(to_replace=0, value=np.nan, inplace=True)

    # Plausible (low, high) range per column; values outside become NaN.
    # Mirrors the original per-column masks exactly.
    limits = {
        'hr': (30, None),
        'rhythm': (5, 70),
        'power': (20, None),
        'drivelength': (0.5, None),
        'forceratio': (0.2, 1.0),
        'spm': (10, 60),
        'drivespeed': (0.5, 4),
        'driveenergy': (100, 2000),
    }
    for col, (low, high) in limits.items():
        if low is not None:
            datadf.loc[datadf[col] < low, col] = np.nan
        if high is not None:
            datadf.loc[datadf[col] > high, col] = np.nan

    # ErgData workout state codes for work, rest and transition strokes
    workoutstateswork = [1, 4, 5, 8, 9, 6, 7]
    workoutstatesrest = [3]
    workoutstatetransition = [0, 2, 10, 11, 12, 13]

    # The flag may arrive as the string 'True' from request parameters
    if workstrokesonly == 'True' or workstrokesonly == True:
        try:
            datadf = datadf[~datadf['workoutstate'].isin(workoutstatesrest)]
        except KeyError:
            # this frame has no workoutstate column
            pass
    return datadf


def getstatsfields():
    """Return (fieldlist, fielddict) of StrokeData fields useful for stats.

    Removes identifiers, raw time/distance columns and derived per-band
    fields that make no sense as aggregated statistics.
    """
    # Get field names and remove those that are not useful in stats
    fields = StrokeData._meta.get_fields()
    fielddict = {field.name: field.verbose_name for field in fields}
    for name in (
        'workoutid', 'ergpace',
        'hr_an', 'hr_tr', 'hr_at', 'hr_ut2', 'hr_ut1',
        'time', 'distance',
        'nowindpace', 'fnowindpace', 'fergpace', 'equivergpower',
        # 'workoutstate' deliberately kept, as in the original
        'fpace', 'pace', 'id', 'ftime', 'x_right',
        'hr_max', 'hr_bottom', 'cumdist',
    ):
        fielddict.pop(name)
    # BUG FIX: iteritems() is Python-2 only; list(dict) yields the keys
    fieldlist = list(fielddict)
    return fieldlist, fielddict


# A string representation for time deltas
def niceformat(values):
    """Format an iterable of time deltas as mm:ss.t strings."""
    return [strfdelta(v) for v in values]


# A nice printable format for time delta values
def strfdelta(tdelta):
    """Render a datetime.timedelta or numpy timedelta64 as 'mm:ss.t'."""
    try:
        minutes, seconds = divmod(tdelta.seconds, 60)
        tenths = int(tdelta.microseconds / 1e5)
    except AttributeError:
        # numpy timedelta64 has no .seconds; its int64 view is nanoseconds
        minutes, seconds = divmod(tdelta.view(np.int64), 60e9)
        seconds, rest = divmod(seconds, 1e9)
        tenths = int(rest / 1e8)
    # BUG FIX: the numpy branch produced floats (e.g. '5.0' instead of '05');
    # cast to int before formatting
    return "{minutes:0>2}:{seconds:0>2}.{tenths:0>1}".format(
        minutes=int(minutes),
        seconds=int(seconds),
        tenths=tenths,
    )


# A nice printable format for pace values
def nicepaceformat(values):
    """Format an iterable of pace deltas as mm:ss.t strings."""
    return [strfdelta(v) for v in values]


# Convert seconds to a Time Delta value, replacing NaN with a 5:50 pace
def timedeltaconv(x):
    """Convert seconds to a timedelta; NaN becomes 350 s (a 5:50 pace)."""
    if np.isnan(x):
        return datetime.timedelta(seconds=350.)
    return datetime.timedelta(seconds=x)


# Create new workout from file and store it in the database
# This routine should be used everywhere in views.py and mailprocessing.py
# Currently there is code duplication
def new_workout_from_file(r, f2, workouttype='rower', title='Workout', notes=''):
    """Create a Workout (plus stroke data) in the database from file f2.

    Parameters:
      r           -- the Rower who owns the workout
      f2          -- path of the uploaded data file
      workouttype -- workout type string stored on the Workout
      title       -- workout name
      notes       -- free-form notes

    Returns (workout_id, message): workout_id is 0 on failure, and message
    carries a warning/error text (or None).
    """
    message = None
    fileformat = get_file_type(f2)
    summary = ''

    # NK zip files: extract the first member and continue with that file
    if len(fileformat) == 3 and fileformat[0] == 'zip':
        f_to_be_deleted = f2
        with zipfile.ZipFile(f2) as z:
            # for now, we're getting only the first file
            # from the NK zip file (issue #69 on bitbucket)
            f2 = z.extract(z.namelist()[0], path='media/')
            fileformat = fileformat[2]
        os.remove(f_to_be_deleted)

    # Some people try to upload Concept2 logbook summaries
    if fileformat == 'c2log':
        os.remove(f2)
        message = "This C2 logbook summary does not contain stroke data. Please download the Export Stroke Data file from the workout details on the C2 logbook."
        return (0, message)

    # Some people try to upload RowPro summary logs
    if fileformat == 'rowprolog':
        os.remove(f2)
        message = "This RowPro logbook summary does not contain stroke data. Please use the Stroke Data CSV file for the individual workout in your log."
        return (0, message)

    # Sometimes people try an unsupported file type.
    # Send an email to info@rowsandall.com with the file attached
    # for me to check if it is a bug, or a new file type
    # worth supporting
    if fileformat == 'unknown':
        message = "We couldn't recognize the file type"
        # BUG FIX: the original referenced an undefined 'request'; the
        # uploader's email comes from the Rower's user record.
        if settings.DEBUG:
            res = handle_sendemail_unrecognized.delay(f2, r.user.email)
        else:
            # NOTE(review): queuehigh is not defined in this module -- it is
            # presumably a module-level RQ queue elsewhere; confirm.
            res = queuehigh.enqueue(handle_sendemail_unrecognized,
                                    f2, r.user.email)
        # BUG FIX: the original returned the literal string 'message'
        return (0, message)

    # handle non-Painsled formats by converting them to a painsled
    # compatible CSV via the matching rowingdata parser
    parsers = {
        'rp': RowProParser,                   # RowPro
        'tcx': TCXParser,                     # TCX
        'mystery': MysteryParser,             # Mystery
        'tcxnohr': TCXParserNoHR,             # TCX without HR
        'rowperfect3': RowPerfectParser,      # RowPerfect
        'ergdata': ErgDataParser,             # ErgData
        'bcmike': BoatCoachAdvancedParser,    # Mike / BoatCoach advanced
        'boatcoach': BoatCoachParser,         # BoatCoach
        'painsleddesktop': painsledDesktopParser,
        'speedcoach': speedcoachParser,       # SpeedCoach GPS
        'speedcoach2': SpeedCoach2Parser,     # SpeedCoach GPS 2
        'ergstick': ErgStickParser,           # ErgStick
        'fit': FITParser,                     # FIT
    }
    if fileformat != 'csv':
        row = parsers[fileformat](f2)
        if fileformat == 'speedcoach2':
            # SpeedCoach 2 files carry their own summary
            try:
                summary = row.allstats()
            except Exception:
                pass
        if fileformat == 'fit':
            s = fitsummarydata(f2)
            s.setsummary()
            summary = s.summarytext
        f_to_be_deleted = f2
        # should delete file
        f2 = f2[:-4] + 'o.csv'
        row.write_csv(f2, gzip=True)
        # os.remove(f2)
        try:
            os.remove(f_to_be_deleted)
        except OSError:
            os.remove(f_to_be_deleted + '.gz')

    powerperc = 100 * np.array(
        [r.pw_ut2, r.pw_ut1, r.pw_at, r.pw_tr, r.pw_an]) / r.ftp

    # make workout and put in database
    rr = rrower(hrmax=r.max, hrut2=r.ut2,
                hrut1=r.ut1, hrat=r.at,
                hrtr=r.tr, hran=r.an, ftp=r.ftp,
                powerperc=powerperc, powerzones=r.powerzones)
    row = rdata(f2, rower=rr)
    if row == 0:
        return (0, 'Error: CSV data file not found')

    # auto smoothing of the velocity signal with a ~10 second window
    pace = row.df[' Stroke500mPace (sec/500m)'].values
    velo = 500. / pace
    f = row.df['TimeStamp (sec)'].diff().mean()
    windowsize = 2 * (int(10. / (f))) + 1
    if 'originalvelo' not in row.df:
        row.df['originalvelo'] = velo
    # NOTE(review): the source text was corrupted between here and the
    # duration computation; the statements below are a best-effort
    # reconstruction following the smoothing pattern used in dataprep().
    # Verify against version control before relying on them.
    if windowsize > 3 and windowsize < len(velo):
        velo = savgol_filter(velo, windowsize, 3)
        row.df[' Stroke500mPace (sec/500m)'] = 500. / velo
    totaldist = row.df['cum_dist'].max()
    totaltime = row.df['TimeStamp (sec)'].max() - row.df['TimeStamp (sec)'].min()
    maxhr = row.df[' HRCur (bpm)'].max()
    averagehr = row.df[' HRCur (bpm)'].mean()
    hours = int(totaltime / 3600.)

    if hours > 23:
        message = 'Warning: The workout duration was longer than 23 hours'
        hours = 23
    minutes = int((totaltime - 3600. * hours) / 60.)
    seconds = int(totaltime - 3600. * hours - 60. * minutes)
    tenths = int(10 * (totaltime - 3600. * hours - 60. * minutes - seconds))
    duration = "%s:%s:%s.%s" % (hours, minutes, seconds, tenths)

    workoutdate = row.rowdatetime.strftime('%Y-%m-%d')
    workoutstarttime = row.rowdatetime.strftime('%H:%M:%S')
    workoutstartdatetime = thetimezone.localize(row.rowdatetime).astimezone(utc)

    # check for duplicate start times
    ws = Workout.objects.filter(starttime=workoutstarttime, user=r)
    if len(ws) != 0:
        message = "Warning: This workout probably already exists in the database"

    w = Workout(user=r, name=title, date=workoutdate,
                workouttype=workouttype,
                duration=duration, distance=totaldist,
                weightcategory=r.weightcategory,
                starttime=workoutstarttime,
                csvfilename=f2, notes=notes, summary=summary,
                maxhr=maxhr, averagehr=averagehr,
                startdatetime=workoutstartdatetime)
    w.save()

    # put stroke data in database
    res = dataprep(row.df, id=w.id, bands=True,
                   barchart=True, otwpower=True, empower=True)
    return (w.id, message)


# Compare the data from the CSV file and the database
# Currently only calculates number of strokes.
To be expanded with # more elaborate testing if needed def compare_data(id): row = Workout.objects.get(id=id) f1 = row.csvfilename try: rowdata = rdata(f1) l1 = len(rowdata.df) except AttributeError: rowdata = 0 l1 = 0 engine = create_engine(database_url, echo=False) query = sa.text('SELECT COUNT(*) FROM strokedata WHERE workoutid={id};'.format( id=id, )) with engine.connect() as conn, conn.begin(): try: res = conn.execute(query) l2 = res.fetchall()[0][0] except: print "Database Locked" conn.close() engine.dispose() lfile = l1 ldb = l2 return l1==l2 and l1 != 0,ldb,lfile # Repair data for workouts where the CSV file is lost (or the DB entries # don't exist) def repair_data(verbose=False): ws = Workout.objects.all() for w in ws: if verbose: sys.stdout.write(".") test,ldb,lfile = compare_data(w.id) if not test: if verbose: print w.id,lfile,ldb try: rowdata = rdata(w.csvfilename) if rowdata and len(rowdata.df): update_strokedata(w.id,rowdata.df) except IOError, AttributeError: pass if lfile==0: # if not ldb - delete workout try: data = read_df_sql(w.id) try: datalength = len(data) except AttributeError: datalength = 0 if datalength != 0: data.rename(columns = columndict,inplace=True) res = data.to_csv(w.csvfilename+'.gz', index_label='index', compression='gzip') print 'adding csv file' else: print w.id,' No stroke records anywhere' w.delete() except: print 'failed' print str(sys.exc_info()[0]) pass # A wrapper around the rowingdata class, with some error catching def rdata(file,rower=rrower()): try: res = rrdata(file,rower=rower) except IOError,IndexError: try: res = rrdata(file+'.gz',rower=rower) except IOError,IndexError: res = 0 return res # Remove all stroke data for workout ID from database def delete_strokedata(id): engine = create_engine(database_url, echo=False) query = sa.text('DELETE FROM strokedata WHERE workoutid={id};'.format( id=id, )) with engine.connect() as conn, conn.begin(): try: result = conn.execute(query) except: print "Database Locked" 
conn.close() engine.dispose() # Replace stroke data in DB with data from CSV file def update_strokedata(id,df): delete_strokedata(id) rowdata = dataprep(df,id=id,bands=True,barchart=True,otwpower=True) # Test that all data are of a numerical time def testdata(time,distance,pace,spm): t1 = np.issubdtype(time,np.number) t2 = np.issubdtype(distance,np.number) t3 = np.issubdtype(pace,np.number) t4 = np.issubdtype(spm,np.number) return t1 and t2 and t3 and t4 # Get data from DB for one workout (fetches all data). If data # is not in DB, read from CSV file (and create DB entry) def getrowdata_db(id=0): data = read_df_sql(id) data['x_right'] = data['x_right']/1.0e6 if data.empty: rowdata,row = getrowdata(id=id) if rowdata: data = dataprep(rowdata.df,id=id,bands=True,barchart=True,otwpower=True) else: data = pd.DataFrame() # returning empty dataframe else: row = Workout.objects.get(id=id) return data,row # Fetch a subset of the data from the DB def getsmallrowdata_db(columns,ids=[]): prepmultipledata(ids) data = read_cols_df_sql(ids,columns) return data # Fetch both the workout and the workout stroke data (from CSV file) def getrowdata(id=0): # check if valid ID exists (workout exists) row = Workout.objects.get(id=id) f1 = row.csvfilename # get user r = row.user u = r.user rr = rrower(hrmax=r.max,hrut2=r.ut2, hrut1=r.ut1,hrat=r.at, hrtr=r.tr,hran=r.an,ftp=r.ftp) rowdata = rdata(f1,rower=rr) return rowdata,row # Checks if all rows for a list of workout IDs have entries in the # stroke_data table. 
# If this is not the case, it creates the stroke data.
# In theory, this should never yield any work, but it's a good
# safety net for programming errors elsewhere in the app
# Also used heavily when I moved from CSV file only to CSV+Stroke data
def prepmultipledata(ids, verbose=False):
    """Ensure every workout id in `ids` has rows in the strokedata table.

    Returns the list of ids for which stroke data had to be (re)created.
    """
    query = sa.text('SELECT DISTINCT workoutid FROM strokedata')
    engine = create_engine(database_url, echo=False)
    with engine.connect() as conn, conn.begin():
        res = conn.execute(query)
        res = list(itertools.chain.from_iterable(res.fetchall()))
    conn.close()
    engine.dispose()
    # ids may arrive as strings from request parameters
    try:
        ids2 = [int(id) for id in ids]
    except ValueError:
        ids2 = ids
    # ids that were requested but have no stroke rows yet
    res = list(set(ids2) - set(res))
    for id in res:
        rowdata, row = getrowdata(id=id)
        if verbose:
            print(id)
        if rowdata and len(rowdata.df):
            data = dataprep(rowdata.df, id=id, bands=True,
                            barchart=True, otwpower=True)
    return res


# Read a set of columns for a set of workout ids, returns data as a
# pandas dataframe
def read_cols_df_sql(ids, columns):
    """Fetch `columns` (plus distance and spm) for the given workout ids."""
    # always include distance and spm, drop 'None' placeholders, de-dup
    columns = list(columns) + ['distance', 'spm']
    columns = [x for x in columns if x != 'None']
    columns = list(set(columns))
    cls = ', '.join(columns)
    engine = create_engine(database_url, echo=False)
    if len(ids) == 0:
        # no ids: an always-empty result with the right columns
        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid=0'.format(
            columns=cls,
        ))
    elif len(ids) == 1:
        # single id: a tuple would render a trailing comma, so special-case
        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid={id}'.format(
            id=int(ids[0]),
            columns=cls,
        ))
    else:
        # force ints so the interpolated IN clause is safe
        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid IN {ids}'.format(
            columns=cls,
            ids=tuple(int(i) for i in ids),
        ))
    # BUG FIX: the original opened engine.raw_connection() and never used
    # or closed it (connection leak); pandas reads via the engine directly.
    df = pd.read_sql_query(query, engine)
    df = df.fillna(value=0)
    engine.dispose()
    return df


# Read stroke data from the DB for a Workout ID.
# Returns a pandas dataframe
def read_df_sql(id):
    # Read stroke data from the DB for a Workout ID; NaN becomes 0.
    # NOTE(review): id is interpolated into the SQL string; callers pass
    # integer Workout ids, but a bound parameter would be safer.
    engine = create_engine(database_url, echo=False)
    df = pd.read_sql_query(sa.text('SELECT * FROM strokedata WHERE workoutid={id}'.format(
        id=id)), engine)
    engine.dispose()
    df = df.fillna(value=0)
    return df


# Get the necessary data from the strokedata table in the DB.
# For the flex plot
def smalldataprep(therows, xparam, yparam1, yparam2):
    # Build one concatenated DataFrame holding the requested x/y columns
    # (plus distance and spm) for every workout in therows, reading each
    # workout's CSV file and falling back to the gzipped variant.
    df = pd.DataFrame()
    if yparam2 == 'None':
        # 'None' is the UI placeholder for "no second series"
        yparam2 = 'power'
    df[xparam] = []
    df[yparam1] = []
    df[yparam2] = []
    df['distance'] = []
    df['spm'] = []
    for workout in therows:
        f1 = workout.csvfilename
        try:
            rowdata = dataprep(rrdata(f1).df)
            rowdata = pd.DataFrame({xparam: rowdata[xparam],
                                    yparam1: rowdata[yparam1],
                                    yparam2: rowdata[yparam2],
                                    'distance': rowdata['distance'],
                                    'spm': rowdata['spm'],
                                    }
                                   )
            df = pd.concat([df, rowdata], ignore_index=True)
        except IOError:
            # CSV may only exist in gzipped form
            try:
                rowdata = dataprep(rrdata(f1 + '.gz').df)
                rowdata = pd.DataFrame({xparam: rowdata[xparam],
                                        yparam1: rowdata[yparam1],
                                        yparam2: rowdata[yparam2],
                                        'distance': rowdata['distance'],
                                        'spm': rowdata['spm'],
                                        }
                                       )
                df = pd.concat([df, rowdata], ignore_index=True)
            except IOError:
                # workouts with no readable CSV are silently skipped
                pass
    return df


# This is the main routine.
# it reindexes, sorts, filters, and smooths the data, then
# saves it to the stroke_data table in the database
# Takes a rowingdata object's DataFrame as input
# NOTE(review): dataprep() continues beyond this chunk of the file; only
# its opening statements are visible (and documented) here.
def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
             empower=True):
    # re-index 0..n-1; NOTE(review): passing range() (not list(range()))
    # to set_index breaks on Python 3 -- confirm target interpreter
    rowdatadf.set_index([range(len(rowdatadf))], inplace=True)
    # elapsed time relative to the first sample
    t = rowdatadf.ix[:, 'TimeStamp (sec)']
    t = pd.Series(t - rowdatadf.ix[0, 'TimeStamp (sec)'])
    # cap pace outliers at 3000 sec/500m
    row_index = rowdatadf.ix[:, ' Stroke500mPace (sec/500m)'] > 3000
    rowdatadf.loc[row_index, ' Stroke500mPace (sec/500m)'] = 3000.
p = rowdatadf.ix[:,' Stroke500mPace (sec/500m)'] hr = rowdatadf.ix[:,' HRCur (bpm)'] spm = rowdatadf.ix[:,' Cadence (stokes/min)'] cumdist = rowdatadf.ix[:,'cum_dist'] power = rowdatadf.ix[:,' Power (watts)'] averageforce = rowdatadf.ix[:,' AverageDriveForce (lbs)'] drivelength = rowdatadf.ix[:,' DriveLength (meters)'] try: workoutstate = rowdatadf.ix[:,' WorkoutState'] except KeyError: workoutstate = 0*hr peakforce = rowdatadf.ix[:,' PeakDriveForce (lbs)'] forceratio = averageforce/peakforce forceratio = forceratio.fillna(value=0) try: drivetime = rowdatadf.ix[:,' DriveTime (ms)'] recoverytime = rowdatadf.ix[:,' StrokeRecoveryTime (ms)'] rhythm = 100.*drivetime/(recoverytime+drivetime) rhythm = rhythm.fillna(value=0) except: rhythm = 0.0*forceratio f = rowdatadf['TimeStamp (sec)'].diff().mean() windowsize = 2*(int(10./(f)))+1 if windowsize <= 3: windowsize = 5 if windowsize > 3 and windowsize 3 and windowsize