diff --git a/rowers/dataprep.py b/rowers/dataprep.py index c5195951..f6de443b 100644 --- a/rowers/dataprep.py +++ b/rowers/dataprep.py @@ -8,6 +8,7 @@ from rowers.datautils import p0 from scipy import optimize from rowers.utils import calculate_age import datetime +import gzip from scipy.signal import savgol_filter from rowers.opaque import encoder from rowers.database import * @@ -27,6 +28,7 @@ from fitparse import FitFile import itertools import numpy as np import pandas as pd +import polars as pl from zipfile import BadZipFile import zipfile import os @@ -219,18 +221,18 @@ def check_marker(workout): ids.append(w.id) gms.append(gmstandard) - df = pd.DataFrame({ + df = pl.DataFrame({ 'id': ids, 'gms': gms, }) - if df.empty: # pragma: no cover + if df.is_empty(): # pragma: no cover workout.ranking = True workout.save() return workout - indexmax = df['gms'].idxmax() - theid = df.loc[indexmax, 'id'] + theid = df.filter(pl.col("gms") == pl.col("gms").max())['id'][0] + wmax = Workout.objects.get(id=theid) # gms_max = wmax.goldmedalstandard @@ -326,7 +328,7 @@ def workout_summary_to_df( goldstandarddurations.append(int(goldstandardduration)) rankingpieces.append(w.rankingpiece) - df = pd.DataFrame({ + df = pl.DataFrame({ 'ID': ids, 'date': startdatetimes, 'name': names, @@ -420,19 +422,21 @@ def calculate_goldmedalstandard(rower, workout, recurrance=True): try: df = pl.read_parquet(cpfile) except: + df = getsmallrowdata_pl(['power'], ids=[workout.id]) background = True if settings.TESTING: background = False - df, delta, cpvalues = setcp(workout, background=background) - if df.empty: - return 0, 0 - df = pl.from_pandas(df) + if recurrance: + df, delta, cpvalues = setcp(workout, background=background) + if df.is_empty(): + return 0, 0 + else: + return 0,0 if df.is_empty() and recurrance: # pragma: no cover df, delta, cpvalues = setcp(workout, recurrance=False, background=True) - if df.empty: + if df.is_empty(): return 0, 0 - df = pl.from_pandas(df) age = 
calculate_age(rower.birthdate, today=workout.date) @@ -459,7 +463,7 @@ def calculate_goldmedalstandard(rower, workout, recurrance=True): if getrecords: # pragma: no cover durations = [1, 4, 30, 60] distances = [100, 500, 1000, 2000, 5000, 6000, 10000, 21097, 42195] - df2 = pd.DataFrame( + df2 = pl.DataFrame( list( C2WorldClassAgePerformance.objects.filter( sex=rower.sex, @@ -467,7 +471,7 @@ def calculate_goldmedalstandard(rower, workout, recurrance=True): ).values() ) ) - jsondf = df2.to_json() + jsondf = df2.write_json() _ = myqueue(queuelow, handle_getagegrouprecords, jsondf, distances, durations, age, rower.sex, rower.weightcategory) @@ -511,21 +515,21 @@ def calculate_goldmedalstandard(rower, workout, recurrance=True): def setcp(workout, background=False, recurrance=True): try: filename = 'media/cpdata_{id}.parquet.gz'.format(id=workout.id) - df = pd.read_parquet(filename) + df = pl.read_parquet(filename) - if not df.empty: + if not df.is_empty(): # check dts tarr = datautils.getlogarr(4000) if df['delta'][0] in tarr: return(df, df['delta'], df['cp']) - except: + except Exception: pass - strokesdf = getsmallrowdata_db( + strokesdf = getsmallrowdata_pl( ['power', 'workoutid', 'time'], ids=[workout.id]) - if strokesdf.empty: - return pd.DataFrame({'delta': [], 'cp': []}), pd.Series(dtype='float'), pd.Series(dtype='float') + if strokesdf.is_empty(): + return pl.DataFrame({'delta': [], 'cp': []}), pl.Series(dtype=pl.Float64), pl.Series(dtype=pl.Float64) totaltime = strokesdf['time'].max() maxt = totaltime/1000.
@@ -565,23 +569,27 @@ def setcp(workout, background=False, recurrance=True): dologging('metrics.log', traceback.format_exc()) - return pd.DataFrame({'delta': [], 'cp': []}), pd.Series(dtype='float'), pd.Series(dtype='float') + return pl.DataFrame({'delta': [], 'cp': []}), pl.Series(dtype=pl.Float64), pl.Series(dtype=pl.Float64) - delta = pd.Series(np.array(response.delta)) - cpvalues = pd.Series(np.array(response.power)) + delta = pl.Series(np.array(response.delta)) + cpvalues = pl.Series(np.array(response.power)) powermean = response.avgpower - - - df = pd.DataFrame({ + df = pl.DataFrame({ 'delta': delta, 'cp': cpvalues, 'id': workout.id, }) - df.to_parquet(filename, engine='fastparquet', compression='GZIP') + df = df.drop_nulls() + + # use parquet's internal gzip codec: pl.read_parquet cannot open an externally gzip-wrapped file + df.write_parquet(filename, compression='gzip') + + + #df.to_parquet(filename, engine='fastparquet', compression='GZIP') if recurrance: goldmedalstandard, goldmedalduration = calculate_goldmedalstandard( - workout.user, workout) + workout.user, workout, recurrance=False) workout.goldmedalstandard = goldmedalstandard workout.goldmedalduration = goldmedalduration workout.save() @@ -737,24 +745,24 @@ def fetchcp_new(rower, workouts): except: # CP data file doesn't exist yet.
has to be created df, delta, cpvalues = setcp(workout) - df['workout'] = str(workout) - df['url'] = workout.url() + df = df.with_columns((pl.lit(str(workout))).alias("workout")) + df = df.with_columns((pl.lit(workout.url())).alias("url")) data.append(df) if len(data) == 0: - return pd.Series(dtype='float'), pd.Series(dtype='float'), 0, pd.Series(dtype='float'), pd.Series(dtype='float') + return pl.Series(dtype=pl.Float64), pl.Series(dtype=pl.Float64), 0, pl.Series(dtype=pl.Float64), pl.Series(dtype=pl.Float64) if len(data) > 1: - df = pd.concat(data, axis=0) + df = pl.concat(data) + + #df = df.to_pandas() try: - df = df[df['cp'] == df.groupby(['delta'])['cp'].transform('max')] - except KeyError: # pragma: no cover + df = df.group_by("delta").agg(pl.all().sort_by("cp").last()).sort("delta") + except (KeyError, pl.exceptions.ColumnNotFoundError): # pragma: no cover return pd.Series(dtype='float'), pd.Series(dtype='float'), 0, pd.Series(dtype='float'), pd.Series(dtype='float') - df = df.sort_values(['delta']).reset_index() - df = df[df['cp']>20] - + df = df.filter(pl.col("cp")>20) return df['delta'], df['cp'], 0, df['workout'], df['url'] @@ -810,16 +818,16 @@ def update_rolling_cp(r, types, mode='water', dosend=False): delta, cp, avgpower, workoutnames, urls = fetchcp_new(r, workouts) - powerdf = pd.DataFrame({ + powerdf = pl.DataFrame({ 'Delta': delta, 'CP': cp, }) - powerdf = powerdf[powerdf['CP'] > 0] - powerdf.dropna(axis=0, inplace=True) - powerdf.sort_values(['Delta', 'CP'], ascending=[1, 0], inplace=True) - powerdf.drop_duplicates(subset='Delta', keep='first', inplace=True) - if powerdf.empty: + powerdf = powerdf.filter(pl.col("CP")>0) + powerdf = powerdf.fill_nan(None).drop_nulls().sort(["Delta", "CP"], descending=[False, True]) + powerdf = powerdf.unique(subset=["Delta"], keep="first", maintain_order=True) + + if powerdf.is_empty(): return False res2 = datautils.cpfit(powerdf) @@ -1062,7 +1070,7 @@ def checkbreakthrough(w, r): workouttype = w.workouttype if workouttype in rowtypes: cpdf, delta, cpvalues =
setcp(w) - if not cpdf.empty: + if not cpdf.is_empty(): if workouttype in otwtypes: try: res, btvalues, res2 = utils.isbreakthrough( diff --git a/rowers/interactiveplots.py b/rowers/interactiveplots.py index 072fe65c..80517b36 100644 --- a/rowers/interactiveplots.py +++ b/rowers/interactiveplots.py @@ -1380,7 +1380,7 @@ def interactive_chart(id=0, promember=0, intervaldata={}): columns = ['time', 'pace', 'hr', 'fpace', 'ftime', 'spm'] datadf = dataprep.getsmallrowdata_pl(columns, ids=[id]) - if datadf.is_eompty(): + if datadf.is_empty(): return "", "No Valid Data Available" datadf = datadf.fill_nan(None).drop_nulls() diff --git a/rowers/tasks.py b/rowers/tasks.py index 1a54e8c3..f2a7355f 100644 --- a/rowers/tasks.py +++ b/rowers/tasks.py @@ -562,7 +562,7 @@ def handle_sporttracks_workout_from_data(user, importid, source, strokedata = pd.DataFrame.from_dict({ key: pd.Series(value, dtype='object') for key, value in data.items() }) - + try: workouttype = data['type'] except KeyError: # pragma: no cover diff --git a/rowers/tests/test_imports.py b/rowers/tests/test_imports.py index 16346575..d5e9470d 100644 @@ -1307,7 +1307,7 @@ class STObjects(DjangoTestCase): self.r.sporttrackstoken = '12' - self.r.sporttracksrefreshtoken = '12' + self.r.sporttracksrefreshtoken = '12' self.r.sporttrackstokenexpirydate = arrow.get(datetime.datetime.now()+datetime.timedelta(days=1)).datetime self.r.save() diff --git a/rowers/tests/testdata/testdata.tcx.gz b/rowers/tests/testdata/testdata.tcx.gz index f0e0d26a..132867e1 100644 Binary files a/rowers/tests/testdata/testdata.tcx.gz and b/rowers/tests/testdata/testdata.tcx.gz differ diff --git a/rowers/utils.py b/rowers/utils.py index 8ea2225b..7cb1413b 100644 --- a/rowers/utils.py +++ b/rowers/utils.py @@ -6,6 +6,7 @@ from django.utils import timezone import math import numpy as np import pandas as pd +import polars as pl import colorsys from django.conf import settings
import collections @@ -342,22 +343,22 @@ def isbreakthrough(delta, cpvalues, p0, p1, p2, p3, ratio): pwr *= ratio - delta = delta.astype(int, errors='ignore').values - cpvalues = cpvalues.astype(int, errors='ignore').values - pwr = pwr.astype(int, errors='ignore').values + delta = delta.cast(pl.Int32) + cpvalues = cpvalues.cast(pl.Int32) + pwr = pwr.cast(pl.Int32) - res = np.sum(cpvalues > pwr+1) - res2 = np.sum(cpvalues > pwr2+1) + btdf = pl.DataFrame({ + 'delta': delta, + 'cpvalues': cpvalues, + 'pwr': pwr, + 'pwr2': pwr2 + }) - btdf = pd.DataFrame( - { - 'delta': delta[cpvalues > pwr], - 'cpvalues': cpvalues[cpvalues > pwr], - 'pwr': pwr[cpvalues > pwr], - } - ) + res = btdf.select(pl.col("cpvalues")>pl.col("pwr")+1)['cpvalues'].sum() + res2 = btdf.select(pl.col("cpvalues")> pl.col("pwr2")+1)['cpvalues'].sum() - btdf.sort_values('delta', axis=0, inplace=True) + btdf = btdf.filter(pl.col("cpvalues")>pl.col("pwr")) + btdf = btdf.sort('delta') return res >= 1, btdf, res2 >= 1 diff --git a/rowers/views/exportviews.py b/rowers/views/exportviews.py index 15bf47f6..593fa23d 100644 --- a/rowers/views/exportviews.py +++ b/rowers/views/exportviews.py @@ -208,7 +208,7 @@ def workouts_summaries_email_view(request): ) df = dataprep.workout_summary_to_df( r, startdate=startdate, enddate=enddate) - df.to_csv(filename, encoding='utf-8') + df.write_csv(filename) _ = myqueue(queuehigh, handle_sendemailsummary, r.user.first_name, r.user.last_name,