dispatch.cc 15.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* Spell-QTL  Software suite for the QTL analysis of modern datasets.
 * Copyright (C) 2016,2017  Damien Leroux <damien.leroux@inra.fr>, Sylvain Jasson <sylvain.jasson@inra.fr>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

18
19
20
#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <mutex>
#include <stdexcept>
#include <thread>
21
22
#include "eigen.h"
#include "input.h"
23
#include "dispatch.h"
damien's avatar
damien committed
24
#include "inet.h"
25
26
27
28
29
30
31
32

extern "C" {
#include <sys/stat.h> 
#include <fcntl.h>
#include<signal.h>
#include<sys/types.h>    
}

33
/**
 * Look up the job registered under @p name in job_registry and call its
 * counting function (the first element of the registered pair) on @p settings.
 *
 * @param name      key of the job in job_registry.
 * @param settings  forwarded untouched to the counting function.
 * @return whatever the counting function returns.
 * @throws std::out_of_range if no job is registered under @p name, instead of
 *         dereferencing the end iterator.
 */
size_t count_jobs(const std::string& name, bn_settings_t* settings)
{
    auto entry = job_registry.find(name);
    if (entry == job_registry.end()) {
        throw std::out_of_range("Unknown job name: " + name);
    }
    return entry->second.first(settings);
}
Damien Leroux's avatar
Damien Leroux committed
34
35
/**
 * Look up the job registered under @p name in job_registry and return its
 * runner (the second element of the registered pair).
 *
 * @param name  key of the job in job_registry.
 * @throws std::out_of_range if no job is registered under @p name, instead of
 *         dereferencing the end iterator.
 */
job_type find_job(const std::string& name)
{
    auto entry = job_registry.find(name);
    if (entry == job_registry.end()) {
        throw std::out_of_range("Unknown job name: " + name);
    }
    return entry->second.second;
}

36
37
struct worker_base {
    size_t m_slice_begin, m_slice_end;
38
    bn_settings_t* m_settings;
Damien Leroux's avatar
Damien Leroux committed
39
    job_type run_one;
40

41
    worker_base(bn_settings_t* settings, size_t slice_begin, size_t slice_end, const std::string& job)
Damien Leroux's avatar
Damien Leroux committed
42
        : m_slice_begin(slice_begin), m_slice_end(slice_end), m_settings(settings), run_one(find_job(job))
43
44
45
46
    {}

    void run_slice()
    {
Damien Leroux's avatar
Damien Leroux committed
47
        /*MSG_INFO("RUN SLICE " << m_slice_begin << ':' << m_slice_end);*/
48
        for (size_t i = m_slice_begin; i <= m_slice_end; ++i) {
49
50
            auto ret = run_one(m_settings, i);
            notify_one(i, ret);
51
52
53
54
55
56
57
58
59
60
61
62
63
        }
    }

    /* middleware */
    virtual void notify_one(size_t, bool) = 0;
    virtual void join() = 0;
};


struct worker_local_ssh : public worker_base {
    using worker_base::worker_base;
    void notify_one(size_t i, bool ok)
    {
64
        msg_handler_t::cout() << (ok ? "\fS " : "\fF ") << i << std::endl;
65
66
67
68
69
70
71
72
73
    }
    void join() {}
};


struct worker_local_sge : public worker_base {
    using worker_base::worker_base;
    void notify_one(size_t i, bool ok)
    {
74
75
        MSG_INFO("Opening FIFO for notification. " << m_settings->fifo_path.c_str());
        MSG_QUEUE_FLUSH();
damien's avatar
damien committed
76
77
78
79
        MSG_INFO("Sending notification.");
        MSG_QUEUE_FLUSH();

        inet_client client(m_settings->fifo_path);
80
        if (errno) {
damien's avatar
damien committed
81
            MSG_ERROR("An error occurred while trying to connect to the master process: " << strerror(errno), "");
82
83
            MSG_QUEUE_FLUSH();
        } else {
84
            std::string msg = SPELL_STRING((ok ? "\fS " : "\fF ") << i << std::endl);
damien's avatar
damien committed
85
            client.connect_and_send(msg);
86
87
88
89
90
91
92
93
94
95
            if (errno) {
                MSG_ERROR("An error occurred while trying to notify the master process: " << strerror(errno), "");
                MSG_QUEUE_FLUSH();
            }
        }
/*
        FILE* fifo = fopen(m_settings->fifo_path.c_str(), "a");
        if (!fifo || errno) {
            MSG_ERROR("An error occurred while trying to notify the master process: " << strerror(errno), "");
        }
96
        std::string msg = SPELL_STRING((ok ? "\fS " : "\fF ") << i << std::endl);
97
        fwrite(msg.c_str(), msg.size(), 1, fifo);
98
99
100
        if (ferror(fifo)) {
            MSG_ERROR("An error occurred while trying to notify the master process: " << strerror(errno), "");
        }
Damien Leroux's avatar
Damien Leroux committed
101
102
        fflush(fifo);
        fclose(fifo);
103
104
*/
        MSG_INFO("Done notifying.");
105
106
107
108
109
110
111
112
113
114
    }
    void join() {}
};



struct master_base {
    bn_settings_t* m_settings;
    std::vector<std::shared_ptr<worker_base>> m_workers;
    size_t m_n_jobs;
Damien Leroux's avatar
Damien Leroux committed
115
116
117
118
    size_t m_job_total;
    std::atomic<size_t> m_job_done_counter;
    std::atomic<size_t> m_job_ok_counter;
    std::atomic<size_t> m_job_failed_counter;
Damien Leroux's avatar
Damien Leroux committed
119
120
    std::string job_name;
    master_base(bn_settings_t* settings, size_t n_jobs, const std::string& job)
121
122
123
        : m_settings(settings)
        , m_workers()
        , m_n_jobs(n_jobs)
Damien Leroux's avatar
Damien Leroux committed
124
125
126
127
        , m_job_total(0)
        , m_job_done_counter(0)
        , m_job_ok_counter(0)
        , m_job_failed_counter(0)
Damien Leroux's avatar
Damien Leroux committed
128
        , job_name(job)
129
130
131
132
133
    {}

    void run()
    {
        double step;
Damien Leroux's avatar
Damien Leroux committed
134
        m_job_total = count_jobs(job_name, m_settings);
135
        size_t n_slots;
Damien Leroux's avatar
Damien Leroux committed
136
137
138
        MSG_DEBUG("Have " << m_n_jobs << " slots for " << m_job_total << " jobs to do.");
        if (m_n_jobs >= m_job_total) {
            n_slots = m_job_total;
139
140
141
            step = 0;
        } else {
            n_slots = m_n_jobs;
Damien Leroux's avatar
Damien Leroux committed
142
            step = m_job_total / (double) n_slots;
143
        }
Damien Leroux's avatar
Damien Leroux committed
144
        /*MSG_DEBUG("step=" << step);*/
145

146
        m_workers.clear();
147
148
149
150
151
        m_workers.reserve(n_slots);

        double start = 0;
        for (size_t i = 0; i < n_slots; ++i) {
            size_t end = (size_t) (start + step);
Damien Leroux's avatar
Damien Leroux committed
152
153
            if (end >= m_job_total) {
                end = m_job_total - 1;
154
            }
Damien Leroux's avatar
Damien Leroux committed
155
            /*MSG_DEBUG("Spawning worker for " << ((size_t) start) << ':' << end);*/
Damien Leroux's avatar
Damien Leroux committed
156
            m_workers.emplace_back(spawn_worker(i, (size_t) start, end, job_name));
157
158
159
160
            start += step + 1;
        }
    }

161
162
    bool all_good() const { return m_job_failed_counter == 0; }

Damien Leroux's avatar
Damien Leroux committed
163
    virtual std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job) = 0;
164
    virtual void wait_for_jobs() = 0;
Damien Leroux's avatar
Damien Leroux committed
165
166
167
168
169
170
171
172
173

    void update_job(bool successful)
    {
        if (successful) {
            ++m_job_ok_counter;
        } else {
            ++m_job_failed_counter;
        }
        ++m_job_done_counter;
174
        msg_handler_t::cout() << "\033[100D\033[KProgress: " << YELLOW << m_job_done_counter << NORMAL << '/' << YELLOW << m_job_total << NORMAL << " (" << m_job_ok_counter << " good, " << m_job_failed_counter << " failed)" << std::flush;
Damien Leroux's avatar
Damien Leroux committed
175
    }
176
177
178
};


179
180
181
182
183
184
185
186
187
188
189
190
191
192
/**************************************
 * NO PARALLELISM (sequential, in-process)
 */

struct master_none;
struct worker_none;

struct master_none : public master_base {
    master_none(bn_settings_t* settings, const std::string& job)
        : master_base(settings, 1, job)
    {}

    void wait_for_jobs();

193
    void notify(size_t /*i*/, bool ok)
194
    {
Damien Leroux's avatar
Damien Leroux committed
195
        update_job(ok);
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
    }

    std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
};


struct worker_none : public worker_base {
    master_none* m_master;
    worker_none(master_none* master, size_t slice_begin, size_t slice_end, const std::string& job)
        : worker_base(master->m_settings, slice_begin, slice_end, job)
        , m_master(master)
    {}

    void notify_one(size_t i, bool ok)
    {
        m_master->notify(i, ok);
    }

    void join() { run_slice(); }
};


/** Join every spawned worker in spawn order (for worker_none this runs its slice). */
void master_none::wait_for_jobs()
{
    for (auto it = m_workers.begin(); it != m_workers.end(); ++it) {
        (*it)->join();
    }
}

std::shared_ptr<worker_base> master_none::spawn_worker(size_t, size_t slice_start ,size_t slice_end, const std::string& job)
{
225
    return std::dynamic_pointer_cast<worker_base>(std::make_shared<worker_none>(this, slice_start, slice_end, job));
226
227
228
}


229
230
231
232
233
234
235
236
/**************************************
 * MULTI-THREADING
 */

struct master_thread;
struct worker_thread;

struct master_thread : public master_base {
237
    std::mutex m_notify_lock;
Damien Leroux's avatar
Damien Leroux committed
238
239
    master_thread(bn_settings_t* settings, const std::string& job)
        : master_base(settings, settings->n_threads, job)
240
        , m_notify_lock()
241
242
    {}

Damien Leroux's avatar
Damien Leroux committed
243
    std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
244
245
246

    void wait_for_jobs();

247
    void notify(size_t /*i*/, bool ok)
248
    {
249
        std::lock_guard<std::mutex> lock(m_notify_lock);
Damien Leroux's avatar
Damien Leroux committed
250
        update_job(ok);
251
252
253
254
255
    }
};

struct worker_thread : public worker_base {
    master_thread* m_master;
256
    std::thread m_thread;
Damien Leroux's avatar
Damien Leroux committed
257
258
    worker_thread(master_thread* master, size_t slice_begin, size_t slice_end, const std::string& job)
        : worker_base(master->m_settings, slice_begin, slice_end, job)
259
        , m_master(master)
260
        , m_thread([&] () { run_slice(); })
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
    {}

    void notify_one(size_t i, bool ok)
    {
        m_master->notify(i, ok);
    }

    void join()
    {
        m_thread.join();
    }
};


/** Join every spawned worker thread, in spawn order. */
void master_thread::wait_for_jobs()
{
    for (auto it = m_workers.begin(); it != m_workers.end(); ++it) {
        (*it)->join();
    }
}


Damien Leroux's avatar
Damien Leroux committed
281
std::shared_ptr<worker_base> master_thread::spawn_worker(size_t, size_t slice_start ,size_t slice_end, const std::string& job)
282
{
283
    return std::dynamic_pointer_cast<worker_base>(std::make_shared<worker_thread>(this, slice_start, slice_end, job));
284
285
286
287
288
289
290
291
292
293
294
}


/**************************************
 * SSH
 */

struct master_ssh;
struct worker_ssh;

struct master_ssh : public master_base {
Damien Leroux's avatar
Damien Leroux committed
295
296
    master_ssh(bn_settings_t* settings, const std::string& job)
        : master_base(settings, settings->ssh_hosts.size(), job)
297
298
    {}

Damien Leroux's avatar
Damien Leroux committed
299
    std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
300
301
302

    void wait_for_jobs();

303
    void notify(size_t /*i*/, bool ok)
304
    {
Damien Leroux's avatar
Damien Leroux committed
305
        update_job(ok);
306
307
308
309
310
311
    }
};

struct worker_ssh : public worker_base {
    size_t m_host;
    master_ssh* m_master;
Damien Leroux's avatar
Damien Leroux committed
312
    std::string m_job_name;
313
    std::thread m_thread;
Damien Leroux's avatar
Damien Leroux committed
314
315
    worker_ssh(master_ssh* master, size_t i, size_t slice_begin, size_t slice_end, const std::string& job)
        : worker_base(master->m_settings, slice_begin, slice_end, job)
316
317
        , m_host(i)
        , m_master(master)
Damien Leroux's avatar
Damien Leroux committed
318
        , m_job_name(job)
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
        , m_thread([this] () { run_and_monitor(); })
    {}

    void notify_one(size_t i, bool ok)
    {
        m_master->notify(i, ok);
    }

    void join()
    {
        m_thread.join();
    }

    void run_and_monitor()
    {
        std::stringstream cmd;
        cmd << "ssh " << m_settings->ssh_hosts[m_host];
        for (const std::string& a: m_settings->command_line) {
            cmd << ' ' << a;
        }
Damien Leroux's avatar
Damien Leroux committed
339
        cmd << " -J " << m_job_name << ' ' << m_slice_begin << ' ' << m_slice_end;
340
341
342
343
344
        MSG_DEBUG("RUNNING " << cmd.str());
        FILE* remote = popen(cmd.str().c_str(), "r");
        char buf[64];
        while (!feof(remote)) {
            char* s = fgets(buf, (sizeof buf) - 1, remote);
345
            if (s && s[0] == '\f' && (s[1] == 'S' || s[1] == 'F')) {
346
347
348
349
350
351
352
353
354
355
356
357
358
                notify_one(atoi(s + 3), s[1] == 'S');
            }
        }
    }
};


/** Join every spawned SSH monitoring worker, in spawn order. */
void master_ssh::wait_for_jobs()
{
    for (auto it = m_workers.begin(); it != m_workers.end(); ++it) {
        (*it)->join();
    }
}


Damien Leroux's avatar
Damien Leroux committed
359
std::shared_ptr<worker_base> master_ssh::spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job)
360
{
361
    return std::dynamic_pointer_cast<worker_base>(std::make_shared<worker_ssh>(this, i, slice_start, slice_end, job));
362
363
364
365
366
367
368
369
370
371
372
}



/**************************************
 * SGE
 */

struct master_sge;
struct worker_sge;

damien's avatar
damien committed
373
struct master_sge : public master_base, inet_server {
374
375
376
377
    std::mutex wait;
    size_t jobs_done;
    size_t jobs_total;

Damien Leroux's avatar
Damien Leroux committed
378
    master_sge(bn_settings_t* settings, const std::string& job)
damien's avatar
damien committed
379
        : master_base(settings, settings->n_threads, job), inet_server(56000, 65535)
380
        , jobs_done(0)
381
        , jobs_total(count_jobs(job, settings))
382
383
384
385
    {
        wait_for_server_to_run();
        m_settings->fifo_path = address();
    }
386

387
    std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job) override;
388

389
    void wait_for_jobs() override { wait.lock(); wait.unlock(); }
390

391
    void notify(size_t /*i*/, bool ok)
392
393
    {
        ++jobs_done;
Damien Leroux's avatar
Damien Leroux committed
394
395
        /*MSG_DEBUG("Job #" << i << (ok ? " successful" : " failed") << " (" << jobs_done << '/' << jobs_total << ')');*/
        update_job(ok);
396
        if (jobs_done >= jobs_total) {
397
398
399
            wait.unlock();
        }
    }
damien's avatar
damien committed
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417

    bool on_client_message(const std::string& buf) override
    {
        if (buf.size() >= 3) {
            if (buf[0] == '\f') {
                if (buf[1] == 'F' || buf[1] == 'S') {
                    notify(atoi(buf.c_str() + 3), buf[1] == 'S');
                }
            }
        }
        return jobs_done < jobs_total;
    }

    bool on_client_error(int) override { return jobs_done < jobs_total; }

    void on_start() override { wait.lock(); }
    void on_stop() override { wait.unlock(); }

418
419
420
421
422
};

struct worker_sge : public worker_base {
    size_t m_host;
    master_sge* m_master;
Damien Leroux's avatar
Damien Leroux committed
423
424
425
    std::string m_job_name;
    worker_sge(master_sge* master, size_t i, size_t slice_begin, size_t slice_end, const std::string& job)
        : worker_base(master->m_settings, slice_begin, slice_end, job)
426
427
        , m_host(i)
        , m_master(master)
Damien Leroux's avatar
Damien Leroux committed
428
        , m_job_name(job)
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
    { schedule(); }

    void notify_one(size_t i, bool ok)
    {
        m_master->notify(i, ok);
    }

    void join() {}

    void schedule()
    {
        char wdbuf[PATH_MAX];
        const char* wd = getcwd(wdbuf, PATH_MAX);
        std::stringstream cmd;
        cmd << "cd " << wd << " &&";
        for (const std::string& a: m_settings->command_line) {
            cmd << ' ' << a;
        }
Damien Leroux's avatar
Damien Leroux committed
447
448
        cmd << " -J " << m_job_name << ' ' << m_slice_begin << ' ' << m_slice_end << " -F " << m_settings->fifo_path;
        /*MSG_DEBUG("SCHEDULING JOB " << cmd.str());*/
449
        std::string qsub = SPELL_STRING("/usr/bin/env qsub -N " << m_settings->prg_name << '.' << m_job_name << " " << m_settings->qsub_opts << " > /dev/null");
Damien Leroux's avatar
Damien Leroux committed
450
451
452
453
        FILE* remote = popen(qsub.c_str(), "w");
        std::string cmdstr = cmd.str();
        /*const char* cmdstr = cmd.str().c_str();*/
        fwrite(cmdstr.c_str(), cmdstr.size(), 1, remote);
454
455
456
457
458
        fclose(remote);
    }
};


Damien Leroux's avatar
Damien Leroux committed
459
std::shared_ptr<worker_base> master_sge::spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job)
460
{
461
    return std::dynamic_pointer_cast<worker_base>(std::make_shared<worker_sge>(this, i, slice_start, slice_end, job));
462
463
464
}


465
bool do_the_job(bn_settings_t* settings, std::string job)
Damien Leroux's avatar
Damien Leroux committed
466
{
467
    bool all_good = false;
Damien Leroux's avatar
Damien Leroux committed
468
    if (settings->is_master()) {
469
        msg_handler_t::cout() << std::endl << GREEN << "Running jobs " << job << NORMAL << std::endl;
Damien Leroux's avatar
Damien Leroux committed
470
471
472
473
        if (settings->scheme == JDS_MT) {
            master_thread mt(settings, job);
            mt.run();
            mt.wait_for_jobs();
474
            all_good = mt.all_good();
Damien Leroux's avatar
Damien Leroux committed
475
476
477
478
        } else if (settings->scheme == JDS_SSH) {
            master_ssh mssh(settings, job);
            mssh.run();
            mssh.wait_for_jobs();
479
            all_good = mssh.all_good();
Damien Leroux's avatar
Damien Leroux committed
480
481
482
483
        } else if (settings->scheme == JDS_SGE) {
            master_sge msge(settings, job);
            msge.run();
            msge.wait_for_jobs();
484
            all_good = msge.all_good();
485
486
487
488
        } else {
            master_none mnone(settings, job);
            mnone.run();
            mnone.wait_for_jobs();
489
490
491
            all_good = mnone.all_good();
        }
        if (!all_good) {
492
            msg_handler_t::cout() << std::endl;
493
            MSG_ERROR("At least one job failed. Aborting.", "Check error messages in job logs.");
Damien Leroux's avatar
Damien Leroux committed
494
        }
495
        return all_good;
Damien Leroux's avatar
Damien Leroux committed
496
497
498
499
500
501
502
503
    } else {
        if (settings->scheme == JDS_SSH) {
            worker_local_ssh wl(settings, settings->job_start, settings->job_end, job);
            wl.run_slice();
        } else if (settings->scheme == JDS_SGE) {
            worker_local_sge wl(settings, settings->job_start, settings->job_end, job);
            wl.run_slice();
        }
504
        return true;
Damien Leroux's avatar
Damien Leroux committed
505
506
507
    }
}

508
#include "io/output_impl.h"