Private
Public Access
1
0

first routines with polars

This commit is contained in:
2024-04-07 19:19:44 +02:00
parent c94da7bd6c
commit 12915ad6b7
3 changed files with 382 additions and 41 deletions

View File

@@ -31,6 +31,7 @@ from zipfile import BadZipFile
import zipfile
import os
from rowers.models import strokedatafields
import polars as pl
from rowingdata import (
KinoMapParser,
@@ -400,12 +401,16 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True,
try:
_ = datadf['workoutid'].unique()
except KeyError:
datadf['workoutid'] = 0
try:
datadf['workoutid'] = 0
except TypeError:
datadf = datadf.with_columns(pl.lit(0).alias("workoutid"))
before = {}
for workoutid in datadf['workoutid'].unique():
ids = datadf['workoutid'].unique()
for workoutid in ids:
before[workoutid] = len(datadf[datadf['workoutid'] == workoutid])
data_orig = datadf.copy()
# bring metrics which have negative values to positive domain
@@ -654,6 +659,258 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True,
return datadf
def replace_zeros_with_nan(x):
    """Map an exact zero to NaN; return every other value unchanged."""
    if x == 0:
        return np.nan
    return x
def clean_df_stats_pl(datadf, workstrokesonly=True, ignorehr=True,
                      ignoreadvanced=False, for_chart=False):
    """Polars port of ``clean_df_stats``: null out zeros/negatives and drop
    rows whose stroke metrics fall outside plausible ranges.

    Parameters:
        datadf: polars DataFrame of stroke data; a ``workoutid`` column of
            zeros is added when missing.
        workstrokesonly: when True (or the string 'True'), drop rows whose
            ``workoutstate`` is a rest state.
        ignorehr: when False, additionally require ``hr`` >= 30.
        ignoreadvanced: skip range filters on advanced force metrics; it is
            auto-enabled when any advanced column is constant or all-null.
        for_chart: return right after filtering, skipping the
            "did we delete too much" safety check.

    Returns the cleaned frame, or the original frame when cleaning would
    leave fewer than 2 rows (or < 1%) for any workout.
    """

    def _with_col(df, name, expr):
        # Apply a column transform only when the column exists.  The old
        # ``except (KeyError, TypeError)`` guards never caught polars'
        # ColumnNotFoundError, so a missing column used to crash.
        if name not in df.columns:
            return df
        return df.with_columns(expr.alias(name))

    def _filtered(df, name, expr):
        # Apply a row filter only when the referenced column exists.
        if name not in df.columns:
            return df
        return df.filter(expr)

    # Ensure a workoutid column exists so per-workout bookkeeping works for
    # both pandas-style (item assignment) and polars frames.
    if 'workoutid' not in datadf.columns:
        try:
            datadf['workoutid'] = 0
        except TypeError:
            datadf = datadf.with_columns(pl.lit(0).alias("workoutid"))
    # Row counts per workout before cleaning, for the safety check below.
    before = {}
    ids = list(datadf['workoutid'].unique())
    for workoutid in ids:
        before[workoutid] = len(datadf.filter(pl.col("workoutid") == workoutid))
    data_orig = datadf.clone()
    if len(datadf) == 0:
        return datadf
    # Bring metrics that legitimately hold negative values into the positive
    # domain so the global clip / zero-to-NaN pass does not destroy them.
    datadf = _with_col(datadf, 'catch', -pl.col('catch'))
    datadf = _with_col(datadf, 'peakforceangle', pl.col('peakforceangle') + 1000)
    datadf = _with_col(datadf, 'hr', pl.col('hr') + 10)
    # Protect legitimate 0 spm / workoutstate values from being nulled.
    datadf = _with_col(datadf, 'spm', pl.col('spm') + 1.0)
    datadf = _with_col(datadf, 'workoutstate', pl.col('workoutstate') + 1)
    try:
        # Clamp negatives to 0 so they are turned into NaN below.
        datadf = datadf.select(pl.all().clip(lower_bound=0))
    except Exception:
        # Non-numeric columns make the whole select fail; best effort only
        # (the original caught TypeError with the same intent).
        pass
    # Advanced force metrics used for the ignoreadvanced autodetection.
    advancedcols = [
        'rhythm',
        'power',
        'drivelength',
        'forceratio',
        'drivespeed',
        'driveenergy',
        'catch',
        'finish',
        'averageforce',
        'peakforce',
        'slip',
        'wash',
        'peakforceangle',
        'effectiveangle',
    ]
    # Replace exact zeros with NaN, column by column.  The original loop
    # passed an unaliased when/then plus a stray ``name=col`` kwarg, which
    # polars interprets as a NEW column literally named "name" — zeros were
    # never replaced.  An explicit alias per column fixes that.
    for col in datadf.columns:
        try:
            datadf = datadf.with_columns(
                pl.when(pl.col(col) == 0)
                .then(np.nan)
                .otherwise(pl.col(col))
                .alias(col)
            )
        except Exception:
            # Non-numeric column (e.g. timestamps): leave untouched.
            pass
    # Bring spm and workoutstate back to their real values.
    datadf = _with_col(datadf, 'spm', pl.col('spm') - 1.0)
    datadf = _with_col(datadf, 'workoutstate', pl.col('workoutstate') - 1)
    # Return the protected metrics from the positive domain.
    datadf = _with_col(datadf, 'catch', -pl.col('catch'))
    datadf = _with_col(datadf, 'peakforceangle', pl.col('peakforceangle') - 1000)
    # BUG FIX: the original ADDED 10 again here, leaving hr offset by +20;
    # the protective +10 above must be subtracted.
    datadf = _with_col(datadf, 'hr', pl.col('hr') - 10)
    # Clean data for useful ranges per column.
    if not ignorehr:
        datadf = _filtered(datadf, 'hr', pl.col('hr') >= 30)  # pragma: no cover
    datadf = _filtered(datadf, 'spm', pl.col('spm') >= 0)
    datadf = _filtered(datadf, 'efficiency', pl.col('efficiency') <= 200)
    datadf = _filtered(datadf, 'spm', pl.col('spm') >= 10)
    datadf = _filtered(datadf, 'pace', pl.col('pace') <= 300 * 1000.)
    datadf = _filtered(datadf, 'efficiency', pl.col('efficiency') >= 0)
    datadf = _filtered(datadf, 'pace', pl.col('pace') >= 60 * 1000)
    datadf = _filtered(datadf, 'power', pl.col('power') <= 5000)
    datadf = _filtered(datadf, 'spm', pl.col('spm') <= 120)
    datadf = _filtered(datadf, 'wash', pl.col('wash') >= 1)
    # Try to guess ignoreadvanced: a constant or all-null advanced column
    # means the device did not record advanced metrics.
    if not ignoreadvanced:
        for metric in advancedcols:
            if metric not in datadf.columns:
                continue
            # NOTE: ``std()`` of an all-null column returns None in polars;
            # the old np.isnan(None) would raise.  Also renamed the local
            # away from the shadowed builtin ``sum``.
            spread = datadf[metric].std()
            if spread == 0 or spread is None or np.isnan(spread):
                ignoreadvanced = True
    if not ignoreadvanced:
        datadf = _filtered(datadf, 'rhythm', pl.col('rhythm') >= 0)
        datadf = _filtered(datadf, 'rhythm', pl.col('rhythm') <= 70)
        datadf = _filtered(datadf, 'power', pl.col('power') >= 20)
        datadf = _filtered(datadf, 'drivelength', pl.col('drivelength') >= 0.5)
        datadf = _filtered(datadf, 'forceratio', pl.col('forceratio') >= 0.2)
        datadf = _filtered(datadf, 'forceratio', pl.col('forceratio') <= 1.0)
        datadf = _filtered(datadf, 'drivespeed', pl.col('drivespeed') >= 0.5)
        datadf = _filtered(datadf, 'drivespeed', pl.col('drivespeed') <= 4)
        datadf = _filtered(datadf, 'driveenergy', pl.col('driveenergy') <= 2000)
        datadf = _filtered(datadf, 'driveenergy', pl.col('driveenergy') >= 100)
        datadf = _filtered(datadf, 'catch', pl.col('catch') <= -30)
    # workoutstateswork = [1, 4, 5, 8, 9, 6, 7]
    workoutstatesrest = [3]
    # workoutstatetransition = [0, 2, 10, 11, 12, 13]
    if workstrokesonly == 'True' or workstrokesonly is True:
        try:
            datadf = datadf.filter(~pl.col("workoutstate").is_in(workoutstatesrest))
        except Exception:
            # Missing or non-numeric workoutstate: keep all rows.
            # (Narrowed from the original bare ``except:``.)
            pass
    after = {}
    if for_chart:
        return datadf
    # Safety valve: if cleaning removed (nearly) all rows of any workout,
    # assume the thresholds do not fit this data and return the input.
    for workoutid in data_orig['workoutid'].unique():
        after[workoutid] = len(datadf.filter(pl.col("workoutid") == workoutid))
        if after[workoutid] < 2:
            return data_orig
        # before[] can only be 0 when after[] is also 0, which was handled
        # above — so this division is now safe (old code could divide by 0).
        ratio = float(after[workoutid]) / float(before[workoutid])
        if ratio < 0.01:
            return data_orig
    return datadf
def getpartofday(row, r):
workoutstartdatetime = row.rowdatetime
@@ -863,12 +1120,6 @@ def get_workoutsummaries(userid, startdate): # pragma: no cover
return df
def checkduplicates(r, workoutdate, workoutstartdatetime, workoutenddatetime):
duplicate = False
ws = Workout.objects.filter(user=r, date=workoutdate, duplicate=False).exclude(
@@ -1244,6 +1495,84 @@ def getrowdata_db(id=0, doclean=False, convertnewtons=True,
# Fetch a subset of the data from the DB
def getsmallrowdata_pl(columns, ids=None, doclean=True, workstrokesonly=True, compute=True,
                       debug=False, for_chart=False):
    """Fetch a subset of stroke-data columns for *ids* from the per-workout
    parquet cache using polars, regenerating missing/corrupt cache files
    from the database via ``getrowdata``/``dataprep``.

    Parameters:
        columns: column names to load ('None' entries are dropped;
            distance/spm/workoutid are always added).
        ids: workout ids to load; empty/None returns an empty frame.
            (Default changed from a mutable ``[]`` to None — same behavior.)
        doclean: run ``clean_df_stats_pl`` on the result.
        workstrokesonly / for_chart: forwarded to ``clean_df_stats_pl``.
        compute: when True, return the cleaned, null-free copy; otherwise
            return the raw concatenated frame.

    Returns a polars DataFrame (possibly empty).
    """
    if not ids:
        return pl.DataFrame()
    csvfilenames = [
        'media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids]
    # Always include the columns the cleaning/charting code relies on.
    columns = list(set([c for c in columns if c != 'None']
                       + ['distance', 'spm', 'workoutid']))
    df = pl.DataFrame()
    if len(ids) > 1:
        data = []
        for id, f in zip(ids, csvfilenames):
            try:
                data.append(pl.read_parquet(f, columns=columns))
            except (IsADirectoryError, FileNotFoundError, OSError, ArrowInvalid, IndexError):  # pragma: no cover
                # Cache miss or corruption: rebuild the parquet from the DB.
                rowdata, row = getrowdata(id=id)
                # Clear whatever is at the cache path first (the old dask
                # writer produced a directory, polars writes a single file;
                # a bare rmtree crashed on files and on missing paths).
                try:
                    shutil.rmtree(f)
                except NotADirectoryError:
                    os.remove(f)
                except FileNotFoundError:
                    pass
                if rowdata and len(rowdata.df):
                    _ = dataprep(rowdata.df, id=id,
                                 bands=True, otwpower=True, barchart=True,
                                 polars=True)
                    try:
                        data.append(pl.read_parquet(f, columns=columns))
                    except (OSError, ArrowInvalid, IndexError):
                        pass
        try:
            df = pl.concat(data, rechunk=True)
        except ValueError:  # pragma: no cover
            # Nothing could be read or regenerated.
            return pl.DataFrame()
    else:
        try:
            df = pl.read_parquet(csvfilenames[0], columns=columns)
            rowdata, row = getrowdata(id=ids[0])
        except Exception:
            # One merged handler replaces the two duplicated clauses
            # (narrow tuple + bare ``except:``) that ran identical code.
            rowdata, row = getrowdata(id=ids[0])
            if rowdata and len(rowdata.df):  # pragma: no cover
                data = dataprep(
                    rowdata.df, id=ids[0], bands=True, otwpower=True,
                    barchart=True, polars=True)
                try:
                    # BUG FIX: the original used pandas here (and even the
                    # bare class ``pd.DataFrame`` without parentheses),
                    # leaking a pandas object out of a polars function.
                    df = pl.read_parquet(csvfilenames[0], columns=columns)
                except Exception:
                    df = pl.DataFrame()
            else:
                df = pl.DataFrame()
    if compute and len(df):
        data = df.clone()
        if doclean:
            data = clean_df_stats_pl(data, ignorehr=True,
                                     workstrokesonly=workstrokesonly,
                                     for_chart=for_chart)
        data = data.drop_nulls()
        return data
    return df
def getsmallrowdata_db(columns, ids=[], doclean=True, workstrokesonly=True, compute=True,
debug=False, for_chart=False):
@@ -1593,7 +1922,7 @@ def add_efficiency(id=0): # pragma: no cover
def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
empower=True, inboard=0.88, forceunit='lbs', debug=False):
empower=True, inboard=0.88, forceunit='lbs', debug=False, polars=True):
if rowdatadf.empty:
return 0
@@ -1873,11 +2202,15 @@ def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
df = dd.from_pandas(data, npartitions=1)
try:
df.to_parquet(filename, engine='fastparquet', compression='GZIP')
except FileNotFoundError:
df2 = dd.from_pandas(df, npartitions=1)
df2.to_parquet(filename, engine='fastparquet', compression='GZIP')
if polars:
pldf = pl.from_pandas(data)
pldf.write_parquet(filename, compression='gzip')
else:
try:
df.to_parquet(filename, engine='fastparquet', compression='gzip')
except FileNotFoundError:
df2 = dd.from_pandas(df, npartitions=1)
df2.to_parquet(filename, engine='fastparquet', compression='gzip')
return data