Commit 040169a3 authored by Damien Leroux's avatar Damien Leroux
Browse files

Implemented fully asynchronous output.

parent 6d4e2399
......@@ -62,24 +62,25 @@ inline void ThreadPool::display_progress()
static constexpr const char* const GOTO_0_0 = "\033[0;0H";
std::unique_lock<msg_handler_t::lock_type> lock(msg_handler_t::mutex);
std::cout << SAVE_CURSOR;
std::cout << GOTO_0_0;
std::cout << msg_handler_t::i();
std::cout << "[SPELL-QTL] Current: " << msg_handler_t::w() << title << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
std::cout << "[SPELL-QTL] " << msg_handler_t::n() << workers.size() << " threads, " << total_queued << " tasks queued in previous batches" << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
std::stringstream msg;
msg << SAVE_CURSOR;
msg << GOTO_0_0;
msg << msg_handler_t::i();
msg << "[SPELL-QTL] Current: " << msg_handler_t::w() << title << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
msg << "[SPELL-QTL] " << msg_handler_t::n() << workers.size() << " threads, " << total_queued << " tasks queued in previous batches" << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
if (queued) {
std::cout << "[SPELL-QTL] Task info: ";
std::cout << msg_handler_t::n();
std::cout << queued << " queued, " << done << " done, ";
std::cout << msg_handler_t::w();
std::cout << (100 * done / queued) << "% progress";
msg << "[SPELL-QTL] Task info: ";
msg << msg_handler_t::n();
msg << queued << " queued, " << done << " done, ";
msg << msg_handler_t::w();
msg << (100 * done / queued) << "% progress";
}
std::cout << ERASE_TO_EOL;
std::cout << msg_handler_t::n();
std::cout << std::endl << "----------------------------------------------------------";
std::cout << ERASE_TO_EOL << std::endl << ERASE_TO_EOL;
std::cout << RESTORE_CURSOR;
std::cout << std::flush;
msg << ERASE_TO_EOL;
msg << msg_handler_t::n();
msg << std::endl << "----------------------------------------------------------";
msg << ERASE_TO_EOL << std::endl << ERASE_TO_EOL;
msg << RESTORE_CURSOR;
CREATE_MESSAGE(msg_channel::Out, msg.str());
if (done == queued) {
total_queued += queued;
queued = 0;
......
......@@ -3,14 +3,15 @@
#include <map>
#include <sys/time.h>
#include "error.h"
struct chrono {
struct chrono_map : public std::map<std::string, chrono> {
~chrono_map()
{
std::cerr << "Timers:" << std::endl;
CREATE_MESSAGE(msg_channel::Err, MESSAGE(msg_handler_t::n() << "Timers:" << std::endl));
for (auto& kv: *this) {
std::cerr << kv.first << ": " << kv.second.accum << " seconds." << std::endl;
CREATE_MESSAGE(msg_channel::Err, MESSAGE(kv.first << ": " << kv.second.accum << " seconds." << std::endl));
}
}
};
......@@ -18,9 +19,9 @@ struct chrono {
struct counter_map : public std::map<std::string, int64_t> {
~counter_map()
{
std::cerr << "Counters:" << std::endl;
CREATE_MESSAGE(msg_channel::Err, MESSAGE(msg_handler_t::n() << "Counters:" << std::endl));
for (auto& kv: *this) {
std::cerr << kv.first << ": " << kv.second << std::endl;
CREATE_MESSAGE(msg_channel::Err, MESSAGE(kv.first << ": " << kv.second << std::endl));
}
}
};
......
......@@ -3,12 +3,15 @@
#include <string>
#include <iostream>
#include <exception>
#include <set>
#include <utility>
#include <sstream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <deque>
extern "C" {
#include <unistd.h>
}
......@@ -21,6 +24,142 @@ extern "C" {
#define MSG_HANDLER_IS_SYNCED
enum msg_channel { Out, Err, Log };
struct message_struc {
msg_channel channel;
std::string message;
};
typedef std::shared_ptr<message_struc> message_handle;
/*#define MAKE_MESSAGE(_dest_, _expr_) do { std::stringstream __s; __s << _expr_; _dest_ = __s.str(); } while (0)*/
#define MESSAGE(_expr_) dynamic_cast<std::stringstream*>(&(std::stringstream() << _expr_))->str()
/*#define CREATE_MESSAGE(_var_, _channel_, _expr_) message_handle _var_(new message_struc {_channel_, MESSAGE(_expr_)});*/
#define CREATE_MESSAGE(_channel_, _what_) msg_handler_t::enqueue(message_handle{new message_struc {_channel_, _what_}});
struct ostream_manager {
/* inspired from http://stackoverflow.com/questions/22042414/c-stream-insert-string-after-newline */
class HeaderInserter : public std::streambuf {
std::streambuf* dest;
bool start_of_line;
protected:
int overflow(int ch) override;
public:
HeaderInserter(std::streambuf* dest)
: dest(dest)
, start_of_line(true)
{}
};
class ForbidOutput : public std::streambuf {
protected:
int overflow(int) override
{
/* Direct output is forbidden */
/*throw std::ios_base::failure("Direct output is forbidden");*/
abort();
}
};
std::streambuf* old_cout_rdbuf;
std::streambuf* old_clog_rdbuf;
std::streambuf* old_cerr_rdbuf;
HeaderInserter hi;
ForbidOutput forbid;
std::ostream cerr, cout, clog;
ostream_manager()
: old_cout_rdbuf(std::cout.rdbuf())
, old_clog_rdbuf(std::clog.rdbuf())
, old_cerr_rdbuf(std::cerr.rdbuf())
, hi(old_cout_rdbuf)
, forbid()
, cerr(old_cerr_rdbuf), cout(old_cout_rdbuf), clog(&hi)
{
std::clog.rdbuf(&forbid);
std::cout.rdbuf(&forbid);
std::cerr.rdbuf(&forbid);
}
~ostream_manager()
{
std::clog.rdbuf(old_clog_rdbuf);
std::cout.rdbuf(old_cout_rdbuf);
std::cerr.rdbuf(old_cerr_rdbuf);
}
};
struct message_queue : public ostream_manager {
typedef std::mutex mutex_type;
typedef std::unique_lock<mutex_type> scoped_lock_type;
std::deque<message_handle> m_queue;
mutex_type m_mutex;
std::condition_variable m_condition;
bool m_stop;
std::thread m_thread;
message_queue()
: ostream_manager()
, m_queue()
, m_mutex()
, m_condition()
, m_stop(false)
, m_thread([this] () { run(); })
{
}
~message_queue()
{
{
scoped_lock_type lock(m_mutex);
m_stop = true;
}
m_condition.notify_one();
m_thread.join();
}
void enqueue(const message_handle& mh)
{
scoped_lock_type slt(m_mutex);
m_queue.push_back(mh);
m_condition.notify_one();
}
void run()
{
while (true)
{
scoped_lock_type lock(m_mutex);
while (!m_stop && m_queue.empty()) {
m_condition.wait(lock);
}
if (m_stop && m_queue.empty()) {
return;
}
message_handle next = m_queue.front();
m_queue.pop_front();
std::ostream& channel = next->channel == msg_channel::Out ? cout
: next->channel == msg_channel::Log ? clog
: cerr;
lock.unlock();
channel << next->message << std::flush;
}
}
};
struct msg_handler_t {
#ifdef MSG_HANDLER_IS_SYNCED
typedef std::recursive_mutex lock_type;
......@@ -35,41 +174,13 @@ struct msg_handler_t {
} scoped_lock_type;
#endif
/* inspired from http://stackoverflow.com/questions/22042414/c-stream-insert-string-after-newline */
class HeaderInserter : public std::streambuf {
std::streambuf* dest;
bool start_of_line;
protected:
int overflow(int ch) override
{
int retval = 0;
if (ch != traits_type::eof()) {
if (start_of_line) {
int idt = msg_handler_t::get_indent();
for (int i = 0; i < idt; ++i) {
dest->sputc(' ');
}
}
retval = dest->sputc( ch );
start_of_line = ch == '\n';
}
return retval;
}
public:
HeaderInserter(std::streambuf* dest)
: dest(dest)
, start_of_line(true)
{}
};
struct state_t {
bool color;
std::set<std::string> workarounds;
int count;
int debug_indent;
std::vector<std::function<void()>> hooks;
std::streambuf* old_rdbuf;
HeaderInserter hi;
message_queue queue;
const char* error() { ++count; return color ? _RED : ""; }
const char* warning() { return color ? _YELLOW : ""; }
......@@ -78,12 +189,12 @@ struct msg_handler_t {
state_t()
: color(!!isatty(fileno(stdout))), workarounds(), count(0), debug_indent(0), hooks()
, old_rdbuf(std::clog.rdbuf())
, hi(old_rdbuf)
, queue()
{
std::clog.rdbuf(&hi);
/*std::cout << "Message handler instance created." << std::endl;*/
}
~state_t() { std::clog.rdbuf(old_rdbuf); }
~state_t()
{}
void check(bool fatal);
void reset();
void run_hooks() { for (auto& f: hooks) { f(); } }
......@@ -109,10 +220,51 @@ struct msg_handler_t {
static void dedent() { instance().debug_indent -= 3; }
static int get_indent() { return instance().debug_indent; }
static void enqueue(const message_handle& mh) { instance().queue.enqueue(mh); }
static lock_type mutex;
};
inline
int ostream_manager::HeaderInserter::overflow(int ch)
{
int retval = 0;
if (ch != traits_type::eof()) {
if (start_of_line) {
int idt = msg_handler_t::get_indent();
for (int i = 0; i < idt; ++i) {
dest->sputc(' ');
}
}
retval = dest->sputc( ch );
start_of_line = ch == '\n';
}
return retval;
}
#define MSG_ERROR(_msg_expr_, _workaround_expr_) \
do {\
CREATE_MESSAGE(msg_channel::Err, MESSAGE(msg_handler_t::e() << "[ERR] " << _msg_expr_ << msg_handler_t::n() << std::endl));\
std::stringstream s; s << _workaround_expr_;\
if (s.str().size()) { msg_handler_t::instance().workarounds.insert(s.str()); }\
} while (0)
#define MSG_WARNING(_msg_expr_) \
do {\
CREATE_MESSAGE(msg_channel::Out, MESSAGE(msg_handler_t::w() << "[WRN] " << _msg_expr_ << msg_handler_t::n() << std::endl));\
} while (0)
#define MSG_INFO(_msg_expr_) \
do {\
CREATE_MESSAGE(msg_channel::Out, MESSAGE(msg_handler_t::i() << "[MSG] " << _msg_expr_ << msg_handler_t::n() << std::endl));\
} while (0)
#define MSG_DEBUG(_msg_expr_) \
do {\
CREATE_MESSAGE(msg_channel::Log, MESSAGE(_msg_expr_ << std::endl));\
} while (0)
#if 0
#define MSG_ERROR(_msg_expr_, _workaround_expr_) \
do {\
{msg_handler_t::scoped_lock_type _(msg_handler_t::mutex);\
......@@ -142,6 +294,7 @@ struct msg_handler_t {
std::clog << _msg_expr_ << std::endl;\
msg_handler_t::run_hooks();\
} while(0)
#endif
#define MSG_DEBUG_INDENT msg_handler_t::indent()
#define MSG_DEBUG_DEDENT msg_handler_t::dedent()
......@@ -150,23 +303,19 @@ inline void msg_handler_t::state_t::check(bool fatal)
{
msg_handler_t::scoped_lock_type _(msg_handler_t::mutex);
if (count > 0) {
std::cerr << info() << "[MSG] " << count;
if (count > 1) {
std::cout << " errors were";
} else {
std::cout << " error was";
}
std::cout << " reported. Suggestions to fix this:" << std::endl;
CREATE_MESSAGE(msg_channel::Err, MESSAGE(
info() << "[MSG] " << count << " error"
<< (count > 1 ? "s were" : " was")
<< " reported. Suggestions to fix this:" << std::endl));
for (auto& w: workarounds) {
std::cout << " - " << w << std::endl;
CREATE_MESSAGE(msg_channel::Err, MESSAGE(" - " << w));
}
if (fatal) {
std::cout << normal() << "At least one fatal error encountered. Aborting process." << std::endl;
CREATE_MESSAGE(msg_channel::Out, MESSAGE(normal() <<"At least one fatal error encountered. Aborting process."));
exit(-count);
} else {
reset();
}
std::cout << normal();
}
}
......
......@@ -31,6 +31,7 @@ extern "C" void delete_settings() { if (active_settings) { delete active_setting
int main(int argc, const char** argv)
{
(void)msg_handler_t::instance();
#if 0
{
MatrixXb test(3, 3);
......@@ -112,15 +113,14 @@ int main(int argc, const char** argv)
msg_handler_t::check(true);
if (0) {
MSG_DEBUG("new chromosome " << active_settings->map[0].name);
MSG_DEBUG(active_settings->map[0].haplo_sizes);
for (auto x: active_settings->map[0]) {
MSG_DEBUG(x);
}
return 0;
#if 0
MSG_DEBUG("new chromosome " << active_settings->map[0].name);
MSG_DEBUG(active_settings->map[0].haplo_sizes);
for (auto x: active_settings->map[0]) {
MSG_DEBUG(x);
}
return 0;
#endif
if (0) {
population& pop = active_settings->populations.begin()->second;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment