diff --git a/rowers/management/commands/processemail.py b/rowers/management/commands/processemail.py
index a0413753..e4f88e28 100644
--- a/rowers/management/commands/processemail.py
+++ b/rowers/management/commands/processemail.py
@@ -65,7 +65,7 @@ def processattachment(rower, fileobj, title, uploadoptions):
workoutcsvfilename = workout.csvfilename[6:-4]
timestr = strftime("%Y%m%d-%H%M%S")
imagename = workoutcsvfilename + timestr + '.png'
- result = uploads.make_plot(
+ result,jobid = uploads.make_plot(
workout.user, workout, workoutcsvfilename,
workout.csvfilename,
plottype, title,
diff --git a/rowers/templates/async_tasks.html b/rowers/templates/async_tasks.html
new file mode 100644
index 00000000..1986acfb
--- /dev/null
+++ b/rowers/templates/async_tasks.html
@@ -0,0 +1,57 @@
+{% extends "base.html" %}
+{% load staticfiles %}
+{% load rowerfilters %}
+
+{% block title %}Rowsandall - Tasks {% endblock %}
+
+{% block content %}
+
+
+
+Your Tasks Status
+Manage the asynchronous tasks running for you
+
+
+
+
+
+
+ | ID |
+ Task |
+ Status |
+ Action |
+
+
+
+ {% for task in taskstatus %}
+
+ |
+ {{ task|lookup:'id' }}
+ |
+
+ {{ task|lookup:'verbose' }}
+ |
+
+ {{ task|lookup:'status' }}
+ |
+ {% if task|lookup:'failed' %}
+
+ Remove
+ |
+ {% elif task|lookup:'finished' %}
+
+ Remove
+ |
+ {% else %}
+
+
+ |
+ {% endif %}
+
+ {% endfor %}
+
+
+
+
+
+
+{% endblock %}
diff --git a/rowers/uploads.py b/rowers/uploads.py
index 578d1807..a6fab4a1 100644
--- a/rowers/uploads.py
+++ b/rowers/uploads.py
@@ -175,11 +175,11 @@ def make_plot(r,w,f1,f2,plottype,title,imagename='',plotnr=0):
if settings.DEBUG:
- res = handle_makeplot.delay(f1,f2,title,
+ job = handle_makeplot.delay(f1,f2,title,
hrpwrdata,plotnr,
imagename)
else:
- res = queue.enqueue(handle_makeplot,f1,f2,
+ job = queue.enqueue(handle_makeplot,f1,f2,
title,hrpwrdata,
plotnr,imagename)
@@ -195,7 +195,7 @@ def make_plot(r,w,f1,f2,plottype,title,imagename='',plotnr=0):
width=width,height=height)
i.save()
- return i.id
+ return i.id,job.id
import c2stuff,stravastuff,sporttracksstuff,runkeeperstuff
import underarmourstuff,tpstuff
diff --git a/rowers/urls.py b/rowers/urls.py
index 9a4653f9..84a78222 100644
--- a/rowers/urls.py
+++ b/rowers/urls.py
@@ -138,6 +138,9 @@ urlpatterns = [
url(r'^user-multiflex-select/user/(?P\d+)/$',views.user_multiflex_select),
url(r'^user-multiflex-select/(?P\w+.*)/(?P\w+.*)$',views.user_multiflex_select),
url(r'^user-multiflex-select/$',views.user_multiflex_select),
+ url(r'^list-jobs/$',views.session_jobs_view),
+ url(r'^jobs-status/$',views.session_jobs_status),
+ url(r'^job-kill/(?P<id>.*)$',views.kill_async_job),
url(r'^list-graphs/$',views.graphs_view),
url(r'^(?P\d+)/ote-bests/(?P\w+.*)/(?P\w+.*)$',views.rankings_view),
url(r'^(?P\d+)/ote-bests/(?P\d+)$',views.rankings_view),
diff --git a/rowers/views.py b/rowers/views.py
index e44ba678..4cd08a09 100644
--- a/rowers/views.py
+++ b/rowers/views.py
@@ -130,14 +130,17 @@ import django_rq
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('low')
-
-from redis import StrictRedis
+
+from redis import StrictRedis,Redis
from rq.exceptions import NoSuchJobError
from rq.registry import StartedJobRegistry
-from rq import Queue
+from rq import Queue,cancel_job
+queuefailed = Queue("failed",connection=Redis())
redis_connection = StrictRedis()
rq_registry = StartedJobRegistry(queue.name,connection=redis_connection)
+rq_registryhigh = StartedJobRegistry(queuehigh.name,connection=redis_connection)
+rq_registrylow = StartedJobRegistry(queuelow.name,connection=redis_connection)
from rq.job import Job
@@ -170,6 +173,20 @@ from rowers.celery import result as celery_result
# Define the API documentation
schema_view = get_swagger_view(title='Rowsandall API')
+def remove_asynctask(request,id):
+ try:
+ oldtasks = request.session['async_tasks']
+ except KeyError:
+ oldtasks = []
+
+ newtasks = []
+ for task in oldtasks:
+ print task[0]
+ if id not in task[0]:
+ newtasks += [(task[0],task[1])]
+
+ request.session['async_tasks'] = newtasks
+
def get_job_result(jobid):
if settings.DEBUG:
result = celery_result.AsyncResult(jobid).result
@@ -188,6 +205,132 @@ def get_job_result(jobid):
return result
+verbose_job_status = {
+ 'updatecp': 'Critical Power Calculation for Ergometer Workouts',
+ 'updatecpwater': 'Critical Power Calculation for OTW Workouts',
+ 'otwsetpower': 'Rowing Physics OTW Power Calculation',
+ 'make_plot': 'Create static chart',
+ }
+
+def get_job_status(jobid):
+ if settings.DEBUG:
+ job = celery_result.AsyncResult(jobid)
+ jobresult = job.result
+ if 'fail' in job.status.lower():
+ jobresult = '0'
+ summary = {
+ 'status': job.status,
+ 'result': jobresult,
+ }
+ else:
+ try:
+ job = Job.fetch(jobid,connection=redis_connection)
+ summary = {
+ 'status':job.status,
+ 'result':job.result
+ }
+ except NoSuchJobError:
+ summary = {'status': 'unknown', 'result': None}
+
+
+ if 'fail' in summary['status'].lower():
+ summary['failed'] = True
+ else:
+ summary['failed'] = False
+
+ if 'success' in summary['status'].lower():
+ summary['finished'] = True
+ elif 'finished' in summary['status'].lower():
+ summary['finished'] = True
+ else:
+ summary['finished'] = False
+
+ return summary
+
+def kill_async_job(request,id='aap'):
+ if settings.DEBUG:
+ job = celery_result.AsyncResult(id)
+ job.revoke()
+ else:
+ try:
+ cancel_job(id,connection=redis_connection)
+ except NoSuchJobError:
+ pass
+
+ remove_asynctask(request,id)
+ url = reverse(session_jobs_status)
+
+ return HttpResponseRedirect(url)
+
+
+def get_all_queued_jobs(userid=0):
+ r = StrictRedis()
+
+ jobs = []
+
+ celerykeys = r.keys('celery*')
+ for key in celerykeys:
+ id= key[17:]
+ job = celery_result.AsyncResult(id)
+ jobresult = job.result
+ if 'fail' in job.status.lower():
+ jobresult = '0'
+ jobs.append(
+ (id,{
+ 'status':job.status,
+ 'result':jobresult,
+ 'function':'',
+ 'meta':job.info,
+ }))
+
+ ids = [j.id for j in queue.jobs]
+ ids += [j.id for j in queuehigh.jobs]
+ ids += [j.id for j in queuelow.jobs]
+ ids += [j.id for j in queuefailed.jobs]
+
+
+ for id in ids:
+ job = Job.fetch(id,connection=redis_connection)
+ jobs.append(
+ (id,{
+ 'status':job.get_status(),
+ 'result':job.result,
+ 'function':job.func_name,
+ 'meta':job.meta,
+ }))
+
+ return jobs
+
+def get_stored_tasks_status(request):
+ try:
+ taskids = request.session['async_tasks']
+ except KeyError:
+ taskids = []
+
+ taskstatus = [{
+ 'id':id,
+ 'status':get_job_status(id)['status'],
+ 'failed':get_job_status(id)['failed'],
+ 'finished':get_job_status(id)['finished'],
+ 'func_name':func_name,
+ 'verbose': verbose_job_status[func_name]
+ } for id,func_name in taskids]
+
+ return taskstatus
+
+@login_required()
+def session_jobs_view(request):
+ taskstatus = get_stored_tasks_status(request)
+
+ return HttpResponse(json.dumps(taskstatus))
+
+@login_required()
+def session_jobs_status(request):
+ taskstatus = get_stored_tasks_status(request)
+
+ return render(request,
+ 'async_tasks.html',
+ {'taskstatus':taskstatus})
# Test if row data include candidates
def rowhascoordinates(row):
@@ -3109,19 +3252,24 @@ def otwrankings_view(request,theuser=0,
runningjob = 0
- try:
- jobid = request.session['job_id']
- if jobid:
- result = get_job_result(jobid)
- if result:
- messages.info(request,'Your calculation is ready')
+ taskstatus = get_stored_tasks_status(request)
+ for task in taskstatus:
+ if task['func_name'] == 'updatecpwater':
+ if 'success' in task['status'].lower() or 'finished' in task['status'].lower():
runningjob = 1
- request.session['job_id'] = 0
- else:
+ messages.info(request,'CP chart data have been updated')
+ remove_asynctask(request,task['id'])
+ elif 'fail' in task['status'].lower():
+ runningjob = 0
+ remove_asynctask(request,task['id'])
+ messages.error(request,'Oh, your task failed')
+ elif 'started' in task['status'].lower():
+ messages.info(request,'Busy updating CP chart data')
runningjob = 1
- messages.info(request,'Your job is still running')
- except KeyError:
- pass
+ elif 'queued' in task['status'].lower() or 'pending' in task['status'].lower():
+ messages.info(request,'Getting ready to update CP chart data')
+ runningjob = 1
+
if not runningjob:
@@ -3131,6 +3279,10 @@ def otwrankings_view(request,theuser=0,
enddate=enddate
)
request.session['job_id'] = job.id
+ try:
+ request.session['async_tasks'] += [(job.id,'updatecpwater')]
+ except KeyError:
+ request.session['async_tasks'] = [(job.id,'updatecpwater')]
messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result')
powerdf = pd.DataFrame({
@@ -3358,19 +3510,23 @@ def oterankings_view(request,theuser=0,
runningjob = 0
- try:
- jobid = request.session['job_id']
- if jobid:
- result = get_job_result(jobid)
- if result:
- messages.info(request,'Your calculation is ready')
+ taskstatus = get_stored_tasks_status(request)
+ for task in taskstatus:
+ if task['func_name'] == 'updatecp':
+ if 'success' in task['status'].lower() or 'finished' in task['status'].lower():
runningjob = 1
- request.session['job_id'] = 0
- else:
+ messages.info(request,'CP chart data have been updated')
+ remove_asynctask(request,task['id'])
+ elif 'fail' in task['status'].lower():
+ runningjob = 0
+ remove_asynctask(request,task['id'])
+ messages.error(request,'Oh, your task failed')
+ elif 'started' in task['status'].lower():
+ messages.info(request,'Busy updating CP chart data')
+ runningjob = 1
+ elif 'queued' in task['status'].lower() or 'pending' in task['status'].lower():
+ messages.info(request,'Getting ready to update CP chart data')
runningjob = 1
- messages.info(request,'Your job is still running')
- except KeyError:
- pass
if not runningjob:
@@ -3380,7 +3536,11 @@ def oterankings_view(request,theuser=0,
enddate=enddate
)
request.session['job_id'] = job.id
- messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result')
+ try:
+ request.session['async_tasks'] += [(job.id,'updatecp')]
+ except KeyError:
+ request.session['async_tasks'] = [(job.id,'updatecp')]
+ messages.info(request,'New calculation queued.')
powerdf = pd.DataFrame({
'Delta':delta,
@@ -5504,20 +5664,24 @@ def workout_otwsetpower_view(request,id=0,message="",successmessage=""):
emailaddress = u.email
if settings.DEBUG:
- res = handle_otwsetpower.delay(f1,boattype,weightvalue,
+ job = handle_otwsetpower.delay(f1,boattype,weightvalue,
first_name,last_name,
emailaddress,id,debug=True,
ps=[r.p0,r.p1,r.p2,r.p3],
ratio=r.cpratio)
else:
- res = queuelow.enqueue(handle_otwsetpower,f1,boattype,
+ job = queuelow.enqueue(handle_otwsetpower,f1,boattype,
weightvalue,
first_name,last_name,emailaddress,id,
ps=[r.p0,r.p1,r.p2,r.p3],
ratio=r.cpratio)
-
- successmessage = "Your calculations have been submitted. You will receive an email when they are done."
+ try:
+ request.session['async_tasks'] += [(job.id,'otwsetpower')]
+ except KeyError:
+ request.session['async_tasks'] = [(job.id,'otwsetpower')]
+
+ successmessage = 'Your calculations have been submitted. You will receive an email when they are done. You can check the status of your calculations here'
messages.info(request,successmessage)
kwargs = {
'id':int(id)}
@@ -7305,9 +7469,14 @@ def workout_add_chart_view(request,id,plotnr=1):
u = w.user.user
r = getrower(u)
title = w.name
- res = uploads.make_plot(r,w,f1,w.csvfilename,'timeplot',title,plotnr=plotnr,
- imagename=imagename)
-
+ res,jobid = uploads.make_plot(
+ r,w,f1,w.csvfilename,'timeplot',title,plotnr=plotnr,
+ imagename=imagename
+ )
+ try:
+ request.session['async_tasks'] += [(jobid,'make_plot')]
+ except KeyError:
+ request.session['async_tasks'] = [(jobid,'make_plot')]
try:
url = request.session['referer']
diff --git a/templates/basebase.html b/templates/basebase.html
index 96cd74d4..1379b2c4 100644
--- a/templates/basebase.html
+++ b/templates/basebase.html
@@ -9,6 +9,31 @@
+
+
+
{% analytical_head_top %}
{% if GOOGLE_ANALYTICS_PROPERTY_ID %}