Private
Public Access
1
0

Works in development — needs testing in production.

This commit is contained in:
Sander Roosendaal
2017-10-30 17:27:13 +01:00
parent c37a0e6b24
commit 537b7392e6
3 changed files with 250 additions and 28 deletions

View File

@@ -0,0 +1,56 @@
{% extends "base.html" %}
{% load staticfiles %}
{% load rowerfilters %}
{% block title %}Rowsandall - Tasks {% endblock %}
{% block content %}
{# Overview table of the current user's asynchronous background tasks. #}
{# Each task dict provides id / verbose / status / failed / finished keys. #}
<h1>Your Tasks Status</h1>
<p>Manage the asynchronous tasks running for you</p>
<div class="grid_12 alpha">
<table width="100%" class="listtable shortpadded">
<thead>
<tr>
<th>ID</th>
{# was width:180 — a CSS length needs a unit to take effect #}
<th style="width:180px">Task</th>
<th>Status</th>
<th>Action</th>
</tr>
</thead>
<tbody>
{% for task in taskstatus %}
<tr>
<td>
{{ task|lookup:'id' }}
</td>
<td>
{{ task|lookup:'verbose' }}
</td>
<td>
{{ task|lookup:'status' }}
</td>
{# Remove link only offered once the task has failed or finished #}
{% if task|lookup:'failed' %}
<td>
<a href="/rowers/job-kill/{{ task|lookup:'id' }}" class="button red small">Remove</a>
</td>
{% elif task|lookup:'finished' %}
<td>
<a href="/rowers/job-kill/{{ task|lookup:'id' }}" class="button green small">Remove</a>
</td>
{% else %}
<td>
&nbsp;
</td>
{% endif %}
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endblock %}

View File

@@ -138,6 +138,9 @@ urlpatterns = [
url(r'^user-multiflex-select/user/(?P<userid>\d+)/$',views.user_multiflex_select),
url(r'^user-multiflex-select/(?P<startdatestring>\w+.*)/(?P<enddatestring>\w+.*)$',views.user_multiflex_select),
url(r'^user-multiflex-select/$',views.user_multiflex_select),
# Async task management: JSON list, HTML status page, and job cancellation.
url(r'^list-jobs/$',views.session_jobs_view),
url(r'^jobs-status/$',views.session_jobs_status),
url(r'^job-kill/(?P<id>.*)$',views.kill_async_job),
url(r'^list-graphs/$',views.graphs_view),
url(r'^(?P<theuser>\d+)/ote-bests/(?P<startdatestring>\w+.*)/(?P<enddatestring>\w+.*)$',views.rankings_view),
url(r'^(?P<theuser>\d+)/ote-bests/(?P<deltadays>\d+)$',views.rankings_view),

View File

@@ -130,14 +130,17 @@ import django_rq
# RQ queues used in production (Celery is used when settings.DEBUG is True).
queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
# NOTE(review): this binds the "high" queue to 'low' — looks like a
# copy-paste; should it be django_rq.get_queue('high')? Verify before fixing.
queuehigh = django_rq.get_queue('low')
from redis import StrictRedis
# NOTE(review): duplicate import below supersedes the one above (diff artifact?).
from redis import StrictRedis,Redis
from rq.exceptions import NoSuchJobError
from rq.registry import StartedJobRegistry
from rq import Queue
# NOTE(review): duplicate import below supersedes the one above (diff artifact?).
from rq import Queue,cancel_job
# The RQ "failed" queue, inspected by get_all_queued_jobs.
queuefailed = Queue("failed",connection=Redis())
redis_connection = StrictRedis()
# Registries of currently running jobs, one per queue.
rq_registry = StartedJobRegistry(queue.name,connection=redis_connection)
rq_registryhigh = StartedJobRegistry(queuehigh.name,connection=redis_connection)
rq_registrylow = StartedJobRegistry(queuelow.name,connection=redis_connection)
from rq.job import Job
@@ -170,6 +173,20 @@ from rowers.celery import result as celery_result
# Define the API documentation
schema_view = get_swagger_view(title='Rowsandall API')
def remove_asynctask(request, id):
    """Drop the task with job id *id* from the session's async task list.

    The session key 'async_tasks' holds a list of (job_id, func_name)
    tuples; after this call it contains every entry except the one(s)
    whose job id equals *id*.  A missing key is treated as an empty list.
    """
    try:
        oldtasks = request.session['async_tasks']
    except KeyError:
        oldtasks = []
    # Compare ids for equality.  The previous `id not in task[0]` used
    # substring matching, so a job id that happened to be a substring of
    # another would remove both entries.  (A stray debug print was also
    # removed here.)
    request.session['async_tasks'] = [
        (taskid, funcname) for taskid, funcname in oldtasks if taskid != id
    ]
def get_job_result(jobid):
if settings.DEBUG:
result = celery_result.AsyncResult(jobid).result
@@ -188,6 +205,131 @@ def get_job_result(jobid):
return result
# Human-readable labels for the task function names stored in the
# session's 'async_tasks' list (rendered in the tasks overview template).
verbose_job_status = {
'updatecp': 'Critical Power Calculation for Ergometer Workouts',
'updatecpwater': 'Critical Power Calculation for OTW Workouts',
'otwsetpower': 'Rowing Physics OTW Power Calculation'
}
def get_job_status(jobid):
    """Return a status summary for the async job *jobid*.

    Returns a dict with keys 'status', 'result', 'failed' and 'finished'.
    In development (settings.DEBUG) the job is looked up via Celery; in
    production it is fetched from RQ.
    """
    if settings.DEBUG:
        # Development: Celery backend.
        job = celery_result.AsyncResult(jobid)
        jobresult = job.result
        if 'fail' in job.status.lower():
            jobresult = '0'
        summary = {
            'status': job.status,
            'result': jobresult,
        }
    else:
        # Production: RQ backend.
        try:
            job = Job.fetch(jobid, connection=redis_connection)
            summary = {
                'status': job.status,
                'result': job.result,
            }
        except NoSuchJobError:
            # Bug fix: the summary was previously left empty here, so the
            # summary['status'] lookup below raised KeyError for vanished
            # jobs.  Provide a safe default instead.
            summary = {
                'status': 'unknown',
                'result': None,
            }
    status = summary['status'].lower()
    summary['failed'] = 'fail' in status
    summary['finished'] = 'success' in status or 'finished' in status
    return summary
def kill_async_job(request, id='aap'):
    """Cancel the async job *id*, then redirect to the task status page."""
    if settings.DEBUG:
        # Development: revoke the Celery task and forget it in the session.
        celery_result.AsyncResult(id).revoke()
        remove_asynctask(request, id)
    else:
        # Production: cancel the RQ job; a job that already vanished is fine.
        try:
            cancel_job(id)
        except NoSuchJobError:
            pass
    return HttpResponseRedirect(reverse(session_jobs_status))
def get_all_queued_jobs(userid=0):
    """Collect (job_id, info) tuples for all Celery and RQ jobs in Redis.

    The *userid* parameter is currently unused.  Each info dict carries
    'status', 'result', 'function' and 'meta' keys.
    """
    connection = StrictRedis()
    jobs = []
    # Celery result keys look like 'celery-task-meta-<id>'; the task id
    # starts after the 17-character prefix.
    for key in connection.keys('celery*'):
        taskid = key[17:]
        task = celery_result.AsyncResult(taskid)
        taskresult = task.result
        if 'fail' in task.status.lower():
            taskresult = '0'
        jobs.append((taskid, {
            'status': task.status,
            'result': taskresult,
            'function': '',
            'meta': task.info,
        }))
    # RQ jobs across the default, high, low and failed queues.
    jobids = []
    for q in (queue, queuehigh, queuelow, queuefailed):
        jobids += [j.id for j in q.jobs]
    for jobid in jobids:
        rqjob = Job.fetch(jobid, connection=redis_connection)
        jobs.append((jobid, {
            'status': rqjob.get_status(),
            'result': rqjob.result,
            'function': rqjob.func_name,
            'meta': rqjob.meta,
        }))
    return jobs
def get_stored_tasks_status(request):
    """Build a list of status dicts for the async tasks in the session.

    Reads (job_id, func_name) tuples from request.session['async_tasks']
    (missing key means no tasks) and returns a list of dicts with keys
    id / status / failed / finished / func_name / verbose.
    """
    try:
        taskids = request.session['async_tasks']
    except KeyError:
        taskids = []
    taskstatus = []
    for id, func_name in taskids:
        # Fetch the job status once per task; the previous version called
        # get_job_status three times per task (one Redis/Celery round trip
        # each for status, failed and finished).
        jobstatus = get_job_status(id)
        taskstatus.append({
            'id': id,
            'status': jobstatus['status'],
            'failed': jobstatus['failed'],
            'finished': jobstatus['finished'],
            'func_name': func_name,
            # Fall back to the raw name so an unregistered task function
            # no longer crashes the status page with a KeyError.
            'verbose': verbose_job_status.get(func_name, func_name),
        })
    return taskstatus
@login_required()
def session_jobs_view(request):
    """Return the current user's async task status list as JSON."""
    status_list = get_stored_tasks_status(request)
    return HttpResponse(json.dumps(status_list))
@login_required()
def session_jobs_status(request):
    """Render the async task overview page for the current user."""
    context = {'taskstatus': get_stored_tasks_status(request)}
    return render(request, 'async_tasks.html', context)
# Test if row data include candidates
def rowhascoordinates(row):
@@ -3109,19 +3251,24 @@ def otwrankings_view(request,theuser=0,
runningjob = 0
try:
jobid = request.session['job_id']
if jobid:
result = get_job_result(jobid)
if result:
messages.info(request,'Your calculation is ready')
taskstatus = get_stored_tasks_status(request)
for task in taskstatus:
if task['func_name'] == 'updatecpwater':
if 'success' in task['status'].lower() or 'finished' in task['status'].lower():
runningjob = 1
request.session['job_id'] = 0
else:
messages.info(request,'CP chart data have been updated')
remove_asynctask(request,task['id'])
elif 'fail' in task['status'].lower():
runningjob = 0
remove_asynctask(request,task[id])
messages.error(request,'Oh, your task failed')
elif 'started' in task['status'].lower():
messages.info(request,'Busy updating CP chart data')
runningjob = 1
messages.info(request,'Your job is still running')
except KeyError:
pass
elif 'queued' in task['status'].lower() or 'pending' in task['status'].lower():
messages.info(request,'Getting ready to update CP chart data')
runningjob = 1
if not runningjob:
@@ -3131,6 +3278,10 @@ def otwrankings_view(request,theuser=0,
enddate=enddate
)
request.session['job_id'] = job.id
try:
request.session['async_tasks'] += [(job.id,'updatecpwater')]
except KeyError:
request.session['async_tasks'] = [(job.id,'updatecpwater')]
messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result')
powerdf = pd.DataFrame({
@@ -3358,19 +3509,23 @@ def oterankings_view(request,theuser=0,
runningjob = 0
try:
jobid = request.session['job_id']
if jobid:
result = get_job_result(jobid)
if result:
messages.info(request,'Your calculation is ready')
taskstatus = get_stored_tasks_status(request)
for task in taskstatus:
if task['func_name'] == 'updatecp':
if 'success' in task['status'].lower() or 'finished' in task['status'].lower():
runningjob = 1
request.session['job_id'] = 0
else:
messages.info(request,'CP chart data have been updated')
remove_asynctask(request,task['id'])
elif 'fail' in task['status'].lower():
runningjob = 0
remove_asynctask(request,task[id])
messages.error(request,'Oh, your task failed')
elif 'started' in task['status'].lower():
messages.info(request,'Busy updating CP chart data')
runningjob = 1
elif 'queued' in task['status'].lower():
messages.info(request,'Getting ready to update CP chart data')
runningjob = 1
messages.info(request,'Your job is still running')
except KeyError:
pass
if not runningjob:
@@ -3380,6 +3535,10 @@ def oterankings_view(request,theuser=0,
enddate=enddate
)
request.session['job_id'] = job.id
try:
request.session['async_tasks'] += [(job.id,'updatecp')]
except KeyError:
request.session['async_tasks'] = [(job.id,'updatecp')]
messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result')
powerdf = pd.DataFrame({
@@ -5504,19 +5663,23 @@ def workout_otwsetpower_view(request,id=0,message="",successmessage=""):
emailaddress = u.email
if settings.DEBUG:
res = handle_otwsetpower.delay(f1,boattype,weightvalue,
job = handle_otwsetpower.delay(f1,boattype,weightvalue,
first_name,last_name,
emailaddress,id,debug=True,
ps=[r.p0,r.p1,r.p2,r.p3],
ratio=r.cpratio)
else:
res = queuelow.enqueue(handle_otwsetpower,f1,boattype,
job = queuelow.enqueue(handle_otwsetpower,f1,boattype,
weightvalue,
first_name,last_name,emailaddress,id,
ps=[r.p0,r.p1,r.p2,r.p3],
ratio=r.cpratio)
try:
request.session['async_tasks'] += [(job.id,'otwsetpower')]
except KeyError:
request.session['async_tasks'] = [(job.id,'otwsetpower')]
successmessage = "Your calculations have been submitted. You will receive an email when they are done."
messages.info(request,successmessage)
kwargs = {