Commit de2c25c6 authored by Damien Leroux's avatar Damien Leroux
Browse files

Minor tweaks.


Former-commit-id: 28574fea5208cdd7ab5351602a8cc2d1538888db
parent 1ee44388
......@@ -19,7 +19,7 @@
class ThreadPool {
public:
ThreadPool(size_t);
ThreadPool(size_t, std::unordered_map<std::thread::id, std::vector<std::string>>*);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
......@@ -35,7 +35,7 @@ private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
std::queue<std::function<void()>> tasks;
// synchronization
std::mutex queue_mutex;
......@@ -48,6 +48,8 @@ private:
std::string title;
std::mutex title_mutex;
std::unordered_map<std::thread::id, std::vector<std::string>>* thread_stacks;
std::thread::id master_id;
void display_progress();
};
......@@ -87,7 +89,31 @@ inline void ThreadPool::display_progress()
msg << (100 * done / queued) << "% progress";
}
msg << ERASE_TO_EOL;
msg << msg_handler_t::n();
msg << msg_handler_t::n() << std::endl;
int ti = 0;
std::stringstream master;
std::stringstream workers;
std::stringstream* tmp;
for (const auto& kv: *thread_stacks) {
std::thread::id id = kv.first;
if (id == master_id) {
master << "[MASTER] ";
tmp = &master;
} else {
workers << '[' << (ti++) << "] ";
tmp = &workers;
}
if (kv.second.size()) {
(*tmp) << kv.second[0];
for (size_t i = 1; i < kv.second.size(); ++i) {
(*tmp) << " | " << kv.second[i];
}
} else {
(*tmp) << "<idle>";
}
(*tmp) << ERASE_TO_EOL << std::endl;
}
msg << master.str() << workers.str();
msg << std::endl << "----------------------------------------------------------";
msg << ERASE_TO_EOL << std::endl;
msg << RESTORE_CURSOR /*<< SHOW_CURSOR*/;
......@@ -104,10 +130,11 @@ inline void ThreadPool::display_progress()
}
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false), total_queued(0), queued(0), done(0), title()
inline ThreadPool::ThreadPool(size_t threads, std::unordered_map<std::thread::id, std::vector<std::string>>* tt)
: stop(false), total_queued(0), queued(0), done(0), title(), thread_stacks(tt)
, master_id(std::this_thread::get_id())
{
for(size_t i = 0;i<threads;++i)
for(size_t i = 0;i<threads;++i) {
workers.emplace_back(
[this]
{
......@@ -135,6 +162,10 @@ inline ThreadPool::ThreadPool(size_t threads)
}
}
);
/*MSG_DEBUG("NEW THREAD [" << workers.back().get_id() << ']');*/
thread_stacks->insert({workers.back().get_id(), {}});
}
thread_stacks->insert({master_id, {}});
msg_handler_t::hook([this] () { display_progress(); });
}
......
......@@ -92,7 +92,7 @@ struct pedigree_bayesian_network {
} else {
for (size_t i = 0; i < vts.parents.size(); ++i) {
pmap[i] = state_vectors[vts.parents[i]];
MSG_DEBUG("PMAP[" << i << "] = #" << vts.parents[i] << " : " << pmap[i]);
/*MSG_DEBUG("PMAP[" << i << "] = #" << vts.parents[i] << " : " << pmap[i]);*/
}
for (int i = 0; i < lc.size(); ++i) {
/* FIXME the parent key should be {gen, #id} or #id. NOT gen only. */
......
......@@ -372,14 +372,14 @@ struct bayesian_network {
#endif
, m_evidence_table()
{
MSG_DEBUG("Initializing factors...");
/*MSG_DEBUG("Initializing factors...");*/
chrono::start("init_factor_map");
init_factor_map();
chrono::stop("init_factor_map");
chrono::display() = true;
MSG_DEBUG("Done initializing factors.");
/*MSG_DEBUG("Done initializing factors.");*/
MSG_DEBUG("Factor map size :");
/*MSG_DEBUG("Factor map size :");*/
/*MSG_DEBUG("Obs " << m_factor_map[FObs].size());*/
/*dump_factor(FObs, 0);*/
......@@ -774,7 +774,7 @@ struct bayesian_network {
m_inexact_var_message_counters.resize(m_var_message_counters.size(), 0);
m_inexact_factor_message_counters.clear();
m_inexact_factor_message_counters.resize(m_factor_message_counters.size(), 0);
bool exact = update_messages();
bool exact = update_messages(verbose);
size_t n_iter = 1;
if (verbose >= 2) {
if (verbose >= 3) {
......@@ -785,9 +785,11 @@ struct bayesian_network {
}
if (!exact) {
do {
MSG_DEBUG("ITER " << n_iter << " delta=" << delta_max());
if (verbose >= 2) {
MSG_DEBUG("ITER " << n_iter << " delta=" << delta_max());
}
++n_iter;
update_messages();
update_messages(verbose);
if (verbose >= 2) {
if (verbose >= 3) {
MSG_DEBUG("MESSAGES");
......@@ -803,7 +805,7 @@ struct bayesian_network {
return exact;
}
bool update_messages_async()
bool update_messages_async(size_t verbose=0)
{
bool exact = true;
std::deque<std::pair<MessageType, size_t>> transmittable_messages;
......@@ -835,7 +837,9 @@ struct bayesian_network {
transmittable_messages.clear();
}
MSG_DEBUG("NEW ITERATION " << pending_messages.size() << "P " << transmittable_messages.size() << "T");
if (verbose >= 2) {
MSG_DEBUG("NEW ITERATION " << pending_messages.size() << "P " << transmittable_messages.size() << "T");
}
MessageType type;
size_t msg_i;
......@@ -848,7 +852,7 @@ struct bayesian_network {
/*MSG_DEBUG("UPDATING " << m_bn->debug_message(type, msg_i));*/
/*MSG_QUEUE_FLUSH();*/
if (_async_update_message(type, msg_i)) {
MSG_DEBUG("ZERO " << m_bn->debug_message(type, msg_i, true, m_factor_msg_buf));
/*MSG_DEBUG("ZERO " << m_bn->debug_message(type, msg_i, true, m_factor_msg_buf));*/
}
/*_async_message_transmitted(type, msg_i);*/
switch (type) {
......@@ -924,7 +928,9 @@ struct bayesian_network {
if (pending_messages.size()) {
if (m_exact) {
MSG_DEBUG("GOT " << pending_messages.size() << " PENDING MESSAGE(S)!");
if (verbose >= 2) {
MSG_DEBUG("GOT " << pending_messages.size() << " PENDING MESSAGE(S)!");
}
m_exact = false;
m_inexact_messages = pending_messages;
m_inexact_var_message_counters = m_var_message_counters;
......@@ -939,7 +945,7 @@ struct bayesian_network {
return exact;
}
bool update_messages_sync()
bool update_messages_sync(size_t verbose=0)
{
for (const auto& m: m_bn->m_factor_messages) {
/*m.update(m_factor_msg_buf.front(m.m_buf_idx), m_bn->m_factor_map, m_var_msg_buf);*/
......@@ -951,15 +957,16 @@ struct bayesian_network {
m_var_msg_buf.flip();
m_factor_msg_buf.flip();
return false;
(void) verbose;
}
bool update_messages()
bool update_messages(size_t verbose=0)
{
switch (m_mode) {
case PM_Sync:
return update_messages_sync();
return update_messages_sync(verbose);
case PM_Async:
return update_messages_async();
return update_messages_async(verbose);
default:
return false;
};
......
......@@ -563,7 +563,7 @@ struct pop_data_type {
return ret;
}
std::string extract_subpops(std::multimap<std::string, qtl_pop_type>& pops, const std::string& traits_filename, const std::vector<trait>& traits)
std::string extract_subpops(std::multimap<std::string, qtl_pop_type>& pops, const std::string& traits_filename, const std::vector<trait>& traits, std::vector<std::vector<const qtl_pop_type*>>& linked_pops)
{
auto extract_traits
= [&] (const std::vector<size_t>& ind_vec)
......@@ -578,15 +578,19 @@ struct pop_data_type {
return ret;
};
auto aqg = all_qtl_generation_rs();
linked_pops.emplace_back();
linked_pops.back().reserve(aqg.size());
for (const auto& kv: aqg) {
pops.insert({name, {
name,
kv.second,
kv.first,
LV.extract(qtl_generation_name, kv.second),
traits_filename,
extract_traits(kv.second)
}});
auto it =
pops.insert({name, {
name,
kv.second,
kv.first,
LV.extract(qtl_generation_name, kv.second),
traits_filename,
extract_traits(kv.second)
}});
linked_pops.back().push_back(&it->second);
}
return name;
}
......
......@@ -461,19 +461,45 @@ template <typename Ret, typename... Args>
->enqueue(_Sync,
[=] (Args... args)
{
chrono_trace _(get_func_name(func));
std::string func_name = get_func_name(func);
chrono_trace _(func_name);
std::thread::id this_id = std::this_thread::get_id();
active_settings->thread_stacks[this_id].push_back(func_name);
/*MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] ENTER " << func_name);*/
msg_handler_t::run_hooks();
Ret ret = func(args...);
/*MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] LEAVE " << func_name);*/
active_settings->thread_stacks[this_id].pop_back();
msg_handler_t::run_hooks();
unregister_task_in_progress(func, {args}...);
return ret;
}, *args...))
, mutex()
{ }
async_computation(CachingPolicy _Sync, std::function<Ret(Args...)>& func,
async_computation(CachingPolicy _Sync, std::function<Ret(Args...)>& proxy,
computation_function_pointer_type func,
const value<typename clean_type<Args>::type>&... args)
: dependencies(args...)
, m_storage()
, m_future(active_settings->enqueue(_Sync, func, *args...))
/*, m_future(active_settings->enqueue(_Sync, func, *args...))*/
, m_future(active_settings
->enqueue(_Sync,
[=] (Args... args)
{
std::string func_name = get_func_name(func);
chrono_trace _(func_name);
std::thread::id this_id = std::this_thread::get_id();
active_settings->thread_stacks[this_id].push_back(func_name);
/*MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] ENTER " << func_name);*/
msg_handler_t::run_hooks();
Ret ret = proxy(args...);
/*MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] LEAVE " << func_name);*/
active_settings->thread_stacks[this_id].pop_back();
msg_handler_t::run_hooks();
unregister_task_in_progress(func, {args}...);
return ret;
}, *args...))
/*, m_future(std::async(func, *args...))*/
, mutex()
{}
......@@ -543,7 +569,7 @@ std::shared_ptr<async_computation<Ret(Args...)>>
return *exists;
} else {
std::shared_ptr<async_computation<Ret(Args...)>>
ac(new async_computation<Ret(Args...)>(_Sync, proxy, args...));
ac(new async_computation<Ret(Args...)>(_Sync, proxy, func, args...));
return r.get(func, args...) = register_task_in_progress(ac, func, args...);
}
}
......
......@@ -171,7 +171,7 @@ init_connected_block_builder(
index.insert({l, sz});
}
}
#if 0
#if 1
MSG_DEBUG("index size " << index.size());
std::stringstream s;
s << '{';
......@@ -179,7 +179,7 @@ init_connected_block_builder(
s << " (" << kv.first << ')';
}
s << " }";
MSG_DEBUG("index: " << s);
MSG_DEBUG("index: " << s.str());
#endif
ret.n_rows = 0;
......
......@@ -387,6 +387,7 @@ struct qtl_chromosome {
{
qtl_col.reserve(c->qtl.size());
qtl_blend.reserve(c->qtl.size());
/*MSG_DEBUG("NEW QTL_STATE_ITERATOR with " << c->qtl.size() << " QTLs to iterate over " << states.size() << " states");*/
}
long size() const
......
......@@ -591,7 +591,7 @@ void generation_rs::reduce_processes()
inline
void generation_rs::precompute_redux()
{
redux_matrix = make_redux(matrix_labels_to_cliques(main_process().column_labels));
redux_matrix = make_redux(matrix_labels_to_cliques(main_process().column_labels, unique_labels));
}
inline
......@@ -603,6 +603,7 @@ const MatrixXd& generation_rs::get_redux() const
inline
void generation_rs::precompute_unique_labels()
{
/*
const MatrixXd& redux = get_redux();
const auto& breakmat = main_process();
size_t i = 0;
......@@ -614,6 +615,8 @@ void generation_rs::precompute_unique_labels()
while (j < breakmat.column_labels.size() && redux(i, j) == 1) ++j;
++i;
}
MSG_DEBUG("COMPUTING UNIQUE LABELS IN " << name << ": " << unique_labels);
*/
}
inline
......
......@@ -344,7 +344,7 @@ struct lumper {
ret.reserve(tmp.size());
for (auto& kv: tmp) {
/*std::cout << kv.first << "\t" << kv.second << std::endl;*/
MSG_DEBUG(kv.first << "\t" << kv.second);
/*MSG_DEBUG(kv.first << "\t" << kv.second);*/
ret.push_back(kv.second);
}
pol.stop();
......@@ -409,7 +409,7 @@ struct lumper {
/*std::cout << P.size() << std::endl;*/
/*std::cout << P << std::endl;*/
/*MSG_DEBUG("P.size() = " << P.size());*/
MSG_DEBUG("P = " << P);
/*MSG_DEBUG("P = " << P);*/
/*MSG_DEBUG("Lumping time: ck=" << ck.accum << " cs=" << cs.accum << " pol=" << pol.accum << " tr=" << tr.accum);*/
return P;
}
......@@ -418,7 +418,7 @@ struct lumper {
std::set<subset> refine_all()
{
std::vector<subset> tmp = partition_on_labels();
MSG_DEBUG("INITIAL PARTITION" << std::endl << tmp);
/*MSG_DEBUG("INITIAL PARTITION" << std::endl << tmp);*/
std::set<subset> P(tmp.begin(), tmp.end());
return try_refine(P);
}
......@@ -609,7 +609,7 @@ struct lumper {
for (auto& s: subsets) {
auto io = P.insert(s.second);
if (!io.second) {
MSG_DEBUG("PROUT SUBSET ALREADY IN P " << (*io.first));
/*MSG_DEBUG("PROUT SUBSET ALREADY IN P " << (*io.first));*/
}
if (io.first->size() > 1) {
stack.push_back(&*io.first);
......
......@@ -148,9 +148,25 @@ inline ancestor_allele operator % (const allele_pair& ap, bool fs) { return fs ?
template <typename ALLELE_PAIR_TYPE>
inline std::vector<int> matrix_labels_to_cliques(const std::vector<ALLELE_PAIR_TYPE>& labels)
inline std::vector<int> matrix_labels_to_cliques(const std::vector<ALLELE_PAIR_TYPE>& labels, std::vector<ALLELE_PAIR_TYPE>& uniq)
{
std::vector<int> cliques;
#if 1
std::set<ALLELE_PAIR_TYPE> uniq_set(labels.begin(), labels.end());
/*std::vector<ALLELE_PAIR_TYPE> uniq(uniq_set.begin(), uniq_set.end());*/
uniq.assign(uniq_set.begin(), uniq_set.end());
/*MSG_DEBUG("LABELS " << labels);*/
/*MSG_DEBUG("UNIQ " << uniq);*/
auto get_index = [&] (const ALLELE_PAIR_TYPE& ap) { return (int) (std::find(uniq.begin(), uniq.end(), ap) - uniq.begin()); };
cliques.resize(labels.size());
for (size_t i = 0; i < labels.size(); ++i) {
cliques[i] = get_index(labels[i]);
}
#else
int clique = -1;
cliques.resize(labels.size(), -1);
for (size_t i = 0; i < labels.size(); ++i) {
......@@ -165,6 +181,7 @@ inline std::vector<int> matrix_labels_to_cliques(const std::vector<ALLELE_PAIR_T
}
}
/*std::cout << "CLIQUES :: " << cliques << std::endl;*/
#endif
return cliques;
}
......@@ -186,6 +203,8 @@ inline MatrixXd make_redux(std::vector<int> cliques)
for (size_t i = 0; i < cliques.size(); ++i) {
reduxd(cliques[i], i) = 1;
}
MSG_DEBUG("REDUX");
MSG_DEBUG(reduxd);
return reduxd;
}
......
......@@ -14,6 +14,7 @@
#include "bayes/output.h"
#include "input/observations.h"
#include <thread>
#include "../3rd-party/ThreadPool/ThreadPool.h"
......@@ -97,6 +98,10 @@ struct settings_t {
LV_database LV;
std::vector<std::vector<const qtl_pop_type*>> linked_pops;
std::unordered_map<std::thread::id, std::vector<std::string>> thread_stacks;
settings_t()
: notes()
, map_filename(), map()
......@@ -131,6 +136,8 @@ struct settings_t {
, ld_parents()
, ld_data()
, LV()
, linked_pops()
, thread_stacks()
{}
std::vector<std::pair<const chromosome*, double>>
......@@ -188,7 +195,7 @@ struct settings_t {
c.compute_haplo_sizes();
}
if (parallel > 1) {
pool = new ThreadPool(parallel);
pool = new ThreadPool(parallel, &thread_stacks);
}
#if 0
if (design && marker_observation_specs) {
......@@ -207,6 +214,7 @@ struct settings_t {
-> std::future<typename std::result_of<F(Args...)>::type>
{
if (_Sync || parallel < 2 || std::this_thread::get_id() != main_thread) {
/*MSG_DEBUG(std::this_thread::get_id() << " RUN SYNC");*/
return std::async(std::launch::deferred,
std::forward<F>(f),
std::forward<Args>(args)...);
......
......@@ -36,7 +36,7 @@ class Chromosome(object):
self.Minterval = zip(self.marker_names, intervals)
self.M = zip(self.marker_names, positions)
self.map_file_hack = ([(len(positions), marker_names[0])]
+ zip(intervals[1:], self.marker_names[1:]))
+ zip(intervals[1:], self.marker_names[1:]))
return self
@staticmethod
......@@ -225,23 +225,36 @@ class Individual(object):
return self
@staticmethod
def ancestor(allele, pop):
def ancestor(allele, pop, pedigree):
a = allele + allele
ind_num = 1 + len(pedigree)
pedigree.append((pop.name, ind_num, 0, 0))
pop.ped_id = [ind_num]
genotype = (a for m in pop.map)
return Individual(0, pop).apply(genotype)
@staticmethod
def crossing(i, pop, mi, fi):
def crossing(i, pop, mi, fi, pedigree):
hf = pop.father_pop.individuals[fi].meiosis()
hm = pop.mother_pop.individuals[mi].meiosis()
genotype = izip(hm, hf)
ind_num = 1 + len(pedigree)
pedigree.append((pop.name, ind_num,
pop.mother_pop.ped_id[mi],
pop.father_pop.ped_id[fi]))
pop.ped_id[i] = ind_num
return Individual(i, pop, mi, fi).apply(genotype)
@staticmethod
def selfing(i, pop, pi):
def selfing(i, pop, pi, pedigree):
hf = pop.father_pop.individuals[pi].meiosis()
hm = pop.mother_pop.individuals[pi].meiosis()
genotype = izip(hm, hf)
ind_num = 1 + len(pedigree)
pedigree.append((pop.name, ind_num,
pop.mother_pop.ped_id[pi],
pop.father_pop.ped_id[pi]))
pop.ped_id[i] = ind_num
return Individual(i, pop, pi, pi).apply(genotype)
@staticmethod
......@@ -309,22 +322,24 @@ class Population(object):
self.observations = [[None] * n_ind for m in genmap]
self.individuals = []
self.geno_to_obs = {}
self.ped_id = range(n_ind)
@staticmethod
def generate_line(name, genmap, allele):
def generate_line(name, genmap, allele, pedigree):
self = Population(name, genmap, 1)
self.individuals = [Individual.ancestor(allele, self)]
self.individuals = [Individual.ancestor(allele, self, pedigree)]
self.geno_to_obs = None
return self
@staticmethod
def generate_crossing(name, mother_pop, father_pop, n_ind, geno2obs):
assert mother_pop.map == father_pop.map, \
"Parent populations MUST share the same genetic map"
def generate_crossing(name, mother_pop, father_pop, n_ind, geno2obs,
pedigree):
#assert (mother_pop.map == father_pop.map,
# "Parent populations MUST share the same genetic map")
self = Population(name, mother_pop.map, n_ind, mother_pop, father_pop)
mi = lambda: random.choice(xrange(self.mother_pop.n_ind))
fi = lambda: random.choice(xrange(self.father_pop.n_ind))
self.individuals = [Individual.crossing(i, self, mi(), fi())
self.individuals = [Individual.crossing(i, self, mi(), fi(), pedigree)
for i in xrange(n_ind)]
self.geno_to_obs = geno2obs
return self
......@@ -355,10 +370,10 @@ class Population(object):
return self
@staticmethod
def generate_selfing(name, mother_pop, n_ind, geno2obs):
def generate_selfing(name, mother_pop, n_ind, geno2obs, pedigree):
self = Population(name, mother_pop.map, n_ind, mother_pop, mother_pop)
mi = lambda: random.choice(xrange(self.mother_pop.n_ind))
self.individuals = [Individual.selfing(i, self, mi())
self.individuals = [Individual.selfing(i, self, mi(), pedigree)
for i in xrange(n_ind)]
self.geno_to_obs = geno2obs
return self
......@@ -375,8 +390,8 @@ class Population(object):
def __str__(self):
if self.geno_to_obs is None:
return "<Population %s, %i individuals, not observed>" % (
self.name, len(self.individuals)
)
self.name, len(self.individuals)
)
self.geno_to_obs[''] = '-'
array = [
['data type', self.name],
......@@ -404,7 +419,7 @@ class Trait(object):
self.name = name
def init(self, pop, trait_qtls, qtl_effect, qtl_epistasis,
noise_gain, noise_mu, noise_sigma):
noise_gain, noise_mu, noise_sigma):
self.pop = pop
self.qtls = trait_qtls
self.qtl_effect = qtl_effect
......@@ -473,6 +488,7 @@ class Design(object):
self.traits = {}
if something is not None:
self.__iadd__(something)
self.pedigree = []
def gen_map(self, n_mark_list, length_list, constraint_list):
self.map = Map.generate(n_mark_list, length_list, constraint_list)
......@@ -483,7 +499,8 @@ class Design(object):
return self
def gen_line(self, name, allele):
self.pops[name] = Population.generate_line(name, self.map, allele)
self.pops[name] = Population.generate_line(name, self.map, allele,
self.pedigree)
return self
def gen_cross(self, name, p1, p2, n_ind, geno2obs):
......@@ -491,14 +508,16 @@ class Design(object):
self.pops[p1],
self.pops[p2],
n_ind,
geno2obs)
geno2obs,
self.pedigree)
return self
def gen_self(self, name, p1, n_ind, geno2obs):