⋁-параллелизм в стандартной библиотеке C++

26.02.2016 13:45:57

Попался занятный случай необходимости реализации ⋁-параллелизма на C++. Так получилось, что до этого я вообще не работал с std::thread и сопутствующим, решил поглядеть, что к чему.

Сам ⋁-параллелизм подразумевает, что запускаются несколько процессов, обрабатывающих свои куски данных и получение результата любым из процессов означает успешное выполнение всей задачи. На практике это задачи типа поиска или подбора паролей.

Постановка конкретной задачи подразумевала, что входной поток должен нарезаться на небольшие куски, то есть, нельзя было сразу поделить всю перебираемую область на некоторое количество кусков и отдать по куску на поток, необходимо было динамически получать куски работы. Но надо отметить, что даже при наличии такой возможности, более тонкая нарезка всё равно имеет смысл, поскольку убивать потоки в стандартной библиотеке C++ даже и не предполагается, а если бы использовалась посиксовая библиотека, то были бы вопросы по ресурсам. То есть, в идеале, поток должен сам понимать, когда ему самоубиться.

То есть, в самом общем случаем нам необходимы:

  • возможность запускать потоки, обрабатывающие данные
  • возможность из потока вернуть результат
  • возможность в потоке проверить наличие/отсутствие результата
  • возможность обработки случая, когда во всём входном потоке ничего не найдено

Понятно, что если брать классический C и базовые примитивы, то можно было бы передавать указатели на общую память (как для входного потока, так и для выходного результата), иметь некий (опять же, в общей памяти) флаг завершённости процесса поиска и из основного потока запускать набор вторичных, ожидая окончания их исполнения. Либо, для пущей реактивности, использовать условные переменные (тогда основному потоку можно не ждать завершения всех рабочих) и дополнительный поток, ожидающий завершения рабочих (иначе не обнаружить состояния «ничего не найдено»).

Но у нас современный C++, в котором помимо std::thread, std::mutex и std::condition_variable навертели ещё и std::future, в котором помимо собственно фьючерсов есть также и обещания. И хотелось поиспользовать именно их, естественно, без всяких указателей. В идеале, вообще выйти на простую функциональную семантику — дёрнул нечто, получил результат.

Сам по себе, механизм фьючерсов именно на такую семантику и заточен, предполагается использование std::async() в виде:

ret_type some_function(...)
{...}
 
std::future<ret_type> ft = std::async(some_function, ...);
...
ret_type r = ft.get();
...

В общем-то, всё просто, но именно в таком раскладе, когда асинхронно в один поток (потенциально, понятно, что есть ещё политика исполнения std::launch) запускается нужная функция и, когда дело доходит до необходимости получения результата, мы его забираем из фьючерса. Что приятно, потоки в явном виде даже не фигурируют.

Но, увы, в прямом виде для необходимого случая такой подход не работает, нам нужно запустить максимально разумное количество потоков, каждый из которых будет обрабатывать куски общего набора входных данных. Вот с этими данными имеет смысл разобраться сразу.

В общем виде, их можно представить как некий генерирующий объект, вызов метода которого порождает некий кусочный объект, с которым уже можно работать в отдельном потоке, то есть:

class Chunk
{
/*
 * Что он собой представляет, нам совершенно всё равно, может быть пару итераторов,
 * может быть массив значений, может что-то ещё. От задачи зависит и необходимый
 * набор методов.
 */
}
 
class Generator
{
    Chunk get_chunk();
...
}

Теперь к самому вкусному. Есть желание получать в вызывающем потоке результат через фьючерс, но простой std::async() здесь нам не поможет, поскольку внутри вызываемой таким образом функции значение фьючерсу можно выставить только возвратом из функции, а нам надо как-то это значение выставить из рабочего потока, то есть, состояние фьючерса должно быть разделяемо между потоками. Для этого в пару к фьючерсам имеются обещания (std::promise) и упакованные задачи (std::packaged_task). Упакованные задачи нам здесь также не очень помогают, поскольку они имеют чёткую функциональную привязку, остаются лишь обещания.

Механизм же обещаний позволяет разрулить ситуацию, можно передавать обещания в рабочие потоки и выполнение обещания любым из потоков будет означать выполнение всей задачи (напоминаю, изначально стоит задача ⋁-параллелизма). Естественный инвариант цикла рабочего потока выглядел бы так:

void worker_function(Generator &gen, std::promise<ret_type> &prms, ...)
{
    while (!prms.satisfied()) {
        ...
    }
}

Но тут выясняется неприятный нюанс — у обещаний нет никакой возможности проверки их исполнения. То есть, естественным образом такой инвариант не реализуется. Надо заметить, что у фьючерса такая возможность есть, хотя и тоже несколько непрямо (через метод wait_for() с нулевым временем ожидания). Здесь может возникнуть мысль получить из обещания фьючерс и проверить у него, типа:

void worker_function(Generator &gen, std::promise<ret_type> &prms, ...)
{
    while ((prms.get_future()).wait_for(std::chrono::milliseconds(0)) != std::future_status::ready) {
        ...
    }
}

Но и это не работает, в силу того, что фьючерс из обещания можно извлечь только один раз. В результате получается печальная ситуация, когда инвариант цикла приходится реализовывать каким-то суррогатом. Так как разводить глобальный флаг никак не хочется, видится правильным расширить определение генератора методом empty() (нечто подобное необходимо в любом случае, просматриваемая область не бесконечна) и методом finish(), который принудительно остановит генератор кусков работы.

В результате получается нечто подобное:

void worker_function(Generator &gen, std::promise<ret_type> &prms, ...)
{
    while (!gen.empty()) {
        Chunk chin = gen.get_chunk();
        // do work
        if (found) {
            gen.finish();
            prms.set_value(ret_value);
            break;
        }
    }
}

Вместо empty() можно сделать выброс исключения на get_chunk(), тут уже по желанию и ситуации. Дальше всё является делом техники:

void threaded_search(Generator &gen, std::promise<ret_type> &prms, ...)
{
    unsigned int threadc;
    std::vector<std::thread> threadvec; 
 
    threadc = std::thread::hardware_concurrency();
    if (threadc == 0)
        threadc = 1;
 
    for (auto i = 0; i < threadc; i++) {
        // в принципе, можно и emplace_back()
        std::thread th(worker_function, std::ref(gen), std::ref(prms), ...);
        threadvec.push_back(std::move(th));
    }
    for (auto &amp;t : threadvec)
        t.join();
    try {
        /*                                                                                                                                                                                                                              
         * that's just a guard for 'not found' case, it can easily fail                                                                                                                                                                 
         * with promise_already_satisfied                                                                                                                                                                                               
         */
        ret_type v;
        prms.set_value(v);
    } catch (std::future_error &amp;e) {
        if (e.code() !=
            std::make_error_condition(std::future_errc::promise_already_satisfied))
            throw;
    }
}

Здесь снова всплывает неприятный нюанс того, что из обещания невозможно извлечь его состояние в плане исполненности/неисполненности. threaded_search() вынужден ожидать завершения всех потоков, поскольку он должен обработать случай нулевого результата. Но после их завершения он не может прямо узнать, было ли исполнено обещание. В приведённой реализации используется тот же самый подход к минимизации призраков там, где без них можно обойтись, выносить новые флаги или навешивать что-то ещё на генератор выглядит некорректно. В качестве варианта, если пустой (нулевой) результат может быть правильным с точки зрения логики программы, можно вместо set_value() использовать set_exception().

В итоге, в вызывающем потоке это выглядит так:

int main()
{
    std::promise&lt;ret_type&gt; prms;
    Generator gen;
    ...
    std::thread srch_main(threaded_search, std::ref(gen), std::ref(prms), ...);
    srch_main.detach();
    ...
    auto res = prms.get_future().get();
    ...
}

Из неприятного — торчит в явном виде обещание, но это можно дополнительно обернуть, и в явном же виде создаётся дополнительный «надзирающий» (он же запускающий) поток, но это тоже можно слегка переделать под std::async().

Из приятного — вызывающий поток получает результат сразу, как только он выставляется рабочим потоком (остальные потоки могут продолжать дорабатывать свои куски параллельно вызывающему), минимум лишних сущностей и низкоуровневых примитивов синхронизации.

Один комментарий к заметке “⋁-параллелизм в стандартной библиотеке C++”

  1. Роман Химов » ⋁-параллелизм в Go:

    […] Посмотрев на стандартную библиотеку C++, у меня появилась мысль попробовать решить ту же задачу средствами языка Го. У меня было время немного посмотреть на него, что-то понравилось, что-то так себе (это будет отдельной темой), было время прочитать ключевую работу тов. Хоара. Ну а чтобы лучше понять, как оно работает, неплохо бы попробовать что-нибудь сделать. Итак, сетап тот же самый, необходимо максимально разумно распараллелить задачу поиска или подбора пароля. В Го для организации взаимодействия различных активных сущностей естественным образом предназначены каналы, именно их и будем использовать. […]

Закомментировать

Вам бы, по-хорошему, зарегистрироваться сначала надобно, прежде чем комментарии оставлять. Но, в порядке исключения, можете попробовать с OpenID проскочить, вдруг.