Private
Public Access
1
0

redis monitor for CP update jobs

Stores job id in a cookie. Upon reload, checks if job is still busy. If
busy, no new jobs are started. Some messaging for users to give them
information about the job status.
This commit is contained in:
Sander Roosendaal
2017-10-27 11:52:47 +02:00
parent c0f785ed3b
commit 6b793724ed
5 changed files with 125 additions and 14 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import absolute_import
import os
from celery import Celery
from celery import result
# Only used for testing with Celery on localhost. RQ is not available
# on Windows, so I use Celery on my notebook.
@@ -17,6 +18,7 @@ app = Celery('tasks',
broker='redis://localhost',
backend='redis://localhost',)
class Config:
CELERY_TIMEZONE = 'Europe/Prague'
@@ -31,3 +33,4 @@ database_url = 'sqlite:///db.sqlite3'
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

View File

@@ -473,9 +473,11 @@ def updatecpdata_sql(rower_id,delta,cp,table='cpdata',distance=[]):
engine.dispose()
def runcpupdate(rower,type='water'):
startdate = timezone.now()-datetime.timedelta(days=365)
enddate = timezone.now()+datetime.timedelta(days=5)
def runcpupdate(
rower,type='water',
startdate=timezone.now()-datetime.timedelta(days=365),
enddate=timezone.now()+datetime.timedelta(days=5)
):
if type == 'water':
theworkouts = Workout.objects.filter(
user=rower,rankingpiece=True,
@@ -499,11 +501,12 @@ def runcpupdate(rower,type='water'):
theids = [w.id for w in theworkouts]
if settings.DEBUG:
res = handle_updatecp.delay(rower.id,theids,debug=True,table=table)
job = handle_updatecp.delay(rower.id,theids,debug=True,table=table)
else:
res = queue.enqueue(handle_updatecp,rower.id,theids,table=table)
job = queue.enqueue(handle_updatecp,rower.id,theids,table=table)
return job
def fetchcperg(rower,theworkouts):
theids = [int(w.id) for w in theworkouts]

View File

@@ -263,6 +263,15 @@ class Rower(models.Model):
p2 = models.FloatField(default=1.0,verbose_name="CP p3")
p3 = models.FloatField(default=1.0,verbose_name="CP p4")
cpratio = models.FloatField(default=1.0,verbose_name="CP fit ratio")
ep0 = models.FloatField(default=1.0,verbose_name="erg CP p1")
ep1 = models.FloatField(default=1.0,verbose_name="erg CP p2")
ep2 = models.FloatField(default=1.0,verbose_name="erg CP p3")
ep3 = models.FloatField(default=1.0,verbose_name="erg CP p4")
ecpratio = models.FloatField(default=1.0,verbose_name="erg CP fit ratio")
otwslack = models.IntegerField(default=0,verbose_name="OTW Power slack")

View File

@@ -50,7 +50,11 @@
</button>
<div class="dropdown-content">
{% for member in user|team_members %}
{% if workouttype == 'water' %}
<a class="button green small" href="/rowers/{{ member.id }}/otw-bests/{{ startdate|date:"Y-m-d" }}/{{ enddate|date:"Y-m-d" }}">{{ member.first_name }} {{ member.last_name }}</a>
{% else %}
<a class="button green small" href="/rowers/{{ member.id }}/ote-ranking/{{ startdate|date:"Y-m-d" }}/{{ enddate|date:"Y-m-d" }}">{{ member.first_name }} {{ member.last_name }}</a>
{% endif %}
{% endfor %}
</div>
{% else %}
@@ -64,14 +68,22 @@
between {{ startdate|date }} and {{ enddate|date }}</p>
<p>Direct link for other users:
<a href="/rowers/{{ id }}/otw-bests/{{ startdate|date:"Y-m-d" }}/{{ enddate|date:"Y-m-d" }}">https://rowsandall.com/rowers/{{ id }}/otw-bests/{{ startdate|date:"Y-m-d" }}/{{ enddate|date:"Y-m-d" }}</a>
{% if workouttype == 'water' %}
<a href="/rowers/{{ id }}/otw-bests/{{ startdate|date:"Y-m-d" }}/{{ enddate|date:"Y-m-d" }}">https://rowsandall.com/rowers/{{ id }}/otw-bests/{{ startdate|date:"Y-m-d" }}/{{ enddate|date:"Y-m-d" }}</a>
{% else %}
<a href="/rowers/{{ id }}/ote-ranking/{{ startdate|date:"Y-m-d" }}/{{ enddate|date:"Y-m-d" }}">https://rowsandall.com/rowers/{{ id }}/ote-ranking/{{ startdate|date:"Y-m-d" }}/{{ enddate|date:"Y-m-d" }}</a>
{% endif %}
</p>
<p>The table gives the OTW efforts you marked as Ranking Piece.
<p>The table gives the efforts you marked as Ranking Piece.
The graph shows the best segments from those pieces, plotted as
average power (over the segment) vs the duration of the segment/
In other words: How long you can hold that power.
</p>
<p>When you change the date range, the algorithm calculates new
parameters in a background process. You may have to reload the
page to get an updated prediction.</p>
<p>At the bottom of the page, you will find predictions derived from the model.</p>
</div>
<div id="form" class="grid_6 omega">

View File

@@ -131,6 +131,16 @@ queue = django_rq.get_queue('default')
queuelow = django_rq.get_queue('low')
queuehigh = django_rq.get_queue('low')
from redis import StrictRedis
from rq.exceptions import NoSuchJobError
from rq.registry import StartedJobRegistry
from rq import Queue
redis_connection = StrictRedis()
rq_registry = StartedJobRegistry(queue.name,connection=redis_connection)
from rq.job import Job
from rest_framework_swagger.views import get_swagger_view
from rest_framework.renderers import JSONRenderer
from rest_framework.parsers import JSONParser
@@ -155,10 +165,30 @@ from scipy.interpolate import griddata
USER_LANGUAGE = 'en-US'
from interactiveplots import *
from rowers.celery import result as celery_result
# Define the API documentation
schema_view = get_swagger_view(title='Rowsandall API')
def get_job_result(jobid):
if settings.DEBUG:
result = celery_result.AsyncResult(jobid).result
else:
running_job_ids = rq_registry.get_job_ids()
if len(running_job_ids) and jobid in running_job_ids:
# job is running
return None
else:
# job is ready
try:
job = Job.fetch(jobid,connection=redis_connection)
result = job.result
except NoSuchJobError:
return None
return result
# Test if row data include candidates
def rowhascoordinates(row):
# create interactive plot
@@ -3076,6 +3106,32 @@ def otwrankings_view(request,theuser=0,
delta,cpvalue,avgpower = dataprep.fetchcp(r,theworkouts)
runningjob = 0
try:
jobid = request.session['job_id']
if jobid:
result = get_job_result(jobid)
if result:
messages.info(request,'Your calculation is ready')
runningjob = 1
request.session['job_id'] = 0
else:
runningjob = 1
messages.info(request,'Your job is still running')
except KeyError:
pass
if not runningjob:
job = dataprep.runcpupdate(
r,type='water',
startdate=startdate,
enddate=enddate
)
request.session['job_id'] = job.id
messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result')
powerdf = pd.DataFrame({
'Delta':delta,
@@ -3181,6 +3237,7 @@ def otwrankings_view(request,theuser=0,
'startdate':startdate,
'enddate':enddate,
'teams':get_my_teams(request.user),
'workouttype':'water',
})
# Show ranking distances including predicted paces
@user_passes_test(ispromember,login_url="/",redirect_field_name=None)
@@ -3298,7 +3355,33 @@ def oterankings_view(request,theuser=0,
delta,cpvalue,avgpower = dataprep.fetchcp(
r,theworkouts,table='cpergdata'
)
runningjob = 0
try:
jobid = request.session['job_id']
if jobid:
result = get_job_result(jobid)
if result:
messages.info(request,'Your calculation is ready')
runningjob = 1
request.session['job_id'] = 0
else:
runningjob = 1
messages.info(request,'Your job is still running')
except KeyError:
pass
if not runningjob:
job = dataprep.runcpupdate(
r,type='rower',
startdate=startdate,
enddate=enddate
)
request.session['job_id'] = job.id
messages.info(request,'New calculation queued. Refresh page or resubmit the date form to get the result')
powerdf = pd.DataFrame({
'Delta':delta,
'CP':cpvalue,
@@ -3320,11 +3403,11 @@ def oterankings_view(request,theuser=0,
div = res[1]
p1 = res[2]
ratio = res[3]
r.p0 = p1[0]
r.p1 = p1[1]
r.p2 = p1[2]
r.p3 = p1[3]
r.cpratio = ratio
r.ep0 = p1[0]
r.ep1 = p1[1]
r.ep2 = p1[2]
r.ep3 = p1[3]
r.ecpratio = ratio
r.save()
paulslope = 1
paulintercept = 1
@@ -3403,6 +3486,7 @@ def oterankings_view(request,theuser=0,
'startdate':startdate,
'enddate':enddate,
'teams':get_my_teams(request.user),
'workouttype':'rower',
})
# Reload the workout and calculate the summary from the stroke data (lapIDx)
@@ -5427,7 +5511,7 @@ def workout_otwsetpower_view(request,id=0,message="",successmessage=""):
ratio=r.cpratio)
else:
res = queuelow.enqueue(handle_otwsetpower,f1,boattype,
weightvalue,
weightvalue,
first_name,last_name,emailaddress,id,
ps=[r.p0,r.p1,r.p2,r.p3],
ratio=r.cpratio)