Commit 54a7b2a3 authored by Damien Leroux's avatar Damien Leroux
Browse files

Fixed progress display header. Improved stability regarding async messages.

parent 040169a3
......@@ -61,7 +61,10 @@ inline void ThreadPool::display_progress()
static constexpr const char* const ERASE_TO_EOL = "\033[K";
static constexpr const char* const GOTO_0_0 = "\033[0;0H";
std::unique_lock<msg_handler_t::lock_type> lock(msg_handler_t::mutex);
/*std::unique_lock<msg_handler_t::lock_type> lock(msg_handler_t::mutex);*/
if (msg_handler_t::instance().queue.m_stop) {
return;
}
std::stringstream msg;
msg << SAVE_CURSOR;
msg << GOTO_0_0;
......@@ -78,9 +81,12 @@ inline void ThreadPool::display_progress()
msg << ERASE_TO_EOL;
msg << msg_handler_t::n();
msg << std::endl << "----------------------------------------------------------";
msg << ERASE_TO_EOL << std::endl << ERASE_TO_EOL;
msg << ERASE_TO_EOL << std::endl;
msg << RESTORE_CURSOR;
CREATE_MESSAGE(msg_channel::Out, msg.str());
/*CREATE_MESSAGE(msg_channel::Out, msg.str());*/
if (!msg_handler_t::instance().queue.m_stop) {
msg_handler_t::instance().queue.cout << msg.str() << std::flush;
}
if (done == queued) {
total_queued += queued;
queued = 0;
......@@ -114,7 +120,9 @@ inline ThreadPool::ThreadPool(size_t threads)
++done;
}
if (msg_handler_t::color()) {
msg_handler_t::instance().queue.lock_stream();
display_progress();
msg_handler_t::instance().queue.unlock_stream();
}
}
}
......
......@@ -11,7 +11,9 @@ struct chrono {
{
CREATE_MESSAGE(msg_channel::Err, MESSAGE(msg_handler_t::n() << "Timers:" << std::endl));
for (auto& kv: *this) {
CREATE_MESSAGE(msg_channel::Err, MESSAGE(kv.first << ": " << kv.second.accum << " seconds." << std::endl));
if (kv.second.accum != 0.) {
CREATE_MESSAGE(msg_channel::Err, MESSAGE(kv.first << ": " << kv.second.accum << " seconds." << std::endl));
}
}
}
};
......
......@@ -41,10 +41,13 @@ typedef std::shared_ptr<message_struc> message_handle;
struct ostream_manager {
/* FIXME indent/dedent must be managed by this very class... Maybe use \x1 for indent and \x2 for dedent... */
/* inspired from http://stackoverflow.com/questions/22042414/c-stream-insert-string-after-newline */
class HeaderInserter : public std::streambuf {
std::streambuf* dest;
bool start_of_line;
int indent;
protected:
int overflow(int ch) override;
......@@ -52,6 +55,7 @@ struct ostream_manager {
HeaderInserter(std::streambuf* dest)
: dest(dest)
, start_of_line(true)
, indent(0)
{}
};
......@@ -99,6 +103,7 @@ struct message_queue : public ostream_manager {
std::deque<message_handle> m_queue;
mutex_type m_mutex;
mutex_type m_stream_mutex;
std::condition_variable m_condition;
bool m_stop;
......@@ -132,31 +137,10 @@ struct message_queue : public ostream_manager {
m_condition.notify_one();
}
void run()
{
while (true)
{
scoped_lock_type lock(m_mutex);
void lock_stream() { m_stream_mutex.lock(); }
void unlock_stream() { m_stream_mutex.unlock(); }
while (!m_stop && m_queue.empty()) {
m_condition.wait(lock);
}
if (m_stop && m_queue.empty()) {
return;
}
message_handle next = m_queue.front();
m_queue.pop_front();
std::ostream& channel = next->channel == msg_channel::Out ? cout
: next->channel == msg_channel::Log ? clog
: cerr;
lock.unlock();
channel << next->message << std::flush;
}
}
void run();
};
......@@ -229,10 +213,13 @@ inline
int ostream_manager::HeaderInserter::overflow(int ch)
{
int retval = 0;
if (ch != traits_type::eof()) {
if (ch == 1) {
indent += 3;
} else if (ch == 2) {
indent -= 3 * (indent > 0);
} else if (ch != traits_type::eof()) {
if (start_of_line) {
int idt = msg_handler_t::get_indent();
for (int i = 0; i < idt; ++i) {
for (int i = 0; i < indent; ++i) {
dest->sputc(' ');
}
}
......@@ -242,6 +229,39 @@ int ostream_manager::HeaderInserter::overflow(int ch)
return retval;
}
inline
void message_queue::run()
{
while (true)
{
scoped_lock_type lock(m_mutex);
while (!m_stop && m_queue.empty()) {
m_condition.wait(lock);
}
if (m_stop && m_queue.empty()) {
return;
}
message_handle next = m_queue.front();
m_queue.pop_front();
std::ostream& channel = next->channel == msg_channel::Out ? cout
: next->channel == msg_channel::Log ? clog
: cerr;
lock.unlock();
if (next->message.size() == 0) {
continue;
}
lock_stream();
channel << next->message << std::flush;
unlock_stream();
msg_handler_t::run_hooks();
}
}
#define MSG_ERROR(_msg_expr_, _workaround_expr_) \
do {\
CREATE_MESSAGE(msg_channel::Err, MESSAGE(msg_handler_t::e() << "[ERR] " << _msg_expr_ << msg_handler_t::n() << std::endl));\
......@@ -296,8 +316,8 @@ int ostream_manager::HeaderInserter::overflow(int ch)
} while(0)
#endif
#define MSG_DEBUG_INDENT msg_handler_t::indent()
#define MSG_DEBUG_DEDENT msg_handler_t::dedent()
#define MSG_DEBUG_INDENT CREATE_MESSAGE(msg_channel::Log, "\x1")
#define MSG_DEBUG_DEDENT CREATE_MESSAGE(msg_channel::Log, "\x2")
inline void msg_handler_t::state_t::check(bool fatal)
{
......@@ -308,10 +328,10 @@ inline void msg_handler_t::state_t::check(bool fatal)
<< (count > 1 ? "s were" : " was")
<< " reported. Suggestions to fix this:" << std::endl));
for (auto& w: workarounds) {
CREATE_MESSAGE(msg_channel::Err, MESSAGE(" - " << w));
CREATE_MESSAGE(msg_channel::Err, MESSAGE(info() << " - " << w << normal() << std::endl));
}
if (fatal) {
CREATE_MESSAGE(msg_channel::Out, MESSAGE(normal() <<"At least one fatal error encountered. Aborting process."));
CREATE_MESSAGE(msg_channel::Out, MESSAGE(normal() <<"At least one fatal error encountered. Aborting process." << std::endl));
exit(-count);
} else {
reset();
......
......@@ -235,13 +235,13 @@ struct format_specification_t {
void operator () (settings_t& settings)
{
settings.marker_observation_specs.insert(map.begin(), map.end());
MSG_DEBUG("settings.marker_observation_specs now:");
for (const auto& mos: settings.marker_observation_specs) {
MSG_DEBUG("* " << mos.first);
for (const auto& m: mos.second) {
MSG_DEBUG(" " << m.first << ": " << m.second);
}
}
/*MSG_DEBUG("settings.marker_observation_specs now:");*/
/*for (const auto& mos: settings.marker_observation_specs) {*/
/*MSG_DEBUG("* " << mos.first);*/
/*for (const auto& m: mos.second) {*/
/*MSG_DEBUG(" " << m.first << ": " << m.second);*/
/*}*/
/*}*/
}
};
......
......@@ -26,8 +26,9 @@ OBJ=$(subst .cc,.o,$(SRC))
DEP=$(subst .cc,.d,$(SRC))
COV_OBJ=$(subst .cc,.cov.o,$(SRC))
DEBUG_OPTS=-ggdb
#OPT_OPTS=-O3 -DEIGEN_NO_DEBUG -DNDEBUG
#DEBUG_OPTS=-ggdb
OPT_OPTS=-O3 -DEIGEN_NO_DEBUG -DNDEBUG
#OPT_OPTS=-O -DEIGEN_NO_DEBUG -DNDEBUG
#DEBUG_OPTS=-Winvalid-pch -ggdb #-H
#DEBUG_OPTS=-ggdb -DNDEBUG
......
......@@ -236,8 +236,8 @@ compute_state_to_parental_origin_haplo(const context_key& ck, const locus_key& l
{
const impl::generation_rs* gen = ck->gen;
stpom_data sd;
MSG_DEBUG("Computing stfopom order=" << lk->depth() << " gen@" << gen);
MSG_DEBUG_INDENT;
/*MSG_DEBUG("Computing stfopom order=" << lk->depth() << " gen@" << gen);*/
/*MSG_DEBUG_INDENT;*/
if (!(lk && lk->locus != locus_key_struc::no_locus)) {
std::set<char> parents;
const std::vector<allele_pair>& ap_labels = gen->get_unique_labels();
......@@ -288,10 +288,10 @@ compute_state_to_parental_origin_haplo(const context_key& ck, const locus_key& l
sd.haplo2 = kroneckerProduct(pred->haplo2, first->haplo2);
}
MSG_DEBUG("stfopom " << lk);
MSG_DEBUG(sd);
/*MSG_DEBUG("stfopom " << lk);*/
/*MSG_DEBUG(sd);*/
MSG_DEBUG_DEDENT;
/*MSG_DEBUG_DEDENT;*/
return sd;
}
......
......@@ -122,14 +122,14 @@ geno_prob_type
joint_geno_prob_at_locus(const context_key& ck, const locus_key& lk, double locus)
{
DUMP_FILE_LINE();
MSG_DEBUG("* Computing geno prob in " << ck << " at " << locus << " given " << lk);
MSG_DEBUG("=====================================================");
/*MSG_DEBUG("* Computing geno prob in " << ck << " at " << locus << " given " << lk);*/
/*MSG_DEBUG("=====================================================");*/
auto it = ck->locus_indices.find(locus);
if (it == ck->locus_indices.end()) {
/* FIXME: whine */
return {};
}
MSG_DEBUG_INDENT;
/*MSG_DEBUG_INDENT;*/
size_t loc_idx = it->second;
value<context_key> vck = ck;
value<locus_key> vlk = lk;
......@@ -138,9 +138,9 @@ joint_geno_prob_at_locus(const context_key& ck, const locus_key& lk, double locu
vck, range<int>(0, ck->pop->size(), 1));
collection<locus_probabilities_type>
alp = make_collection<Disk>(locus_probabilities, vck, vlk, vmgo);
for (auto& x: alp) {
MSG_DEBUG(x);
}
/*for (auto& x: alp) {*/
/*MSG_DEBUG(x);*/
/*}*/
geno_prob_type ret;
......@@ -153,22 +153,17 @@ joint_geno_prob_at_locus(const context_key& ck, const locus_key& lk, double locu
}
DUMP_FILE_LINE();
MSG_DEBUG("computed " << ck << ' ' << lk << '+' << locus);
MSG_DEBUG(ret);
MSG_DEBUG("-----------------------------------------------------");
/*MSG_DEBUG("computed " << ck << ' ' << lk << '+' << locus);*/
/*MSG_DEBUG(ret);*/
/*MSG_DEBUG("-----------------------------------------------------");*/
if (lk && lk->locus != locus_key_struc::no_locus) {
auto pop_pred = make_value<Disk>(joint_geno_prob_at_locus,
vck, value<locus_key>{lk->parent}, value<double>{lk->locus});
(void)*pop_pred;
DUMP_FILE_LINE();
MSG_DEBUG("previous geno_prob " << ck << ' ' << lk->parent << '+' << lk->locus);
MSG_DEBUG(pop_pred);
MSG_DEBUG("-----------------------------------------------------");
/*MSG_DEBUG("previous geno_prob " << ck << ' ' << lk->parent << '+' << lk->locus);*/
/*MatrixXd tmp = kroneckerProduct(pop_pred->data,*/
/*MatrixXd::Ones(ck->gen->get_unique_labels().size(), 1));*/
/*MSG_DEBUG(tmp);*/
/*MSG_DEBUG(pop_pred);*/
/*MSG_DEBUG("-----------------------------------------------------");*/
ret.data = (ret.data.array()
* kroneckerProduct(pop_pred->data,
......@@ -177,12 +172,12 @@ joint_geno_prob_at_locus(const context_key& ck, const locus_key& lk, double locu
}
DUMP_FILE_LINE();
MSG_DEBUG("joint probabilities " << ck << ' ' << lk << '+' << locus);
MSG_DEBUG(ret);
MSG_DEBUG("=====================================================");
/*MSG_DEBUG("joint probabilities " << ck << ' ' << lk << '+' << locus);*/
/*MSG_DEBUG(ret);*/
/*MSG_DEBUG("=====================================================");*/
DUMP_FILE_LINE();
MSG_DEBUG_DEDENT;
/*MSG_DEBUG_DEDENT;*/
return ret;
}
......
......@@ -695,13 +695,15 @@ struct settings_constraint_t {
{
msg_handler_t::reset();
std::pair<bool, std::set<std::string>> ret;
MSG_DEBUG("constraints: " << list().size());
/*MSG_DEBUG("constraints: " << list().size());*/
chrono::start("Consistency check");
for (auto& c: list()) {
bool result = c.predicate(s);
if (!result) {
MSG_ERROR(c.message, c.workaround);
}
}
chrono::stop("Consistency check");
msg_handler_t::check(true);
}
......@@ -875,14 +877,22 @@ settings_constraint_t
PREDICATE
{
bool ok = true;
MSG_DEBUG("populations: " << s->populations.size());
MSG_DEBUG("chromosomes: " << s->map.size());
/*MSG_DEBUG("populations: " << s->populations.size());*/
/*MSG_DEBUG("chromosomes: " << s->map.size());*/
for (const auto& pkv: s->populations) {
const population& pop = pkv.second;
/*MSG_DEBUG(pop);*/
for (const chromosome& chr: s->map) {
/*MSG_DEBUG(chr);*/
/*for (const auto& kv: pop.observed_mark) {*/
auto gen = s->design->generation[pop.qtl_generation_name];
size_t n_ind = pop.pedigree.find(gen)->second.size();
auto obs_qtl_gen_it = pop.observed_mark.find(gen->name);
size_t n_ind;
if (obs_qtl_gen_it == pop.observed_mark.end()) {
n_ind = pop.pedigree.find(gen)->second.size();
} else {
n_ind = obs_qtl_gen_it->second.observations.n_obs;
}
context_key ck(new context_key_struc(&pop, &chr));
collection<multi_generation_observations>
mgos = make_collection<Disk|Mem>(population_marker_obs, value<context_key>{ck}, range<int>(0, n_ind, 1));
......@@ -910,7 +920,9 @@ settings_constraint_t
}
++hi;
}
ok = false;
if (pop.noise == 0.) {
ok = false;
}
}
}
/*}*/
......
......@@ -24,10 +24,22 @@ double settings_t::get_threshold(const std::string& trait)
std::ostream& operator << (std::ostream& os, const std::vector<char>& v) { for (auto& c: v) { os << c; } return os; }
std::ostream& operator << (std::ostream& os, const std::vector<char>& v)
{
if (v.begin() == v.end()) {
return os << '.';
}
auto i = v.begin(), j = v.end();
os << (*i++);
while (i != j) {
os << '.' << (*i++);
}
return os;
}
extern "C" void delete_settings() { if (active_settings) { delete active_settings; } }
extern "C" void delete_settings() { if (active_settings) { msg_handler_t::instance().queue.m_stop = true; delete active_settings; } }
int main(int argc, const char** argv)
{
......@@ -103,10 +115,9 @@ int main(int argc, const char** argv)
msg_handler_t::check(true);
MSG_DEBUG("finalize");
active_settings->finalize();
MSG_DEBUG("sanity_check");
active_settings->set_title("Checking the validity of the configuration");
if (!active_settings->sanity_check()) {
exit(-1);
}
......@@ -202,7 +213,7 @@ int main(int argc, const char** argv)
/*computations::f_test_along_chromosome ftac(M0, M0, pop, chr, gen, computations::selected_qtls_on_chromosome(qtl_chr));*/
collection<population_value> all_pop = all_populations();
/*active_settings->set_title("Computing thresholds");*/
active_settings->set_title("Computing thresholds");
/*collection<double> thresholds*/
/*= make_collection<Disk|Sync>(qtl_threshold,*/
/*all_traits(), qtl_chr,*/
......@@ -413,30 +424,30 @@ int main(int argc, const char** argv)
std::function<MatrixXd(const locus_key&, double loc_removed)>
compare = [&] (const locus_key& lk, double loc_removed)
{
MSG_DEBUG(std::endl);
MSG_DEBUG("LK " << lk << " - " << loc_removed);
/*MSG_DEBUG(std::endl);*/
/*MSG_DEBUG("LK " << lk << " - " << loc_removed);*/
locus_key lk2 = lk - loc_removed;
MSG_DEBUG("LK2 " << lk2);
/*MSG_DEBUG("LK2 " << lk2);*/
parental_origin_per_locus_type popl1, popl2;
MatrixXd popl_red;
popl1 = joint_parental_origin_at_locus(ck, lk->parent, lk->locus);
MSG_DEBUG(MATRIX_SIZE(popl1));
MSG_DEBUG("popl1" << std::endl << popl1);
/*MSG_DEBUG(MATRIX_SIZE(popl1));*/
/*MSG_DEBUG("popl1" << std::endl << popl1);*/
popl2 = joint_parental_origin_at_locus(ck, lk2->parent, lk2->locus);
MSG_DEBUG(MATRIX_SIZE(popl2));
MSG_DEBUG("popl2" << std::endl << popl2);
/*MSG_DEBUG(MATRIX_SIZE(popl2));*/
/*MSG_DEBUG("popl2" << std::endl << popl2);*/
MatrixXd red = lk->reduce(n_par, loc_removed);
/*MSG_DEBUG(MATRIX_SIZE(red));*/
/*MSG_DEBUG("red" << std::endl << red);*/
popl_red = popl1.data * red;
MSG_DEBUG(MATRIX_SIZE(popl_red));
MSG_DEBUG("popl_red" << std::endl << popl_red);
MSG_DEBUG(MATRIX_SIZE(popl2));
MSG_DEBUG(MATRIX_SIZE(popl_red));
/*MSG_DEBUG(MATRIX_SIZE(popl_red));*/
/*MSG_DEBUG("popl_red" << std::endl << popl_red);*/
/*MSG_DEBUG(MATRIX_SIZE(popl2));*/
/*MSG_DEBUG(MATRIX_SIZE(popl_red));*/
MatrixXd ret = popl2.data - popl_red;
return ret;
};
#if 0
#if 1
locus_key lk_all(new locus_key_struc());
locus_key lk_12(new locus_key_struc());
lk_all = lk_all + 1. + 2. + 3.;
......@@ -447,12 +458,14 @@ int main(int argc, const char** argv)
#else
locus_key lk_all(new locus_key_struc(pos));
#endif
/*MSG_DEBUG("compare3@1:" << std::endl << compare(lk_all, pos[0]));*/
/*MSG_DEBUG("compare3@2:" << std::endl << compare(lk_all, pos[1]));*/
/*MSG_DEBUG("compare3@3:" << std::endl << compare(lk_all, pos[2]));*/
active_settings->set_title("Testing locus key reductions... 3 loci -> 2 loci");
MSG_DEBUG("compare3@1:" << std::endl << compare(lk_all, pos[0]));
MSG_DEBUG("compare3@2:" << std::endl << compare(lk_all, pos[1]));
MSG_DEBUG("compare3@3:" << std::endl << compare(lk_all, pos[2]));
lk_all = lk_all - pos[2];
DUMP_FILE_LINE();
MSG_DEBUG_INDENT;
active_settings->set_title("Testing locus key reductions... 2 loci -> 1 locus");
MSG_DEBUG("compare2@1:" << std::endl << compare(lk_all, pos[0]));
MSG_DEBUG_DEDENT;
DUMP_FILE_LINE();
......@@ -461,7 +474,7 @@ int main(int argc, const char** argv)
MSG_DEBUG_DEDENT;
}
std::cout << std::endl;
CREATE_MESSAGE(msg_channel::Out, MESSAGE(std::endl));
++i;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment