Private
Public Access
1
0

Merge branch 'feature/betterqueues' into develop

This commit is contained in:
Sander Roosendaal
2017-10-30 22:35:34 +01:00
6 changed files with 291 additions and 37 deletions

View File

@@ -65,7 +65,7 @@ def processattachment(rower, fileobj, title, uploadoptions):
workoutcsvfilename = workout.csvfilename[6:-4] workoutcsvfilename = workout.csvfilename[6:-4]
timestr = strftime("%Y%m%d-%H%M%S") timestr = strftime("%Y%m%d-%H%M%S")
imagename = workoutcsvfilename + timestr + '.png' imagename = workoutcsvfilename + timestr + '.png'
result = uploads.make_plot( result,jobid = uploads.make_plot(
workout.user, workout, workoutcsvfilename, workout.user, workout, workoutcsvfilename,
workout.csvfilename, workout.csvfilename,
plottype, title, plottype, title,

View File

@@ -0,0 +1,57 @@
{% extends "base.html" %}
{% load staticfiles %}
{% load rowerfilters %}
{% block title %}Rowsandall - Tasks {% endblock %}
{% block content %}
{# Task-status page: lists the session's async jobs with a Remove action for finished/failed ones. #}
<h1>Your Tasks Status</h1>
<p>Manage the asynchronous tasks running for you</p>
<div class="grid_12 alpha">
<table width="100%" class="listtable shortpadded">
<thead>
<tr>
<th>ID</th>
{# BUG FIX: CSS lengths need a unit; "width:180" is invalid and was ignored by browsers. #}
<th style="width:180px">Task</th>
<th>Status</th>
<th>Action</th>
</tr>
</thead>
<tbody>
{% for task in taskstatus %}
<tr>
<td>
{{ task|lookup:'id' }}
</td>
<td>
{{ task|lookup:'verbose' }}
</td>
<td>
{{ task|lookup:'status' }}
</td>
{% if task|lookup:'failed' %}
<td>
<a href="/rowers/job-kill/{{ task|lookup:'id' }}" class="button red small">Remove</a>
</td>
{% elif task|lookup:'finished' %}
<td>
<a href="/rowers/job-kill/{{ task|lookup:'id' }}" class="button green small">Remove</a>
</td>
{% else %}
<td>
&nbsp;
</td>
{% endif %}
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endblock %}

View File

@@ -175,11 +175,11 @@ def make_plot(r,w,f1,f2,plottype,title,imagename='',plotnr=0):
if settings.DEBUG: if settings.DEBUG:
res = handle_makeplot.delay(f1,f2,title, job = handle_makeplot.delay(f1,f2,title,
hrpwrdata,plotnr, hrpwrdata,plotnr,
imagename) imagename)
else: else:
res = queue.enqueue(handle_makeplot,f1,f2, job = queue.enqueue(handle_makeplot,f1,f2,
title,hrpwrdata, title,hrpwrdata,
plotnr,imagename) plotnr,imagename)
@@ -195,7 +195,7 @@ def make_plot(r,w,f1,f2,plottype,title,imagename='',plotnr=0):
width=width,height=height) width=width,height=height)
i.save() i.save()
return i.id return i.id,job.id
import c2stuff,stravastuff,sporttracksstuff,runkeeperstuff import c2stuff,stravastuff,sporttracksstuff,runkeeperstuff
import underarmourstuff,tpstuff import underarmourstuff,tpstuff

View File

@@ -138,6 +138,9 @@ urlpatterns = [
url(r'^user-multiflex-select/user/(?P<userid>\d+)/$',views.user_multiflex_select), url(r'^user-multiflex-select/user/(?P<userid>\d+)/$',views.user_multiflex_select),
url(r'^user-multiflex-select/(?P<startdatestring>\w+.*)/(?P<enddatestring>\w+.*)$',views.user_multiflex_select), url(r'^user-multiflex-select/(?P<startdatestring>\w+.*)/(?P<enddatestring>\w+.*)$',views.user_multiflex_select),
url(r'^user-multiflex-select/$',views.user_multiflex_select), url(r'^user-multiflex-select/$',views.user_multiflex_select),
url(r'^list-jobs/$',views.session_jobs_view),
url(r'^jobs-status/$',views.session_jobs_status),
url(r'^job-kill/(?P<id>.*)$',views.kill_async_job),
url(r'^list-graphs/$',views.graphs_view), url(r'^list-graphs/$',views.graphs_view),
url(r'^(?P<theuser>\d+)/ote-bests/(?P<startdatestring>\w+.*)/(?P<enddatestring>\w+.*)$',views.rankings_view), url(r'^(?P<theuser>\d+)/ote-bests/(?P<startdatestring>\w+.*)/(?P<enddatestring>\w+.*)$',views.rankings_view),
url(r'^(?P<theuser>\d+)/ote-bests/(?P<deltadays>\d+)$',views.rankings_view), url(r'^(?P<theuser>\d+)/ote-bests/(?P<deltadays>\d+)$',views.rankings_view),

View File

@@ -131,13 +131,16 @@ queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low') queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('low') queuehigh = django_rq.get_queue('low')
from redis import StrictRedis from redis import StrictRedis,Redis
from rq.exceptions import NoSuchJobError from rq.exceptions import NoSuchJobError
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
from rq import Queue from rq import Queue,cancel_job
queuefailed = Queue("failed",connection=Redis())
redis_connection = StrictRedis() redis_connection = StrictRedis()
rq_registry = StartedJobRegistry(queue.name,connection=redis_connection) rq_registry = StartedJobRegistry(queue.name,connection=redis_connection)
rq_registryhigh = StartedJobRegistry(queuehigh.name,connection=redis_connection)
rq_registrylow = StartedJobRegistry(queuelow.name,connection=redis_connection)
from rq.job import Job from rq.job import Job
@@ -170,6 +173,20 @@ from rowers.celery import result as celery_result
# Define the API documentation # Define the API documentation
schema_view = get_swagger_view(title='Rowsandall API') schema_view = get_swagger_view(title='Rowsandall API')
def remove_asynctask(request, id):
    """Drop the task with the given job id from the session's async task list.

    Tasks are stored in ``request.session['async_tasks']`` as
    ``(jobid, func_name)`` tuples.  Matching uses substring containment on
    the stored id, mirroring the lookup style elsewhere in this module.

    BUG FIX: removed a leftover debug ``print task[0]`` statement (Python-2
    print that leaked job ids to stdout on every request).
    """
    try:
        oldtasks = request.session['async_tasks']
    except KeyError:
        oldtasks = []
    # Rebuild the list without the matching task and reassign, so the
    # session middleware detects the modification and persists it.
    newtasks = [(task[0], task[1]) for task in oldtasks if id not in task[0]]
    request.session['async_tasks'] = newtasks
def get_job_result(jobid): def get_job_result(jobid):
if settings.DEBUG: if settings.DEBUG:
result = celery_result.AsyncResult(jobid).result result = celery_result.AsyncResult(jobid).result
@@ -188,6 +205,132 @@ def get_job_result(jobid):
return result return result
# Human-readable labels for the async task function names stored in the
# session ('async_tasks' entries); used to describe tasks on the
# task-status page.
verbose_job_status = {
    'updatecp': 'Critical Power Calculation for Ergometer Workouts',
    'updatecpwater': 'Critical Power Calculation for OTW Workouts',
    'otwsetpower': 'Rowing Physics OTW Power Calculation',
    'make_plot': 'Create static chart',
}
def get_job_status(jobid):
    """Return a status summary dict for an async job.

    Looks the job up in Celery (when ``settings.DEBUG``) or RQ (production)
    and returns a dict with 'status', 'result' and the derived boolean
    flags 'failed' and 'finished'.

    BUG FIX: the original set ``summary = {}`` on ``NoSuchJobError`` and
    then unconditionally read ``summary['status']``, so any expired or
    unknown RQ job raised KeyError.  Unknown jobs are now reported as
    failed so the UI offers a "Remove" action instead of crashing.
    """
    if settings.DEBUG:
        job = celery_result.AsyncResult(jobid)
        jobresult = job.result
        # A failed Celery job carries the exception in .result; mask it
        # with a harmless placeholder for display.
        if 'fail' in job.status.lower():
            jobresult = '0'
        summary = {
            'status': job.status,
            'result': jobresult,
        }
    else:
        try:
            job = Job.fetch(jobid, connection=redis_connection)
            summary = {
                'status': job.status,
                'result': job.result,
            }
        except NoSuchJobError:
            # Job expired or was never queued.
            return {'status': 'unknown', 'result': None,
                    'failed': True, 'finished': False}
    status = summary['status'].lower()
    summary['failed'] = 'fail' in status
    # Celery reports 'SUCCESS', RQ reports 'finished'.
    summary['finished'] = 'success' in status or 'finished' in status
    return summary
def kill_async_job(request, id='aap'):
    """Cancel the async job with the given id, drop it from the session,
    and redirect back to the task-status page.

    BUG FIX: ``cancel_job`` was called with the keyword ``connect=``, but
    rq's signature is ``cancel_job(job_id, connection=None)``; the typo
    raised TypeError (which the ``NoSuchJobError`` handler did not catch),
    so jobs were never actually cancelled in production.
    """
    if settings.DEBUG:
        # DEBUG runs on Celery; revoke is best-effort.
        job = celery_result.AsyncResult(id)
        job.revoke()
    else:
        try:
            cancel_job(id, connection=redis_connection)
        except NoSuchJobError:
            # Already finished or expired -- nothing to cancel.
            pass
    remove_asynctask(request, id)
    url = reverse(session_jobs_status)
    return HttpResponseRedirect(url)
def get_all_queued_jobs(userid=0):
    """Return ``[(jobid, info_dict), ...]`` for all known async jobs.

    Combines Celery result keys found in Redis (used when DEBUG) with the
    jobs on the default/high/low/failed RQ queues.  ``userid`` is currently
    unused but kept for interface compatibility.

    BUG FIX: ``Job.fetch`` could raise ``NoSuchJobError`` for a job that
    completed between listing the queue and fetching it; such jobs are now
    skipped instead of aborting the whole listing.
    """
    r = StrictRedis()
    jobs = []
    # Celery stores results under 'celery-task-meta-<id>'; slice off the
    # 17-character prefix to recover the task id.
    celerykeys = r.keys('celery*')
    for key in celerykeys:
        id = key[17:]
        job = celery_result.AsyncResult(id)
        jobresult = job.result
        # Failed Celery jobs carry the exception in .result; mask it.
        if 'fail' in job.status.lower():
            jobresult = '0'
        jobs.append(
            (id, {
                'status': job.status,
                'result': jobresult,
                'function': '',
                'meta': job.info,
            }))
    ids = [j.id for j in queue.jobs]
    ids += [j.id for j in queuehigh.jobs]
    ids += [j.id for j in queuelow.jobs]
    ids += [j.id for j in queuefailed.jobs]
    for id in ids:
        try:
            job = Job.fetch(id, connection=redis_connection)
        except NoSuchJobError:
            # Finished/expired between listing and fetching; skip it.
            continue
        jobs.append(
            (id, {
                'status': job.get_status(),
                'result': job.result,
                'function': job.func_name,
                'meta': job.meta,
            }))
    return jobs
def get_stored_tasks_status(request):
    """Return a list of status dicts for the async tasks in the session.

    Each session entry is a ``(jobid, func_name)`` tuple; the returned
    dicts carry the id, raw status, failed/finished flags, the function
    name and a human-readable description.

    Improvements: the original called ``get_job_status(id)`` three times
    per task (three Redis/Celery round-trips); it is now fetched once.
    Unknown ``func_name`` values no longer raise KeyError -- they fall
    back to the raw function name.
    """
    try:
        taskids = request.session['async_tasks']
    except KeyError:
        taskids = []
    taskstatus = []
    for id, func_name in taskids:
        status = get_job_status(id)
        taskstatus.append({
            'id': id,
            'status': status['status'],
            'failed': status['failed'],
            'finished': status['finished'],
            'func_name': func_name,
            'verbose': verbose_job_status.get(func_name, func_name),
        })
    return taskstatus
@login_required()
def session_jobs_view(request):
    """Return the session's async task status list as a JSON response."""
    return HttpResponse(json.dumps(get_stored_tasks_status(request)))
@login_required()
def session_jobs_status(request):
    """Render the task-status page for the current session's async tasks."""
    context = {'taskstatus': get_stored_tasks_status(request)}
    return render(request, 'async_tasks.html', context)
# Test if row data include candidates # Test if row data include candidates
def rowhascoordinates(row): def rowhascoordinates(row):
@@ -3109,19 +3252,24 @@ def otwrankings_view(request,theuser=0,
runningjob = 0 runningjob = 0
try: taskstatus = get_stored_tasks_status(request)
jobid = request.session['job_id'] for task in taskstatus:
if jobid: if task['func_name'] == 'updatecpwater':
result = get_job_result(jobid) if 'success' in task['status'].lower() or 'finished' in task['status'].lower():
if result:
messages.info(request,'Your calculation is ready')
runningjob = 1 runningjob = 1
request.session['job_id'] = 0 messages.info(request,'CP chart data have been updated')
else: remove_asynctask(request,task['id'])
elif 'fail' in task['status'].lower():
runningjob = 0
remove_asynctask(request,task[id])
messages.error(request,'Oh, your task failed')
elif 'started' in task['status'].lower():
messages.info(request,'Busy updating CP chart data')
runningjob = 1 runningjob = 1
messages.info(request,'Your job is still running') elif 'queued' in task['status'].lower() or 'pending' in task['status'].lower():
except KeyError: messages.info(request,'Getting ready to update CP chart data')
pass runningjob = 1
if not runningjob: if not runningjob:
@@ -3131,6 +3279,10 @@ def otwrankings_view(request,theuser=0,
enddate=enddate enddate=enddate
) )
request.session['job_id'] = job.id request.session['job_id'] = job.id
try:
request.session['async_tasks'] += [(job.id,'updatecpwater')]
except KeyError:
request.session['async_tasks'] = [(job.id,'updatecpwater')]
messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result') messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result')
powerdf = pd.DataFrame({ powerdf = pd.DataFrame({
@@ -3358,19 +3510,23 @@ def oterankings_view(request,theuser=0,
runningjob = 0 runningjob = 0
try: taskstatus = get_stored_tasks_status(request)
jobid = request.session['job_id'] for task in taskstatus:
if jobid: if task['func_name'] == 'updatecp':
result = get_job_result(jobid) if 'success' in task['status'].lower() or 'finished' in task['status'].lower():
if result:
messages.info(request,'Your calculation is ready')
runningjob = 1 runningjob = 1
request.session['job_id'] = 0 messages.info(request,'CP chart data have been updated')
else: remove_asynctask(request,task['id'])
elif 'fail' in task['status'].lower():
runningjob = 0
remove_asynctask(request,task[id])
messages.error(request,'Oh, your task failed')
elif 'started' in task['status'].lower():
messages.info(request,'Busy updating CP chart data')
runningjob = 1
elif 'queued' in task['status'].lower():
messages.info(request,'Getting ready to update CP chart data')
runningjob = 1 runningjob = 1
messages.info(request,'Your job is still running')
except KeyError:
pass
if not runningjob: if not runningjob:
@@ -3380,7 +3536,11 @@ def oterankings_view(request,theuser=0,
enddate=enddate enddate=enddate
) )
request.session['job_id'] = job.id request.session['job_id'] = job.id
messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result') try:
request.session['async_tasks'] += [(job.id,'updatecp')]
except KeyError:
request.session['async_tasks'] = [(job.id,'updatecp')]
messages.info(request,'New calculation queued.')
powerdf = pd.DataFrame({ powerdf = pd.DataFrame({
'Delta':delta, 'Delta':delta,
@@ -5504,20 +5664,24 @@ def workout_otwsetpower_view(request,id=0,message="",successmessage=""):
emailaddress = u.email emailaddress = u.email
if settings.DEBUG: if settings.DEBUG:
res = handle_otwsetpower.delay(f1,boattype,weightvalue, job = handle_otwsetpower.delay(f1,boattype,weightvalue,
first_name,last_name, first_name,last_name,
emailaddress,id,debug=True, emailaddress,id,debug=True,
ps=[r.p0,r.p1,r.p2,r.p3], ps=[r.p0,r.p1,r.p2,r.p3],
ratio=r.cpratio) ratio=r.cpratio)
else: else:
res = queuelow.enqueue(handle_otwsetpower,f1,boattype, job = queuelow.enqueue(handle_otwsetpower,f1,boattype,
weightvalue, weightvalue,
first_name,last_name,emailaddress,id, first_name,last_name,emailaddress,id,
ps=[r.p0,r.p1,r.p2,r.p3], ps=[r.p0,r.p1,r.p2,r.p3],
ratio=r.cpratio) ratio=r.cpratio)
try:
request.session['async_tasks'] += [(job.id,'otwsetpower')]
except KeyError:
request.session['async_tasks'] = [(job.id,'otwsetpower')]
successmessage = "Your calculations have been submitted. You will receive an email when they are done." successmessage = 'Your calculations have been submitted. You will receive an email when they are done. You can check the status of your calculations <a href="/rowers/jobs-status/">here</a>'
messages.info(request,successmessage) messages.info(request,successmessage)
kwargs = { kwargs = {
'id':int(id)} 'id':int(id)}
@@ -7305,9 +7469,14 @@ def workout_add_chart_view(request,id,plotnr=1):
u = w.user.user u = w.user.user
r = getrower(u) r = getrower(u)
title = w.name title = w.name
res = uploads.make_plot(r,w,f1,w.csvfilename,'timeplot',title,plotnr=plotnr, res,jobid = uploads.make_plot(
imagename=imagename) r,w,f1,w.csvfilename,'timeplot',title,plotnr=plotnr,
imagename=imagename
)
try:
request.session['async_tasks'] += [(jobid,'make_plot')]
except KeyError:
request.session['async_tasks'] = [(jobid,'make_plot')]
try: try:
url = request.session['referer'] url = request.session['referer']

View File

@@ -9,6 +9,31 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"> <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN">
<html lang="en"> <html lang="en">
<head> <head>
<script src="//ajax.googleapis.com/ajax/libs/jquery/2.1.4/jquery.min.js"></script>
<script>
/* Poll the session task list every 15s and reload the page when the
   set of tasks (or their statuses) changes.
   BUG FIX: the endpoint was hardcoded as http://localhost:8000/...,
   which breaks everywhere except a local dev server (and would also be
   blocked as mixed content over https); use a relative URL instead.
   Leftover console.log debug noise removed. */
var previous = null;
var current = null;
function pollJobs() {
  $.getJSON('/rowers/list-jobs/', function(json) {
    current = JSON.stringify(json);
    if (previous && current && previous !== current) {
      location.reload();
    }
    previous = current;
  });
}
$(document).ready(function() {
  /* Seed the baseline so the first interval tick doesn't trigger a reload. */
  $.getJSON('/rowers/list-jobs/', function(json) {
    current = JSON.stringify(json);
    previous = current;
  });
});
setInterval(pollJobs, 15000)
</script>
<script src="/static/cookielaw/js/cookielaw.js"></script> <script src="/static/cookielaw/js/cookielaw.js"></script>
{% analytical_head_top %} {% analytical_head_top %}
{% if GOOGLE_ANALYTICS_PROPERTY_ID %} {% if GOOGLE_ANALYTICS_PROPERTY_ID %}