task progress monitoring celery works on client
This commit is contained in:
61
rowers/longtask.py
Normal file
61
rowers/longtask.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import numpy as np
|
||||||
|
import time
|
||||||
|
|
||||||
|
from redis import StrictRedis,Redis
|
||||||
|
from celery import result as celery_result
|
||||||
|
import json
|
||||||
|
|
||||||
|
redis_connection = StrictRedis()
|
||||||
|
|
||||||
|
import redis
|
||||||
|
import threading
|
||||||
|
|
||||||
|
class Listener(threading.Thread):
|
||||||
|
def __init__(self, r, channels):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.redis = r
|
||||||
|
self.pubsub = self.redis.pubsub()
|
||||||
|
self.pubsub.subscribe(channels)
|
||||||
|
|
||||||
|
def work(self, item):
|
||||||
|
print item['channel'], ":", item['data']
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
for item in self.pubsub.listen():
|
||||||
|
if item['data'] == "KILL":
|
||||||
|
self.pubsub.unsubscribe()
|
||||||
|
print self, "unsubscribed and finished"
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
self.work(item)
|
||||||
|
|
||||||
|
|
||||||
|
def longtask(aantal,jobid=None,debug=False):
|
||||||
|
if jobid:
|
||||||
|
if debug:
|
||||||
|
job = celery_result.AsyncResult(jobid)
|
||||||
|
else:
|
||||||
|
job = Job.fetch(jobid,connection=redis_connection)
|
||||||
|
counter = 0
|
||||||
|
channel = 'tasks'
|
||||||
|
for i in range(aantal):
|
||||||
|
time.sleep(1)
|
||||||
|
counter += 1
|
||||||
|
if counter > 10:
|
||||||
|
counter = 0
|
||||||
|
if debug:
|
||||||
|
progress = 100.*i/aantal
|
||||||
|
print progress
|
||||||
|
if jobid != None:
|
||||||
|
redis_connection.publish(channel,json.dumps(
|
||||||
|
{
|
||||||
|
'done':i,
|
||||||
|
'total':aantal,
|
||||||
|
'id':jobid,
|
||||||
|
}
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
|
redis_connection.publish(channel,'KILL')
|
||||||
|
|
||||||
|
return 1
|
||||||
@@ -35,6 +35,8 @@ from django.db.utils import OperationalError
|
|||||||
import datautils
|
import datautils
|
||||||
import utils
|
import utils
|
||||||
|
|
||||||
|
import longtask
|
||||||
|
|
||||||
# testing task
|
# testing task
|
||||||
|
|
||||||
|
|
||||||
@@ -42,9 +44,14 @@ import utils
|
|||||||
def add(x, y):
|
def add(x, y):
|
||||||
return x + y
|
return x + y
|
||||||
|
|
||||||
|
|
||||||
|
@app.task(bind=True)
|
||||||
|
def long_test_task(self,aantal,debug=False,job=None):
|
||||||
|
job = self.request
|
||||||
|
print job.id
|
||||||
|
return longtask.longtask(aantal,jobid=job.id,debug=debug)
|
||||||
|
|
||||||
# create workout
|
# create workout
|
||||||
|
|
||||||
|
|
||||||
@app.task
|
@app.task
|
||||||
def handle_new_workout_from_file(r, f2,
|
def handle_new_workout_from_file(r, f2,
|
||||||
workouttype='rower',
|
workouttype='rower',
|
||||||
|
|||||||
@@ -141,6 +141,7 @@ urlpatterns = [
|
|||||||
url(r'^list-jobs/$',views.session_jobs_view),
|
url(r'^list-jobs/$',views.session_jobs_view),
|
||||||
url(r'^jobs-status/$',views.session_jobs_status),
|
url(r'^jobs-status/$',views.session_jobs_status),
|
||||||
url(r'^job-kill/(?P<id>.*)$',views.kill_async_job),
|
url(r'^job-kill/(?P<id>.*)$',views.kill_async_job),
|
||||||
|
url(r'^test-job/(?P<aantal>\d+)$',views.test_job_view),
|
||||||
url(r'^list-graphs/$',views.graphs_view),
|
url(r'^list-graphs/$',views.graphs_view),
|
||||||
url(r'^(?P<theuser>\d+)/ote-bests/(?P<startdatestring>\w+.*)/(?P<enddatestring>\w+.*)$',views.rankings_view),
|
url(r'^(?P<theuser>\d+)/ote-bests/(?P<startdatestring>\w+.*)/(?P<enddatestring>\w+.*)$',views.rankings_view),
|
||||||
url(r'^(?P<theuser>\d+)/ote-bests/(?P<deltadays>\d+)$',views.rankings_view),
|
url(r'^(?P<theuser>\d+)/ote-bests/(?P<deltadays>\d+)$',views.rankings_view),
|
||||||
|
|||||||
@@ -99,7 +99,7 @@ from rowers.tasks import handle_makeplot,handle_otwsetpower,handle_sendemailtcx,
|
|||||||
from rowers.tasks import (
|
from rowers.tasks import (
|
||||||
handle_sendemail_unrecognized,handle_sendemailnewcomment,
|
handle_sendemail_unrecognized,handle_sendemailnewcomment,
|
||||||
handle_sendemailnewresponse, handle_updatedps,
|
handle_sendemailnewresponse, handle_updatedps,
|
||||||
handle_updatecp
|
handle_updatecp,long_test_task
|
||||||
)
|
)
|
||||||
|
|
||||||
from scipy.signal import savgol_filter
|
from scipy.signal import savgol_filter
|
||||||
@@ -138,6 +138,12 @@ from rq import Queue,cancel_job
|
|||||||
|
|
||||||
queuefailed = Queue("failed",connection=Redis())
|
queuefailed = Queue("failed",connection=Redis())
|
||||||
redis_connection = StrictRedis()
|
redis_connection = StrictRedis()
|
||||||
|
r = Redis()
|
||||||
|
from .longtask import Listener
|
||||||
|
client = Listener(r,['tasks'])
|
||||||
|
client.start()
|
||||||
|
|
||||||
|
|
||||||
rq_registry = StartedJobRegistry(queue.name,connection=redis_connection)
|
rq_registry = StartedJobRegistry(queue.name,connection=redis_connection)
|
||||||
rq_registryhigh = StartedJobRegistry(queuehigh.name,connection=redis_connection)
|
rq_registryhigh = StartedJobRegistry(queuehigh.name,connection=redis_connection)
|
||||||
rq_registrylow = StartedJobRegistry(queuelow.name,connection=redis_connection)
|
rq_registrylow = StartedJobRegistry(queuelow.name,connection=redis_connection)
|
||||||
@@ -188,7 +194,7 @@ def remove_asynctask(request,id):
|
|||||||
request.session['async_tasks'] = newtasks
|
request.session['async_tasks'] = newtasks
|
||||||
|
|
||||||
def get_job_result(jobid):
|
def get_job_result(jobid):
|
||||||
if settings.EBUG:
|
if settings.DEBUG:
|
||||||
result = celery_result.AsyncResult(jobid).result
|
result = celery_result.AsyncResult(jobid).result
|
||||||
else:
|
else:
|
||||||
running_job_ids = rq_registry.get_job_ids()
|
running_job_ids = rq_registry.get_job_ids()
|
||||||
@@ -210,12 +216,16 @@ verbose_job_status = {
|
|||||||
'updatecpwater': 'Critical Power Calculation for OTW Workouts',
|
'updatecpwater': 'Critical Power Calculation for OTW Workouts',
|
||||||
'otwsetpower': 'Rowing Physics OTW Power Calculation',
|
'otwsetpower': 'Rowing Physics OTW Power Calculation',
|
||||||
'make_plot': 'Create static chart',
|
'make_plot': 'Create static chart',
|
||||||
|
'long_test_task': 'Long Test Task',
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_job_status(jobid):
|
def get_job_status(jobid):
|
||||||
if settings.DEBUG:
|
if settings.DEBUG:
|
||||||
job = celery_result.AsyncResult(jobid)
|
job = celery_result.AsyncResult(jobid)
|
||||||
jobresult = job.result
|
jobresult = job.result
|
||||||
|
channel = 'task:<'+job.id+'>:progress'
|
||||||
|
channel = 'noot'
|
||||||
|
|
||||||
if 'fail' in job.status.lower():
|
if 'fail' in job.status.lower():
|
||||||
jobresult = '0'
|
jobresult = '0'
|
||||||
summary = {
|
summary = {
|
||||||
@@ -265,6 +275,19 @@ def kill_async_job(request,id='aap'):
|
|||||||
|
|
||||||
return HttpResponseRedirect(url)
|
return HttpResponseRedirect(url)
|
||||||
|
|
||||||
|
@login_required()
|
||||||
|
def test_job_view(request,aantal=100):
|
||||||
|
|
||||||
|
job = myqueue(queuehigh,long_test_task,int(aantal))
|
||||||
|
try:
|
||||||
|
request.session['async_tasks'] += [(job.id,'long_test_task')]
|
||||||
|
except KeyError:
|
||||||
|
request.session['async_tasks'] = [(job.id,'long_test_task')]
|
||||||
|
|
||||||
|
url = reverse(session_jobs_status)
|
||||||
|
|
||||||
|
return HttpResponseRedirect(url)
|
||||||
|
|
||||||
|
|
||||||
def get_all_queued_jobs(userid=0):
|
def get_all_queued_jobs(userid=0):
|
||||||
r = StrictRedis()
|
r = StrictRedis()
|
||||||
@@ -6528,7 +6551,6 @@ def workout_flexchart3_view(request,*args,**kwargs):
|
|||||||
if request.user == row.user.user:
|
if request.user == row.user.user:
|
||||||
mayedit=1
|
mayedit=1
|
||||||
|
|
||||||
|
|
||||||
workouttype = 'ote'
|
workouttype = 'ote'
|
||||||
if row.workouttype in ('water','coastal'):
|
if row.workouttype in ('water','coastal'):
|
||||||
workouttype = 'otw'
|
workouttype = 'otw'
|
||||||
@@ -6578,6 +6600,7 @@ def workout_flexchart3_view(request,*args,**kwargs):
|
|||||||
else:
|
else:
|
||||||
yparam2 = 'hr'
|
yparam2 = 'hr'
|
||||||
|
|
||||||
|
if not request.user.is_anonymous():
|
||||||
if favoritenr>=0 and r.showfavoritechartnotes:
|
if favoritenr>=0 and r.showfavoritechartnotes:
|
||||||
favoritechartnotes = favorites[favoritenr].notes
|
favoritechartnotes = favorites[favoritenr].notes
|
||||||
else:
|
else:
|
||||||
|
|||||||
Reference in New Issue
Block a user