Commit 6902ee22 authored by Damien Leroux's avatar Damien Leroux
Browse files

Lots of changes. WIP. Check the diff.

parent cb29469b
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
/* heavily modified */
#include <vector>
#include <queue>
#include <memory>
......@@ -12,6 +16,7 @@
#include <stdexcept>
#include "error.h"
class ThreadPool {
public:
ThreadPool(size_t);
......@@ -19,6 +24,13 @@ public:
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
void set_title(const std::string& s)
{
std::unique_lock<msg_handler_t::lock_type> lock(msg_handler_t::mutex);
title = s;
}
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
......@@ -34,6 +46,8 @@ private:
std::atomic_ulong queued;
std::atomic_ulong done;
std::string title;
void display_progress();
};
......@@ -41,40 +55,42 @@ inline void ThreadPool::display_progress()
{
if (!msg_handler_t::color()) {
return;
}
static constexpr const char* const SAVE_CURSOR = "\033[s";
static constexpr const char* const RESTORE_CURSOR = "\033[u";
static constexpr const char* const ERASE_TO_EOL = "\033[K";
static constexpr const char* const GOTO_0_0 = "\033[0;0H";
std::lock_guard<msg_handler_t::lock_type> lock(msg_handler_t::mutex);
std::cout << SAVE_CURSOR;
std::cout << GOTO_0_0;
std::cout << msg_handler_t::i();
std::cout << "[SPELL-QTL] " << workers.size() << " threads, " << total_queued << " tasks queued in previous batches" << ERASE_TO_EOL << std::endl;
if (queued) {
std::cout << "[SPELL-QTL] Task info: ";
std::cout << msg_handler_t::n();
std::cout << queued << " queued, " << done << " done, ";
std::cout << msg_handler_t::w();
std::cout << (100 * done / queued) << "% progress";
}
std::cout << ERASE_TO_EOL;
std::cout << msg_handler_t::n();
std::cout << std::endl << "----------------------------------------------------------";
std::cout << ERASE_TO_EOL << std::endl << ERASE_TO_EOL;
std::cout << RESTORE_CURSOR;
std::cout << std::flush;
if (done == queued) {
total_queued += queued;
queued = 0;
done = 0;
}
} else {
static constexpr const char* const SAVE_CURSOR = "\033[s";
static constexpr const char* const RESTORE_CURSOR = "\033[u";
static constexpr const char* const ERASE_TO_EOL = "\033[K";
static constexpr const char* const GOTO_0_0 = "\033[0;0H";
std::unique_lock<msg_handler_t::lock_type> lock(msg_handler_t::mutex);
std::cout << SAVE_CURSOR;
std::cout << GOTO_0_0;
std::cout << msg_handler_t::i();
std::cout << "[SPELL-QTL] Current: " << msg_handler_t::w() << title << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
std::cout << "[SPELL-QTL] " << msg_handler_t::n() << workers.size() << " threads, " << total_queued << " tasks queued in previous batches" << msg_handler_t::i() << ERASE_TO_EOL << std::endl;
if (queued) {
std::cout << "[SPELL-QTL] Task info: ";
std::cout << msg_handler_t::n();
std::cout << queued << " queued, " << done << " done, ";
std::cout << msg_handler_t::w();
std::cout << (100 * done / queued) << "% progress";
}
std::cout << ERASE_TO_EOL;
std::cout << msg_handler_t::n();
std::cout << std::endl << "----------------------------------------------------------";
std::cout << ERASE_TO_EOL << std::endl << ERASE_TO_EOL;
std::cout << RESTORE_CURSOR;
std::cout << std::flush;
if (done == queued) {
total_queued += queued;
queued = 0;
done = 0;
}
}
}
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false), total_queued(0), queued(0), done(0)
: stop(false), total_queued(0), queued(0), done(0), title()
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
......@@ -82,20 +98,27 @@ inline ThreadPool::ThreadPool(size_t threads)
{
while(true)
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
while(!this->stop && this->tasks.empty())
this->condition.wait(lock);
if(this->stop && this->tasks.empty())
return;
std::function<void()> task(this->tasks.front());
this->tasks.pop();
lock.unlock();
task();
++done;
display_progress();
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
while(!this->stop && this->tasks.empty()) {
this->condition.wait(lock);
}
if(this->stop && this->tasks.empty()) {
return;
}
std::function<void()> task(this->tasks.front());
this->tasks.pop();
lock.unlock();
task();
++done;
}
if (msg_handler_t::color()) {
display_progress();
}
}
}
);
msg_handler_t::hook([this] () { display_progress(); });
}
// add new work item to the pool
......
......@@ -8,6 +8,11 @@
#include <string>
#include <sstream>
enum CachingPolicy : int { Oneshot = 0, Mem = 1, Disk = 2, Sync = 4 };
constexpr CachingPolicy operator | (CachingPolicy c1, CachingPolicy c2) { return CachingPolicy(int(c1) | int(c2)); }
constexpr CachingPolicy operator & (CachingPolicy c1, CachingPolicy c2) { return CachingPolicy(int(c1) & int(c2)); }
#include "error.h"
#include "input.h"
#include "cache/md5.h"
......@@ -307,6 +312,8 @@ template <typename ValueType>
immediate_value(const ValueType& v) : m_storage(v) {}
immediate_value(ValueType&& v) : m_storage(std::forward<ValueType>(v)) {}
template <typename... Args>
immediate_value(Args... x) : m_storage(x...) {}
virtual
value_type& operator * ()
......@@ -388,25 +395,28 @@ template <typename Ret, typename... Args>
typedef Ret (*computation_function_pointer_type) (Args...);
typedef std::packaged_task<Ret(Args...)> task_type;
async_computation(computation_function_pointer_type func,
async_computation(CachingPolicy _Sync, computation_function_pointer_type func,
const value<typename clean_type<Args>::type>&... args)
: dependencies(args...)
, m_storage()
, m_future(active_settings->enqueue(func, *args...))
, m_future(active_settings->enqueue(_Sync, func, *args...))
/*, m_future(std::async(std::launch::async, func, *args...))*/
/*, m_future(std::async(std::launch::deferred, func, *args...))*/
, mutex()
{}
async_computation(std::function<Ret(Args...)>& func,
async_computation(CachingPolicy _Sync, std::function<Ret(Args...)>& func,
const value<typename clean_type<Args>::type>&... args)
: dependencies(args...)
, m_storage()
, m_future(active_settings->enqueue(func, *args...))
, m_future(active_settings->enqueue(_Sync, func, *args...))
/*, m_future(std::async(func, *args...))*/
, mutex()
{}
value_type& __get_noconst()
{
std::unique_lock<std::mutex> read_guard(mutex);
if (m_future.valid()) {
return m_storage = m_future.get();
}
......@@ -415,6 +425,7 @@ template <typename Ret, typename... Args>
const value_type& __get_const() const
{
std::unique_lock<std::mutex> read_guard(*const_cast<std::mutex*>(&mutex));
return const_cast<this_type*>(this)->__get_noconst();
}
......@@ -422,6 +433,7 @@ template <typename Ret, typename... Args>
std::tuple<value<typename clean_type<Args>::type>...> dependencies;
value_type m_storage;
std::future<Ret> m_future;
std::mutex mutex;
};
......@@ -429,7 +441,7 @@ template <typename Ret, typename... Args>
struct cached_computation<Ret(Args...)> {
typedef Ret value_type;
cached_computation(const std::string& name, Ret (*f) (Args...), const value<Args>&... args)
cached_computation(const std::string& name, Ret (*f) (Args...), const value<typename clean_type<Args>::type>&... args)
: m_name(name)
, m_func(f)
, m_md5_hash{.accum = compute_md5(args...), .append = append_md5(args...)}
......@@ -447,7 +459,7 @@ template <typename Ret, typename... Args>
Ret data;
std::string path = get_path();
/* if cache file found */
if (check_file(path, false, true)) {
if (check_file(path, false, true, false)) {
std::ifstream ifs(path);
cache_input ci(ifs);
std::string check;
......@@ -457,17 +469,17 @@ template <typename Ret, typename... Args>
if (check == m_md5_hash.append) {
/* if same */
/* CACHE FOUND */
MSG_INFO("Found data in cache");
/*MSG_INFO("Found data in cache");*/
ci & data;
/* return value read from file */
return data;
}
/* CACHE INVALID */
MSG_INFO("Cache is invalid. Computing data.");
/*MSG_INFO("Cache is invalid. Computing data.");*/
} else {
/* else */
/* COMPUTE AND SAVE */
MSG_INFO("Computing data.");
/*MSG_INFO("Computing data.");*/
}
/* compute value = m_func(x...) */
data = m_func(x...);
......@@ -493,9 +505,9 @@ template <typename Ret, typename... Args>
typedef Ret value_type;
typedef std::function<Ret(Args...)> computation_type;
computed_value(Ret (*func) (Args...), const value<typename clean_type<Args>::type>&... args)
computed_value(CachingPolicy _Sync, Ret (*func) (Args...), const value<typename clean_type<Args>::type>&... args)
: m_hash(compute_hash(args...))
, m_task(func, args...)
, m_task(_Sync, func, args...)
{}
virtual
......@@ -528,11 +540,11 @@ template <typename Ret, typename... Args>
typedef Ret value_type;
typedef std::function<Ret(Args...)> computation_type;
cached_computed_value(Ret (*func) (Args...), const value<Args>&... args)
cached_computed_value(CachingPolicy _Sync, Ret (*func) (Args...), const value<typename clean_type<Args>::type>&... args)
: m_hash(compute_hash(args...))
, m_comp(get_func_name(func), func, args...)
, m_comp_proxy([this](Args... x) { return m_comp(x...); })
, m_task(m_comp_proxy, args...)
, m_task(_Sync, m_comp_proxy, args...)
{}
virtual
......@@ -668,14 +680,26 @@ __get_registry()
return _reg_;
}
#if 0
static inline
std::mutex& __get_lock(void* fptr)
{
struct m_wrap { std::mutex mutex; m_wrap() : mutex() {} m_wrap(const m_wrap&) : mutex() {} };
static std::unordered_map<void*, m_wrap> _;
return _[fptr].mutex;
/*auto it = _.find(fptr);*/
/*if (it == _.end()) {*/
/*bool discard;*/
/*std::tie(it, discard) = _.insert({fptr, {}});*/
/*}*/
/*return it->second;*/
}
#endif
template <typename VT>
using clean_value_type = value<typename clean_type<VT>::type>;
enum CachingPolicy : int { oneshot = 0, mem = 1, disk = 2, mem_and_disk = 3 };
template <typename Ret, typename... Args>
struct with_disk_cache_traits {
typedef cached_computed_value<Ret(Args...)> type;
......@@ -690,10 +714,10 @@ template <int _Policy, typename Ret, typename... Args>
struct disk_cache_traits;
template <typename Ret, typename... Args>
struct disk_cache_traits<oneshot, Ret, Args...> : without_disk_cache_traits<Ret, Args...> {};
struct disk_cache_traits<Oneshot, Ret, Args...> : without_disk_cache_traits<Ret, Args...> {};
template <typename Ret, typename... Args>
struct disk_cache_traits<disk, Ret, Args...> : with_disk_cache_traits<Ret, Args...> {};
struct disk_cache_traits<Disk, Ret, Args...> : with_disk_cache_traits<Ret, Args...> {};
template <typename Ret, typename... Args>
struct with_mem_cache_traits {
......@@ -702,12 +726,14 @@ struct with_mem_cache_traits {
template <typename _Maker>
static
return_type
create(Ret (&f) (Args...), const clean_value_type<Args>&... x)
create(CachingPolicy _Sync, Ret (&f) (Args...), const clean_value_type<Args>&... x)
{
static std::mutex _;
std::unique_lock<std::mutex> lock_guard(_);
/*MSG_DEBUG("new value with mem cache");*/
return_type ret = __get_registry<Ret, Args...>().get(&f, x...);
if (!ret.valid()) {
ret = new _Maker(f, x...);
ret = new _Maker(_Sync, f, x...);
}
return ret;
}
......@@ -720,10 +746,10 @@ struct without_mem_cache_traits {
template <typename _Maker>
static
return_type
create(Ret (&f) (Args...), const clean_value_type<Args>&... x)
create(CachingPolicy _Sync, Ret (&f) (Args...), const clean_value_type<Args>&... x)
{
/*MSG_DEBUG("new value without mem cache");*/
return_type ret = new _Maker(f, x...);
return_type ret = new _Maker(_Sync, f, x...);
return ret;
}
};
......@@ -732,24 +758,35 @@ template <int _Policy, typename Ret, typename... Args>
struct mem_cache_traits;
template <typename Ret, typename... Args>
struct mem_cache_traits<oneshot, Ret, Args...> : without_mem_cache_traits<Ret, Args...> {};
struct mem_cache_traits<Oneshot, Ret, Args...> : without_mem_cache_traits<Ret, Args...> {};
template <typename Ret, typename... Args>
struct mem_cache_traits<mem, Ret, Args...> : with_mem_cache_traits<Ret, Args...> {};
struct mem_cache_traits<Mem, Ret, Args...> : with_mem_cache_traits<Ret, Args...> {};
template <CachingPolicy _Policy = oneshot, typename Ret, typename... Args>
typename mem_cache_traits<_Policy & mem, Ret, Args...>::return_type
template <CachingPolicy _Policy = Oneshot, typename Ret, typename... Args>
typename mem_cache_traits<_Policy & Mem, Ret, Args...>::return_type
make_value(Ret (&f) (Args...), const clean_value_type<Args>&... x)
{
typedef mem_cache_traits<_Policy & mem, Ret, Args...> mem_policy;
typedef disk_cache_traits<_Policy & disk, Ret, Args...> disk_policy;
return mem_policy::template create<typename disk_policy::type>(f, x...);
typedef mem_cache_traits<_Policy & Mem, Ret, Args...> mem_policy;
typedef disk_cache_traits<_Policy & Disk, Ret, Args...> disk_policy;
return mem_policy::template create<typename disk_policy::type>(_Policy & Sync, f, x...);
}
template <typename T> struct collection : std::vector<value<T>> {
using std::vector<value<T>>::vector;
using std::vector<value<T>>::operator [];
template <typename INTEGRAL_TYPE>
value<T>&
operator [] (const value<INTEGRAL_TYPE>& i)
{ return (*this)[*i]; }
template <typename INTEGRAL_TYPE>
const value<T>&
operator [] (const value<INTEGRAL_TYPE>& i) const
{ return (*this)[*i]; }
};
template <typename... X> struct tuple;
template <CachingPolicy C, typename F, typename R, typename P> struct make_coll_impl;
template <typename A, typename B> struct types_must_be_identical : std::integral_constant<bool, false> {};
......@@ -761,27 +798,27 @@ template <typename A> struct types_must_be_identical<A&, A> : std::integral_cons
template <typename A> struct types_must_be_identical<const A, A> : std::integral_constant<bool, true> {};
template <typename LA, typename LB> struct type_lists_must_be_identical;
template <> struct type_lists_must_be_identical<std::tuple<>, std::tuple<>> {};
template <> struct type_lists_must_be_identical<tuple<>, tuple<>> {};
template <typename A0, typename... A, typename B0, typename... B>
struct type_lists_must_be_identical<std::tuple<A0, A...>, std::tuple<B0, B...>>
: type_lists_must_be_identical<std::tuple<A...>, std::tuple<B...>> {
struct type_lists_must_be_identical<tuple<A0, A...>, tuple<B0, B...>>
: type_lists_must_be_identical<tuple<A...>, tuple<B...>> {
static_assert(types_must_be_identical<A0, B0>::value, "INVALID PARAMETER TYPE.");
};
template <CachingPolicy _Policy, typename Ret, typename... FuncArgs, typename... ErrArgs>
struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<>, std::tuple<ErrArgs...>> {
struct make_coll_impl<_Policy, Ret(FuncArgs...), tuple<>, tuple<ErrArgs...>> {
void operator () (collection<Ret>& coll,
Ret (&f) (FuncArgs...),
ErrArgs... errargs)
{
type_lists_must_be_identical<std::tuple<FuncArgs...>, std::tuple<typename ErrArgs::value_type...>>();
type_lists_must_be_identical<tuple<FuncArgs...>, tuple<typename ErrArgs::value_type...>>();
coll.push_back(make_value<_Policy>(f, errargs...));
}
};
template <CachingPolicy _Policy, typename Ret, typename... FuncArgs>
struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<>, std::tuple<clean_value_type<FuncArgs>...>> {
struct make_coll_impl<_Policy, Ret(FuncArgs...), tuple<>, tuple<clean_value_type<FuncArgs>...>> {
void operator () (collection<Ret>& coll,
Ret (&f) (FuncArgs...),
const value<typename clean_type<FuncArgs>::type>&... pargs)
......@@ -792,7 +829,7 @@ struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<>, std::tuple<clean_
template <CachingPolicy _Policy, typename Ret, typename... FuncArgs, typename VT, typename... ArgsRemaining, typename... PreviousArgs>
struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<value<VT>, ArgsRemaining...>, std::tuple<PreviousArgs...>> {
struct make_coll_impl<_Policy, Ret(FuncArgs...), tuple<value<VT>, ArgsRemaining...>, tuple<PreviousArgs...>> {
void operator () (collection<Ret>& coll,
Ret (&f) (FuncArgs...),
const value<VT>& car,
......@@ -802,13 +839,13 @@ struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<value<VT>, ArgsRemai
make_coll_impl<
_Policy,
Ret(FuncArgs...),
std::tuple<ArgsRemaining...>,
std::tuple<PreviousArgs..., value<VT>>>() (coll, f, cdr..., pargs..., car);
tuple<ArgsRemaining...>,
tuple<PreviousArgs..., value<VT>>>() (coll, f, cdr..., pargs..., car);
}
};
template <CachingPolicy _Policy, typename Ret, typename... FuncArgs, typename T, typename... ArgsRemaining, typename... PreviousArgs>
struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<range<T>, ArgsRemaining...>, std::tuple<PreviousArgs...>> {
struct make_coll_impl<_Policy, Ret(FuncArgs...), tuple<range<T>, ArgsRemaining...>, tuple<PreviousArgs...>> {
void operator () (collection<Ret>& coll,
Ret (&f) (FuncArgs...),
const range<T>& car,
......@@ -820,14 +857,14 @@ struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<range<T>, ArgsRemain
make_coll_impl<
_Policy,
Ret(FuncArgs...),
std::tuple<ArgsRemaining...>,
std::tuple<PreviousArgs..., vtype>>() (coll, f, cdr..., pargs..., v);
tuple<ArgsRemaining...>,
tuple<PreviousArgs..., vtype>>() (coll, f, cdr..., pargs..., v);
}
}
};
template <CachingPolicy _Policy, typename Ret, typename... FuncArgs, typename T, typename... ArgsRemaining, typename... PreviousArgs>
struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<collection<T>, ArgsRemaining...>, std::tuple<PreviousArgs...>> {
struct make_coll_impl<_Policy, Ret(FuncArgs...), tuple<collection<T>, ArgsRemaining...>, tuple<PreviousArgs...>> {
void operator () (collection<Ret>& coll,
Ret (&f) (FuncArgs...),
const collection<T>& car,
......@@ -839,23 +876,63 @@ struct make_coll_impl<_Policy, Ret(FuncArgs...), std::tuple<collection<T>, ArgsR
make_coll_impl<
_Policy,
Ret(FuncArgs...),
std::tuple<ArgsRemaining...>,
std::tuple<PreviousArgs..., vtype>>() (coll, f, cdr..., pargs..., v);
tuple<ArgsRemaining...>,
tuple<PreviousArgs..., vtype>>() (coll, f, cdr..., pargs..., v);
}
}
};
template <CachingPolicy _Policy = oneshot, typename Ret, typename... Args, typename... CollArgs>
template <CachingPolicy _Policy = Oneshot, typename Ret, typename... Args, typename... CollArgs>
collection<Ret>
make_collection(Ret (&f) (Args...), CollArgs... args)
{
collection<Ret> ret;
make_coll_impl<_Policy, Ret(Args...), std::tuple<CollArgs...>, std::tuple<>>() (ret, f, args...);
make_coll_impl<_Policy, Ret(Args...), tuple<CollArgs...>, tuple<>>() (ret, f, args...);
return ret;
}
template <typename T1, typename T2>
std::ostream& operator << (std::ostream& os, const std::pair<T1, T2>& p) { return os << p.first << ':' << p.second; }
template <typename T1, typename T2>
std::ostream& operator << (std::ostream& os, const std::map<T1, T2>& m)
{
auto i = m.begin(), j = m.end();
os << '{';
if (i != j) {
os << (*i);
for (++i; i != j; ++i) {
os << ' ' << (*i);
}
}
return os << '}';
}
template <typename T>
std::ostream&
operator << (std::ostream& os, const collection<T>& coll)
{
for (const auto& k: coll) {
os << '\t' << k << std::endl;
}
#if 0
auto i = coll.begin(), j = coll.end();
if (i == j) {
return os;
}
os << (**i);
for (++i; i != j; ++i) {
os << ' ' << (**i);
}
#endif
return os;
}
#endif
......@@ -48,7 +48,8 @@ all_observed_generation_names(population_value);
std::vector<char>
marker_observations(population_value, chromosome_value, generation_value, int);
collection<double>
/*collection<double>*/
value<std::vector<double>>
test_loci(chromosome_value);
generation_rs::segment_computer_t
......@@ -67,9 +68,34 @@ pedigree(population_value, generation_value, int);
range<int>
individual_range(population_value);
observation_vectors_type
get_observation_vectors(generation_value gen);
struct pop_mgo_data {
generation_value qtl_gen;
collection<std::string> aogn;
collection<generation_value> aog;
collection<observation_vectors_type> aov;
pop_mgo_data(population_value pop)
: qtl_gen(generation_by_name(pop->qtl_generation_name))
, aogn(all_observed_generation_names(pop))
, aog(make_collection<Sync>(generation_by_name, aogn))
, aov(make_collection<Sync>(get_observation_vectors, aog))
{
/*MSG_DEBUG("new pop_mgo_data");*/
}
};
static inline md5_digest& operator << (md5_digest& md5, const pop_mgo_data&)
{
return md5;
}
multi_generation_observations
population_marker_obs(population_value, chromosome_value, const pedigree_type&);
population_marker_obs(population_value, chromosome_value, const pedigree_type&, const pop_mgo_data*);
collection<std::string>
all_traits();
#endif
......@@ -27,6 +27,16 @@ struct bloc_builder {
int n_rows;
int n_columns;
bloc_builder&
operator = (bloc_builder&& bb)
{
elements.swap(bb.elements);
labels.swap(bb.labels);
n_rows = bb.n_rows;
n_columns = bb.n_columns;
return *this;
}