dispatch.cc 11.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
#include <atomic>
#include <thread>
#include <cstdio>
#include "error.h"
#include "cli.h"

extern "C" {
#include <sys/stat.h> 
#include <fcntl.h>
#include<signal.h>
#include<sys/types.h>    
}

Damien Leroux's avatar
Damien Leroux committed
14
15
16
17
18
19
20
21
/* Callable running one job: receives the settings and a job index; its boolean result is forwarded
 * to the worker's notify_one() as the success flag. */
using job_type = std::function<bool(const bn_settings_t*, size_t)>;
/* Callable returning the number of jobs to run for the given settings. */
using count_jobs_type = std::function<size_t(const bn_settings_t*)>;

/* Job name -> (job counter, job runner). */
extern std::map<std::string, std::pair<count_jobs_type, job_type>> job_registry;

/* Number of jobs registered under `name` for the given settings.
 * Uses map::at(), so an unregistered name throws std::out_of_range instead of dereferencing end(). */
size_t count_jobs(const std::string& name, const bn_settings_t* settings) { return job_registry.at(name).first(settings); }
/* Callable that runs a single job of the kind registered under `name`.
 * Uses map::at(), so an unregistered name throws std::out_of_range instead of dereferencing end(). */
job_type find_job(const std::string& name) { return job_registry.at(name).second; }

22
23
24
struct worker_base {
    size_t m_slice_begin, m_slice_end;
    const bn_settings_t* m_settings;
Damien Leroux's avatar
Damien Leroux committed
25
    job_type run_one;
26

Damien Leroux's avatar
Damien Leroux committed
27
28
    worker_base(const bn_settings_t* settings, size_t slice_begin, size_t slice_end, const std::string& job)
        : m_slice_begin(slice_begin), m_slice_end(slice_end), m_settings(settings), run_one(find_job(job))
29
30
31
32
33
    {}

    void run_slice()
    {
        for (size_t i = m_slice_begin; i <= m_slice_end; ++i) {
Damien Leroux's avatar
Damien Leroux committed
34
            notify_one(i, run_one(m_settings, i));
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
        }
    }

    /* middleware */
    virtual void notify_one(size_t, bool) = 0;
    virtual void join() = 0;
};


struct worker_local_ssh : public worker_base {
    using worker_base::worker_base;
    void notify_one(size_t i, bool ok)
    {
        MSG_DEBUG((ok ? "#S " : "#F ") << i);
        MSG_ERROR("prout", "");
    }
    void join() {}
};


struct worker_local_sge : public worker_base {
    using worker_base::worker_base;
    void notify_one(size_t i, bool ok)
    {
        FILE* fifo = fopen(m_settings->fifo_path.c_str(), "w");
        std::string msg = MESSAGE((ok ? "#S " : "#F ") << i << std::endl);
        fwrite(msg.c_str(), msg.size(), 1, fifo);
Damien Leroux's avatar
Damien Leroux committed
62
63
        fflush(fifo);
        fclose(fifo);
64
65
66
67
68
69
70
71
72
73
    }
    void join() {}
};



struct master_base {
    bn_settings_t* m_settings;
    std::vector<std::shared_ptr<worker_base>> m_workers;
    size_t m_n_jobs;
Damien Leroux's avatar
Damien Leroux committed
74
75
    std::string job_name;
    master_base(bn_settings_t* settings, size_t n_jobs, const std::string& job)
76
77
78
        : m_settings(settings)
        , m_workers()
        , m_n_jobs(n_jobs)
Damien Leroux's avatar
Damien Leroux committed
79
        , job_name(job)
80
81
82
83
84
    {}

    void run()
    {
        double step;
Damien Leroux's avatar
Damien Leroux committed
85
        size_t total = count_jobs(job_name, m_settings);
86
        size_t n_slots;
Damien Leroux's avatar
Damien Leroux committed
87
        /*MSG_DEBUG("Have " << m_n_jobs << " slots for " << total << " jobs to do.");*/
88
89
90
91
92
93
94
        if (m_n_jobs >= total) {
            n_slots = total;
            step = 0;
        } else {
            n_slots = m_n_jobs;
            step = total / (double) n_slots;
        }
Damien Leroux's avatar
Damien Leroux committed
95
        /*MSG_DEBUG("step=" << step);*/
96
97
98
99
100
101
102
103
104

        m_workers.reserve(n_slots);

        double start = 0;
        for (size_t i = 0; i < n_slots; ++i) {
            size_t end = (size_t) (start + step);
            if (end >= total) {
                end = total - 1;
            }
Damien Leroux's avatar
Damien Leroux committed
105
106
            /*MSG_DEBUG("Spawning worker for " << ((size_t) start) << ':' << end);*/
            m_workers.emplace_back(spawn_worker(i, (size_t) start, end, job_name));
107
108
109
110
            start += step + 1;
        }
    }

Damien Leroux's avatar
Damien Leroux committed
111
    virtual std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job) = 0;
112
113
114
115
116
117
118
119
120
121
122
123
    virtual void wait_for_jobs() = 0;
};


/**************************************
 * MULTI-THREADING
 */

struct master_thread;
struct worker_thread;

struct master_thread : public master_base {
Damien Leroux's avatar
Damien Leroux committed
124
125
    master_thread(bn_settings_t* settings, const std::string& job)
        : master_base(settings, settings->n_threads, job)
126
127
    {}

Damien Leroux's avatar
Damien Leroux committed
128
    std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
129
130
131
132
133
134
135
136
137
138
139
140

    void wait_for_jobs();

    void notify(size_t i, bool ok)
    {
        MSG_DEBUG("Job #" << i << (ok ? " successful" : " failed"));
    }
};

struct worker_thread : public worker_base {
    std::thread m_thread;
    master_thread* m_master;
Damien Leroux's avatar
Damien Leroux committed
141
142
    worker_thread(master_thread* master, size_t slice_begin, size_t slice_end, const std::string& job)
        : worker_base(master->m_settings, slice_begin, slice_end, job)
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
        , m_thread([&] () { run_slice(); })
        , m_master(master)
    {}

    void notify_one(size_t i, bool ok)
    {
        m_master->notify(i, ok);
    }

    void join()
    {
        m_thread.join();
    }
};


void master_thread::wait_for_jobs()
{
    for (auto& t: m_workers) { t->join(); }
}


Damien Leroux's avatar
Damien Leroux committed
165
std::shared_ptr<worker_base> master_thread::spawn_worker(size_t, size_t slice_start ,size_t slice_end, const std::string& job)
166
{
Damien Leroux's avatar
Damien Leroux committed
167
    return std::make_shared<worker_thread>(this, slice_start, slice_end, job);
168
169
170
171
172
173
174
175
176
177
178
}


/**************************************
 * SSH
 */

struct master_ssh;
struct worker_ssh;

struct master_ssh : public master_base {
Damien Leroux's avatar
Damien Leroux committed
179
180
    master_ssh(bn_settings_t* settings, const std::string& job)
        : master_base(settings, settings->ssh_hosts.size(), job)
181
182
    {}

Damien Leroux's avatar
Damien Leroux committed
183
    std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
184
185
186
187
188
189
190
191
192
193
194
195

    void wait_for_jobs();

    void notify(size_t i, bool ok)
    {
        MSG_DEBUG("Job #" << i << (ok ? " successful" : " failed"));
    }
};

struct worker_ssh : public worker_base {
    size_t m_host;
    master_ssh* m_master;
Damien Leroux's avatar
Damien Leroux committed
196
    std::string m_job_name;
197
    std::thread m_thread;
Damien Leroux's avatar
Damien Leroux committed
198
199
    worker_ssh(master_ssh* master, size_t i, size_t slice_begin, size_t slice_end, const std::string& job)
        : worker_base(master->m_settings, slice_begin, slice_end, job)
200
201
        , m_host(i)
        , m_master(master)
Damien Leroux's avatar
Damien Leroux committed
202
        , m_job_name(job)
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
        , m_thread([this] () { run_and_monitor(); })
    {}

    void notify_one(size_t i, bool ok)
    {
        m_master->notify(i, ok);
    }

    void join()
    {
        m_thread.join();
    }

    void run_and_monitor()
    {
        std::stringstream cmd;
        cmd << "ssh " << m_settings->ssh_hosts[m_host];
        for (const std::string& a: m_settings->command_line) {
            cmd << ' ' << a;
        }
Damien Leroux's avatar
Damien Leroux committed
223
        cmd << " -J " << m_job_name << ' ' << m_slice_begin << ' ' << m_slice_end;
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
        MSG_DEBUG("RUNNING " << cmd.str());
        FILE* remote = popen(cmd.str().c_str(), "r");
        char buf[64];
        while (!feof(remote)) {
            char* s = fgets(buf, (sizeof buf) - 1, remote);
            if (s && s[0] == '#' && (s[1] == 'S' || s[1] == 'F')) {
                notify_one(atoi(s + 3), s[1] == 'S');
            }
        }
    }
};


void master_ssh::wait_for_jobs()
{
    for (auto& t: m_workers) { t->join(); }
}


Damien Leroux's avatar
Damien Leroux committed
243
std::shared_ptr<worker_base> master_ssh::spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job)
244
{
Damien Leroux's avatar
Damien Leroux committed
245
    return std::make_shared<worker_ssh>(this, i, slice_start, slice_end, job);
246
247
248
249
250
251
252
253
254
255
256
257
}



/**************************************
 * SGE
 */

struct master_sge;
struct worker_sge;

struct master_sge : public master_base {
Damien Leroux's avatar
Damien Leroux committed
258
    static FILE* fifo;
259
260
261
262
    std::mutex wait;
    size_t jobs_done;
    size_t jobs_total;

Damien Leroux's avatar
Damien Leroux committed
263
264
    master_sge(bn_settings_t* settings, const std::string& job)
        : master_base(settings, settings->n_threads, job)
265
266
267
268
        , jobs_done(0)
        , jobs_total(settings->count_markers())
    { wait.lock(); }

Damien Leroux's avatar
Damien Leroux committed
269
    std::shared_ptr<worker_base> spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job);
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285

    void wait_for_jobs() { wait.lock(); }

    void notify(size_t i, bool ok)
    {
        ++jobs_done;
        MSG_DEBUG("Job #" << i << (ok ? " successful" : " failed"));
        if (jobs_done == jobs_total) {
            wait.unlock();
        }
    }
};

struct worker_sge : public worker_base {
    size_t m_host;
    master_sge* m_master;
Damien Leroux's avatar
Damien Leroux committed
286
287
288
    std::string m_job_name;
    worker_sge(master_sge* master, size_t i, size_t slice_begin, size_t slice_end, const std::string& job)
        : worker_base(master->m_settings, slice_begin, slice_end, job)
289
290
        , m_host(i)
        , m_master(master)
Damien Leroux's avatar
Damien Leroux committed
291
        , m_job_name(job)
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
    { schedule(); }

    void notify_one(size_t i, bool ok)
    {
        m_master->notify(i, ok);
    }

    void join() {}

    void schedule()
    {
        char wdbuf[PATH_MAX];
        const char* wd = getcwd(wdbuf, PATH_MAX);
        std::stringstream cmd;
        cmd << "cd " << wd << " &&";
        for (const std::string& a: m_settings->command_line) {
            cmd << ' ' << a;
        }
Damien Leroux's avatar
Damien Leroux committed
310
311
312
313
314
315
316
        cmd << " -J " << m_job_name << ' ' << m_slice_begin << ' ' << m_slice_end << " -F " << m_settings->fifo_path;
        /*MSG_DEBUG("SCHEDULING JOB " << cmd.str());*/
        std::string qsub = MESSAGE("/usr/bin/env qsub -N " << m_settings->prg_name << " " << m_settings->qsub_opts << " > /dev/null");
        FILE* remote = popen(qsub.c_str(), "w");
        std::string cmdstr = cmd.str();
        /*const char* cmdstr = cmd.str().c_str();*/
        fwrite(cmdstr.c_str(), cmdstr.size(), 1, remote);
317
318
319
320
321
        fclose(remote);
    }
};


Damien Leroux's avatar
Damien Leroux committed
322
std::shared_ptr<worker_base> master_sge::spawn_worker(size_t i, size_t slice_start ,size_t slice_end, const std::string& job)
323
{
Damien Leroux's avatar
Damien Leroux committed
324
    return std::make_shared<worker_sge>(this, i, slice_start, slice_end, job);
325
326
327
328
329
330
331
332
}





/* Master instance the SIGIO handler `reader` forwards notifications to; assigned in do_the_job(). */
master_sge* global_msge;

Damien Leroux's avatar
Damien Leroux committed
333
/* Stream the SIGIO handler `reader` reads worker messages from; opened on the fifo in do_the_job(). */
FILE* master_sge::fifo = NULL;
334
335
336
337

extern "C" {
void reader(int)
{
Damien Leroux's avatar
Damien Leroux committed
338
339
340
341
342
#define BUFSZ 64
    char buf[BUFSZ];
    /*while((n = read(master_sge::fifo, buf, sizeof buf)) > 0){*/
    while(fgets(buf, sizeof buf - 1, master_sge::fifo) != NULL) {
        /*buf[BUFSZ - 1] = '\0';*/
343
344
345
346
347
348
349
350
351
        if (buf[0] == '#') {
            if (buf[1] == 'F' || buf[1] == 'S') {
                global_msge->notify(atoi(buf + 3), buf[1] == 'S');
            }
        }
    }
}
} // extern "C"

Damien Leroux's avatar
Damien Leroux committed
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398

void do_the_job(bn_settings_t* settings, std::string job)
{
    if (settings->is_master()) {
        if (settings->scheme == JDS_MT) {
            master_thread mt(settings, job);
            mt.run();
            mt.wait_for_jobs();
        } else if (settings->scheme == JDS_SSH) {
            master_ssh mssh(settings, job);
            mssh.run();
            mssh.wait_for_jobs();
        } else if (settings->scheme == JDS_SGE) {
            char wdbuf[PATH_MAX];
            const char* wd = getcwd(wdbuf, PATH_MAX);
            settings->fifo_path = MESSAGE(wd << "/spell_bayes_fifo_" << getpid());

            sighandler_t old_sigio = signal(SIGIO, reader);
            mknod(settings->fifo_path.c_str(), 0666 | S_IFIFO,0);
            int fd = open(settings->fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
            fcntl(fd, F_SETOWN, getpid());
            fcntl(fd, F_SETFL, O_ASYNC);
            master_sge::fifo = fdopen(fd, "r");

            master_sge msge(settings, job);
            global_msge = &msge;
            msge.run();
            msge.wait_for_jobs();

            fflush(master_sge::fifo);
            unlink(settings->fifo_path.c_str());
            signal(SIGIO, old_sigio);
            fclose(master_sge::fifo);
        }
    } else {
        if (settings->scheme == JDS_SSH) {
            worker_local_ssh wl(settings, settings->job_start, settings->job_end, job);
            wl.run_slice();
        } else if (settings->scheme == JDS_SGE) {
            worker_local_sge wl(settings, settings->job_start, settings->job_end, job);
            wl.run_slice();
        }
    }
}


#ifdef ONLY_TESTING_JOBS_CC
399
400
401
402
int main(int argc, const char** argv)
{
    bn_settings_t* settings = bn_settings_t::from_args(argc, argv);
    if (settings) {
Damien Leroux's avatar
Damien Leroux committed
403
404
        do_the_job(settings, "dummy");
        delete settings;
405
406
407
    }
    return 0;
}
Damien Leroux's avatar
Damien Leroux committed
408
409
410
411
412
413
414
415
416
417
#endif


std::map<std::string, std::pair<count_jobs_type, job_type>>
job_registry = {
    {"dummy", {
        [](const bn_settings_t* settings) { return settings->count_markers(); },
        [](const bn_settings_t*, size_t i) { MSG_DEBUG("running #" << i); return true; }
    }},
};
418