Commit e501e21b authored by Damien Leroux's avatar Damien Leroux
Browse files

No crash, no leak.

parent 00828730
#ifndef _SPEL_MARKER_OBS_BAYES_H_
#define _SPEL_MARKER_OBS_BAYES_H_
#include "error.h"
#include "generation_rs_fwd.h"
#include "bayes/graph.h"
/*#include "bayes/graph.h"*/
static inline
size_t idx_to_ofs(const std::vector<size_t>& dim, const std::vector<size_t>& idx)
......
#ifndef _SPELL_BAYES_CLI_H_
#define _SPELL_BAYES_CLI_H_
#include "error.h"
#include <map>
#include <vector>
#include "generation_rs.h"
#include "commandline.h"
enum JobDispatchScheme { JDS_None, JDS_MT, JDS_SGE, JDS_SSH };
void read_format(std::map<std::string, marker_observation_spec>& settings, const std::string& filename, std::istream& is);
#define SPELL_CASTER ((size_t) -1)
struct bn_settings_t {
/* Job control */
JobDispatchScheme scheme;
std::vector<std::string> command_line;
std::vector<std::string> ssh_hosts;
std::string qsub_opts;
std::string prg_name;
size_t n_threads;
size_t job_start;
size_t job_end;
std::string job_name;
std::string fifo_path;
/* Configuration */
std::vector<chromosome> map;
std::map<std::string, marker_observation_spec> marker_observation_specs;
std::vector<std::string> marker_observation_specs_filenames;
std::map<std::string, population_marker_observation> observed_mark;
double noise;
/* Inputs */
std::string pedigree_filename;
std::string design_filename;
bn_settings_t()
: scheme(JDS_None)
, command_line()
, ssh_hosts()
, qsub_opts()
, n_threads(0)
, job_start(SPELL_CASTER)
, job_end(SPELL_CASTER)
, job_name()
, fifo_path()
, map()
, marker_observation_specs()
, marker_observation_specs_filenames()
, observed_mark()
, noise(0.)
, pedigree_filename()
, design_filename()
{}
size_t count_markers() const
{
return observed_mark.begin()->second.observations.data.size();
}
void post_init();
bool is_master() const { return job_start == SPELL_CASTER; }
static bn_settings_t* from_args(int argc, const char** argv);
};
#endif
......@@ -1187,6 +1187,7 @@ void new_cross_prob2(size_t n_par, size_t n_al, SparseMatrix<double>& M1, Sparse
size_t idx3 = make_idx(mp2, pp1, ma2, pa1);
size_t idx4 = make_idx(mp2, pp2, ma2, pa2);
/* TODO: comment */
t1.emplace_back(idxm, idxp + dom * idx1, .25);
t1.emplace_back(idxm, idxp + dom * idx2, .25);
t1.emplace_back(idxm, idxp + dom * idx3, .25);
......
......@@ -1455,5 +1455,21 @@ MatrixXd& operator << (MatrixXd& e_mat, const printable<M>& nd_mat)
}
template <>
MatrixXd& operator << (MatrixXd& e_mat, const printable<nd_sparse_matrix>& nd_mat)
{
const nd_sparse_matrix& mat = nd_mat.to_type();
if ((unsigned)e_mat.size() != mat.size()) {
throw std::runtime_error("Incompatible matrix sizes in conversion");
}
e_mat = MatrixXd::Zero(e_mat.rows(), e_mat.cols());
double* data = e_mat.data();
for (const auto& iv: mat.m_data->m_data) {
data[iv.first] = iv.second;
}
return e_mat;
}
#endif
This diff is collapsed.
#include "bayes.h"
#include "pedigree.h"
#include "bayes/cli.h"
#define PREDICATE CONSTRAINT_PREDICATE(bn_settings_t)
#define CALLBACK_ARGS ARGUMENT_CALLBACK_ARGS(bn_settings_t)
bn_settings_t* ensure(bn_settings_t*& t)
{
if (t == NULL) {
t = new bn_settings_t();
}
return t;
}
static
argument_section_list_t<bn_settings_t>
arguments = {
{"Advanced (NOT INTENDED FOR USERS!)", "", true, {
{{"--full-help", "--advanced-help"},
{},
"Display usage.",
false,
{false},
[](CALLBACK_ARGS)
{
print_usage_impl(true, arguments);
exit(0);
SAFE_IGNORE_CALLBACK_ARGS;
}},
{{"-J", "--job"},
{"name", "begin", "end"},
"Run the BP on the slice [begin;end]",
false,
{},
[](CALLBACK_ARGS)
{
ensure(target)->job_name = *++ai;
ensure(target)->job_start = to<int>(*++ai);
ensure(target)->job_end = to<int>(*++ai);
SAFE_IGNORE_CALLBACK_ARGS;
}},
{{"-F", "--fifo"},
{"path"},
"Set the path to the FIFO for feedback (SGE)",
false,
{},
[](CALLBACK_ARGS)
{
ensure(target)->fifo_path = *++ai;
SAFE_IGNORE_CALLBACK_ARGS;
}},
}},
{"Miscellaneous", "", false, {
{{"-h", "--help"},
{},
"Display usage.",
false,
{false},
[](CALLBACK_ARGS)
{
print_usage_impl(false, arguments);
exit(0);
SAFE_IGNORE_CALLBACK_ARGS;
}},
{{"-n", "--noise"},
{"level"},
"Set the noise level for marker observations.",
false,
{&bn_settings_t::noise},
[](CALLBACK_ARGS)
{
ensure(target)->noise = to<double>(*ai++);
SAFE_IGNORE_CALLBACK_ARGS;
}},
}},
{"Job control", "Select and configure the job control scheme", false, {
{{"-mt", "--dispatch-multithread"},
{"n_threads"},
"Use single-machine, multi-threading.",
false,
{false},
[](CALLBACK_ARGS)
{
if (ensure(target)->scheme != JDS_None) {
MSG_ERROR("Only one job control scheme should be specified.", "Specify only one job control scheme in the commandline.");
} else {
ensure(target)->scheme = JDS_MT;
}
ensure(target)->n_threads = to<int>(*++ai);
SAFE_IGNORE_CALLBACK_ARGS;
}},
{{"-ssh", "--dipatch-SSH"},
{"comma-separated host list"},
"Use SSH for job dispatch.",
false,
{false},
[](CALLBACK_ARGS)
{
if (ensure(target)->scheme != JDS_None) {
MSG_ERROR("Only one job control scheme should be specified.", "Specify only one job control scheme in the commandline.");
} else {
ensure(target)->scheme = JDS_SSH;
std::string hosts(*++ai);
std::istringstream iss(hosts);
std::string host;
while (std::getline(iss, host, ',')) {
ensure(target)->ssh_hosts.emplace_back(host);
}
}
SAFE_IGNORE_CALLBACK_ARGS;
}},
{{"-sge", "--dispatch-SGE"},
{"n_jobs", "qsub options"},
"Use SGE for job dispatch. Use '-' if you don't wish to provide any specific option.",
false,
{false},
[](CALLBACK_ARGS)
{
if (ensure(target)->scheme != JDS_None) {
MSG_ERROR("Only one job control scheme should be specified.", "Specify only one job control scheme in the commandline.");
} else {
ensure(target)->scheme = JDS_SGE;
}
ensure(target)->n_threads = to<int>(*++ai);
std::string opts = *++ai;
if (opts != "-") {
ensure(target)->qsub_opts = opts;
}
SAFE_IGNORE_CALLBACK_ARGS;
}},
}},
{"Inputs", "Input files and configuration of observations. You may use either a pedigree file in CSV format or a breeding design specification if you don't have pedigree data.\nThere are two essential parameters to compute the genotype probabilities: the number of ancestors and the number of observed alleles (for SNP observations). The number of ancestors is automatically computed from the given pedigree or breeding design specification. The number of alleles is computed from the marker observation specifications.", false, {
{{"-mos", "--marker-observation-spec"},
{"path"},
"Path to a marker observation specification file.",
false,
{true},
[](CALLBACK_ARGS)
{
std::string filename = *++ai;
check_file(filename, false, false);
/*ensure(target)->marker_observation_specs_filename = *++ai;*/
std::ifstream ifs(filename);
/*ensure(target)->marker_observation_specs = read_format(filename, ifs);*/
read_format(ensure(target)->marker_observation_specs, filename, ifs);
ensure(target)->marker_observation_specs_filenames.push_back(filename);
SAFE_IGNORE_CALLBACK_ARGS;
}},
{{"-p", "--pedigree"},
{"path"},
"Path to the pedigree file if available. The breeding design will be infered from the pedigree data.",
false,
{},
[](CALLBACK_ARGS)
{
if (ensure(target)->design_filename != "") {
MSG_ERROR("You must specify either a pedigree or a design, but not both.", "Don't specify the breeding design if you have pedigree data available.");
}
std::string filename = *++ai;
check_file(filename, false, false);
ensure(target)->pedigree_filename = filename;
SAFE_IGNORE_CALLBACK_ARGS;
}},
{{"-d", "--breeding-design"},
{"path"},
"Path to the breeding design specification file. The pedigree data will be created assuming one individual per unobserved generation.\nIf multiple generations are observed, please provide pedigree data instead.",
false,
{},
[](CALLBACK_ARGS)
{
if (ensure(target)->design_filename != "") {
MSG_ERROR("You must specify either a pedigree or a design, but not both.", "Don't specify the breeding design if you have pedigree data available.");
}
std::string filename = *++ai;
check_file(filename, false, false);
ensure(target)->pedigree_filename = filename;
SAFE_IGNORE_CALLBACK_ARGS;
}},
{{"-m", "--marker-obs"},
{"gen:format", "path"},
"Path to the marker observations file of generation 'gen' with given format.\nIf a pedigree file is specified, the generation name 'gen' refers to individuals so labelled in the pedigree data. Otherwise, it refers to the corresponding generation in the breeding design specification file.",
false,
{true},
[](CALLBACK_ARGS)
{
std::string gen_and_format = *++ai;
std::string filename(*++ai);
size_t colon = gen_and_format.find_first_of(':');
if (colon == std::string::npos) {
MSG_ERROR("No format was specified for the observations of generation " << gen_and_format << '.', "Specify observations format name along with the generation");
return;
}
std::string gen_name(gen_and_format.cbegin(), gen_and_format.cbegin() + colon);
std::string format_name(gen_and_format.cbegin() + 1 + colon, gen_and_format.cend());
/*if (!ensure(target)->design->generation[gen_name]) {*/
/*MSG_ERROR("Generation '" << gen_name << "' isn't defined.", "Specify a generation name that is defined in the breeding design XML file");*/
/*}*/
size_t colon1 = filename.find_first_of(':');
int start = -1;
int end = -1;
if (colon1 != std::string::npos) {
size_t colon2 = filename.find_first_of(':', colon1 + 1);
if (colon2 != std::string::npos) {
std::stringstream s1(filename.substr(colon1 + 1, colon2 - colon1 - 1));
std::stringstream s2(filename.substr(colon2 + 1));
s1 >> start;
s2 >> end;
} else {
std::stringstream s1(filename.substr(colon1 + 1));
s1 >> start;
end = start;
}
filename.resize(colon1);
}
if (check_file(filename, false, false)) {
std::ifstream ifs(filename);
ensure(target)->observed_mark[gen_name] = {
filename, format_name, 0,
read_data::read_marker(ifs, start, end),
{}};
}
SAFE_IGNORE_CALLBACK_ARGS;
}},
}},
};
argument_parser<bn_settings_t> arg_map(arguments);
bn_settings_t* bn_settings_t::from_args(int argc, const char** argv)
{
std::vector<const char*> args(argv + 1, argv + argc);
std::vector<const char*>::iterator ai = args.begin();
std::vector<const char*>::iterator aj = args.end();
/*prg_name(argv[0]);*/
bn_settings_t* ret = NULL;
while (ai != aj && arg_map(ret, ai, aj)) {
++ai;
}
if (ret) {
ret->prg_name = basename(argv[0]);
ret->command_line.assign(argv, argv + argc);
}
return ret;
}
void print_usage() { print_usage_impl(false, arguments); }
#include "pedigree.h"
std::vector<pedigree_item>
read_csv(const std::string& pedigree_file, char field_sep=';')
{
std::vector<pedigree_item> ret;
std::ifstream pef(pedigree_file);
if (!pef.good()) {
MSG_DEBUG("prout");
}
std::string col_name, col_id, col_p1, col_p2;
read_csv_line(pef, field_sep, col_name, col_id, col_p1, col_p2);
/*MSG_DEBUG("col_name=" << col_name << " col_id=" << col_id << " col_p1=" << col_p1 << " col_p2=" << col_p2);*/
while (!pef.eof()) {
ret.emplace_back(pef, field_sep);
if (ret.back().id == 0) {
ret.pop_back();
}
}
return ret;
}
......@@ -11,20 +11,27 @@ extern "C" {
#include<sys/types.h>
}
typedef std::function<bool(const bn_settings_t*, size_t)> job_type;
typedef std::function<size_t(const bn_settings_t*)> count_jobs_type;
extern std::map<std::string, std::pair<count_jobs_type, job_type>> job_registry;
size_t count_jobs(const std::string& name, const bn_settings_t* settings) { return job_registry.find(name)->second.first(settings); }
job_type find_job(const std::string& name) { return job_registry.find(name)->second.second; }
struct worker_base {
size_t m_slice_begin, m_slice_end;
const bn_settings_t* m_settings;
job_type run_one;
worker_base(const bn_settings_t* settings, size_t slice_begin, size_t slice_end)
: m_slice_begin(slice_begin), m_slice_end(slice_end), m_settings(settings)
worker_base(const bn_settings_t* settings, size_t slice_begin, size_t slice_end, const std::string& job)
: m_slice_begin(slice_begin), m_slice_end(slice_end), m_settings(settings), run_one(find_job(job))
{}
bool run_one(size_t i);
void run_slice()
{
for (size_t i = m_slice_begin; i <= m_slice_end; ++i) {
notify_one(i, run_one(i));
notify_one(i, run_one(m_settings, i));
}
}
......@@ -34,7 +41,6 @@ struct worker_base {
};
struct worker_local_ssh : public worker_base {
using worker_base::worker_base;
void notify_one(size_t i, bool ok)
......@@ -53,6 +59,8 @@ struct worker_local_sge : public worker_base {
FILE* fifo = fopen(m_settings->fifo_path.c_str(), "w");
std::string msg = MESSAGE((ok ? "#S " : "#F ") << i << std::endl);
fwrite(msg.c_str(), msg.size(), 1, fifo);
fflush(fifo);
fclose(fifo);
}
void join() {}
};
......@@ -63,18 +71,20 @@ struct master_base {
bn_settings_t* m_settings;
std::vector<std::shared_ptr<worker_base>> m_workers;
size_t m_n_jobs;
master_base(bn_settings_t* settings, size_t n_jobs)
std::string job_name;
master_base(bn_settings_t* settings, size_t n_jobs, const std::string& job)
: m_settings(settings)
, m_workers()
, m_n_jobs(n_jobs)
, job_name(job)
{}
void run()
{
double step;
size_t total = m_settings->count_markers();
size_t total = count_jobs(job_name, m_settings);
size_t n_slots;
MSG_DEBUG("Have " << m_n_jobs << " slots for " << total << " jobs to do.");
/*MSG_DEBUG("Have " << m_n_jobs << " slots for " << total << " jobs to do.");*/
if (m_n_jobs >= total) {
n_slots = total;
step = 0;
......@@ -82,7 +92,7 @@ struct master_base {
n_slots = m_n_jobs;
step = total / (double) n_slots;
}
MSG_DEBUG("step=" << step);
/*MSG_DEBUG("step=" << step);*/
m_workers.reserve(n_slots);
......@@ -92,13 +102,13 @@ struct master_base {
if (end >= total) {
end = total - 1;
}
MSG_DEBUG("Spawning worker for " << ((size_t) start) << ':' << end);
m_workers.emplace_back(spawn_worker(i, (size_t) start, end));
/*MSG_DEBUG("Spawning worker for " << ((size_t) start) << ':' << end);*/
m_workers.emplace_back(spawn_worker(i, (size_t) start, end, job_name));
start += step + 1;
}
}
virtual std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end) = 0;
virtual std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job) = 0;
virtual void wait_for_jobs() = 0;
};
......@@ -111,11 +121,11 @@ struct master_thread;
struct worker_thread;
struct master_thread : public master_base {
master_thread(bn_settings_t* settings)
: master_base(settings, settings->n_threads)
master_thread(bn_settings_t* settings, const std::string& job)
: master_base(settings, settings->n_threads, job)
{}
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end);
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
void wait_for_jobs();
......@@ -128,8 +138,8 @@ struct master_thread : public master_base {
struct worker_thread : public worker_base {
std::thread m_thread;
master_thread* m_master;
worker_thread(master_thread* master, size_t slice_begin, size_t slice_end)
: worker_base(master->m_settings, slice_begin, slice_end)
worker_thread(master_thread* master, size_t slice_begin, size_t slice_end, const std::string& job)
: worker_base(master->m_settings, slice_begin, slice_end, job)
, m_thread([&] () { run_slice(); })
, m_master(master)
{}
......@@ -152,9 +162,9 @@ void master_thread::wait_for_jobs()
}
std::shared_ptr<worker_base> master_thread::spawn_worker(size_t, size_t slice_start ,size_t slice_end)
std::shared_ptr<worker_base> master_thread::spawn_worker(size_t, size_t slice_start ,size_t slice_end, const std::string& job)
{
return std::make_shared<worker_thread>(this, slice_start, slice_end);
return std::make_shared<worker_thread>(this, slice_start, slice_end, job);
}
......@@ -166,11 +176,11 @@ struct master_ssh;
struct worker_ssh;
struct master_ssh : public master_base {
master_ssh(bn_settings_t* settings)
: master_base(settings, settings->ssh_hosts.size())
master_ssh(bn_settings_t* settings, const std::string& job)
: master_base(settings, settings->ssh_hosts.size(), job)
{}
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end);
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
void wait_for_jobs();
......@@ -183,11 +193,13 @@ struct master_ssh : public master_base {
struct worker_ssh : public worker_base {
size_t m_host;
master_ssh* m_master;
std::string m_job_name;
std::thread m_thread;
worker_ssh(master_ssh* master, size_t i, size_t slice_begin, size_t slice_end)
: worker_base(master->m_settings, slice_begin, slice_end)
worker_ssh(master_ssh* master, size_t i, size_t slice_begin, size_t slice_end, const std::string& job)
: worker_base(master->m_settings, slice_begin, slice_end, job)
, m_host(i)
, m_master(master)
, m_job_name(job)
, m_thread([this] () { run_and_monitor(); })
{}
......@@ -208,7 +220,7 @@ struct worker_ssh : public worker_base {
for (const std::string& a: m_settings->command_line) {
cmd << ' ' << a;
}
cmd << " -J " << m_slice_begin << ' ' << m_slice_end;
cmd << " -J " << m_job_name << ' ' << m_slice_begin << ' ' << m_slice_end;
MSG_DEBUG("RUNNING " << cmd.str());
FILE* remote = popen(cmd.str().c_str(), "r");
char buf[64];
......@@ -228,9 +240,9 @@ void master_ssh::wait_for_jobs()
}
std::shared_ptr<worker_base> master_ssh::spawn_worker(size_t i, size_t slice_start ,size_t slice_end)
std::shared_ptr<worker_base> master_ssh::spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job)
{
return std::make_shared<worker_ssh>(this, i, slice_start, slice_end);
return std::make_shared<worker_ssh>(this, i, slice_start, slice_end, job);
}
......@@ -243,18 +255,18 @@ struct master_sge;
struct worker_sge;
struct master_sge : public master_base {
static int fifo;
static FILE* fifo;
std::mutex wait;
size_t jobs_done;
size_t jobs_total;
master_sge(bn_settings_t* settings)
: master_base(settings, settings->n_threads)
master_sge(bn_settings_t* settings, const std::string& job)
: master_base(settings, settings->n_threads, job)
, jobs_done(0)
, jobs_total(settings->count_markers())
{ wait.lock(); }
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end);
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
void wait_for_jobs() { wait.lock(); }
......@@ -271,10 +283,12 @@ struct master_sge : public master_base {
struct worker_sge : public worker_base {
size_t m_host;
master_sge* m_master;
worker_sge(master_sge* master, size_t i, size_t slice_begin, size_t slice_end)
: worker_base(master->m_settings, slice_begin, slice_end)
std::string m_job_name;