diff --git a/rowers/dataroutines.py b/rowers/dataroutines.py index 3bcd683d..822230d3 100644 --- a/rowers/dataroutines.py +++ b/rowers/dataroutines.py @@ -31,6 +31,7 @@ from zipfile import BadZipFile import zipfile import os from rowers.models import strokedatafields +import polars as pl from rowingdata import ( KinoMapParser, @@ -400,12 +401,16 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, try: _ = datadf['workoutid'].unique() except KeyError: - datadf['workoutid'] = 0 + try: + datadf['workoutid'] = 0 + except TypeError: + datadf = datadf.with_columns(pl.lit(0).alias("workoutid")) before = {} - for workoutid in datadf['workoutid'].unique(): + ids = datadf['workoutid'].unique() + for workoutid in ids: before[workoutid] = len(datadf[datadf['workoutid'] == workoutid]) - + data_orig = datadf.copy() # bring metrics which have negative values to positive domain @@ -654,6 +659,258 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True, return datadf +def replace_zeros_with_nan(x): + return np.nan if x == 0 else x + +def clean_df_stats_pl(datadf, workstrokesonly=True, ignorehr=True, + ignoreadvanced=False, for_chart=False): + # clean data remove zeros and negative values + + try: + _ = datadf['workoutid'].unique() + except KeyError: + try: + datadf['workoutid'] = 0 + except TypeError: + datadf = datadf.with_columns(pl.lit(0).alias("workoutid")) + + before = {} + ids = list(datadf['workoutid'].unique()) + for workoutid in ids: + before[workoutid] = len(datadf.filter(pl.col("workoutid")==workoutid)) + + data_orig = datadf.clone() + + # bring metrics which have negative values to positive domain + if len(datadf) == 0: + return datadf + try: + datadf = datadf.with_columns((-pl.col('catch')).alias('catch')) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.with_columns((pl.col('peakforceangle')+1000).alias('peakforceangle')) + except (KeyError, TypeError): + pass + + try: + datadf = 
datadf.with_columns((pl.col('hr')+10).alias('hr'))
+    except (KeyError, TypeError):
+        pass
+
+    # protect 0 spm values from being nulled
+    try:
+        datadf = datadf.with_columns((pl.col('spm')+1.0).alias('spm'))
+    except (KeyError, TypeError):
+        pass
+
+    # protect 0 workoutstate values from being nulled
+    try:
+        datadf = datadf.with_columns((pl.col('workoutstate')+1).alias('workoutstate'))
+    except (KeyError, TypeError):
+        pass
+
+    try:
+        datadf = datadf.select(pl.all().clip(lower_bound=0))
+        # datadf = datadf.clip(lower=0)
+    except TypeError:
+        pass
+
+    # protect advanced metrics columns
+    advancedcols = [
+        'rhythm',
+        'power',
+        'drivelength',
+        'forceratio',
+        'drivespeed',
+        'driveenergy',
+        'catch',
+        'finish',
+        'averageforce',
+        'peakforce',
+        'slip',
+        'wash',
+        'peakforceangle',
+        'effectiveangle',
+    ]
+
+    for col in datadf.columns:
+        datadf = datadf.with_columns(
+            pl.when(datadf[col] == 0).then(pl.lit(np.nan))
+            .otherwise(datadf[col]).alias(col)
+        )
+
+    # datadf = datadf.map_partitions(lambda df:df.replace(to_replace=0,value=np.nan))
+
+    # bring spm back to real values
+    try:
+        datadf = datadf.with_columns((pl.col('spm')-1.0).alias('spm'))
+    except (TypeError, KeyError):
+        pass
+
+    # bring workoutstate back to real values
+    try:
+        datadf = datadf.with_columns((pl.col('workoutstate')-1).alias('workoutstate'))
+    except (TypeError, KeyError):
+        pass
+
+    # return from positive domain to negative
+    try:
+        datadf = datadf.with_columns((-pl.col('catch')).alias('catch'))
+    except (KeyError, TypeError):
+        pass
+
+    try:
+        datadf = datadf.with_columns((pl.col('peakforceangle')-1000).alias('peakforceangle'))
+    except (KeyError, TypeError):
+        pass
+
+    try:
+        datadf = datadf.with_columns((pl.col('hr')-10).alias('hr'))
+    except (KeyError, TypeError):
+        pass
+
+    # clean data for useful ranges per column
+    if not ignorehr:
+        try:
+            datadf = datadf.filter(pl.col("hr")>=30)
+        except (KeyError, TypeError):  # pragma: no cover
+            pass
+
+    try:
+        datadf = datadf.filter(pl.col("spm")
>=0) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("efficiency")<=200) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("spm")>=10) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("pace")<=300*1000.) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("efficiency")>=0) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("pace")>=60*1000) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("power")<=5000) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("spm")<=120) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("wash")>=1) + except (KeyError, TypeError): + pass + + # try to guess ignoreadvanced + if not ignoreadvanced: + for metric in advancedcols: + try: + sum = datadf[metric].std() + if sum == 0 or np.isnan(sum): + ignoreadvanced = True + except KeyError: + pass + + if not ignoreadvanced: + try: + datadf = datadf.filter(pl.col("rhythm")>=0) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("rhythm")<=70) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("power")>=20) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("drivelength")>=0.5) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("forceratio")>=0.2) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("forceratio")<=1.0) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("drivespeed")>=0.5) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("drivespeed")<=4) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("driveenergy")<=2000) + except (KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("driveenergy")>=100) + except 
(KeyError, TypeError): + pass + + try: + datadf = datadf.filter(pl.col("catch")<=-30) + except (KeyError, TypeError): + pass + + # workoutstateswork = [1, 4, 5, 8, 9, 6, 7] + workoutstatesrest = [3] + # workoutstatetransition = [0, 2, 10, 11, 12, 13] + + if workstrokesonly == 'True' or workstrokesonly is True: + try: + datadf = datadf.filter(~pl.col("workoutstate").is_in(workoutstatesrest)) + except: + pass + + after = {} + + if for_chart: + return datadf + for workoutid in data_orig['workoutid'].unique(): + after[workoutid] = len(datadf.filter(pl.col("workoutid")==workoutid)) + ratio = float(after[workoutid])/float(before[workoutid]) + if ratio < 0.01 or after[workoutid] < 2: + return data_orig + + return datadf + def getpartofday(row, r): workoutstartdatetime = row.rowdatetime @@ -863,12 +1120,6 @@ def get_workoutsummaries(userid, startdate): # pragma: no cover return df - - - - - - def checkduplicates(r, workoutdate, workoutstartdatetime, workoutenddatetime): duplicate = False ws = Workout.objects.filter(user=r, date=workoutdate, duplicate=False).exclude( @@ -1244,6 +1495,84 @@ def getrowdata_db(id=0, doclean=False, convertnewtons=True, # Fetch a subset of the data from the DB +def getsmallrowdata_pl(columns, ids=[], doclean=True, workstrokesonly=True, compute=True, + debug=False, for_chart=False): + if ids: + csvfilenames = [ + 'media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids] + else: + return pl.DataFrame() + + data = [] + columns = [c for c in columns if c != 'None'] + ['distance', 'spm', 'workoutid'] + columns = list(set(columns)) + + df = pl.DataFrame() + if len(ids) > 1: + for id, f in zip(ids, csvfilenames): + try: + df = pl.read_parquet(f, columns=columns) + data.append(df) + except (IsADirectoryError, FileNotFoundError, OSError, ArrowInvalid, IndexError): # pragma: no cover + rowdata, row = getrowdata(id=id) + shutil.rmtree(f) + if rowdata and len(rowdata.df): + _ = dataprep(rowdata.df, id=id, + bands=True, otwpower=True, barchart=True, + 
polars=True)
+                try:
+                    df = pl.read_parquet(f, columns=columns)
+                    data.append(df)
+                except (OSError, ArrowInvalid, IndexError):
+                    pass
+        try:
+            df = pl.concat(data, rechunk=True)
+        except ValueError:  # pragma: no cover
+            return pl.DataFrame()
+
+    else:
+        try:
+            df = pl.read_parquet(csvfilenames[0], columns=columns)
+            rowdata, row = getrowdata(id=ids[0])
+        except (OSError, IndexError, ArrowInvalid):
+            rowdata, row = getrowdata(id=ids[0])
+            if rowdata and len(rowdata.df):  # pragma: no cover
+                data = dataprep(
+                    rowdata.df, id=ids[0], bands=True, otwpower=True, barchart=True)
+                try:
+                    df = pl.read_parquet(csvfilenames[0], columns=columns)
+                except:
+                    df = pl.DataFrame()
+            else:
+                df = pl.DataFrame()
+        except:
+            rowdata, row = getrowdata(id=ids[0])
+            if rowdata and len(rowdata.df):  # pragma: no cover
+                data = dataprep(
+                    rowdata.df, id=ids[0], bands=True, otwpower=True, barchart=True)
+                try:
+                    df = pl.read_parquet(csvfilenames[0], columns=columns)
+                except:
+                    df = pl.DataFrame()
+            else:
+                df = pl.DataFrame()
+
+    if compute and len(df):
+        data = df.clone()
+        if doclean:
+            data = clean_df_stats_pl(data, ignorehr=True,
+                                     workstrokesonly=workstrokesonly,
+                                     for_chart=for_chart)
+
+        data = data.drop_nulls()
+
+        return data
+
+
+    return df
+
+
 def getsmallrowdata_db(columns, ids=[], doclean=True,
                        workstrokesonly=True, compute=True,
                        debug=False, for_chart=False):
@@ -1593,7 +1922,7 @@ def add_efficiency(id=0):  # pragma: no cover


 def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
-             empower=True, inboard=0.88, forceunit='lbs', debug=False):
+             empower=True, inboard=0.88, forceunit='lbs', debug=False, polars=True):

     if rowdatadf.empty:
         return 0
@@ -1873,11 +2202,15 @@

     filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
     df = dd.from_pandas(data, npartitions=1)
-    try:
-        df.to_parquet(filename, engine='fastparquet', compression='GZIP')
-    except FileNotFoundError:
-        df2 = 
dd.from_pandas(df, npartitions=1) - df2.to_parquet(filename, engine='fastparquet', compression='GZIP') + if polars: + pldf = pl.from_pandas(data) + pldf.write_parquet(filename, compression='gzip') + else: + try: + df.to_parquet(filename, engine='fastparquet', compression='gzip') + except FileNotFoundError: + df2 = dd.from_pandas(df, npartitions=1) + df2.to_parquet(filename, engine='fastparquet', compression='gzip') return data diff --git a/rowers/interactiveplots.py b/rowers/interactiveplots.py index 4276704f..0178d52f 100644 --- a/rowers/interactiveplots.py +++ b/rowers/interactiveplots.py @@ -18,8 +18,10 @@ import rowers.metrics as metrics import rowers.dataprep as dataprep from rowers.dataprep import rdata import rowers.utils as utils +import polars as pl from rowers.rower_rules import ispromember +from polars.exceptions import ColumnNotFoundError from scipy.interpolate import griddata from scipy.signal import savgol_filter @@ -2306,29 +2308,30 @@ def interactive_cum_flex_chart2(theworkouts, promember=0, columns = [name for name, d in metrics.rowingmetrics] columns_basic = [name for name, d in metrics.rowingmetrics if d['group'] == 'basic'] - columns = columns + ['spm', 'driveenergy', 'distance'] - columns_basic = columns_basic + ['spm', 'driveenergy', 'distance'] + columns = columns + ['spm', 'driveenergy', 'distance' ,'workoutstate'] + columns_basic = columns_basic + ['spm', 'driveenergy', 'distance', 'workoutstate'] datadf = pd.DataFrame() if promember: - datadf = dataprep.getsmallrowdata_db(columns, ids=ids, doclean=True, + datadf = dataprep.getsmallrowdata_pl(columns, ids=ids, doclean=True, workstrokesonly=workstrokesonly, for_chart=True) else: - datadf = dataprep.getsmallrowdata_db(columns_basic, ids=ids, doclean=True, + datadf = dataprep.getsmallrowdata_pl(columns_basic, ids=ids, doclean=True, workstrokesonly=workstrokesonly, for_chart=True) try: _ = datadf[yparam2] - except KeyError: # pragma: no cover + except (KeyError, ColumnNotFoundError): # pragma: 
no cover
         yparam2 = 'None'

     try:
         _ = datadf[yparam1]
-    except KeyError:
+    except (KeyError, ColumnNotFoundError):
         yparam1 = 'None'

-    datadf.dropna(axis=1, how='all', inplace=True)
-    datadf.dropna(axis=0, how='any', inplace=True)
+    datadf = datadf.drop_nulls()
+    #datadf.dropna(axis=1, how='all', inplace=True)
+    #datadf.dropna(axis=0, how='any', inplace=True)

     # test if we have drive energy
     try:  # pragma: no cover
@@ -2347,42 +2350,43 @@
     yparamname2 = axlabels[yparam2]

     # check if dataframe not empty
-    if datadf.empty:  # pragma: no cover
+    if datadf.is_empty():  # pragma: no cover
         return ['', '
No non-zero data in selection
', '', ''] try: - datadf['x1'] = datadf.loc[:, xparam] + datadf = datadf.with_columns(pl.col(xparam).alias("x1")) except KeyError: # pragma: no cover try: - datadf['x1'] = datadf['distance'] + datadf = datadf.with_columns(pl.col("distance").alias("x1")) except KeyError: try: - datadf['x1'] = datadf['time'] + datadf = datadf.with_columns(pl.col('time').alias("x1")) except KeyError: # pragma: no cover return ['', 'No non-zero data in selection
', '', ''] try: - datadf['y1'] = datadf.loc[:, yparam1] + datadf = datadf.with_columns(pl.col(yparam1).alias("y1")) except KeyError: try: - datadf['y1'] = datadf['pace'] + datadf = datadf.with_columns(pl.col('pace').alias("y1")) except KeyError: # pragma: no cover return ['', 'No non-zero data in selection
', '', ''] if yparam2 != 'None': try: - datadf['y2'] = datadf.loc[:, yparam2] + datadf = datadf.with_columns(pl.col(yparam2).alias("y2")) except KeyError: # pragma: no cover - datadf['y2'] = datadf['y1'] + datadf = datadf.with_columns(pl.col("y1").alias("y2")) else: # pragma: no cover - datadf['y2'] = datadf['y1'] + datadf = datadf.with_columns(pl.col("y1").alias("y2")) - datadf['xname'] = axlabels[xparam] - datadf['yname1'] = axlabels[yparam1] + datadf = datadf.with_columns(xname = pl.lit(axlabels[xparam])) + datadf = datadf.with_columns(yname1 = pl.lit(axlabels[yparam1])) + if yparam2 != 'None': - datadf['yname2'] = axlabels[yparam2] + datadf = datadf.with_columns(yname2 = pl.lit(axlabels[yparam2])) else: # pragma: no cover - datadf['yname2'] = axlabels[yparam1] + datadf = datadf.with_columns(yname2 = pl.lit(axlabels[yparam1])) def func(x, a, b): return a*x+b @@ -2392,11 +2396,12 @@ def interactive_cum_flex_chart2(theworkouts, promember=0, try: popt, pcov = optimize.curve_fit(func, x1, y1) ytrend = func(x1, popt[0], popt[1]) - datadf['ytrend'] = ytrend + datadf= datadf.with_columns(ytrend = ytrend) except TypeError: - datadf['ytrend'] = y1 + datadf = datadf.with_columns(ytrend = y1) - data_dict = datadf.to_dict("records") + + data_dict = datadf.to_dicts() metrics_list = [{'name': name, 'rowingmetrics':d } for name, d in metrics.rowingmetrics] diff --git a/rowers/views/analysisviews.py b/rowers/views/analysisviews.py index 86e353ca..4ff7d4ca 100644 --- a/rowers/views/analysisviews.py +++ b/rowers/views/analysisviews.py @@ -296,7 +296,10 @@ def analysis_new(request, df = cpdata(tw, options) options['savedata'] = False request.session['options'] = options - response = HttpResponse(df.to_csv()) + try: + response = HttpResponse(df.to_csv()) + except AttributeError: + response = HttpResponse(df.write_csv()) code = str(uuid4()) filename = code+'.csv' chartform.fields['savedata'].initial = False @@ -568,7 +571,7 @@ def flexalldata(workouts, options): workstrokesonly = 
not includereststrokes columns = [xparam, yparam1, yparam2, 'spm', 'driveenergy', 'distance'] ids = [int(w.id) for w in workouts] - df = dataprep.getsmallrowdata_db(columns, ids=ids, + df = dataprep.getsmallrowdata_pl(columns, ids=ids, workstrokesonly=workstrokesonly, doclean=True, )