Commit 44494e56 authored by Damien Leroux's avatar Damien Leroux
Browse files

New implementation of taskpool supersedes the older 3rd-party threadpool.

parent f7ef4771
......@@ -14,6 +14,10 @@
<file path="$PROJECT_DIR$/cmake-build-debug/CMakeFiles" />
<file path="$PROJECT_DIR$/cmake-build-release/CMakeFiles" />
<file path="$PROJECT_DIR$/data" />
<file path="$PROJECT_DIR$/examples/F3-outbred-pleio-small/spell" />
<file path="$PROJECT_DIR$/examples/F3-outbred-pleio/spell" />
<file path="$PROJECT_DIR$/examples/small/S.cache" />
<file path="$PROJECT_DIR$/examples/three_parents_F2/spell" />
<file path="$PROJECT_DIR$/include/attic" />
<file path="$PROJECT_DIR$/sample-data" />
<file path="$PROJECT_DIR$/sandbox" />
......@@ -27,10 +31,13 @@
<file path="$PROJECT_DIR$/simulation/gen_SIB" />
<file path="$PROJECT_DIR$/simulation/test/gen_RIL" />
<file path="$PROJECT_DIR$/simulator" />
<file path="$PROJECT_DIR$/src/all_values" />
<file path="$PROJECT_DIR$/src/bayes/gros_tests" />
<file path="$PROJECT_DIR$/src/bayes/test-data" />
<file path="$PROJECT_DIR$/src/bayes/tmp" />
<file path="$PROJECT_DIR$/src/experiments" />
<file path="$PROJECT_DIR$/src/include-graphs" />
<file path="$PROJECT_DIR$/src/misc_outputs" />
<file path="$PROJECT_DIR$/test_ril_ril2" />
<file path="$PROJECT_DIR$/tests/TestCodeCoverage" />
</excludeRoots>
......
This diff is collapsed.
......@@ -40,7 +40,7 @@ public:
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
// the task manager
std::queue<std::function<void()>> tasks;
// synchronization
......
......@@ -20,7 +20,8 @@
#include "cache/base.h"
#include "disk_hashtable.h"
#include "task_pool.h"
#include "stl_output.h"
template <typename Ret, typename... Args>
computation_registry<value<Ret>, Ret (*) (Args...), value<typename clean_type<Args>::type>...>&
......@@ -77,62 +78,159 @@ register_task_in_progress(std::shared_ptr<async_computation<Ret(Args...)>> v,
template <typename Ret, typename... Args>
struct async_computation<Ret(Args...)> {
struct async_computation<Ret(Args...)> : public Task {
typedef async_computation<Ret(Args...)> this_type;
typedef Ret value_type;
typedef Ret (*computation_function_pointer_type) (Args...);
typedef std::packaged_task<Ret(Args...)> task_type;
async_computation(const async_computation&) = delete;
async_computation() = delete;
// ~async_computation() { if (m_thread.joinable()) { m_thread.join(); } }
async_computation(CachingPolicy _Sync, computation_function_pointer_type func,
const value<typename clean_type<Args>::type>&... args)
: dependencies(args...)
, m_storage_init(false)
, m_storage()
, m_future(active_settings
->enqueue(_Sync,
[=] (Args... args)
{
std::string func_name = get_func_name(func);
chrono_trace _(func_name);
std::thread::id this_id = std::this_thread::get_id();
active_settings->thread_stacks[this_id].push_back(func_name);
/*MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] ENTER " << func_name);*/
msg_handler_t::run_hooks();
Ret ret = func(args...);
/*MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] LEAVE " << func_name);*/
active_settings->thread_stacks[this_id].pop_back();
msg_handler_t::run_hooks();
unregister_task_in_progress(func, {args}...);
return ret;
}, *args...))
, m_future()
, m_compute([=] () {
/*
bool previous = m_started.exchange(true);
if (previous) {
MSG_DEBUG("Task " << get_func_name(func) << " already run. Returning.");
// unregister_task_in_progress(func, {*args}...);
TaskPool::release_slot();
TaskPool::remove_task(m_thread.get_id());
return;
}
*/
std::string func_name = get_func_name(func);
chrono_trace _(func_name);
if (0) {
// msg_handler_t::cout() << "INVOKING " << func_name << "(…)" << std::endl;
std::stringstream ss;
ss << "INVOKING " << func_name << "(" << std::endl;
std::vector<std::string> avec;
do_with_arg_pack(avec.push_back(MESSAGE(args)));
for (size_t i = 0; i < avec.size() - 1; ++i) {
ss << avec[i] << ", ";
}
msg_handler_t::cout() << ss.str() << ')';
}
std::thread::id this_id = std::this_thread::get_id();
// active_settings->thread_stacks[this_id].push_back(func_name);
MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] ENTER " << func_name);
msg_handler_t::run_hooks();
auto ret = func(*args...);
// active_settings->thread_stacks[this_id].pop_back();
msg_handler_t::run_hooks();
unregister_task_in_progress(func, {*args}...);
TaskPool::release_slot();
// TaskPool::remove_task(m_thread->get_id());
MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] LEAVE " << func_name);
m_started_cv.notify_all();
return ret;
})
, mutex()
{ }
, m_started(false)
, m_started_cv()
, m_storage_waiting(false)
, m_storage_waiting_cv()
{
// MSG_DEBUG("In constructor, thread " << (&m_thread) << " should not be joinable: " << std::boolalpha << m_thread.joinable());
std::unique_lock<std::mutex> lock(mutex);
// if (!m_started) {
// m_started.store(true);
TaskPool::enqueue(this);
// }
}
async_computation(CachingPolicy _Sync, std::function<Ret(Args...)>& proxy,
computation_function_pointer_type func,
const value<typename clean_type<Args>::type>&... args)
: dependencies(args...)
, m_storage_init(false)
// , m_thread()
, m_storage()
/*, m_future(active_settings->enqueue(_Sync, func, *args...))*/
, m_future(active_settings
->enqueue(_Sync,
[=] (Args... args)
{
std::string func_name = get_func_name(func);
chrono_trace _(func_name);
std::thread::id this_id = std::this_thread::get_id();
active_settings->thread_stacks[this_id].push_back(func_name);
/*MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] ENTER " << func_name);*/
msg_handler_t::run_hooks();
Ret ret = proxy(args...);
/*MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] LEAVE " << func_name);*/
active_settings->thread_stacks[this_id].pop_back();
msg_handler_t::run_hooks();
unregister_task_in_progress(func, {args}...);
return ret;
}, *args...))
// , m_promise()
, m_future()
, m_compute([=] () {
m_started_cv.notify_all();
/*
bool previous = m_started.exchange(true);
if (previous) {
MSG_DEBUG("Task " << get_func_name(func) << " already run. Returning.");
// unregister_task_in_progress(func, {*args}...);
TaskPool::release_slot();
TaskPool::remove_task(m_thread.get_id());
return;
}
*/
std::string func_name = get_func_name(func);
chrono_trace _(func_name);
if (0) {
// msg_handler_t::cout() << "INVOKING " << func_name << "(" << std::endl;
std::stringstream ss;
ss << "INVOKING " << func_name << "(…)" << std::endl;
std::vector<std::string> avec;
do_with_arg_pack(avec.push_back(MESSAGE(args)));
for (size_t i = 0; i < avec.size() - 1; ++i) {
ss << avec[i] << ", ";
}
msg_handler_t::cout() << ss.str() << ')';
}
std::thread::id this_id = std::this_thread::get_id();
// active_settings->thread_stacks[this_id].push_back(func_name);
MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] p ENTER " << func_name);
// msg_handler_t::run_hooks();
// m_promise.set_value(proxy(*args...));
auto ret = proxy(*args...);
MSG_DEBUG("[" << this_id << ',' << (this_id == active_settings->main_thread) << "] p LEAVE " << func_name);
// active_settings->thread_stacks[this_id].pop_back();
// msg_handler_t::run_hooks();
unregister_task_in_progress(func, {*args}...);
TaskPool::release_slot();
// TaskPool::remove_task(m_thread->get_id());
m_started_cv.notify_all();
return ret;
})
/*, m_future(std::async(func, *args...))*/
, mutex()
{}
, m_started(false)
, m_started_cv()
, m_storage_waiting(false)
, m_storage_waiting_cv()
{
// MSG_DEBUG("In constructor (w/ proxy), thread " << (&m_thread) << " should not be joinable: " << std::boolalpha << m_thread.joinable());
std::unique_lock<std::mutex> lock(mutex);
// if (!m_started) {
// m_started.store(true);
TaskPool::enqueue(this);
// }
}
bool has_run() const override { return m_started; }
std::thread::id run() override
{
std::unique_lock<std::mutex> lock(mutex);
if (m_started) {
MSG_ERROR("Task has already run.", "");
return {};
} else {
m_future = std::async(std::launch::async, m_compute);
// m_thread.reset(new std::thread(std::move(m_compute)));
// m_thread = new std::thread(std::move(m_compute));
// m_thread.detach();
// return m_thread->get_id();
m_started = true;
m_started_cv.notify_all();
return {};
}
}
value_type& __get_noconst()
{
......@@ -140,11 +238,33 @@ template <typename Ret, typename... Args>
/*if (m_future.valid()) {*/
/*return m_storage = m_future.get();*/
/*}*/
mutex.lock();
if (m_future.valid()) {
m_storage = m_future.get();
// mutex.lock();
// if (!m_storage_init && m_future.valid()) {
// mutex.unlock();
if (m_storage_init) {
return m_storage;
}
mutex.unlock();
TaskPool::wait([this]() {
// mutex.lock();
std::unique_lock<std::mutex> lock(mutex);
if (!m_started) {
m_started_cv.wait(lock, [this]() -> bool { return m_started; });
}
if (!m_storage_init && m_future.valid()) {
bool waiting_for_storage = m_storage_waiting.exchange(true);
if (waiting_for_storage) {
m_storage_waiting_cv.wait(lock, [this] () -> bool { return m_storage_init; });
} else {
m_storage = m_future.get();
m_storage_init.store(true);
m_storage_waiting_cv.notify_all();
}
}
// mutex.unlock();
});
// mutex.lock();
// }
// mutex.unlock();
return m_storage;
}
......@@ -155,9 +275,16 @@ template <typename Ret, typename... Args>
protected:
std::tuple<value<typename clean_type<Args>::type>...> dependencies;
std::atomic_bool m_storage_init;
// std::unique_ptr<std::thread> m_thread;
value_type m_storage;
std::future<Ret> m_future;
std::recursive_mutex mutex;
std::function<Ret()> m_compute;
std::mutex mutex;
std::atomic_bool m_started;
std::condition_variable m_started_cv;
std::atomic_bool m_storage_waiting;
std::condition_variable m_storage_waiting_cv;
};
template <typename Ret, typename... Args>
......@@ -472,7 +599,7 @@ make_value(Ret (&f) (Args...), const clean_value_type<Args>&... x)
{
typedef mem_cache_traits<_Policy & Mem, Ret, Args...> mem_policy;
typedef disk_cache_traits<_Policy & Disk, Ret, Args...> disk_policy;
return mem_policy::template create<typename disk_policy::type>(_Policy & Sync, f, x...);
return mem_policy::template create<typename disk_policy::type>(_Policy, f, x...);
}
......@@ -605,22 +732,22 @@ make_collection(Ret (&f) (Args...), CollArgs... args)
template <typename T1, typename T2>
std::ostream& operator << (std::ostream& os, const std::pair<T1, T2>& p) { return os << p.first << ':' << p.second; }
template <typename T1, typename T2>
std::ostream& operator << (std::ostream& os, const std::map<T1, T2>& m)
{
auto i = m.begin(), j = m.end();
os << '{';
if (i != j) {
os << (*i);
for (++i; i != j; ++i) {
os << ' ' << (*i);
}
}
return os << '}';
}
//template <typename T1, typename T2>
//std::ostream& operator << (std::ostream& os, const std::pair<T1, T2>& p) { return os << p.first << ':' << p.second; }
//
//template <typename T1, typename T2>
//std::ostream& operator << (std::ostream& os, const std::map<T1, T2>& m)
//{
// auto i = m.begin(), j = m.end();
// os << '{';
// if (i != j) {
// os << (*i);
// for (++i; i != j; ++i) {
// os << ' ' << (*i);
// }
// }
// return os << '}';
//}
......
......@@ -29,10 +29,10 @@ struct chrono {
if (!chrono::display()) {
return;
}
CREATE_MESSAGE(msg_channel::Err, MESSAGE(msg_handler_t::n() << "Timers:" << std::endl));
msg_handler_t::cerr() << msg_handler_t::n() << "Timers:" << std::endl;
for (auto& kv: *this) {
if (kv.second.accum != 0.) {
CREATE_MESSAGE(msg_channel::Err, MESSAGE(kv.first << ": " << kv.second.accum << " seconds." << std::endl));
msg_handler_t::cerr() << kv.first << ": " << kv.second.accum << " seconds." << std::endl;
}
}
}
......@@ -44,9 +44,9 @@ struct chrono {
if (!chrono::display()) {
return;
}
CREATE_MESSAGE(msg_channel::Err, MESSAGE(msg_handler_t::n() << "Counters:" << std::endl));
msg_handler_t::cerr() << msg_handler_t::n() << "Counters:" << std::endl;
for (auto& kv: *this) {
CREATE_MESSAGE(msg_channel::Err, MESSAGE(kv.first << ": " << kv.second << std::endl));
msg_handler_t::cerr() << kv.first << ": " << kv.second << std::endl;
}
}
};
......
......@@ -34,7 +34,7 @@ extern "C" {
#include <ftw.h>
}
#define OUT(__x__) CREATE_MESSAGE(msg_channel::Out, MESSAGE(__x__))
#define OUT(__x__) do { msg_handler_t::cout() << __x__ << std::endl; } while (0)
/*#include "computations.h"*/
......
......@@ -26,6 +26,9 @@
#include <regex>
#include <boost/math/distributions/normal.hpp> // for normal_distribution
#define SIGNAL_DISPLAY_ONELINER
using boost::math::normal; // typedef provides default type is double.
using boost::math::cdf;
using boost::math::mean;
......@@ -538,15 +541,22 @@ struct test_result {
friend
std::ostream& operator << (std::ostream& os, const test_result& tr)
{
os << "<result chrom=" << (tr.chrom ? tr.chrom->name : "nil")
<< " locus=" << tr.locus
<< " test=" << tr.test_value
<< " at=" << tr.index
<< " over?=" << tr.over_threshold
<< " block_key=" << tr.pop_block_key
<< " dominance_block_key=" << tr.dom_block_key
<< '>';
return os;
os << "<result chrom=" << (tr.chrom ? tr.chrom->name : "nil");
os << " locus=" << tr.locus;
os << " test=" << tr.test_value;
os << " at=" << tr.index;
os << " over?=" << tr.over_threshold;
if (tr.pop_block_key) {
os << " block_key=" << tr.pop_block_key;
} else {
os << " block_key=none";
}
if (tr.dom_block_key) {
os << " dominance_block_key=" << tr.dom_block_key;
} else {
os << " dominance_block_key=none";
}
return os << '>';
}
bool
......@@ -1201,7 +1211,7 @@ struct search_lg_type {
#ifdef SIGNAL_DISPLAY_ONELINER
signal_display sd(last_computation.transpose(), i_max, max > threshold);
MSG_DEBUG("[COMPUTATION] " << effective_positions.front() << sd << effective_positions.back() << " max=" << max << " at " << effective_positions[i_max]);
MSG_DEBUG("[COMPUTATION] " << active_loci.front() << sd << active_loci.back() << " max=" << max << " at " << active_loci[i_max]);
#else
signal_display sd(*chrom, active_loci, last_computation.transpose(), i_max, threshold);
MSG_DEBUG("[COMPUTATION] " << active_loci.front() << " ... " << active_loci.back() << " max=" << max << " at " << active_loci[i_max] << std::endl << sd);
......@@ -1226,6 +1236,9 @@ struct search_lg_type {
MSG_DEBUG((*last_computation));
MSG_QUEUE_FLUSH();
*/
MSG_DEBUG("LOCUS BLOCK");
MSG_DEBUG("" << locus_blocks[i_max]);
MSG_QUEUE_FLUSH();
return {
this, chrom,
......@@ -1262,24 +1275,45 @@ struct search_lg_type {
void
deselect(double position, const collection<population_value>& all_pops, value<model> M)
{
MSG_DEBUG("deselect position " << position << " model " << M->keys());
size_t i_deselect = std::find(all_positions.begin(), all_positions.end(), position) - all_positions.begin();
std::vector<size_t> indices;
for (size_t i = 0; i < all_positions.size(); ++i) {
if (i == i_deselect || (!!local_selections[i] && local_selections[i]->has(position))) {
indices.push_back(i);
}
}
for (size_t i: indices) {
auto vmbk = model_block_key_struc::pop(chrom, local_selections[i] + all_positions[i]);
value<model_block_type> vblock = M->m_blocks[vmbk];
M->remove_block(vmbk);
/* Need to add the reduced block now */
if (local_selections[i]->depth() > 1) {
reduce(all_pops, position, vblock);
M->add_block(model_block_key_struc::pop(chrom, local_selections[i] - position), vblock);
auto i = M->m_blocks.begin(), j = M->m_blocks.end();
for (; i != j;) {
auto& key = i->first;
auto& value = i->second;
if (key->has_locus(chrom, position)) {
if (!key->remove_position(chrom, position)) {
auto tmp = i;
++i;
M->m_blocks.erase(tmp);
} else {
reduce(all_pops, position, value);
++i;
}
} else {
++i;
}
}
//
// MSG_DEBUG("deselect position " << position << " model " << M->keys());
// size_t i_deselect = std::find(all_positions.begin(), all_positions.end(), position) - all_positions.begin();
// std::vector<size_t> indices;
// for (size_t i = 0; i < all_positions.size(); ++i) {
// if (i == i_deselect || (!!local_selections[i] && local_selections[i]->has(position))) {
// indices.push_back(i);
// }
// }
// for (size_t i: indices) {
// auto vmbk = model_block_key_struc::pop(chrom, local_selections[i] + all_positions[i]);
// value<model_block_type> vblock = M->m_blocks[vmbk];
// if (vblock) {
// M->remove_block(vmbk);
// /* Need to add the reduced block now */
//// if (local_selections[i]->depth() > 1) {
// if (local_modes[i] == Joint) {
// reduce(all_pops, position, vblock);
// M->add_block(model_block_key_struc::pop(chrom, local_selections[i] - position), vblock);
// }
// }
// }
selection = selection - position;
recompute_modes();
/*deselect(position);*/
......
......@@ -473,10 +473,13 @@ struct disk_hashtable<ValueType(AllArgs...)> {
std::vector<std::string> entries = get_entries(path);
std::pair<bool, value_type> ret;
ret.first = find([&] (file& ifs) { cache_input ci(ifs); ci & ret.second; },
// args...);
std::forward<AllArgs>(args)...);
if (!ret.first) {
ret.second = func(std::forward<AllArgs>(args)...);
put(path, entries, ret.second, std::forward<AllArgs>(args)...);
// ret.second = func(args...);
// put(path, entries, ret.second, args...);
}
return ret.second;
}
......
This diff is collapsed.
......@@ -24,6 +24,7 @@
#include "symmetry.h"
#include <limits>
#include <cmath>
#include "stl_output.h"
#define FLOAT_TOL 1.e-6
......
......@@ -23,6 +23,7 @@
#include <vector>
#include <map>
#include <iomanip>
#include "stl_output.h"
#define MATRIX_SIZE(_x_) "dim(" #_x_ ") = (" << (_x_).innerSize() << ',' << (_x_).outerSize() << ")"
......@@ -45,13 +46,13 @@ inline std::ostream& operator << (std::ostream& os, const std::pair<double, doub
}
template <typename VALUE_TYPE>
std::ostream& operator << (std::ostream& os, const std::vector<VALUE_TYPE>& v)
{
std::string sep = "";
for (auto x: v) { os << sep << x; sep = " "; }
return os;
}
//template <typename VALUE_TYPE>
//std::ostream& operator << (std::ostream& os, const std::vector<VALUE_TYPE>& v)
//{
// std::string sep = "";
// for (auto x: v) { os << sep << x; sep = " "; }
// return os;
//}
template <typename MATRIX, typename ROW_LABEL, typename COL_LABEL=ROW_LABEL>
struct labelled_matrix {
......
......@@ -348,6 +348,27 @@ struct model_block_key_struc {
return false;
}
bool remove_position(const chromosome* chr, double locus)
{
switch (type) {
case mbk_Covar:
case mbk_CI:
return true;
case mbk_POP:
if (has_locus(chr, locus)) {
loci = loci - locus;
}
return !loci->is_empty();
case mbk_Dominance:
return left->remove_position(chr, locus);
case mbk_Interaction:
bool a = left->remove_position(chr, locus);
bool b = right->remove_position(chr, locus);
return a && b;
};
return false;
}
friend
inline
bool
......@@ -1065,6 +1086,10 @@ struct model {
return;
/*throw key_already_exists(*this, key);*/
}
if (!x) {
MSG_ERROR("Attempting to insert a null block in the model.", "");
abort();
}
m_computed = false;
m_blocks.insert({key, x});
/*MSG_DEBUG("selection now: " << keys());*/
......@@ -1092,6 +1117,12 @@ struct model {
add_blocks(const model_block_collection& mbc)
{
m_computed = false;
for (auto kv: mbc) {
if (!kv.second) {
MSG_ERROR("Attempting to insert a null block in the model.", "");
abort();
}
}