GLib проблема с потоками GMainLoop
Форум — Development
Всем привет! Опять я в ступоре, мысли кончились, на сей раз проблема с GMainLoop и блокированием потока. Есть не-default GMainContext, к которому приаттачены отдельными потоками секундный таймер и слушающий TCP-сервер. Проблема в том, что когда подключается клиент, таймер перестает работать. Отключается клиент - работа таймера восстанавливается. Как побороть данную проблему? Ниже привожу полный код.
main.h:
/*
* main.h
*
* Created on: Jan 15, 2014
* Author: xuch
*/
#ifndef MAIN_H_
#define MAIN_H_
using namespace std;
#include <stdio.h>
#include <glib.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <netdb.h>
#include <signal.h>
#include <syslog.h>
#include <fcntl.h>
#include <string.h>
#include <stdarg.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
GMainLoop *main_loop;
static int daemon_proc;
typedef void Sigfunc(int); /* for signal handlers */
#define SA struct sockaddr
#define LISTENQ 1024
class Dat
{
public:
Dat(void);
int Status;
};
class ControlServer
{
public:
Dat *CData;
GMainContext *Ctx;
ControlServer(Dat *, GMainContext *, const char *, const char *);
int GThStart();
private:
char host[254];
char port[254];
static bool Handler(GIOChannel *, GIOCondition, gpointer);
static bool GStartStatic(gpointer);
static bool GStartStatic2(gpointer);
int TcpListen(const char *, const char *, socklen_t *);
void ErrQuit(const char *, ...);
static void ErrSys(const char *, ...);
static void ErrDoIt(int, int, const char *, va_list);
void Listen(int, int);
int Close(int);
int SetSockOpt(int, int, int, const void *, int);
};
class Parent
{
public:
GMainLoop *MainLoop;
Parent(const char *, const char *);
~Parent(void);
void Start(size_t);
void Init(void);
void StartTimer(void);
private:
Dat *Data;
GMainContext *Context;
ControlServer *Srv;
static gboolean TimeoutCallback(gpointer);
static gboolean TimeoutCallback2(gpointer);
static gpointer SecTimer(gpointer);
};
#endif /* MAIN_H_ */
main.cpp:
/*
* main.cpp
*
* Created on: Jan 15, 2014
* Author: xuch
*/
#include "main.h"
using namespace std;
int main(void)
{
Parent *Par;
printf("Starting main...\n");
Par = new Parent("127.0.0.1", "54321");
Par->StartTimer();
Par->RunMainLoop();
delete Par;
}
Parent::Parent(const char *ip, const char *port)
{
Data = new Dat();
Context = g_main_context_new();
Srv = new ControlServer(Data, Context, ip, port);
MainLoop = g_main_loop_new(Context, false);
}
Parent::~Parent()
{
delete Data;
delete Srv;
}
void Parent::Start(size_t interval)
{
g_thread_init(NULL);
Srv->GThStart(); // Start listen TCP Server
main_loop = g_main_loop_new(NULL, FALSE);
g_timeout_add_seconds(interval, TimeoutCallback, Data);
g_main_loop_run(main_loop);
}
void Parent::RunMainLoop()
{
g_main_loop_run(MainLoop);
}
gpointer Parent::SecTimer(gpointer data)
{
Parent *Prnt = (Parent *) data;
GSource *Source = g_timeout_source_new(1000); // 1 second interval
g_source_set_callback(Source, TimeoutCallback, Prnt->Data, NULL);
g_source_attach(Source, Prnt->Context);
g_source_unref(Source);
return NULL;
}
gboolean Parent::TimeoutCallback(gpointer data)
{
Dat *Data = (Dat *) data;
Data->Status++;
printf("%ld-> Timeout Callback! Status now: '%d'\n", time(NULL), Data->Status);
return TRUE;
}
void Parent::StartTimer()
{
GError *error = NULL;
g_thread_init(NULL);
Srv->GThStart(); // Start listen TCP Server
GThread *t = g_thread_create(SecTimer, this, TRUE, &error);
g_thread_join(t);
}
ControlServer::ControlServer(Dat *data, GMainContext * gtx, const char *ip, const char *prt)
{
CData = data;
Ctx = gtx;
strcpy(host, ip);
strcpy(port, prt);
}
bool ControlServer::Handler(GIOChannel *in, GIOCondition condition, gpointer data)
{
Dat *Dt = (Dat *) data;
struct sockaddr_storage income;
int insock, newsock;
socklen_t income_len;
struct sockaddr peer;
socklen_t size;
int count = 0;
insock = g_io_channel_unix_get_fd(in);
income_len = sizeof(income);
newsock = accept(insock, (struct sockaddr *) &income, &income_len);
size = sizeof(peer);
getpeername(newsock, &peer, &size);
struct sockaddr_in *ipv4 = (struct sockaddr_in *) &peer;
printf("%ld-> New Client connected -> ", time(NULL));
printf("Host IP:Port '%s:%d'\n", inet_ntoa(ipv4->sin_addr), ipv4->sin_port);
do
{
const int len = 1024 * 1024;
char buf[len + 1];
bzero(buf, len + 1);
count = recv(newsock, buf, len, 0);
buf[len] = '\0';
Dt->Status++;
printf("%ld-> Received string: '%s'; Dat.Status = %d\n", time(NULL), buf, Dt->Status);
} while (count > 0);
printf("%ld-> Connection closed for Host IP:Port '%s:%d'; Dat.Status = %d\n", time(NULL), inet_ntoa(ipv4->sin_addr), ipv4->sin_port, Dt->Status);
return true;
}
bool ControlServer::GStartStatic(gpointer data)
{
ControlServer *CServer = (ControlServer *) data;
int listenfd;
socklen_t addr_len;
GIOChannel *Channel;
GSource *Source;
listenfd = CServer->TcpListen(CServer->host, CServer->port, &addr_len);
Channel = g_io_channel_unix_new(listenfd);
Source = g_io_create_watch(Channel, G_IO_IN);
g_source_set_callback(Source, (GSourceFunc) Handler, (gpointer) CServer->CData, NULL);
g_source_attach(Source, CServer->Ctx);
g_source_unref(Source);
return false;
}
int ControlServer::GThStart()
{
GError *error;
GThread *t = g_thread_create((GThreadFunc) GStartStatic, this, TRUE, &error);
g_thread_join(t);
return 0;
}
void ControlServer::ErrDoIt(int errno_flag, int level, const char *fmt, va_list ap)
{
int errno_save, n;
char buf[254 + 1];
errno_save = errno;
#ifdef HAVE_VSNPRINTF
vsnprintf(buf, CHAR_BUF_SIZE, fmt, ap);
#else
vsprintf(buf, fmt, ap);
#endif
n = strlen(buf);
if (errno_flag)
snprintf(buf + n, 254 - n, ": %s", strerror(errno_save));
strcat(buf, "\n");
if (daemon_proc)
{
syslog(level, buf);
}
else
{
fflush(stdout);
fputs(buf, stderr);
fflush(stderr);
}
return;
}
void ControlServer::ErrQuit(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
ErrDoIt(0, LOG_ERR, fmt, ap);
va_end(ap);
exit(1);
}
void ControlServer::ErrSys(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
ErrDoIt(1, LOG_ERR, fmt, ap);
va_end(ap);
exit(1);
}
int ControlServer::Close(int fd)
{
return close(fd);
}
int ControlServer::SetSockOpt(int sock_fd, int level, int opt_name, const void *opt_val, int opt_len)
{
return setsockopt(sock_fd, level, opt_name, opt_val, opt_len);
}
void ControlServer::Listen(int fd, int backlog)
{
char *ptr;
if ((ptr = getenv("LISTENQ")) != NULL)
backlog = atoi(ptr);
if (listen(fd, backlog) < 0)
ErrSys("listen error");
}
int ControlServer::TcpListen(const char *host, const char *serv, socklen_t *addrlenp)
{
int listen_fd, n;
const int on = 1;
struct addrinfo hints, *res, *res_save;
bzero(&hints, sizeof(struct addrinfo));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
if ((n = getaddrinfo(host, serv, &hints, &res)) != 0)
{
ErrQuit("TcpListen error for %s. %s: %s\n", host, serv, gai_strerror(n));
}
res_save = res;
do
{
listen_fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (listen_fd < 0)
continue;
SetSockOpt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (bind(listen_fd, res->ai_addr, res->ai_addrlen) == 0)
break;
Close(listen_fd);
} while ((res = res->ai_next) != NULL);
if (res == NULL)
{
ErrSys("TcpListen error for %s, %s", host, serv);
}
Listen(listen_fd, LISTENQ);
if (addrlenp)
*addrlenp = res->ai_addrlen;
return listen_fd;
}
Dat::Dat()
{
Status = 0;
}