diff --git a/rowers/longtask.py b/rowers/longtask.py new file mode 100644 index 00000000..0a22bf0a --- /dev/null +++ b/rowers/longtask.py @@ -0,0 +1,61 @@ +import numpy as np +import time + +from redis import StrictRedis,Redis +from celery import result as celery_result +import json + +redis_connection = StrictRedis() + +import redis +import threading + +class Listener(threading.Thread): + def __init__(self, r, channels): + threading.Thread.__init__(self) + self.redis = r + self.pubsub = self.redis.pubsub() + self.pubsub.subscribe(channels) + + def work(self, item): + print item['channel'], ":", item['data'] + + def run(self): + for item in self.pubsub.listen(): + if item['data'] == "KILL": + self.pubsub.unsubscribe() + print self, "unsubscribed and finished" + break + else: + self.work(item) + + +def longtask(aantal,jobid=None,debug=False): + if jobid: + if debug: + job = celery_result.AsyncResult(jobid) + else: + job = Job.fetch(jobid,connection=redis_connection) + counter = 0 + channel = 'tasks' + for i in range(aantal): + time.sleep(1) + counter += 1 + if counter > 10: + counter = 0 + if debug: + progress = 100.*i/aantal + print progress + if jobid != None: + redis_connection.publish(channel,json.dumps( + { + 'done':i, + 'total':aantal, + 'id':jobid, + } + )) + + + redis_connection.publish(channel,'KILL') + + return 1 diff --git a/rowers/tasks.py b/rowers/tasks.py index 237f9be4..03e77c27 100644 --- a/rowers/tasks.py +++ b/rowers/tasks.py @@ -35,6 +35,8 @@ from django.db.utils import OperationalError import datautils import utils +import longtask + # testing task @@ -42,9 +44,14 @@ import utils def add(x, y): return x + y + +@app.task(bind=True) +def long_test_task(self,aantal,debug=False,job=None): + job = self.request + print job.id + return longtask.longtask(aantal,jobid=job.id,debug=debug) + # create workout - - @app.task def handle_new_workout_from_file(r, f2, workouttype='rower', diff --git a/rowers/urls.py b/rowers/urls.py index 84a78222..f592717a 100644 --- a/rowers/urls.py +++ b/rowers/urls.py @@ -141,6 +141,7 @@ urlpatterns = [ url(r'^list-jobs/$',views.session_jobs_view), url(r'^jobs-status/$',views.session_jobs_status), url(r'^job-kill/(?P.*)$',views.kill_async_job), + url(r'^test-job/(?P\d+)$',views.test_job_view), url(r'^list-graphs/$',views.graphs_view), url(r'^(?P\d+)/ote-bests/(?P\w+.*)/(?P\w+.*)$',views.rankings_view), url(r'^(?P\d+)/ote-bests/(?P\d+)$',views.rankings_view), diff --git a/rowers/views.py b/rowers/views.py index 0ad77a0f..9fdbce12 100644 --- a/rowers/views.py +++ b/rowers/views.py @@ -99,7 +99,7 @@ from rowers.tasks import handle_makeplot,handle_otwsetpower,handle_sendemailtcx, from rowers.tasks import ( handle_sendemail_unrecognized,handle_sendemailnewcomment, handle_sendemailnewresponse, handle_updatedps, - handle_updatecp + handle_updatecp,long_test_task ) from scipy.signal import savgol_filter @@ -138,6 +138,12 @@ from rq import Queue,cancel_job queuefailed = Queue("failed",connection=Redis()) redis_connection = StrictRedis() +r = Redis() +from .longtask import Listener +client = Listener(r,['tasks']) +client.start() + + rq_registry = StartedJobRegistry(queue.name,connection=redis_connection) rq_registryhigh = StartedJobRegistry(queuehigh.name,connection=redis_connection) rq_registrylow = StartedJobRegistry(queuelow.name,connection=redis_connection) @@ -188,7 +194,7 @@ def remove_asynctask(request,id): request.session['async_tasks'] = newtasks def get_job_result(jobid): - if settings.EBUG: + if settings.DEBUG: result = celery_result.AsyncResult(jobid).result else: running_job_ids = rq_registry.get_job_ids() @@ -210,12 +216,16 @@ verbose_job_status = { 'updatecpwater': 'Critical Power Calculation for OTW Workouts', 'otwsetpower': 'Rowing Physics OTW Power Calculation', 'make_plot': 'Create static chart', + 'long_test_task': 'Long Test Task', } def get_job_status(jobid): if settings.DEBUG: job = celery_result.AsyncResult(jobid) jobresult = job.result + channel = 'task:<'+job.id+'>:progress' + channel = 'noot' + if 'fail' in job.status.lower(): jobresult = '0' summary = { @@ -264,7 +274,20 @@ def kill_async_job(request,id='aap'): url = reverse(session_jobs_status) return HttpResponseRedirect(url) - + +@login_required() +def test_job_view(request,aantal=100): + + job = myqueue(queuehigh,long_test_task,int(aantal)) + try: + request.session['async_tasks'] += [(job.id,'long_test_task')] + except KeyError: + request.session['async_tasks'] = [(job.id,'long_test_task')] + + url = reverse(session_jobs_status) + + return HttpResponseRedirect(url) + def get_all_queued_jobs(userid=0): r = StrictRedis() @@ -6528,7 +6551,6 @@ def workout_flexchart3_view(request,*args,**kwargs): if request.user == row.user.user: mayedit=1 - workouttype = 'ote' if row.workouttype in ('water','coastal'): workouttype = 'otw' @@ -6578,8 +6600,9 @@ def workout_flexchart3_view(request,*args,**kwargs): else: yparam2 = 'hr' - if favoritenr>=0 and r.showfavoritechartnotes: - favoritechartnotes = favorites[favoritenr].notes + if not request.user.is_anonymous(): + if favoritenr>=0 and r.showfavoritechartnotes: + favoritechartnotes = favorites[favoritenr].notes else: favoritechartnotes = '' favoritenr = 0