LINUX.ORG.RU

Rust и конкурентная структура данных с RAII

 ,


0

3

Я хочу сделать структуру данных, которая хранит в себе элементы идентифицируемые ключом (для простоты примера здесь в качестве ключа выступает число). При этом я хочу, чтобы обращение к несуществующему элементу создавало его с нужным ключом, а с разными элементами можно было работать параллельно из разных потоков, при этом используя RAII (блокирующе получаем обёртку для работы с элементом, потому что HashMap однопоточный, к тому же процесс создания несуществующего элемента это потенциальный race condition, а затем через обёртку можем работать с нашим элементом уже вне зависимости от поведения других потоков, при этом после поиска/создания должен быть заблокирован уже только один конкретный элемент).

Максимально наивная реализация:

use std::sync::{Mutex, MutexGuard};
use std::collections::HashMap;
use std::collections::hash_map::Entry;

struct Item {
    a: i32,
    b: i32
}

struct ItemStorage {
    items: Mutex<HashMap<i32, Box<Mutex<Item>>>>
}

struct ItemRef<'a> {
    item: MutexGuard<'a, Item>
}

// Всякие new и прочие идеоматические вещи опущены для простоты
impl ItemStorage {
    pub fn item(&mut self, key: i32) -> ItemRef {
		let mut items_lock = self.items.lock().unwrap();
		let item = match items_lock.entry(key) {
			Entry::Occupied(v) => v.into_mut(),
			Entry::Vacant(v) => {
				v.insert(Box::new(Mutex::new(Item {
					a: 0, b: 1
				})))
			}
		};
		ItemRef {
			item: item.lock().unwrap()
		}
	}
}

fn main() {
    let mut storage = ItemStorage {
        items: Mutex::new(HashMap::new())
    };
    // Код ниже можно запускать из разных потоков как с одинаковыми, так и с разными ключами
    let mut item_ref = storage.item(100);
    item_ref.item.a += 1;
}

Ожидаемо Rust чувствует, что здесь что-то не так:

error[E0515]: cannot return value referencing local variable `items_lock`
  --> src/main.rs:29:3
   |
22 |           let item = match items_lock.entry(key) {
   |                            ---------- `items_lock` is borrowed here
...
30 | /         ItemRef {
31 | |             item: item.lock().unwrap()
32 | |         }
   | |_________^ returns a value referencing data owned by the current function

error: aborting due to previous error

Как обрабатываются правильно такие ситуации? Или подобный RAII надо реализовать с помощью каких-нибудь возможностей unsafe?

★★★★★

Последнее исправление: KivApple (всего исправлений: 6)

Писал похожий по сути код. Уверен, в нем есть куча проблем и в целом неадеквата, но он хотя бы конпелируется.

use async_lock::Mutex;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use std::time::{Duration, Instant};

pub(crate) struct Cache<K: Eq + Hash, V: Default> {
    lru: Mutex<HashMap<K, Entry<V>>>,
    ttl: Duration,
    max_size: usize,
}

struct Entry<T: Default> {
    used: Instant,
    value: Arc<T>,
}

impl<K: Eq + Hash, V: Default> Cache<K, V> {
    pub fn new(max_size: usize, ttl: Duration) -> Self {
        let lru = Mutex::new(HashMap::with_capacity(max_size));
        Cache { lru, max_size, ttl }
    }

    pub async fn get(&self, key: K) -> Arc<V> {
        let mut g = self.lru.lock().await;

        let entry = g.entry(key).or_insert_with(|| Entry {
            used: Instant::now(),
            value: Arc::new(V::default()),
        });

        // TODO: avoid double-setting this field?
        entry.used = Instant::now();

        entry.value.clone()
    }

    pub async fn clean(&self) {
        // удолил: для этого топика не важно.
        // если интересно, смотрите в истории
    }
}
derlafff ★★★★★
()
Последнее исправление: derlafff (всего исправлений: 3)

потому что HashMap однопоточный

Вот это кстати проблема. Когда я искал в прошлый раз, я не осилил найти аналог sync.Map из go, а вот сейчас нашел как минимум один: https://github.com/jonhoo/left-right

Или подобный RAII надо реализовать с помощью каких-нибудь возможностей unsafe?

Я смотрел реализации похожего из публичных крейтов, и много кто использует unsafe. В том числе тот, что я привел выше. Зачем и почему - лично мне не ясно, но это определенно возможно без unsafe.

derlafff ★★★★★
()

Ни бум-бум в русте. Скорее всего, захват мутекса поместить в отдельную область видимости (scope)

item = (); // void value
{
  lock = items.lock()
  item = items.get_or_create();
}
return item;

Либо принудительно освободить lock. Беглый поиск по интернету подсказал какой-то drop(lock), который вроде std::mem::drop.

anonymous
()
Ответ на: комментарий от derlafff

Тут нет блокировки самого элемента для дальнейшей работы после разблокировки HashMap. Вероятно, у тебя элементы иммутабельные, а я хочу мутабельные.

Если просто заменить в моём коде Box на Arc и клонировать его перед созданием ItemRef, ошибка никуда не исчезает. Сохранить Arc внутрь структуры (чтобы Rust понимал, что объект точно будет жить, пока жив ItemRef) я не могу, потому что после блокировки Mutex элемента, Arc, в котором он лежит, становится borrowed и его уже нельзя положить в конструируемый ItemRef.

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 2)
Ответ на: комментарий от anonymous

Хотя нет. Скорее всего, у тебя проблема с тем, что вставка захватывает весь map. Сперва отдельно вставь, потом отдельно получи ссылку на существующий элемент в map’е.

if let vacant(v) = lock_items.entry(k) { v.insert(...); }
if let occupied(v) = lock_items.entry(k) { v.get_mut(); }
anonymous
()

Нельзя просто так взять и вернуть из функции MutexGuard элемента пока MutexGuard самой таблицы захвачен. Вместо этого можешь возвращать Arc<_>. Например как-то так.

numas13
()
Ответ на: комментарий от numas13

Мне важно захватить mutex элемента до отпускания mutex таблицы. Зачем? Потому что на самом деле это конкурентная структура воксельного мира, хранящая чанки. И помимо операции блокировки одного чанка, будет ещё операция блокировки группы чанков 3х3х3 (для многих операций важны соседи). И если не гарантировать, что все захваты чанков будут сериализованы одним mutex, то легко устроить deadlock (один тред захватил одну часть чанков, другой другую и оба ждут друг друга до тепловой смерти вселенной). А вот после всех блокировок можно спокойно работать конкурентно.

Кстати, это накладывает ограничение, что если захватил чанк, то пока не отпустишь, новые захватывать нельзя в том же потоке. Было бы круто сделать эту гарантию тоже средствами rust.

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 2)
Ответ на: комментарий от KivApple

Итого: нужен атомарный лок сразу нескольких объектов из коллекции, рекурсивные локи коллекции должны быть запрещены.

Стандартными средствами этого не сделать. Да и эти требования больше напоминают работу с базой данных, а не примитив синхронизации. Можно так и сделать: Мutex<(Vec<UnsafeCell<Item>>, LockedAreasList)>, проверять есть ли запрошенный диапазон в списке залоченных, если есть, то добавлять thread::current() в список потоков, которые проснутся при разблокировании этого (или охватывающего, или пересекающегося диапазона), делать thread::park() и ждать.

Но производительность скорее всего будет хреновая. И из-за локов, которые не особо быстрые, и из-за поиска в базе залоченных диапазонов, и из-за возможного false sharing у разных потоков, работающих над соседними чанками.

По-моему, будет быстрее и проще использовать два массива: предыдущее состояние (только чтение) и новое состояние (только запись). Делим новое состояние на куски, раздаём нескольким потокам и они обновляют свои части без всякой синхронизации.

red75prim ★★★
()
Последнее исправление: red75prim (всего исправлений: 3)
Ответ на: комментарий от KivApple

3x3x3

Даже в майне многие атомарные задачи обращаются к бОльшим кускам карты. Рост деревьев, поршни, редстоун, вот это вот всё.

Алсо, рекомендую обрабатывать сразу чанки или группы чанков. Одиночные кубы слишком простые, накладные расходы на организацию многопоточности превышают профит от распараллеливания

izzholtik ★★★
()
Последнее исправление: izzholtik (всего исправлений: 1)
Ответ на: комментарий от red75prim

без всякой синхронизаци

Не совсем, два потока могут перезаписывать один элемент итогового чанка. Например, два игрока ломают один куб, и для обоих дропается лут, а игра дважды перезаписывает куб пустотой.

izzholtik ★★★
()
Ответ на: комментарий от KivApple

Тут нет блокировки самого элемента для дальнейшей работы после разблокировки HashMap.

Мне важно захватить mutex элемента до отпускания mutex таблицы.

Все просто: в моем коде можно возвращать MutexGuard вместо Item.

derlafff ★★★★★
()
Ответ на: комментарий от derlafff

Хотя тогда нужно точно заюзать одну из тех приведенных sync map

derlafff ★★★★★
()
Ответ на: комментарий от izzholtik

Я как раз и имел в виду группу чанков 3х3х3, а не отдельных блоков.

KivApple ★★★★★
() автор топика
Последнее исправление: KivApple (всего исправлений: 1)

Вот так можно, но я не уверен что ты именно этого хочешь.

https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&am...

use std::sync::Mutex;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;

#[derive(Debug)]
struct Item {
    a: i32,
    b: i32
}

#[derive(Debug)]
struct ItemStorage {
    items: Mutex<HashMap<i32, Arc<Mutex<Item>>>>
}



// Всякие new и прочие идеоматические вещи опущены для простоты
impl ItemStorage {
    pub fn item(&mut self, key: i32) -> Arc<Mutex<Item>> {
		let mut items_lock = self.items.lock().unwrap();
		let item = match items_lock.entry(key) {
			Entry::Occupied(v) => v.into_mut(),
			Entry::Vacant(v) => {
				v.insert(Arc::new(Mutex::new(Item {
					a: 0, b: 1
				})))
			}
		};
		
		item.clone()
	}
}

fn main() {
    let mut storage = ItemStorage {
        items: Mutex::new(HashMap::new())
    };
    // Код ниже можно запускать из разных потоков как с одинаковыми, так и с разными ключами
    let item = storage.item(100);
    item.lock().unwrap().a = 42;
    println!("Data:{:?}", storage);
}

trex6 ★★★★★
()
Ответ на: комментарий от trex6

P.S. Но лучше неблокирующие Map использовать и мьютекс на всю коллекцию убрать (мьютекс на каждый элемент оставить, он важен).

trex6 ★★★★★
()
Ответ на: комментарий от trex6

Но лучше неблокирующие Map использовать и мьютекс на всю коллекцию убрать (мьютекс на каждый элемент оставить, он важен)

А лицо от такого числа мьютексов не опухнет?

byko3y ★★★★
()
Ответ на: комментарий от trex6

А вы с какой целью интересуетесь?

С той, что size(pthread_mutex_t)=40 на x64 и size(pthread_mutex_t)=24 на x86 для NPTL. По крайней мере никсовый Mutex раста реализован через pthread_mutex. Если такое сыпать в каждый элемент и сами элементы достаточно маленькие, то мало не покажется. Ну типа я сам так делаю, но у меня питон, а не раст, потому прокатывает.

byko3y ★★★★
()
Ответ на: комментарий от byko3y

Зависит от числа элементов и ограничений целевой платформы. Для 10^6 на современном PC должно быть норм (40Mb), для 10^10 надо что-то другое придумывать, конечно. Если память сильно ограниченна - тоже придется придумывать что-то другое.

Насколько я помню, даже с 10^6 мьютексов даже на PC начнутся другие проблемы, вовсем не с памятью.

trex6 ★★★★★
()
Последнее исправление: trex6 (всего исправлений: 1)
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.