LINUX.ORG.RU

Типовое решение при работе с сокетами

 ,


1

2

Есть в коде функция:

result = recvfrom(iSocket, array, BUF_SIZE, 0, (struct sockaddr *)&sipx, (socklen_t*)&len_addr) ;		
В результате ее работы в массиве array оказывается пакет размером BUF_SIZE. Есть ли какое-то стандартное решение при работе с этой функцией, чтобы постоянно принимать пакеты, да еще не все подряд а, отфильтрованные по значению в одном из элементов массива. Я пытался вот такой подход использовать:
do
   result = recvfrom(iSocket, array, BUF_SIZE, 0, (struct sockaddr *)&sipx, (socklen_t*)&len_addr) ;
while ( array[8] == 42 ) ;// принимать пакет пока в array не окажется то, что нужно
Устраивало, до тех пор, пока не потребовалось еще одному процессу читать через этот сокет.



Последнее исправление: podovalov5 (всего исправлений: 1)

В результате ее работы в массиве array оказывается пакет размером BUF_SIZE

Не BUF_SIZE, а result.

while ( array[8] == 42 )

Может сломаться в силу того, что сказано выше

yoghurt ★★★★★
()
Последнее исправление: yoghurt (всего исправлений: 1)

Устраивало, до тех пор, пока не потребовалось еще одному процессу читать через этот сокет.

Твой юз-кейс не совсем понятен, поэтому сложно сказать, как синхронизировать твои потоки, которые читают из сокета

yoghurt ★★★★★
()

Не вариант. Если ты хочешь из одного сокета читать толпой процессов, то делай демон, который будет куда-нибудь сохранять данные и мониторить подключения, т.е. каждому подключенному процессу будет его порцию данных отдавать, запоминая указатели начала/конца.

Eddy_Em ☆☆☆☆☆
()

А вообще, очень странно: сколько ты ни подключишь процессов к сокету, никаких проблем не должно быть, т.к. на каждое подключение сервер родит новый поток или процесс. В итоге получится "автоматически" каждому клиенту давать нужную ему информацию. Так все и делают.

Если же у тебя сервер не делает pthread_create или fork на каждое подключение, то сервер твой — говно. И его нужно переписывать правильно! Вот с этого и начинай.

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

У меня не очень-то сервер. Ко мне не подключается никто, я не устанавливаю соединений ни с кем и сокет у меня датаграммный. Я должен вынимать из сокета нужный пакет, сохранять и что-то с ним делать. А еще нужно сообщать процессам, что пакеты приходить перестали. Eddy_Em, вроде бы первый твой комментарий, то что мне нужно.

podovalov5
() автор топика
Ответ на: комментарий от podovalov5

Ну, тогда определяйся с количеством клиентов и максимальным размером блока данных. В зависимости от этого и реализуй демона. Если клиентам действительно нужно лишь знать, что перестали пакеты приходить, можно тупо в shm мьютекс завести (а то и вообще обыкновенный флаг какой-нибудь). Если им нужно толпу данных передавать, то лучше, наверное, чтобы твой демон сам открывал сокет и на каждое подключение создавал поток, в котором начинал работать с буферами (соответственно, нужно будет завести пару мьютексов или семафоров на модификацию буферов: что область кольцевого буфера уже всеми прочитана и ее можно удалять, а также что клиент все прочитал и ждет новых данных).

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

Отлично, осталось навикипедить что такое мьютексы и как с семафорами работать). И не понятно, как процесс-демон должен данные отдавать. Клиент только один.

podovalov5
() автор топика
Ответ на: комментарий от podovalov5

Клиент только один.

А чего ты тогда пишешь

Устраивало, до тех пор, пока не потребовалось еще одному процессу читать через этот сокет.

???

осталось навикипедить что такое мьютексы и как с семафорами работать)

Читай книжку Стивенса про юниксовые IPC.

[offtop]
Я как раз намедни закончил в сортире ее чтение (в 100500-й раз) и перешел на "обработка 2D/3D изображений". Жаль, что Гонсалез&Вудз такие дорогие, а книжки по opencv вообще жесть какие дорогие! Хоть opencv и говно тормозное, но говорят, что в документации очень много полезной информации по использующимся алгоритмам, что собственно мне и нужно. А там, глядишь, может и использую саму opencv для простых вещей (если там не нужно будет сутками информацию обрабатывать)
[/offtop]

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

А чего ты тогда пишешь

Я вообще полагал, что обойдусь одним процессом. Который:

  • Извлекает пакет из сети
  • Фильтрует нужные пакеты
  • Извлекает данные
  • Добавляет свои данные
  • Сохраняет всё полученное в файл
  • И опять в начало

И в принципе всё работало. Но до тех пор, пока в сети что-то есть. Если в сети пакетов не оказывалось, процесс ждал. А нужно так, что если пакеты перестали приходить, файл все равно писать. Вот я и пришел к fork().

podovalov5
() автор топика
Ответ на: комментарий от podovalov5
$ man 7 socket
............
SO_RCVTIMEO and SO_SNDTIMEO
              Specify the receiving or sending  timeouts  until  reporting  an
              error.  The argument is a struct timeval.  If an input or output
              function blocks for this period of time, and data has been  sent
              or  received,  the  return  value  of  that function will be the
              amount of data transferred; if no data has been transferred  and
              the  timeout has been reached then -1 is returned with errno set
              to EAGAIN or EWOULDBLOCK just as if the socket was specified  to
              be  nonblocking.   If  the  timeout is set to zero (the default)
              then the operation  will  never  timeout.   Timeouts  only  have
              effect  for system calls that perform socket I/O (e.g., read(2),
              recvmsg(2), send(2), sendmsg(2)); timeouts have  no  effect  for
              select(2), poll(2), epoll_wait(2), and so on.
imb ★★
()

Коллектор нужно писать. Про размеры уже сказали. Читаешь, пишешь в коллектор, запускаешь анализ, получаешь нужный пакет, оставляя неиспользованный хвост данных и состояние коллектора, либо просто ждешь следующей порции.

anonymous
()

Есть разные подходы для написания приложений в которых присутствует некоторая эээ.... интерактивность. Я говорю о приложениях, которые должны уметь «одновременно» взаимодействовать с разными вещами, с сетью, с пользователем, реагировать на истечение таймаутов и т.д.

Многопоточное приложение, написанное «в лоб» ИМХО самый неудачный вариант.

Я написал немало приложений с сетевой «интерактивностью» и мне больше всего нравится однопотоковые приложения, с архитектурой, основанной на вызове epoll(). Недостаток такого подхода заключается в том, что напрямую с epoll() работать неудобно. Нужна некая библиотека-надстройка над epoll(), которая предоставит более удобный программный интерфейс. Для себя я написал такую библиотеку сам, но можно взять готовую.

Посмотри в сторону Boost::Asio или libevent. Правда я ими особо не пользовался (уже привык к своему решению). Другие могут рассказать больше об этих или подобных библиотеках.

pathfinder ★★★★
()
Ответ на: комментарий от Eddy_Em

epoll слишком сложен и в большинстве случаев не нужен. select'а за глаза.

Что c epoll() напрямую работать, что с select(), одна чёрта. Все равно получится громоздкий код.

Это глупо напрямую работать с системным вызовом-мультиплексором, не важно select()/poll()/epoll() или каким-то другим.

Естественно, должен быть слой абстракции, который скроет все детали реализации. Этот слой абстракции пишут один раз и сложность его написания сильно не меняется от того какую функцию мы положим в основу. Но при этом epoll будет иметь меньше ограничений и работать быстрее.

pathfinder ★★★★
()
Последнее исправление: pathfinder (всего исправлений: 1)
Ответ на: комментарий от pathfinder

Нет, с select меньше кода. И понятней.

Это глупо напрямую работать с системным вызовом-мультиплексором

Нет. Не глупо, а нормально. Глупо запихивать это в обертку. А ту — еще в одну обертку. Потом еще и еще...

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

Это глупо...

Нет. Не глупо, а нормально. Глупо...

- Ты дурак. - Нет, ты дурак. - Нет, ты. - Нет, ты...

Кхм, надо бы что-то придумать, а то дискуссия зашла в тупик.

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

Это не мешает использовать его в портабельных библиотеках, типа boost::Asio.

Разумеется, но исключительно под ifdef, как альтернативу.

slovazap ★★★★★
()
Ответ на: комментарий от Eddy_Em

Кхм, надо бы что-то придумать, а то дискуссия зашла в тупик.

Давай так, будем исходить из принципа - «лучше один раз увидеть, чем сто раз услышать»

Каждый выдаст свое решение одной и той же задачи. Я покажу как это некрасиво выглядит с использованием с использованием библиотеки-обертки, а ты покажешь свой божественно прекрасный код с прямым использованием вызова select().

Задача: Необходимо написать TCP сервер, который принимает соединения. Клиенты посылают серверу запрос, на который сервер высылает ответ. Если структура запроса нарушена, сервер закрывает соединение. Если от клиента не было никакой активности более 10 секунд, сервер закрывает соединение с ним.

Структура запроса:

uint8_t sig; //сигнатура заголовка, всегда 0xAF
uint8_t func;  //номер функции, всегда 1
uint8_t frame_size; //размер данных в байтах
char data[frame_size]; //frame_size байт с данными

Структура ответа:

uint8_t sig; //сигнатура заголовка, всегда 0xAF
uint8_t func;  //номер функции, всегда 2
uint8_t frame_size; //размер данных в байтах, всегда 1
char data[frame_size]; //1 байт, содержащий все себе xor всех байт запроса

Я приведу то, что мне понадобилось бы написать, для реализации такого сервера имея сделанные ранее наработки по оберткам. Возможно, свое решение приведу только завтра.

pathfinder ★★★★
()
Ответ на: комментарий от Eddy_Em

А пока жду твоего согласия на участие в этом небольшом соревновании.

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

Жди до вечера воскресенья. Я у родителей. А т.к. мой ноутбук накрылся медным тазом, сижу в то ли спермерочке, то ли в вошмерочке.

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

Если же у тебя сервер не делает pthread_create или fork на каждое подключение, то сервер твой — говно

Вообще-то в случае высокой нагрузки уже давно считается, что форк/трэд на каждого клиента - говно, а асинхронность рулит

yoghurt ★★★★★
()
Ответ на: комментарий от yoghurt

И как ты без потока/процесса собираешься обслуживать клиентов, которые на один и тот же порт щимятся?

Никак!

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

И как ты без потока/процесса собираешься обслуживать клиентов, которые на один и тот же порт щимятся?

Никак!

А я не вижу проблем. Через тот же select() вроде все делается, не?

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

ОК, ты попробуй и расскажи, как ты одним select'ом будешь пару сотен одновременно подключившихся клиентов обслуживать!

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

Понял тебя. Если речь идет о сотнях соединений, тогда согласен, у select() есть ограничения. А вот аналог epoll() должен потянуть больше 10 тыс. соединений. Естественно, мы рассматриваем случай, когда большая часть соединений спит и в один момент только несколько проявляют активность. А то, если очень захотеть, то и одно соединение может прогрузить любой сервер.

Вот нашел картинку:http://monkey.org/~provos/libevent/libevent-benchmark.jpg

pathfinder ★★★★
()
Последнее исправление: pathfinder (всего исправлений: 1)
Ответ на: комментарий от pathfinder

Как ты себе представляешь работу с одним сокетом и толпой клиентов без потоков/процессов?

Единственный вариант: заставлять клиентов ждать, пока обслуживаешь очередного. Если у тебя время обслуги — минут 10, а клиентов тысяча, то это будет жесть!

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

Как ты себе представляешь работу с одним сокетом и толпой клиентов без потоков/процессов?

Легко и непринуждённо.

Единственный вариант: заставлять клиентов ждать

Тебе не надо юлить - тебе надо запилить его гуано - там ничего не надо ждать.

Если у тебя время обслуги — минут 10

Пример обслуги на 10минут.

а клиентов тысяча, то это будет жесть!

У тебя? Тысяча? Куллстори.

anonymous
()
Ответ на: комментарий от pathfinder

Я покажу как это некрасиво выглядит с использованием с использованием библиотеки-обертки

Красиво и ваши обёртки вещи не совместимые. Во Что ты будешь оборачивать тот же епул?

Я не знаю нахрен ты вплёл сюда «структуры» запроса/ответа и прочее - это лишнее. Тут хватит эхо.

Я приведу то, что мне понадобилось бы написать, для реализации такого сервера имея сделанные ранее наработки по оберткам.

Приведи, так уж и быть - я за едика выкачу.

anonymous
()
Ответ на: комментарий от Eddy_Em

Если у тебя время обслуги — минут 10, а клиентов тысяча, то это будет жесть!

В таких случаях обычно «обслуга» отправляется в отдельный пул потоков на обработку, а клиенты потом вовремя получают статус выполнения (опять же, можно асинхронно)

yoghurt ★★★★★
()
Ответ на: комментарий от yoghurt

Нет, я не могу понять, как можно более чем с одним клиентом работать на одном потоке!

Вот, допустим, я открываю сокет. Его одновременно открывают три клиента (вообще, это один клиент, но открываем с разными ключами: один канал изображения гонит, другой принимает и обрабатывает управляющие команды, третий передает клиенту сообщения об ошибках).

Ну и как я в этот сокет что-нибудь напишу, чтобы оно именно нужному клиенту пришло? Хранить пул дескрипторов? А теперь представь, что у тебя одновременно толпа народу может к этому сокету обратиться. В итоге у тебя получится бешеный пул дескрипторов и ты в реальном времени вынужден будешь выдирать из пула те, в которые видео писать, писать туда видео; выдирать нужные командные и обрабатывать полученное оттуда...

В общем, геморройное же дело! Проще по потоку завести, и пусть они себе висят и обрабатывают все.

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

А в многонитевом приложении ты как клиенты различаешь? Пойми ты, многонитевое приложение и конечный автомат для обработки нескольких соединений это одно и то же, просто когда есть нити, тебе этого автомата не видно, его ядро и/или библиотека поддержки нитей скрывают.

anonymous
()
Ответ на: комментарий от Eddy_Em

Ну так а на кой черт себе выдумывать лишний геморрой, если можно переложить это на ядро?

Многопоточные приложения отлаживать на порядок сложнее.

pathfinder ★★★★
()
Ответ на: комментарий от Eddy_Em

Ох, выкладываю файлы

main.cpp

#include <stdio.h>
#include <vector>
#include <signal.h>

#include <zCommon/zException.hpp>
#include <zCommon/zEnviroment.hpp>
#include <zCommon/zUtils.hpp>

#include <zIO/zMainLoop.hpp>
#include <zIO/zIOStreamNormal.hpp>
#include <zIO/_zSocketServer.hpp>
#include <zIO/zUnixSocketHelper.hpp>

#include "MyServer.hpp"

using namespace zCommon;
using namespace zIO;
using namespace std;


int main(int argc,char** argv)
{
	zEnviroment::Init(); //Инициализация некоторых сущностей, ничего интересного
	
	zMainLoop loop; //Основной цикл обработки сообщений
	
	MyServer* server = MyServer::Create(1234); //Создаем обработчик сервера
	loop.Handler_Add(server); //Добавляем обработчик в цикл обработки сообщений
	
	loop.Run(); //Запускаем цикл обработки сообщений, в недрах этого метода будет вызываться epoll()

	return 0;
}

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

файл MyServer.hpp

#pragma once

#include <zIO/_zTCPSocketServer.hpp>

/*
 * Класс описывет сервер
 */
class MyServer : public zIO::_zTCPSocketServer
{
public:
	void server_Open(); //Метод вызывается при создании сервера
	void server_Close(); //Метод вызывается при закрытии сервера

	/*
	 * Этот метод вызывается каждый раз, когда кто-то устанавливает соединение.
	 * Метод должен вернуть обработчик отдельного соединения
	 */
	zIO::_zTCPSocketServer::local_Session* server_CreateSession(struct sockaddr_in& addr);
	void server_Think();

	static MyServer* Create(int port);
};
pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

файл MyServer.cpp

#include "MyServer.hpp"
#include "MyServerSession.hpp"

#include <iostream>

using namespace zIO;
using namespace zCommon;
using namespace std;

void MyServer::server_Open()
{
	cout << "MyServer open" << endl;
}
void MyServer::server_Close()
{
	cout << "MyServer close" << endl;
}
zIO::_zTCPSocketServer::local_Session* MyServer::server_CreateSession(struct sockaddr_in& addr)
{
	return new MyServerSession();
}
void MyServer::server_Think()
{

}
MyServer* MyServer::Create(int port)
{
	MyServer* server = new MyServer;
	server->SetAddr(port);
	return server;
}
pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

файл MyServerSession.hpp

#pragma once

#include <zIO/_zTCPSocketServer.hpp>
#include <zCommon/zTime.hpp>

/*
 * Класс отдельного соединения
 */
class MyServerSession : public zIO::_zTCPSocketServerSess
{
private:
	zCommon::zTime t0; //Метка времени последней активности
public:
	virtual void stream_BeginStream(); //Вызывается, когда соединение установлено и готово для обмена
	virtual void stream_EndStream(); //Вызывается, когда соединение разорвалось
	virtual void stream_Recv(); //Вызывается каждый раз, когда приходят какие-нибудь данные
	virtual void stream_Think(); //Вызывается каждые 100 милисекунд, нужно для таймаутов
	virtual void stream_IOError(int error_source,int error_num); //Вызывается при сбоях

private:
	void SendFrame(char r); //В этом методе мы послаем ответный фрейм с XOR-кодом
};
pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

Файл MyServerSession.cpp

#include "MyServerSession.hpp"
#include <zCommon/zCursor.hpp>
#include <iostream>

using namespace std;
using namespace zCommon;
using namespace zIO;


void MyServerSession::stream_BeginStream()
{
	t0.SetNow();
	cout << "session: begin stream" << endl;
}
void MyServerSession::stream_EndStream()
{
	cout << "session: end stream" << endl;
}
void MyServerSession::stream_Recv()
{
	while(true) //Зачем нужен цикл? Теоретически, мы можем получить два фрейма за одно пробуждение
	{
		/*
		 * inq - это циклический буффер, содержащий принятые байты (да, ещё одна обертка)
		 * Если данных в буфере меньше трех байт, то выходим и ждем получения
		 * остальных байт
		 */
		if(inq.GetDataSize()<3)
			break;
		
		uint8_t sig=0;
		uint8_t func=0;
		uint8_t frame_size=0;
		char data[256];

		/*
		 * Данные можно доставать прямиком из циклического буфера inq,
		 * но мы будем использовать курсор буфера.
		 * Курсор полезен, когда для того чтобы понять, пришел ли весь фрейм
		 * надо достать часть данных из буфера. Если фрейм полностью пока ещё
		 * не пришел, то операция отменяется и из циклического буфера как бы и ничего
		 * и не доставали.
		 * В данном случае нас интересует байт frame_size с размером. Без него мы не
		 * узнаем каких размеров весь фрейм
		 */
		zCursor c(&inq);
		c.Pop_UI8(sig);
		c.Pop_UI8(func);
		c.Pop_UI8(frame_size);

		/*
		 * Проверяем на ошибки
		 */
		if(sig!=0xAF)
		{
			cout << "Invalid frame " << endl;
			this->CloseForced();
			return;
		}
		if(func!=1)
		{
			cout << "Unknown function " << (int)func << endl;
			this->CloseForced();
			return;
		}
		if(frame_size==0)
		{
			cout << "Invalid frame " << endl;
			this->CloseForced();
			return;
		}

		c.Pop_CharArray(data,frame_size); //Достаем последние данные
		c.ApplyChanges(); //Достали весь кадр, теперь фиксируем изменения в циклическом буфере

		char r = data[0];
		for(int i=1;i<frame_size;i++)
			r^=data[i]; //Считаем XOR

		SendFrame(r); //Отсылаем фрейм назад
		t0.SetNow(); //устанавливаем метку времени последней активности
	}
}
void MyServerSession::stream_Think()
{
	zTime now;
	now.SetNow();
	if(now.isElapsed(t0,10000))
	{
		this->CloseForced(); //Десять секунд прошло, никакой активности не было, закрываем
		return;
	}
}
void MyServerSession::stream_IOError(int error_source,int error_num)
{
	cout << "session: io error " << error_num << endl;
}

void MyServerSession::SendFrame(char r)
{
	uint8_t sig=0xAF;
	uint8_t func=2;
	uint8_t frame_size=1;
	Write_Begin();
	outq.Push_UI8(sig);
	outq.Push_UI8(func);
	outq.Push_UI8(frame_size);
	outq.Push_CharArray(&r,1);
	Write_End();
}

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

А сфигали у тебя С++? На сях договаривались!

Ну, я тоже так могу:

int main()
  int fd = open_socket(1234);
  socket_add_handler(fd, server);
  socket_process(fd);
  return 0;
}

Eddy_Em ☆☆☆☆☆
()
Ответ на: комментарий от Eddy_Em

На сях договаривались!

Кто договаривался? Я ни слова про Си не сказал. Пиши хоть на брейнфаке, главное чтобы было компактнее, понятнее, красивее и при этом была напрямую работа с системным вызовом select()/poll()/epoll() без всяких архитектурных абстракций, прослоек, оберток.

pathfinder ★★★★
()
Ответ на: комментарий от Eddy_Em

В общем, тебе уже поздно сочинять отмазки, ты согласился. Я привел свой код. Теперь ты выложешь то, что ты там собирался изначально выкладывать.

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

Хорошо. Напомни в воскресенье вечером. Что-нибудь на коленке сварганю, чтобы обслуживать N клиентов в N потоков. Скажем, тупое эхо.

Eddy_Em ☆☆☆☆☆
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.