LINUX.ORG.RU

Ищу thread-safe реализацию FIFO для C или Python


0

1

Требования:

1. Можно читать / писать произвольными кусками байт, причем оверхед по сравнению с memcpy константен;

2. Если не хватает оперативки, то автоматически расширяется;

3. Можно безопасно писать из одного потока, а читать из другого

Кто-нибудь может что-то подсказать?

★★★

std::list из кусков памяти (сырых или std::vector) + std::mutex. А, C или Python. Ну тогда enjoy your C или Python.

slovazap ★★★★★
()
Ответ на: комментарий от Minoru

http://unix.stackexchange.com/questions/11946/how-big-is-the-pipe-buffer

«the buffer for each pipe can be adjusted via a fcntl on the pipe (up to a maximum capacity which defaults to 1048576 bytes, but can be changed via /proc/sys/fs/pipe-max-size))».

Сделать из этого автоматическое" изменение - дело 30 минут.

И да, совет в силе.

tailgunner ★★★★★
()
Ответ на: комментарий от anonymous

ну типа чтоб в одном потоке не возникало гонки. это ж пузон.

anonymous
()

20 строчек свелосипедить не можешь?

MyTrooName ★★★★★
()

2. Если не хватает оперативки, то автоматически расширяется;

само идёт на сайт магазина и заказывает оперативку с доставкой?

dimon555 ★★★★★
()
Ответ на: комментарий от Eddy_Em

https://developer.gnome.org/glib/stable/glib-Asynchronous-Queues.html

https://docs.python.org/2/library/queue.html

Очереди используй

Правда?

1. Можно читать / писать произвольными кусками байт

Minoru ★★★
() автор топика
Последнее исправление: Minoru (всего исправлений: 1)
Ответ на: комментарий от Eddy_Em

Ага, по ходу придется воспользоваться одним из следующих вариантов:

1. Городить надстройку над UNIX pipes (по совету tailgunner)

2. Городить надстройку над queue

3. Городить надстройку над cStringIO

Всем спасибо.

Minoru ★★★
() автор топика
Ответ на: комментарий от Minoru

4. Подумать еще раз и переделать архитектуру.

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от bj

А вот и костыль:

import sys

from cStringIO import StringIO
from threading import Lock, Thread


class FIFOBuffer(object):
    def __init__(self, compact_threshold=2**20):
        self.buf = StringIO()
        self.lock = Lock()
        self.cthreshold = compact_threshold
        self.read_fp = 0

    def write(self, data):
        with self.lock:
            self.buf.seek(0, 2)
            self.buf.write(data)

    def read(self, size=-1):
        with self.lock:
            self.buf.seek(self.read_fp)
            data = self.buf.read(size)
            self.read_fp = self.buf.tell()
            if self.read_fp > self.cthreshold:
                buf = StringIO()
                buf.write(self.buf.read())
                self.buf.close()
                self.buf = buf
                self.read_fp = 0

            return data


do_read = True
buf = FIFOBuffer()
bufsize = int(sys.argv[1])


def reader():
    while do_read:
        sys.stdout.write(buf.read())


def writer():
    while True:
        data = sys.stdin.read(bufsize)
        if not data:
            break

        buf.write(data)



wt = Thread(target=writer)
rt = Thread(target=reader)
rt.daemon = True

wt.start()
rt.start()

wt.join()
do_read = False
/tmp$ pv data | python2 boo.py 1024 > data.out
 847MiB 0:00:06 [ 136MiB/s] [============>] 100%            
/tmp$ pv data | python2 boo.py 4096 > data.out
 847MiB 0:00:03 [ 263MiB/s] [============>] 100%            
/tmp$ pv data | python2 boo.py 16384 > data.out
 847MiB 0:00:00 [1.01GiB/s] [============>] 100%
bj
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.