В результате ее работы в массиве array оказывается пакет размером BUF_SIZE. Есть ли какое-то стандартное решение при работе с этой функцией, чтобы постоянно принимать пакеты, да еще не все подряд а, отфильтрованные по значению в одном из элементов массива. Я пытался вот такой подход использовать:
do
result = recvfrom(iSocket, array, BUF_SIZE, 0, (struct sockaddr *)&sipx, (socklen_t*)&len_addr) ;
while ( array[8] == 42 ) ;// принимать пакет пока в array не окажется то, что нужно
Устраивало, до тех пор, пока не потребовалось еще одному процессу читать через этот сокет.
Не вариант. Если ты хочешь из одного сокета читать толпой процессов, то делай демон, который будет куда-нибудь сохранять данные и мониторить подключения, т.е. каждому подключенному процессу будет его порцию данных отдавать, запоминая указатели начала/конца.
А вообще, очень странно: сколько ты ни подключишь процессов к сокету, никаких проблем не должно быть, т.к. на каждое подключение сервер родит новый поток или процесс. В итоге получится "автоматически" каждому клиенту давать нужную ему информацию. Так все и делают.
Если же у тебя сервер не делает pthread_create или fork на каждое подключение, то сервер твой — говно. И его нужно переписывать правильно!
Вот с этого и начинай.
У меня не очень-то сервер. Ко мне не подключается никто, я не устанавливаю соединений ни с кем и сокет у меня датаграммный. Я должен вынимать из сокета нужный пакет, сохранять и что-то с ним делать. А еще нужно сообщать процессам, что пакеты приходить перестали. Eddy_Em, вроде бы первый твой комментарий, то что мне нужно.
Ну, тогда определяйся с количеством клиентов и максимальным размером блока данных. В зависимости от этого и реализуй демона. Если клиентам действительно нужно лишь знать, что перестали пакеты приходить, можно тупо в shm мьютекс завести (а то и вообще обыкновенный флаг какой-нибудь). Если им нужно толпу данных передавать, то лучше, наверное, чтобы твой демон сам открывал сокет и на каждое подключение создавал поток, в котором начинал работать с буферами (соответственно, нужно будет завести пару мьютексов или семафоров на модификацию буферов: что область кольцевого буфера уже всеми прочитана и ее можно удалять, а также что клиент все прочитал и ждет новых данных).
Отлично, осталось навикипедить что такое мьютексы и как с семафорами работать). И не понятно, как процесс-демон должен данные отдавать. Клиент только один.
Устраивало, до тех пор, пока не потребовалось еще одному процессу читать через этот сокет.
???
осталось навикипедить что такое мьютексы и как с семафорами работать)
Читай книжку Стивенса про юниксовые IPC.
[offtop]
Я как раз намедни закончил в сортире ее чтение (в 100500-й раз) и перешел на "обработка 2D/3D изображений". Жаль, что Гонсалез&Вудз такие дорогие, а книжки по opencv вообще жесть какие дорогие! Хоть opencv и говно тормозное, но говорят, что в документации очень много полезной информации по использующимся алгоритмам, что собственно мне и нужно. А там, глядишь, может и использую саму opencv для простых вещей (если там не нужно будет сутками информацию обрабатывать)
[/offtop]
Я вообще полагал, что обойдусь одним процессом. Который:
Извлекает пакет из сети
Фильтрует нужные пакеты
Извлекает данные
Добавляет свои данные
Сохраняет всё полученное в файл
И опять в начало
И в принципе всё работало. Но до тех пор, пока в сети что-то есть. Если в сети пакетов не оказывалось, процесс ждал. А нужно так, что если пакеты перестали приходить, файл все равно писать. Вот я и пришел к fork().
$ man 7 socket
............
SO_RCVTIMEO and SO_SNDTIMEO
Specify the receiving or sending timeouts until reporting an
error. The argument is a struct timeval. If an input or output
function blocks for this period of time, and data has been sent
or received, the return value of that function will be the
amount of data transferred; if no data has been transferred and
the timeout has been reached then -1 is returned with errno set
to EAGAIN or EWOULDBLOCK just as if the socket was specified to
be nonblocking. If the timeout is set to zero (the default)
then the operation will never timeout. Timeouts only have
effect for system calls that perform socket I/O (e.g., read(2),
recvmsg(2), send(2), sendmsg(2)); timeouts have no effect for
select(2), poll(2), epoll_wait(2), and so on.
Коллектор нужно писать. Про размеры уже сказали. Читаешь, пишешь в коллектор, запускаешь анализ, получаешь нужный пакет, оставляя неиспользованный хвост данных и состояние коллектора, либо просто ждешь следующей порции.
Есть разные подходы для написания приложений в которых присутствует некоторая эээ.... интерактивность. Я говорю о приложениях, которые должны уметь «одновременно» взаимодействовать с разными вещами, с сетью, с пользователем, реагировать на истечение таймаутов и т.д.
Многопоточное приложение, написанное «в лоб» ИМХО самый неудачный вариант.
Я написал немало приложений с сетевой «интерактивностью» и мне больше всего нравится однопотоковые приложения, с архитектурой, основанной на вызове epoll(). Недостаток такого подхода заключается в том, что напрямую с epoll() работать неудобно. Нужна некая библиотека-надстройка над epoll(), которая предоставит более удобный программный интерфейс. Для себя я написал такую библиотеку сам, но можно взять готовую.
Посмотри в сторону Boost::Asio или libevent. Правда я ими особо не пользовался (уже привык к своему решению). Другие могут рассказать больше об этих или подобных библиотеках.
epoll слишком сложен и в большинстве случаев не нужен. select'а за глаза.
Что c epoll() напрямую работать, что с select(), одна чёрта. Все равно получится громоздкий код.
Это глупо напрямую работать с системным вызовом-мультиплексором, не важно select()/poll()/epoll() или каким-то другим.
Естественно, должен быть слой абстракции, который скроет все детали реализации. Этот слой абстракции пишут один раз и сложность его написания сильно не меняется от того какую функцию мы положим в основу. Но при этом epoll будет иметь меньше ограничений и работать быстрее.
Кхм, надо бы что-то придумать, а то дискуссия зашла в тупик.
Давай так, будем исходить из принципа - «лучше один раз увидеть, чем сто раз услышать»
Каждый выдаст свое решение одной и той же задачи. Я покажу как это некрасиво выглядит с использованием с использованием библиотеки-обертки, а ты покажешь свой божественно прекрасный код с прямым использованием вызова select().
Задача: Необходимо написать TCP сервер, который принимает соединения. Клиенты посылают серверу запрос, на который сервер высылает ответ. Если структура запроса нарушена, сервер закрывает соединение. Если от клиента не было никакой активности более 10 секунд, сервер закрывает соединение с ним.
Структура запроса:
uint8_t sig; //сигнатура заголовка, всегда 0xAF
uint8_t func; //номер функции, всегда 1
uint8_t frame_size; //размер данных в байтах
char data[frame_size]; //frame_size байт с данными
Структура ответа:
uint8_t sig; //сигнатура заголовка, всегда 0xAF
uint8_t func; //номер функции, всегда 2
uint8_t frame_size; //размер данных в байтах, всегда 1
char data[frame_size]; //1 байт, содержащий все себе xor всех байт запроса
Я приведу то, что мне понадобилось бы написать, для реализации такого сервера имея сделанные ранее наработки по оберткам. Возможно, свое решение приведу только завтра.
Понял тебя. Если речь идет о сотнях соединений, тогда согласен, у select() есть ограничения. А вот аналог epoll() должен потянуть больше 10 тыс. соединений. Естественно, мы рассматриваем случай, когда большая часть соединений спит и в один момент только несколько проявляют активность. А то, если очень захотеть, то и одно соединение может прогрузить любой сервер.
Как ты себе представляешь работу с одним сокетом и толпой клиентов без потоков/процессов?
Единственный вариант: заставлять клиентов ждать, пока обслуживаешь очередного. Если у тебя время обслуги — минут 10, а клиентов тысяча, то это будет жесть!
Если у тебя время обслуги — минут 10, а клиентов тысяча, то это будет жесть!
В таких случаях обычно «обслуга» отправляется в отдельный пул потоков на обработку, а клиенты потом вовремя получают статус выполнения (опять же, можно асинхронно)
Нет, я не могу понять, как можно более чем с одним клиентом работать на одном потоке!
Вот, допустим, я открываю сокет. Его одновременно открывают три клиента (вообще, это один клиент, но открываем с разными ключами: один канал изображения гонит, другой принимает и обрабатывает управляющие команды, третий передает клиенту сообщения об ошибках).
Ну и как я в этот сокет что-нибудь напишу, чтобы оно именно нужному клиенту пришло? Хранить пул дескрипторов? А теперь представь, что у тебя одновременно толпа народу может к этому сокету обратиться. В итоге у тебя получится бешеный пул дескрипторов и ты в реальном времени вынужден будешь выдирать из пула те, в которые видео писать, писать туда видео; выдирать нужные командные и обрабатывать полученное оттуда...
В общем, геморройное же дело! Проще по потоку завести, и пусть они себе висят и обрабатывают все.
А в многонитевом приложении ты как клиенты различаешь? Пойми ты, многонитевое приложение и конечный автомат для обработки нескольких соединений это одно и то же, просто когда есть нити, тебе этого автомата не видно, его ядро и/или библиотека поддержки нитей скрывают.
#include <stdio.h>
#include <vector>
#include <signal.h>
#include <zCommon/zException.hpp>
#include <zCommon/zEnviroment.hpp>
#include <zCommon/zUtils.hpp>
#include <zIO/zMainLoop.hpp>
#include <zIO/zIOStreamNormal.hpp>
#include <zIO/_zSocketServer.hpp>
#include <zIO/zUnixSocketHelper.hpp>
#include "MyServer.hpp"
using namespace zCommon;
using namespace zIO;
using namespace std;
int main(int argc,char** argv)
{
zEnviroment::Init(); //Инициализация некоторых сущностей, ничего интересного
zMainLoop loop; //Основной цикл обработки сообщений
MyServer* server = MyServer::Create(1234); //Создаем обработчик сервера
loop.Handler_Add(server); //Добавляем обработчик в цикл обработки сообщений
loop.Run(); //Запускаем цикл обработки сообщений, в недрах этого метода будет вызываться epoll()
return 0;
}
#pragma once
#include <zIO/_zTCPSocketServer.hpp>
/*
* Класс описывет сервер
*/
class MyServer : public zIO::_zTCPSocketServer
{
public:
void server_Open(); //Метод вызывается при создании сервера
void server_Close(); //Метод вызывается при закрытии сервера
/*
* Этот метод вызывается каждый раз, когда кто-то устанавливает соединение.
* Метод должен вернуть обработчик отдельного соединения
*/
zIO::_zTCPSocketServer::local_Session* server_CreateSession(struct sockaddr_in& addr);
void server_Think();
static MyServer* Create(int port);
};
#pragma once
#include <zIO/_zTCPSocketServer.hpp>
#include <zCommon/zTime.hpp>
/*
* Класс отдельного соединения
*/
class MyServerSession : public zIO::_zTCPSocketServerSess
{
private:
zCommon::zTime t0; //Метка времени последней активности
public:
virtual void stream_BeginStream(); //Вызывается, когда соединение установлено и готово для обмена
virtual void stream_EndStream(); //Вызывается, когда соединение разорвалось
virtual void stream_Recv(); //Вызывается каждый раз, когда приходят какие-нибудь данные
virtual void stream_Think(); //Вызывается каждые 100 милисекунд, нужно для таймаутов
virtual void stream_IOError(int error_source,int error_num); //Вызывается при сбоях
private:
void SendFrame(char r); //В этом методе мы послаем ответный фрейм с XOR-кодом
};
#include "MyServerSession.hpp"
#include <zCommon/zCursor.hpp>
#include <iostream>
using namespace std;
using namespace zCommon;
using namespace zIO;
void MyServerSession::stream_BeginStream()
{
t0.SetNow();
cout << "session: begin stream" << endl;
}
void MyServerSession::stream_EndStream()
{
cout << "session: end stream" << endl;
}
void MyServerSession::stream_Recv()
{
while(true) //Зачем нужен цикл? Теоретически, мы можем получить два фрейма за одно пробуждение
{
/*
* inq - это циклический буффер, содержащий принятые байты (да, ещё одна обертка)
* Если данных в буфере меньше трех байт, то выходим и ждем получения
* остальных байт
*/
if(inq.GetDataSize()<3)
break;
uint8_t sig=0;
uint8_t func=0;
uint8_t frame_size=0;
char data[256];
/*
* Данные можно доставать прямиком из циклического буфера inq,
* но мы будем использовать курсор буфера.
* Курсор полезен, когда для того чтобы понять, пришел ли весь фрейм
* надо достать часть данных из буфера. Если фрейм полностью пока ещё
* не пришел, то операция отменяется и из циклического буфера как бы и ничего
* и не доставали.
* В данном случае нас интересует байт frame_size с размером. Без него мы не
* узнаем каких размеров весь фрейм
*/
zCursor c(&inq);
c.Pop_UI8(sig);
c.Pop_UI8(func);
c.Pop_UI8(frame_size);
/*
* Проверяем на ошибки
*/
if(sig!=0xAF)
{
cout << "Invalid frame " << endl;
this->CloseForced();
return;
}
if(func!=1)
{
cout << "Unknown function " << (int)func << endl;
this->CloseForced();
return;
}
if(frame_size==0)
{
cout << "Invalid frame " << endl;
this->CloseForced();
return;
}
c.Pop_CharArray(data,frame_size); //Достаем последние данные
c.ApplyChanges(); //Достали весь кадр, теперь фиксируем изменения в циклическом буфере
char r = data[0];
for(int i=1;i<frame_size;i++)
r^=data[i]; //Считаем XOR
SendFrame(r); //Отсылаем фрейм назад
t0.SetNow(); //устанавливаем метку времени последней активности
}
}
void MyServerSession::stream_Think()
{
zTime now;
now.SetNow();
if(now.isElapsed(t0,10000))
{
this->CloseForced(); //Десять секунд прошло, никакой активности не было, закрываем
return;
}
}
void MyServerSession::stream_IOError(int error_source,int error_num)
{
cout << "session: io error " << error_num << endl;
}
void MyServerSession::SendFrame(char r)
{
uint8_t sig=0xAF;
uint8_t func=2;
uint8_t frame_size=1;
Write_Begin();
outq.Push_UI8(sig);
outq.Push_UI8(func);
outq.Push_UI8(frame_size);
outq.Push_CharArray(&r,1);
Write_End();
}
Кто договаривался? Я ни слова про Си не сказал. Пиши хоть на брейнфаке, главное чтобы было компактнее, понятнее, красивее и при этом была напрямую работа с системным вызовом select()/poll()/epoll() без всяких архитектурных абстракций, прослоек, оберток.