bla
This commit is contained in:
@@ -4,7 +4,6 @@ from __future__ import print_function
|
||||
from __future__ import unicode_literals
|
||||
|
||||
|
||||
|
||||
# All the data preparation, data cleaning and data mangling should
|
||||
# be defined here
|
||||
from __future__ import unicode_literals, absolute_import
|
||||
@@ -26,6 +25,8 @@ from rowers.tasks import handle_sendemail_unrecognized
|
||||
from rowers.tasks import handle_zip_file
|
||||
|
||||
from pandas import DataFrame, Series
|
||||
import dask.dataframe as dd
|
||||
from dask.delayed import delayed
|
||||
|
||||
from django.utils import timezone
|
||||
from django.utils.timezone import get_current_timezone
|
||||
@@ -349,7 +350,7 @@ def clean_df_stats(datadf, workstrokesonly=True, ignorehr=True,
|
||||
# clean data remove zeros and negative values
|
||||
|
||||
# bring metrics which have negative values to positive domain
|
||||
if datadf.empty:
|
||||
if len(datadf)==0:
|
||||
return datadf
|
||||
try:
|
||||
datadf['catch'] = -datadf['catch']
|
||||
@@ -1771,8 +1772,31 @@ def getrowdata_db(id=0, doclean=False, convertnewtons=True,
|
||||
|
||||
# Fetch a subset of the data from the DB
|
||||
|
||||
def getsmallrowdata_db(columns, ids=[], doclean=True,workstrokesonly=True):
|
||||
prepmultipledata(ids)
|
||||
|
||||
def getsmallrowdata_db(columns, ids=[], doclean=True, workstrokesonly=True):
    """Load selected stroke-data columns for a set of workouts from parquet.

    Reads ``media/strokedata_<id>.parquet`` for every id in *ids*, stacks
    the rows of all files, and optionally cleans the combined frame.

    Args:
        columns: column names to load; entries equal to the string
            ``'None'`` are ignored.
        ids: workout ids whose parquet files are read.  The default list
            is never mutated here.
        doclean: when true, pass the data through ``clean_df_stats`` and
            then drop all-NaN columns and any row containing a NaN.
        workstrokesonly: forwarded unchanged to ``clean_df_stats``.

    Returns:
        A pandas DataFrame.  It is empty when *ids* is empty.
    """
    ids = list(ids)

    # dd.concat refuses an empty list of frames, so answer the
    # "no workouts requested" case directly instead of raising.
    if not ids:
        return DataFrame()

    columns = [c for c in columns if c != 'None']

    # NOTE(review): this assumes every parquet file already exists; a
    # missing file surfaces as an error from read_parquet -- confirm that
    # callers prepare the files beforehand.
    frames = [
        dd.read_parquet(
            'media/strokedata_{id}.parquet'.format(id=id),
            columns=columns,
            engine='pyarrow',
        )
        for id in ids
    ]

    data = dd.concat(frames, axis=0).compute()

    # A column requested more than once would otherwise appear twice.
    data = data.loc[:, ~data.columns.duplicated()]

    if doclean:
        data = clean_df_stats(data, ignorehr=True,
                              workstrokesonly=workstrokesonly)
        # NOTE(review): nesting of the two dropna calls under `doclean`
        # is reconstructed from a whitespace-stripped source -- confirm.
        data.dropna(axis=1, how='all', inplace=True)
        data.dropna(axis=0, how='any', inplace=True)

    return data
|
||||
|
||||
def getsmallrowdata_db_old(columns, ids=[], doclean=True, workstrokesonly=True):
|
||||
prepmultipledata(ids)
|
||||
data,extracols = read_cols_df_sql(ids, columns)
|
||||
if extracols and len(ids)==1:
|
||||
@@ -1850,31 +1874,20 @@ def getrowdata(id=0):
|
||||
# safety net for programming errors elsewhere in the app
|
||||
# Also used heavily when I moved from CSV file only to CSV+Stroke data
|
||||
|
||||
import glob
|
||||
|
||||
def prepmultipledata(ids, verbose=False):
    """Make sure a stroke-data parquet file exists for each workout id.

    For every id in *ids* that has no ``media/strokedata_<id>.parquet``
    on disk, the workout is loaded with ``getrowdata`` and handed to
    ``dataprep``, which writes that parquet file when given a non-zero id.

    Args:
        ids: iterable of workout ids to check.
        verbose: when true, print each id as it is processed.

    Returns:
        List of the ids for which no parquet file was found, i.e. those
        for which preparation was attempted.
    """
    import os

    # Check each path directly rather than comparing against a glob
    # listing: glob returns OS-native separators, so a forward-slash
    # string comparison can miss existing files, and listing the whole
    # directory is wasted work when only a few ids are asked for.
    missing = [
        id for id in ids
        if not os.path.exists('media/strokedata_{id}.parquet'.format(id=id))
    ]

    for id in missing:
        rowdata, row = getrowdata(id=id)
        if verbose:
            print(id)
        # Skip workouts that could not be loaded or carry no strokes.
        if rowdata and len(rowdata.df):
            dataprep(rowdata.df, id=id, bands=True,
                     barchart=True, otwpower=True)

    return missing
|
||||
|
||||
# Read a set of columns for a set of workout ids, returns data as a
|
||||
# pandas dataframe
|
||||
@@ -2292,19 +2305,6 @@ def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
|
||||
except KeyError:
|
||||
rowdatadf[' ElapsedTime (sec)'] = rowdatadf['TimeStamp (sec)']
|
||||
|
||||
if barchart:
|
||||
# time increments for bar chart
|
||||
time_increments = rowdatadf.loc[:, ' ElapsedTime (sec)'].diff()
|
||||
try:
|
||||
time_increments.iloc[0] = time_increments.iloc[1]
|
||||
except (KeyError, IndexError):
|
||||
time_increments.iloc[0] = 1.
|
||||
|
||||
time_increments = 0.5 * time_increments + 0.5 * np.abs(time_increments)
|
||||
x_right = (t2 + time_increments.apply(lambda x: timedeltaconv(x)))
|
||||
|
||||
data['x_right'] = x_right
|
||||
|
||||
if empower:
|
||||
try:
|
||||
wash = rowdatadf.loc[:, 'wash']
|
||||
@@ -2441,12 +2441,10 @@ def dataprep(rowdatadf, id=0, bands=True, barchart=True, otwpower=True,
|
||||
# write data if id given
|
||||
if id != 0:
|
||||
data['workoutid'] = id
|
||||
filename = 'media/strokedata_{id}.parquet'.format(id=id)
|
||||
# df = dd.from_pandas(data,npartitions=1)
|
||||
data.to_parquet(filename,engine='pyarrow')
|
||||
|
||||
engine = create_engine(database_url, echo=False)
|
||||
with engine.connect() as conn, conn.begin():
|
||||
data.to_sql('strokedata', engine, if_exists='append', index=False)
|
||||
conn.close()
|
||||
engine.dispose()
|
||||
return data
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user