Каким свойством в общем случае обладают мьютексы

Введение

При написании многопоточных приложений почти всегда требуется работать с общими данными, одновременное изменение которых может привести к очень неприятным последствиям.

Для блокировки общих данных от одновременного доступа необходимо использовать объекты синхронизации.

В данном топике рассмотренна методика работы с мютексами, существенно уменьшающая количество потенциальных ошибок связанных с созданием/удалением и захватом/освобождением.

Неудаление мютекса приводит к утечке памяти, незахват — к некорректным данным, а неосвобождение — к блокировке всех функций, работающих с общими данными.

Ниже рассматривается работа с мютексами в Windows и Unix, подобная идея может быть использована при работе с другими объектами синхронизации.

Эта идея является частным случаем методики «Выделение ресурса — есть инициализация (RAII)».

Создание, настройка и удаление мютекса

Для начала объявим класс CAutoMutex, который создает мютекс в конструкторе и удаляет в деструторе.
Плюсы:
— не нужно плодить по всему проекту похожие фрагменты коды инициализации, настройки и удаления мютекса
— автоматическое удаление мютекса и освобождение ресурсов, занятых им

// класс-оболочка, создающий и удаляющий мютекс (Windows)
class CAutoMutex
{
  // дескриптор создаваемого мютекса
  HANDLE m_h_mutex;// запрет копирования
  CAutoMutex(const CAutoMutex&);
  CAutoMutex& operator=(const CAutoMutex&);
  
public:
  CAutoMutex()
  {
    m_h_mutex = CreateMutex(NULL, FALSE, NULL);
    assert(m_h_mutex);
  }
  
  ~CAutoMutex() { CloseHandle(m_h_mutex); }
  
  HANDLE get() { return m_h_mutex; }
};

* This source code was highlighted with Source Code Highlighter.

В Windows мютексы по умолчанию рекурсивные, а в Unix — нет. Если мютекс не является рекурсивным, то попытка захватить его два раза в одном потоке приведет к deadlock-у.
Чтобы в Unix создать рекурсивный мютекс, необходимо установить соответствующий флаг при инициализации. Соответствующий класс CAutoMutex выглядел бы так (проверки возвращаемых значений не показаны для компактности):

// класс-оболочка, создающий и удаляющий рекурсивный мютекс (Unix)
class CAutoMutex
{
  pthread_mutex_t m_mutex;

  CAutoMutex(

const CAutoMutex&);
  CAutoMutex& operator=(const CAutoMutex&);public:
  CAutoMutex()
  {
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&m_mutex, &attr);
    pthread_mutexattr_destroy(&attr);
  }
  ~CAutoMutex()
  {
    pthread_mutex_destroy(&m_mutex);
  }
  pthread_mutex_t& get()
  {
    return m_mutex;
  }
};* This source code was highlighted with Source Code Highlighter.

Захват и освобождение мютекса

По аналогии с предыдущим классом объявим класс CMutexLock, который занимает мютекс в конструкторе и освобождает в деструкторе. Созданный объект этого класса автоматически захватит мютекс и освободит его в конце области действия независимо от того, какой именно был выход из этой области: нормальный выход, преждевременный return или выброс исключения. Плюсом также является, что можно не плодить похожие фрагменты кода работы с мютексами.

// класс-оболочка, занимающий и освобождающий мютекс
class CMutexLock
{
  HANDLE m_mutex;// запрещаем копирование
  CMutexLock(const CMutexLock&);
  CMutexLock& operator=(const CMutexLock&);
public:
  // занимаем мютекс при конструировании объекта
  CMutexLock(HANDLE mutex): m_mutex(mutex)
  {
    const DWORD res = WaitForSingleObject(m_mutex, INFINITE);
    assert(res == WAIT_OBJECT_0);
  }
  // освобождаем мютекс при удалении объекта
  ~CMutexLock()
  {
    const BOOL res = ReleaseMutex(m_mutex);
    assert(res);
  }
};

* This source code was highlighted with Source Code Highlighter.

Для еще большего удобства объявим следующий макрос:

// макрос, занимающий мютекс до конца области действия
#define SCOPE_LOCK_MUTEX(hMutex) CMutexLock _tmp_mtx_capt(hMutex);

* This source code was highlighted with Source Code Highlighter.

Макрос позволяет не держать в голове имя класса CMutexLock и его пространство имен, а также не ломать голову каждый раз над названием создаваемого (например _tmp_mtx_capt) объекта.

Примеры использования

Рассмотрим примеры использования.

Для упрощения примера объявим мютекс и общие данные в глобальной области:

// автоматически создаваемый и удаляемый мютекс
static CAutoMutex g_mutex;// общие данные
static DWORD g_common_cnt = 0;
static DWORD g_common_cnt_ex = 0;

* This source code was highlighted with Source Code Highlighter.

Пример простой функции, использующей общие данные и макрос SCOPE_LOCK_MUTEX:

void do_sth_1( ) throw()
{
  // …
  // мютекс не занят
  // …

  {
    

// занимаем мютекс
    SCOPE_LOCK_MUTEX(g_mutex.get());// изменяем общие данные
    g_common_cnt_ex = 0;
    g_common_cnt = 0;
    
    // здесь мютекс освобождается
  }
  
  // …
  // мютекс не занят
  // …
}

* This source code was highlighted with Source Code Highlighter.

Не правда ли, что функция do_sth_1() выглядит элегантнее, чем следующая? do_sth_1_eq:

void do_sth_1_eq( ) throw()
{
  // занимаем мютекс
  if (WaitForSingleObject(g_mutex.get(), INFINITE) == WAIT_OBJECT_0)
  {
    // изменяем общие данные
    g_common_cnt_ex = 0;
    g_common_cnt = 0;// надо не забыть освободить мютекс
    ReleaseMutex(g_mutex.get());
  }
  else
  {
    assert(0);
  }
}* This source code was highlighted with Source Code Highlighter.

В следующем примере точек выхода из функции три, но упоминание о мютексе только одно (объявление области блокировки мютекса):

// какое-то исключение
struct Ex {};// фунцкция, использующая общие данные
int do_sth_2( const int data ) throw (Ex)
{
  // …
  // мютекс не занят
  // …
  
  // занимаем мютекс на критическом участке
  SCOPE_LOCK_MUTEX(g_mutex.get());
  
  int rem = data % 3;
  
  if (rem == 1)
  {
    g_common_cnt_ex++;
    // мютекс автоматически освободится при выбросе исключения
    throw Ex();
  }
  else if (rem == 2)
  {
    // мютекс автоматически освободится при возврате
    g_common_cnt++;
    return 1;
  }
  
  // здесь мютекс автоматически освободится при возврате
  return 0;
}

* This source code was highlighted with Source Code Highlighter.

Примечание: я не сторонник использовать несколько return-ов в одной функции, просто пример от этого
становится чуть показательнее.
А если бы функция была длиннее и точек выброса исключений было бы с десяток? Без макроса нужно было поставить перед каждой из них ReleaseMutex(…), а ошибиться здесь можно очень легко.

Читайте также:  Какие свойства воздуха используют 4 класс

Заключение

Приведенные примеры классов и макросов достаточно просты, они не содержат сложных проверок и ожидают освобождения мютекса в течение бесконечного времени. Но даже это облегчает жизнь во многих случаях. А если облегчает жизнь, то почему бы это не использовать?

UPD: Первый класс CAutoMutex по ошибке не был написан, вместо него было повторное объявление второго класса CMutexLock. Исправлено.

UPD2: Убраны слова inline в объявлении методов внутри классов за ненадобностью.

UPD3: Был добавлен вариант класса CAutoMutex с рекурсивным мютексом для Unix.

UPD4: Перенесено в блог «C++»

Источник

Часть 1, Часть 2

В прошлой статье мы разобрались с тем, что такое конкурентность/параллелизм и зачем нужна синхронизация. Настала пора изучить примитивы синхронизации, которые предлагает нам стандартная библиотека шаблонов C++.

Первым из них будет std::mutex. Но сначала ознакомьтесь с картой статьи (она пригодится, если вы вдруг запутаетесь).

Итак, начнём.

Что такое мьютекс?

Мьютекс (англ. mutex, от mutual exclusion — «взаимное исключение») — это базовый механизм синхронизации. Он предназначен для организации взаимоисключающего доступа к общим данным для нескольких потоков с использованием барьеров памяти (для простоты можно считать мьютекс дверью, ведущей к общим данным).

Синтаксис

  • Заголовочный файл | #include <mutex>
  • Объявление | std::mutex mutex_name;
  • Захват мьютекса | mutex_name.lock();
    Поток запрашивает монопольное использование общих данных, защищаемых мьютексом. Дальше два варианта развития событий: происходит захват мьютекса этим потоком (и в этом случае ни один другой поток не сможет получить доступ к этим данным) или поток блокируется (если мьютекс уже захвачен другим потоком).
  • Освобождение мьютекса | mutex_name.unlock();
    Когда ресурс больше не нужен, текущий владелец должен вызвать функцию разблокирования unlock, чтобы и другие потоки могли получить доступ к этому ресурсу. Когда мьютекс освобождается, доступ предоставляется одному из ожидающих потоков.

Как создать потокобезопасную очередь

Разберёмся, как реализовать простейшие потокобезопасные очереди, то есть очереди с безопасным доступом для потоков.
В библиотеке стандартных шаблонов уже есть готовая очередь (rawQueue). Наша реализация будет предполагать: а) извлечение и удаление целочисленного значения из начала очереди и б) добавление нового в конец очереди. И всё это при обеспечении потокобезопасности.

Сначала выясним, почему и как эти две операции могут создавать проблемы для многопоточности.

  • Извлечение и удаление
    Для извлечения и удаления значения из начала очереди необходимо выполнить три операции:
    1. Проверить, не пуста ли очередь.
    2. Если нет, получается ссылка на начало очереди (rawQueue.front()). 
    3. Удаляется начало очереди (rawQueue.pop()). 
    В промежутках между этими этапами к очереди могут получать доступ и другие потоки с целью внесения изменений или чтения. Попробуйте сами.

Например:

Удалили “1”, хотя до его извлечения даже не дошли, потому что поток B извлекает 0 и удаляет 1.
Дальше — больше: если rawQueue состоит из одного элемента, поток B видит непустую очередь, и тут же поток A удаляет последнее значение. Теперь поток B пытается удалить первое значение из пустой очереди, приводя к неопределённому поведению. Настоящая страшилка!

  • Добавление
    Рассмотрим теперь добавление нового значения с помощью rawQueue.push(): новый элемент добавляется в конец контейнера и становится следующим за последним на данный момент элементом. Дальше на единицу увеличивается размер. Заметили здесь проблему? А что, если два потока одновременно добавят новое значение, увидев этот последний элемент? И что может произойти в интервале между добавлением нового элемента и увеличением размера? Кто-нибудь возьмёт да и прочитает неправильный размер. 

Получается, мы должны быть уверены, что никто не будет трогать очередь, пока мы выполняем наши задачи. Используем мьютекс для защиты этих многоступенчатых операций и сделаем так, чтобы все вместе они смотрелись как одна атомарная операция.

Обратите внимание:

  • Связь между мьютексом и защищаемым ресурсом — только в голове программиста.
    Мы знаем, что мьютекс m защищает rawQueue, но напрямую это не указывается.
  • Захват с необходимой степенью распараллеливания. 
    Использование мьютекса уменьшает параллелизм. Предположим, у нас только один мьютекс для защиты вектора и строки без каких-либо зависимостей (например, значение переменной не зависит от вектора и наоборот). Поток А захватывает мьютекс, читает строку и начинает обработку каких-то данных перед добавлением нового значения в вектор и освобождением мьютекса. И тут появляется поток B, который хочет внести пару изменений в строку, но при попытке захватить мьютекс (какая досада!) оказывается блокированным до того момента, пока не будут завершены все операции с вектором. Проблему решаем дополнительным мьютексом для строки (захваченным перед чтением и освобождённым сразу по завершении).
    → Всегда прикидывайте, какой объём данных будет защищён одним мьютексом.
  • Проводите захват только для тех операций, которым это необходимо. 
    См. предыдущий пункт. 
  • Не вызывайте lock(), если мьютекс у вас уже есть.
    Мьютекс уже заблокирован, и попытки повторного захвата приведут вас к состоянию бесконечного ожидания. Если он вам так нужен, можно воспользоваться классом std::recursive_mutex. Рекурсивный мьютекс можно получить одним и тем же потоком много раз, но столько же раз он должен быть и освобождён.
  • Используйте try_lock() или std::timed_mutex, если не хотите блокироваться и ожидать неопределённое время.
    try_lock() — это неблокирующий метод в std::mutex. Он возвращает немедленно, даже если владение не получено, причём со значением true, если мьютекс захвачен, и false — если нет. 
    std::timed_mutex предлагает два неблокирующих метода для захвата мьютекса: try_lock_for() и try_lock_until(), причём оба возвращаются по истечении времени со значением true или false в зависимости от успешности захвата.
  • Не забывайте вызывать unlock() или используйте std::lock_guard (или другие шаблонные классы), когда есть возможность. 
    См. ниже.
Читайте также:  У какого элемента больше выражены неметаллические свойства у хлора или иода

Lock guard и парадигма RAII

У нас две большие проблемы с этим простым мьютексом:

  • Что произойдёт, если мы забудем вызвать unlock()? Ресурс будет недоступен в течение всего времени существования мьютекса, и уничтожение последнего в неразблокированном состоянии приведёт к неопределённому поведению.
  • Что произойдёт, если до вызова unlock() будет выброшено исключение? unlock() так и не будет исполнен, а у нас будут все перечисленные выше проблемы.

К счастью, проблемы можно решить с помощью класса std::lock_guard. Он всегда гарантированно освобождает мьютекс, используя парадигму RAII (Resource Acquisition Is Initialization, что означает «получение ресурса есть инициализация»). Вот как это происходит: мьютекс инкапсулируется внутри lock_guard, который вызывает lock() в его конструкторе и unlock() в деструкторе при выходе из области видимости. Это безопасно даже при исключениях: раскрутка стека уничтожит lock_guard, вызывая деструктор и таким образом освобождая мьютекс.

Посмотрим теперь, как можно изменить нашу потокобезопасную очередь threadSafe_queue (на этот раз обращаем внимание на то, где освобождается мьютекс).

Unique lock, дающий свободу

Как только владение мьютексом получено (благодаря std::lock_guard), он может быть освобождён. std::unique_lock действует в схожей манере плюс делает возможным многократный захват и освобождение (всегда в таком порядке) мьютекса, используя те же преимущества безопасности парадигмы RAII.

Когда использовать?

  • Когда вам не всегда нужен захват ресурса.
  • Вместе с std::condition_variable (в следующей статье).
  • При захвате std::shared_mutex в эксклюзивном режиме (см. далее). 

Общий мьютекс + общий захват дают больше читателей

std::mutex  — это мьютекс, которым одномоментно может владеть только один поток. Однако это ограничение не всегда обязательно. Например, потоки могут одновременно и безопасно читать одни и те же общие данные. Просто читать, не производя с ними никаких изменений. Но в случае с доступом к записи только записывающий поток может иметь доступ к данным.

Начиная с C++17, std::shared_mutex формирует доступ двух типов:

  • Общий доступ: потоки вместе могут владеть мьютексом и иметь доступ к одному и тому же ресурсу. Доступ такого типа можно запросить с помощью std::shared_lock (lock guard для общего мьютекса). При таком доступе любой эксклюзивный доступ блокируется.
  • Эксклюзивный доступ: доступ к ресурсу есть только у одного потока. Запрос этого типа осуществляется с помощью класса unique lock.

Синтаксис

  • Заголовочный файл | #include <shared_mutex>;
  • Объявление | std::shared_mutex raw_sharedMutex;
  • Для захвата в общем режиме |
    std::shared_lock<std::shared_mutex> sharedLock_name(raw_sharedMutex);
  • Для захвата в эксклюзивном режиме |
    std::unique_lock<std::shared_mutex> uniqueLock_name(raw_sharedMutex);

Scoped lock, дающий больше мьютексов (и без клинча)

Впервые появившийся в C++17 и действующий в схожей манере, что и std::lock_guard, он даёт возможность получения нескольких мьютексов. Без std::scoped_lock такая операция очень опасна, так как может привести к взаимной блокировке.

Краткая история взаимоблокировки:

Поток A хочет увести 200$ с банковского счёта Жеки на счёт Бяки в виде одной атомарной операции. Он начинает с того, что захватывает мьютекс, защищающий счёт Жеки, чтобы изъять деньги, а затем пытается захватить счёт Бяки.
В то же время поток B хочет увести 100$ со счёта Бяки на счёт Жеки. Он получает захват счёта Бяки, чтобы изъять деньги и попытаться захватить счёт Жеки. Оба потока блокируются, уснув в ожидании друг друга.

std::scoped_lock одновременно захватывают (а затем освобождают) все мьютексы, передаваемые в качестве аргумента, по принципу «всё или ничего»: если хотя бы один захват выбрасывает исключение, освобождаются все уже захваченные мьютексы.

  • std::scoped_lock<std::mutex> scoped_lock_name(raw_mutex1, raw_mutex2, ..);

Заключение

Если вы вдруг запутались в этом ворохе новой информации:

  • воспользуйтесь картой в начале статьи (или составьте свою);
  • применяйте на практике новые знания и пробуйте писать простенький код.

До встречи в следующей статье, в которой речь пойдёт о condition_variableи вы узнаете, как синхронизировать потоки!

Читайте также:

Читайте нас в телеграмме и vk

Перевод статьи Valentina: [C++] MUTEX: Write Your First Concurrent Code

Источник

Фото: James P. Blair/National Geographic Creative

Вы когда-нибудь сталкивались со следующей проблемой в rust, когда использовали std::sync::Mutex в асинхронном коде?

7 | tokio::spawn(/* some future here */);
| ^^^^^^^^^^^^ future returned by `fut` is not `Send`
|
127 | T: Future + Send + ‘static,
| —- required by this bound in `tokio::task::spawn::spawn`
|

Как указывает ошибка, это происходит из-за нереализованного типажа Send для переданной футуры. В мире async-await это происходит из-за использования !Send объекта между вызовами await (далее await-точки). В частности, один из подобных интересных случаев — это блокировка мьютекса из стандартной библиотеки.

Как только Mutex заблокирован, можно работать с вложенным объектом с помощью обертки MutexGuard. Это тот самый тип, который реализует !Send и является причиной указанной ошибки. Мотивация в реализации этого типажа прямо следует из спецификаций соответствующих вызовов операционных систем (к примеру pthread_mutex_unlock и ReleaseMutex).

Читайте также:  Какими свойствами должен обладать работник

Давайте попробуем решить эту ошибку со следующим (неработающим) примером:

use std::sync::Mutex;

#[tokio::main]
async fn main() {
let counter = Mutex::new(0);

tokio::spawn(async move {
let _guard = counter.lock().unwrap();
async {}.await;
});
}

Этот достаточно упрощенный пример (мьютекс без Arc в большинстве случаев бесполезен) и еще вдобавок переусложнен (к примеру вместо числового типа за мьютексом может быть использован атомарный тип), но для иллюстрации сгодится. Далее примеры будут использовать tokio 2.0, по большей части из-за того что я просто привык к нему. Тем не менее, очень похожий код можно получить для futures 3.0, async-std, smol или возможно другого асинхронного рантайма, если не указано иначе.

Закрепление задачи за потоком

Поскольку Send — требование для многопоточности, один из способов убрать ошибку компиляции — это закрепить задачу (прим.пер: task) за одним потоком, либо же воспользоваться специализированным однопоточным асинхронным рантаймом, к примеру futures::executor::LocalPool. В таком случае экзекьютор будет выполнять задачу только на том же самом потоке, где она была порождена. Тогда код будет выглядеть следующим образом:

// …

let local = tokio::task::LocalSet::new();
local.spawn_local(async move {
let _guard = counter.lock().unwrap();
async {}.await;
});

// Need to be explicitly awaited unlike `tokio::spawn`
local.await;

Этот код, впрочем, имеет семантические отличия от от исходного. Если текущий поток перегружен, он не может выгрузить задачи на другие потоки, в крайних случаях это может привести к голоданию ядер процессора. Более того, это скорее обходной путь, нежели полноценное решение, которое вдобавок может привести к куда более серьезным проблемам. Как вы, возможно, знаете, rust не защищает от взаимных блокировок, и это один и из тех способов, где их очень легко получить.

Допустим другая задача также работает с тем же мьютексом и await-точка является ожиданием этой задачи. После переключение контекста, вторая задача будет заблокирована, что и приведет к состоянию взаимной блокировки. Так что в общем случае использование общего состояния вместе с !Send-порождением задач — сомнительный подход в большинстве случаев. В однопоточном случае, как правило, достаточно RefCell, в ином случае же придется воспользоваться другим решением.

Ограничение времени жизни

Другая уловка, нежели общее решение, является подход с пересмотром времени жизни MutexGuard. В примере, вызов drop происходил в конце области видимости, т.е. после await. Поскольку переключение между задачами может происходить только в await-точках, все еще возможно использовать !Send объекты в этих рамках. Конкретно для этого примера, будет работать ограничение жизни для MutexGuard прямо перед вызовом асинхронной функции:

// …

tokio::spawn(async move {
// For one-liner it looks better with omitted variable declaration.
let guard = counter.lock().unwrap();
// ..
drop(guard);

async {}.await;
});

На практике, указанный подход работает для многих случаев, тем не менее не для всех. К примеру, критическая секция может быть использована для нескольких операций для обеспечения транзакционности. В таком случае операционная система не может управлять мьютексом, а значит быть использован из стандартной библиотеке, вместо этого он должен быть реализован на основе среды выполнения асинхронного экзекьютора.

Асинхронный мьютекс

Последнее, и наконец общее, решение — это асинхронный мьютекс. Хочу отметить, что smol не представляет данного примитива, но он может быть реализован самостоятельно либо использован через сторонний крейт (например async_mutex). Вот полный пример с асинхронным мьютексом из токио:

use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
let counter = Mutex::new(0);

tokio::spawn(async move {
let _guard = counter.lock().await;
async {}.await;
});
}

Как можно заметить, помимо выделения await части из метода lock(), убран unwrap. Для стандартного мьютекса блокировка возвращает Result, который несет информацию о возникшей панике на предыдущих блокировках мьютекса в других потоках. В асинхронном мьютексе блокировка — это футура, разрешающаяся в тип аналогичный MutexGuard. А значит в асинхронном случае информация о панике теряется, просто закрывая критическую секцию при раскрутке стэка. С одной стороны это достаточно удобно, поскольку не возникает взаимных блокировок, с другой достаточно просто получить неконсистентное состояние синхронизируемого объекта.

Другая проблема с асинхронным мьютексом — это поведение при взаимных блокировках. Как я уже упоминал, rust не защищает от них и само собой асинхронный подход не исключение. Однако достаточно важный момент, что отладка в последнем случае существенно сложнее. Взаимная блокировка асинхронного мьютекса блокирует задачу, а не поток. Так что при адекватной нагрузке на процессор можно не заметить заблокированные задачи. При взаимной блокировке в случае с потоками можно заметить, что процесс простаивает, и при подключении отладчиком можно увидеть в трассировке стека блокирующий системный вызов. В случае с асинхронным мьютексом, состояние блокировки хранится во внутренней структуре экзекьютора (или другого крейта) и отследить это состояние уже существенно сложнее.

Выводы

При выборе мьютекса в асинхронном коде необходимо определиться с компромиссами того или иного подхода. В первую очередь, стоит убедиться в необходимости мьютекса и многопоточного экзекьютора. После этого решение достаточно простое: либо использовать мьютекс из стандартной библиотеки, если он не используется между await-точками, иначе в лучше использовать асинхронную версию примитива. Последнее утверждение вероятно не всегда истинно, для места критичного к производительности имеет смысл написать релевантный бенчмарк.

Источник