// ThreadPool.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H


/* heavily modified */

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "error.h"

// ThreadPool declaration
/**
 * Pool of worker threads consuming a shared FIFO queue of tasks.
 *
 * Tasks are submitted with enqueue(), which returns a std::future for the
 * task's result. The destructor sets the stop flag, wakes every worker and
 * joins them. The pool also keeps counters (queued / done / total_queued)
 * and a title string that display_progress() renders through msg_handler_t.
 */
class ThreadPool {
public:
    /** Creates the pool; the argument is the number of worker threads. */
    ThreadPool(size_t);

    /**
     * Queues the call f(args...) for execution by a worker.
     * @return a future holding the call's result.
     * @throws std::runtime_error if the pool has been stopped.
     */
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;

    /** Stops the pool and joins all worker threads. */
    ~ThreadPool();

    // The pool owns threads and mutexes, so it was already implicitly
    // non-copyable; state that explicitly.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /** Replaces the title shown by display_progress(); guarded by title_mutex. */
    void set_title(const std::string& s)
    {
        std::lock_guard<std::mutex> lock(title_mutex);
        title = s;
    }

private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;             // protects `tasks`
    std::condition_variable condition;  // signalled on new task / on stop
    std::atomic_bool stop;              // set by the destructor

    // progress counters
    std::atomic_ulong total_queued;     // tasks of batches already completed
    std::atomic_ulong queued;           // tasks enqueued in the current batch
    std::atomic_ulong done;             // tasks finished in the current batch

    std::string title;                  // label printed in the progress header
    std::mutex title_mutex;             // protects `title`

    /** Renders the progress header and rolls the counters over when a batch completes. */
    void display_progress();
};

inline void ThreadPool::display_progress()
{
	if (!msg_handler_t::color()) {
		return;
59
60
61
62
63
    } else {
        static constexpr const char* const SAVE_CURSOR = "\033[s";
        static constexpr const char* const RESTORE_CURSOR = "\033[u";
        static constexpr const char* const ERASE_TO_EOL = "\033[K";
        static constexpr const char* const GOTO_0_0 = "\033[0;0H";
64
65
        /*static constexpr const char* const HIDE_CURSOR = "\033?25l";*/
        /*static constexpr const char* const SHOW_CURSOR = "\033?25h";*/
66

67
68
69
70
        /*std::unique_lock<msg_handler_t::lock_type> lock(msg_handler_t::mutex);*/
        if (msg_handler_t::instance().queue.m_stop) {
            return;
        }
71
72
        std::stringstream msg;
        msg << SAVE_CURSOR;
73
        msg /*<< HIDE_CURSOR*/ << GOTO_0_0;
74
        msg << msg_handler_t::i();
Damien Leroux's avatar
Damien Leroux committed
75
76
77
78
79
80
        msg << "[SPELL-QTL] Current: " << msg_handler_t::w();
        {
            std::lock_guard<std::mutex> lock(title_mutex);
            msg <<  title;
        }
        msg << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
81
        msg << "[SPELL-QTL] " << msg_handler_t::n() << workers.size() << " threads, " << total_queued << " tasks queued in previous batches" << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
82
        if (queued) {
83
84
85
86
87
            msg << "[SPELL-QTL] Task info: ";
            msg << msg_handler_t::n();
            msg << queued << " queued, " << done << " done, ";
            msg << msg_handler_t::w();
            msg << (100 * done / queued) << "% progress";
88
        }
89
90
91
        msg << ERASE_TO_EOL;
        msg << msg_handler_t::n();
        msg << std::endl << "----------------------------------------------------------";
92
        msg << ERASE_TO_EOL << std::endl;
93
        msg << RESTORE_CURSOR /*<< SHOW_CURSOR*/;
94
95
96
97
        /*CREATE_MESSAGE(msg_channel::Out, msg.str());*/
        if (!msg_handler_t::instance().queue.m_stop) {
            msg_handler_t::instance().queue.cout << msg.str() << std::flush;
        }
98
99
100
101
102
103
        if (done == queued) {
            total_queued += queued;
            queued = 0;
            done = 0;
        }
    }
104
105
106
107
}
 
/**
 * Launches `threads` worker threads and registers display_progress() as a
 * msg_handler_t hook.
 *
 * Each worker loops forever: it waits for a task (or for the stop flag),
 * runs the task outside the queue lock, increments `done`, and refreshes the
 * progress header when msg_handler_t::color() is true. A worker exits once
 * the pool is stopped AND the queue is empty, so pending tasks still run.
 */
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false), total_queued(0), queued(0), done(0), title()
{
    workers.reserve(threads);
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                while(true)
                {
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        // Equivalent to: while(!stop && tasks.empty()) wait(lock);
                        this->condition.wait(lock, [this] {
                            return this->stop || !this->tasks.empty();
                        });
                        if(this->stop && this->tasks.empty()) {
                            return;
                        }
                        // Move the task out of the queue: copying a
                        // std::function may allocate, and we hold the lock.
                        std::function<void()> task(std::move(this->tasks.front()));
                        this->tasks.pop();
                        // Run the task without blocking the other workers.
                        lock.unlock();
                        task();
                        ++done;
                    }
                    if (msg_handler_t::color()) {
                        msg_handler_t::instance().queue.lock_stream();
                        display_progress();
                        msg_handler_t::instance().queue.unlock_stream();
                    }
                }
            }
        );
    msg_handler_t::hook([this] () { display_progress(); });
}

/**
 * Adds a new work item to the pool.
 *
 * The callable and its arguments are bound into a std::packaged_task, which
 * is pushed on the task queue; one waiting worker is then notified.
 *
 * @param f    callable to run.
 * @param args arguments forwarded to f.
 * @return a future for the result of f(args...).
 * @throws std::runtime_error if the pool's stop flag is already set.
 */
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    typedef typename std::result_of<F(Args...)>::type return_type;
    
    // don't allow enqueueing after stopping the pool
    if(stop)
        throw std::runtime_error("enqueue on stopped ThreadPool");

    // shared_ptr because std::function requires a copyable callable and
    // std::packaged_task is move-only.
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::lock_guard<std::mutex> lock(queue_mutex);
        tasks.emplace([task](){ (*task)(); });
        // Count the task while still holding the queue lock: no worker can
        // pop (and finish) it before `queued` is updated, so `done` is not
        // incremented for a task that `queued` does not yet count.
        ++queued;
    }
    condition.notify_one();
    return res;
}

// Shut the pool down: raise the stop flag while holding the queue mutex,
// wake every worker, then wait for each worker thread to finish.
inline ThreadPool::~ThreadPool()
{
    {
        std::lock_guard<std::mutex> guard(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers) {
        worker.join();
    }
}

#endif