Commit 44368b5c authored by Damien Leroux's avatar Damien Leroux
Browse files

Near catastrophe averted. Also, bug fixes.

parent 7c3a5c48
......@@ -87,12 +87,21 @@ inline void ThreadPool::display_progress()
}
msg << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
msg << "[SPELL-QTL] " << msg_handler_t::n() << workers.size() << " threads, " << total_queued << " tasks queued in previous batches" << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
if (queued) {
unsigned long local_queued;
unsigned long local_done;
{
/*std::lock_guard<std::mutex> lock(queue_mutex);*/
local_queued = queued;
local_done = done;
}
if (local_queued) {
msg << "[SPELL-QTL] Task info: ";
msg << msg_handler_t::n();
msg << queued << " queued, " << done << " done, ";
msg << local_queued << " queued, " << done << " done, ";
msg << msg_handler_t::w();
msg << (100 * done / queued) << "% progress";
msg << (100 * local_done / local_queued) << "% progress";
}
msg << ERASE_TO_EOL;
msg << msg_handler_t::n() << std::endl;
......@@ -120,24 +129,27 @@ inline void ThreadPool::display_progress()
(*tmp) << ERASE_TO_EOL << std::endl;
}
msg << master.str() << workers.str();
msg << std::endl << "----------------------------------------------------------";
msg << "----------------------------------------------------------";
msg << ERASE_TO_EOL << std::endl;
msg << RESTORE_CURSOR /*<< SHOW_CURSOR*/;
/*CREATE_MESSAGE(msg_channel::Out, msg.str());*/
if (!msg_handler_t::instance().queue.m_stop) {
msg_handler_t::instance().queue.cout << msg.str() << std::flush;
}
if (done == queued) {
total_queued += queued;
queued = 0;
done = 0;
{
/*std::lock_guard<std::mutex> lock(queue_mutex);*/
if (done == queued) {
total_queued += queued;
queued = 0;
done = 0;
}
}
}
}
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads, std::unordered_map<std::thread::id, std::vector<std::string>>* tt)
: stop(false), total_queued(0), queued(0), done(0), title(), thread_stacks(tt)
: workers(), tasks(), queue_mutex(), condition(), stop(false), total_queued(0), queued(0), done(0), title(), title_mutex(), thread_stacks(tt)
, master_id(std::this_thread::get_id())
{
for(size_t i = 0;i<threads;++i) {
......@@ -180,6 +192,7 @@ template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
std::lock_guard<std::mutex> lock(queue_mutex);
typedef typename std::result_of<F(Args...)>::type return_type;
// don't allow enqueueing after stopping the pool
......@@ -192,7 +205,7 @@ auto ThreadPool::enqueue(F&& f, Args&&... args)
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
/*std::unique_lock<std::mutex> lock(queue_mutex);*/
tasks.push([task](){ (*task)(); });
}
++queued;
......
include Makefile.conf
all: spell-qtl spell-pedigree spell-marker
depend:
......@@ -18,3 +21,10 @@ clean:
cd src && make clean
cd src/bayes && make clean
cd src/pedigree && make clean
install: spell-qtl spell-pedigree spell-marker
cp -v src/spell-qtl $(INSTALL_DIR)/bin
cp -v src/bayes/spell-marker $(INSTALL_DIR)/bin
cp -v src/pedigree/spell-pedigree $(INSTALL_DIR)/bin
INSTALL_DIR=/usr/local
ROOT_DIR:=$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
GCC_VERSION=-4.9
......@@ -18,10 +21,12 @@ VERSION_DEFINES=$(shell git tag | tail -1 | sed 's/\([0-9]*\)[.]\([0-9]*\)\([.][
CXXARGS=-Wextra -Wall -pthread -fPIC -DEIGEN_NO_DEPRECATED_WARNING $(VERSION_DEFINES)
#CXXARGS=-Wextra -Wall -pthread -fPIC -DEIGEN_NO_DEPRECATED_WARNING $(VERSION_DEFINES)
#DEBUG_OPTS=-ggdb -O0
OPT_OPTS=-O2 -DEIGEN_NO_DEBUG -DNDEBUG
#DEBUG_OPTS=-ggdb -O0 -DNDEBUG
DEBUG_OPTS=-ggdb -O2 -DNDEBUG
#OPT_OPTS=-O2 -DEIGEN_NO_DEBUG -DNDEBUG
#OPT_OPTS=-O3 -DEIGEN_NO_DEBUG -DNDEBUG
#OPT_OPTS=-ggdb -O2 -DEIGEN_NO_DEBUG -DNDEBUG
#OPT_OPTS=-ggdb -O2 -DNDEBUG
#DEBUG_OPTS=-ggdb -DNDEBUG
#OPT_OPTS=-O3 -DNDEBUG
#PROF_OPTS=-O2 -DEIGEN_NO_DEBUG -DNDEBUG -g -pg
......
......@@ -79,6 +79,7 @@ bool ensure_directory_exists(const std::string& path)
}
#if 0
static inline
bool ensure_directories_exist(const std::string& path)
{
......@@ -96,7 +97,25 @@ bool ensure_directories_exist(const std::string& path)
} while (ok && i != j);
return ok;
}
#endif
static inline
bool ensure_directories_exist(const std::string& path)
{
if (path.size() == 0) {
/*MSG_DEBUG("ensure_directories_exist(" << path << ") OK");*/
return true;
}
size_t i = path.size() - 1;
while (i > 0 && path[i] != '/') { --i; }
std::string parent(path.begin(), path.begin() + i);
if (check_file(parent, true, true, false) || ensure_directories_exist(parent)) {
/*MSG_DEBUG("ensure_directories_exist(" << path << ") OK");*/
return mkdir(path.c_str(), 0770) != -1;
}
/*MSG_DEBUG("ensure_directories_exist(" << path << ") bad.");*/
return false;
}
#endif
......@@ -832,6 +832,19 @@ struct LV_database {
/** **/
template <typename V>
std::vector<V>
filter_vector(const std::vector<V>& vec, const std::vector<size_t>& list)
{
std::vector<V> ret;
ret.reserve(list.size());
for (size_t i: list) {
ret.push_back(vec[i]);
}
return ret;
}
struct qtl_pop_type {
std::string name;
std::string qtl_generation_name;
......@@ -840,14 +853,57 @@ struct qtl_pop_type {
std::map<std::string, std::vector<MatrixXd>> LV;
std::string observed_traits_filename;
std::vector<trait> observed_traits;
/*qtl_pop_type(const std::string& n, const std::vector<size_t>& ind, const geno_matrix* g,*/
/*std::vector<MatrixXd>&& lv, const std::string& otf, std::vector<trait>&& ot)*/
/*: name(n), indices(ind), gen(g), LV(std::move(lv)), observed_traits_filename(ofs), observed_traits(std::move(ot))*/
/*{}*/
qtl_pop_type()
: name(), qtl_generation_name(), indices(), gen(), LV(), observed_traits_filename(), observed_traits()
{}
qtl_pop_type(const std::string& n, const std::string& qgn, const std::vector<size_t>& i, std::shared_ptr<geno_matrix> g,
const std::map<std::string, std::vector<MatrixXd>>& lv, const std::string& otf, const std::vector<trait>& ot)
: name(n), qtl_generation_name(qgn), indices(i), gen(g), LV(lv), observed_traits_filename(otf), observed_traits(ot)
{}
std::shared_ptr<qtl_pop_type>
filter_clone(const trait& this_trait) const
{
const std::vector<size_t>& keep = this_trait.good_indices;
std::shared_ptr<qtl_pop_type> ret = std::make_shared<qtl_pop_type>();
ret->name = name;
ret->qtl_generation_name = qtl_generation_name;
ret->gen = gen;
ret->observed_traits_filename = observed_traits_filename;
ret->observed_traits.emplace_back(this_trait);
auto i = indices.begin();
auto j = indices.end();
auto filt = keep.begin();
auto filt_end = keep.end();
/*MSG_DEBUG("Filtering <" << indices << "> with <" << keep << '>');*/
std::vector<size_t> keep_indices;
keep_indices.reserve(keep.size());
while (filt != filt_end && i != j) {
if (*i == *filt) {
keep_indices.push_back(i - indices.begin());
++i;
++filt;
} else if (*i < *filt) {
++i;
} else {
++filt;
}
}
ret->indices = filter_vector(indices, keep_indices);
for (const auto& kv: LV) {
ret->LV[kv.first] = filter_vector(kv.second, keep_indices);
}
return ret;
}
size_t size() const
{
return observed_traits.front().values.size();
/*return observed_traits.front().values.size();*/
return indices.size();
}
const MatrixXd& get_LV(const std::string& chr, size_t i) const { return LV.find(chr)->second[i]; }
......@@ -1010,7 +1066,7 @@ struct pop_data_type {
return ret;
}
std::string extract_subpops(std::multimap<std::string, qtl_pop_type>& pops, const std::string& traits_filename, const std::vector<trait>& traits, std::vector<std::vector<const qtl_pop_type*>>& linked_pops)
std::string extract_subpops(std::vector<std::shared_ptr<qtl_pop_type>>& pops, const std::string& traits_filename, const std::vector<trait>& traits, std::map<std::string, std::vector<std::vector<std::shared_ptr<qtl_pop_type>>>>& linked_pops)
{
auto extract_traits
= [&] (const std::vector<size_t>& ind_vec)
......@@ -1019,12 +1075,51 @@ struct pop_data_type {
ret.resize(traits.size());
for (size_t ti = 0; ti < traits.size(); ++ti) {
ret[ti].name = traits[ti].name;
/* FIXME there may be less values, need to filter the ind_vec using good_indices */
ret[ti].values.reserve(ind_vec.size());
for (size_t i: ind_vec) { ret[ti].values.push_back(traits[ti].values[i]); }
ret[ti].good_indices.reserve(ind_vec.size());
auto ivi = ind_vec.begin(), ivj = ind_vec.end();
auto vali = traits[ti].values.begin(), valj = traits[ti].values.end();
auto goodi = traits[ti].good_indices.begin();
while (true) {
if (ivi == ivj || vali == valj) {
break;
} else if (*ivi < *goodi) {
++ivi;
} else if (*ivi > *goodi) {
++goodi;
++vali;
} else {
ret[ti].values.push_back(*vali);
ret[ti].good_indices.push_back(*goodi);
++goodi;
++vali;
++ivi;
}
}
/*for (size_t i: ind_vec) {*/
/*ret[ti].values.push_back(traits[ti].values[i]);*/
/*ret[ti].good_indices.push_back(traits[ti].good_indices[i]);*/
/*}*/
}
return ret;
};
for (const auto& this_trait: traits) {
linked_pops[this_trait.name].emplace_back();
}
auto aqg = all_qtl_geno_matrix();
for (const auto& kv: aqg) {
std::shared_ptr<qtl_pop_type> this_pop = std::make_shared<qtl_pop_type>(name, qtl_generation_name, kv.second, kv.first, LV.extract(qtl_generation_name, kv.second), traits_filename, extract_traits(kv.second));
for (const auto& this_trait: traits) {
auto& lp = linked_pops[this_trait.name];
auto pop_ptr = this_pop->filter_clone(this_trait);
lp.back().emplace_back(pop_ptr);
}
pops.emplace_back(this_pop);
}
/*
linked_pops.emplace_back();
linked_pops.back().reserve(aqg.size());
for (const auto& kv: aqg) {
......@@ -1040,6 +1135,7 @@ struct pop_data_type {
}});
linked_pops.back().push_back(&it->second);
}
*/
return name;
}
......
......@@ -238,7 +238,7 @@ template <typename Ret, typename... Args>
Ret operator () (Args... x)
{
static disk_hashtable<Ret(Args...)> dht(MESSAGE(active_settings->work_directory << '/' << m_name));
static disk_hashtable<Ret(Args...)> dht(MESSAGE(active_settings->work_directory << "/cache/" << m_name));
return dht.get_or_compute(m_func, std::forward<Args>(x)...);
}
......
......@@ -160,7 +160,7 @@ population_marker_obs(const context_key&, int);
collection<std::string>
all_traits();
#if 0
static inline
cache_input& operator & (cache_input& ci, const qtl_pop_type*& pop)
{
......@@ -176,6 +176,7 @@ cache_output& operator & (cache_output& co, const qtl_pop_type*& pop)
{
return co & pop->name;
}
#endif // 0
#endif
This diff is collapsed.
......@@ -47,6 +47,10 @@ struct block_builder {
iterator&
operator += (const MATRIX& block)
{
if (ei->n_rows != block.rows() || ei->columns.size() != block.cols()) {
MSG_ERROR("Invalid part size in model block. Have " << MATRIX_SIZE(block) << " but expected (" << ei->n_rows << ", " << ei->columns.size() << ')', "Fix the code");
exit(-1);
}
/*MSG_DEBUG("output " << output);*/
/*MSG_DEBUG(output->innerSize() << ',' << output->outerSize());*/
/*MSG_DEBUG("first_row " << ei->first_row);*/
......@@ -127,7 +131,7 @@ VectorXd
f_tester(const model& M, const model& M0, const parental_origin_per_locus_type& popl);
model_block_type
cross_indicator(const collection<population_value>& pops);
cross_indicator(const std::string& trait_name);
model
basic_model();
......@@ -185,7 +189,7 @@ init_connected_block_builder(
ret.n_rows = 0;
ret.n_columns = index.size();
ret.labels.reserve(ret.n_columns);
ret.elements.reserve(active_settings->populations.size());
ret.elements.reserve(all_pops.size());
for (auto& kv: index) { ret.labels.push_back(kv.first); }
for (auto& kv: all_pops) {
......@@ -218,7 +222,7 @@ init_disconnected_block_builder(
ret.n_columns += get_labels(**kv).size();
}
ret.labels.reserve(ret.n_columns);
ret.elements.reserve(active_settings->populations.size());
ret.elements.reserve(all_pops.size());
for (auto& kv: all_pops) {
ret.elements.emplace_back();
......@@ -302,6 +306,11 @@ struct computation_along_chromosome {
}
};
inline void arg_write(std::ostream&, const computation_along_chromosome*) {}
inline bool arg_match(std::istream&, const computation_along_chromosome*) { return true; }
namespace std { template<> struct hash<computation_along_chromosome*> { bool operator () (const computation_along_chromosome*) const { return 1; } }; }
enum ComputationType { NoTest=0, FTest=1, FTestLOD=2, R2=4, Chi2=8, Mahalanobis=16 };
enum ComputationResults { Test=0, RSS=1, Residuals=2, Coefficients=4, Rank=8 };
......@@ -389,7 +398,7 @@ compute_along_chromosome(computation_along_chromosome& ret,
DUMP_FILE_LINE();
value<model_block_key> vmbk = model_block_key(base_key);
*vmbk += std::make_pair(chr, loci[*i]);
MSG_DEBUG("new block key " << vmbk);
/*MSG_DEBUG("new block key " << vmbk);*/
joiner.push_back(make_value<_Policy>(compute_one,
i, pcac, /* flower */
vct, vcr,
......
......@@ -73,6 +73,13 @@ bool
return true;
}
template <typename C, typename R, typename M>
bool
arg_match(std::istream& is, const labelled_matrix<C, R, M>& lm)
{
return arg_match(is, lm.column_labels) && arg_match(is, lm.row_labels) && arg_match(is, lm.data);
}
inline bool
arg_match(std::istream& is, const geno_matrix* mat)
{
......@@ -85,7 +92,8 @@ inline bool
arg_match(std::istream& is, const context_key& ck)
{
static std::string null("null");
return ((ck->pop ? arg_match(is, ck->pop->name) : arg_match(is, null))
return ((ck->pop ? arg_match(is, ck->pop->name) && arg_match(is, ck->pop->qtl_generation_name) && arg_match(is, ck->pop->size())
: arg_match(is, null))
&& (ck->gen ? arg_match(is, ck->gen) : arg_match(is, null))
&& (ck->chr ? arg_match(is, ck->chr->name) : arg_match(is, null))
&& arg_match(is, ck->loci));
......@@ -94,6 +102,17 @@ inline bool
inline bool
arg_match(std::istream& is, const locus_key& lk)
{
static std::string end("LKend");
if (lk) {
auto vec = lk->to_vec();
/*MSG_DEBUG("arg_match lk->to_vec " << vec);*/
if (!arg_match(is, vec)) {
return false;
}
}
return arg_match(is, end);
#if 0
static std::string end("LKend");
locus_key tmp = lk;
while (tmp) {
if (!arg_match(is, tmp->locus)) {
......@@ -101,7 +120,9 @@ inline bool
}
tmp = tmp->parent;
}
return arg_match(is, end);
return true;
#endif
}
template <typename... Args> struct arg_matcher;
......@@ -149,6 +170,15 @@ template <typename Scalar, int... Opts>
}
}
template <typename R, typename C, typename M>
inline void
arg_write(std::ostream& os, const labelled_matrix<R, C, M>& lm)
{
arg_write(os, lm.column_labels);
arg_write(os, lm.row_labels);
arg_write(os, lm.data);
}
inline void
arg_write(std::ostream& os, const geno_matrix* mat)
{
......@@ -161,7 +191,13 @@ inline void
arg_write(std::ostream& os, const context_key& ck)
{
static std::string null("null");
arg_write(os, ck->pop ? ck->pop->name : null);
if (ck->pop) {
arg_write(os, ck->pop->name);
arg_write(os, ck->pop->qtl_generation_name);
arg_write(os, ck->pop->size());
} else {
arg_write(os, null);
}
if (ck->gen) {
arg_write(os, ck->gen);
} else {
......@@ -174,11 +210,22 @@ inline void
inline void
arg_write(std::ostream& os, const locus_key& lk)
{
static std::string end("LKend");
if (lk) {
auto vec = lk->to_vec();
/*MSG_DEBUG("arg_write lk->to_vec " << vec);*/
arg_write(os, vec);
}
arg_write(os, end);
#if 0
static std::string end("LKend");
locus_key tmp = lk;
while (tmp) {
arg_write(os, tmp->locus);
tmp = tmp->parent;
}
arg_write(os, end);
#endif
}
......@@ -247,9 +294,10 @@ struct disk_hashtable<ValueType(AllArgs...)> {
{
static char hex[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
size_t h = arg_hasher<AllArgs...>()(std::forward<AllArgs>(args)...);
std::string ret(8, '0');
std::string ret(11, '/');
static size_t ret_ofs[8] = {0, 1, 3, 4, 6, 7, 9, 10};
for (int i = 7; i >= 0; --i) {
ret[i] = hex[h & 0xF];
ret[ret_ofs[i]] = hex[h & 0xF];
h >>= 4;
}
return ret;
......
......@@ -104,7 +104,15 @@ inline std::string read_star_name(std::istream& is) {
if (is.eof()) {
throw input_exception(is, "unexpected end of file");
}
if (is.get() != '*') {
char c = is.get();
if (c == '#') {
while (is.get() != '\n');
if (is.eof()) {
throw input_exception(is, "unexpected end of file");
}
c = is.get();
}
if (c != '*') {
throw input_exception(is, "expected a *");
}
is >> ret;
......
......@@ -8,6 +8,7 @@
struct trait {
std::string name;
std::vector<double> values;
std::vector<size_t> good_indices;
};
namespace read_data {
......@@ -17,10 +18,18 @@ namespace read_data {
inline std::ostream& operator << (std::ostream& os, const trait& t)
{
os << "<trait:" << t.name;
auto val = t.values.begin();
auto end = t.values.end();
for (; val != end; ++val) {
os << ' ' << (*val);
for (double v: t.values) {
os << ' ' << v;
}
/*auto val = t.values.begin();*/
/*auto end = t.values.end();*/
/*for (; val != end; ++val) {*/
/*os << ' ' << (*val);*/
/*}*/
os << " indices:";
for (size_t i: t.good_indices) {
os << ' ' << i;
}
return os << '>';
}
......
......@@ -971,19 +971,21 @@ struct model {
#endif
}
void output_X_to_file() const
void output_X_to_file(const std::string& path=".") const
{
static Eigen::IOFormat model_format(Eigen::FullPrecision, Eigen::DontAlignCols, "\t", "\n", "", "", "", "");
filename_stream filename;
filename << "Model_" << keys() << "_X.txt";
ensure_directories_exist(path);
filename << path << '/' << "Model_" << keys() << "_X.txt";
std::ofstream(filename) << X().format(model_format);
}
void output_XtX_inv_to_file() const
void output_XtX_inv_to_file(const std::string& path=".") const
{
static Eigen::IOFormat model_format(Eigen::FullPrecision, Eigen::DontAlignCols, "\t", "\n", "", "", "", "");
filename_stream filename;
filename << "Model_" << keys() << "_XtX_inv.txt";
ensure_directories_exist(path);
filename << path << '/' << "Model_" << keys() << "_XtX_inv.txt";
std::ofstream(filename) << XtX_pseudo_inverse().format(model_format);
}
......
......@@ -101,8 +101,9 @@ struct settings_t {
std::string map_filename;
std::vector<chromosome> map;
/*format_specification_t* marker_observation_specs;*/
std::multimap<std::string, qtl_pop_type> populations;
std::map<std::string, std::vector<const qtl_pop_type*>> pops_by_trait;
/*std::multimap<std::string, qtl_pop_type> populations;*/
std::vector<std::shared_ptr<qtl_pop_type>> populations;
std::map<std::string, std::vector<std::shared_ptr<qtl_pop_type>>> pops_by_trait;
std::vector<qtl_chromosome> working_set;
......@@ -159,14 +160,14 @@ struct settings_t {
LV_database LV;
std::vector<std::vector<const qtl_pop_type*>> linked_pops;
std::map<std::string, std::vector<std::vector<std::shared_ptr<qtl_pop_type>>>> linked_pops;
std::unordered_map<std::thread::id, std::vector<std::string>> thread_stacks;
settings_t()
: notes()
, map_filename(), map()