Commit cb6fbccb authored by Damien Leroux's avatar Damien Leroux
Browse files

Job dispatch implemented. MT, SSH, SGE.

parent 93bb1c9c
#include <atomic>
#include <thread>
#include <cstdio>
#include "error.h"
#include "cli.h"
extern "C" {
#include <sys/stat.h>
#include <fcntl.h>
#include<signal.h>
#include<sys/types.h>
}
struct worker_base {
size_t m_slice_begin, m_slice_end;
const bn_settings_t* m_settings;
worker_base(const bn_settings_t* settings, size_t slice_begin, size_t slice_end)
: m_slice_begin(slice_begin), m_slice_end(slice_end), m_settings(settings)
{}
bool run_one(size_t i);
void run_slice()
{
for (size_t i = m_slice_begin; i <= m_slice_end; ++i) {
notify_one(i, run_one(i));
}
}
/* middleware */
virtual void notify_one(size_t, bool) = 0;
virtual void join() = 0;
};
struct worker_local_ssh : public worker_base {
using worker_base::worker_base;
void notify_one(size_t i, bool ok)
{
MSG_DEBUG((ok ? "#S " : "#F ") << i);
MSG_ERROR("prout", "");
}
void join() {}
};
struct worker_local_sge : public worker_base {
using worker_base::worker_base;
void notify_one(size_t i, bool ok)
{
FILE* fifo = fopen(m_settings->fifo_path.c_str(), "w");
std::string msg = MESSAGE((ok ? "#S " : "#F ") << i << std::endl);
fwrite(msg.c_str(), msg.size(), 1, fifo);
}
void join() {}
};
struct master_base {
bn_settings_t* m_settings;
std::vector<std::shared_ptr<worker_base>> m_workers;
size_t m_n_jobs;
master_base(bn_settings_t* settings, size_t n_jobs)
: m_settings(settings)
, m_workers()
, m_n_jobs(n_jobs)
{}
void run()
{
double step;
size_t total = m_settings->count_markers();
size_t n_slots;
MSG_DEBUG("Have " << m_n_jobs << " slots for " << total << " jobs to do.");
if (m_n_jobs >= total) {
n_slots = total;
step = 0;
} else {
n_slots = m_n_jobs;
step = total / (double) n_slots;
}
MSG_DEBUG("step=" << step);
m_workers.reserve(n_slots);
double start = 0;
for (size_t i = 0; i < n_slots; ++i) {
size_t end = (size_t) (start + step);
if (end >= total) {
end = total - 1;
}
MSG_DEBUG("Spawning worker for " << ((size_t) start) << ':' << end);
m_workers.emplace_back(spawn_worker(i, (size_t) start, end));
start += step + 1;
}
}
virtual std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end) = 0;
virtual void wait_for_jobs() = 0;
};
/**************************************
* MULTI-THREADING
*/
struct master_thread;
struct worker_thread;
struct master_thread : public master_base {
master_thread(bn_settings_t* settings)
: master_base(settings, settings->n_threads)
{}
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end);
void wait_for_jobs();
void notify(size_t i, bool ok)
{
MSG_DEBUG("Job #" << i << (ok ? " successful" : " failed"));
}
};
struct worker_thread : public worker_base {
std::thread m_thread;
master_thread* m_master;
worker_thread(master_thread* master, size_t slice_begin, size_t slice_end)
: worker_base(master->m_settings, slice_begin, slice_end)
, m_thread([&] () { run_slice(); })
, m_master(master)
{}
void notify_one(size_t i, bool ok)
{
m_master->notify(i, ok);
}
void join()
{
m_thread.join();
}
};
void master_thread::wait_for_jobs()
{
for (auto& t: m_workers) { t->join(); }
}
std::shared_ptr<worker_base> master_thread::spawn_worker(size_t, size_t slice_start ,size_t slice_end)
{
return std::make_shared<worker_thread>(this, slice_start, slice_end);
}
/**************************************
* SSH
*/
struct master_ssh;
struct worker_ssh;
struct master_ssh : public master_base {
master_ssh(bn_settings_t* settings)
: master_base(settings, settings->ssh_hosts.size())
{}
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end);
void wait_for_jobs();
void notify(size_t i, bool ok)
{
MSG_DEBUG("Job #" << i << (ok ? " successful" : " failed"));
}
};
struct worker_ssh : public worker_base {
size_t m_host;
master_ssh* m_master;
std::thread m_thread;
worker_ssh(master_ssh* master, size_t i, size_t slice_begin, size_t slice_end)
: worker_base(master->m_settings, slice_begin, slice_end)
, m_host(i)
, m_master(master)
, m_thread([this] () { run_and_monitor(); })
{}
void notify_one(size_t i, bool ok)
{
m_master->notify(i, ok);
}
void join()
{
m_thread.join();
}
void run_and_monitor()
{
std::stringstream cmd;
cmd << "ssh " << m_settings->ssh_hosts[m_host];
for (const std::string& a: m_settings->command_line) {
cmd << ' ' << a;
}
cmd << " -J " << m_slice_begin << ' ' << m_slice_end;
MSG_DEBUG("RUNNING " << cmd.str());
FILE* remote = popen(cmd.str().c_str(), "r");
char buf[64];
while (!feof(remote)) {
char* s = fgets(buf, (sizeof buf) - 1, remote);
if (s && s[0] == '#' && (s[1] == 'S' || s[1] == 'F')) {
notify_one(atoi(s + 3), s[1] == 'S');
}
}
}
};
void master_ssh::wait_for_jobs()
{
for (auto& t: m_workers) { t->join(); }
}
std::shared_ptr<worker_base> master_ssh::spawn_worker(size_t i, size_t slice_start ,size_t slice_end)
{
return std::make_shared<worker_ssh>(this, i, slice_start, slice_end);
}
/**************************************
* SGE
*/
struct master_sge;
struct worker_sge;
struct master_sge : public master_base {
static int fifo;
std::mutex wait;
size_t jobs_done;
size_t jobs_total;
master_sge(bn_settings_t* settings)
: master_base(settings, settings->n_threads)
, jobs_done(0)
, jobs_total(settings->count_markers())
{ wait.lock(); }
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end);
void wait_for_jobs() { wait.lock(); }
void notify(size_t i, bool ok)
{
++jobs_done;
MSG_DEBUG("Job #" << i << (ok ? " successful" : " failed"));
if (jobs_done == jobs_total) {
wait.unlock();
}
}
};
struct worker_sge : public worker_base {
size_t m_host;
master_sge* m_master;
worker_sge(master_sge* master, size_t i, size_t slice_begin, size_t slice_end)
: worker_base(master->m_settings, slice_begin, slice_end)
, m_host(i)
, m_master(master)
{ schedule(); }
void notify_one(size_t i, bool ok)
{
m_master->notify(i, ok);
}
void join() {}
void schedule()
{
char wdbuf[PATH_MAX];
const char* wd = getcwd(wdbuf, PATH_MAX);
std::stringstream cmd;
cmd << "cd " << wd << " &&";
for (const std::string& a: m_settings->command_line) {
cmd << ' ' << a;
}
cmd << " -J " << m_slice_begin << ' ' << m_slice_end << " -F " << m_settings->fifo_path;
MSG_DEBUG("SCHEDULING JOB " << cmd.str());
FILE* remote = popen("/usr/bin/env qsub", "w");
const char* cmdstr = cmd.str().c_str();
fwrite(cmdstr, cmd.str().size(), 1, remote);
fclose(remote);
}
};
std::shared_ptr<worker_base> master_sge::spawn_worker(size_t i, size_t slice_start ,size_t slice_end)
{
return std::make_shared<worker_sge>(this, i, slice_start, slice_end);
}
bool worker_base::run_one(size_t i)
{
MSG_DEBUG("running #" << i);
return true;
}
master_sge* global_msge;
int master_sge::fifo = -1;
extern "C" {
void reader(int)
{
char buf[512];
int n;
while((n = read(master_sge::fifo, buf, sizeof buf)) > 0){
buf[511] = '\0';
if (buf[0] == '#') {
if (buf[1] == 'F' || buf[1] == 'S') {
global_msge->notify(atoi(buf + 3), buf[1] == 'S');
}
}
}
}
} // extern "C"
int main(int argc, const char** argv)
{
bn_settings_t* settings = bn_settings_t::from_args(argc, argv);
if (settings) {
if (settings->job_start == -1) {
if (settings->scheme == JDS_MT) {
master_thread mt(settings);
mt.run();
mt.wait_for_jobs();
} else if (settings->scheme == JDS_SSH) {
master_ssh mssh(settings);
mssh.run();
mssh.wait_for_jobs();
} else if (settings->scheme == JDS_SGE) {
char wdbuf[PATH_MAX];
const char* wd = getcwd(wdbuf, PATH_MAX);
settings->fifo_path = MESSAGE(wd << "spell_bayes_fifo_" << getpid());
signal(SIGIO, reader);
mknod(settings->fifo_path.c_str(), 0666 | S_IFIFO,0);
master_sge::fifo = open(settings->fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
fcntl(master_sge::fifo, F_SETOWN, getpid());
fcntl(master_sge::fifo, F_SETFL, O_ASYNC);
master_sge msge(settings);
global_msge = &msge;
msge.run();
msge.wait_for_jobs();
}
} else {
if (settings->scheme == JDS_SSH) {
worker_local_ssh wl(settings, settings->job_start, settings->job_end);
wl.run_slice();
} else if (settings->scheme == JDS_SGE) {
worker_local_sge wl(settings, settings->job_start, settings->job_end);
wl.run_slice();
}
}
}
return 0;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment