
updated dataprepnodjango

Sander Roosendaal
2019-10-23 20:04:28 +02:00
parent fa373d781f
commit c0e8e448e3
3 changed files with 172 additions and 146 deletions

View File

@@ -15,6 +15,7 @@ from rowingdata import rowingdata as rrdata
 from rowingdata import rower as rrower
+import shutil
 from shutil import copyfile
 from rowingdata import (
@@ -1651,75 +1652,7 @@ def new_workout_from_df(r, df,
     return (id, message)
-# Compare the data from the CSV file and the database
-# Currently only calculates number of strokes. To be expanded with
-# more elaborate testing if needed
-def compare_data(id):
-    row = Workout.objects.get(id=id)
-    f1 = row.csvfilename
-    try:
-        rowdata = rdata(f1)
-        l1 = len(rowdata.df)
-    except AttributeError:
-        rowdata = 0
-        l1 = 0
-    engine = create_engine(database_url, echo=False)
-    query = sa.text('SELECT COUNT(*) FROM strokedata WHERE workoutid={id};'.format(
-        id=id,
-    ))
-    with engine.connect() as conn, conn.begin():
-        try:
-            res = conn.execute(query)
-            l2 = res.fetchall()[0][0]
-        except:
-            print("Database Locked")
-    conn.close()
-    engine.dispose()
-    lfile = l1
-    ldb = l2
-    return l1 == l2 and l1 != 0, ldb, lfile
-# Repair data for workouts where the CSV file is lost (or the DB entries
-# don't exist)
-def repair_data(verbose=False):
-    ws = Workout.objects.all()
-    for w in ws:
-        if verbose:
-            sys.stdout.write(".")
-        test, ldb, lfile = compare_data(w.id)
-        if not test:
-            if verbose:
-                print(w.id, lfile, ldb)
-            try:
-                rowdata = rdata(w.csvfilename)
-                if rowdata and len(rowdata.df):
-                    update_strokedata(w.id, rowdata.df)
-            except (IOError, AttributeError):
-                pass
-            if lfile == 0:
-                # if not ldb - delete workout
-                try:
-                    data = read_df_sql(w.id)
-                    try:
-                        datalength = len(data)
-                    except AttributeError:
-                        datalength = 0
-                    if datalength != 0:
-                        data.rename(columns=columndict, inplace=True)
-                        res = data.to_csv(w.csvfilename + '.gz',
-                                          index_label='index',
-                                          compression='gzip')
-                    else:
-                        w.delete()
-                except:
-                    pass
 # A wrapper around the rowingdata class, with some error catching
@@ -1745,17 +1678,11 @@ def rdata(file, rower=rrower()):
 def delete_strokedata(id):
-    engine = create_engine(database_url, echo=False)
-    query = sa.text('DELETE FROM strokedata WHERE workoutid={id};'.format(
-        id=id,
-    ))
-    with engine.connect() as conn, conn.begin():
-        try:
-            result = conn.execute(query)
-        except:
-            print("Database Locked")
-    conn.close()
-    engine.dispose()
+    dirname = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+    try:
+        shutil.rmtree(dirname)
+    except FileNotFoundError:
+        pass
 # Replace stroke data in DB with data from CSV file
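The switch from a SQL DELETE to shutil.rmtree follows from how the new cache is written: dask.dataframe.to_parquet produces a directory of part files rather than a single file, even with npartitions=1, so os.remove would fail on the '.parquet.gz' path. A minimal sketch of the matching write/delete pair (the workout id 42 and the toy columns are illustrative, and fastparquet is assumed to be installed):

import shutil

import dask.dataframe as dd
import pandas as pd

# write: dask creates a directory, e.g. media/strokedata_42.parquet.gz/part.0.parquet
df = dd.from_pandas(pd.DataFrame({'time': [0.0, 1.0], 'spm': [20.0, 22.0]}),
                    npartitions=1)
df.to_parquet('media/strokedata_42.parquet.gz',
              engine='fastparquet', compression='GZIP')

# delete: the path is a directory, hence rmtree rather than os.remove
shutil.rmtree('media/strokedata_42.parquet.gz')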
@@ -1782,7 +1709,6 @@ def testdata(time, distance, pace, spm):
 def getrowdata_db(id=0, doclean=False, convertnewtons=True,
                   checkefficiency=True):
     data = read_df_sql(id)
-    data['x_right'] = data['x_right'] / 1.0e6
     data['deltat'] = data['time'].diff()
     if data.empty:
@@ -2010,6 +1936,66 @@ def prepmultipledata(ids, verbose=False):
 def read_cols_df_sql(ids, columns, convertnewtons=True):
     # drop columns that are not in official list
     # axx = [ax[0] for ax in axes]
+    extracols = []
+    columns = list(columns) + ['distance', 'spm', 'workoutid']
+    columns = [x for x in columns if x != 'None']
+    columns = list(set(columns))
+    ids = [int(id) for id in ids]
+    if len(ids) == 0:
+        return pd.DataFrame(),extracols
+    elif len(ids) == 1:
+        try:
+            filename = 'media/strokedata_{id}.parquet.gz'.format(id=ids[0])
+            df = pd.read_parquet(filename,columns=columns)
+        except OSError:
+            rowdata,row = getrowdata(id=ids[0])
+            if rowdata and len(rowdata.df):
+                datadf = dataprep(rowdata.df,id=ids[0],bands=True,otwpower=True,barchart=True)
+                df = pd.read_parquet(filename,columns=columns)
+    else:
+        data = []
+        filenames = ['media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids]
+        for id,f in zip(ids,filenames):
+            try:
+                df = pd.read_parquet(f,columns=columns)
+                data.append(df)
+            except OSError:
+                rowdata,row = getrowdata(id=id)
+                if rowdata and len(rowdata.df):
+                    datadf = dataprep(rowdata.df,id=id,bands=True,otwpower=True,barchart=True)
+                    df = pd.read_parquet(f,columns=columns)
+                    data.append(df)
+        df = pd.concat(data,axis=0)
+    df = df.fillna(value=0)
+    if 'peakforce' in columns:
+        funits = ((w.id, w.forceunit)
+                  for w in Workout.objects.filter(id__in=ids))
+        for id, u in funits:
+            if u == 'lbs':
+                mask = df['workoutid'] == id
+                df.loc[mask, 'peakforce'] = df.loc[mask, 'peakforce'] * lbstoN
+    if 'averageforce' in columns:
+        funits = ((w.id, w.forceunit)
+                  for w in Workout.objects.filter(id__in=ids))
+        for id, u in funits:
+            if u == 'lbs':
+                mask = df['workoutid'] == id
+                df.loc[mask, 'averageforce'] = df.loc[mask,
+                                                      'averageforce'] * lbstoN
+    return df,extracols
+def read_cols_df_sql_old(ids, columns, convertnewtons=True):
+    # drop columns that are not in official list
+    # axx = [ax[0] for ax in axes]
     prepmultipledata(ids)
     axx = [f.name for f in StrokeData._meta.get_fields()]
@@ -2076,8 +2062,34 @@ def read_cols_df_sql(ids, columns, convertnewtons=True):
 # Read stroke data from the DB for a Workout ID. Returns a pandas dataframe
 def read_df_sql(id):
+    try:
+        f = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+        df = pd.read_parquet(f)
+    except OSError:
+        rowdata,row = getrowdata(id=id)
+        if rowdata and len(rowdata.df):
+            data = dataprep(rowdata.df,id=id,bands=True,otwpower=True,barchart=True)
+            df = pd.read_parquet(f)
+    df = df.fillna(value=0)
+    funit = Workout.objects.get(id=id).forceunit
+    if funit == 'lbs':
+        try:
+            df['peakforce'] = df['peakforce'] * lbstoN
+        except KeyError:
+            pass
+        try:
+            df['averageforce'] = df['averageforce'] * lbstoN
+        except KeyError:
+            pass
+    return df
+def read_df_sql_old(id):
     engine = create_engine(database_url, echo=False)
     df = pd.read_sql_query(sa.text('SELECT * FROM strokedata WHERE workoutid={id} ORDER BY time ASC'.format(
@@ -2269,14 +2281,13 @@ def add_efficiency(id=0):
     rowdata = rowdata.fillna(method='ffill')
     delete_strokedata(id)
     if id != 0:
         rowdata['workoutid'] = id
-        engine = create_engine(database_url, echo=False)
-        with engine.connect() as conn, conn.begin():
-            rowdata.to_sql('strokedata', engine,
-                           if_exists='append', index=False)
-        conn.close()
-        engine.dispose()
+        filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+        df = dd.from_pandas(rowdata,npartitions=1)
+        df.to_parquet(filename,engine='fastparquet',compression='GZIP')
     return rowdata
# This is the main routine. # This is the main routine.
@@ -2563,14 +2574,8 @@ def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
     filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
     df = dd.from_pandas(data,npartitions=1)
-    #df = df.loc[:,~df.columns.duplicated()]
-    # data.to_csv(filename,compression='gzip')
     df.to_parquet(filename,engine='fastparquet',compression='GZIP')
-    # data.to_parquet(filename,engine='fastparquet',compression='gzip')
-    # table = pa.Table.from_pandas(data)
-    #pq.write_table(table,filename)
     return data
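Both read_df_sql and read_cols_df_sql above follow the same read-or-rebuild pattern: try the per-workout parquet cache first, and on OSError regenerate it from the raw workout via getrowdata and dataprep before reading again. A condensed, self-contained sketch of that pattern, with the rebuild step abstracted into a callback (load_strokes and rebuild are hypothetical names, not part of the commit):

import pandas as pd

def load_strokes(id, columns=None, rebuild=None):
    # per-workout parquet cache, keyed by workout id
    filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
    try:
        df = pd.read_parquet(filename, columns=columns)
    except OSError:
        # cache miss: rebuild (in the app: getrowdata + dataprep), then re-read
        if rebuild is None:
            return pd.DataFrame()
        rebuild(id)  # expected to rewrite `filename`
        df = pd.read_parquet(filename, columns=columns)
    return df.fillna(value=0)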

View File

@@ -28,6 +28,35 @@ from rowsandall_app.settings_dev import use_sqlite
 from rowers.utils import lbstoN
+# dtypes
+dtypes = {'workoutid':int,
+          'hr':int,
+          'pace':float,
+          'velo':float,
+          'spm':float,
+          'driveenergy':float,
+          'power':float,
+          'averageforce':float,
+          'peakforce':float,
+          'drivelength':float,
+          'distance':float,
+          'cumdist':float,
+          'drivespeed':float,
+          'catch':float,
+          'slip':float,
+          'finish':float,
+          'wash':float,
+          'peakforceangle':float,
+          'totalangle':float,
+          'effectiveangle':float,
+          'rhythm':float,
+          'efficiency':float,
+          'distanceperstroke':float,
+          'ftime':str,
+          'fpace':str,
+          'fergpace':str,
+          'fnowindpace':str,
+          }
 try:
     user = DATABASES['default']['USER']
@@ -637,20 +666,11 @@ def new_workout_from_file(r,f2,
     return (id,message,f2)
 def delete_strokedata(id,debug=False):
-    if debug:
-        engine = create_engine(database_url_debug, echo=False)
-    else:
-        engine = create_engine(database_url, echo=False)
-    query = sa.text('DELETE FROM strokedata WHERE workoutid={id};'.format(
-        id=id,
-    ))
-    with engine.connect() as conn, conn.begin():
-        try:
-            result = conn.execute(query)
-        except:
-            print("Database Locked")
-    conn.close()
-    engine.dispose()
+    dirname = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+    try:
+        shutil.rmtree(dirname)
+    except FileNotFoundError:
+        pass
 def update_strokedata(id,df,debug=False):
     delete_strokedata(id,debug=debug)
@@ -723,19 +743,18 @@ def getsmallrowdata_db(columns,ids=[],debug=False):
     if len(ids)>1:
         for f in csvfilenames:
             try:
-                df = dd.read_parquet(f,columns=columns,engine='pyarrow')
+                df = pd.read_parquet(f,columns=columns,engine='pyarrow')
                 data.append(df)
             except OSError:
                 pass
-        df = dd.concat(data,axis=0)
+        df = pd.concat(data,axis=0)
     else:
-        df = dd.read_parquet(csvfilenames[0],columns=columns,engine='pyarrow')
-    data = df.compute()
-    return data
+        df = pd.read_parquet(csvfilenames[0],columns=columns,engine='pyarrow')
+    return df
def fitnessmetric_to_sql(m,table='powertimefitnessmetric',debug=False, def fitnessmetric_to_sql(m,table='powertimefitnessmetric',debug=False,
doclean=False): doclean=False):
@@ -779,51 +798,42 @@ def read_cols_df_sql(ids,columns,debug=False):
     columns = list(columns)+['distance','spm']
     columns = [x for x in columns if x != 'None']
     columns = list(set(columns))
-    cls = ''
     ids = [int(id) for id in ids]
-    if debug:
-        engine = create_engine(database_url_debug, echo=False)
-    else:
-        engine = create_engine(database_url, echo=False)
-    for column in columns:
-        cls += column+', '
-    cls = cls[:-2]
     if len(ids) == 0:
-        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid=0'.format(
-            columns = cls,
-        ))
+        return pd.DataFrame()
     elif len(ids) == 1:
-        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid={id}'.format(
-            id = ids[0],
-            columns = cls,
-        ))
+        try:
+            filename = 'media/strokedata_{id}.parquet.gz'.format(id=ids[0])
+            df = pd.read_parquet(filename,columns=columns)
+        except OSError:
+            pass
     else:
-        query = sa.text('SELECT {columns} FROM strokedata WHERE workoutid IN {ids}'.format(
-            columns = cls,
-            ids = tuple(ids),
-        ))
-    df = pd.read_sql_query(query,engine)
-    engine.dispose()
+        data = []
+        filenames = ['media/strokedata_{id}.parquet.gz'.format(id=id) for id in ids]
+        for id,f in zip(ids,filenames):
+            try:
+                df = pd.read_parquet(f,columns=columns)
+                data.append(df)
+            except OSError:
+                pass
+        df = pd.concat(data,axis=0)
     return df
 def read_df_sql(id,debug=False):
-    if debug:
-        engine = create_engine(database_url_debug, echo=False)
-        print("read_df",id)
-        print(database_url_debug)
-    else:
-        engine = create_engine(database_url, echo=False)
-    df = pd.read_sql_query(sa.text(
-        'SELECT * FROM strokedata WHERE workoutid={id}'.format(
-            id=id
-        )), engine)
-    engine.dispose()
+    try:
+        f = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+        df = pd.read_parquet(f)
+    except OSError:
+        pass
+    df = df.fillna(value=0)
     return df
 def getcpdata_sql(rower_id,table='cpdata',debug=False):
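Passing columns= to pd.read_parquet preserves the projection that the old SELECT {columns} queries provided: parquet is a columnar format, so only the requested columns are deserialized from disk. A toy round-trip (file name illustrative; assumes a parquet engine such as pyarrow is installed):

import pandas as pd

df = pd.DataFrame({'distance': [0.0, 10.0], 'spm': [20.0, 22.0], 'hr': [140, 150]})
df.to_parquet('strokes.parquet.gz', compression='gzip')

# only the requested columns are read, mirroring the old SELECT column list
sub = pd.read_parquet('strokes.parquet.gz', columns=['distance', 'spm'])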
@@ -1266,6 +1276,9 @@ def dataprep(rowdatadf,id=0,bands=True,barchart=True,otwpower=True,
     # write data if id given
     if id != 0:
         data['workoutid'] = id
-        data.to_parquet(filename,engine='pyarrow',compression='gzip')
+        data = data.astype(dtype=dtypes)
+        filename = 'media/strokedata_{id}.parquet.gz'.format(id=id)
+        df = dd.from_pandas(data,npartitions=1)
+        df.to_parquet(filename,engine='fastparquet',compression='GZIP')
     return data
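The new data.astype(dtype=dtypes) call pins each column to the type declared in the dtypes dict at the top of this file, so parquet files written for different workouts share one schema and can later be concatenated cleanly. A toy illustration with a trimmed version of that dict:

import pandas as pd

dtypes = {'workoutid': int, 'hr': int, 'spm': float, 'ftime': str}
data = pd.DataFrame({'workoutid': [7, 7], 'hr': [140.0, 151.0],
                     'spm': [20, 22], 'ftime': ['00:01', '00:02']})
# hr becomes int64, spm float64, ftime object; mixed inputs are coerced
data = data.astype(dtype=dtypes)
print(data.dtypes)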

View File

@@ -28,6 +28,8 @@ from django_countries.fields import CountryField
 from scipy.interpolate import splprep, splev, CubicSpline
 import numpy as np
+import shutil
 from django.conf import settings
 from sqlalchemy import create_engine
 import sqlalchemy as sa
@@ -2805,6 +2807,12 @@ def auto_delete_file_on_delete(sender, instance, **kwargs):
     if instance.csvfilename+'.gz':
         if os.path.isfile(instance.csvfilename+'.gz'):
             os.remove(instance.csvfilename+'.gz')
+    # remove parquet file
+    try:
+        dirname = 'media/strokedata_{id}.parquet.gz'.format(id=instance.id)
+        shutil.rmtree(dirname)
+    except FileNotFoundError:
+        pass
 @receiver(models.signals.post_delete,sender=Workout)
 def update_duplicates_on_delete(sender, instance, **kwargs):
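For reference, the parquet cleanup added above could equivalently be packaged as its own post_delete receiver instead of living inside auto_delete_file_on_delete; a sketch under that assumption (remove_parquet_on_delete is a hypothetical name, and Workout is the app's model, assumed in scope as in the surrounding code):

import shutil

from django.db import models
from django.dispatch import receiver

@receiver(models.signals.post_delete, sender=Workout)
def remove_parquet_on_delete(sender, instance, **kwargs):
    # dask wrote one directory per workout, so rmtree rather than os.remove
    dirname = 'media/strokedata_{id}.parquet.gz'.format(id=instance.id)
    try:
        shutil.rmtree(dirname)
    except FileNotFoundError:
        pass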