Введение
С годами работы в области распределённых систем, я понял, что мой опыт не будет полным без реализации алгоритма Raft. Это осознание побудило меня к действию: я решил создать свою реализацию, используя асинхронные возможности C++20.
Задача стояла не из лёгких: мне требовалось разработать сетевую библиотеку, обходясь без громоздких решений вроде Boost или gRPC, создать эффективную библиотеку сериализации сообщений без использования таких тяжёлых инструментов, как protobuf, и реализовать алгоритм Raft таким образом, чтобы он был независим от сетевой инфраструктуры и поддавался простому тестированию через юнит-тесты.
В этой статье я поделюсь своим опытом создания сетевой библиотеки на основе корутин C++20.
Реализация EchoServer и EchoClient
Первым шагом в создании сетевой библиотеки была реализация простого EchoServer и EchoClient. EchoClient соединяется с EchoServer и отправляет ему сообщения, на которые сервер отвечает тем же текстом. Взаимодействие клиента и сервера можно увидеть на примере:
$ ./echoclient
test
Received: test
message
Received: message
yet another message
Received: yet another message
$ ./echoserver
Received: test
Received: message
Received: yet another message
Код EchoClient
Целью было сделать код клиента максимально простым. Пример реализации:
TSimpleTask client(TLoop* loop)
{
char out[128] = {0};
char in[128] = {0};
ssize_t size = 1;
try {
TSocket input{TAddress{}, 0 /* stdin */, loop->Poller()};
TSocket socket{TAddress{"127.0.0.1", 8888}, loop->Poller()};
co_await socket.Connect();
while (size && (size = co_await input.ReadSome(out, sizeof(out)))) {
co_await socket.WriteSome(out, size);
size = co_await socket.ReadSome(in, sizeof(in));
std::cout << "Received: " << std::string_view(in, size) << "\n";
}
} catch (const std::exception& ex) {
std::cout << "Exception: " << ex.what() << "\n";
}
loop->Stop();
co_return;
}
где TSimpleTask
- тривиальная корутина.
Код EchoServer
Аналогично, код сервера был разработан для простоты и эффективности:
TSimpleTask client_handler(TSocket socket, TLoop* loop) {
char buffer[128] = {0}; ssize_t size = 0;
try {
while ((size = co_await socket.ReadSome(buffer, sizeof(buffer))) > 0) {
std::cerr << "Received: " << std::string_view(buffer, size) << "\n";
co_await socket.WriteSome(buffer, size);
}
} catch (const std::exception& ex) {
std::cerr << "Exception: " << ex.what() << "\n";
}
co_return;
}
TSimpleTask server(TLoop* loop)
{
TSocket socket(TAddress{"0.0.0.0", 8888}, loop->Poller());
socket.Bind();
socket.Listen();
while (true) {
auto client = co_await socket.Accept();
client_handler(std::move(client), loop);
}
co_return;
}
Реализация Awaitable в C++20
C++20, хотя и предлагает достаточно низкоуровневый API для реализации корутин, позволяет сократить сложность, предоставляя механизм Awaitable
. Awaitable
управляет приостановкой и возобновлением корутин, используя coroutine_handle
, который ожидает активации на основе механизмов поллинга вроде select или poll.
Awaitable
определяет три основных метода: await_ready
, который проверяет, готова ли корутина к выполнению, await_suspend
, который приостанавливает корутину и связывает её с механизмом поллинга, и await_resume
, который возобновляет выполнение корутины после того, как ожидаемое событие произошло. Эта концепция облегчает управление асинхронными операциями, делая код более читаемым и эффективным.
В данном примере метод ReadSome
сокета реализован с использованием механизма Awaitable в C++20:
auto ReadSome(char* buf, size_t size) {
struct TAwaitable {
bool await_ready() {
Run();
return ready = (ret >= 0);
}
int await_resume() {
if (!ready) { Run(); }
return ret;
}
void Run() {
ret = read(fd, b, s);
if (ret < 0 && !(err == EINTR||err==EAGAIN||err==EINPROGRESS)) {
throw std::system_error(errno, std::generic_category(), "read");
}
}
void await_suspend(std::coroutine_handle<> h) {
poller->AddRead(fd, h);
}
TSelect* poller;
int fd;
char* b; size_t s;
int ret;
bool ready;
};
return TAwaitable{Poller_,Fd_,buf,size};
}
Структура TAwaitable
определяет необходимые функции для управления асинхронным выполнением. Если чтение данных с сокета невозможно, корутина приостанавливается (await_suspend
), и событие добавляется в поллер. Как только сокет готов к чтению, вызывается resume
на coroutine_handle
, что приводит к возобновлению корутины (await_resume
) и продолжению её выполнения.
Для реализации более сложных вещей, например, чтобы с помощью ReadSome сделать Read, который читает точное число байт из сокета, нужно уметь вызывать одну корутину из другой корутины и получать результат:
template<typename T, typename TSocket>
struct TStructReader {
TStructReader(TSocket& socket)
: Socket(socket)
{ }
TValueTask<T> Read() {
T res;
size_t size = sizeof(T);
char* p = reinterpret_cast<char*>(&res);
while (size != 0) {
auto readSize = co_await Socket.ReadSome(p, size);
if (readSize == 0) {
throw std::runtime_error("Connection closed");
}
if (readSize < 0) {
continue; // retry
}
p += readSize;
size -= readSize;
}
co_return res;
}
private:
TSocket& Socket;
};
Эта структура будет использоваться так:
auto result = co_await TStructReader<TType>(socket).Read();
Структура TStructReader
использует метод ReadSome
для асинхронного чтения, обрабатывая ситуации закрытого соединения и необходимость повтора чтения. Чтобы достичь этого, корутина, возвращаемая TStructReader::Read
, должна быть одновременно Awaitable
. Это обеспечивает возможность приостановки вызывающей корутины и её возобновления после получения результата. Для обеспечения данного поведения в await_suspend
мы будем прикапывать корутину, которая нас вызывает, в final_suspend
вызываемой корутины мы будем пробуждать вызывающую корутину и получать результат. Полный код подобной корутины (TValueTask<T>
) можно найти на GitHub: coroio/corochain.hpp.
Расширенные Возможности Сетевой Библиотеки
С помощью механизма вызова корутин по цепочке, я расширил сетевую библиотеку, реализовав чтение и запись структур данных, построчное чтение, а также поддержку SSL сокетов с использованием цепочек вызовов корутин. Пример echoclient
с построчным чтением демонстрирует эффективность и гибкость подхода:
TFileHandle input{0, poller}; // stdin
TSocket socket{std::move(addr), poller};
TLineReader lineReader(input, maxLineSize);
TByteWriter byteWriter(socket);
TByteReader byteReader(socket);
co_await socket.Connect();
while (auto line = co_await lineReader.Read()) {
co_await byteWriter.Write(line);
co_await byteReader.Read(in.data(), line.Size());
std::cout << "Received: " << std::string_view(in.data(), line.Size()) << "\n";
}
Заключение
В заключение, архитектура разработанной мной библиотеки представлена на стартовой картинке статьи. Библиотека поддерживает множество механизмов полинга, включая select
, poll
, epoll
, kqueue
и uring
. Хотя я не описываю протокол Raft
в деталях (его можно найти в raft.pdf), я точно следовал его спецификации. Для сериализации сообщений использовалась техника чтения/записи запакованных структур в формате TLV. Пример сессии трёх серверов Raft
и клиента демонстрирует функциональность:
$ ./server --id 1 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
Candidate, Term: 2, Index: 0, CommitIndex: 0,
Candidate, Term: 2, Index: 0, CommitIndex: 0,
Leader, Term: 3, Index: 0, CommitIndex: 0, Delay: 2:0 3:0 MatchIndex: 2:0 3:0 NextIndex: 2:1 3:1
Leader, Term: 3, Index: 0, CommitIndex: 0, Delay: 2:0 3:0 MatchIndex: 2:0 3:0 NextIndex: 2:1 3:1
Leader, Term: 3, Index: 0, CommitIndex: 0, Delay: 2:0 3:0 MatchIndex: 2:0 3:0 NextIndex: 2:1 3:1
...
Leader, Term: 3, Index: 1080175, CommitIndex: 1080175, Delay: 2:0 3:0 MatchIndex: 2:1080175 3:1080175 NextIndex: 2:1080176 3:1080176
....
$ ./server --id 2 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Candidate, Term: 2, Index: 0, CommitIndex: 0,
Follower, Term: 3, Index: 0, CommitIndex: 0,
...
Follower, Term: 3, Index: 1080175, CommitIndex: 1080175,
...
$ ./server --id 3 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Candidate, Term: 2, Index: 0, CommitIndex: 0,
Follower, Term: 3, Index: 0, CommitIndex: 0,
...
Follower, Term: 3, Index: 1080175, CommitIndex: 1080175,
...
$ dd if=/dev/urandom | base64 | pv -l | ./client --node 127.0.0.1:8001:1 >log1
198k 0:00:03 [159.2k/s] [ <=>
Исходный код сетевой библиотеки доступен на GitHub: coroio, где также можно ознакомиться с сравнительными графиками производительности по сравнению с libevent. Код библиотеки Raft также доступен: miniraft-cpp.