LINUX.ORG.RU

Celery, проверка результатов асинхронных задач

 , ,


0

1

Все привет. Собираюсь использовать в своем проекте celery, для выполнения некоторых задач. Хочется работы с объектами AsyncResult как с futures. Вот так я работаю с future

def future_wait(f, on_succ=None,
        on_failed=None, on_completed=None):
    def done_callback(f):
        if f.exception():
            if on_failed:
                on_failed(f.exception())
        else:
            if on_succ:
                on_succ(f.result())
        if on_completed:
            on_completed(f)

    f.add_done_callback(done_callback)
    return f

def done_callback(res):
    print res

def failed_callback(res):
    print res

f = do_some_task(...) # f - объект Future

future_wait(f, on_succ=done_callback, on_failed=failed_callback)
Если у future вызвать add_done_callback, то, как только результат таска будет получен, future сам вызовет мой callback. Таким образом, у меня все выполняется асинхронно. А вот в celery такой функции нету, и мне приходится писать некоторый костыль
class ResultKeeper(object):
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(ResultKeeper, cls).__new__(cls, *args, **kwargs)
        return cls._instance

    def __init__(self):
        self.lock = Lock()
        self.results = list()
        self.thread = Thread(target=self.loop)
        self.thread.daemon = True
        self.thread.start()

    def __call__(self, async_res, on_success=None, on_failue=None):
        with self.lock:
            async_res._callback = {states.SUCCESS: on_success,
                                   states.FAILURE: on_failue}
            self.results.append(async_res)

    @property
    def is_alive(self):
        return True

    def process_result(self, async_res):
        callback = async_res._callback.get(async_res.state)
        self.results.remove(async_res)
        if callable(callback):
            callback(async_res.result)

    def loop(self):
        while self.is_alive:
            with self.lock:
                for async_res in iter(self.results):
                    if async_res.ready():
                        self.process_result(async_res)
            sleep(0.2)

    def _remove(self, async_res):
        with self.lock:
            try:
                self.results.remove(async_res)
            except:
                pass


rk = ResultKeeper()
res=convert_audio.delay(src='dfbxdfb', dst='serg')
rk(res,
   on_success=done_callback,
   on_failue=failed_callback)

В loop мне приходится вызывать для каждого результата read(), и потом делать sleep на некоторое время... Мне этот sleep не нравится. Собственно сам вопрос, как избавиться от этого sleep?



Последнее исправление: energyclab (всего исправлений: 1)
Ответ на: комментарий от loz

Проверил, данная штука работает на стороне воркера, а на стороне клиента, ни сигналы не этитятся, ни ивенты не прилетают... А мне нужно на стороне клиента(((

energyclab
() автор топика
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.