def read_data(columns, ids=None, doclean=True, workstrokesonly=True,
              debug=False, for_chart=False, compute=True):
    """Load stroke data for the given workouts into one polars DataFrame.

    For every workout id the file ``media/strokedata_<id>.parquet.gz`` is
    scanned. When that file does not exist, the workout is fetched through
    ``getrowdata`` and ``dataplep`` is called before the file is scanned.
    Workouts for which no row data is available are skipped.

    Args:
        columns: Names of the columns to return. The entry ``'None'`` is
            dropped, and ``distance``, ``spm``, ``workoutid``,
            ``workoutstate`` and ``driveenergy`` are always added.
        ids: Iterable of workout ids. Empty or ``None`` yields an empty
            DataFrame.
        doclean: When true, rows outside the per-column ranges listed in
            ``clean_bounds`` below are filtered out.
        workstrokesonly: When true, rows whose ``workoutstate`` is 3 (the
            state this function treats as rest) are filtered out.
        debug, for_chart, compute: Accepted for signature compatibility;
            not used by this function.

    Returns:
        A ``pl.DataFrame``. Filters are applied on a best-effort basis: if
        the clean + rest filter leaves no rows, only the rest filter is
        tried; if that also leaves no rows, the unfiltered data is returned.
    """
    if not ids:
        return pl.DataFrame()

    always_needed = ['distance', 'spm', 'workoutid', 'workoutstate', 'driveenergy']
    # set() removes duplicates; the resulting column order is not significant.
    columns = list(set([c for c in columns if c != 'None'] + always_needed))

    frames = []
    for workout_id in ids:
        filename = 'media/strokedata_{id}.parquet.gz'.format(id=workout_id)
        if not os.path.isfile(filename):
            rowdata, _ = getrowdata(id=workout_id)
            # Best-effort removal of whatever non-file entry sits at the path.
            shutil.rmtree(filename, ignore_errors=True)
            if not (rowdata and len(rowdata.df)):
                # Nothing to regenerate from: skip this workout instead of
                # appending an unbound or stale frame from a previous loop.
                continue
            dataplep(rowdata.df, id=workout_id,
                     bands=True, otwpower=True, barchart=True,
                     polars=True)
        frames.append(pl.scan_parquet(filename))

    if not frames:
        return pl.DataFrame()

    frames = pl.collect_all(frames)

    try:
        datadf = pl.concat(frames).select(columns)
    except (SchemaError, ShapeError):
        # The frames could not be concatenated as-is. Retry after selecting
        # the requested columns per frame and normalising their dtypes.
        floatcolumns = []
        intcolumns = []
        for c in columns:
            try:
                numtype = metricsdicts[c]['numtype']
            except KeyError:
                continue
            if numtype == 'float':
                floatcolumns.append(c)
            elif numtype == 'integer':
                intcolumns.append(c)

        frames = [
            frame.select(columns).with_columns(
                cs.float().cast(pl.Float64)
            ).with_columns(
                cs.integer().cast(pl.Int64)
            ).with_columns(
                cs.by_name(intcolumns).cast(pl.Int64)
            ).with_columns(
                cs.by_name(floatcolumns).cast(pl.Float64)
            )
            for frame in frames
        ]
        datadf = pl.concat(frames)

    rest_filter = []
    if workstrokesonly:
        workoutstatesrest = [3]
        rest_filter.append(~pl.col("workoutstate").is_in(workoutstatesrest))

    if doclean:
        # (column, inclusive lower bound, inclusive upper bound);
        # None means that side is unbounded.
        clean_bounds = (
            ("spm", 10, 120),
            ("pace", 60*1000., 300*1000.),
            ("power", 20, 5000),
            ("rhythm", 0, 70),
            ("efficiency", 0, 200),
            ("wash", 1, None),
            ("drivelength", 0.5, None),
            ("forceratio", 0.2, 1.0),
            ("drivespeed", 0.5, 4),
            ("driveenergy", 100, 2000),
            ("catch", None, -30),
        )
        available = set(datadf.columns)
        clean_filter = list(rest_filter)
        for name, lower, upper in clean_bounds:
            if name not in available:
                continue
            if lower is not None:
                clean_filter.append(pl.col(name) >= lower)
            if upper is not None:
                clean_filter.append(pl.col(name) <= upper)

        if clean_filter:
            cleaned = datadf.filter(clean_filter)
            if not cleaned.is_empty():
                return cleaned

    # Either cleaning was not requested or it removed every row:
    # fall back to the rest-stroke filter alone.
    if rest_filter:
        workonly = datadf.filter(rest_filter)
        if not workonly.is_empty():
            return workonly

    return datadf
return_dtype=pl.Datetime) p2 = df[" Stroke500mPace"].map_elements(lambda x: timedeltaconv(x), return_dtype=pl.Datetime) - + data = pl.DataFrame( dict( time=df["TimeStamp (sec)"] * 1e3, diff --git a/rowers/interactiveplots.py b/rowers/interactiveplots.py index b71a7260..f5aa990b 100644 --- a/rowers/interactiveplots.py +++ b/rowers/interactiveplots.py @@ -459,13 +459,13 @@ def interactive_forcecurve(theworkouts): columns = columns + [name for name, d in metrics.rowingmetrics] - rowdata = dataprep.getsmallrowdata_pl(columns, ids=ids, + rowdata = dataprep.read_data(columns, ids=ids, workstrokesonly=False) if rowdata.is_empty(): return "", "No Valid Data Available" - rowdata = rowdata.fill_nan(None).drop_nulls() + rowdata = dataprep.remove_nulls_pl(rowdata) data_dict = rowdata.to_dicts() @@ -796,10 +796,10 @@ def interactive_histoall(theworkouts, histoparam, includereststrokes, ids = [int(w.id) for w in theworkouts] - columns = [name for name, d in metrics.rowingmetrics]+['spm', 'driveenergy', 'distance', 'workoutstate', 'workoutid'] + columns = [histoparam, 'spm', 'driveenergy', 'distance', 'workoutstate', 'workoutid'] workstrokesonly = not includereststrokes - rowdata = dataprep.getsmallrowdata_pl( + rowdata = dataprep.read_data( columns, ids=ids, doclean=True, workstrokesonly=workstrokesonly) rowdata = rowdata.fill_nan(None).drop_nulls() @@ -1190,7 +1190,7 @@ def forcecurve_multi_interactive_chart(selected): # pragma: no cover 'workoutstate', 'workoutid', 'driveenergy', 'cumdist'] columns = columns + [name for name, d in metrics.rowingmetrics] - rowdata = dataprep.getsmallrowdata_pl(columns, ids=workoutids, + rowdata = dataprep.read_data(columns, ids=workoutids, workstrokesonly=False) rowdata = rowdata.fill_nan(None).drop_nulls() @@ -1379,7 +1379,7 @@ def interactive_chart(id=0, promember=0, intervaldata={}): TOOLS = 'pan,box_zoom,wheel_zoom,reset,tap,hover,crosshair' columns = ['time', 'pace', 'hr', 'fpace', 'ftime', 'spm'] - datadf = 
dataprep.getsmallrowdata_pl(columns, ids=[id]) + datadf = dataprep.read_data(columns, ids=[id]) if datadf.is_empty(): return "", "No Valid Data Available" @@ -1572,10 +1572,10 @@ def interactive_cum_flex_chart2(theworkouts, promember=0, datadf = pd.DataFrame() if promember: - datadf = dataprep.getsmallrowdata_pl(columns, ids=ids, doclean=True, + datadf = dataprep.read_data(columns, ids=ids, doclean=True, workstrokesonly=workstrokesonly, for_chart=True) else: - datadf = dataprep.getsmallrowdata_pl(columns_basic, ids=ids, doclean=True, + datadf = dataprep.read_data(columns_basic, ids=ids, doclean=True, workstrokesonly=workstrokesonly, for_chart=True) try: @@ -1803,15 +1803,15 @@ def interactive_flex_chart2(id, r, promember=0, columns = columns + ['spm', 'driveenergy', 'distance','workoutstate'] columns_basic = columns_basic + ['spm', 'driveenergy', 'distance','workoutstate'] - datadf = pd.DataFrame() if promember: - rowdata = dataprep.getsmallrowdata_pl(columns, ids=[id], doclean=True, - workstrokesonly=workstrokesonly, for_chart=True) + rowdata = dataprep.read_data(columns, ids=[id], doclean=True, + workstrokesonly=workstrokesonly, for_chart=True) else: - rowdata = dataprep.getsmallrowdata_pl(columns_basic, ids=[id], doclean=True, - workstrokesonly=workstrokesonly, for_chart=True) + rowdata = dataprep.read_data(columns_basic, ids=[id], doclean=True, + workstrokesonly=workstrokesonly, for_chart=True) + if r.usersmooth > 1: # pragma: no cover for column in columns: try: @@ -1825,18 +1825,16 @@ def interactive_flex_chart2(id, r, promember=0, except KeyError: pass - try: - if len(rowdata) < 2: - if promember: - rowdata = dataprep.getsmallrowdata_pl(columns, ids=[id], - doclean=False, - workstrokesonly=False, for_chart=True) - else: - rowdata = dataprep.getsmallrowdata_pl(columns_basic, ids=[id], doclean=False, - workstrokesonly=False, for_chart=True) - workstrokesonly = False - except (KeyError, TypeError): # pragma: no cover - workstrokesonly = False + if len(rowdata) 
< 2: + if promember: + rowdata = dataprep.read_data(columns, ids=[id], + doclean=False, + workstrokesonly=False, for_chart=True) + else: + rowdata = dataprep.read_data(columns_basic, ids=[id], doclean=False, + workstrokesonly=False, for_chart=True) + workstrokesonly = False + try: _ = rowdata[yparam2] except (KeyError, TypeError, ColumnNotFoundError): # pragma: no cover @@ -1867,14 +1865,6 @@ def interactive_flex_chart2(id, r, promember=0, if rowdata.is_empty(): return "", "No valid data", workstrokesonly - workoutstatesrest = [3] - - if workstrokesonly: # pragma: no cover - try: - rowdata = rowdata.filter(~pl.col("workoutstate").is_in(workoutstatesrest)) - except (KeyError, ColumnNotFoundError): - pass - try: tseconds = rowdata['time'] except (KeyError, ColumnNotFoundError): # pragma: no cover @@ -2205,9 +2195,11 @@ def get_zones_report_pl(rower, startdate, enddate, trainingzones='hr', date_agg= columns = ['workoutid', 'hr', 'power', 'time'] - df = dataprep.getsmallrowdata_pl(columns, ids=ids) + df = dataprep.read_data(columns, ids=ids, workstrokesonly=False, doclean=False) + df = dataprep.remove_nulls_pl(df) df = df.with_columns((pl.col("time").diff().clip(0, 20*1.e3)).alias("deltat")).lazy() + hrzones = rower.hrzones powerzones = rower.powerzones diff --git a/rowers/tests/testdata/testdata.tcx.gz b/rowers/tests/testdata/testdata.tcx.gz index 6170c42c..76c3e0ac 100644 Binary files a/rowers/tests/testdata/testdata.tcx.gz and b/rowers/tests/testdata/testdata.tcx.gz differ