from __future__ import absolute_import import numpy as np import time from redis import StrictRedis,Redis from celery import result as celery_result import json redis_connection = StrictRedis() import redis import threading def getvalue(data): perc = 0 total = 1 done = 0 id = 0 session_key = 'noot' for i in data.iteritems(): if i[0] == 'total': total = float(i[1]) if i[0] == 'done': done = float(i[1]) if i[0] == 'id': id = i[1] if i[0] == 'session_key': session_key = i[1] return total,done,id,session_key class Listener(threading.Thread): def __init__(self, r, channels): threading.Thread.__init__(self) self.redis = r self.pubsub = self.redis.pubsub() self.pubsub.subscribe(channels) def work(self, item): try: data = json.loads(item['data']) total,done,id,session_key = getvalue(data) perc = 100.*done/total print perc, '%' print session_key, id except TypeError: print "invalid data" def run(self): for item in self.pubsub.listen(): if item['data'] == "KILL": self.pubsub.unsubscribe() print self, "unsubscribed and finished" break else: self.work(item) def longtask(aantal,jobid=None,debug=False, session_key=None): counter = 0 # if jobid: # if debug: # job = celery_result.AsyncResult(jobid) # else: # job = Job.fetch(jobid,connection=redis_connection) channel = 'tasks' for i in range(aantal): time.sleep(1) counter += 1 if counter > 10: counter = 0 if debug: progress = 100.*i/aantal print progress if jobid != None: redis_connection.publish(channel,json.dumps( { 'done':i, 'total':aantal, 'id':jobid, 'session_key':session_key, } )) return 1