Есть такая задача: есть большая база данных, оттуда нужно выгребать записи, параллельно обрабатывать и класть обратно. Записей много, поэтому выгребать нужно постепенно. Параллельных процессов тоже много, поэтому не хочется на каждый заводить отдельное подключение к БД.
Итого, нужно в основном процессе выгребать данные, отправлять worker'ам, получать от worker'ов результаты и складывать в базу. Как это лаконичнее всего организовать на питоне?
Первая идея была использовать готовый Pool. Написал тестовую прогу:
import multiprocessing
import sys
import time
def Process(arg):
print("processing {}".format(arg), file=sys.stderr)
time.sleep(1)
return 10000 + arg
def Generate():
for val in range(1000):
print("generating {}".format(val), file=sys.stderr)
yield val
if __name__ == '__main__':
pool = multiprocessing.Pool(10)
for res in pool.imap(Process, Generate(), 1):
print("finalizing {}".format(res), file=sys.stderr)
pool.close()
pool.join()
На практике, понятно, в Generate() будет чтение из базы, а в цикле по imap - запись.
Вроде всё зашибись, кроме того что все 1000 значений генерируются в начале программы. На практике это будет означать что программа сразу выгребет из базы кучу данных и выжрет всю память. Причём если в Generate поставить sleep то всё хорошо, обработчики будут ждать данных, но на практике они работают медленнее базы. Обернуть в какую-нибудь lazy штуку не получится, потому что тогда она будет выполняться в worker процессе где не будет подключения к бд. Что можно сделать чтобы imap просил следующую порцию данных только когда у него есть свободный процесс или место в (ограниченной) очереди для её обработки?
Или придётся всё писать руками? Руками создавать N процессов, две очереди (для заданий worker'ам и для ответов от них)? Тогда я не могу придумать как красиво обрабатывать эти две очереди, поскольку нужен аналог select'а, иначе получим дедлоки. Вырисовывается такой уродец:
tasks_in_fly = 0
while True:
task = get_next_from_db();
if not task:
# задания закончились
break
try:
# нельзя просто put, иначе получим deadlock если обе очереди заполнены
task_queue.put_nowait(task)
tasks_in_fly += 1
except queue.Full:
# если места в очереди нет, нужно разобрать ответы
try:
while True:
# выгребать ответы пока очередь не опустеет и не кинется исключение
result = result_queue.get_nowait()
tasks_in_fly -= 1
put_to_db(result)
except queue.Empty:
pass
# и ещё нужно дождаться последних ответов и оработать их
while tasks_in_fly:
result = result_queue.get()
tasks_in_fly -= 1
put_to_db(result)
Если только так, может есть идеи как этот код улучшить?