From fa373d781ff231ceef6d2d3ce811b4367a60b462 Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Wed, 23 Oct 2019 18:15:07 +0200 Subject: [PATCH] ds --- rowers/dataprep.py | 190 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 153 insertions(+), 37 deletions(-) diff --git a/rowers/dataprep.py b/rowers/dataprep.py index b5f74f84..f96b6c3f 100644 --- a/rowers/dataprep.py +++ b/rowers/dataprep.py @@ -27,6 +27,8 @@ from rowers.tasks import handle_zip_file from pandas import DataFrame, Series import dask.dataframe as dd from dask.delayed import delayed +import pyarrow.parquet as pq +import pyarrow as pa from django.utils import timezone from django.utils.timezone import get_current_timezone @@ -114,6 +116,36 @@ columndict = { 'cumdist': 'cum_dist', } +# dtypes +dtypes = {'workoutid':int, + 'hr':int, + 'pace':float, + 'velo':float, + 'spm':float, + 'driveenergy':float, + 'power':float, + 'averageforce':float, + 'peakforce':float, + 'drivelength':float, + 'distance':float, + 'cumdist':float, + 'drivespeed':float, + 'catch':float, + 'slip':float, + 'finish':float, + 'wash':float, + 'peakforceangle':float, + 'totalangle':float, + 'effectiveangle':float, + 'rhythm':float, + 'efficiency':float, + 'distanceperstroke':float, + 'ftime':str, + 'fpace':str, + 'fergpace':str, + 'fnowindpace':str, +} + from scipy.signal import savgol_filter import datetime @@ -349,22 +381,23 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, ignoreadvanced=False): # clean data remove zeros and negative values + # bring metrics which have negative values to positive domain if len(datadf)==0: return datadf try: datadf['catch'] = -datadf['catch'] - except KeyError: + except (KeyError,TypeError): pass try: datadf['peakforceangle'] = datadf['peakforceangle'] + 1000 - except KeyError: + except (KeyError,TypeError): pass try: datadf['hr'] = datadf['hr'] + 10 - except KeyError: + except (KeyError,TypeError): pass # protect 0 spm values from being nulled @@ -390,17 +423,17 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, # return from positive domain to negative try: datadf['catch'] = -datadf['catch'] - except KeyError: + except (KeyError,TypeError): pass try: datadf['peakforceangle'] = datadf['peakforceangle'] - 1000 - except KeyError: + except (KeyError,TypeError): pass try: datadf['hr'] = datadf['hr'] - 10 - except KeyError: + except (KeyError,TypeError): pass # clean data for useful ranges per column @@ -408,123 +441,123 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, try: mask = datadf['hr'] < 30 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['spm'] < 0 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['efficiency'] > 200. datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['spm'] < 10 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['pace'] / 1000. > 300. datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['efficiency'] < 0. datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['pace'] / 1000. < 60. datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['spm'] > 60 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['wash'] > 1 datadf.loc[mask, 'wash'] = np.nan - except KeyError: + except (KeyError,TypeError): pass if not ignoreadvanced: try: mask = datadf['rhythm'] < 5 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['rhythm'] > 70 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['power'] < 20 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['drivelength'] < 0.5 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['forceratio'] < 0.2 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['forceratio'] > 1.0 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['drivespeed'] < 0.5 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['drivespeed'] > 4 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['driveenergy'] > 2000 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['driveenergy'] < 100 datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass try: mask = datadf['catch'] > -30. datadf.mask(mask,inplace=True) - except KeyError: + except (KeyError,TypeError): pass workoutstateswork = [1, 4, 5, 8, 9, 6, 7] @@ -1774,27 +1807,97 @@ def getrowdata_db(id=0, doclean=False, convertnewtons=True, # Fetch a subset of the data from the DB def getsmallrowdata_db(columns, ids=[], doclean=True,workstrokesonly=True,compute=True): - prepmultipledata(ids) + # prepmultipledata(ids) - csvfilenames = ['media/strokedata_{id}.parquet'.format(id=id) for id in ids] + csvfilenames = ['media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids] data = [] columns = [c for c in columns if c != 'None'] + columns = list(set(columns)) if len(ids)>1: - for f in csvfilenames: + for id,f in zip(ids,csvfilenames): try: - df = dd.read_parquet(f,columns=columns,engine='pyarrow') + #df = dd.read_parquet(f,columns=columns,engine='pyarrow') + df = pd.read_parquet(f,columns=columns) data.append(df) except OSError: - pass + rowdata, row = getrowdata(id=id) + if rowdata and len(rowdata.df): + datadf = dataprep(rowdata.df,id=id,bands=True,otwpower=True,barchart=True) + # df = dd.read_parquet(f,columns=columns,engine='pyarrow') + df = pd.read_parquet(f,columns=columns) + data.append(df) + + df = pd.concat(data,axis=0) + # df = dd.concat(data,axis=0) + + else: + try: + df = pd.read_parquet(csvfilenames[0],columns=columns) + except OSError: + rowdata,row = getrowdata(id=ids[0]) + if rowdata and len(rowdata.df): + data = dataprep(rowdata.df,id=ids[0],bands=True,otwpower=True,barchart=True) + df = pd.read_parquet(csvfilenames[0],columns=columns) + # df = dd.read_parquet(csvfilenames[0], + # column=columns,engine='pyarrow', + # ) + + # df = df.loc[:,~df.columns.duplicated()] + + if compute: + data = df.copy() + if doclean: + data = clean_df_stats(data, ignorehr=True, + workstrokesonly=workstrokesonly) + data.dropna(axis=1,how='all',inplace=True) + data.dropna(axis=0,how='any',inplace=True) + return data + + return df + +def getsmallrowdata_db_dask(columns, ids=[], doclean=True,workstrokesonly=True,compute=True): + # prepmultipledata(ids) + + csvfilenames = ['media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids] + data = [] + columns = [c for c in columns if c != 'None'] + columns = list(set(columns)) + + if len(ids)>1: + for id,f in zip(ids,csvfilenames): + try: + #df = dd.read_parquet(f,columns=columns,engine='pyarrow') + df = dd.read_parquet(f,columns=columns) + data.append(df) + except OSError: + rowdata, row = getrowdata(id=id) + if rowdata and len(rowdata.df): + datadf = dataprep(rowdata.df,id=id,bands=True,otwpower=True,barchart=True) + # df = dd.read_parquet(f,columns=columns,engine='pyarrow') + df = dd.read_parquet(f,columns=columns) + data.append(df) + df = dd.concat(data,axis=0) + # df = dd.concat(data,axis=0) + else: - df = dd.read_parquet(csvfilenames[0],columns=columns,engine='pyarrow') + try: + df = dd.read_parquet(csvfilenames[0],columns=columns) + except OSError: + rowdata,row = getrowdata(id=ids[0]) + if rowdata and len(rowdata.df): + data = dataprep(rowdata.df,id=ids[0],bands=True,otwpower=True,barchart=True) + df = dd.read_parquet(csvfilenames[0],columns=columns) + # df = dd.read_parquet(csvfilenames[0], + # column=columns,engine='pyarrow', + # ) - df = df.loc[:,~df.columns.duplicated()] + # df = df.loc[:,~df.columns.duplicated()] + if compute: data = df.compute() @@ -1889,7 +1992,7 @@ import glob def prepmultipledata(ids, verbose=False): filenames = glob.glob('media/*.parquet') - ids = [id for id in ids if 'media/strokedata_{id}.parquet'.format(id=id) not in filenames] + ids = [id for id in ids if 'media/strokedata_{id}.parquet.gz'.format(id=id) not in filenames] for id in ids: rowdata, row = getrowdata(id=id) @@ -2452,9 +2555,22 @@ def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True, # write data if id given if id != 0: data['workoutid'] = id - filename = 'media/strokedata_{id}.parquet'.format(id=id) -# df = dd.from_pandas(data,npartitions=1) - data.to_parquet(filename,engine='pyarrow',compression='gzip') + data.fillna(0,inplace=True) + data = data.astype( + dtype=dtypes, + ) + + + filename = 'media/strokedata_{id}.parquet.gz'.format(id=id) + df = dd.from_pandas(data,npartitions=1) + #df = df.loc[:,~df.columns.duplicated()] + # data.to_csv(filename,compression='gzip') + + df.to_parquet(filename,engine='fastparquet',compression='GZIP') + + # data.to_parquet(filename,engine='fastparquet',compression='gzip') + # table = pa.Table.from_pandas(data) + #pq.write_table(table,filename) return data