diff --git a/rowers/dataprep.py b/rowers/dataprep.py index 25f02385..b5f74f84 100644 --- a/rowers/dataprep.py +++ b/rowers/dataprep.py @@ -379,6 +379,7 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, pass datadf.replace(to_replace=0, value=np.nan, inplace=True) + # datadf = datadf.map_partitions(lambda df:df.replace(to_replace=0,value=np.nan)) # bring spm back to real values try: @@ -406,55 +407,55 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, if not ignorehr: try: mask = datadf['hr'] < 30 - datadf.loc[mask, 'hr'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['spm'] < 0 - datadf.loc[mask,'spm'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['efficiency'] > 200. - datadf.loc[mask, 'efficiency'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['spm'] < 10 - datadf.loc[mask, 'spm'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['pace'] / 1000. > 300. - datadf.loc[mask, 'pace'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['efficiency'] < 0. - datadf.loc[mask, 'efficiency'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['pace'] / 1000. < 60. - datadf.loc[mask, 'pace'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['spm'] > 60 - datadf.loc[mask, 'spm'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: - mask = datadf['wash'] < 1 + mask = datadf['wash'] > 1 datadf.loc[mask, 'wash'] = np.nan except KeyError: pass @@ -462,67 +463,67 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, if not ignoreadvanced: try: mask = datadf['rhythm'] < 5 - datadf.loc[mask, 'rhythm'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['rhythm'] > 70 - datadf.loc[mask, 'rhythm'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['power'] < 20 - datadf.loc[mask, 'power'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['drivelength'] < 0.5 - datadf.loc[mask, 'drivelength'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['forceratio'] < 0.2 - datadf.loc[mask, 'forceratio'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['forceratio'] > 1.0 - datadf.loc[mask, 'forceratio'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['drivespeed'] < 0.5 - datadf.loc[mask, 'drivespeed'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['drivespeed'] > 4 - datadf.loc[mask, 'drivespeed'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['driveenergy'] > 2000 - datadf.loc[mask, 'driveenergy'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['driveenergy'] < 100 - datadf.loc[mask, 'driveenergy'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass try: mask = datadf['catch'] > -30. - datadf.loc[mask, 'catch'] = np.nan + datadf.mask(mask,inplace=True) except KeyError: pass @@ -1772,7 +1773,7 @@ def getrowdata_db(id=0, doclean=False, convertnewtons=True, # Fetch a subset of the data from the DB -def getsmallrowdata_db(columns, ids=[], doclean=True,workstrokesonly=True): +def getsmallrowdata_db(columns, ids=[], doclean=True,workstrokesonly=True,compute=True): prepmultipledata(ids) csvfilenames = ['media/strokedata_{id}.parquet'.format(id=id) for id in ids] @@ -1792,16 +1793,19 @@ def getsmallrowdata_db(columns, ids=[], doclean=True,workstrokesonly=True): else: df = dd.read_parquet(csvfilenames[0],columns=columns,engine='pyarrow') - data = df.compute() - data = data.loc[:,~data.columns.duplicated()] - extracols = [] - if doclean: - data = clean_df_stats(data, ignorehr=True, - workstrokesonly=workstrokesonly) + df = df.loc[:,~df.columns.duplicated()] + + + if compute: + data = df.compute() + if doclean: + data = clean_df_stats(data, ignorehr=True, + workstrokesonly=workstrokesonly) data.dropna(axis=1,how='all',inplace=True) data.dropna(axis=0,how='any',inplace=True) - - return data + return data + + return df def getsmallrowdata_db_old(columns, ids=[], doclean=True, workstrokesonly=True): prepmultipledata(ids) diff --git a/rowers/dataprepnodjango.py b/rowers/dataprepnodjango.py index a2ac7764..3347ac49 100644 --- a/rowers/dataprepnodjango.py +++ b/rowers/dataprepnodjango.py @@ -16,6 +16,8 @@ from pandas import DataFrame,Series import pandas as pd import numpy as np import itertools +import dask.dataframe as dd +from dask.delayed import delayed from sqlalchemy import create_engine import sqlalchemy as sa