LINUX.ORG.RU

[Blocking queue] Не много ли потоков?


0

1

Пишу серверное приложение. Но могу уточнить, клиентское серверное приложение) Тоесть сервер будет запущен на обычных десктопах, к нему не будет обычно более 50 подключений.

Хорошая ли идея использовать следующее решение. На каждое соединение 3 потока и 2 блокирующие очереди. Приходят сообщения, их надо обработать и ответить. Каждое сообщение проходит приблизительно так.

сеть -> MessageReceiverThread -> MessagesQueue -> MessageHandlerThread -> MessageResponsesQueue -> MessageSenderThread -> сеть

Каждый тред ждет на блокирующей очереди или сети, потом парсит/сереализирует/обрабатывает и передает дальше

У этой схемы есть плюс - хороший контроль над перегрузками в любой точке обработки сообщения. При перегрузке на любом этапе сообщение можно выбросить. Система такая что это не страшно, и поддерживается алгоритмом, который это сообщение отправлял.

Смущает банально количество тредов. Пока система не подводила, но и нагрузок особых не было. С другой стороны два из них просто занимаются IO.

Похожий проект был на .NET на Asynchronous IO. Все бы хорошо, только Thread Pool там был прибитый гвоздями внутри .NET и контроля за ним особо не было. И когда много обработчиков какого-то типа чего-то ждали, то не хватало места под, например, отправку сообщений. И, опа, новые треды тред пул не давал. Сейчас разбираюсь как это в Java. Надеюсь есть смысл.

Сколько тредов - много?

★★★★★

>На каждое соединение 3 потока

много. даже очень.
ThreadPool можно сделать свой, с нардами и юдзёями

k0l0b0k ★★
()
Ответ на: комментарий от k0l0b0k

Только вопрос, стоит ли считать полноценным потоком, занмающим значительное процессорное время получателей и отправителей, они же на IO висят.

vertexua ★★★★★
() автор топика

Весь ввод-вывод можно делать в одном потоке. Или в нескольких потоках, число которых постоянно, но для <50 клиентов это не имеет смысла. В итоге будет на каждого клиента один поток и две очереди плюс ещё один поток на всех.

const86 ★★★★★
()
Ответ на: комментарий от vertexua
for(;;) {
  // Waiting for events
  selector.select();
  // Get keys
  Set keys = selector.selectedKeys();
  Iterator i = keys.iterator();

  // For each keys...
  while(i.hasNext()) {
    SelectionKey key = (SelectionKey) i.next();

    // Remove the current key
    i.remove();

    // if isAccetable = true
    // then a client required a connection
    if (key.isAcceptable()) {
      // get client socket channel
      SocketChannel client = server.accept();
      // Non Blocking I/O
      client.configureBlocking(false);
      // recording to the selector (reading)
      client.register(selector, SelectionKey.OP_READ);
      continue;
    }

    // if isReadable = true
    // then the server is ready to read 
    if (key.isReadable()) {

      SocketChannel client = (SocketChannel) key.channel();

      // Read byte coming from the client
      int BUFFER_SIZE = 32;
      ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
      try {
        client.read(buffer);
      }
      catch (Exception e) {
        // client is no longer active
        e.printStackTrace();
        continue;
      }

      // Show bytes on the console
      buffer.flip();
      Charset charset=Charset.forName("ISO-8859-1");
      CharsetDecoder decoder = charset.newDecoder();
      CharBuffer charBuffer = decoder.decode(buffer);
      System.out.print(charBuffer.toString());
      continue;
    }
  }
}

Вот семпл простого асинхронного сервера. Может так его и сделать, а обрабатывать сообщения через Executors

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

все равно поток, со своим стеком, контекстом и пр.
золотое правило - сегодня 50, а завтра 100, а послезавтра 500... (или чего там еще манагерам в голову взбредет)

с ThreadPool ты реально сможешь подкручивая один параметр получить уменьшение кол-ва потоков в случае жопы, или увеличение если хочется испробовать сабжевое предложение (поставил MaxThreads = 150-200 и будет то что ты хочешь, но с рычагом).

k0l0b0k ★★
()
Ответ на: комментарий от k0l0b0k

Я не понимаю, как можно было сделать такой ThreadPool, который где-то внутри и еще в определенный момент перестанет плодить треды, причем когда они будут очень нужны. Я бы например хотел бы 3 ThreadPool для того, чтобы отправка, получение и обработка регулировались отдельно

vertexua ★★★★★
() автор топика
Ответ на: комментарий от Reset

Действительно, классическая задача из разряда C10K problem при большом кол-ве подключений, неужели для джавы нигде не описано?

frey ★★
()

я таки не понимаю, при нагрузке в < 50 пользователей заниматься излишней оптимизацией в управлении соединениями? нахрена оно нужно? нет, ну если в перспективе планируется рост, то быть может, но если изначально известно, что их не будет больше, то возиться и усложнять логику ни к чему. доставит лишь геморрой в последующем сопровождении кода.

Deleted
()

А как у тебя происходит контроль за количеством сообщений в очередях? А то так под нагрузкой и в out of memory свалиться можно.

dizza ★★★★★
()
Ответ на: комментарий от dizza

Каждое сообщение знает свой размер. Потому размер очереди известен. Не влезает, выбрасываем.

vertexua ★★★★★
() автор топика

Пока система не подводила, но и нагрузок особых не было. С другой стороны два из них просто занимаются IO.

man JMeter

wfrr ★★☆
()
Ответ на: комментарий от const86

+1

Действительно, имеет смысл отделить обработку сообщения от его приема/отправки, но следует это правильно спроектировать.

Случай, когда есть общий «сервер сообщений», который разруливает прием и отправку сообщений для каждого потока-потребителя сообщений - более общий, чем приведенный ТС. Его наличие даст приложению бОльшую гибкость и может покажется неожиданным - простоту реализации и понимания схемы работы. Во всяком случае, такая конструкция гораздо понятнее, чем та, которую описал ТС.

Тред потребитель регистрируется на «сервере сообщений». Для него там выделяется «порт» - для отправки и отсылки, и заводится пара очередей. А дальше тред-потребитель просто тупо пишет в него или читает, не заботясь ни о чем.

А «сервер сообщений» реализуйте потом так, как захотите - например в несколько потоков.

anonymous
()
Ответ на: комментарий от vertexua

Я бы например хотел бы 3 ThreadPool для того, чтобы отправка, получение и обработка регулировались отдельно

Ээ? А нафейхоа? У тебя один поток диспетчера принимает запрос и посылает его в очередь пула, пул выделает поток для обработки запроса, в сем потоке запрос принимается, обрабатывается и выдается клиенту результат (все в одном потоке на один запрос). У тебя получается 1+N потокоы на N одновременных соединений.

wfrr ★★☆
()
Ответ на: комментарий от vertexua

Мысль вообще правильная. Но ведь, например, Erlang для вашей задачи подходит больше - в нем мессадж пассинг конкурренси уже реализована.

anonymous
()
Ответ на: комментарий от wfrr

Сейчас делаю так:

1. На входе Async IO сервер, который принимает подключения (в своем потоке) и запросы на чтение.

2. После этого запросы на чтение передаются в ExecutorService, каждый тред которого накапливает в буффере данные до получения нового сообщения. Здесь есть нюанс, может ли Async IO выдать новый запрос на чтение во время того, как какой-то тред уже читает? Будут ли они накапливаться?

3. После этого сообщение передается в другой ExecutorService для обработки.

4. После обработки - в ExecutorService для отправки.

Осталось малое, выбрать одно из двух

1. Оставить как есть и ограничить количество тредов в Executors и эти регулировать нагрузку. Смущает безконтрольность накапливаемых сообщений. Может есть решение о котором я не знаю?

2. Решение о котором знаю: между ExecutorService влепить по BlockingQueue и посему буду знать общий размер очереди сообщений и отбрасывать после превышения лимита.

vertexua ★★★★★
() автор топика
Ответ на: комментарий от wfrr

Это чтоб пока следующее сообщение обрабатывается, предыдущее уже «отправилось» (правда в таких системах больше подходит фраза «типа отправилось»)

anonymous
()

Я парюсь из-за того что система может быстро качать, а обработчики будут все висеть на каком-нибуть зловредном локе иногда. Или наоборот быстро получать сообщения от одних клиентов, быстро обрабатывать, а получатели сообщения будут медленно закачивать. Или вовсе специально не качать чтобы угробить сервер

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

Есть ощущение, что вы плохо представляете архитектуру того, что должно получиться в результате, и зачем все это. Извините, если не так.

anonymous
()
Ответ на: комментарий от anonymous

Дык, с описаной мной архитектурой так и будет, но не «типа» а на самом деле, ибо новое сообщение будет обрабатываться в другом потоке.

wfrr ★★☆
()
Ответ на: комментарий от vertexua

После этого запросы на чтение передаются в ExecutorService, каждый тред которого накапливает в буффере данные до получения нового сообщения. Здесь есть нюанс, может ли Async IO выдать новый запрос на чтение во время того, как какой-то тред уже читает? Будут ли они накапливаться?

Можно, все если читать вы будете не в диспетчере а в потоке обработки запроса.

. После этого сообщение передается в другой ExecutorService для обработки.

4. После обработки - в ExecutorService для отправки.

Зачем? Что вам мешает читать и обрабатывать в одном потоке? Или хочется поиметь гимор с перекидыванием больших объемов данных между потоками? И как вы это сделает если например запрос имеет размер в 3гб и может обрабатываться исключительно поточно.

wfrr ★★☆
()
Ответ на: комментарий от vertexua

Вот хороший совет - прочитайте первые 3 главы книги изд-ва Oreilly по языку Erlang, а также документацию по некоторым модулям системы Erlang/OTP.

Что в них хорошего, так это то, что там описаны т.н. типовые решения или паттерны проектирования многопоточных приложений, в т.ч. и такие, которое вы сейчас создаете. Т.е. там описана схема того, какие модули должны быть и чем должен каждый заниматься. Безотносительно самого языка.

По существу хочу сказать следующее: 1) должен быть отдельный модуль, разруливающий создание тредов-обработчиков на каждое подключение. сойдет и в один процесс. 2) должен быть отдельный модуль, обслуживающий передачу сообщений тредам и от них. может быть многотредовым.

anonymous
()
Ответ на: комментарий от wfrr

Все намного хуже. Клиент не обязательно сам получает запрос. Его запрос может вызвать ТАКУЮ рассылку что мама не горюй. Не ему, а другим.

vertexua ★★★★★
() автор топика
Ответ на: комментарий от wfrr

Можно, все если читать вы будете не в диспетчере а в потоке обработки запроса.

Я бы хотел регулировать количество потоков закачки и обработки. И то и другой по отдельности может занимать очень много времени. Вот диспетчер - это цикл. Он получает SelectionKey на обработку. Если отдать это куда-то еще, то цикл сделает итерацию и заново может вернуть тот же ключ. Он ведь мог не успеть быть прочитанным.

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

Может вся фишка Async IO в том чтобы вращать этот цикл в многих тредах и в них же и обрабатывать?

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

И что вы таки полагает что «может» требует введение двух лишних потоков и гимора с рассылкой?

Во первый определитесь будет ли у вас соединение с клиентами постоянным или по необходимости. В обоих случаях свои тонкости но в любом случае предложенное вами решение даст оверхед.

wfrr ★★☆
()
Ответ на: комментарий от vertexua

Я бы хотел регулировать количество потоков закачки и обработки.

Это самоцель?

И то и другой по отдельности может занимать очень много времени.

И что с этого? Посмотрите как сделаны вебсервера, а через них прогоняется херова туча данных и обработка запроса может занимать вагон времени.

Вот диспетчер - это цикл. Он получает SelectionKey на обработку. Если отдать это куда-то еще, то цикл сделает итерацию и заново может вернуть тот же ключ. Он ведь мог не успеть быть прочитанным.

Начиная с того что вы пишите raw коллекции, заканчивая тем что чтот оне понимаете в работе io ежели планируете читать гигабайты данных в диспетчере.

Может вся фишка Async IO в том чтобы вращать этот цикл в многих тредах и в них же и обрабатывать?

Это не фишка а феерически гиморой, с синхронизацией.

wfrr ★★☆
()
Ответ на: комментарий от vertexua

тогда можно даже меньше чем по потоку на клиента. 150 - потоков на 50 клиентов это писец.

wfrr ★★☆
()
Ответ на: комментарий от vertexua

Поток в котором крутится приведенный вам код. У него основное условие - он не должен ничего делать что могло бы занять много времени, т.е. никаких там чтений или синхронизаций.

зы. почему у вас нет «keys.clear();» после считывания ключей?

wfrr ★★☆
()
Ответ на: комментарий от wfrr

Там есть i.remove()

Я нашел то, что искал. Мне как раз нужен был метод interestOps(). Он решает мою проблему, так как позволяет отключить отдачу ключей, пока происходит обработка. Я его как то пропустил.

vertexua ★★★★★
() автор топика
Ответ на: комментарий от wfrr

На следующем select. Этот select может 100500 раз сработать пока дойдет до моего обрабочика. Почитал статейку, там как раз вырубали, а после чтения в другом потоке врубали interestOps

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

Может вся фишка Async IO в том чтобы вращать этот цикл в многих тредах и в них же и обрабатывать?

Ну вот в nginx/lighttpd, в которых паттерн обработки конкурентных входящих запросов реализуется посредством epoll/kevent (а.к.а. написание конечного автомата обработки), практикуется использование количества тредов равное количеству ядер - т.е. на четырёх ядрах работают четыре треда, которые обрабатывают все те 10k приходящих соединений. Есть ещё реализация этого паттерна посредством модели лёгкий-процесс-на-соединение (Erlang), или продолжений, или один-ОС-тред-на-соединение (Apache). Вы, вроде, пытаетесь совместить модель поток-на-соединение и polling, чего обычно не делается.

В случае обработки в стиле nginx возникает проблема с обработкой тяжёлых запросов, но тут уже сказали, что можно отправлять всю тяжёлую обработку отдельному (одному) потоку.

Т.е. если вам нужно обрабатывать 50 конкурентных соединений, то либо заводить 50+1 (максимум) потоков, либо делать всё в одном poll цикле (скорее всего вариант более запарный - если обработка запросов тяжёлая). И, кстати, 50 тредов - это немного. Тот же apache может на-создавать до 1000 потоков.

quasimoto ★★★★
()
Ответ на: комментарий от quasimoto

Вы, вроде, пытаетесь совместить модель поток-на-соединение и polling, чего обычно не делается.

Я пытаюсь совместить Thread Pool и polling. Причем разные. Тоесть например 5 закачивателей, 8 обработчиков, и 5 отправителей обрабатывают 1000 соединений.

Почему 1000? Я сказал 50, но это на клиенте. Клиенты соединяются между собой peer-2-peer. Но также ВСЕ они соеденены с сервером.

Мне не хочется отдельный компонент связи писать для клиента и сервера, потому пишу универсальный. Если бы я соорудил то, что написал в самом начале поста, то серверный компонент бы таки пришлось писать сначала. А вот такох хитрый компонент подойдет и для клиентов и для главного сервера

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

Клиенты. Акцептор, он же диспетчер, работает в своём потоке и принимает соединение (от другого «клиентского» приложения), он не делает много работы (так как нужно сразу переключится в ожидание следующего входящего запроса), а создаёт один поток и вызывает в его контексте функцию в которую передаёт ссылку на буфер, содержащий запрос, и дескриптор по которому нужно отвечать. Дальше этот созданный поток может работать сколько угодно (при этом ацептор идёт дальше) - пока не обработает весь запрос и не запишет все данные в дескриптор того клиента. Если у каждого клиента будет около 50 конкурентных запросов в одно время - такая смеха вполне подходит (щадящая нагрузка).

Центральный сервер. Всё тоже самое ведь? Только запросов (от клиентских приложений) около 1000. Т.е. работает та же схема, но функция обработки в обрабатывающем потоке вызывается другая. И 1000 одновременных клиентов для такой схемы - тоже край. Нужно чтобы центральный сервер работал на хорошем железе. Либо полностью его переписывать на основе polling модели - только реализовать polling модель способную обрабатывать «тяжёлые» запросы это совсем нетривиальная задача.

Или я чего-то не понял?

quasimoto ★★★★
()

А MessageHandlerThread вычислительный или в нем много IO? Думаю, от этого надо исходить. Если вычислительный, то много вычисляющих потоков будет тяжелым испытанием для системы. Если же там много IO, то можно ли это IO как-то «асинхронизировать» по пулу потоков? Решения могут быть такими разными.

dave ★★★★★
()
Ответ на: комментарий от dave

Ни тот ни тот. Работа с диском в больших количествах. Чтобы максимально приближенно описать что я хочу, можете думать о битторрент клиентах и сервере

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

Ну это конечно IO, но еще много хеширования, отдыха на мьютексах, потому даже непонятно что это

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

>> но для <50 клиентов это не имеет смысла

Для скольки клиентов имеет смысл?

Хороший вопрос. Зависит скорее от интенсивности передачи сообщений. Если один поток успевает в реальном времени принять и отправить, упираясь только в сеть, а не в проц, то одного и достаточно. Сомневаюсь, что у ТС трафик настолько интенсивный, он же его ещё и обработать хочет успеть и всё это на десктопе.

const86 ★★★★★
()
Ответ на: комментарий от dave

> Если же там много IO, то можно ли это IO как-то «асинхронизировать» по пулу потоков?

К сожалению, работу с ФС асинхронизировать трудно, хоть и можно, да и вряд ли оправданно. И я не припомню ни одной библиотеки, которая бы умела свои обращения к ФС делать асинхронно.

const86 ★★★★★
()
Ответ на: комментарий от const86

Java NIO, кажется файлы тоже умеет

vertexua ★★★★★
() автор топика
Ответ на: комментарий от vertexua

Тогда, пожалуй, 50 потоков может и выдержать :)

dave ★★★★★
()
Ответ на: комментарий от const86

F# asynchronous workflow (монада Async) умеет легко и изящно прятать асинхронные обращения к IO в красивую оболочку, которую легко написать, легко использовать. И в дотнете некоторые операции с IO могут быть асинхронными. Полагаю, что в Win32/64 тоже. Не знаю, есть ли что-то подобное в яве. Скорее всего, нет.

Вообще, если у ТС всего пять десятков потоков с интесивным IO и блокировками, то, наверное, особо мудрить с задачей не стоит :)

dave ★★★★★
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.