diff --git a/rowers/dataprep.py b/rowers/dataprep.py
index f96b6c3f..158c74a3 100644
--- a/rowers/dataprep.py
+++ b/rowers/dataprep.py
@@ -15,6 +15,7 @@
 from rowingdata import rowingdata as rrdata
 from rowingdata import rower as rrower
+import shutil
 from shutil import copyfile
 
 from rowingdata import (
@@ -1651,75 +1652,7 @@ def new_workout_from_df(r, df,
     return (id, message)
 
 
-# Compare the data from the CSV file and the database
-# Currently only calculates number of strokes. To be expanded with
-# more elaborate testing if needed
-def compare_data(id):
-    row = Workout.objects.get(id=id)
-    f1 = row.csvfilename
-    try:
-        rowdata = rdata(f1)
-        l1 = len(rowdata.df)
-    except AttributeError:
-        rowdata = 0
-        l1 = 0
-    engine = create_engine(database_url, echo=False)
-    query = sa.text('SELECT COUNT(*) FROM strokedata WHERE workoutid={id};'.format(
-        id=id,
-    ))
-    with engine.connect() as conn, conn.begin():
-        try:
-            res = conn.execute(query)
-            l2 = res.fetchall()[0][0]
-        except:
-            print("Database Locked")
-    conn.close()
-    engine.dispose()
-    lfile = l1
-    ldb = l2
-    return l1 == l2 and l1 != 0, ldb, lfile
-
-# Repair data for workouts where the CSV file is lost (or the DB entries
-# don't exist)
-
-
-def repair_data(verbose=False):
-    ws = Workout.objects.all()
-    for w in ws:
-        if verbose:
-            sys.stdout.write(".")
-        test, ldb, lfile = compare_data(w.id)
-        if not test:
-            if verbose:
-                print(w.id, lfile, ldb)
-            try:
-                rowdata = rdata(w.csvfilename)
-                if rowdata and len(rowdata.df):
-                    update_strokedata(w.id, rowdata.df)
-
-            except (IOError, AttributeError):
-                pass
-
-            if lfile == 0:
-                # if not ldb - delete workout
-
-                try:
-                    data = read_df_sql(w.id)
-                    try:
-                        datalength = len(data)
-                    except AttributeError:
-                        datalength = 0
-
-                    if datalength != 0:
-                        data.rename(columns=columndict, inplace=True)
-                        res = data.to_csv(w.csvfilename + '.gz',
-                                          index_label='index',
-                                          compression='gzip')
-                    else:
-                        w.delete()
-                except:
-                    pass
 
 
 # A wrapper around the rowingdata class, with some error catching
 
@@ -1745,17 +1678,11 @@ def rdata(file, rower=rrower()):
 
 def delete_strokedata(id):
-    engine = create_engine(database_url, echo=False)
-    query = sa.text('DELETE FROM strokedata WHERE workoutid={id};'.format(
-        id=id,
-    ))
-    with engine.connect() as conn, conn.begin():
-        try:
-            result = conn.execute(query)
-        except:
-            print("Database Locked")
-    conn.close()
-    engine.dispose()
+    dirname = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+    try:
+        shutil.rmtree(dirname)
+    except FileNotFoundError:
+        pass
 
 
 # Replace stroke data in DB with data from CSV file
 
@@ -1782,7 +1709,6 @@ def testdata(time, distance, pace, spm):
 
 def getrowdata_db(id=0, doclean=False, convertnewtons=True, checkefficiency=True):
     data = read_df_sql(id)
-    data['x_right'] = data['x_right'] / 1.0e6
     data['deltat'] = data['time'].diff()
 
     if data.empty:
@@ -2010,6 +1936,68 @@ def prepmultipledata(ids, verbose=False):
 def read_cols_df_sql(ids, columns, convertnewtons=True):
     # drop columns that are not in offical list
     # axx = [ax[0] for ax in axes]
+
+    extracols = []
+
+    columns = list(columns) + ['distance', 'spm', 'workoutid']
+    columns = [x for x in columns if x != 'None']
+    columns = list(set(columns))
+    ids = [int(id) for id in ids]
+
+
+    if len(ids) == 0:
+        return pd.DataFrame(),extracols
+    elif len(ids) == 1:
+        try:
+            filename = 'media/strokedata_{id}.parquet.gz'.format(id=ids[0])
+            df = pd.read_parquet(filename,columns=columns)
+        except OSError:
+            rowdata,row = getrowdata(id=ids[0])
+            if rowdata and len(rowdata.df):
+                datadf = dataprep(rowdata.df,id=ids[0],bands=True,otwpower=True,barchart=True)
+                df = pd.read_parquet(filename,columns=columns)
+            else:
+                df = pd.DataFrame()
+    else:
+        data = []
+        filenames = ['media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids]
+        for id,f in zip(ids,filenames):
+            try:
+                df = pd.read_parquet(f,columns=columns)
+                data.append(df)
+            except OSError:
+                rowdata,row = getrowdata(id=id)
+                if rowdata and len(rowdata.df):
+                    datadf = dataprep(rowdata.df,id=id,bands=True,otwpower=True,barchart=True)
+                    df = pd.read_parquet(f,columns=columns)
+                    data.append(df)
+
+        df = pd.concat(data,axis=0)
+
+
+    df = df.fillna(value=0)
+
+    if 'peakforce' in columns:
+        funits = ((w.id, w.forceunit)
+                  for w in Workout.objects.filter(id__in=ids))
+        for id, u in funits:
+            if u == 'lbs':
+                mask = df['workoutid'] == id
+                df.loc[mask, 'peakforce'] = df.loc[mask, 'peakforce'] * lbstoN
+    if 'averageforce' in columns:
+        funits = ((w.id, w.forceunit)
+                  for w in Workout.objects.filter(id__in=ids))
+        for id, u in funits:
+            if u == 'lbs':
+                mask = df['workoutid'] == id
+                df.loc[mask, 'averageforce'] = df.loc[mask,
+                                                      'averageforce'] * lbstoN
+
+    return df,extracols
+
+def read_cols_df_sql_old(ids, columns, convertnewtons=True):
+    # drop columns that are not in offical list
+    # axx = [ax[0] for ax in axes]
 
     prepmultipledata(ids)
     axx = [f.name for f in StrokeData._meta.get_fields()]
@@ -2076,8 +2064,36 @@
 
 # Read stroke data from the DB for a Workout ID. Returns a pandas dataframe
-
 
 def read_df_sql(id):
+    try:
+        f = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+        df = pd.read_parquet(f)
+    except OSError:
+        rowdata,row = getrowdata(id=id)
+        if rowdata and len(rowdata.df):
+            data = dataprep(rowdata.df,id=id,bands=True,otwpower=True,barchart=True)
+            df = pd.read_parquet(f)
+        else:
+            return pd.DataFrame()
+
+    df = df.fillna(value=0)
+
+    funit = Workout.objects.get(id=id).forceunit
+
+    if funit == 'lbs':
+        try:
+            df['peakforce'] = df['peakforce'] * lbstoN
+        except KeyError:
+            pass
+
+        try:
+            df['averageforce'] = df['averageforce'] * lbstoN
+        except KeyError:
+            pass
+
+    return df
+
+def read_df_sql_old(id):
     engine = create_engine(database_url, echo=False)
 
     df = pd.read_sql_query(sa.text('SELECT * FROM strokedata WHERE workoutid={id} ORDER BY time ASC'.format(
@@ -2269,14 +2285,13 @@ def add_efficiency(id=0):
     rowdata = rowdata.fillna(method='ffill')
 
     delete_strokedata(id)
+
     if id != 0:
         rowdata['workoutid'] = id
-        engine = create_engine(database_url, echo=False)
-        with engine.connect() as conn, conn.begin():
-            rowdata.to_sql('strokedata', engine,
-                           if_exists='append', index=False)
-        conn.close()
-        engine.dispose()
+        filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+        df = dd.from_pandas(rowdata,npartitions=1)
+        df.to_parquet(filename,engine='fastparquet',compression='GZIP')
+
     return rowdata
 
 # This is the main routine.
@@ -2563,14 +2578,8 @@ def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
     # write data if id given
     if id != 0:
         data['workoutid'] = id
         filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
         df = dd.from_pandas(data,npartitions=1)
-        #df = df.loc[:,~df.columns.duplicated()]
-        # data.to_csv(filename,compression='gzip')
-
         df.to_parquet(filename,engine='fastparquet',compression='GZIP')
-        # data.to_parquet(filename,engine='fastparquet',compression='gzip')
-        # table = pa.Table.from_pandas(data)
-        #pq.write_table(table,filename)
 
     return data
diff --git a/rowers/dataprepnodjango.py b/rowers/dataprepnodjango.py
index 3347ac49..11a3a45d 100644
--- a/rowers/dataprepnodjango.py
+++ b/rowers/dataprepnodjango.py
@@ -28,6 +28,37 @@
 from rowsandall_app.settings_dev import use_sqlite
 from rowers.utils import lbstoN
 
+import shutil  # for removing parquet directories in delete_strokedata
+
+# dtypes for the columns written to the per-workout parquet files
+dtypes = {'workoutid':int,
+          'hr':int,
+          'pace':float,
+          'velo':float,
+          'spm':float,
+          'driveenergy':float,
+          'power':float,
+          'averageforce':float,
+          'peakforce':float,
+          'drivelength':float,
+          'distance':float,
+          'cumdist':float,
+          'drivespeed':float,
+          'catch':float,
+          'slip':float,
+          'finish':float,
+          'wash':float,
+          'peakforceangle':float,
+          'totalangle':float,
+          'effectiveangle':float,
+          'rhythm':float,
+          'efficiency':float,
+          'distanceperstroke':float,
+          'ftime':str,
+          'fpace':str,
+          'fergpace':str,
+          'fnowindpace':str,
+}
 
 try:
     user = DATABASES['default']['USER']
@@ -637,20 +668,11 @@ def new_workout_from_file(r,f2,
     return (id,message,f2)
 
 def delete_strokedata(id,debug=False):
-    if debug:
-        engine = create_engine(database_url_debug, echo=False)
-    else:
-        engine = create_engine(database_url, echo=False)
-    query = sa.text('DELETE FROM strokedata WHERE workoutid={id};'.format(
-        id=id,
-    ))
-    with engine.connect() as conn, conn.begin():
-        try:
-            result = conn.execute(query)
-        except:
-            print("Database Locked")
-    conn.close()
-    engine.dispose()
+    dirname = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+    try:
+        shutil.rmtree(dirname)
+    except FileNotFoundError:
+        pass
 
 def update_strokedata(id,df,debug=False):
     delete_strokedata(id,debug=debug)
@@ -723,19 +745,18 @@ def getsmallrowdata_db(columns,ids=[],debug=False):
     if len(ids)>1:
         for f in csvfilenames:
             try:
-                df = dd.read_parquet(f,columns=columns,engine='pyarrow')
+                df = pd.read_parquet(f,columns=columns,engine='pyarrow')
                 data.append(df)
             except OSError:
                 pass
 
-        df = dd.concat(data,axis=0)
+        df = pd.concat(data,axis=0)
 
     else:
-        df = dd.read_parquet(csvfilenames[0],columns=columns,engine='pyarrow')
+        df = pd.read_parquet(csvfilenames[0],columns=columns,engine='pyarrow')
 
-    data = df.compute()
-    return data
+    return df
 
 
 def fitnessmetric_to_sql(m,table='powertimefitnessmetric',debug=False,
                          doclean=False):
@@ -779,51 +800,42 @@ def read_cols_df_sql(ids,columns,debug=False):
     columns = list(columns)+['distance','spm']
     columns = [x for x in columns if x != 'None']
     columns = list(set(columns))
-    cls = ''
+    ids = [int(id) for id in ids]
 
-    if debug:
-        engine = create_engine(database_url_debug, echo=False)
-    else:
-        engine = create_engine(database_url, echo=False)
-    for column in columns:
-        cls += column+', '
-    cls = cls[:-2]
 
     if len(ids) == 0:
-        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid=0'.format(
-            columns = cls,
-        ))
+        return pd.DataFrame()
     elif len(ids) == 1:
-        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid={id}'.format(
-            id = ids[0],
-            columns = cls,
-        ))
+        try:
+            filename = 'media/strokedata_{id}.parquet.gz'.format(id=ids[0])
+            df = pd.read_parquet(filename,columns=columns)
+        except OSError:
+            df = pd.DataFrame()
     else:
-        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid IN {ids}'.format(
-            columns = cls,
-            ids = tuple(ids),
-        ))
-
-    df = pd.read_sql_query(query,engine)
-    engine.dispose()
+        data = []
+        filenames = ['media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids]
+        for id,f in zip(ids,filenames):
+            try:
+                df = pd.read_parquet(f,columns=columns)
+                data.append(df)
+            except OSError:
+                pass
+
+        df = pd.concat(data,axis=0)
+
     return df
 
 
 def read_df_sql(id,debug=False):
-    if debug:
-        engine = create_engine(database_url_debug, echo=False)
-        print("read_df",id)
-        print(database_url_debug)
-    else:
-        engine = create_engine(database_url, echo=False)
+    try:
+        f = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+        df = pd.read_parquet(f)
+    except OSError:
+        return pd.DataFrame()
 
-    df = pd.read_sql_query(sa.text(
-        'SELECT * FROM strokedata WHERE workoutid={id}'.format(
-            id=id
-        )), engine)
+    df = df.fillna(value=0)
 
-    engine.dispose()
     return df
 
 def getcpdata_sql(rower_id,table='cpdata',debug=False):
@@ -1266,6 +1278,9 @@ def dataprep(rowdatadf,id=0,bands=True,barchart=True,otwpower=True,
     # write data if id given
     if id != 0:
         data['workoutid'] = id
-        data.to_parquet(filename,engine='pyarrow',compression='gzip')
+        data = data.astype(dtype=dtypes)
+        filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+        df = dd.from_pandas(data,npartitions=1)
+        df.to_parquet(filename,engine='fastparquet',compression='GZIP')
 
     return data
diff --git a/rowers/models.py b/rowers/models.py
index 02fb531f..dbe4062a 100644
--- a/rowers/models.py
+++ b/rowers/models.py
@@ -28,6 +28,8 @@
 from django_countries.fields import CountryField
 from scipy.interpolate import splprep, splev, CubicSpline
 import numpy as np
+import shutil
+
 from django.conf import settings
 from sqlalchemy import create_engine
 import sqlalchemy as sa
@@ -2805,6 +2807,12 @@ def auto_delete_file_on_delete(sender, instance, **kwargs):
     if instance.csvfilename+'.gz':
         if os.path.isfile(instance.csvfilename+'.gz'):
            os.remove(instance.csvfilename+'.gz')
+    # remove the parquet directory written by dataprep()
+    try:
+        dirname = 'media/strokedata_{id}.parquet.gz'.format(id=instance.id)
+        shutil.rmtree(dirname)
+    except FileNotFoundError:
+        pass
 
 @receiver(models.signals.post_delete,sender=Workout)
 def update_duplicates_on_delete(sender, instance, **kwargs):
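
Below is a minimal sketch, outside the patch, of the parquet round-trip the diff assumes. The helper names (write_strokes, read_strokes, delete_strokes) are hypothetical; the path pattern, the dask/fastparquet writer, and the rmtree-based delete mirror the calls in the patch. Because dask writes 'media/strokedata_<id>.parquet.gz' as a directory of part files rather than a single file, deletion uses shutil.rmtree() instead of os.remove(), while pandas can read the directory back directly.

    import shutil

    import dask.dataframe as dd
    import pandas as pd

    def write_strokes(df: pd.DataFrame, workout_id: int) -> str:
        # mirrors dataprep(): one dask partition, fastparquet engine, gzip
        path = 'media/strokedata_{id}.parquet.gz'.format(id=workout_id)
        dd.from_pandas(df, npartitions=1).to_parquet(
            path, engine='fastparquet', compression='GZIP')
        return path

    def read_strokes(workout_id: int, columns=None) -> pd.DataFrame:
        # mirrors read_df_sql()/read_cols_df_sql(): read the directory back
        path = 'media/strokedata_{id}.parquet.gz'.format(id=workout_id)
        try:
            return pd.read_parquet(path, columns=columns).fillna(value=0)
        except OSError:
            # missing file: return an empty frame, as the patch does
            return pd.DataFrame()

    def delete_strokes(workout_id: int) -> None:
        # mirrors delete_strokedata(): the target is a directory, not a file
        path = 'media/strokedata_{id}.parquet.gz'.format(id=workout_id)
        try:
            shutil.rmtree(path)
        except FileNotFoundError:
            pass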