LINUX.ORG.RU

Boost tcp::socket не работает асинхронное чтение / запись

 , , ,


0

1

Доброго всем времени суток. Пытаюсь сделать асинхронное чтение из клиентского сокета на сокетах буста, но что-то всё не получается - проходит нормально только одна итерация из send_request и только если поставить после async_read() вот это m_io_service.run(), а должно работать каждый раз. Подскажите в чём я ошибаюсь

using Json_t = nlohmann::json;
using Tcp_t = boost::asio::ip::tcp;
namespace Ip_n = boost::asio::ip;
namespace Asio_n = boost::asio;

class Protocol: public boost::enable_shared_from_this<Protocol> {
public:
    static boost::shared_ptr<Protocol> create() {
        if (m_instance == nullptr)
            m_instance =  boost::shared_ptr<Protocol>(new Protocol());
        return m_instance;
    }

    boost::shared_ptr<Protocol> shared_from_this() {
        return m_instance;
    }

private:
    static constexpr int MAX_SIZE_INPUT_BUF = 1024;
    static constexpr size_t MAX_MSEC_TO_WAIT_ANS = 5000;///< Максимальное время (мс) ожидания ответа от сервера
    static constexpr size_t MAX_MSEC_TO_WAIT_WR_TO_SCK = 10000;
    static constexpr size_t MAX_CNT_TO_REQUEST = 3; ///< Максимальное количество попыток запроса

    Asio_n::io_service m_io_service;
    Tcp_t::socket m_sck{m_io_service}; ///< Сокет для обмена данными
    Tcp_t::endpoint m_ep;
    std::array<char, MAX_SIZE_INPUT_BUF> m_bufSck; ///< Приёмный буфер данных из сокета
    std::vector<char> m_buf_sck_part;
    Tcp_t::endpoint m_sender_ep;
    bool m_dataReady=false; ///< Флаг показывающий, что пришли полные данные и они готовы для дальнейшей обработки
    bool m_logged_in=false; ///< Флаг показывающий, что программа зарегистрировалась на сервере
    static boost::shared_ptr<Protocol> m_instance;

    Protocol();

    bool connect_to_ip(const std::string &ipAddr, std::uint16_t ipPort);
    void readData(const boost::system::error_code &err, std::size_t read_bytes);
    void writeData(const boost::system::error_code &err, std::size_t wr_bytes);
    std::string send_request(const std::string& sJson);
};

Protocol::Protocol() {
    connect_to_ip("192.168.11.60", 1000);
    login();

    m_thread = std::make_unique<std::thread>(&Protocol::run_method, this);
}

void Protocol::run_method() {
    m_io_service.run();
}

bool Protocol::connect_to_ip(const std::string &ipAddr, std::uint16_t ipPort) {
    auto err = std::make_shared<Error_t>();

    try {
        m_sck.connect( {Ip_n::address::from_string( ipAddr ), ipPort} );
        return true;
    }
    catch (boost::system::system_error e) {
        std::cout << e.what() << std::endl;
    }
    return false;
}

void Protocol::readData(const boost::system::error_code &err, std::size_t read_bytes) {
    if (read_bytes > 0) {
        switch(err.value()) {
            case boost::asio::error::eof: 
            case boost::asio::error::connection_reset: on_host_disconnected(); break;

            default: 
                Json_t j = Json_t::parse(m_buf_sck_part.data());
                if (j.is_object()) { 
                    m_dataReady = true;
                }
        }
    }
}

void Protocol::writeData(const boost::system::error_code &err, std::size_t wr_bytes) {
    std::cout << "writeData - " << wr_bytes << std::endl;
}

void readDataFunct(const boost::system::error_code &err, std::size_t read_bytes) {
    std::cout << "readData_funct - " << read_bytes << std::endl;
}

bool Protocol::write(const std::string& sJson) {
    m_dataReady = false;

    m_sck.async_write_some(Asio_n::buffer(sJson.c_str(), sJson.size()),
                           boost::bind(&Protocol::writeData, this,
                           Asio_n::placeholders::error,
                           Asio_n::placeholders::bytes_transferred));

    boost::asio::async_read(m_sck,
                        boost::asio::dynamic_buffer(m_buf_sck_part),
                        boost::asio::transfer_at_least(3),
                            boost::bind(&Protocol::readData, this,
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred)
    );
    
    // Если сделать так, то одна итерация чтения/записи сработает, а после следующей записи в сокет чтения уже не будет
    // причём данные в обоих случаях в сокет будут записаны, но обработчики событий вызываются только при первой итерации
    m_io_service.run(); 

    return true;
}

std::string Protocol::send_request(const std::string& sJson) {
    for (size_t i=0; i<MAX_CNT_TO_REQUEST; ++i) {
        write(sJson);
        auto cnt = 20;//read();
        if ( cnt != 0 ) {
            std::cout << "m_bufSck - " << m_bufSck.data() << std::endl;;
            // делаем какую-то обработку, но стоит continue, чтобы было несколько циклов чтения/записи
            continue;
        }
        else continue;
    }
    return std::move( std::string() );
}

int main() {
    auto protocol = Protocol::create();
    protocol.send_request("{\"Request\":\"Login\"}");
    return 0;
}


Последнее исправление: seijuurou (всего исправлений: 1)

Ответ на: комментарий от seijuurou

итак m_io_service.run(); - это менеджер потоков, он не должен завершится, потому убрать его надо из отдельного потока, либо дергать в отдельном потоке poll,но main завершится не должно. Почему у тебя два менеджера, ты повторным run убиваешь очередь

Silerus ★★★★
()

У тебя не правильное представление о работе asio. Смотри метод run - блокируемый и будет заблокирован пока в очереди есть задания (внутри себя он постоянно дёргает и ждёт завершения задания). Метод poll - он не блокируемый, он дёргает задания и проверяет есть готовые результаты, но не ждёт завершения. Т.е регистрируемые наши callback’s должны сами ставить добавлять задания в очередь. (Если посмотреть примеры то там read иницирует write,а write read). Если я правильно понял твою задумку в твоем случае задача write должна взять первый запрос из очереди (какойто потоко безопасный массив) запустить задачу read, a callback read должен вызвать задачу write.

Silerus ★★★★
()
Ответ на: комментарий от Silerus

что-то всё-ровно я не понял как делать. Мне нужно записать в сокет запрос в json формате и получить на него ответ из того же сокета в том же json формате, но в синхронном варианте не хотелось делать класс. Я поменял местами async_write и async_read (сейчас сначала read, потом write), в send_request. В конструкторе я запускал m_io_service.run() и ждал какое-то время когда придёт read, потом в конструкторе запускал run, в отдельном потоке запускал m_io_service.poll и ждал когда придёт read после write, убирал run и poll из потока, но в send_request после read и write делал poll - в лучшем случае данные приходят один раз

seijuurou
() автор топика
Ответ на: комментарий от seijuurou

Твой случай это их классический пример tcp_echo_client. Посмотри их примеры, там всегда вызов run - это последняя инструкция функции main - потому что, когда он завершится - это значит что сервис прекратил работу. Вот этот вызов m_io_service.run() примерно тоже самое что и вот этот while m_io_service.poll(){}. Не знаю как сейчас, но ранее m_io_service.run() не могло крутится в другом потоке, задачи и менеджер должны быть в одном потоке boost asio example

Silerus ★★★★
()
Ответ на: комментарий от seijuurou

Почитай как надо, тебе тут явно понимания асинхронного IO не хватает. Сначала запускаешь первую асинхронную операцию (accept или connect), этот метод завершается без блокировки, потом запускаешь event-loop (в данном случаи это io_context::run). Когда завершится первая асинхронная операция, в ее обработчике запускается следующая (чтение/запись). И так до тех пор пока не нужно будет остановить соединение. Пока запущена хотя бы одна асинхронная операция, метод io_context::run будет продолжать.

dvetutnev
()
Последнее исправление: dvetutnev (всего исправлений: 1)