LINUX.ORG.RU

Почему в QTcpSocket могут не приходить данные?

 ,


0

1

Есть сервак, написанный на Кьютах (прошу не холиварить на эту тему, он не сильно нагруженный). К нему по сокетам коннектятся железки и передают информацию.

Иногда происходит вот такая бяка - почему-то коннекты появляются, но данные не идут, я коннекты рву по таймауту, они опять появляются, но данных так и нет. Из-за чего такое может происходить? Замечено, что в это же время не открывается файл лога (я его открываю на каждую запись).

Коннекты удаляются корректно, открытых файлов не держу, утечек памяти нет, проц не грузится.

★★★★★

QTcpSocket работает асинхронно, возможно где-то блокировка потока, без кода вряд ли кто-то скажет)

alchemist
()

Замечено, что в это же время не открывается файл лога (я его открываю на каждую запись).

flush после каждой записи должно быть эквивалентно, если идея в том чтобы не потерять данные при падении/завершении работы.

xaizek ★★★★★
()
Ответ на: комментарий от DELIRIUM

ConnectionManager.cpp

#include <QtCore/QFile>
#include <QtCore/QDir>
#include <QtCore/QCoreApplication>

#include <QtNetwork/QTcpServer>
#include <QtNetwork/QTcpSocket>

#include <functional>
#include <iostream>

using namespace std::placeholders;

#include "Core/LibraryLoader.h"
#include "Core/XmlParser.h"

#include "DeviceConnection.h"

#include "ConnectionManager.h"

ConnectionManager::ConnectionManager (const LibraryLoader &libraryLoader, quint16 port, quint32 timeout, bool isPipe)
	: libraryLoader_ (libraryLoader)
	  , port_ (port)
	  , timeout_ (timeout)
	  , isPipe_ (isPipe)
	  , journalPath_ (QCoreApplication::applicationDirPath ())
	  , server_ (std::make_unique<QTcpServer> ())
{
	QObject::connect (server_.get (), &QTcpServer::newConnection,
					  std::bind (&ConnectionManager::onNewConnection, this));
}

ConnectionManager::~ConnectionManager ()
{

}

bool ConnectionManager::start ()
{
	const bool result = server_->listen (QHostAddress::Any, port_);

	if (result) {
		qDebug () << "ConnectionManager:" << QString ("listen port %1").arg (port_);
	} else {
		qCritical ()
		<< "ConnectionManager:"
		<< QString ("error listen port %1: %2").arg (port_).arg (server_->errorString ());
	}

	return result;
}

void ConnectionManager::onNewConnection ()
{
	QTcpSocket *socket = server_->nextPendingConnection ();

	qDebug () << "ConnectionManager: " << "new connection from host " << socket->peerAddress ().toString ();

	connectedDevices_.push_back (
		new DeviceConnection (socket,
							  libraryLoader_.makeDeviceHandler (),
							  timeout_,
							  std::bind (&ConnectionManager::onDataReceived, this, _1, _2),
							  std::bind (&ConnectionManager::xmlReceived, this, _1),
							  std::bind (&ConnectionManager::onDisconnected, this, _1))
	);
}

void ConnectionManager::onDataReceived (const QByteArray &deviceId, const QByteArray &data)
{
	const QByteArray dataToSend = data.toHex ();

	const QByteArray sendData = "DATA RECEIVED device_id="
								+ deviceId
								+ " size="
								+ QByteArray::number (dataToSend.size ())
								+ "\n"
								+ dataToSend.constData ();

	if (isPipe_) {
		std::cout << sendData.constData ();
		std::cout.flush ();
	}
}

void ConnectionManager::onDisconnected (DeviceConnection *deviceConnection)
{
	const auto it = std::find (connectedDevices_.begin (), connectedDevices_.end (), deviceConnection);

	Q_ASSERT(it != connectedDevices_.end ());

	if (it != connectedDevices_.end ()) {
		libraryLoader_.destroyDeviceHandler (deviceConnection->deviceHandler ());
		connectedDevices_.erase (it);
		deviceConnection->deleteLater ();
	}
}

void ConnectionManager::xmlReceived (const QByteArray &xml)
{
	qDebug () << "Xml received:" << xml;

	const QByteArray sendData = "XML RECEIVED size=" + QByteArray::number (xml.size ()) + "\n" + xml;

	if (isPipe_) {
		std::cout << sendData.constData ();
		std::cout.flush ();
	}
	saveXmlToJournal (xml);
}

void ConnectionManager::saveXmlToJournal (const QByteArray &xml) const
{
	XmlParser xmlParser (xml);
	QFile file (journalPath_ + QDir::separator () + xmlParser.deviceId () + ".log");
	if (file.open (QIODevice::WriteOnly | QIODevice::Append)) {
		file.write (xml + "\n");
	}
}

void ConnectionManager::setJournalPath (const QString &path)
{
	journalPath_ = path;
}
panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от panter_dsd

ConnectionManager.h

#pragma once

#include <QtCore/QString>

#include <memory>

class LibraryLoader;

class DeviceConnection;

class ConnectionManager
{
public:
	ConnectionManager (const LibraryLoader &libraryLoader, quint16 port, quint32 timeout, bool isPipe);

	~ConnectionManager ();

	bool start ();

	void setJournalPath (const QString &path);

private:
	Q_DISABLE_COPY(ConnectionManager)

	void onNewConnection ();

	void onDataReceived (const QByteArray &deviceId, const QByteArray &data);

	void onDisconnected (DeviceConnection *deviceConnection);

	void xmlReceived (const QByteArray &xml);

	void saveXmlToJournal (const QByteArray &xml) const;

private:
	const LibraryLoader &libraryLoader_;
	const quint16 port_;
	const quint32 timeout_;
	const bool isPipe_;
	QString journalPath_;
	std::unique_ptr<class QTcpServer> server_;

	QVector<DeviceConnection *> connectedDevices_;
};
panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от panter_dsd

DeviceCommunicator.cpp

#include <QtCore/QTimer>
#include <QtCore/QSharedPointer>

#include <QtNetwork/QTcpSocket>
#include <QtNetwork/QHostAddress>

#include <functional>

using namespace std::placeholders;

#include "Core/IDevicePlugin2.h"
#include "Core/XmlParser.h"

#include "DeviceConnection.h"

class DeviceConnectionImpl
	: public IDeviceConnection
{
	using TimerPtr = QSharedPointer<QTimer>;

public:
	using SendToSocketCallback = std::function<void (const QByteArray &data)>;
	using CloseConnectionCallback = std::function<void ()>;

public:
	DeviceConnectionImpl (IDeviceHandler *deviceHandler,
						  SendToSocketCallback &&sendToSocketCallback,
						  DeviceConnection::XmlReceivedCallback &&sendXmlCallback,
						  CloseConnectionCallback &&closeConnectionCallback)
		: IDeviceConnection ()
		, deviceHandler_ (deviceHandler)
		, sendToSocketCallback_ (sendToSocketCallback)
		, sendXmlCallback_ (sendXmlCallback)
		, closeConnectionCallback_ (closeConnectionCallback)
	{
		Q_ASSERT(deviceHandler_);
		Q_ASSERT(sendToSocketCallback_);
		Q_ASSERT(sendXmlCallback_);
		Q_ASSERT(closeConnectionCallback_);
	}

	virtual ~DeviceConnectionImpl ()
	{

	}

	QByteArray deviceId () const
	{
		return deviceId_;
	}

	virtual void send (const char *buffer, int len) override
	{
		if (sendToSocketCallback_) {
			sendToSocketCallback_ (QByteArray (buffer, len));
		}
	}

	virtual void parsed (const char *xmlData, int len) override
	{
		const QByteArray xml (xmlData, len);
		extractDeviceId (xml);

		if (sendXmlCallback_) {
			sendXmlCallback_ (xml);
		}
	}

	virtual void close () override
	{
		if (closeConnectionCallback_) {
			closeConnectionCallback_ ();
		}
	}

	virtual int startTimer (int msec) override
	{
		const int timerId = nextTimerId ();
		if (timerId < 0) {
			return timerId;
		}

		const TimerPtr timer (new QTimer);

		QObject::connect (timer.data (), &QTimer::timeout,
						  std::bind(&IDeviceHandler::timerEvent, deviceHandler_, timerId));
		timer->start (msec);
		timers_.insert (timerId, timer);
		return timerId;
	}

	virtual void killTimer (int timerId) override
	{
		timers_.remove (timerId);
	}

private:
	Q_DISABLE_COPY(DeviceConnectionImpl)

	void extractDeviceId (const QByteArray &xml)
	{
		const QByteArray &deviceIdFromXml = XmlParser (xml).deviceId ();

		if (!deviceIdFromXml.isEmpty ()) {
			deviceId_ = deviceIdFromXml;
		}
	}

	int nextTimerId () const
	{
		int result = -1;

		for (int i = 0, count = std::numeric_limits<int>::max (); i < count; ++i) {
			if (!timers_.contains (i)) {
				result = i;
				break;
			}
		}

		return result;
	}

private:
	IDeviceHandler *deviceHandler_;
	SendToSocketCallback sendToSocketCallback_;
	DeviceConnection::XmlReceivedCallback sendXmlCallback_;
	CloseConnectionCallback closeConnectionCallback_;
	QByteArray deviceId_;
	QMap<int, TimerPtr> timers_;
};

DeviceConnection::DeviceConnection (QTcpSocket *socket,
									IDeviceHandler *deviceHandler,
									quint32 timeout,
									DataReceivedCallback &&dataReceivedCallback,
									XmlReceivedCallback &&xmlReceivedCallback,
									DisconnectedCallback &&disconnectedCallback)
	: socket_ (socket)
	, peerAddress_ (socket_->peerAddress ().toString ())
	, deviceHandler_ (deviceHandler)
	, connectionWatchDogTimer_ (new QTimer (this))
	, deviceConnection_ (std::make_unique<DeviceConnectionImpl>
							 (
								 deviceHandler_,
								 std::bind (&DeviceConnection::sendToSocket, this, _1),
								 std::bind (&DeviceConnection::sendXml, this, _1),
								 std::bind (&QTcpSocket::close, socket_)
							 ))
	, dataReceivedCallback_ (dataReceivedCallback)
	, xmlReceivedCallback_ (xmlReceivedCallback)
	, disconnectedCallback_ (disconnectedCallback)
{
	Q_ASSERT(socket_);
	Q_ASSERT(deviceHandler_);
	Q_ASSERT(dataReceivedCallback_);
	Q_ASSERT(disconnectedCallback_);

	socket_->setParent (this);

	connectionWatchDogTimer_->setInterval (timeout * 1000);
	connectionWatchDogTimer_->setSingleShot (true);
	connectionWatchDogTimer_->start ();
	connect (connectionWatchDogTimer_, &QTimer::timeout,
			 [] () { qWarning () << "DeviceConnection: connection closed by timeout"; }
	);
	connect (connectionWatchDogTimer_, &QTimer::timeout, socket_, &QTcpSocket::disconnectFromHost);

	deviceHandler_->connected (deviceConnection_.get ());

	connect (socket_, &QTcpSocket::readyRead, this, &DeviceConnection::readSocket, Qt::QueuedConnection);
	connect (socket_, &QTcpSocket::disconnected, this, &DeviceConnection::onSocketDisconnected, Qt::QueuedConnection);
	readSocket ();
}

DeviceConnection::~DeviceConnection ()
{
}

IDeviceHandler *DeviceConnection::deviceHandler () const
{
	return deviceHandler_;
}

void DeviceConnection::readSocket ()
{
	while (socket_->bytesAvailable () > 0) {
		connectionWatchDogTimer_->stop ();
		const QByteArray socketData = socket_->readAll ();

		qDebug () << "Data received from DeviceId" << getDeviceId () << ":" << socketData.toHex ();

		deviceHandler_->dataReceived (socketData.constData (), socketData.size ());

		const QByteArray deviceId = getDeviceId ();

		if (dataReceivedCallback_ && !deviceId.isEmpty ()) {
			dataReceivedCallback_ (deviceId, socketData);
		}

		connectionWatchDogTimer_->start ();
	}
}

void DeviceConnection::sendToSocket (const QByteArray &data)
{
	qDebug () << "DeviceConnection: " << "Send to socket: " << data.toHex ();
	socket_->write (data);
}

void DeviceConnection::onSocketDisconnected ()
{
	qDebug () << "DeviceConnection: " << QString ("host %1 disconnected").arg (peerAddress_);
	if (disconnectedCallback_) {
		disconnectedCallback_ (this);
	}
}

QByteArray DeviceConnection::getDeviceId () const
{
	const auto *deviceConnection = static_cast<DeviceConnectionImpl *>(deviceConnection_.get ());
	return deviceConnection ? deviceConnection->deviceId () : QByteArray ();
}

void DeviceConnection::sendXml (const QByteArray &xml)
{
	static const QString ipTagPattern = "<IP>%1</IP>";
	xmlReceivedCallback_(xml + ipTagPattern.arg(peerAddress_).toLatin1 ());
}
panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от panter_dsd

DeviceCommunicator.h

#pragma once

#include <QtCore/QObject>

#include <memory>
#include <functional>

class QTcpSocket;

struct IDeviceHandler;
struct IDeviceConnection;

class DeviceConnection : public QObject
{
Q_OBJECT

public:
	using XmlReceivedCallback = std::function<void (const QByteArray &xml)>;
	using DataReceivedCallback = std::function<void (const QByteArray &deviceId, const QByteArray &xml)>;
	using DisconnectedCallback = std::function<void (DeviceConnection *deviceConnection)>;

public:
	DeviceConnection (QTcpSocket *socket,
					  IDeviceHandler *deviceHandler,
					  quint32 timeout,
					  DataReceivedCallback &&dataReceivedCallback,
					  XmlReceivedCallback &&xmlReceivedCallback,
					  DisconnectedCallback &&disconnectedCallback);

	virtual ~DeviceConnection ();

	IDeviceHandler *deviceHandler () const;

private:
	Q_DISABLE_COPY(DeviceConnection)

	void sendToSocket (const QByteArray &data);

	void sendXml (const QByteArray &xml);

private Q_SLOTS:

	void readSocket ();

	void onSocketDisconnected ();

	QByteArray getDeviceId () const;

private:
	QTcpSocket *socket_;
	const QString peerAddress_;
	IDeviceHandler *deviceHandler_;

	class QTimer *connectionWatchDogTimer_;

	std::unique_ptr<IDeviceConnection> deviceConnection_;
	DataReceivedCallback dataReceivedCallback_;
	XmlReceivedCallback xmlReceivedCallback_;
	DisconnectedCallback disconnectedCallback_;
};
panter_dsd ★★★★★
() автор топика

в это же время не открывается файл лога

ну а почему он не открывается?
ругани на подключения сигнал/слот в рантайме никакой нет?

dib2 ★★★★★
()
Ответ на: комментарий от dib2

Почему не открывается не знаю. Надо будет в stderr закинуть...

Про сигналы/слоты ругани никакой нет.

panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от panter_dsd

Почему не открывается не знаю. Надо будет в stderr закинуть...

ну так смотри QFile::errorString()
может у тебя лимиты на дескрипторы

dib2 ★★★★★
()
Ответ на: комментарий от dib2

Это я выведу в stderr, но повторюсь - открытых файлов я не держу. Каждый файл открывается, туда записывается строка и файл закрывается.

panter_dsd ★★★★★
() автор топика

может, остатки данных где-то в буфере, не? сожрал какие-то остатки прерванной последовательности и заткнулся, ожидая какого-то условия, например.

Iron_Bug ★★★★★
()
Ответ на: комментарий от panter_dsd

ну, это предположение. я не знаю, как внутри эта хрень устроена и как она буферизует данные. просто мы как-то давненько на такие грабли уже наступали: сокеты сбросили, а обработка на более высоком уровне застряла.

Iron_Bug ★★★★★
()
Ответ на: комментарий от panter_dsd

ещё бывает, что при обработке ошибок и нестандартном выходе из функции (облом сокета) программист забывает разлочить какие-нить локи.

Iron_Bug ★★★★★
()

wireshark/tcpdump/... еще не советовали?

annulen ★★★★★
()

Да, снифани траф акулой. Хоть наверняка узнаешь — если данные с железок идут, значит трабла точно у тебя. А телнетом пробовал к своему серверу приходить?

deep-purple ★★★★★
()
Ответ на: комментарий от x905

К сожалению, такого предоставить не могу. Да и ошибка происходит после нескольких недель аптайма.

panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от deep-purple

Да тут вот такой косяк - данные потерять нельзя, поэтому железки сразу уводят на другой сервак и, соотвественно, нормально подебажить не могу. :( Сейчас хочется просто узнать возможные места, где может быть проблема, дабы на них обратить внимание.

panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от panter_dsd

Да и ошибка происходит после нескольких недель аптайма.

а netstat что показывает в этой ситуации?

dib2 ★★★★★
()
Ответ на: комментарий от panter_dsd

/proc/$pid/fd на кол-во дескрипторов, и /proc/$pid/net/sockstat
очень похоже что сокеты/дескрипторы текут (файл не открывается, соединение принимает слушающий сокет, но новый создать судя по всему не может).

dib2 ★★★★★
()
Ответ на: комментарий от dib2

Ага, это тоже сделаю. Спасибо за подсказки.

panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от panter_dsd

readyRead эмитируется в промежутке времени между установкой соединения, и newConn = serv->nextPendingConnection(); connect(newConn,&QTcpSocket::ReadyRead,obj,<обработчик>)

после установки обработчика нужно принудительно проверять bytesAvailable, в приемном буфере к этому моменту может уже лежать первый запрос пошагового протокола от шустрого клиента.

PS код не читал

anonymous
()
Ответ на: комментарий от anonymous

Конечно, сразу же пытаюсь вычитать. На это уже напарывался.

panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от panter_dsd

Конкретики, товарищ. Не надо необоснованно кидаться какашками.

Сокеты в Qt не совсем асинхронные, их работа основана на цикле обработки сообщений. На нём же основана работа сигналов/слотов, таймеров и т.п. Понять из кусков твоего кода, где ты заблокировал цикл обработки сообщений и тем самым работу с сокетами, не представляется возможным.

Либо выкладывай всё целиком, либо переписывай всё нормально, Заводи отдельный поток, в нём заводи сервер, сокеты и запускай цикл обработки сообщений. А обработку делай в другом потоке.

dnf83
()
Ответ на: комментарий от dnf83

Это большая часть кода. Все работает в главном потоке. Если соединения создаются, значит, очередь сообщений работает. Не так ли?

panter_dsd ★★★★★
() автор топика
Ответ на: комментарий от panter_dsd

Если соединения создаются, значит, очередь сообщений работает. Не так ли?

Теоретически - да, на практике с Qt сокетами всегда вылазят подобного рода баги, вроде всё работает, а сигналы не приходят.

Зачем ты указываешь Qt::QueuedConnection если все работает в одном главном потоке?

dnf83
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.