LINUX.ORG.RU

Почему в QTcpSocket могут не приходить данные?

 ,


0

1

Есть сервак, написанный на Кьютах (прошу не холиварить на эту тему, он не сильно нагруженный). К нему по сокетам коннектятся железки и передают информацию.

Иногда происходит вот такая бяка - почему-то коннекты появляются, но данные не идут, я коннекты рву по таймауту, они опять появляются, но данных так и нет. Из-за чего такое может происходить? Замечено, что в это же время не открывается файл лога (я его открываю на каждую запись).

Коннекты удаляются корректно, открытых файлов не держу, утечек памяти нет, проц не грузится.

QTcpSocket работает асинхронно, возможно где-то блокировка потока, без кода вряд ли кто-то скажет)

alchemist
()

Замечено, что в это же время не открывается файл лога (я его открываю на каждую запись).

flush после каждой записи должно быть эквивалентно, если идея в том чтобы не потерять данные при падении/завершении работы.

xaizek ★★★★★
()
Ответ на: комментарий от DELIRIUM

ConnectionManager.cpp

#include <QtCore/QFile>
#include <QtCore/QDir>
#include <QtCore/QCoreApplication>

#include <QtNetwork/QTcpServer>
#include <QtNetwork/QTcpSocket>

#include <functional>
#include <iostream>

using namespace std::placeholders;

#include "Core/LibraryLoader.h"
#include "Core/XmlParser.h"

#include "DeviceConnection.h"

#include "ConnectionManager.h"

ConnectionManager::ConnectionManager (const LibraryLoader &libraryLoader, quint16 port, quint32 timeout, bool isPipe)
	: libraryLoader_ (libraryLoader)
	  , port_ (port)
	  , timeout_ (timeout)
	  , isPipe_ (isPipe)
	  , journalPath_ (QCoreApplication::applicationDirPath ())
	  , server_ (std::make_unique<QTcpServer> ())
{
	QObject::connect (server_.get (), &QTcpServer::newConnection,
					  std::bind (&ConnectionManager::onNewConnection, this));
}

ConnectionManager::~ConnectionManager ()
{

}

bool ConnectionManager::start ()
{
	const bool result = server_->listen (QHostAddress::Any, port_);

	if (result) {
		qDebug () << "ConnectionManager:" << QString ("listen port %1").arg (port_);
	} else {
		qCritical ()
		<< "ConnectionManager:"
		<< QString ("error listen port %1: %2").arg (port_).arg (server_->errorString ());
	}

	return result;
}

void ConnectionManager::onNewConnection ()
{
	QTcpSocket *socket = server_->nextPendingConnection ();

	qDebug () << "ConnectionManager: " << "new connection from host " << socket->peerAddress ().toString ();

	connectedDevices_.push_back (
		new DeviceConnection (socket,
							  libraryLoader_.makeDeviceHandler (),
							  timeout_,
							  std::bind (&ConnectionManager::onDataReceived, this, _1, _2),
							  std::bind (&ConnectionManager::xmlReceived, this, _1),
							  std::bind (&ConnectionManager::onDisconnected, this, _1))
	);
}

void ConnectionManager::onDataReceived (const QByteArray &deviceId, const QByteArray &data)
{
	const QByteArray dataToSend = data.toHex ();

	const QByteArray sendData = "DATA RECEIVED device_id="
								+ deviceId
								+ " size="
								+ QByteArray::number (dataToSend.size ())
								+ "\n"
								+ dataToSend.constData ();

	if (isPipe_) {
		std::cout << sendData.constData ();
		std::cout.flush ();
	}
}

void ConnectionManager::onDisconnected (DeviceConnection *deviceConnection)
{
	const auto it = std::find (connectedDevices_.begin (), connectedDevices_.end (), deviceConnection);

	Q_ASSERT(it != connectedDevices_.end ());

	if (it != connectedDevices_.end ()) {
		libraryLoader_.destroyDeviceHandler (deviceConnection->deviceHandler ());
		connectedDevices_.erase (it);
		deviceConnection->deleteLater ();
	}
}

void ConnectionManager::xmlReceived (const QByteArray &xml)
{
	qDebug () << "Xml received:" << xml;

	const QByteArray sendData = "XML RECEIVED size=" + QByteArray::number (xml.size ()) + "\n" + xml;

	if (isPipe_) {
		std::cout << sendData.constData ();
		std::cout.flush ();
	}
	saveXmlToJournal (xml);
}

void ConnectionManager::saveXmlToJournal (const QByteArray &xml) const
{
	XmlParser xmlParser (xml);
	QFile file (journalPath_ + QDir::separator () + xmlParser.deviceId () + ".log");
	if (file.open (QIODevice::WriteOnly | QIODevice::Append)) {
		file.write (xml + "\n");
	}
}

void ConnectionManager::setJournalPath (const QString &path)
{
	journalPath_ = path;
}
panter_dsd ★★★★
() автор топика
Ответ на: комментарий от panter_dsd

ConnectionManager.h

#pragma once

#include <QtCore/QString>

#include <memory>

class LibraryLoader;

class DeviceConnection;

class ConnectionManager
{
public:
	ConnectionManager (const LibraryLoader &libraryLoader, quint16 port, quint32 timeout, bool isPipe);

	~ConnectionManager ();

	bool start ();

	void setJournalPath (const QString &path);

private:
	Q_DISABLE_COPY(ConnectionManager)

	void onNewConnection ();

	void onDataReceived (const QByteArray &deviceId, const QByteArray &data);

	void onDisconnected (DeviceConnection *deviceConnection);

	void xmlReceived (const QByteArray &xml);

	void saveXmlToJournal (const QByteArray &xml) const;

private:
	const LibraryLoader &libraryLoader_;
	const quint16 port_;
	const quint32 timeout_;
	const bool isPipe_;
	QString journalPath_;
	std::unique_ptr<class QTcpServer> server_;

	QVector<DeviceConnection *> connectedDevices_;
};
panter_dsd ★★★★
() автор топика
Ответ на: комментарий от panter_dsd

DeviceCommunicator.cpp

#include <QtCore/QTimer>
#include <QtCore/QSharedPointer>

#include <QtNetwork/QTcpSocket>
#include <QtNetwork/QHostAddress>

#include <functional>

using namespace std::placeholders;

#include "Core/IDevicePlugin2.h"
#include "Core/XmlParser.h"

#include "DeviceConnection.h"

class DeviceConnectionImpl
	: public IDeviceConnection
{
	using TimerPtr = QSharedPointer<QTimer>;

public:
	using SendToSocketCallback = std::function<void (const QByteArray &data)>;
	using CloseConnectionCallback = std::function<void ()>;

public:
	DeviceConnectionImpl (IDeviceHandler *deviceHandler,
						  SendToSocketCallback &&sendToSocketCallback,
						  DeviceConnection::XmlReceivedCallback &&sendXmlCallback,
						  CloseConnectionCallback &&closeConnectionCallback)
		: IDeviceConnection ()
		, deviceHandler_ (deviceHandler)
		, sendToSocketCallback_ (sendToSocketCallback)
		, sendXmlCallback_ (sendXmlCallback)
		, closeConnectionCallback_ (closeConnectionCallback)
	{
		Q_ASSERT(deviceHandler_);
		Q_ASSERT(sendToSocketCallback_);
		Q_ASSERT(sendXmlCallback_);
		Q_ASSERT(closeConnectionCallback_);
	}

	virtual ~DeviceConnectionImpl ()
	{

	}

	QByteArray deviceId () const
	{
		return deviceId_;
	}

	virtual void send (const char *buffer, int len) override
	{
		if (sendToSocketCallback_) {
			sendToSocketCallback_ (QByteArray (buffer, len));
		}
	}

	virtual void parsed (const char *xmlData, int len) override
	{
		const QByteArray xml (xmlData, len);
		extractDeviceId (xml);

		if (sendXmlCallback_) {
			sendXmlCallback_ (xml);
		}
	}

	virtual void close () override
	{
		if (closeConnectionCallback_) {
			closeConnectionCallback_ ();
		}
	}

	virtual int startTimer (int msec) override
	{
		const int timerId = nextTimerId ();
		if (timerId < 0) {
			return timerId;
		}

		const TimerPtr timer (new QTimer);

		QObject::connect (timer.data (), &QTimer::timeout,
						  std::bind(&IDeviceHandler::timerEvent, deviceHandler_, timerId));
		timer->start (msec);
		timers_.insert (timerId, timer);
		return timerId;
	}

	virtual void killTimer (int timerId) override
	{
		timers_.remove (timerId);
	}

private:
	Q_DISABLE_COPY(DeviceConnectionImpl)

	void extractDeviceId (const QByteArray &xml)
	{
		const QByteArray &deviceIdFromXml = XmlParser (xml).deviceId ();

		if (!deviceIdFromXml.isEmpty ()) {
			deviceId_ = deviceIdFromXml;
		}
	}

	int nextTimerId () const
	{
		int result = -1;

		for (int i = 0, count = std::numeric_limits<int>::max (); i < count; ++i) {
			if (!timers_.contains (i)) {
				result = i;
				break;
			}
		}

		return result;
	}

private:
	IDeviceHandler *deviceHandler_;
	SendToSocketCallback sendToSocketCallback_;
	DeviceConnection::XmlReceivedCallback sendXmlCallback_;
	CloseConnectionCallback closeConnectionCallback_;
	QByteArray deviceId_;
	QMap<int, TimerPtr> timers_;
};

DeviceConnection::DeviceConnection (QTcpSocket *socket,
									IDeviceHandler *deviceHandler,
									quint32 timeout,
									DataReceivedCallback &&dataReceivedCallback,
									XmlReceivedCallback &&xmlReceivedCallback,
									DisconnectedCallback &&disconnectedCallback)
	: socket_ (socket)
	, peerAddress_ (socket_->peerAddress ().toString ())
	, deviceHandler_ (deviceHandler)
	, connectionWatchDogTimer_ (new QTimer (this))
	, deviceConnection_ (std::make_unique<DeviceConnectionImpl>
							 (
								 deviceHandler_,
								 std::bind (&DeviceConnection::sendToSocket, this, _1),
								 std::bind (&DeviceConnection::sendXml, this, _1),
								 std::bind (&QTcpSocket::close, socket_)
							 ))
	, dataReceivedCallback_ (dataReceivedCallback)
	, xmlReceivedCallback_ (xmlReceivedCallback)
	, disconnectedCallback_ (disconnectedCallback)
{
	Q_ASSERT(socket_);
	Q_ASSERT(deviceHandler_);
	Q_ASSERT(dataReceivedCallback_);
	Q_ASSERT(disconnectedCallback_);

	socket_->setParent (this);

	connectionWatchDogTimer_->setInterval (timeout * 1000);
	connectionWatchDogTimer_->setSingleShot (true);
	connectionWatchDogTimer_->start ();
	connect (connectionWatchDogTimer_, &QTimer::timeout,
			 [] () { qWarning () << "DeviceConnection: connection closed by timeout"; }
	);
	connect (connectionWatchDogTimer_, &QTimer::timeout, socket_, &QTcpSocket::disconnectFromHost);

	deviceHandler_->connected (deviceConnection_.get ());

	connect (socket_, &QTcpSocket::readyRead, this, &DeviceConnection::readSocket, Qt::QueuedConnection);
	connect (socket_, &QTcpSocket::disconnected, this, &DeviceConnection::onSocketDisconnected, Qt::QueuedConnection);
	readSocket ();
}

DeviceConnection::~DeviceConnection ()
{
}

IDeviceHandler *DeviceConnection::deviceHandler () const
{
	return deviceHandler_;
}

void DeviceConnection::readSocket ()
{
	while (socket_->bytesAvailable () > 0) {
		connectionWatchDogTimer_->stop ();
		const QByteArray socketData = socket_->readAll ();

		qDebug () << "Data received from DeviceId" << getDeviceId () << ":" << socketData.toHex ();

		deviceHandler_->dataReceived (socketData.constData (), socketData.size ());

		const QByteArray deviceId = getDeviceId ();

		if (dataReceivedCallback_ && !deviceId.isEmpty ()) {
			dataReceivedCallback_ (deviceId, socketData);
		}

		connectionWatchDogTimer_->start ();
	}
}

void DeviceConnection::sendToSocket (const QByteArray &data)
{
	qDebug () << "DeviceConnection: " << "Send to socket: " << data.toHex ();
	socket_->write (data);
}

void DeviceConnection::onSocketDisconnected ()
{
	qDebug () << "DeviceConnection: " << QString ("host %1 disconnected").arg (peerAddress_);
	if (disconnectedCallback_) {
		disconnectedCallback_ (this);
	}
}

QByteArray DeviceConnection::getDeviceId () const
{
	const auto *deviceConnection = static_cast<DeviceConnectionImpl *>(deviceConnection_.get ());
	return deviceConnection ? deviceConnection->deviceId () : QByteArray ();
}

void DeviceConnection::sendXml (const QByteArray &xml)
{
	static const QString ipTagPattern = "<IP>%1</IP>";
	xmlReceivedCallback_(xml + ipTagPattern.arg(peerAddress_).toLatin1 ());
}
panter_dsd ★★★★
() автор топика
Ответ на: комментарий от panter_dsd

DeviceCommunicator.h

#pragma once

#include <QtCore/QObject>

#include <memory>
#include <functional>

class QTcpSocket;

struct IDeviceHandler;
struct IDeviceConnection;

class DeviceConnection : public QObject
{
Q_OBJECT

public:
	using XmlReceivedCallback = std::function<void (const QByteArray &xml)>;
	using DataReceivedCallback = std::function<void (const QByteArray &deviceId, const QByteArray &xml)>;
	using DisconnectedCallback = std::function<void (DeviceConnection *deviceConnection)>;

public:
	DeviceConnection (QTcpSocket *socket,
					  IDeviceHandler *deviceHandler,
					  quint32 timeout,
					  DataReceivedCallback &&dataReceivedCallback,
					  XmlReceivedCallback &&xmlReceivedCallback,
					  DisconnectedCallback &&disconnectedCallback);

	virtual ~DeviceConnection ();

	IDeviceHandler *deviceHandler () const;

private:
	Q_DISABLE_COPY(DeviceConnection)

	void sendToSocket (const QByteArray &data);

	void sendXml (const QByteArray &xml);

private Q_SLOTS:

	void readSocket ();

	void onSocketDisconnected ();

	QByteArray getDeviceId () const;

private:
	QTcpSocket *socket_;
	const QString peerAddress_;
	IDeviceHandler *deviceHandler_;

	class QTimer *connectionWatchDogTimer_;

	std::unique_ptr<IDeviceConnection> deviceConnection_;
	DataReceivedCallback dataReceivedCallback_;
	XmlReceivedCallback xmlReceivedCallback_;
	DisconnectedCallback disconnectedCallback_;
};
panter_dsd ★★★★
() автор топика

в это же время не открывается файл лога

ну а почему он не открывается?
ругани на подключения сигнал/слот в рантайме никакой нет?

dib2 ★★★★★
()
Ответ на: комментарий от dib2

Почему не открывается не знаю. Надо будет в stderr закинуть...

Про сигналы/слоты ругани никакой нет.

panter_dsd ★★★★
() автор топика
Ответ на: комментарий от panter_dsd

Почему не открывается не знаю. Надо будет в stderr закинуть...

ну так смотри QFile::errorString()
может у тебя лимиты на дескрипторы

dib2 ★★★★★
()
Ответ на: комментарий от dib2

Это я выведу в stderr, но повторюсь - открытых файлов я не держу. Каждый файл открывается, туда записывается строка и файл закрывается.

panter_dsd ★★★★
() автор топика

может, остатки данных где-то в буфере, не? сожрал какие-то остатки прерванной последовательности и заткнулся, ожидая какого-то условия, например.

Iron_Bug ★★★★★
()
Ответ на: комментарий от panter_dsd

ну, это предположение. я не знаю, как внутри эта хрень устроена и как она буферизует данные. просто мы как-то давненько на такие грабли уже наступали: сокеты сбросили, а обработка на более высоком уровне застряла.

Iron_Bug ★★★★★
()
Ответ на: комментарий от panter_dsd

ещё бывает, что при обработке ошибок и нестандартном выходе из функции (облом сокета) программист забывает разлочить какие-нить локи.

Iron_Bug ★★★★★
()

wireshark/tcpdump/... еще не советовали?

annulen ★★★★★
()

Да, снифани траф акулой. Хоть наверняка узнаешь — если данные с железок идут, значит трабла точно у тебя. А телнетом пробовал к своему серверу приходить?

deep-purple ★★★★★
()
Ответ на: комментарий от x905

К сожалению, такого предоставить не могу. Да и ошибка происходит после нескольких недель аптайма.

panter_dsd ★★★★
() автор топика
Ответ на: комментарий от deep-purple

Да тут вот такой косяк - данные потерять нельзя, поэтому железки сразу уводят на другой сервак и, соотвественно, нормально подебажить не могу. :( Сейчас хочется просто узнать возможные места, где может быть проблема, дабы на них обратить внимание.

panter_dsd ★★★★
() автор топика
Ответ на: комментарий от panter_dsd

Да и ошибка происходит после нескольких недель аптайма.

а netstat что показывает в этой ситуации?

dib2 ★★★★★
()
Ответ на: комментарий от panter_dsd

/proc/$pid/fd на кол-во дескрипторов, и /proc/$pid/net/sockstat
очень похоже что сокеты/дескрипторы текут (файл не открывается, соединение принимает слушающий сокет, но новый создать судя по всему не может).

dib2 ★★★★★
()
Ответ на: комментарий от dib2

Ага, это тоже сделаю. Спасибо за подсказки.

panter_dsd ★★★★
() автор топика
Ответ на: комментарий от panter_dsd

readyRead эмитируется в промежутке времени между установкой соединения, и newConn = serv->nextPendingConnection(); connect(newConn,&QTcpSocket::ReadyRead,obj,<обработчик>)

после установки обработчика нужно принудительно проверять bytesAvailable, в приемном буфере к этому моменту может уже лежать первый запрос пошагового протокола от шустрого клиента.

PS код не читал

anonymous
()
Ответ на: комментарий от anonymous

Конечно, сразу же пытаюсь вычитать. На это уже напарывался.

panter_dsd ★★★★
() автор топика
Ответ на: комментарий от panter_dsd

Конкретики, товарищ. Не надо необоснованно кидаться какашками.

Сокеты в Qt не совсем асинхронные, их работа основана на цикле обработки сообщений. На нём же основана работа сигналов/слотов, таймеров и т.п. Понять из кусков твоего кода, где ты заблокировал цикл обработки сообщений и тем самым работу с сокетами, не представляется возможным.

Либо выкладывай всё целиком, либо переписывай всё нормально, Заводи отдельный поток, в нём заводи сервер, сокеты и запускай цикл обработки сообщений. А обработку делай в другом потоке.

dnf83
()
Ответ на: комментарий от dnf83

Это большая часть кода. Все работает в главном потоке. Если соединения создаются, значит, очередь сообщений работает. Не так ли?

panter_dsd ★★★★
() автор топика
Ответ на: комментарий от panter_dsd

Если соединения создаются, значит, очередь сообщений работает. Не так ли?

Теоретически - да, на практике с Qt сокетами всегда вылазят подобного рода баги, вроде всё работает, а сигналы не приходят.

Зачем ты указываешь Qt::QueuedConnection если все работает в одном главном потоке?

dnf83
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.