LINUX.ORG.RU

Запихнуть Axum Multipart в S3

 axum, ,


0

2

Пишу бекэнд на расте и Axum. Делаю загрузку файла multipart/form-data.

Грузить файл предполагается как-то так:

// Обработчик запроса принимает параметр mut multipart: Multipart
while let Some(mut field) = multipart.next_field().await? {
    if let Some("file") = field.name() { // Ищем нужное имя поля
        while let Some(chunk) = field.chunk().await? {
            // Делаем что-то с очередным куском файла (Bytes)
        }
        // Файл закончился
    }
}

Как я понял по документации, это единственный асинхронный способ. У Field есть ещё методы bytes() и text(), но они грузят содержимое файла полностью в ОЗУ и возвращают целиком Bytes или String.

А библиотека rust-s3 имеет вот такой метод у Bucket:

pub async fn put_object_stream<R: AsyncRead + Unpin>(
    &self,
    reader: &mut R,
    s3_path: impl AsRef<str>,
) -> Result<u16, S3Error>

То есть мне надо как-то превратить field в AsyncRead. При этом я категорически не хочу грузить файл целиком в ОЗУ или сохранять во временный каталог на диск. Хочу напрямую от клиента в S3 пересылать файл (но при этом S3 хранилище напрямую клиенту недоступно, потому что там ещё определённая бизнес-логика навёрнута на загрузке файлов, плюс контроль доступа).

★★★★★

Последнее исправление: KivApple (всего исправлений: 3)

Судя по документации, Field реализует трейт Stream. В крейте futures есть адаптер, который умеет из Stream делать AsyncRead. Правда, у tokio пока ещё свой собственный трейт AsyncRead, но для этого в крейте async-compat есть адаптер, который умеет из futures::io::AsyncRead делать tokio::io::AsyncRead.

По идее, как-то так должно работать:

use futures::stream::TryStreamExt;
use async_compat::Compat;

let mut reader = field.into_async_read().compat_mut();
put_object_stream(&mut reader, s3_path).await?;
Pacmu3ka
()
Последнее исправление: Pacmu3ka (всего исправлений: 1)

Попробуй завернуть код на async/await в отдельную async fn, а в качестве реализации AsyncRead.poll_read() напиши враппер, который будет вызывать твою async fn «всырую», сохранять полученную Future и руками транслировать результат .poll() в результат .poll_read().

intelfx ★★★★★
()
Ответ на: комментарий от Pacmu3ka

Почти заработало, осталось как-то ошибки преобразовать, чтобы они тоже были совместимы.

let filename = field.file_name();
let mimetype = field.content_type();
let mut reader = field.into_async_read().compat();
self.bucket.put_object_stream_with_content_type(
	&mut reader,
	filename.unwrap(), // TODO
	mimetype.unwrap_or("application/octet-stream")
).await?;
error[E0271]: type mismatch resolving `<Field<'_> as Stream>::Item == Result<_, Error>`
    --> src\uploads.rs:47:26
     |
47   |         let mut reader = field.into_async_read().compat();
     |                                ^^^^^^^^^^^^^^^ expected `Result<_, Error>`, found `Result<Bytes, MultipartError>`
     |
     = note: expected enum `Result<_, std::io::Error>`
                found enum `Result<axum::body::Bytes, MultipartError>`
     = note: required for `Field<'_>` to implement `TryStream`
note: required by a bound in `into_async_read`
    --> C:\Users\Ivan\.cargo\registry\src\index.crates.io-6f17d22bba15001f\futures-util-0.3.28\src\stream\try_stream\mod.rs:1125:36
     |
1123 |     fn into_async_read(self) -> IntoAsyncRead<Self>
     |        --------------- required by a bound in this associated function
1124 |     where
1125 |         Self: Sized + TryStreamExt<Error = std::io::Error>,
     |                                    ^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `TryStreamExt::into_async_read`
KivApple ★★★★★
() автор топика
Ответ на: комментарий от KivApple

Видимо, как-то так:

let mut reader = field
    .map_err(std::io::Error::other)
    .into_async_read()
    .compat_mut()

Сам запутался, нужен ли тут .compat_mut(), или достаточно .compat(), негде быстро проверить.

Pacmu3ka
()
Последнее исправление: Pacmu3ka (всего исправлений: 1)
Ответ на: комментарий от Pacmu3ka
`io_error_other` is unstable [E0658]

А вот так работает :-)

let filename = field.file_name()
	.unwrap_or("file")
	.to_owned();
let mimetype = field.content_type()
	.unwrap_or("application/octet-stream")
	.to_owned();
let mut reader = field
	.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
	.into_async_read()
	.compat();
self.bucket.put_object_stream_with_content_type(
	&mut reader,
	filename, // TODO: Generate file name
	mimetype
)?;

compat_mut не нужен

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 1)