Private
Public Access
1
0
This commit is contained in:
Sander Roosendaal
2019-10-23 18:15:07 +02:00
parent 4ea24fa5aa
commit fa373d781f

View File

@@ -27,6 +27,8 @@ from rowers.tasks import handle_zip_file
from pandas import DataFrame, Series from pandas import DataFrame, Series
import dask.dataframe as dd import dask.dataframe as dd
from dask.delayed import delayed from dask.delayed import delayed
import pyarrow.parquet as pq
import pyarrow as pa
from django.utils import timezone from django.utils import timezone
from django.utils.timezone import get_current_timezone from django.utils.timezone import get_current_timezone
@@ -114,6 +116,36 @@ columndict = {
'cumdist': 'cum_dist', 'cumdist': 'cum_dist',
} }
# dtypes
dtypes = {'workoutid':int,
'hr':int,
'pace':float,
'velo':float,
'spm':float,
'driveenergy':float,
'power':float,
'averageforce':float,
'peakforce':float,
'drivelength':float,
'distance':float,
'cumdist':float,
'drivespeed':float,
'catch':float,
'slip':float,
'finish':float,
'wash':float,
'peakforceangle':float,
'totalangle':float,
'effectiveangle':float,
'rhythm':float,
'efficiency':float,
'distanceperstroke':float,
'ftime':str,
'fpace':str,
'fergpace':str,
'fnowindpace':str,
}
from scipy.signal import savgol_filter from scipy.signal import savgol_filter
import datetime import datetime
@@ -349,22 +381,23 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True,
ignoreadvanced=False): ignoreadvanced=False):
# clean data remove zeros and negative values # clean data remove zeros and negative values
# bring metrics which have negative values to positive domain # bring metrics which have negative values to positive domain
if len(datadf)==0: if len(datadf)==0:
return datadf return datadf
try: try:
datadf['catch'] = -datadf['catch'] datadf['catch'] = -datadf['catch']
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
datadf['peakforceangle'] = datadf['peakforceangle'] + 1000 datadf['peakforceangle'] = datadf['peakforceangle'] + 1000
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
datadf['hr'] = datadf['hr'] + 10 datadf['hr'] = datadf['hr'] + 10
except KeyError: except (KeyError,TypeError):
pass pass
# protect 0 spm values from being nulled # protect 0 spm values from being nulled
@@ -390,17 +423,17 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True,
# return from positive domain to negative # return from positive domain to negative
try: try:
datadf['catch'] = -datadf['catch'] datadf['catch'] = -datadf['catch']
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
datadf['peakforceangle'] = datadf['peakforceangle'] - 1000 datadf['peakforceangle'] = datadf['peakforceangle'] - 1000
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
datadf['hr'] = datadf['hr'] - 10 datadf['hr'] = datadf['hr'] - 10
except KeyError: except (KeyError,TypeError):
pass pass
# clean data for useful ranges per column # clean data for useful ranges per column
@@ -408,123 +441,123 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True,
try: try:
mask = datadf['hr'] < 30 mask = datadf['hr'] < 30
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['spm'] < 0 mask = datadf['spm'] < 0
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['efficiency'] > 200. mask = datadf['efficiency'] > 200.
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['spm'] < 10 mask = datadf['spm'] < 10
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['pace'] / 1000. > 300. mask = datadf['pace'] / 1000. > 300.
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['efficiency'] < 0. mask = datadf['efficiency'] < 0.
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['pace'] / 1000. < 60. mask = datadf['pace'] / 1000. < 60.
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['spm'] > 60 mask = datadf['spm'] > 60
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['wash'] > 1 mask = datadf['wash'] > 1
datadf.loc[mask, 'wash'] = np.nan datadf.loc[mask, 'wash'] = np.nan
except KeyError: except (KeyError,TypeError):
pass pass
if not ignoreadvanced: if not ignoreadvanced:
try: try:
mask = datadf['rhythm'] < 5 mask = datadf['rhythm'] < 5
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['rhythm'] > 70 mask = datadf['rhythm'] > 70
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['power'] < 20 mask = datadf['power'] < 20
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['drivelength'] < 0.5 mask = datadf['drivelength'] < 0.5
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['forceratio'] < 0.2 mask = datadf['forceratio'] < 0.2
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['forceratio'] > 1.0 mask = datadf['forceratio'] > 1.0
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['drivespeed'] < 0.5 mask = datadf['drivespeed'] < 0.5
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['drivespeed'] > 4 mask = datadf['drivespeed'] > 4
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['driveenergy'] > 2000 mask = datadf['driveenergy'] > 2000
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['driveenergy'] < 100 mask = datadf['driveenergy'] < 100
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
try: try:
mask = datadf['catch'] > -30. mask = datadf['catch'] > -30.
datadf.mask(mask,inplace=True) datadf.mask(mask,inplace=True)
except KeyError: except (KeyError,TypeError):
pass pass
workoutstateswork = [1, 4, 5, 8, 9, 6, 7] workoutstateswork = [1, 4, 5, 8, 9, 6, 7]
@@ -1774,27 +1807,97 @@ def getrowdata_db(id=0, doclean=False, convertnewtons=True,
# Fetch a subset of the data from the DB # Fetch a subset of the data from the DB
def getsmallrowdata_db(columns, ids=[], doclean=True,workstrokesonly=True,compute=True): def getsmallrowdata_db(columns, ids=[], doclean=True,workstrokesonly=True,compute=True):
prepmultipledata(ids) # prepmultipledata(ids)
csvfilenames = ['media/strokedata_{id}.parquet'.format(id=id) for id in ids] csvfilenames = ['media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids]
data = [] data = []
columns = [c for c in columns if c != 'None'] columns = [c for c in columns if c != 'None']
columns = list(set(columns))
if len(ids)>1: if len(ids)>1:
for f in csvfilenames: for id,f in zip(ids,csvfilenames):
try: try:
df = dd.read_parquet(f,columns=columns,engine='pyarrow') #df = dd.read_parquet(f,columns=columns,engine='pyarrow')
df = pd.read_parquet(f,columns=columns)
data.append(df) data.append(df)
except OSError: except OSError:
pass rowdata, row = getrowdata(id=id)
if rowdata and len(rowdata.df):
datadf = dataprep(rowdata.df,id=id,bands=True,otwpower=True,barchart=True)
# df = dd.read_parquet(f,columns=columns,engine='pyarrow')
df = pd.read_parquet(f,columns=columns)
data.append(df)
df = pd.concat(data,axis=0)
# df = dd.concat(data,axis=0)
else:
try:
df = pd.read_parquet(csvfilenames[0],columns=columns)
except OSError:
rowdata,row = getrowdata(id=ids[0])
if rowdata and len(rowdata.df):
data = dataprep(rowdata.df,id=ids[0],bands=True,otwpower=True,barchart=True)
df = pd.read_parquet(csvfilenames[0],columns=columns)
# df = dd.read_parquet(csvfilenames[0],
# column=columns,engine='pyarrow',
# )
# df = df.loc[:,~df.columns.duplicated()]
if compute:
data = df.copy()
if doclean:
data = clean_df_stats(data, ignorehr=True,
workstrokesonly=workstrokesonly)
data.dropna(axis=1,how='all',inplace=True)
data.dropna(axis=0,how='any',inplace=True)
return data
return df
def getsmallrowdata_db_dask(columns, ids=[], doclean=True,workstrokesonly=True,compute=True):
# prepmultipledata(ids)
csvfilenames = ['media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids]
data = []
columns = [c for c in columns if c != 'None']
columns = list(set(columns))
if len(ids)>1:
for id,f in zip(ids,csvfilenames):
try:
#df = dd.read_parquet(f,columns=columns,engine='pyarrow')
df = dd.read_parquet(f,columns=columns)
data.append(df)
except OSError:
rowdata, row = getrowdata(id=id)
if rowdata and len(rowdata.df):
datadf = dataprep(rowdata.df,id=id,bands=True,otwpower=True,barchart=True)
# df = dd.read_parquet(f,columns=columns,engine='pyarrow')
df = dd.read_parquet(f,columns=columns)
data.append(df)
df = dd.concat(data,axis=0) df = dd.concat(data,axis=0)
# df = dd.concat(data,axis=0)
else: else:
df = dd.read_parquet(csvfilenames[0],columns=columns,engine='pyarrow') try:
df = dd.read_parquet(csvfilenames[0],columns=columns)
except OSError:
rowdata,row = getrowdata(id=ids[0])
if rowdata and len(rowdata.df):
data = dataprep(rowdata.df,id=ids[0],bands=True,otwpower=True,barchart=True)
df = dd.read_parquet(csvfilenames[0],columns=columns)
# df = dd.read_parquet(csvfilenames[0],
# column=columns,engine='pyarrow',
# )
df = df.loc[:,~df.columns.duplicated()] # df = df.loc[:,~df.columns.duplicated()]
if compute: if compute:
data = df.compute() data = df.compute()
@@ -1889,7 +1992,7 @@ import glob
def prepmultipledata(ids, verbose=False): def prepmultipledata(ids, verbose=False):
filenames = glob.glob('media/*.parquet') filenames = glob.glob('media/*.parquet')
ids = [id for id in ids if 'media/strokedata_{id}.parquet'.format(id=id) not in filenames] ids = [id for id in ids if 'media/strokedata_{id}.parquet.gz'.format(id=id) not in filenames]
for id in ids: for id in ids:
rowdata, row = getrowdata(id=id) rowdata, row = getrowdata(id=id)
@@ -2452,9 +2555,22 @@ def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
# write data if id given # write data if id given
if id != 0: if id != 0:
data['workoutid'] = id data['workoutid'] = id
filename = 'media/strokedata_{id}.parquet'.format(id=id) data.fillna(0,inplace=True)
# df = dd.from_pandas(data,npartitions=1) data = data.astype(
data.to_parquet(filename,engine='pyarrow',compression='gzip') dtype=dtypes,
)
filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
df = dd.from_pandas(data,npartitions=1)
#df = df.loc[:,~df.columns.duplicated()]
# data.to_csv(filename,compression='gzip')
df.to_parquet(filename,engine='fastparquet',compression='GZIP')
# data.to_parquet(filename,engine='fastparquet',compression='gzip')
# table = pa.Table.from_pandas(data)
#pq.write_table(table,filename)
return data return data