Все привет. Собираюсь использовать в своем проекте celery, для выполнения некоторых задач. Хочется работы с объектами AsyncResult как с futures. Вот так я работаю с future
def future_wait(f, on_succ=None,
on_failed=None, on_completed=None):
def done_callback(f):
if f.exception():
if on_failed:
on_failed(f.exception())
else:
if on_succ:
on_succ(f.result())
if on_completed:
on_completed(f)
f.add_done_callback(done_callback)
return f
def done_callback(res):
print res
def failed_callback(res):
print res
f = do_some_task(...) # f - объект Future
future_wait(f, on_succ=done_callback, on_failed=failed_callback)
class ResultKeeper(object):
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(ResultKeeper, cls).__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.lock = Lock()
self.results = list()
self.thread = Thread(target=self.loop)
self.thread.daemon = True
self.thread.start()
def __call__(self, async_res, on_success=None, on_failue=None):
with self.lock:
async_res._callback = {states.SUCCESS: on_success,
states.FAILURE: on_failue}
self.results.append(async_res)
@property
def is_alive(self):
return True
def process_result(self, async_res):
callback = async_res._callback.get(async_res.state)
self.results.remove(async_res)
if callable(callback):
callback(async_res.result)
def loop(self):
while self.is_alive:
with self.lock:
for async_res in iter(self.results):
if async_res.ready():
self.process_result(async_res)
sleep(0.2)
def _remove(self, async_res):
with self.lock:
try:
self.results.remove(async_res)
except:
pass
rk = ResultKeeper()
res=convert_audio.delay(src='dfbxdfb', dst='serg')
rk(res,
on_success=done_callback,
on_failue=failed_callback)
В loop мне приходится вызывать для каждого результата read(), и потом делать sleep на некоторое время... Мне этот sleep не нравится. Собственно сам вопрос, как избавиться от этого sleep?