Каким свойством в общем случае обладают мьютексы
Введение
При написании многопоточных приложений почти всегда требуется работать с общими данными, одновременное изменение которых может привести к очень неприятным последствиям.
Для блокировки общих данных от одновременного доступа необходимо использовать объекты синхронизации.
В данном топике рассмотренна методика работы с мютексами, существенно уменьшающая количество потенциальных ошибок связанных с созданием/удалением и захватом/освобождением.
Неудаление мютекса приводит к утечке памяти, незахват — к некорректным данным, а неосвобождение — к блокировке всех функций, работающих с общими данными.
Ниже рассматривается работа с мютексами в Windows и Unix, подобная идея может быть использована при работе с другими объектами синхронизации.
Эта идея является частным случаем методики «Выделение ресурса — есть инициализация (RAII)».
Создание, настройка и удаление мютекса
Для начала объявим класс CAutoMutex, который создает мютекс в конструкторе и удаляет в деструторе.
Плюсы:
— не нужно плодить по всему проекту похожие фрагменты коды инициализации, настройки и удаления мютекса
— автоматическое удаление мютекса и освобождение ресурсов, занятых им
// класс-оболочка, создающий и удаляющий мютекс (Windows)
class CAutoMutex
{
// дескриптор создаваемого мютекса
HANDLE m_h_mutex;// запрет копирования
CAutoMutex(const CAutoMutex&);
CAutoMutex& operator=(const CAutoMutex&);
public:
CAutoMutex()
{
m_h_mutex = CreateMutex(NULL, FALSE, NULL);
assert(m_h_mutex);
}
~CAutoMutex() { CloseHandle(m_h_mutex); }
HANDLE get() { return m_h_mutex; }
};* This source code was highlighted with Source Code Highlighter.
В Windows мютексы по умолчанию рекурсивные, а в Unix — нет. Если мютекс не является рекурсивным, то попытка захватить его два раза в одном потоке приведет к deadlock-у.
Чтобы в Unix создать рекурсивный мютекс, необходимо установить соответствующий флаг при инициализации. Соответствующий класс CAutoMutex выглядел бы так (проверки возвращаемых значений не показаны для компактности):
// класс-оболочка, создающий и удаляющий рекурсивный мютекс (Unix)
class CAutoMutex
{
pthread_mutex_t m_mutex;CAutoMutex(
const CAutoMutex&);
CAutoMutex& operator=(const CAutoMutex&);public:
CAutoMutex()
{
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&m_mutex, &attr);
pthread_mutexattr_destroy(&attr);
}
~CAutoMutex()
{
pthread_mutex_destroy(&m_mutex);
}
pthread_mutex_t& get()
{
return m_mutex;
}
};* This source code was highlighted with Source Code Highlighter.
Захват и освобождение мютекса
По аналогии с предыдущим классом объявим класс CMutexLock, который занимает мютекс в конструкторе и освобождает в деструкторе. Созданный объект этого класса автоматически захватит мютекс и освободит его в конце области действия независимо от того, какой именно был выход из этой области: нормальный выход, преждевременный return или выброс исключения. Плюсом также является, что можно не плодить похожие фрагменты кода работы с мютексами.
// класс-оболочка, занимающий и освобождающий мютекс
class CMutexLock
{
HANDLE m_mutex;// запрещаем копирование
CMutexLock(const CMutexLock&);
CMutexLock& operator=(const CMutexLock&);
public:
// занимаем мютекс при конструировании объекта
CMutexLock(HANDLE mutex): m_mutex(mutex)
{
const DWORD res = WaitForSingleObject(m_mutex, INFINITE);
assert(res == WAIT_OBJECT_0);
}
// освобождаем мютекс при удалении объекта
~CMutexLock()
{
const BOOL res = ReleaseMutex(m_mutex);
assert(res);
}
};* This source code was highlighted with Source Code Highlighter.
Для еще большего удобства объявим следующий макрос:
// макрос, занимающий мютекс до конца области действия
#define SCOPE_LOCK_MUTEX(hMutex) CMutexLock _tmp_mtx_capt(hMutex);* This source code was highlighted with Source Code Highlighter.
Макрос позволяет не держать в голове имя класса CMutexLock и его пространство имен, а также не ломать голову каждый раз над названием создаваемого (например _tmp_mtx_capt) объекта.
Примеры использования
Рассмотрим примеры использования.
Для упрощения примера объявим мютекс и общие данные в глобальной области:
// автоматически создаваемый и удаляемый мютекс
static CAutoMutex g_mutex;// общие данные
static DWORD g_common_cnt = 0;
static DWORD g_common_cnt_ex = 0;* This source code was highlighted with Source Code Highlighter.
Пример простой функции, использующей общие данные и макрос SCOPE_LOCK_MUTEX:
void do_sth_1( ) throw()
{
// …
// мютекс не занят
// …{
// занимаем мютекс
SCOPE_LOCK_MUTEX(g_mutex.get());// изменяем общие данные
g_common_cnt_ex = 0;
g_common_cnt = 0;
// здесь мютекс освобождается
}
// …
// мютекс не занят
// …
}* This source code was highlighted with Source Code Highlighter.
Не правда ли, что функция do_sth_1() выглядит элегантнее, чем следующая? do_sth_1_eq:
void do_sth_1_eq( ) throw()
{
// занимаем мютекс
if (WaitForSingleObject(g_mutex.get(), INFINITE) == WAIT_OBJECT_0)
{
// изменяем общие данные
g_common_cnt_ex = 0;
g_common_cnt = 0;// надо не забыть освободить мютекс
ReleaseMutex(g_mutex.get());
}
else
{
assert(0);
}
}* This source code was highlighted with Source Code Highlighter.
В следующем примере точек выхода из функции три, но упоминание о мютексе только одно (объявление области блокировки мютекса):
// какое-то исключение
struct Ex {};// фунцкция, использующая общие данные
int do_sth_2( const int data ) throw (Ex)
{
// …
// мютекс не занят
// …
// занимаем мютекс на критическом участке
SCOPE_LOCK_MUTEX(g_mutex.get());
int rem = data % 3;
if (rem == 1)
{
g_common_cnt_ex++;
// мютекс автоматически освободится при выбросе исключения
throw Ex();
}
else if (rem == 2)
{
// мютекс автоматически освободится при возврате
g_common_cnt++;
return 1;
}
// здесь мютекс автоматически освободится при возврате
return 0;
}* This source code was highlighted with Source Code Highlighter.
Примечание: я не сторонник использовать несколько return-ов в одной функции, просто пример от этого
становится чуть показательнее.
А если бы функция была длиннее и точек выброса исключений было бы с десяток? Без макроса нужно было поставить перед каждой из них ReleaseMutex(…), а ошибиться здесь можно очень легко.
Заключение
Приведенные примеры классов и макросов достаточно просты, они не содержат сложных проверок и ожидают освобождения мютекса в течение бесконечного времени. Но даже это облегчает жизнь во многих случаях. А если облегчает жизнь, то почему бы это не использовать?
UPD: Первый класс CAutoMutex по ошибке не был написан, вместо него было повторное объявление второго класса CMutexLock. Исправлено.
UPD2: Убраны слова inline в объявлении методов внутри классов за ненадобностью.
UPD3: Был добавлен вариант класса CAutoMutex с рекурсивным мютексом для Unix.
UPD4: Перенесено в блог «C++»
Источник
Часть 1, Часть 2
В прошлой статье мы разобрались с тем, что такое конкурентность/параллелизм и зачем нужна синхронизация. Настала пора изучить примитивы синхронизации, которые предлагает нам стандартная библиотека шаблонов C++.
Первым из них будет std::mutex. Но сначала ознакомьтесь с картой статьи (она пригодится, если вы вдруг запутаетесь).
Итак, начнём.
Что такое мьютекс?
Мьютекс (англ. mutex, от mutual exclusion — «взаимное исключение») — это базовый механизм синхронизации. Он предназначен для организации взаимоисключающего доступа к общим данным для нескольких потоков с использованием барьеров памяти (для простоты можно считать мьютекс дверью, ведущей к общим данным).
Синтаксис
- Заголовочный файл | #include <mutex>
- Объявление | std::mutex mutex_name;
- Захват мьютекса | mutex_name.lock();
Поток запрашивает монопольное использование общих данных, защищаемых мьютексом. Дальше два варианта развития событий: происходит захват мьютекса этим потоком (и в этом случае ни один другой поток не сможет получить доступ к этим данным) или поток блокируется (если мьютекс уже захвачен другим потоком). - Освобождение мьютекса | mutex_name.unlock();
Когда ресурс больше не нужен, текущий владелец должен вызвать функцию разблокирования unlock, чтобы и другие потоки могли получить доступ к этому ресурсу. Когда мьютекс освобождается, доступ предоставляется одному из ожидающих потоков.
Как создать потокобезопасную очередь
Разберёмся, как реализовать простейшие потокобезопасные очереди, то есть очереди с безопасным доступом для потоков.
В библиотеке стандартных шаблонов уже есть готовая очередь (rawQueue). Наша реализация будет предполагать: а) извлечение и удаление целочисленного значения из начала очереди и б) добавление нового в конец очереди. И всё это при обеспечении потокобезопасности.
Сначала выясним, почему и как эти две операции могут создавать проблемы для многопоточности.
- Извлечение и удаление
Для извлечения и удаления значения из начала очереди необходимо выполнить три операции:
1. Проверить, не пуста ли очередь.
2. Если нет, получается ссылка на начало очереди (rawQueue.front()).
3. Удаляется начало очереди (rawQueue.pop()).
В промежутках между этими этапами к очереди могут получать доступ и другие потоки с целью внесения изменений или чтения. Попробуйте сами.
Например:
Удалили “1”, хотя до его извлечения даже не дошли, потому что поток B извлекает 0 и удаляет 1.
Дальше — больше: если rawQueue состоит из одного элемента, поток B видит непустую очередь, и тут же поток A удаляет последнее значение. Теперь поток B пытается удалить первое значение из пустой очереди, приводя к неопределённому поведению. Настоящая страшилка!
- Добавление
Рассмотрим теперь добавление нового значения с помощью rawQueue.push(): новый элемент добавляется в конец контейнера и становится следующим за последним на данный момент элементом. Дальше на единицу увеличивается размер. Заметили здесь проблему? А что, если два потока одновременно добавят новое значение, увидев этот последний элемент? И что может произойти в интервале между добавлением нового элемента и увеличением размера? Кто-нибудь возьмёт да и прочитает неправильный размер.
Получается, мы должны быть уверены, что никто не будет трогать очередь, пока мы выполняем наши задачи. Используем мьютекс для защиты этих многоступенчатых операций и сделаем так, чтобы все вместе они смотрелись как одна атомарная операция.
Обратите внимание:
- Связь между мьютексом и защищаемым ресурсом — только в голове программиста.
Мы знаем, что мьютекс m защищает rawQueue, но напрямую это не указывается. - Захват с необходимой степенью распараллеливания.
Использование мьютекса уменьшает параллелизм. Предположим, у нас только один мьютекс для защиты вектора и строки без каких-либо зависимостей (например, значение переменной не зависит от вектора и наоборот). Поток А захватывает мьютекс, читает строку и начинает обработку каких-то данных перед добавлением нового значения в вектор и освобождением мьютекса. И тут появляется поток B, который хочет внести пару изменений в строку, но при попытке захватить мьютекс (какая досада!) оказывается блокированным до того момента, пока не будут завершены все операции с вектором. Проблему решаем дополнительным мьютексом для строки (захваченным перед чтением и освобождённым сразу по завершении).
→ Всегда прикидывайте, какой объём данных будет защищён одним мьютексом. - Проводите захват только для тех операций, которым это необходимо.
См. предыдущий пункт. - Не вызывайте lock(), если мьютекс у вас уже есть.
Мьютекс уже заблокирован, и попытки повторного захвата приведут вас к состоянию бесконечного ожидания. Если он вам так нужен, можно воспользоваться классом std::recursive_mutex. Рекурсивный мьютекс можно получить одним и тем же потоком много раз, но столько же раз он должен быть и освобождён. - Используйте try_lock() или std::timed_mutex, если не хотите блокироваться и ожидать неопределённое время.
→ try_lock() — это неблокирующий метод в std::mutex. Он возвращает немедленно, даже если владение не получено, причём со значением true, если мьютекс захвачен, и false — если нет.
→ std::timed_mutex предлагает два неблокирующих метода для захвата мьютекса: try_lock_for() и try_lock_until(), причём оба возвращаются по истечении времени со значением true или false в зависимости от успешности захвата.
- Не забывайте вызывать unlock() или используйте std::lock_guard (или другие шаблонные классы), когда есть возможность.
См. ниже.
Lock guard и парадигма RAII
У нас две большие проблемы с этим простым мьютексом:
- Что произойдёт, если мы забудем вызвать unlock()? Ресурс будет недоступен в течение всего времени существования мьютекса, и уничтожение последнего в неразблокированном состоянии приведёт к неопределённому поведению.
- Что произойдёт, если до вызова unlock() будет выброшено исключение? unlock() так и не будет исполнен, а у нас будут все перечисленные выше проблемы.
К счастью, проблемы можно решить с помощью класса std::lock_guard. Он всегда гарантированно освобождает мьютекс, используя парадигму RAII (Resource Acquisition Is Initialization, что означает «получение ресурса есть инициализация»). Вот как это происходит: мьютекс инкапсулируется внутри lock_guard, который вызывает lock() в его конструкторе и unlock() в деструкторе при выходе из области видимости. Это безопасно даже при исключениях: раскрутка стека уничтожит lock_guard, вызывая деструктор и таким образом освобождая мьютекс.
Посмотрим теперь, как можно изменить нашу потокобезопасную очередь threadSafe_queue (на этот раз обращаем внимание на то, где освобождается мьютекс).
Unique lock, дающий свободу
Как только владение мьютексом получено (благодаря std::lock_guard), он может быть освобождён. std::unique_lock действует в схожей манере плюс делает возможным многократный захват и освобождение (всегда в таком порядке) мьютекса, используя те же преимущества безопасности парадигмы RAII.
Когда использовать?
- Когда вам не всегда нужен захват ресурса.
- Вместе с std::condition_variable (в следующей статье).
- При захвате std::shared_mutex в эксклюзивном режиме (см. далее).
Общий мьютекс + общий захват дают больше читателей
std::mutex — это мьютекс, которым одномоментно может владеть только один поток. Однако это ограничение не всегда обязательно. Например, потоки могут одновременно и безопасно читать одни и те же общие данные. Просто читать, не производя с ними никаких изменений. Но в случае с доступом к записи только записывающий поток может иметь доступ к данным.
Начиная с C++17, std::shared_mutex формирует доступ двух типов:
- Общий доступ: потоки вместе могут владеть мьютексом и иметь доступ к одному и тому же ресурсу. Доступ такого типа можно запросить с помощью std::shared_lock (lock guard для общего мьютекса). При таком доступе любой эксклюзивный доступ блокируется.
- Эксклюзивный доступ: доступ к ресурсу есть только у одного потока. Запрос этого типа осуществляется с помощью класса unique lock.
Синтаксис
- Заголовочный файл | #include <shared_mutex>;
- Объявление | std::shared_mutex raw_sharedMutex;
- Для захвата в общем режиме |
std::shared_lock<std::shared_mutex> sharedLock_name(raw_sharedMutex); - Для захвата в эксклюзивном режиме |
std::unique_lock<std::shared_mutex> uniqueLock_name(raw_sharedMutex);
Scoped lock, дающий больше мьютексов (и без клинча)
Впервые появившийся в C++17 и действующий в схожей манере, что и std::lock_guard, он даёт возможность получения нескольких мьютексов. Без std::scoped_lock такая операция очень опасна, так как может привести к взаимной блокировке.
Краткая история взаимоблокировки:
Поток A хочет увести 200$ с банковского счёта Жеки на счёт Бяки в виде одной атомарной операции. Он начинает с того, что захватывает мьютекс, защищающий счёт Жеки, чтобы изъять деньги, а затем пытается захватить счёт Бяки.
В то же время поток B хочет увести 100$ со счёта Бяки на счёт Жеки. Он получает захват счёта Бяки, чтобы изъять деньги и попытаться захватить счёт Жеки. Оба потока блокируются, уснув в ожидании друг друга.
std::scoped_lock одновременно захватывают (а затем освобождают) все мьютексы, передаваемые в качестве аргумента, по принципу «всё или ничего»: если хотя бы один захват выбрасывает исключение, освобождаются все уже захваченные мьютексы.
- std::scoped_lock<std::mutex> scoped_lock_name(raw_mutex1, raw_mutex2, ..);
Заключение
Если вы вдруг запутались в этом ворохе новой информации:
- воспользуйтесь картой в начале статьи (или составьте свою);
- применяйте на практике новые знания и пробуйте писать простенький код.
До встречи в следующей статье, в которой речь пойдёт о condition_variableи вы узнаете, как синхронизировать потоки!
Читайте также:
Читайте нас в телеграмме и vk
Перевод статьи Valentina: [C++] MUTEX: Write Your First Concurrent Code
Источник
Фото: James P. Blair/National Geographic Creative
Вы когда-нибудь сталкивались со следующей проблемой в rust, когда использовали std::sync::Mutex в асинхронном коде?
7 | tokio::spawn(/* some future here */);
| ^^^^^^^^^^^^ future returned by `fut` is not `Send`
|
127 | T: Future + Send + ‘static,
| —- required by this bound in `tokio::task::spawn::spawn`
|
Как указывает ошибка, это происходит из-за нереализованного типажа Send для переданной футуры. В мире async-await это происходит из-за использования !Send объекта между вызовами await (далее await-точки). В частности, один из подобных интересных случаев — это блокировка мьютекса из стандартной библиотеки.
Как только Mutex заблокирован, можно работать с вложенным объектом с помощью обертки MutexGuard. Это тот самый тип, который реализует !Send и является причиной указанной ошибки. Мотивация в реализации этого типажа прямо следует из спецификаций соответствующих вызовов операционных систем (к примеру pthread_mutex_unlock и ReleaseMutex).
Давайте попробуем решить эту ошибку со следующим (неработающим) примером:
use std::sync::Mutex;
#[tokio::main]
async fn main() {
let counter = Mutex::new(0);
tokio::spawn(async move {
let _guard = counter.lock().unwrap();
async {}.await;
});
}
Этот достаточно упрощенный пример (мьютекс без Arc в большинстве случаев бесполезен) и еще вдобавок переусложнен (к примеру вместо числового типа за мьютексом может быть использован атомарный тип), но для иллюстрации сгодится. Далее примеры будут использовать tokio 2.0, по большей части из-за того что я просто привык к нему. Тем не менее, очень похожий код можно получить для futures 3.0, async-std, smol или возможно другого асинхронного рантайма, если не указано иначе.
Закрепление задачи за потоком
Поскольку Send — требование для многопоточности, один из способов убрать ошибку компиляции — это закрепить задачу (прим.пер: task) за одним потоком, либо же воспользоваться специализированным однопоточным асинхронным рантаймом, к примеру futures::executor::LocalPool. В таком случае экзекьютор будет выполнять задачу только на том же самом потоке, где она была порождена. Тогда код будет выглядеть следующим образом:
// …
let local = tokio::task::LocalSet::new();
local.spawn_local(async move {
let _guard = counter.lock().unwrap();
async {}.await;
});
// Need to be explicitly awaited unlike `tokio::spawn`
local.await;
Этот код, впрочем, имеет семантические отличия от от исходного. Если текущий поток перегружен, он не может выгрузить задачи на другие потоки, в крайних случаях это может привести к голоданию ядер процессора. Более того, это скорее обходной путь, нежели полноценное решение, которое вдобавок может привести к куда более серьезным проблемам. Как вы, возможно, знаете, rust не защищает от взаимных блокировок, и это один и из тех способов, где их очень легко получить.
Допустим другая задача также работает с тем же мьютексом и await-точка является ожиданием этой задачи. После переключение контекста, вторая задача будет заблокирована, что и приведет к состоянию взаимной блокировки. Так что в общем случае использование общего состояния вместе с !Send-порождением задач — сомнительный подход в большинстве случаев. В однопоточном случае, как правило, достаточно RefCell, в ином случае же придется воспользоваться другим решением.
Ограничение времени жизни
Другая уловка, нежели общее решение, является подход с пересмотром времени жизни MutexGuard. В примере, вызов drop происходил в конце области видимости, т.е. после await. Поскольку переключение между задачами может происходить только в await-точках, все еще возможно использовать !Send объекты в этих рамках. Конкретно для этого примера, будет работать ограничение жизни для MutexGuard прямо перед вызовом асинхронной функции:
// …
tokio::spawn(async move {
// For one-liner it looks better with omitted variable declaration.
let guard = counter.lock().unwrap();
// ..
drop(guard);
async {}.await;
});
На практике, указанный подход работает для многих случаев, тем не менее не для всех. К примеру, критическая секция может быть использована для нескольких операций для обеспечения транзакционности. В таком случае операционная система не может управлять мьютексом, а значит быть использован из стандартной библиотеке, вместо этого он должен быть реализован на основе среды выполнения асинхронного экзекьютора.
Асинхронный мьютекс
Последнее, и наконец общее, решение — это асинхронный мьютекс. Хочу отметить, что smol не представляет данного примитива, но он может быть реализован самостоятельно либо использован через сторонний крейт (например async_mutex). Вот полный пример с асинхронным мьютексом из токио:
use tokio::sync::Mutex;
#[tokio::main]
async fn main() {
let counter = Mutex::new(0);
tokio::spawn(async move {
let _guard = counter.lock().await;
async {}.await;
});
}
Как можно заметить, помимо выделения await части из метода lock(), убран unwrap. Для стандартного мьютекса блокировка возвращает Result, который несет информацию о возникшей панике на предыдущих блокировках мьютекса в других потоках. В асинхронном мьютексе блокировка — это футура, разрешающаяся в тип аналогичный MutexGuard. А значит в асинхронном случае информация о панике теряется, просто закрывая критическую секцию при раскрутке стэка. С одной стороны это достаточно удобно, поскольку не возникает взаимных блокировок, с другой достаточно просто получить неконсистентное состояние синхронизируемого объекта.
Другая проблема с асинхронным мьютексом — это поведение при взаимных блокировках. Как я уже упоминал, rust не защищает от них и само собой асинхронный подход не исключение. Однако достаточно важный момент, что отладка в последнем случае существенно сложнее. Взаимная блокировка асинхронного мьютекса блокирует задачу, а не поток. Так что при адекватной нагрузке на процессор можно не заметить заблокированные задачи. При взаимной блокировке в случае с потоками можно заметить, что процесс простаивает, и при подключении отладчиком можно увидеть в трассировке стека блокирующий системный вызов. В случае с асинхронным мьютексом, состояние блокировки хранится во внутренней структуре экзекьютора (или другого крейта) и отследить это состояние уже существенно сложнее.
Выводы
При выборе мьютекса в асинхронном коде необходимо определиться с компромиссами того или иного подхода. В первую очередь, стоит убедиться в необходимости мьютекса и многопоточного экзекьютора. После этого решение достаточно простое: либо использовать мьютекс из стандартной библиотеки, если он не используется между await-точками, иначе в лучше использовать асинхронную версию примитива. Последнее утверждение вероятно не всегда истинно, для места критичного к производительности имеет смысл написать релевантный бенчмарк.
Источник