И снова здравствуйте! )
Получилось ПОЧТИ что хотел - за одно подключение-рукопожатие при пиковом (за время полного цикла от подключения до записи в сокет) поступлении команд через mqtt удается отправить херову гору rest-запросов, затем происходит отключение и ожидание следующей порции.
Но вот что нормально НЕ работает, так это CPush::handle_read_status, получаю туда после первого нормального response какой то мусор с периодической ошибкой от буста Operation canceled
Описание
class CPush
{
private:
std::deque<std::string> dq;
std::mutex mtx;
std::condition_variable cv;
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> * psocket;
std::string request;
boost::asio::streambuf response;
bool verify_certificate(bool preverified, boost::asio::ssl::verify_context & ctx);
void handle_connect(const boost::system::error_code & error);
void handle_handshake(const boost::system::error_code & error);
void handle_write(const boost::system::error_code & error);
void handle_read_status(const boost::system::error_code & error);
void handle_read_header(const boost::system::error_code & error);
void handle_read_content(const boost::system::error_code & error);
void Write(void);
int Get(std::string & http);
public:
CPush(void) { std::cout << "PUSH client initialized" << std::endl; }
int Put(const std::string & tokens, const std::string & cmd);
bool Wait(void);
void Handler(void);
};
Частично реализация
void CPush::handle_handshake(const boost::system::error_code & error)
{
if (error)
std::cerr << "Handshake failed: " << error.message() << std::endl;
else
Write();
}
void CPush::handle_write(const boost::system::error_code & error)
{
if (error)
std::cerr << "Write failed: " << error.message() << std::endl;
else
{
std::cout << "Sending request OK!" << std::endl;
boost::asio::async_read_until(*psocket,
response, "\r\n",
boost::bind(&CPush::handle_read_status,
this,
boost::asio::placeholders::error));
Write();
}
}
void CPush::handle_read_status(const boost::system::error_code & error)
{
if (error)
std::cout << "Error read status: " << error.message() << std::endl;
else
{
std::cout << "!!!!!!!!!!!!!!!!!!!\n" << &response << "\n!!!!!!!!!!!!!!!!!!!\n" << std::flush;
// Check that response is OK.
std::istream response_stream(&response);
std::string http_version;
response_stream >> http_version;
unsigned int status_code;
response_stream >> status_code;
std::string status_message;
std::getline(response_stream, status_message);
if (!response_stream || http_version.substr(0, 5) != "HTTP/")
{
std::cout << "Invalid response\n";
return;
}
if (status_code != 200)
{
std::cout << "Response returned with status code ";
std::cout << status_code << "\n";
// return;
}
/* // Read the response headers, which are terminated by a blank line.
boost::asio::async_read_until(*psocket,
response, "\r\n\r\n",
boost::bind(&CPush::handle_read_header,
this,
boost::asio::placeholders::error));
*/ }
}
int CPush::Get(std::string & http)
{
std::lock_guard<std::mutex> lock(mtx);
int rt = dq.size();
if (rt > 0)
{
http = dq.front();
dq.pop_front();
}
return rt;
}
void CPush::Write(void)
{
if (Get(request) > 0)
boost::asio::async_write(*psocket,
boost::asio::buffer(request),
boost::bind(&CPush::handle_write,
this,
boost::asio::placeholders::error));
}
int CPush::Put(const std::string & tokens, const std::string & cmd)
{
std::string json("{\"registration_ids\":[" + tokens + "],"
"\"notification\":null,"
....................
"\"priority\":\"high\","
"\"time_to_live\":15}");
mtx.lock();
dq.push_back("POST " + std::string(GOOGLE_API) + " HTTP/1.1\r\n"
"Host: " + GOOGLE_HOST + "\r\n" // << ":" << port
"Content-Type: application/json; charset=utf-8\r\n"
"Content-Length: " + std::to_string(json.size()) + "\r\n"
"Authorization: key=" + push_key + "\r\n\r\n" + json);
int rt = dq.size();
mtx.unlock();
cv.notify_one();
return rt;
}
bool CPush::Wait(void)
{
std::unique_lock<std::mutex> ul(mtx);
do {
if (!abRun) return false;
} while (!cv.wait_for(ul, std::chrono::seconds(1), [this]{ return dq.size() > 0; }));
return true;
}
void CPush::Handler(void)
{
boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver::query query(GOOGLE_HOST, GOOGLE_PORT);
boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
boost::asio::ssl::context context(boost::asio::ssl::context::sslv23);
context.set_default_verify_paths();
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> socket(io_service, context);
socket.set_verify_mode(boost::asio::ssl::context::verify_none);
socket.set_verify_callback(boost::bind(&CPush::verify_certificate, this, _1, _2));
psocket = &socket;
boost::asio::async_connect(psocket->lowest_layer(),
iterator,
boost::bind(&CPush::handle_connect,
this,
boost::asio::placeholders::error));
io_service.run();
std::cout << "00000000000000000000000000000000000" << std::endl;
}
запускаю все это дело из основного потока
std::thread ([](void){ while (pPush->Wait()) pPush->Handler(); }).detach();
и добавляю команды из другого потока по событию
pPush->Put(tokens_, cmd_);