From 537b7392e62806f2d272fe2453fc55ac4135852d Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Mon, 30 Oct 2017 17:27:13 +0100 Subject: [PATCH 1/4] works on development - needs testing on production --- rowers/templates/async_tasks.html | 56 ++++++++ rowers/urls.py | 3 + rowers/views.py | 219 ++++++++++++++++++++++++++---- 3 files changed, 250 insertions(+), 28 deletions(-) create mode 100644 rowers/templates/async_tasks.html diff --git a/rowers/templates/async_tasks.html b/rowers/templates/async_tasks.html new file mode 100644 index 00000000..0828105d --- /dev/null +++ b/rowers/templates/async_tasks.html @@ -0,0 +1,56 @@ +{% extends "base.html" %} +{% load staticfiles %} +{% load rowerfilters %} + +{% block title %}Rowsandall - Tasks {% endblock %} + +{% block content %} + +

Your Tasks Status

+

Manage the asynchronous tasks running for you

+ + +
+ + + + + + + + + + + {% for task in taskstatus %} + + + + + {% if task|lookup:'failed' %} + + {% elif task|lookup:'finished' %} + + {% else %} + + {% endif %} + + {% endfor %} + +
IDTaskStatusAction
+ {{ task|lookup:'id' }} + + {{ task|lookup:'verbose' }} + + {{ task|lookup:'status' }} + + Remove + + Remove + +   +
+
+ + + +{% endblock %} diff --git a/rowers/urls.py b/rowers/urls.py index 9a4653f9..84a78222 100644 --- a/rowers/urls.py +++ b/rowers/urls.py @@ -138,6 +138,9 @@ urlpatterns = [ url(r'^user-multiflex-select/user/(?P\d+)/$',views.user_multiflex_select), url(r'^user-multiflex-select/(?P\w+.*)/(?P\w+.*)$',views.user_multiflex_select), url(r'^user-multiflex-select/$',views.user_multiflex_select), + url(r'^list-jobs/$',views.session_jobs_view), + url(r'^jobs-status/$',views.session_jobs_status), + url(r'^job-kill/(?P.*)$',views.kill_async_job), url(r'^list-graphs/$',views.graphs_view), url(r'^(?P\d+)/ote-bests/(?P\w+.*)/(?P\w+.*)$',views.rankings_view), url(r'^(?P\d+)/ote-bests/(?P\d+)$',views.rankings_view), diff --git a/rowers/views.py b/rowers/views.py index e44ba678..fdcb429e 100644 --- a/rowers/views.py +++ b/rowers/views.py @@ -130,14 +130,17 @@ import django_rq queue = django_rq.get_queue('default') queuelow = django_rq.get_queue('low') queuehigh = django_rq.get_queue('low') - -from redis import StrictRedis + +from redis import StrictRedis,Redis from rq.exceptions import NoSuchJobError from rq.registry import StartedJobRegistry -from rq import Queue +from rq import Queue,cancel_job +queuefailed = Queue("failed",connection=Redis()) redis_connection = StrictRedis() rq_registry = StartedJobRegistry(queue.name,connection=redis_connection) +rq_registryhigh = StartedJobRegistry(queuehigh.name,connection=redis_connection) +rq_registrylow = StartedJobRegistry(queuelow.name,connection=redis_connection) from rq.job import Job @@ -170,6 +173,20 @@ from rowers.celery import result as celery_result # Define the API documentation schema_view = get_swagger_view(title='Rowsandall API') +def remove_asynctask(request,id): + try: + oldtasks = request.session['async_tasks'] + except KeyError: + oldtasks = [] + + newtasks = [] + for task in oldtasks: + print task[0] + if id not in task[0]: + newtasks += [(task[0],task[1])] + + request.session['async_tasks'] = newtasks + def get_job_result(jobid): if settings.DEBUG: result = celery_result.AsyncResult(jobid).result @@ -188,6 +205,131 @@ def get_job_result(jobid): return result +verbose_job_status = { + 'updatecp': 'Critical Power Calculation for Ergometer Workouts', + 'updatecpwater': 'Critical Power Calculation for OTW Workouts', + 'otwsetpower': 'Rowing Physics OTW Power Calculation' + } + +def get_job_status(jobid): + if settings.DEBUG: + job = celery_result.AsyncResult(jobid) + jobresult = job.result + if 'fail' in job.status.lower(): + jobresult = '0' + summary = { + 'status': job.status, + 'result': jobresult, + } + else: + try: + job = Job.fetch(jobid,connection=redis_connection) + summary = { + 'status':job.status, + 'result':job.result + } + except NoSuchJobError: + summary = {} + + + if 'fail' in summary['status'].lower(): + summary['failed'] = True + else: + summary['failed'] = False + + if 'success' in summary['status'].lower(): + summary['finished'] = True + elif 'finished' in summary['status'].lower(): + summary['finished'] = True + else: + summary['finished'] = False + + return summary + +def kill_async_job(request,id='aap'): + if settings.DEBUG: + job = celery_result.AsyncResult(id) + job.revoke() + remove_asynctask(request,id) + else: + try: + cancel_job(id) + except NoSuchJobError: + pass + + url = reverse(session_jobs_status) + + return HttpResponseRedirect(url) + + +def get_all_queued_jobs(userid=0): + r = StrictRedis() + + jobs = [] + + celerykeys = r.keys('celery*') + for key in celerykeys: + id= key[17:] + job = celery_result.AsyncResult(id) + jobresult = job.result + if 'fail' in job.status.lower(): + jobresult = '0' + jobs.append( + (id,{ + 'status':job.status, + 'result':jobresult, + 'function':'', + 'meta':job.info, + })) + + ids = [j.id for j in queue.jobs] + ids += [j.id for j in queuehigh.jobs] + ids += [j.id for j in queuelow.jobs] + ids += [j.id for j in queuefailed.jobs] + + + for id in ids: + job = Job.fetch(id,connection=redis_connection) + jobs.append( + (id,{ + 'status':job.get_status(), + 'result':job.result, + 'function':job.func_name, + 'meta':job.meta, + })) + + return jobs + +def get_stored_tasks_status(request): + try: + taskids = request.session['async_tasks'] + except KeyError: + taskids = [] + + taskstatus = [{ + 'id':id, + 'status':get_job_status(id)['status'], + 'failed':get_job_status(id)['failed'], + 'finished':get_job_status(id)['finished'], + 'func_name':func_name, + 'verbose': verbose_job_status[func_name] + } for id,func_name in taskids] + + return taskstatus + +@login_required() +def session_jobs_view(request): + taskstatus = get_stored_tasks_status(request) + + return HttpResponse(json.dumps(taskstatus)) + +@login_required() +def session_jobs_status(request): + taskstatus = get_stored_tasks_status(request) + + return render(request, + 'async_tasks.html', + {'taskstatus':taskstatus}) # Test if row data include candidates def rowhascoordinates(row): @@ -3109,19 +3251,24 @@ def otwrankings_view(request,theuser=0, runningjob = 0 - try: - jobid = request.session['job_id'] - if jobid: - result = get_job_result(jobid) - if result: - messages.info(request,'Your calculation is ready') + taskstatus = get_stored_tasks_status(request) + for task in taskstatus: + if task['func_name'] == 'updatecpwater': + if 'success' in task['status'].lower() or 'finished' in task['status'].lower(): runningjob = 1 - request.session['job_id'] = 0 - else: + messages.info(request,'CP chart data have been updated') + remove_asynctask(request,task['id']) + elif 'fail' in task['status'].lower(): + runningjob = 0 + remove_asynctask(request,task[id]) + messages.error(request,'Oh, your task failed') + elif 'started' in task['status'].lower(): + messages.info(request,'Busy updating CP chart data') runningjob = 1 - messages.info(request,'Your job is still running') - except KeyError: - pass + elif 'queued' in task['status'].lower() or 'pending' in task['status'].lower(): + messages.info(request,'Getting ready to update CP chart data') + runningjob = 1 + if not runningjob: @@ -3131,6 +3278,10 @@ def otwrankings_view(request,theuser=0, enddate=enddate ) request.session['job_id'] = job.id + try: + request.session['async_tasks'] += [(job.id,'updatecpwater')] + except KeyError: + request.session['async_tasks'] = [(job.id,'updatecpwater')] messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result') powerdf = pd.DataFrame({ @@ -3358,19 +3509,23 @@ def oterankings_view(request,theuser=0, runningjob = 0 - try: - jobid = request.session['job_id'] - if jobid: - result = get_job_result(jobid) - if result: - messages.info(request,'Your calculation is ready') + taskstatus = get_stored_tasks_status(request) + for task in taskstatus: + if task['func_name'] == 'updatecp': + if 'success' in task['status'].lower() or 'finished' in task['status'].lower(): runningjob = 1 - request.session['job_id'] = 0 - else: + messages.info(request,'CP chart data have been updated') + remove_asynctask(request,task['id']) + elif 'fail' in task['status'].lower(): + runningjob = 0 + remove_asynctask(request,task[id]) + messages.error(request,'Oh, your task failed') + elif 'started' in task['status'].lower(): + messages.info(request,'Busy updating CP chart data') + runningjob = 1 + elif 'queued' in task['status'].lower(): + messages.info(request,'Getting ready to update CP chart data') runningjob = 1 - messages.info(request,'Your job is still running') - except KeyError: - pass if not runningjob: @@ -3380,6 +3535,10 @@ def oterankings_view(request,theuser=0, enddate=enddate ) request.session['job_id'] = job.id + try: + request.session['async_tasks'] += [(job.id,'updatecp')] + except KeyError: + request.session['async_tasks'] = [(job.id,'updatecp')] messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result') powerdf = pd.DataFrame({ @@ -5504,19 +5663,23 @@ def workout_otwsetpower_view(request,id=0,message="",successmessage=""): emailaddress = u.email if settings.DEBUG: - res = handle_otwsetpower.delay(f1,boattype,weightvalue, + job = handle_otwsetpower.delay(f1,boattype,weightvalue, first_name,last_name, emailaddress,id,debug=True, ps=[r.p0,r.p1,r.p2,r.p3], ratio=r.cpratio) else: - res = queuelow.enqueue(handle_otwsetpower,f1,boattype, + job = queuelow.enqueue(handle_otwsetpower,f1,boattype, weightvalue, first_name,last_name,emailaddress,id, ps=[r.p0,r.p1,r.p2,r.p3], ratio=r.cpratio) - + try: + request.session['async_tasks'] += [(job.id,'otwsetpower')] + except KeyError: + request.session['async_tasks'] = [(job.id,'otwsetpower')] + successmessage = "Your calculations have been submitted. You will receive an email when they are done." messages.info(request,successmessage) kwargs = { From 5fe6db02dcd6617811afa9d89f68278aec6f346e Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Mon, 30 Oct 2017 17:33:58 +0100 Subject: [PATCH 2/4] bug fix --- rowers/views.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rowers/views.py b/rowers/views.py index fdcb429e..8b5d4f7a 100644 --- a/rowers/views.py +++ b/rowers/views.py @@ -250,13 +250,13 @@ def kill_async_job(request,id='aap'): if settings.DEBUG: job = celery_result.AsyncResult(id) job.revoke() - remove_asynctask(request,id) else: try: - cancel_job(id) + cancel_job(id,connect=redis_connection) except NoSuchJobError: pass + remove_asynctask(request,id) url = reverse(session_jobs_status) return HttpResponseRedirect(url) From 21be4d278b14cf4b4127d0655548775b7783aabe Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Mon, 30 Oct 2017 21:22:07 +0100 Subject: [PATCH 3/4] bux fix --- rowers/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rowers/views.py b/rowers/views.py index 8b5d4f7a..c0c6ab01 100644 --- a/rowers/views.py +++ b/rowers/views.py @@ -5680,7 +5680,7 @@ def workout_otwsetpower_view(request,id=0,message="",successmessage=""): except KeyError: request.session['async_tasks'] = [(job.id,'otwsetpower')] - successmessage = "Your calculations have been submitted. You will receive an email when they are done." + successmessage = 'Your calculations have been submitted. You will receive an email when they are done. You can check the status of your calculations here' messages.info(request,successmessage) kwargs = { 'id':int(id)} From 98abc68289ac606573e2e09ab94aaa95f7b16dd9 Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Mon, 30 Oct 2017 22:32:34 +0100 Subject: [PATCH 4/4] with jquery to refresh when tasks are complete jquery lives in basebase.html. Perhaps should live in individual pages --- rowers/management/commands/processemail.py | 2 +- rowers/templates/async_tasks.html | 1 + rowers/uploads.py | 6 +++--- rowers/views.py | 16 +++++++++----- templates/basebase.html | 25 ++++++++++++++++++++++ 5 files changed, 41 insertions(+), 9 deletions(-) diff --git a/rowers/management/commands/processemail.py b/rowers/management/commands/processemail.py index a0413753..e4f88e28 100644 --- a/rowers/management/commands/processemail.py +++ b/rowers/management/commands/processemail.py @@ -65,7 +65,7 @@ def processattachment(rower, fileobj, title, uploadoptions): workoutcsvfilename = workout.csvfilename[6:-4] timestr = strftime("%Y%m%d-%H%M%S") imagename = workoutcsvfilename + timestr + '.png' - result = uploads.make_plot( + result,jobid = uploads.make_plot( workout.user, workout, workoutcsvfilename, workout.csvfilename, plottype, title, diff --git a/rowers/templates/async_tasks.html b/rowers/templates/async_tasks.html index 0828105d..1986acfb 100644 --- a/rowers/templates/async_tasks.html +++ b/rowers/templates/async_tasks.html @@ -6,6 +6,7 @@ {% block content %} +

Your Tasks Status

Manage the asynchronous tasks running for you

diff --git a/rowers/uploads.py b/rowers/uploads.py index 578d1807..a6fab4a1 100644 --- a/rowers/uploads.py +++ b/rowers/uploads.py @@ -175,11 +175,11 @@ def make_plot(r,w,f1,f2,plottype,title,imagename='',plotnr=0): if settings.DEBUG: - res = handle_makeplot.delay(f1,f2,title, + job = handle_makeplot.delay(f1,f2,title, hrpwrdata,plotnr, imagename) else: - res = queue.enqueue(handle_makeplot,f1,f2, + job = queue.enqueue(handle_makeplot,f1,f2, title,hrpwrdata, plotnr,imagename) @@ -195,7 +195,7 @@ def make_plot(r,w,f1,f2,plottype,title,imagename='',plotnr=0): width=width,height=height) i.save() - return i.id + return i.id,job.id import c2stuff,stravastuff,sporttracksstuff,runkeeperstuff import underarmourstuff,tpstuff diff --git a/rowers/views.py b/rowers/views.py index c0c6ab01..4cd08a09 100644 --- a/rowers/views.py +++ b/rowers/views.py @@ -208,7 +208,8 @@ def get_job_result(jobid): verbose_job_status = { 'updatecp': 'Critical Power Calculation for Ergometer Workouts', 'updatecpwater': 'Critical Power Calculation for OTW Workouts', - 'otwsetpower': 'Rowing Physics OTW Power Calculation' + 'otwsetpower': 'Rowing Physics OTW Power Calculation', + 'make_plot': 'Create static chart', } def get_job_status(jobid): @@ -3539,7 +3540,7 @@ def oterankings_view(request,theuser=0, request.session['async_tasks'] += [(job.id,'updatecp')] except KeyError: request.session['async_tasks'] = [(job.id,'updatecp')] - messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result') + messages.info(request,'New calculation queued.') powerdf = pd.DataFrame({ 'Delta':delta, @@ -7468,9 +7469,14 @@ def workout_add_chart_view(request,id,plotnr=1): u = w.user.user r = getrower(u) title = w.name - res = uploads.make_plot(r,w,f1,w.csvfilename,'timeplot',title,plotnr=plotnr, - imagename=imagename) - + res,jobid = uploads.make_plot( + r,w,f1,w.csvfilename,'timeplot',title,plotnr=plotnr, + imagename=imagename + ) + try: + request.session['async_tasks'] += [(jobid,'make_plot')] + except KeyError: + request.session['async_tasks'] = [(jobid,'make_plot')] try: url = request.session['referer'] diff --git a/templates/basebase.html b/templates/basebase.html index 96cd74d4..1379b2c4 100644 --- a/templates/basebase.html +++ b/templates/basebase.html @@ -9,6 +9,31 @@ + + + {% analytical_head_top %} {% if GOOGLE_ANALYTICS_PROPERTY_ID %}