diff --git a/rowers/dataprep.py b/rowers/dataprep.py index f206a457..26684810 100644 --- a/rowers/dataprep.py +++ b/rowers/dataprep.py @@ -31,6 +31,11 @@ from zipfile import BadZipFile import zipfile import os from rowers.models import strokedatafields +import grpc +import grpc.experimental +import rowers.rowing_workout_metrics_pb2 as metrics_pb2 +import rowers.rowing_workout_metrics_pb2_grpc as metrics_pb2_grpc +import traceback from rowingdata import ( KinoMapParser, @@ -497,53 +502,85 @@ def calculate_goldmedalstandard(rower, workout, recurrance=True): def setcp(workout, background=False, recurrance=True): - filename = 'media/cpdata_{id}.parquet.gz'.format(id=workout.id) + try: + filename = 'media/cpdata_{id}.parquet.gz'.format(id=workout.id) + df = pd.read_parquet(filename) + + if not df.empty: + # check dts + tarr = datautils.getlogarr(4000) + if df['delta'][0] in tarr: + return(df, df['delta'], df['cp']) + except: + pass strokesdf = getsmallrowdata_db( ['power', 'workoutid', 'time'], ids=[workout.id]) - try: - if strokesdf['power'].std() == 0: - return pd.DataFrame(), pd.Series(dtype='float'), pd.Series(dtype='float') - except (KeyError, TypeError): - return pd.DataFrame(), pd.Series(dtype='float'), pd.Series(dtype='float') - - if background: # pragma: no cover - _ = myqueue(queuelow, handle_setcp, strokesdf, filename, workout.id) + if strokesdf.empty: return pd.DataFrame({'delta': [], 'cp': []}), pd.Series(dtype='float'), pd.Series(dtype='float') + + totaltime = strokesdf['time'].max() + maxt = totaltime/1000. + logarr = datautils.getlogarr(maxt) + + csvfilename = workout.csvfilename + # check what the real file name is + if os.path.exists(csvfilename): + csvfile = csvfilename + elif os.path.exists(csvfilename+'.csv'): # pragma: no cover + csvfile = csvfilename+'.csv' + elif os.path.exists(csvfilename+'.gz'): # pragma: no cover + csvfile = csvfilename+'.gz' + else: # pragma: no cover + return pd.DataFrame({'delta': [], 'cp': []}), pd.Series(dtype='float'), pd.Series(dtype='float') + csvfile = os.path.abspath(csvfile) - if not strokesdf.empty: - totaltime = strokesdf['time'].max() + + with grpc.insecure_channel( + target='localhost:50052', + options=[('grpc.lb_policy_name', 'pick_first'), + ('grpc.enable_retries', 0), ('grpc.keepalive_timeout_ms', + 10000)] + ) as channel: try: - powermean = strokesdf['power'].mean() - except KeyError: # pragma: no cover - powermean = 0 + grpc.channel_ready_future(channel).result(timeout=10) + except grpc.FutureTimeoutError: # pragma: no cover + dologging('metrics.log','grpc channel time out in setcp') + return pd.DataFrame({'delta': [], 'cp': []}), pd.Series(dtype='float'), pd.Series(dtype='float') - if powermean != 0: - thesecs = totaltime - maxt = 1.05 * thesecs + stub = metrics_pb2_grpc.MetricsStub(channel) + req = metrics_pb2.CPRequest(filename = csvfile, filetype = "CSV", tarr = logarr) - if maxt > 0: - logarr = datautils.getlogarr(maxt) - dfgrouped = strokesdf.groupby(['workoutid']) - delta, cpvalues, avgpower = datautils.getcp(dfgrouped, logarr) + try: + response = stub.GetCP(req, timeout=60) + except Exception as e: + dologging('metrics.log', traceback.format_exc()) + return pd.DataFrame({'delta': [], 'cp': []}), pd.Series(dtype='float'), pd.Series(dtype='float') - df = pd.DataFrame({ - 'delta': delta, - 'cp': cpvalues, - 'id': workout.id, - }) - df.to_parquet(filename, engine='fastparquet', - compression='GZIP') - if recurrance: - goldmedalstandard, goldmedalduration = calculate_goldmedalstandard( - workout.user, workout) - workout.goldmedalstandard = goldmedalstandard - workout.goldmedalduration = goldmedalduration - workout.save() - return df, delta, cpvalues + delta = pd.Series(np.array(response.delta)) + cpvalues = pd.Series(np.array(response.power)) + powermean = response.avgpower - return pd.DataFrame({'delta': [], 'cp': []}), pd.Series(dtype='float'), pd.Series(dtype='float') + + + df = pd.DataFrame({ + 'delta': delta, + 'cp': cpvalues, + 'id': workout.id, + }) + + df.to_parquet(filename, engine='fastparquet', compression='GZIP') + + if recurrance: + goldmedalstandard, goldmedalduration = calculate_goldmedalstandard( + workout.user, workout) + workout.goldmedalstandard = goldmedalstandard + workout.goldmedalduration = goldmedalduration + workout.save() + + return df, delta, cpvalues + def update_wps(r, types, mode='water', asynchron=True): @@ -679,9 +716,11 @@ def join_workouts(r, ids, title='Joined Workout', def fetchcp_new(rower, workouts): data = [] + for workout in workouts: cpfile = 'media/cpdata_{id}.parquet.gz'.format(id=workout.id) try: + df, delta, cpvalues = setcp(workout) df = pd.read_parquet(cpfile) df['workout'] = str(workout) df['url'] = workout.url() @@ -698,12 +737,15 @@ def fetchcp_new(rower, workouts): if len(data) > 1: df = pd.concat(data, axis=0) + try: df = df[df['cp'] == df.groupby(['delta'])['cp'].transform('max')] except KeyError: # pragma: no cover return pd.Series(dtype='float'), pd.Series(dtype='float'), 0, pd.Series(dtype='float'), pd.Series(dtype='float') df = df.sort_values(['delta']).reset_index() + df = df[df['cp']>20] + return df['delta'], df['cp'], 0, df['workout'], df['url'] diff --git a/rowers/datautils.py b/rowers/datautils.py index 029de2f3..cb17a357 100644 --- a/rowers/datautils.py +++ b/rowers/datautils.py @@ -1,4 +1,3 @@ - import pandas as pd import numpy as np from scipy.interpolate import griddata @@ -124,18 +123,18 @@ def cpfit(powerdf, fraclimit=0.0001, nmax=1000): def getlogarr(maxt): - maxlog10 = np.log10(maxt-5) + dtmin = 10 + maxlog10 = np.log10(maxt) # print(maxlog10,round(maxlog10)) - aantal = 10*round(maxlog10) + aantal = 40 logarr = np.arange(aantal+1)/10. + vs = 10.**logarr + res = [] - for la in logarr: - try: - v = 5+int(10.**(la)) - except ValueError: # pragma: no cover - v = 0 - res.append(v) + for v in vs: + if v > dtmin and v pwr+1) res2 = np.sum(cpvalues > pwr2+1)