LINUX.ORG.RU

Библиотека для обмена сообщениями

 , , ,


2

3

Постепенно отучаюсь использовать велосипеды собственного приготовления. Теперь для сериализации использую protobuf, но остается еще одна проблема: разделение непрерывного потока данных на отдельные сообщения.

Навелосипедить собственную реализацию - раз плюнуть (да и их есть у меня как минимум 3-4 штуки), но хочется посмотреть на готовые решения.

Из пожеланий: желательно LGPL, но подойдет и другая лицензия, позволяющая исопльзовать библиотеку в закрытых проектах, кроссплатформенность, желательно готовые версии для C++ и Java, наличие библиотеки в репозиториях популярных дистрибутивов (Ubuntu, Gentoo).

UPD: Работа с С++ является необходимой. Работа с Java и всем остальным - пожелание.

★★★★★

Последнее исправление: trex6 (всего исправлений: 2)

Erlang, Akka (Scala, Java).

anonymous
()
Ответ на: комментарий от Minoru

Судя по описанию - как раз то, что мне необходимо.

trex6 ★★★★★
() автор топика

Буду оригинален - заюзай asn.1 !!!

nanoolinux ★★★★
()

остается еще одна проблема: разделение непрерывного потока данных на отдельные сообщения.

Ох и проблема, блин.

be32 4 bytes of packet length, packet.

nanoolinux ★★★★
()
Последнее исправление: nanoolinux (всего исправлений: 1)
Ответ на: комментарий от nanoolinux

а если эти 4 байта пришли не целиком, а кусочком? :) Или к целому пакету еще порция следующего пакета прилепилась

оно конечно не рокет сайенс, но строчек на 100 кода потянет

Harald ★★★★★
()
Ответ на: комментарий от nanoolinux

Не совсем понял о чем вы. Может быть добавите ссылку на подробности? А то мое yandex-fu совсем поизносилось.

trex6 ★★★★★
() автор топика
Ответ на: комментарий от Harald

Можно сделать фиксированную длину сообщения. Меньше 100 строчек :)

stave ★★★★★
()
Ответ на: комментарий от Harald
static void read_data(connection_info_t * c) {
    size_t reserved;

    uint8_t* data = bip_buffer_reserve(&c->recv_buffer, CONNECTION_RECV_BUFFER, &reserved);
    ssize_t recv_len = recv(c->fd, data, reserved, 0);

    rx_log_write(RX_LOG_PARANOID, "recv %ld bytes from socket. %s", recv_len, get_connection_name(c));

    if (recv_len < 0) {
        bip_buffer_commit(&c->recv_buffer, 0);
        if (errno != EAGAIN) {
            rx_log_write(RX_LOG_ERROR | RX_LOG_ERRNO, "recv failed. %s", get_connection_name(c));
            destroy_connection(c);
        }
        return;
    }
    bip_buffer_commit(&c->recv_buffer, recv_len);
    if (recv_len == 0 && errno == EINTR) {
        return;
    }
    if (recv_len == 0) {
        rx_log_write(RX_LOG_INFO | RX_LOG_ERRNO, "recv return zero bytes. Close connection %s", get_connection_name(c));
        destroy_connection(c);
        return;
    }
    c->shm_connection->recv_bytes += recv_len;

    while (bip_buffer_get_data_size(&c->recv_buffer) > sizeof (diameter_cmd_t)) {
        size_t size;
        uint32_t len;
        const diameter_len_header_t* header = (const diameter_len_header_t*) bip_buffer_get_contiguous_block(&c->recv_buffer, &size);

        if (size >= sizeof (diameter_len_header_t)) {
            len = hton24(header->length);
        } else {
            diameter_len_header_t* a_header = (diameter_len_header_t*) alloca(sizeof (diameter_len_header_t));
            if (RETCODE_OK != bip_buffer_peek(&c->recv_buffer, a_header, sizeof (diameter_len_header_t))) {
                break;
            }
            len = hton24(a_header->length);
        }

        if (len > DIAMETER_MAX_MESSAGE_SIZE) {
            rx_log_write(RX_LOG_ERROR, "recv too large diameter msg from %s", get_connection_name(c));
            destroy_connection(c);
            return;
        }

        const diameter_cmd_t* cmd;
        if (size >= len) {
            cmd = (const diameter_cmd_t*) header;
        } else {
            if (bip_buffer_get_data_size(&c->recv_buffer) < len) {
                break;
            }
            diameter_cmd_t* a_cmd = (diameter_cmd_t*) alloca(len);
            if (RETCODE_OK != bip_buffer_peek(&c->recv_buffer, a_cmd, len)) {
                break;
            }
            cmd = a_cmd;
        }
        if (RETCODE_OK != process_in_diameter_message(c, cmd)) {
            return;
        }
        ++c->shm_connection->recv_messages;
        bip_buffer_free(&c->recv_buffer, len);

        rx_log_write(RX_LOG_DEBUG, "recv full diameter msg from %s"
                " (data in buffer=%ld, header.len=%u)",
                get_connection_name(c), bip_buffer_get_data_size(&c->recv_buffer), len);
    }

    if (bip_buffer_get_data_size(&c->recv_buffer)) {
        rx_log_write(RX_LOG_DEBUG,
                "recv part diameter msg from %s (len=%ld, rec_len=%lu)",
                get_connection_name(c),
                bip_buffer_get_data_size(&c->recv_buffer), recv_len);
    }
}
vromanov ★★★
()
Ответ на: комментарий от AlexVR

А если прислали спецом бред сивой кобылы?

Слать на йух и обрывать соединение. А как еще?

Pavval ★★★★★
()
Ответ на: комментарий от Harald

покажи свою реализацию, заценим

Уже побежал писать для тебя. Когда я это делал в проектах, мне нужен был только лишь один буфер для сборки сообщения и лишний recv(). Оверхед на гарантирование границ сообщения был аж строк 30.

Pavval ★★★★★
()
Ответ на: комментарий от Harald

100 строк? да ты упорот!

while (true) {
  size_t bytes_read =0;
  unsigned packet_len;
  char *buf = (char *)&packet_len;
  while (bytes_read < 4) {
    ssize_t ret = read(skt, buf+bytes_read, 4 - bytes_read);
    if (ret <= 0) {
      throw ("пока!");
    }
    bytes_read += ret;
  }
  packet_len = ntohl(packet_len);
  std::unique_ptr <unsigned char *> packet(new unsigned char [packec_len]);
  bytes_read = 0;
  while (bytes_read < packet_len) {
    ssize_t ret = read(skt, &packet[0] + bytes_read, packet_len - bytes_read);
    if (ret <= 0) {
      throw ("пока!");
    }
    bytes_read += ret;
  }
  puts("Увага! Нью пакет дід кам!");
  process_packet_and_write_another_100_lines_of_code(packet, packet_len);
}

nanoolinux ★★★★
()
Ответ на: комментарий от trex6

Пакеты состоят из [4|2|1] байтов длинны пакета в сетевом порядке байт, а дальше сам пакет. Можешь ещё crc[8|16|32] какой-нибудь в конец прилепить если надо. Или пару служебных битиков в начале, типа номер пакета в последовательности.

nanoolinux ★★★★
()
Ответ на: комментарий от Pavval

А если сервер или клиент неожиданно выключились или обрыв связи

Тот же TCP словит это на recv().

Ух ты и как же?

AlexVR ★★★★★
()
Ответ на: комментарий от nanoolinux

Это блокирующий вариант, что не очень хорошо... Кстати, сетевой порядок также не так уж и нужен. Думаю, впролне можно использовать и хостовый. Сейчас машин с сетевым порядком байт почти не осталось. нам приходится этим пользоваться, т.к. формат сообщений описан в RFC, и там длинна сообщения это 3 байта в сетевом порядке.

vromanov ★★★
()
Ответ на: комментарий от nanoolinux

а ошибки нормально обрабатывать кто будет, Пушкин?

вон у vromanov-а выше 80 строк получилось, с учётом того, что у него открывающая фигурная скобка не на новой строчке :)

Harald ★★★★★
()
Ответ на: комментарий от nanoolinux

Можешь ещё crc[8|16|32] какой-нибудь в конец прилепить если надо. Или пару служебных битиков в начале, типа номер пакета в последовательности.

за целостность данных и правильный порядок отвечает TCP

Harald ★★★★★
()
Ответ на: комментарий от Harald

Делаю ход конём.

-module(tcp_server).
-export([start/0, init/0, acceptor/1, servant/1]).

start() ->
  P = spawn_link(?MODULE, init, []),
  {ok, P}.

init() ->
  process_flag(trap_exit, true),
  {ok, Srv} = gen_tcp:listen(3000, [binary, {packet, 4}, {active, true}]),
  Acceptor = spawn_link(?MODULE, acceptor, [Srv, self()]),
  loop(Acceptor, Srv).

loop(Acceptor, Srv) ->
  receive
    {'EXIT', Acceptor, shutdown} ->
      error_logger:warning_msg("Tcp acceptor down, restarting~n"),
      loop(spawn_link(?MODULE, acceptor, [Srv]), Srv);
    {'EXIT', _From, shutdown} ->
      gen_tcp:close(Srv),
      ok;
    _ ->
      loop(Acceptor, Srv)
  end.

acceptor(Srv) ->
  case get_tcp:accept(Srv) of
    {ok, Skt} ->
      {ok, Pid} = spawn_link(?MODULE, servant, [Skt]),
      gen_tcp:controlling_process(Skt, Pid),
      acceptor(Srv);
    {error, closed} ->
      exit(normal)
  end.

servant(Skt) ->
  receive
    {tcp, Skt, Request} ->
      gen_server:call(fpga_server, {fpga_req, {raw, Request}}),
      receive
        {fpga_rep, {raw, Reply}} ->
          gen_tcp:send(Skt, Reply);
        {error, R} ->
          gen_tcp:close(Skt),
          exit(R)
      end,
      servant(Skt);
    {tcp_closed, Skt} ->
      gen_tcp:close(Skt),
      exit(normal);
    {'EXIT', _From, shutdown} ->
      exit(normal)
  end.

Сейчас машин с сетевым порядком байт почти не осталось.

vromanov. Смотри код выше. Кроме того, в тех же кутях многие объекты именно так и сериализуются. И quint32_t там именно в сетевом порядке байт емнип. Так что во избежания головной боли в дальнейшем рекомедную не игнорировать принятые правила и стандарты.

nanoolinux ★★★★
()
Ответ на: комментарий от nanoolinux

Qt - это какая-то библиотека для юзер интерфейса. Я бы не считал их экспертами в сетевых приложениях. Ну и protobuff использует интеловский порядок байтов внутри, т.е. не вижу большой проблемы передать размер данных в таком-же порядке.

vromanov ★★★
()
Ответ на: комментарий от vromanov

Ну. Я бы не хотел скатываться в срач be vs le, но сетевой порядок байт, он на то и сетевой. На мой взгляд, когда ты что-то передаёшь по сети, на другом конце в праве ожидать, что порядок будет именно сетевой.

Хотя, впрочем, я уже давно для сети ни на чём кроме эрланга не пишу, так что мне пофиг, что

<<Len:32/little, _/binary>> = Message.
что
<<Len:32/big, _/binary>> = Message.
один хрен.

nanoolinux ★★★★
()
Ответ на: комментарий от nanoolinux

Да нет никакого срача... Большой разницы нет, можно и так и так..

vromanov ★★★
()
Ответ на: комментарий от vromanov

Qt - это какая-то библиотека для юзер интерфейса

какая-то библиотека для юзер интерфейса

Я бы не считал их экспертами в сетевых приложениях

Я бы

То, что ты перестал в development постить свой тупняк в виде тем, не дает тебе правда делать это в постах.

anonymous
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.