task_pool.h 6.15 KB
Newer Older
1
2
3
4
//
// Created by daleroux on 20/04/17.
//

5
6
7
#ifdef SPELL_QTL_TASK_POOL_H
#error "SPELL_QTL_TASK_POOL_H already defined!"
#else
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#define SPELL_QTL_TASK_POOL_H


#include <atomic>
#include <functional>
#include <deque>
#include <future>


/* TODO remove Task class and handle it through a simple std::thread in TaskPool. And implement Task::wait() inside TaskPool. */

struct Task {
    virtual std::thread::id run() = 0;
    virtual bool has_run() const = 0;
};

struct TaskPool {
    static void take_slot() { instance()._take_slot(); }
    static void release_slot() { instance()._release_slot(); }

    static void init(unsigned int slots) { instance().m_free_slots.store(slots); }
    static TaskPool& instance() { static TaskPool _; return _; }
    static void join();

    static bool is_task(std::thread::id id)
    {
//        auto T = lock_instance();
//        return T->m_running_tasks.find(id) != T->m_running_tasks.end();
        auto& T = instance();
        return id != T.m_main_thread_id && id != T.m_task_runner.get_id();
    }

    static bool is_task()
    {
        return is_task(std::this_thread::get_id());
    }

    struct lock_instance_type {
        TaskPool* that;
        lock_instance_type() : that(&instance()) { that->lock(); }
        ~lock_instance_type() { that->unlock(); }
        TaskPool* operator -> () { return that; }
    };

    static lock_instance_type lock_instance() { return {}; }

    void lock() { m_slots_mutex.lock(); }
    void unlock() { m_slots_mutex.unlock(); }

    // add new work item to the pool
    static
    void enqueue(Task* task)
    {
        auto& T = instance();
        T.lock();
        T.m_pending_tasks.push_back(task);
        T.m_slots_cv.notify_all();
65
        T.unlock();
66
67
68
69
70
71
72
73
74
75
76
77
    }


    static void task_runner()
    {
        auto& T = instance();
        while (!T.m_quit) {
//            MSG_DEBUG("*** WAITING FOR A SLOT TO BE AVAILABLE");
//            std::function<void()> task;
//            std::thread* thread;
            Task* task;
            {
78
79
                std::unique_lock<std::recursive_mutex> lock(T.m_slots_mutex);
                T.m_slots_cv.wait(lock, [&]() { return (T.m_free_slots > 0 && T.m_pending_tasks.size() > 0) || T.m_quit; });
80
81
82
83
                if (T.m_quit) {
                    return;
                }
//                if (T.m_pending_tasks.size()) {
84
85
86
87
//                 T.m_slots_cv.wait(lock, [&]() { return T.m_free_slots > 0 || T.m_quit; });
//                 if (T.m_quit) {
//                     return;
//                 }
88
89
90
91
92
93
94
//                MSG_DEBUG(" ** HAVE A SLOT AND A TASK");
//                std::tie(task, thread) = T.m_pending_tasks.front();
                task = T.m_pending_tasks.front();
                T.m_pending_tasks.pop_front();
//                    MSG_DEBUG("thread " << thread << " should not be joinable: " << std::boolalpha << thread->joinable());
//                    MSG_QUEUE_FLUSH();
//                MSG_DEBUG(" ** There are still " << T.m_pending_tasks.size() << " tasks pending.");
95
                T.m_free_slots--;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
            }

//            std::thread::id id = task->run();
            task->run();

//            T.lock();
//                    task.reset();
//            thread->get_id();
//            T.m_running_tasks.emplace(id);
//            MSG_DEBUG(" ** HAVE " << T.m_running_tasks.size() << " RUNNING TASKS AND " << T.m_free_slots.load() << " SLOTS.");
//            T.unlock();
//                }
        }
    }

    static void remove_task(std::thread::id id)
    {
        lock_instance()->m_running_tasks.erase(id);
    }

    void quit() { m_quit = true; m_slots_cv.notify_all(); m_task_runner.join(); }

    struct wait_guard {
        wait_guard()
        {
//            MSG_DEBUG("Task waiting on sub-task. Releasing slot.");
            if (TaskPool::is_task()) {
                TaskPool::release_slot();
            }
        }
        ~wait_guard()
        {
//            MSG_DEBUG("Task done waiting on sub-task. Acquiring slot.");
            if (TaskPool::is_task()) {
                TaskPool::take_slot();
            }
//            MSG_DEBUG("Task re-acquired slot.");
        }
    };

    static
    void
    wait(std::function<void()> f)
    {
//        if (is_task()) {
            wait_guard guard;
//            MSG_DEBUG("*** TASK IS WAITING ON SUB-TASK.")
            return f();
//        }
//        return f();
    }


    void
    _take_slot()
    {
        if (!is_task()) {
            return;
        }
155
        std::unique_lock<std::recursive_mutex> lock(m_slots_mutex);
156
157
158
159
160
//        MSG_DEBUG("… Waiting for a slot …");
        if (m_free_slots == 0) {
            m_slots_cv.wait(lock, [this]() { return m_free_slots > 0 || m_quit; });
        }
        m_free_slots--;
161
//        MSG_DEBUG("*** TOOK ONE SLOT. " << m_free_slots.load() << " LEFT.")
162
163
164
165
166
167
168
169
    }

    void
    _release_slot()
    {
        if (!is_task()) {
            return;
        }
170
171
//         {
            std::unique_lock<std::recursive_mutex> lock(m_slots_mutex);
172
            m_free_slots++;
173
//            MSG_DEBUG("*** FREED ONE SLOT. " << m_free_slots.load() << " LEFT.")
174
//         }
175
176
177
178
179
180
181
182
183
184
185
186
        m_slots_cv.notify_all();
    }

    void
    _set_slot_count(unsigned int n)
    {
        lock();
//        MSG_DEBUG("SET SLOT COUNT " << n);
        m_free_slots.store(m_free_slots.load() + n - m_total_slots.load());
        m_total_slots.store(n);
//        MSG_DEBUG("  Now slot count = " << m_free_slots.load());
        m_slots_cv.notify_all();
187
        unlock();
188
189
190
191
192
193
194
195
196
197
198
199
200
    }

    static void set_slot_count(unsigned int n) { instance()._set_slot_count(n); }

private:
    TaskPool(unsigned int slots=1)
            : m_free_slots(slots), m_total_slots(slots), m_pending_tasks(), m_slots_cv(), m_slots_mutex(), m_quit(false), m_running_tasks(), m_task_runner(task_runner), m_main_thread_id(std::this_thread::get_id())
    {}

    ~TaskPool() { quit(); }

    std::atomic_int m_free_slots, m_total_slots;
    std::deque<Task*> m_pending_tasks;
201
202
    std::condition_variable_any m_slots_cv;
    std::recursive_mutex m_slots_mutex;
203
204
205
206
207
208
209
210
211
    std::atomic_bool m_quit;
    std::unordered_set<std::thread::id> m_running_tasks;
    std::thread m_task_runner;
    std::thread::id m_main_thread_id;
};



#endif //SPELL_QTL_TASK_POOL_H