LINUX.ORG.RU

[python] multiprocessing, обмен данными между процессами

 


0

2

Всем привет.

Суть такая: есть парсер страниц, которые иногда возвращает те ссылки, по которым уже прошлись. Чтобы избежать дублирования я добавил некий tasks_done, в который скидываю те ссылки, по которым уже прошелся. Соответственно, в очередь падает задание, только если его нет в tasks_done. Чтобы избежать всяких косяков в multuprocessing, я, как я понимаю (а может и неправильно понимаю), нужно юзать менеджер. Но он что-то не работает.

__author__ = 'user'
import multiprocessing
from time import sleep

class A:
    def __init__(self, num_proc=4):
        self.m = multiprocessing.Manager()
        self.tasks_done = self.m.list()

        self.q = multiprocessing.Queue()

        self.num_proc = num_proc


    def worker(self):
        while True:
            task = self.q.get()
            print task
            self.tasks_done.append(task)

            for i in range(task-5, task+5):
                if i not in self.tasks_done:
                    self.q.put(i)


    def init_processes(self):
        for i in range(self.num_proc):
            p = multiprocessing.Process(target=self.worker)
            p.daemon = True
            p.start()
        p.join()


    def add_task(self, i):
        self.q.put(i)


#if __name__ == '__main__':
a = A()
a.add_task(100)
a.init_processes()

Условие «if i not in self.tasks_done:» в цикле выполняется настолько редко, что почти вся очередь состоит из дубликатов. Без lock.acquire/release это сделать как-нибудь можно? И в чем в таком случае назначение этого менеджера?

Ответ на: комментарий от iSlava

Вдобавок что-то lock() ведет себя далеко не так, как я ожидал: данные в tasks_done все равно не успевают обновляться и процессы используют не тот (старый) список.

Напрашивается вывод, что у меня где-то косяк с поточной идеологией :|

gevent - парсю мануал пока.

P.S. Если все даташаре-модели в моем случае не работают, то как они работают в других местах?..

division_hell
() автор топика
import random
from multiprocessing import Pool, Manager

def worker(task, already_done):
    if task in already_done:
        print 'task %d is already done' % task
    else:
        already_done[task] = True
        print 'do %d' % task


if __name__ == '__main__':
    manager = Manager()
    already_done = manager.dict()

    pool = Pool(processes=4)

    for _ in xrange(100):
        task = random.randint(1, 10)
        pool.apply_async(worker, (task, already_done))

    pool.close()
    pool.join()

// Для твоей задачи прекрасно подойдут потоки.

baverman ★★★
()
Ответ на: комментарий от baverman

Проблема заключается в том, что очередь пополняется данными, полученными в worker'e. Поэтому такой вариант мне не очень подходит.

С потоками было бы проще, но парсить нужно примерно несколько гиг, а дальше эта цифра будет только расти. Т.е. с потоками проц загружается процентов на 30, мультипроцессинг на ~80.

Короче, пока я обошелся через map(), но блин хотелось бы начальную задачу решить в общем виде, а то за спиной косяки оставлять как-то некайфово)

division_hell
() автор топика

Я не понял двух вещей - зачем тебе класс A и почему наличие объекта в tasks_done проверяет именно рабочий процесс.

tailgunner ★★★★★
()
Ответ на: комментарий от division_hell

Поэтому такой вариант мне не очень подходит.

Какой такой? Я только показал как использовать шаред словарь.

baverman ★★★
()
Ответ на: комментарий от baverman

Я только показал как использовать шаред словарь.

Тогда ясно.

Все же меня терзают смутные сомнения :). В описанном мной выше случае получается так, что данные в словарь я пишу, но читать процессы из него могут до того, как данные записались. Почему? В итоге процесс получает устаревшие данные и добавляет в очередь уже выполненное задание. Я думал, что Manager решает проблему очередности чтения/записи, а оказалось, что нет.

Т.е. как сделать обращение к данным последовательным? Я пробовал заюзать семафор, но что-то не але. А аналогичный код на сях вроде бы работал:

sem_t semlock;

some_data_type get_rc(){
   while (sem_wait(&semlock) == -1)
      if (errno != EINTR)
         return 0;
      return rc;
}

int rel_rc(){
    sem_post(&semlock);
}

p.s. задача-то решена, но мне любопытно как код выше привести к работоспособному состоянию.

division_hell
() автор топика
Ответ на: комментарий от BattleCoder

Лол. Если не лучше, тогда зачем рекомендовал?

baverman ★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.