Commit 79fc7b44 authored by damien's avatar damien
Browse files

Replaced FIFO with TCP server/client.

FIFO can't work across multiple machines. It was a stupid design idea.
parent d4bb818a
This diff is collapsed.
......@@ -21,8 +21,6 @@ project(spell_qtl)
#INCLUDE(pandocology)
find_package(Boost 1.55.0 REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})
set(CMAKE_CONFIGURATION_TYPES Debug Release CACHE TYPE INTERNAL FORCE)
set(CMAKE_VERBOSE_MAKEFILE ON)
......@@ -68,8 +66,9 @@ find_path(X2C_INCLUDE_DIR x2c/x2c.h HINTS /usr/include/ /usr/local/include/ /hom
find_path(EIGEN_INCLUDE_DIR Eigen/Eigen HINTS /usr/include /usr/local/include /usr/include/eigen3/ /usr/local/include/eigen3/ /home/daleroux/include/eigen3/)
include_directories(AFTER 3rd-party/ThreadPool)
include_directories(AFTER include/ include/input/ include/bayes/ ${EIGEN_INCLUDE_DIR})
include_directories(SYSTEM ${EXPAT_INCLUDE_DIR} ${X2C_INCLUDE_DIR})
include_directories(AFTER ${CMAKE_SOURCE_DIR}/include/ ${CMAKE_SOURCE_DIR}/include/input/ ${CMAKE_SOURCE_DIR}/include/bayes/ ${EIGEN_INCLUDE_DIR})
include_directories(SYSTEM /usr/include ${Boost_INCLUDE_DIRS} ${EXPAT_INCLUDE_DIR} ${X2C_INCLUDE_DIR})
include_directories(AFTER /usr/include ${Boost_INCLUDE_DIRS} ${EXPAT_INCLUDE_DIR} ${X2C_INCLUDE_DIR})
set(SPELL_PEDIGREE_SRC
src/static_data.cc
......
/* Spell-QTL Software suite for the QTL analysis of modern datasets.
* Copyright (C) 2016,2017 Damien Leroux <damien.leroux@inra.fr>, Sylvain Jasson <sylvain.jasson@inra.fr>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef SPELL_QTL_INET_H
#define SPELL_QTL_INET_H
extern "C" {
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
};
#define MAXMSG 512
struct inet_server {
uint16_t port;
int sock;
struct sockaddr_in name;
std::string hostname;
std::thread thread;
std::atomic<int> running;
std::condition_variable is_running_cv;
std::mutex is_running_mutex;
int
make_socket (uint16_t port)
{
int _sock;
/* Create the socket. */
_sock = socket (PF_INET, SOCK_STREAM, 0);
if (_sock < 0)
{
MSG_ERROR("Couldn't create socket: " << strerror(errno), "");
return -1;
}
/* Give the socket a name. */
name.sin_family = AF_INET;
name.sin_port = htons (port);
name.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind (_sock, (struct sockaddr *) &name, sizeof (name)) < 0)
{
return -1;
}
return _sock;
}
std::string
address() const
{
return MESSAGE(hostname << ':' << port);
}
inet_server(uint16_t start_port, uint16_t end_port)
: thread(), running(false), is_running_cv(), is_running_mutex()
{
port = (uint16_t) (start_port - 1);
do {
++port;
sock = make_socket(port);
} while (sock == -1 && port <= end_port);
char buffer[HOST_NAME_MAX];
gethostname(buffer, HOST_NAME_MAX);
buffer[HOST_NAME_MAX - 1] = 0;
hostname = buffer;
thread = std::thread([this]() { main(); });
}
~inet_server()
{
running = false;
thread.join();
close(sock);
}
virtual bool on_client_message(const std::string& msg) = 0;
virtual bool on_client_error(int) = 0;
virtual void on_start() = 0;
virtual void on_stop() = 0;
int
read_from_client (int filedes)
{
char buffer[MAXMSG];
ssize_t nbytes;
nbytes = read (filedes, buffer, MAXMSG);
if (nbytes < 0) {
/* Read error. */
on_client_error(errno);
}
else if (nbytes == 0) {
/* End-of-file. */
return -1;
}
/* Data read. */
running.fetch_and(on_client_message(buffer), std::memory_order::memory_order_relaxed);
return 0;
}
void
wait_for_server_to_run()
{
std::unique_lock<std::mutex> lk(is_running_mutex);
is_running_cv.wait(lk, [this] () -> bool { return running; });
}
void
main (void)
{
fd_set active_fd_set, read_fd_set;
int i;
struct sockaddr_in clientname;
socklen_t size;
/* Create the socket and set it up to accept connections. */
if (listen(sock, 1) < 0)
{
MSG_ERROR("Can't listen: " << strerror(errno), "");
return;
}
/* Initialize the set of active sockets. */
FD_ZERO (&active_fd_set);
FD_SET (sock, &active_fd_set);
{
std::lock_guard<std::mutex> guard(is_running_mutex);
running = true;
on_start();
is_running_cv.notify_all();
}
while (running)
{
/* Block until input arrives on one or more active sockets. */
read_fd_set = active_fd_set;
if (select (FD_SETSIZE, &read_fd_set, NULL, NULL, NULL) < 0)
{
perror ("select");
exit (EXIT_FAILURE);
}
/* Service all the sockets with input pending. */
for (i = 0; i < FD_SETSIZE; ++i)
if (FD_ISSET (i, &read_fd_set))
{
if (i == sock)
{
/* Connection request on original socket. */
int newcli;
size = sizeof (clientname);
newcli = accept (sock,
(struct sockaddr *) &clientname,
&size);
if (newcli < 0)
{
perror ("accept");
exit (EXIT_FAILURE);
}
FD_SET (newcli, &active_fd_set);
}
else
{
/* Data arriving on an already-connected socket. */
if (read_from_client (i) < 0)
{
close (i);
FD_CLR (i, &active_fd_set);
}
}
}
}
on_stop();
}
};
struct inet_client {
struct sockaddr_in name;
inet_client(const std::string &uri)
{
init_sockaddr(uri);
}
void
init_sockaddr (const char *hostname,
uint16_t port)
{
struct hostent *hostinfo;
name.sin_family = AF_INET;
name.sin_port = htons (port);
hostinfo = gethostbyname (hostname);
if (hostinfo == NULL)
{
fprintf (stderr, "Unknown host %s.\n", hostname);
exit (EXIT_FAILURE);
}
name.sin_addr = *(struct in_addr *) hostinfo->h_addr;
}
void
init_sockaddr(const std::string& uri)
{
size_t ofs = uri.find(':');
std::string hostname(uri.begin(), uri.begin() + ofs);
std::string portstr(uri.begin() + ofs + 1, uri.end());
uint16_t port = atoi(portstr.c_str());
init_sockaddr(hostname.c_str(), port);
}
void
connect_and_send(const std::string& s)
{
int sock = socket(PF_INET, SOCK_STREAM, 0);
if (sock < 0) {
MSG_ERROR("Couldn't create socket: " << strerror(errno), "");
return;
}
if (connect(sock, (struct sockaddr*) &name, sizeof(name)) < 0) {
MSG_ERROR("Couldn't connect to server: " << strerror(errno), "");
}
write(sock, s.c_str(), s.size());
close(sock);
}
};
#endif //SPELL_QTL_INET_H
......@@ -20,6 +20,7 @@
#include <cstdio>
#include "error.h"
#include "dispatch.h"
#include "inet.h"
extern "C" {
#include <sys/stat.h>
......@@ -71,20 +72,20 @@ struct worker_local_sge : public worker_base {
{
MSG_INFO("Opening FIFO for notification. " << m_settings->fifo_path.c_str());
MSG_QUEUE_FLUSH();
int fifo = open(m_settings->fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
MSG_INFO("Sending notification.");
MSG_QUEUE_FLUSH();
inet_client client(m_settings->fifo_path);
if (errno) {
MSG_ERROR("An error occurred while trying to notify the master process: " << strerror(errno), "");
MSG_ERROR("An error occurred while trying to connect to the master process: " << strerror(errno), "");
MSG_QUEUE_FLUSH();
} else {
MSG_INFO("Sending notification.");
MSG_QUEUE_FLUSH();
std::string msg = MESSAGE((ok ? "\fS " : "\fF ") << i << std::endl);
write(fifo, msg.c_str(), msg.size());
client.connect_and_send(msg);
if (errno) {
MSG_ERROR("An error occurred while trying to notify the master process: " << strerror(errno), "");
MSG_QUEUE_FLUSH();
}
close(fifo);
}
/*
FILE* fifo = fopen(m_settings->fifo_path.c_str(), "a");
......@@ -368,17 +369,16 @@ std::shared_ptr<worker_base> master_ssh::spawn_worker(size_t i, size_t slice_sta
struct master_sge;
struct worker_sge;
struct master_sge : public master_base {
static FILE* fifo;
struct master_sge : public master_base, inet_server {
std::mutex wait;
size_t jobs_done;
size_t jobs_total;
master_sge(bn_settings_t* settings, const std::string& job)
: master_base(settings, settings->n_threads, job)
: master_base(settings, settings->n_threads, job), inet_server(56000, 65535)
, jobs_done(0)
, jobs_total(count_jobs(job, settings))
{ wait.lock(); }
{ wait_for_server_to_run(); }
std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
......@@ -393,6 +393,24 @@ struct master_sge : public master_base {
wait.unlock();
}
}
bool on_client_message(const std::string& buf) override
{
if (buf.size() >= 3) {
if (buf[0] == '\f') {
if (buf[1] == 'F' || buf[1] == 'S') {
notify(atoi(buf.c_str() + 3), buf[1] == 'S');
}
}
}
return jobs_done < jobs_total;
}
bool on_client_error(int) override { return jobs_done < jobs_total; }
void on_start() override { wait.lock(); }
void on_stop() override { wait.unlock(); }
};
struct worker_sge : public worker_base {
......@@ -440,34 +458,6 @@ std::shared_ptr<worker_base> master_sge::spawn_worker(size_t i, size_t slice_sta
}
master_sge* global_msge;
FILE* master_sge::fifo = NULL;
extern "C" {
void reader(int)
{
#define BUFSZ 64
char buf[BUFSZ];
MSG_DEBUG("entering reader");
/*while((n = read(master_sge::fifo, buf, sizeof buf)) > 0){*/
while(fgets(buf, sizeof buf - 1, master_sge::fifo) != NULL) {
buf[BUFSZ - 1] = '\0';
MSG_DEBUG("read " << buf);
if (buf[0] == '\f') {
if (buf[1] == 'F' || buf[1] == 'S') {
global_msge->notify(atoi(buf + 3), buf[1] == 'S');
}
}
}
MSG_DEBUG("exiting reader");
}
} // extern "C"
bool do_the_job(bn_settings_t* settings, std::string job)
{
bool all_good = false;
......@@ -477,44 +467,21 @@ bool do_the_job(bn_settings_t* settings, std::string job)
master_thread mt(settings, job);
mt.run();
mt.wait_for_jobs();
all_good = mt.all_good();
} else if (settings->scheme == JDS_SSH) {
master_ssh mssh(settings, job);
mssh.run();
mssh.wait_for_jobs();
all_good = mssh.all_good();
} else if (settings->scheme == JDS_SGE) {
char wdbuf[PATH_MAX];
const char* wd = getcwd(wdbuf, PATH_MAX);
settings->fifo_path = MESSAGE(wd << "/spell_bayes_fifo_" << getpid());
sighandler_t old_sigio = signal(SIGIO, reader);
mknod(settings->fifo_path.c_str(), 0666 | S_IFIFO,0);
int fd = open(settings->fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
MSG_DEBUG("fd=" << fd);
fcntl(fd, F_SETOWN, getpid());
fcntl(fd, F_SETFL, O_ASYNC);
master_sge::fifo = fdopen(fd, "r");
MSG_DEBUG("fifo=" << master_sge::fifo);
master_sge msge(settings, job);
global_msge = &msge;
msge.run();
msge.wait_for_jobs();
fflush(master_sge::fifo);
unlink(settings->fifo_path.c_str());
signal(SIGIO, old_sigio);
fclose(master_sge::fifo);
all_good = msge.all_good();
} else {
master_none mnone(settings, job);
mnone.run();
mnone.wait_for_jobs();
all_good = mnone.all_good();
}
if (!all_good) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment