Реализация многопоточной общей памяти в Rust
3 ноября 2022 г.Эта статья является третьей в серии, посвященной созданию беспроводного термостата на Rust, работающего на Raspberry Pi, хотя эта статья имеет очень мало общего с Raspberry Pi и более актуальна для любого многопоточного приложения. При создании своего приложения я реализовал простой и эффективный дизайн для управления многопоточным доступом к разделяемой памяти. Предыдущие статьи можно найти здесь:
первая статья — Беспроводной термостат Raspberry Pi в Rust и
вторая статья - Кросс-компиляция стала проще .
У меня есть несколько фрагментов кода ниже, но вы можете найти весь репозиторий кода на GitHub.
Во-первых, предыстория:
При разработке многопоточного приложения одним из основных соображений является то, как разные потоки обмениваются данными. Всегда есть что-то общее между потоками — в конце концов, вы выделяете потоки для работы над общей проблемой. По крайней мере, данные конфигурации или пулы соединений с базой данных должны быть общими. Есть два основных способа решения этой проблемы.
- Передача сообщений — каналы между потоками позволяют отправлять и получать данные между ними. Передача сообщений — это механизм, который предпочитает golang, как указано в документации golang: «Не общайтесь, разделяя память; вместо этого делитесь памятью, общаясь». Rust поддерживает создание каналов; вы можете найти больше информации в главе Rust Book о передаче сообщений. ли>
2. Shared-State – набор данных, определяемых как общее состояние. Каждый поток получает доступ к общей области данных через потокобезопасные механизмы защиты — обычно Mutex. Блокировки данных позволяют вам безопасно получать доступ к данным, считывая или обновляя данные по мере необходимости, а затем блокировка снимается. Только один поток может читать или записывать данные одновременно. Эти механизмы описаны в разделе shared state Rust Book.
Оба подхода работают, и Rust имеет хорошую поддержку обоих механизмов в стандартной библиотеке. Чтобы выбрать между ними, обратите внимание на следующее.
- Каковы шаблоны доступа? Будут ли потоки в основном читать данные, в основном записывать данные или и то, и другое?
2. Как можно избежать взаимоблокировки? Предотвращение взаимоблокировки является основной задачей для многопоточных приложений. Нет ничего хуже, чем наличие нескольких потоков, доступных для выполнения работы, и все они застревают в ожидании другого. те, чтобы уйти с дороги.
3. Производительность. Хотя чрезмерное проектирование для повышения производительности является распространенной ошибкой, важно иметь некоторое представление о требованиях к производительности вашего решения. Я большой поклонник правила: решение должно быть максимально простым, но не проще.
Я выбрал подход с общим состоянием для приложения, которое я создаю. Это решение также подразумевало, что мои рабочие потоки будут работать путем опроса — периодической проверки вещей вместо получения внешнего сообщения. Вот что привело меня к такому выводу.
- У меня есть три основных темы. Одним из них является веб-интерфейс, который позволяет читать/записывать общее состояние (получить температуру, получить настройку термостата, установить настройку термостата). Веб-интерфейс по определению является создателем незапланированной активности — вещи приходят на основе действий внешнего клиента и не находятся под контролем приложения. Во-вторых, пара фоновых рабочих потоков реагирует на среду приложения. Первый считывает датчик температуры, а второй рассчитывает, должен ли термостат быть включен или выключен, и управляет физическим реле, чтобы это произошло. Термостат находится в отдельном потоке, так как он может включаться/выключаться, когда температура пересекает пороговое значение, или мы получаем новую настройку термостата из веб-интерфейса.
2. Мне не нужно было беспокоиться о конкуренции только с тремя потоками (при условии наличия одного внешнего клиента в веб-интерфейсе). Простой подход с общим состоянием не столкнется с проблемами из-за слишком большого количества потоков, пытающихся одновременно получить доступ к блокировкам.
3. Логика определения состояния термостата имеет временную составляющую. Мы не хотим частых, коротких вспышек включения/выключения, которые бьют в печь. Мы устанавливаем минимальное время, в течение которого термостат будет включен или выключен перед переключением в другое состояние. Чтобы точно отреагировать на сообщение (изменение показаний температуры или значения термостата), необходимо знать, сколько времени прошло с момента замены термостата.
Исходя из этих требований, общее состояние имело смысл. Мой дизайн обеспечивает небольшое количество потоков, поэтому конкуренция не была проблемой (общая проблема с подходами с общим состоянием). Мне нужно следить за временем, поэтому всегда требовались какие-то данные о состоянии. Реализация двух разных механизмов данных состояния в одном приложении усложнила бы работу. Когда каждый поток делает свое дело и независимо получает/устанавливает общее состояние, мы изолируем каждый поток от других. Этот выбор привел к упрощению общего дизайна приложения.
Но когда я просмотрел содержание Rust Book на shared-state, сложность управления доступом из каждого потока казались пугающими. Кроме того, разбрызгивание кода блокировки потока в приложении сделало его беспорядочным — мои хорошие одноцелевые функции теперь имели логику блокировки Mutex. Кроме того, при доступе к данным через блокировку Mutex работа с проверкой заимствования Rust между потоками оказалась сложной задачей.
Простой подход:
Чтобы сделать это управляемым, я использовал подход, который уже использовал ранее в проектах на C++, — инкапсуляцию общих данных в отдельный класс и передачу всей логики Mutex в этот класс. В Rust нет классов, но пара структур сделала свое дело.
Для начала я создал структуру для хранения всех общих данных о состоянии в одном месте. Тогда я мог бы иметь дело с одной ссылкой на эту структуру вместо того, чтобы управлять каждым элементом данных по отдельности.
pub struct SharedData {
continue_background_tasks: bool,
current_temp: f32,
thermostat_value: usize,
thermostat_on: bool,
thermostat_change_datetime: OffsetDateTime,
}
Затем мы определяем структуру, которая содержит указатель Arc — указатель Atomic Reference Count на Mutex, который охраняет нашу общую структуру данных. Мы используем эту структуру для управления доступом к нашим общим данным и из них.
pub struct AccessSharedData {
pub sd: Arc<Mutex<SharedData>>,
}
Мы сделаем много копий этого указателя — все они указывают на наш мьютекс, так мы получаем доступ к нашему общему пространству памяти. Мы делаем это, настраивая метод Clone() для AccessSharedData — он выглядит следующим образом.
// Clone here makes a copy of the Arc pointer - not the entire class of data
// All clones point to the same internal data
impl Clone for AccessSharedData {
fn clone(&self) -> Self {
AccessSharedData {
sd: Arc::clone(&self.sd),
}
}
}
Чтобы использовать это, мы сначала создаем экземпляр нашей структуры SharedData (простой метод .new() не показан выше, но он прост).
let common_data = SharedData::new(
true,
configuration.initial_thermostat_value as f32 + 5.0,
configuration.initial_thermostat_value,
false,
OffsetDateTime::UNIX_EPOCH,
);
Затем мы инициализируем экземпляр AccessSharedData.
// The wrapper around our shared data that gives it safe access across threads
let sd = AccessSharedData {
sd: Arc::new(Mutex::new(common_data)),
};
Затем мы даем каждому потоку, который мы порождаем(), клонированную копию структуры AccessSharedData. Вызов clone создает копию указателя Arc, которую мы затем перемещаем в новый поток. Аналогичный метод передает клон структуры AccessSharedData в метод actix_web HttpServer::new(), поэтому он также доступен в обработчиках HTTP-клиента.
// Create another clone of our pointer to shared data, and send it into a new thread that continuously
// checks to see how the current temperature and current thermostat setting compare - and will
// trigger turning on the relay for the furnace as needed.
let sdc = sd.clone();
let control_handle = spawn( async move {
tracing::debug ! ("kicking off control_thermostat");
match run_control_thermostat( & sdc, configuration.poll_interval).await {
Ok(_) => tracing::info ! ("control_thermostat ended"),
Err(e) => tracing::error ! ("control_thermostat returned an error {:?}", e),
}
});
Теперь, когда у каждого есть копия указателя Arc, нам нужно создать средство для его использования для чтения/записи нашей общей структуры данных. Простой набор геттеров/сеттеров для каждого члена нашей структуры обрабатывает задачу получения блокировки, получения/установки данных и автоматического снятия блокировки. Помещая эту логику вокруг get/set и используя конец метода в качестве границы области действия для принудительного снятия блокировки, мы получаем контроль над доступом к нашим данным.
impl AccessSharedData {
pub fn continue_background_tasks(&self) -> bool {
let lock = self.sd.lock().unwrap();
lock.continue_background_tasks
}
pub fn set_continue_background_tasks(&self, new_val: bool) {
let mut lock = self.sd.lock().unwrap();
lock.continue_background_tasks = new_val;
}
//repeated for remaining struct members
}
Когда каждая функция get/set возвращает значение, блокировка снимается. Этот подход также означает, что невозможно заблокировать доступ к структуре — все блокируется и немедленно освобождается и запрещает что-либо еще мешать. При наличии всех наших механизмов блокировки использование общих данных в остальной части нашего приложения становится тривиальной задачей.
sd.set_current_temp(temp_f);
let temp_now = sd.current_temp();
let thermostat_now = sd.thermostat_value();
Вкратце
Эта стратегия представляет собой инкапсуляцию в лучшем виде. Код приложения использует прямые вызовы методов для получения/установки общих элементов данных, когда это необходимо. За кулисами мы управляем блокировкой мьютекса, чтобы этот доступ был потокобезопасным и защищенным от взаимоблокировок. И средство проверки заимствования Rust очень счастливо, что мы не разрешаем прямые общие изменяемые ссылки на наши общие данные. При необходимости вы можете легко расширить этот шаблон проектирования, чтобы реализовать более надежный шаблон доступа.
Внедрение логики блокировки в геттеры/сеттеры позволяет нашей общей структуре данных скрыть сложности многопоточного доступа — так же, как это делает стандартная библиотека для потокобезопасных коллекций. Использование клонов Arc позволяет получить дескриптор структуры общей памяти в каждом потоке. Эти две концепции являются основными идеями, благодаря которым работает наш шаблон проектирования Easy Shared Data.
Также опубликовано здесь
Оригинал