
How to create an efficient multi-threaded task scheduler in C++?


You can avoid both having a separate "manager" thread, and having to wake up a large number of tasks when the next-to-run task changes, by using a design where a single pool thread waits for the "next to run" task (if there is one) on one condition variable, and the remaining pool threads wait indefinitely on a second condition variable.

The pool threads would execute pseudocode along these lines:

```
pthread_mutex_lock(&queue_lock);
while (running)
{
    if (head task is ready to run)
    {
        dequeue head task;
        /* hand the (new) head of the queue over to another thread */
        if (task_thread == 1)
            pthread_cond_signal(&task_cv);
        else
            pthread_cond_signal(&queue_cv);
        pthread_mutex_unlock(&queue_lock);
        run dequeued task;
        pthread_mutex_lock(&queue_lock);
    }
    else if (!queue_empty && task_thread == 0)
    {
        /* become the single thread waiting for the head task's start time */
        task_thread = 1;
        pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run);
        task_thread = 0;
    }
    else
    {
        /* queue empty, or another thread is already the timed waiter */
        pthread_cond_wait(&queue_cv, &queue_lock);
    }
}
pthread_mutex_unlock(&queue_lock);
```

If you change the next task to run, then you execute:

```
if (task_thread == 1)
    pthread_cond_signal(&task_cv);
else
    pthread_cond_signal(&queue_cv);
```

with the queue_lock held.

Under this scheme, every wakeup is directed at a single thread, there is only one priority queue of tasks, and no manager thread is required.
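A minimal C++11 translation of the pseudocode above, assuming std::mutex and std::condition_variable in place of the pthread calls and std::function<void()> as the task type. The class name TwoCvPool, its schedule() method and the shutdown logic are illustrative additions, not part of the original answer. schedule() performs the "if you change the next task to run" signal whenever the new task becomes the head of the queue, and wait_until() plays the role of pthread_cond_timedwait with an absolute deadline.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical pool: one thread waits (timed) for the head task on task_cv_,
// all other idle threads wait indefinitely on queue_cv_.
class TwoCvPool {
public:
    using Clock = std::chrono::steady_clock;

    explicit TwoCvPool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            threads_.emplace_back([this] { loop(); });
    }

    ~TwoCvPool() {
        {
            std::lock_guard<std::mutex> lk(m_);
            running_ = false;
        }
        task_cv_.notify_all();
        queue_cv_.notify_all();
        for (auto &t : threads_) t.join();  // pending tasks are dropped
    }

    void schedule(Clock::time_point when, std::function<void()> fn) {
        std::lock_guard<std::mutex> lk(m_);
        bool new_head = q_.empty() || when < q_.top().when;
        q_.push(Item{when, std::move(fn)});
        if (new_head) wake_one();  // the next task to run has changed
    }

private:
    struct Item {
        Clock::time_point when;
        std::function<void()> fn;
        bool operator>(const Item &o) const { return when > o.when; }
    };

    // Called with m_ held: wake the timed waiter if there is one,
    // otherwise a single idle thread.
    void wake_one() {
        if (task_thread_) task_cv_.notify_one();
        else queue_cv_.notify_one();
    }

    void loop() {
        std::unique_lock<std::mutex> lk(m_);
        while (running_) {
            if (!q_.empty() && q_.top().when <= Clock::now()) {
                std::function<void()> fn = q_.top().fn;  // top() is const: copy
                q_.pop();
                wake_one();  // let another thread take care of the new head
                lk.unlock();
                fn();
                lk.lock();
            } else if (!q_.empty() && !task_thread_) {
                task_thread_ = true;
                // Copy the deadline: wait_until takes it by reference and the
                // queue may be modified while the lock is released.
                Clock::time_point deadline = q_.top().when;
                task_cv_.wait_until(lk, deadline);
                task_thread_ = false;
            } else {
                queue_cv_.wait(lk);
            }
        }
    }

    std::mutex m_;
    std::condition_variable task_cv_, queue_cv_;
    std::priority_queue<Item, std::vector<Item>, std::greater<Item>> q_;
    std::vector<std::thread> threads_;
    bool task_thread_ = false;
    bool running_ = true;
};
```

Every loop iteration re-checks the queue under the lock, so spurious wakeups and notifications sent while all threads are busy running tasks are harmless.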


Your specification is a bit too strong:

delayToRun means that the task doesn't get executed immediately, but delayToRun seconds later

You forgot to add "at least":

  • The task doesn't get executed now, but at least delayToRun seconds later

The point is that if ten thousand tasks are all scheduled with a delayToRun of 0.1, they surely cannot, in practice, all run at the same time.

With such correction, you just maintain some queue (or agenda) of (scheduled-start-time, closure to run), you keep that queue sorted, and you start N (some fixed number) of threads which atomically pop the first element of the agenda and run it.

then all the worker threads need to be woken up to update the time which they are waiting for.

No, only some of the worker threads need to be woken up.

Read about condition variables and broadcast.
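A minimal sketch of that agenda, assuming std::multimap as the sorted container and std::function<void()> as the closure type; the class name Agenda and its methods are hypothetical. Idle workers wait on one condition variable until the earliest start time; when add() inserts a new earliest task it broadcasts with notify_all so the waiters re-read the head. This is the simplest correct variant, but it wakes every idle worker whenever the head changes, which is the cost the two-condition-variable design in the first answer avoids.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical agenda: (scheduled start time -> closure), kept sorted by the
// multimap and shared by N worker threads.
class Agenda {
public:
    using Clock = std::chrono::steady_clock;

    explicit Agenda(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { work(); });
    }

    ~Agenda() {
        {
            std::lock_guard<std::mutex> lk(m_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto &w : workers_) w.join();  // pending closures are dropped
    }

    // Runs fn at least `delay` from now (never earlier).
    void add(Clock::duration delay, std::function<void()> fn) {
        std::lock_guard<std::mutex> lk(m_);
        auto when = Clock::now() + delay;
        bool new_head = agenda_.empty() || when < agenda_.begin()->first;
        agenda_.emplace(when, std::move(fn));
        if (new_head) cv_.notify_all();  // broadcast: waiters re-read the head
    }

private:
    void work() {
        std::unique_lock<std::mutex> lk(m_);
        while (!stop_) {
            if (agenda_.empty()) {
                cv_.wait(lk);
            } else if (agenda_.begin()->first > Clock::now()) {
                auto deadline = agenda_.begin()->first;  // copy: map may change while waiting
                cv_.wait_until(lk, deadline);
            } else {
                // Pop the first element atomically (lock held), then run it unlocked.
                auto fn = std::move(agenda_.begin()->second);
                agenda_.erase(agenda_.begin());
                lk.unlock();
                fn();
                lk.lock();
            }
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::multimap<Clock::time_point, std::function<void()>> agenda_;
    std::vector<std::thread> workers_;
    bool stop_ = false;
};
```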

You might also use POSIX timers, see timer_create(2), or the Linux-specific timerfd, see timerfd_create(2).

You would probably want to avoid blocking system calls in your task threads and instead have a central thread manage them with an event loop (see poll(2)...); otherwise, if a hundred tasks running sleep(100) occupy all the pool threads, a task scheduled to run in half a second won't start until about a hundred seconds later.
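A minimal Linux-only sketch combining the last two suggestions: a one-shot timerfd is armed for a delay and the thread waits for it in poll(2), so the same loop could also watch sockets or pipes instead of blocking in sleep(). The helper name wait_with_timerfd is hypothetical; timerfd_create, timerfd_settime, poll and read are used as documented in their man pages.

```cpp
#include <sys/timerfd.h>
#include <poll.h>
#include <unistd.h>
#include <cerrno>
#include <cstdint>
#include <ctime>
#include <stdexcept>

// Hypothetical helper: wait in poll(2) until a one-shot timerfd armed for
// `ms` milliseconds fires, and return the expiration count read from it.
std::uint64_t wait_with_timerfd(long ms) {
    if (ms <= 0)  // an all-zero it_value would disarm the timer instead
        throw std::invalid_argument("ms must be positive");

    int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
    if (fd < 0)
        throw std::runtime_error("timerfd_create failed");

    itimerspec its{};  // it_interval stays zero: one-shot timer
    its.it_value.tv_sec = ms / 1000;
    its.it_value.tv_nsec = (ms % 1000) * 1000000L;
    if (timerfd_settime(fd, 0, &its, nullptr) < 0) {
        close(fd);
        throw std::runtime_error("timerfd_settime failed");
    }

    // A real event loop would put sockets, pipes, etc. in this array too
    // and dispatch whichever descriptor becomes ready.
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLIN;
    int rc;
    do {
        rc = poll(&pfd, 1, -1);  // no poll timeout: the timerfd is the timeout
    } while (rc < 0 && errno == EINTR);

    std::uint64_t expirations = 0;
    if (rc > 0 && (pfd.revents & POLLIN)) {
        ssize_t n = read(fd, &expirations, sizeof expirations);
        if (n != static_cast<ssize_t>(sizeof expirations))
            expirations = 0;
    }
    close(fd);
    return expirations;
}
```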

You may want to read about continuation-passing style (CPS) programming, which is highly relevant here. Read the paper about Continuation Passing C by Juliusz Chroboczek.
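To make the CPS idea concrete: instead of a task that blocks in something like sleep(100), the task is split so that "the rest of the work" is passed to the scheduler as a continuation, and the thread is free in the meantime. This is a single-threaded sketch for illustration only; the names Loop, after, run and two_step_task are hypothetical and not taken from the paper.

```cpp
#include <chrono>
#include <functional>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical single-threaded run loop. A continuation is a closure that the
// loop calls once its scheduled start time has passed.
class Loop {
public:
    using Clock = std::chrono::steady_clock;
    using Cont = std::function<void()>;

    // Schedule continuation k to run at least `d` from now.
    void after(Clock::duration d, Cont k) {
        q_.push(Item{Clock::now() + d, seq_++, std::move(k)});
    }

    // Run continuations in start-time order until none remain.
    void run() {
        while (!q_.empty()) {
            Item it = q_.top();
            q_.pop();
            std::this_thread::sleep_until(it.when);  // only the loop itself waits
            it.k();
        }
    }

private:
    struct Item {
        Clock::time_point when;
        unsigned long seq;  // tie-breaker: equal start times run in FIFO order
        Cont k;
    };
    struct Later {
        bool operator()(const Item &a, const Item &b) const {
            return a.when != b.when ? a.when > b.when : a.seq > b.seq;
        }
    };
    std::priority_queue<Item, std::vector<Item>, Later> q_;
    unsigned long seq_ = 0;
};

// A task in CPS: rather than "step 1; sleep(100 ms); step 2;", step 1 hands
// step 2 to the loop as its continuation and returns immediately.
void two_step_task(Loop &loop, std::vector<int> &trace, int id) {
    trace.push_back(id * 10 + 1);  // step 1
    loop.after(std::chrono::milliseconds(100),
               [&trace, id] { trace.push_back(id * 10 + 2); });  // step 2
}
```

If two_step_task is started at once and another closure is scheduled 10 ms later, the trace comes out as 11, 99, 12 (with 99 logged by the second closure): the 100 ms pause inside the first task no longer holds up the task that is due sooner.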

Also look into Qt threads.

You could also consider coding in Go (with its goroutines).


This is a sample implementation for the interface you provided that comes closest to your 'With manager thread' description.

It uses a single thread (timer_thread) to manage a queue (allTasksQueue) that is sorted based on the actual time when a task must be started (std::chrono::time_point).
The 'queue' is a std::priority_queue, which always keeps the element with the earliest time_point key on top.
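std::priority_queue puts the "largest" element according to its comparator on top, which is why TScomp returns a.tp > b.tp: reversing the comparison makes top() the earliest time_point. A small standalone check of that behavior; Key mirrors the scheduler's key but holds an int id in place of Task*, and the names Key, EarliestFirst and dispatch_order are hypothetical.

```cpp
#include <chrono>
#include <queue>
#include <vector>

using timepoint = std::chrono::time_point<std::chrono::steady_clock>;

// Mirrors the scheduler's key, with an int id standing in for Task*.
struct Key {
    timepoint tp;
    int id;
};

// Same comparison as TScomp: priority_queue puts the "largest" element on top,
// so ordering by a.tp > b.tp puts the earliest time_point on top.
struct EarliestFirst {
    bool operator()(const Key &a, const Key &b) const { return a.tp > b.tp; }
};

// Pops every key and returns the ids in the order the timer thread would see them.
std::vector<int> dispatch_order(const std::vector<Key> &keys) {
    std::priority_queue<Key, std::vector<Key>, EarliestFirst> q;
    for (const Key &k : keys)
        q.push(k);
    std::vector<int> order;
    while (!q.empty()) {
        order.push_back(q.top().id);
        q.pop();
    }
    return order;
}
```

With the delays used in the demo's main() (7.313, 2.213, 0.713, 1.243, 0.913 and 3.313 s for tasks 0 to 5), dispatch_order returns 2, 4, 3, 1, 5, 0.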

timer_thread is normally suspended until the next task is due or a new task is added.
When a task is due, it is placed in tasksReadyToRunQueue and one of the worker threads is signaled; that worker wakes up, removes the task from the queue and runs it.

Note that the thread pool has a fixed size of 40 threads (ThreadPoolSize). If more tasks become due than there are idle workers, add() does not block; the extra tasks wait in tasksReadyToRunQueue until a worker thread becomes available.

You said this approach is not efficient, but overall it seems reasonably efficient to me. It's all event-driven, and no CPU cycles are wasted on unnecessary spinning. Of course, it's just an example and optimizations are possible (note: std::multimap has been replaced with std::priority_queue).

The implementation is C++11 compliant.

```cpp
#include <iostream>
#include <chrono>
#include <queue>
#include <deque>
#include <vector>
#include <thread>
#include <condition_variable>
#include <mutex>

class Task {
public:
    virtual void run() = 0;
    virtual ~Task() { }
};

class Scheduler {
public:
    Scheduler();
    ~Scheduler();
    // The task must be heap-allocated: a worker thread deletes it after run().
    void add(Task &task, double delayToRun);

private:
    using timepoint = std::chrono::time_point<std::chrono::steady_clock>;

    struct key {
        timepoint tp;
        Task *taskp;
    };

    // Reversed comparison so that top() is the task with the earliest start time
    struct TScomp {
        bool operator()(const key &a, const key &b) const
        {
            return a.tp > b.tp;
        }
    };

    const int ThreadPoolSize = 40;

    std::vector<std::thread> ThreadPool;
    std::deque<Task *> tasksReadyToRunQueue;  // due tasks, served in FIFO order
    std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue;
    std::thread TimerThr;
    std::mutex TimerMtx, WorkerMtx;
    std::condition_variable TimerCV, WorkerCV;

    bool WorkerIsRunning = true;
    bool TimerIsRunning = true;

    void worker_thread();
    void timer_thread();
};

Scheduler::Scheduler()
{
    for (int i = 0; i < ThreadPoolSize; ++i)
        ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this));

    TimerThr = std::thread(&Scheduler::timer_thread, this);
}

Scheduler::~Scheduler()
{
    {
        std::lock_guard<std::mutex> lck{TimerMtx};
        TimerIsRunning = false;
        TimerCV.notify_one();
    }
    TimerThr.join();

    {
        std::lock_guard<std::mutex> lck{WorkerMtx};
        WorkerIsRunning = false;
        WorkerCV.notify_all();
    }
    for (auto &t : ThreadPool)
        t.join();
}

void Scheduler::add(Task &task, double delayToRun)
{
    auto now = std::chrono::steady_clock::now();
    long delay_ms = static_cast<long>(delayToRun * 1000);
    std::chrono::milliseconds duration (delay_ms);

    timepoint tp = now + duration;

    if (now >= tp)
    {
        /*
         * This is a short-cut
         * When time is due, the task is directly dispatched to the workers
         */
        std::lock_guard<std::mutex> lck{WorkerMtx};
        tasksReadyToRunQueue.push_back(&task);
        WorkerCV.notify_one();

    } else
    {
        std::lock_guard<std::mutex> lck{TimerMtx};
        allTasksQueue.push({tp, &task});
        TimerCV.notify_one();
    }
}

void Scheduler::worker_thread()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lck{WorkerMtx};
        WorkerCV.wait(lck, [this] { return !tasksReadyToRunQueue.empty() ||
                                           !WorkerIsRunning; } );
        if (!WorkerIsRunning)
            break;

        Task *p = tasksReadyToRunQueue.front();  // oldest due task first
        tasksReadyToRunQueue.pop_front();
        lck.unlock();

        p->run();

        delete p; // delete Task
    }
}

void Scheduler::timer_thread()
{
    for (;;)
    {
        std::unique_lock<std::mutex> lck{TimerMtx};

        if (!TimerIsRunning)
            break;

        // Default: wake up once per second when there is nothing to do
        auto duration = std::chrono::nanoseconds(1000000000);

        if (allTasksQueue.size() != 0)
        {
            auto now = std::chrono::steady_clock::now();

            auto head = allTasksQueue.top();
            Task *p = head.taskp;

            duration = std::chrono::duration_cast<std::chrono::nanoseconds>(head.tp - now);
            if (now >= head.tp)
            {
                /*
                 * A Task is due, pass to worker threads
                 */
                std::unique_lock<std::mutex> ulck{WorkerMtx};
                tasksReadyToRunQueue.push_back(p);
                WorkerCV.notify_one();
                ulck.unlock();

                allTasksQueue.pop();
            }
        }

        // Sleep until the head task is due (returns at once if one was just
        // dispatched) or until add() or the destructor notifies TimerCV
        TimerCV.wait_for(lck, duration);
    }
}

/*
 * End sample implementation
 */

class DemoTask : public Task {
    int n;
public:
    DemoTask(int n=0) : n{n} { }
    void run() override
    {
        std::cout << "Start task " << n << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << " Stop task " << n << std::endl;
    }
};

int main()
{
    Scheduler sched;

    Task *t0 = new DemoTask{0};
    Task *t1 = new DemoTask{1};
    Task *t2 = new DemoTask{2};
    Task *t3 = new DemoTask{3};
    Task *t4 = new DemoTask{4};
    Task *t5 = new DemoTask{5};

    sched.add(*t0, 7.313);
    sched.add(*t1, 2.213);
    sched.add(*t2, 0.713);
    sched.add(*t3, 1.243);
    sched.add(*t4, 0.913);
    sched.add(*t5, 3.313);

    std::this_thread::sleep_for(std::chrono::seconds(10));
}
```