LINUX.ORG.RU

libev && non-blocking


0

1

Сабж, 3х дневный поиск в инете ничего толкового не дал, помогите кто чем может )

Взял в основу код из этой статьи: http://codefundas.blogspot.com/2010/09/create-tcp-echo-server-using-libev.html

Как ev заставить работать с неблокирующими сокетами? по типу этого примера: парочка вопросов по сети и epoll Есть у libev аналог функции epoll_wait?

Суть задачи такова: процесс принимает соединения и раскидывает их по N(2-16) нитей, которые уже обрабатывают запросы.

libev решил юзать по причине прочтения след страниц, если подскажете что то лучше, с удовольствием послушаю https://groups.google.com/group/fido7.ru.unix.prog/browse_thread/thread/e8f8e... http://habrahabr.ru/blogs/hi/108294/ и ещё чемто с лора

Извиняюсь за читабельность поста, писал сонным и обкуренным манами..


Суть задачи такова: процесс принимает соединения и раскидывает их по N(2-16) нитей, которые уже обрабатывают запросы

Зачем тебе вообще libev? Ведь accept -> передаем сокет в нить.

baverman ★★★
()
Ответ на: комментарий от gandjubas

спасибо, но уже натыкался на подобный

De1in
() автор топика
Ответ на: комментарий от baverman

Видимо я тебя не пойму..

Суть задачи такова: процесс принимает соединения и раскидывает их по N(2-16) нитей, которые уже обрабатывают запросы.

На сколько понимаю, libev юзает epoll\kqueue\... в зависимости от системы, вот на примере того же епула, вот тут парочка вопросов по сети и epoll , разве libev не сможет сделать подобное?

De1in
() автор топика

Вот нечто по типу этого нужно, только через боюсь селекты не будут столь оптимальны при 1к+ активных коннектах, или ошибаюсь?

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

/* port we're listening on */
#define PORT 2020


static struct SBind {
    int fdmax;								/* maximum file descriptor number */
    int listener;							/* listening socket descriptor */
    int newfd;								/* newly accept()ed socket descriptor */

    fd_set master;							/* str_bind.master file descriptor list */
    fd_set read_fds;							/* temp file descriptor list for select() */

    int yes;								/* for setsockopt() SO_REUSEADDR, below */
    uint addrlen;
} str_bind;

static void *accepter(void *_arg)
{
    struct sockaddr_in clientaddr;					/* client address */

    char buf[1024];							/* buffer for client data */
    int nbytes, i;
    long n = (long)_arg;

    /* main loop */
    while (1) {
	str_bind.read_fds = str_bind.master;				/* copy it */

	if (select(str_bind.fdmax+1, &str_bind.read_fds, NULL, NULL, NULL) == -1) {
	    perror("Server-select() error lol!");
	    exit(1);
	}
	printf("%ld: Server-select() is OK...\n", n);

	/*run through the existing connections looking for data to be read*/
	for (i = 0; i <= str_bind.fdmax; i++) {
	    if (FD_ISSET(i, &str_bind.read_fds)) {			/* we got one... */
		if (i == str_bind.listener) {
		    /* handle new connections */
		    str_bind.addrlen = sizeof(clientaddr);

		    if ((str_bind.newfd = accept(str_bind.listener, (struct sockaddr *)&clientaddr, &str_bind.addrlen)) == -1)
			perror(" Server-accept() error lol!");
		    else {
			printf("%ld: Server-accept() is OK...\n", n);

			FD_SET(str_bind.newfd, &str_bind.master); 	/* add to str_bind.master set */
			if (str_bind.newfd > str_bind.fdmax)		/* keep track of the maximum */
			    str_bind.fdmax = str_bind.newfd;

			printf("%ld:  New connection from %s on socket %d\n", n, inet_ntoa(clientaddr.sin_addr), str_bind.newfd);
		    }
		} else {
		    /* handle data from a client */
		    if ((nbytes = recv(i, buf, sizeof(buf), 0)) <= 0) {
			/* got error or connection closed by client */
			if (nbytes == 0) 				/* connection closed */
			    printf(" socket %d hung up\n", i);
			else
			    perror("recv() error lol!");

			close(i);					/* close it... */
			FD_CLR(i, &str_bind.master);			/* remove from str_bind.master set */
		    } else {
			sprintf(buf, "Hi");
			send(i, buf, strlen(buf), 0);

			close(i);					/* close it... */
			FD_CLR(i, &str_bind.master);			/* remove from str_bind.master set */
		    }
		}
	    }
	}
    }
}

static int binding()
{
    struct sockaddr_in serveraddr;					/* server address */

    /* clear the str_bind.master and temp sets */
    FD_ZERO(&str_bind.master);
    FD_ZERO(&str_bind.read_fds);

    /* get the str_bind.listener */
    if ((str_bind.listener = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
	perror("Server-socket() error lol!");
	return 1;							/*just exit lol!*/
    }

    printf("Server-socket() is OK...\n");
    /*"address already in use" error message */
    if (setsockopt(str_bind.listener, SOL_SOCKET, SO_REUSEADDR, &str_bind.yes, sizeof(int)) == -1) {
	perror("Server-setsockopt() error lol!");
	return 1;
    }
    printf("Server-setsockopt() is OK...\n");

    /* bind */
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = INADDR_ANY;
    serveraddr.sin_port = htons(PORT);
    memset(&(serveraddr.sin_zero), '\0', 8);

    if (bind(str_bind.listener, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) == -1) {
	perror("Server-bind() error lol!");
	return 1;
    }
    printf("Server-bind() is OK...\n");

    /* listen */
    if (listen(str_bind.listener, 40) == -1) {
	perror("Server-listen() error lol!");
	return 1;
    }
    printf("Server-listen() is OK...\n");

    FD_SET(str_bind.listener, &str_bind.master);			/* add the str_bind.listener to the str_bind.master set */

    /* keep track of the biggest file descriptor */
    str_bind.fdmax = str_bind.listener; /* so far, it's this one*/

    return 0;
}

int main(int argc, char *argv[])
{
    pthread_t thread_accepter;

    str_bind.yes = 1;

    if (binding() != 0) {
	printf("Error binding.\n");
	return 0;
    }

    pthread_create( &thread_accepter, NULL, accepter, (void *)0);
    pthread_create( &thread_accepter, NULL, accepter, (void *)1);
    pthread_create( &thread_accepter, NULL, accepter, (void *)2);
    pthread_create( &thread_accepter, NULL, accepter, (void *)3);

    getchar();

    return 0;
}
De1in
() автор топика

> cуть задачи такова: процесс принимает соединения и раскидывает их по N(2-16) нитей, которые уже обрабатывают запросы.

libev нет смысла использовать в такой схеме. Либев можно задействовать только во втором и третьем варианте описанных в приведенной тобой ссылке. А начал писать ты свою прожку по четвертому варианту. Ферштейн?)))

Короче, перечитай еще раз приведенную тобой же ссылку.

anonymous
()
Ответ на: комментарий от De1in

В Вашей программе слишком много багов. Часть из них вызвана одновременным исполнением select() разными потоками на одних и тех же дескрипторах. Этого вообще надо избегать, за исключением отдельных случаев, когда алгоритм специально разрабатывается с расчетом на параллельное чтение из одного дескриптора несколькими потоками. Ваш случай - явно не из тех.

Sorcerer ★★★★★
()
Ответ на: комментарий от anonymous

Если из той ссылки - то мне нужен пятый вариант, в будущем предполагал на шестой переходить. Первый и четвёртый варианты не подходят т.к. слишком дорого по ресурсам (сейчас 4 вариант крутится, на блокирующем селексте и libpth, хватало, но до прошлой недели), второй и третий варианты не подходят по причине «иногда долгой обработки клиентов» + этот вариант перед ДДОСом думаю будет слаб, и будут вешатся все клиента.

2 Sorcerer это черновик/пример, но буду признателен если поделишься ссылкой\маном где описывается этот способ корректно.

De1in
() автор топика
Ответ на: комментарий от baverman

libev я думал юзать для кросплатформенности, но не горит пока, так что чувствую что и епулу рабочему буду рад. Подскажите тогда какой подход лучше выбрать мне к такому^ варианту сервера..

De1in
() автор топика
Ответ на: комментарий от De1in

Подскажите тогда какой подход лучше выбрать мне к такому^ варианту сервера

Мне кажется ты не знаешь чего хочешь. Я бы делал так:

Мастер поток: accept_loop (с опциональным использованием libev) Воркеры: каждый со своим libev лупом, в который постятся события (client socket) мастером.

baverman ★★★
()
Ответ на: комментарий от baverman

Скорее не могу нормально сформулировать и уже запутался, вариант который Вы предложили думаю должен подойти, попробую его написать..

De1in
() автор топика
Ответ на: комментарий от baverman

Друг, помоги с ссылками плиз или хоть намёком в виде списка функций/кода )) ну не могу найти в инете хоть что то подобное..(

De1in
() автор топика
Ответ на: комментарий от hizel

В общем плане пока только (в процессе), но суть я думаю что понял, даже исходя из ссылок и кода которые я писал выше.

De1in
() автор топика
Ответ на: комментарий от De1in

Наконец то получилось хотя бы набросок сделать того что хотел. Вот такой вариант исполнения libev имеет право на жизнь?

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <fcntl.h>
#include <errno.h>
#include <ev.h>

struct ev_loop *loop;
long minfd, maxfd, fds[4096];


static void *sender(void *_arg)
{
	time_t now;
	char *ct, buf[1024];
	long i;

	while (1) {
		now = time(NULL);
		ct = ctime(&now);
		sprintf(buf, "%ld: %s\n", (long)_arg, ct);

		for (i = minfd; i <= maxfd; i++)
			if (fds[i] == 1) {
				write(i, buf, strlen(buf));
				printf("sended to fd %ld\n", i);
			}

		sleep((long)_arg);
	}

	return 0;
}

void read_connection(EV_P_ struct ev_io *w, int revents)
{
	int size, buf_size = 1024;
	char buf[1024];
	printf("%s loop %p ev_io %p revents %i (fd %i)\n", __PRETTY_FUNCTION__, EV_A_ w, revents, w->fd);
	size = read(w->fd, buf, buf_size);

	if (size <= 0) {
		if( size == -1 && errno == EAGAIN )
			printf("\t EAGAIN\n");

		fds[(long)w->fd] = 0;
		ev_io_stop(loop, w);
		close(w->fd);
		free(w);
		printf("\t -> closed connection (fd %i)\n", w->fd);

		return;
	}

	write(w->fd, "Hi\n", strlen("Hi\n"));
}

void accept_connection(EV_P_ struct ev_io *w, int revents)
{
	printf("%s loop %p ev_io %p revents %i (fd %i)\n", __PRETTY_FUNCTION__, EV_A_ w, revents, w->fd);
	struct ev_io *io = malloc(sizeof(struct ev_io));
	struct sockaddr sa;
	socklen_t sizeof_sa = sizeof(sa);
	long fd = accept(w->fd, &sa, &sizeof_sa);
	if (fd <= 0) return;

	fds[fd] = 1;
	printf("fds[%ld] = 1;\n", fd);

	if (fd > maxfd)
		maxfd = fd;

	fcntl(fd, F_SETFL, O_NONBLOCK);
	ev_io_init(io, read_connection, fd, EV_READ);
	ev_io_start(loop, io);
}

int main()
{
	pthread_t thread_sender;
	struct sockaddr_in addr;
	int fd;

	pthread_create( &thread_sender, NULL, sender, (void *)5);
	pthread_create( &thread_sender, NULL, sender, (void *)3);

	// Create server socket
	if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
		perror("socket error");
		return -1;
	}

	bzero(&addr, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(2020);
	addr.sin_addr.s_addr = INADDR_ANY;

	// Bind socket to address
	if (bind(fd, (struct sockaddr*) &addr, sizeof(addr)) != 0) {
		perror("bind error");
		return -1;
	}

	// Start listing on the socket
	if (listen(fd, 128) < 0) {
		perror("listen error");
		return -1;
	}

	minfd = fd + 1;
	maxfd = minfd;

	loop = ev_default_loop(EVBACKEND_EPOLL);

	struct ev_io *io = malloc(sizeof(struct ev_io));
	ev_io_init(io, accept_connection, fd, EV_READ);
	ev_io_start(loop, io);

	// Start infinite loop
	while (1)
		ev_loop(loop, 10);

	return 0;
}
De1in
() автор топика
Ответ на: комментарий от De1in

имхо лучше бы так

основная нить делает другие нити - делает сокет - и начинает слушать его
при этом делает на каждую нить - пайп для обшения - и также этот пайп заносит в то что слушает

нити - получают при старте fd свой стороны пайпа - и также заносят его в свои слушатели - и начинают слушать


тоесть до коннектов - основная нить слушает Listen порт - а нити слушают пайпы от основной нити


когда поступает коннект - его принимает основная нить - ассептит - и через пайп сообщает какоой то из нитей - приказ обрабатывать такойто новый fd - посче чего также продолжает слушать основной

нить - через пайп который она слушает получает приказ слушать новый fd - добавляет его в свой список слушателей - и начинает поолить уже и пайп и клиенский fd


и так далее

ae1234 ★★
()
Ответ на: комментарий от De1in

только это имеет смысл - когда обработка более менее продолжительна
иначе - все сьест контент свитчь между процессами/нитями - он очень дорогой

ae1234 ★★
()
Ответ на: комментарий от ae1234

Думаю такой вариант будет действительно тяжеловат, обработки как правило быстрые, но бывают и исключения, там их думаю постараюсь вешать на отдельные нити..

Подключение нитей в последнем листинге корректное?

De1in
() автор топика
Ответ на: комментарий от De1in

ну - если небрать в расчет синхронизацию доступа к счетчику fd у тебя - синхронизацию между нитями
и неучитывать закрывание fd

то наверно да - чтение то всеравно в основном потоке идет из них

ae1234 ★★
()
Ответ на: комментарий от De1in

а кстати - можно тот алгоритм что я описал - чуть улучшить

например основная нить ассептит - и обрабатывает внутри себя коннекты
но при этом сдедит за какимто критерием нагрузки - если по этому критерию она перестает успевать справляться - то например

она скопом - могет отдать на обработку половину коннектов второй нити

тоесть накопила основная 20 коннектов - обрабатывает их - приходит 21 коннект - и нить решает что многовато
и отдает приказ через пайп второй нити - обрабатывать с 10 по 21 дескрипторы - при этом удаляя у себя за ними слежение

ae1234 ★★
()
Ответ на: комментарий от ae1234

Угу, так в принципе и хотел, но пока что и с 1 нитью не получалось ) сейчас начну впихивать это всё в проэкт

De1in
() автор топика
Ответ на: комментарий от De1in

а чего там проверять то
простейшая проверка

[code]
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>

struct timeval tv;
int fd[2],fd2[2];

static void *worker(void *argv){
   int i,k;
   for(i=0;i<1000000;i++){
      if((long int)argv==0){
         write(fd[1],&k,1);
         read(fd2[0],&k,1);
         }
      if((long int)argv==1){
         read(fd[0],&k,1);
         write(fd2[1],&k,1);
         }
      }

   }

main(){
   int i,j,k,g;
   pthread_t w1,w2;

   pipe(fd);
   pipe(fd2);

pthread_create(&w1,NULL,&worker, (void*)0);
pthread_create(&w2,NULL,&worker, (void*)1);
   pthread_join(w1,NULL);

   }
[/code]

gcc a.c -lpthread
time ./a.out
1.10user 11.44system 0:12.58elapsed 99%CPU (0avgtext+0avgdata 2096maxresident)k
0inputs+0outputs (0major+173minor)pagefaults 0swaps

1000000/13 = 77т в секунду
другие средства - вроде мутексов - такуюж скорость показывют

ae1234 ★★
()
Ответ на: комментарий от ae1234

Сенкс, вначале подумал про более замудрёный код

De1in
() автор топика
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.