From ac5500a1d832d8551531d5cdee6b7c1887f8e114 Mon Sep 17 00:00:00 2001 From: Sander Roosendaal Date: Wed, 1 Nov 2017 17:16:08 +0100 Subject: [PATCH] status progress bar works on develop --- rowers/longtask.py | 50 ++++++++--- rowers/tasks.py | 7 +- rowers/templates/async_tasks.html | 56 +++++++++++- rowers/templatetags/rowerfilters.py | 7 ++ rowers/utils.py | 3 + rowers/views.py | 129 ++++++++++++++++++++++------ rowsandall_app/settings.py | 1 + 7 files changed, 212 insertions(+), 41 deletions(-) diff --git a/rowers/longtask.py b/rowers/longtask.py index 0a22bf0a..b1ac3042 100644 --- a/rowers/longtask.py +++ b/rowers/longtask.py @@ -1,3 +1,4 @@ +from __future__ import absolute_import import numpy as np import time @@ -10,6 +11,24 @@ redis_connection = StrictRedis() import redis import threading +def getvalue(data): + perc = 0 + total = 1 + done = 0 + id = 0 + session_key = 'noot' + for i in data.iteritems(): + if i[0] == 'total': + total = float(i[1]) + if i[0] == 'done': + done = float(i[1]) + if i[0] == 'id': + id = i[1] + if i[0] == 'session_key': + session_key = i[1] + + return total,done,id,session_key + class Listener(threading.Thread): def __init__(self, r, channels): threading.Thread.__init__(self) @@ -18,8 +37,15 @@ class Listener(threading.Thread): self.pubsub.subscribe(channels) def work(self, item): - print item['channel'], ":", item['data'] - + try: + data = json.loads(item['data']) + total,done,id,session_key = getvalue(data) + perc = 100.*done/total + print perc, '%' + print session_key, id + except TypeError: + print "invalid data" + def run(self): for item in self.pubsub.listen(): if item['data'] == "KILL": @@ -30,13 +56,15 @@ class Listener(threading.Thread): self.work(item) -def longtask(aantal,jobid=None,debug=False): - if jobid: - if debug: - job = celery_result.AsyncResult(jobid) - else: - job = Job.fetch(jobid,connection=redis_connection) - counter = 0 +def longtask(aantal,jobid=None,debug=False, + session_key=None): + counter = 0 +# if jobid: +# if debug: +# job = celery_result.AsyncResult(jobid) +# else: +# job = Job.fetch(jobid,connection=redis_connection) + channel = 'tasks' for i in range(aantal): time.sleep(1) @@ -51,11 +79,11 @@ def longtask(aantal,jobid=None,debug=False): { 'done':i, 'total':aantal, - 'id':jobid, + 'id':jobid, + 'session_key':session_key, } )) - redis_connection.publish(channel,'KILL') return 1 diff --git a/rowers/tasks.py b/rowers/tasks.py index 03e77c27..ae656006 100644 --- a/rowers/tasks.py +++ b/rowers/tasks.py @@ -46,10 +46,11 @@ def add(x, y): @app.task(bind=True) -def long_test_task(self,aantal,debug=False,job=None): +def long_test_task(self,aantal,debug=False,job=None,session_key=None): job = self.request - print job.id - return longtask.longtask(aantal,jobid=job.id,debug=debug) + + return longtask.longtask(aantal,jobid=job.id,debug=debug, + session_key=session_key) # create workout @app.task diff --git a/rowers/templates/async_tasks.html b/rowers/templates/async_tasks.html index 1986acfb..dedf883d 100644 --- a/rowers/templates/async_tasks.html +++ b/rowers/templates/async_tasks.html @@ -4,6 +4,53 @@ {% block title %}Rowsandall - Tasks {% endblock %} +{% block meta %} + + + + + +{% endblock %} + {% block content %} @@ -16,7 +63,8 @@ ID - Task + Task + Progress Status Action @@ -31,6 +79,12 @@ {{ task|lookup:'verbose' }} +
+
{{ task|lookup:'progress' }}
+
+ + + {{ task|lookup:'status' }} {% if task|lookup:'failed' %} diff --git a/rowers/templatetags/rowerfilters.py b/rowers/templatetags/rowerfilters.py index 123faa9e..eebaaaf3 100644 --- a/rowers/templatetags/rowerfilters.py +++ b/rowers/templatetags/rowerfilters.py @@ -1,6 +1,8 @@ from django import template +from django.utils.safestring import mark_safe from time import strftime import dateutil.parser +import json register = template.Library() @@ -65,6 +67,11 @@ def deltatimeprint(d): return strfdeltah(d) +@register.filter(is_safe=True) +def jsdict(dict,key): + s = dict.get(key) + return mark_safe(json.dumps(s)) + @register.filter def lookup(dict, key): s = dict.get(key) diff --git a/rowers/utils.py b/rowers/utils.py index b6b81eec..03c59140 100644 --- a/rowers/utils.py +++ b/rowers/utils.py @@ -5,6 +5,7 @@ import colorsys from django.conf import settings + lbstoN = 4.44822 landingpages = ( @@ -226,3 +227,5 @@ def myqueue(queue,function,*args,**kwargs): job = queue.enqueue(function,*args,**kwargs) return job + + diff --git a/rowers/views.py b/rowers/views.py index 9fdbce12..adfe26fd 100644 --- a/rowers/views.py +++ b/rowers/views.py @@ -14,6 +14,9 @@ from django.views.generic.base import TemplateView from django.db.models import Q from django import template from django.db import IntegrityError, transaction +#from django.contrib.sessions.backends.db import SessionStore +from importlib import import_module +from django.contrib.sessions.models import Session from django.shortcuts import render from django.http import ( HttpResponse, HttpResponseRedirect, @@ -130,17 +133,71 @@ import django_rq queue = django_rq.get_queue('default') queuelow = django_rq.get_queue('low') queuehigh = django_rq.get_queue('low') - + +import redis +import threading from redis import StrictRedis,Redis from rq.exceptions import NoSuchJobError from rq.registry import StartedJobRegistry from rq import Queue,cancel_job +from django.core.cache import cache + +# Redis related +session_engine = import_module(settings.SESSION_ENGINE) + + +def getvalue(data): + perc = 0 + total = 1 + done = 0 + id = 0 + session_key = 'noot' + for i in data.iteritems(): + if i[0] == 'total': + total = float(i[1]) + if i[0] == 'done': + done = float(i[1]) + if i[0] == 'id': + id = i[1] + if i[0] == 'session_key': + session_key = i[1] + + return total,done,id,session_key + +class SessionTaskListener(threading.Thread): + def __init__(self, r, channels): + threading.Thread.__init__(self) + self.redis = r + self.pubsub = self.redis.pubsub() + self.pubsub.subscribe(channels) + + def work(self, item): + + try: + data = json.loads(item['data']) + total,done,id,session_key = getvalue(data) + perc = 100.*done/total + cache.set(id,perc) + + except TypeError: + pass + + def run(self): + for item in self.pubsub.listen(): + if item['data'] == "KILL": + self.pubsub.unsubscribe() + print self, "unsubscribed and finished" + break + else: + self.work(item) + + queuefailed = Queue("failed",connection=Redis()) redis_connection = StrictRedis() r = Redis() -from .longtask import Listener -client = Listener(r,['tasks']) + +client = SessionTaskListener(r,['tasks']) client.start() @@ -187,9 +244,8 @@ def remove_asynctask(request,id): newtasks = [] for task in oldtasks: - print task[0] if id not in task[0]: - newtasks += [(task[0],task[1])] + newtasks += [(task[0],task[1],task[2])] request.session['async_tasks'] = newtasks @@ -223,8 +279,6 @@ def get_job_status(jobid): if settings.DEBUG: job = celery_result.AsyncResult(jobid) jobresult = job.result - channel = 'task:<'+job.id+'>:progress' - channel = 'noot' if 'fail' in job.status.lower(): jobresult = '0' @@ -271,6 +325,7 @@ def kill_async_job(request,id='aap'): pass remove_asynctask(request,id) + cache.delete(id) url = reverse(session_jobs_status) return HttpResponseRedirect(url) @@ -278,11 +333,16 @@ def kill_async_job(request,id='aap'): @login_required() def test_job_view(request,aantal=100): - job = myqueue(queuehigh,long_test_task,int(aantal)) + session_key = request.session._session_key + + job = myqueue(queuehigh,long_test_task,int(aantal), + session_key=session_key) + + try: - request.session['async_tasks'] += [(job.id,'long_test_task')] + request.session['async_tasks'] += [(job.id,'long_test_task',0)] except KeyError: - request.session['async_tasks'] = [(job.id,'long_test_task')] + request.session['async_tasks'] = [(job.id,'long_test_task',0)] url = reverse(session_jobs_status) @@ -333,15 +393,32 @@ def get_stored_tasks_status(request): except KeyError: taskids = [] - taskstatus = [{ - 'id':id, - 'status':get_job_status(id)['status'], - 'failed':get_job_status(id)['failed'], - 'finished':get_job_status(id)['finished'], - 'func_name':func_name, - 'verbose': verbose_job_status[func_name] - } for id,func_name in taskids] + taskstatus = [] + for id,func_name,session_progress in taskids: + progress = 0 + cached_progress = cache.get(id) + finished = get_job_status(id)['finished'] + if finished: + cache.set(id,100) + progress = 100 + elif cached_progress>0: + progress = cached_progress + else: + progress = session_progress + this_task_status = { + 'id':id, + 'status':get_job_status(id)['status'], + 'failed':get_job_status(id)['failed'], + 'finished':get_job_status(id)['finished'], + 'func_name':func_name, + 'verbose': verbose_job_status[func_name], + 'progress': progress, + } + + taskstatus.append(this_task_status) + + return taskstatus @login_required() @@ -3268,9 +3345,9 @@ def otwrankings_view(request,theuser=0, ) request.session['job_id'] = job.id try: - request.session['async_tasks'] += [(job.id,'updatecpwater')] + request.session['async_tasks'] += [(job.id,'updatecpwater',0)] except KeyError: - request.session['async_tasks'] = [(job.id,'updatecpwater')] + request.session['async_tasks'] = [(job.id,'updatecpwater',0)] messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result') powerdf = pd.DataFrame({ @@ -3525,9 +3602,9 @@ def oterankings_view(request,theuser=0, ) request.session['job_id'] = job.id try: - request.session['async_tasks'] += [(job.id,'updatecp')] + request.session['async_tasks'] += [(job.id,'updatecp',0)] except KeyError: - request.session['async_tasks'] = [(job.id,'updatecp')] + request.session['async_tasks'] = [(job.id,'updatecp',0)] messages.info(request,'New calculation queued.') powerdf = pd.DataFrame({ @@ -5659,9 +5736,9 @@ def workout_otwsetpower_view(request,id=0,message="",successmessage=""): ratio=r.cpratio) try: - request.session['async_tasks'] += [(job.id,'otwsetpower')] + request.session['async_tasks'] += [(job.id,'otwsetpower',0)] except KeyError: - request.session['async_tasks'] = [(job.id,'otwsetpower')] + request.session['async_tasks'] = [(job.id,'otwsetpower',0)] successmessage = 'Your calculations have been submitted. You will receive an email when they are done. You can check the status of your calculations here' messages.info(request,successmessage) @@ -7437,9 +7514,9 @@ def workout_add_chart_view(request,id,plotnr=1): imagename=imagename ) try: - request.session['async_tasks'] += [(jobid,'make_plot')] + request.session['async_tasks'] += [(jobid,'make_plot',0)] except KeyError: - request.session['async_tasks'] = [(jobid,'make_plot')] + request.session['async_tasks'] = [(jobid,'make_plot',0)] try: url = request.session['referer'] diff --git a/rowsandall_app/settings.py b/rowsandall_app/settings.py index beadcf86..7f8a63b2 100644 --- a/rowsandall_app/settings.py +++ b/rowsandall_app/settings.py @@ -287,6 +287,7 @@ RQ_QUEUES = { #SESSION_ENGINE = "django.contrib.sessions.backends.signed_cookies" #SESSION_ENGINE = "django.contrib.sessions.backends.cached_db" SESSION_ENGINE = "django.contrib.sessions.backends.cache" +SESSION_SAVE_EVERY_REQUEST = True # admin stuff for error reporting SERVER_EMAIL='admin@rowsandall.com'