From 537b7392e62806f2d272fe2453fc55ac4135852d Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Mon, 30 Oct 2017 17:27:13 +0100 Subject: [PATCH] works on development - needs testing on production --- rowers/templates/async_tasks.html | 56 ++++++++ rowers/urls.py | 3 + rowers/views.py | 219 ++++++++++++++++++++++++++---- 3 files changed, 250 insertions(+), 28 deletions(-) create mode 100644 rowers/templates/async_tasks.html diff --git a/rowers/templates/async_tasks.html b/rowers/templates/async_tasks.html new file mode 100644 index 00000000..0828105d --- /dev/null +++ b/rowers/templates/async_tasks.html @@ -0,0 +1,56 @@ +{% extends "base.html" %} +{% load staticfiles %} +{% load rowerfilters %} + +{% block title %}Rowsandall - Tasks {% endblock %} + +{% block content %} + +

Your Tasks Status

+

Manage the asynchronous tasks running for you

+ + +
+ + + + + + + + + + + {% for task in taskstatus %} + + + + + {% if task|lookup:'failed' %} + + {% elif task|lookup:'finished' %} + + {% else %} + + {% endif %} + + {% endfor %} + +
IDTaskStatusAction
+ {{ task|lookup:'id' }} + + {{ task|lookup:'verbose' }} + + {{ task|lookup:'status' }} + + Remove + + Remove + +   +
+
+ + + +{% endblock %} diff --git a/rowers/urls.py b/rowers/urls.py index 9a4653f9..84a78222 100644 --- a/rowers/urls.py +++ b/rowers/urls.py @@ -138,6 +138,9 @@ urlpatterns = [ url(r'^user-multiflex-select/user/(?P\d+)/$',views.user_multiflex_select), url(r'^user-multiflex-select/(?P\w+.*)/(?P\w+.*)$',views.user_multiflex_select), url(r'^user-multiflex-select/$',views.user_multiflex_select), + url(r'^list-jobs/$',views.session_jobs_view), + url(r'^jobs-status/$',views.session_jobs_status), + url(r'^job-kill/(?P.*)$',views.kill_async_job), url(r'^list-graphs/$',views.graphs_view), url(r'^(?P\d+)/ote-bests/(?P\w+.*)/(?P\w+.*)$',views.rankings_view), url(r'^(?P\d+)/ote-bests/(?P\d+)$',views.rankings_view), diff --git a/rowers/views.py b/rowers/views.py index e44ba678..fdcb429e 100644 --- a/rowers/views.py +++ b/rowers/views.py @@ -130,14 +130,17 @@ import django_rq queue = django_rq.get_queue('default') queuelow = django_rq.get_queue('low') queuehigh = django_rq.get_queue('low') - -from redis import StrictRedis + +from redis import StrictRedis,Redis from rq.exceptions import NoSuchJobError from rq.registry import StartedJobRegistry -from rq import Queue +from rq import Queue,cancel_job +queuefailed = Queue("failed",connection=Redis()) redis_connection = StrictRedis() rq_registry = StartedJobRegistry(queue.name,connection=redis_connection) +rq_registryhigh = StartedJobRegistry(queuehigh.name,connection=redis_connection) +rq_registrylow = StartedJobRegistry(queuelow.name,connection=redis_connection) from rq.job import Job @@ -170,6 +173,20 @@ from rowers.celery import result as celery_result # Define the API documentation schema_view = get_swagger_view(title='Rowsandall API') +def remove_asynctask(request,id): + try: + oldtasks = request.session['async_tasks'] + except KeyError: + oldtasks = [] + + newtasks = [] + for task in oldtasks: + print task[0] + if id not in task[0]: + newtasks += [(task[0],task[1])] + + request.session['async_tasks'] = newtasks + def get_job_result(jobid): if settings.DEBUG: result = celery_result.AsyncResult(jobid).result @@ -188,6 +205,131 @@ def get_job_result(jobid): return result +verbose_job_status = { + 'updatecp': 'Critical Power Calculation for Ergometer Workouts', + 'updatecpwater': 'Critical Power Calculation for OTW Workouts', + 'otwsetpower': 'Rowing Physics OTW Power Calculation' + } + +def get_job_status(jobid): + if settings.DEBUG: + job = celery_result.AsyncResult(jobid) + jobresult = job.result + if 'fail' in job.status.lower(): + jobresult = '0' + summary = { + 'status': job.status, + 'result': jobresult, + } + else: + try: + job = Job.fetch(jobid,connection=redis_connection) + summary = { + 'status':job.status, + 'result':job.result + } + except NoSuchJobError: + summary = {} + + + if 'fail' in summary['status'].lower(): + summary['failed'] = True + else: + summary['failed'] = False + + if 'success' in summary['status'].lower(): + summary['finished'] = True + elif 'finished' in summary['status'].lower(): + summary['finished'] = True + else: + summary['finished'] = False + + return summary + +def kill_async_job(request,id='aap'): + if settings.DEBUG: + job = celery_result.AsyncResult(id) + job.revoke() + remove_asynctask(request,id) + else: + try: + cancel_job(id) + except NoSuchJobError: + pass + + url = reverse(session_jobs_status) + + return HttpResponseRedirect(url) + + +def get_all_queued_jobs(userid=0): + r = StrictRedis() + + jobs = [] + + celerykeys = r.keys('celery*') + for key in celerykeys: + id= key[17:] + job = celery_result.AsyncResult(id) + jobresult = job.result + if 'fail' in job.status.lower(): + jobresult = '0' + jobs.append( + (id,{ + 'status':job.status, + 'result':jobresult, + 'function':'', + 'meta':job.info, + })) + + ids = [j.id for j in queue.jobs] + ids += [j.id for j in queuehigh.jobs] + ids += [j.id for j in queuelow.jobs] + ids += [j.id for j in queuefailed.jobs] + + + for id in ids: + job = Job.fetch(id,connection=redis_connection) + jobs.append( + (id,{ + 'status':job.get_status(), + 'result':job.result, + 'function':job.func_name, + 'meta':job.meta, + })) + + return jobs + +def get_stored_tasks_status(request): + try: + taskids = request.session['async_tasks'] + except KeyError: + taskids = [] + + taskstatus = [{ + 'id':id, + 'status':get_job_status(id)['status'], + 'failed':get_job_status(id)['failed'], + 'finished':get_job_status(id)['finished'], + 'func_name':func_name, + 'verbose': verbose_job_status[func_name] + } for id,func_name in taskids] + + return taskstatus + +@login_required() +def session_jobs_view(request): + taskstatus = get_stored_tasks_status(request) + + return HttpResponse(json.dumps(taskstatus)) + +@login_required() +def session_jobs_status(request): + taskstatus = get_stored_tasks_status(request) + + return render(request, + 'async_tasks.html', + {'taskstatus':taskstatus}) # Test if row data include candidates def rowhascoordinates(row): @@ -3109,19 +3251,24 @@ def otwrankings_view(request,theuser=0, runningjob = 0 - try: - jobid = request.session['job_id'] - if jobid: - result = get_job_result(jobid) - if result: - messages.info(request,'Your calculation is ready') + taskstatus = get_stored_tasks_status(request) + for task in taskstatus: + if task['func_name'] == 'updatecpwater': + if 'success' in task['status'].lower() or 'finished' in task['status'].lower(): runningjob = 1 - request.session['job_id'] = 0 - else: + messages.info(request,'CP chart data have been updated') + remove_asynctask(request,task['id']) + elif 'fail' in task['status'].lower(): + runningjob = 0 + remove_asynctask(request,task[id]) + messages.error(request,'Oh, your task failed') + elif 'started' in task['status'].lower(): + messages.info(request,'Busy updating CP chart data') runningjob = 1 - messages.info(request,'Your job is still running') - except KeyError: - pass + elif 'queued' in task['status'].lower() or 'pending' in task['status'].lower(): + messages.info(request,'Getting ready to update CP chart data') + runningjob = 1 + if not runningjob: @@ -3131,6 +3278,10 @@ def otwrankings_view(request,theuser=0, enddate=enddate ) request.session['job_id'] = job.id + try: + request.session['async_tasks'] += [(job.id,'updatecpwater')] + except KeyError: + request.session['async_tasks'] = [(job.id,'updatecpwater')] messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result') powerdf = pd.DataFrame({ @@ -3358,19 +3509,23 @@ def oterankings_view(request,theuser=0, runningjob = 0 - try: - jobid = request.session['job_id'] - if jobid: - result = get_job_result(jobid) - if result: - messages.info(request,'Your calculation is ready') + taskstatus = get_stored_tasks_status(request) + for task in taskstatus: + if task['func_name'] == 'updatecp': + if 'success' in task['status'].lower() or 'finished' in task['status'].lower(): runningjob = 1 - request.session['job_id'] = 0 - else: + messages.info(request,'CP chart data have been updated') + remove_asynctask(request,task['id']) + elif 'fail' in task['status'].lower(): + runningjob = 0 + remove_asynctask(request,task[id]) + messages.error(request,'Oh, your task failed') + elif 'started' in task['status'].lower(): + messages.info(request,'Busy updating CP chart data') + runningjob = 1 + elif 'queued' in task['status'].lower(): + messages.info(request,'Getting ready to update CP chart data') runningjob = 1 - messages.info(request,'Your job is still running') - except KeyError: - pass if not runningjob: @@ -3380,6 +3535,10 @@ def oterankings_view(request,theuser=0, enddate=enddate ) request.session['job_id'] = job.id + try: + request.session['async_tasks'] += [(job.id,'updatecp')] + except KeyError: + request.session['async_tasks'] = [(job.id,'updatecp')] messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result') powerdf = pd.DataFrame({ @@ -5504,19 +5663,23 @@ def workout_otwsetpower_view(request,id=0,message="",successmessage=""): emailaddress = u.email if settings.DEBUG: - res = handle_otwsetpower.delay(f1,boattype,weightvalue, + job = handle_otwsetpower.delay(f1,boattype,weightvalue, first_name,last_name, emailaddress,id,debug=True, ps=[r.p0,r.p1,r.p2,r.p3], ratio=r.cpratio) else: - res = queuelow.enqueue(handle_otwsetpower,f1,boattype, + job = queuelow.enqueue(handle_otwsetpower,f1,boattype, weightvalue, first_name,last_name,emailaddress,id, ps=[r.p0,r.p1,r.p2,r.p3], ratio=r.cpratio) - + try: + request.session['async_tasks'] += [(job.id,'otwsetpower')] + except KeyError: + request.session['async_tasks'] = [(job.id,'otwsetpower')] + successmessage = "Your calculations have been submitted. You will receive an email when they are done." messages.info(request,successmessage) kwargs = {