/**
* \file /core/tandem_run/tandemcondorprocess.cpp
* \date 5/9/2017
* \author Olivier Langella
* \brief handles execution of a batch of X!Tandem processes through HTCondor jobs
*/
/*******************************************************************************
* Copyright (c) 2017 Olivier Langella <olivier.langella@u-psud.fr>.
*
* This file is part of XTPcpp.
*
* XTPcpp is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* XTPcpp is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with XTPcpp. If not, see <http://www.gnu.org/licenses/>.
*
* Contributors:
* Olivier Langella <olivier.langella@u-psud.fr> - initial API and implementation
******************************************************************************/
#include "tandemcondorprocess.h"
#include <QDebug>
#include <pappsomspp/pappsoexception.h>
#include <QSettings>
#include <QProcess>
/**
 * \brief Prepares an X!Tandem batch that will be submitted through HTCondor.
 *
 * Reads two settings keys:
 *  - "condor/tmp_dir" (default "/tmp"): parent location of the working directory;
 *  - "condor/submit"  (default "/usr/bin/condor_submit"): submit command to run.
 *
 * A QTemporaryDir is then created from the template "<tmp_dir>/xtpcpp"; the
 * submit file, the database file and the X!Tandem input files are written
 * into it by the other methods of this class.
 *
 * \param p_monitor        forwarded unchanged to TandemBatchProcess
 * \param tandem_run_batch forwarded unchanged to TandemBatchProcess
 * \throws pappso::PappsoException if the temporary directory is not valid
 */
TandemCondorProcess::TandemCondorProcess(WorkMonitorInterface * p_monitor, const TandemRunBatch & tandem_run_batch) : TandemBatchProcess(p_monitor, tandem_run_batch) {
    /*
    Example of the HTCondor submit file this class is meant to produce:

    Universe = vanilla
    notification = Error
    Rank = Mips
    request_memory= 50000
    request_cpus = 1
    Executable = /usr/bin/tandem
    Log = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/submit_condor.log
    Output = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/tandem.$(Process).out
    Error = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/tandem.$(Process).error
    Arguments = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/xtandem_param2054956555872858570.xml
    Queue
    */

    // NOTE(review): `settings` is not declared in the visible part of this
    // constructor; presumably a QSettings instance is in scope here - confirm.
    QString condor_tmp_dir = QString("%1/xtpcpp").arg(settings.value("condor/tmp_dir", "/tmp").toString());
    _p_tmp_dir = new QTemporaryDir(condor_tmp_dir);
    _condor_submit_command = settings.value("condor/submit", "/usr/bin/condor_submit").toString();

    if (!_p_tmp_dir->isValid()) {
        // The destructor is not executed when a constructor throws, so the
        // directory object has to be released here to avoid leaking it.
        delete _p_tmp_dir;
        _p_tmp_dir = nullptr;
        throw pappso::PappsoException(QObject::tr("problem creating condor temporary directory in %1\n").arg(condor_tmp_dir));
    }
}
/**
 * \brief Releases the QTemporaryDir object allocated by the constructor.
 */
TandemCondorProcess::~TandemCondorProcess()
{
    // _p_tmp_dir is owned by this object (created with new in the constructor).
    delete _p_tmp_dir;
}
/**
 * \brief Writes the XML database description into the condor working directory.
 *
 * Creates "<tmp dir>/database.xml", stores its absolute path in
 * _xml_database_file and lets writeXmlDatabaseFile() produce the content
 * through a QXmlStreamWriter bound to that file.
 *
 * \throws pappso::PappsoException if the file cannot be opened for writing
 *
 * NOTE(review): the closing brace of this function is not visible in this
 * copy of the file.
 */
void TandemCondorProcess::prepareXmlDatabaseFile() {
QFile xml_database_file(QString("%1/database.xml").arg(_p_tmp_dir->path()));
if (xml_database_file.open(QIODevice::WriteOnly))
{
// remember where the database file lives (absolute path)
_xml_database_file = QFileInfo( xml_database_file.fileName()).absoluteFilePath();
// NOTE(review): the writer is heap allocated and only deleted on the normal
// path; it is not released if writeXmlDatabaseFile() throws.
QXmlStreamWriter * p_out = new QXmlStreamWriter();
p_out->setDevice(&xml_database_file);
writeXmlDatabaseFile(p_out);
xml_database_file.close();
delete p_out;
} else
{
throw pappso::PappsoException(QObject::tr("error : cannot open the XML database file : %1\n").arg(xml_database_file.fileName()));
}
// NOTE(review): the statements below appear to be the body of
// TandemCondorProcess::run() (see the debug messages). The function header,
// its closing brace and possibly other lines are not visible in this copy of
// the file.
//
// Visible steps:
//  1. copy the preset file into the condor working directory;
//  2. write the XML database file;
//  3. write the HTCondor submit file, one "Arguments" line per mz file;
//  4. run the condor submit command and check its stderr / stdout;
//  5. parse the cluster number and job count, then survey the jobs.
QFileInfo preset_info(_tandem_run_batch._preset_file);
// copy the preset file next to the other job files...
_preset_file = QString("%1/%2").arg(_p_tmp_dir->path()).arg(preset_info.fileName());
QFile::copy(_tandem_run_batch._preset_file, _preset_file);
// NOTE(review): ...but _preset_file is then reset to the original location,
// which discards the path of the copy assigned above - confirm this is intended.
_preset_file = _tandem_run_batch._preset_file;
prepareXmlDatabaseFile();
// condor submit file: "<tmp dir>/submit.txt"
QFile submit_file(QString("%1/submit.txt").arg(_p_tmp_dir->path()));
QTextStream * p_out = nullptr;
if (submit_file.open(QIODevice::WriteOnly))
{
p_out = new QTextStream();
p_out->setDevice(&submit_file);
// fixed header of the submit description
*p_out << "Universe = vanilla" << endl;
*p_out << "notification = Error" << endl;
*p_out << "Rank = Mips" << endl;
*p_out << "request_memory= 50000" << endl;
*p_out << "request_cpus = 1" << endl;
*p_out << "Executable = " << _tandem_run_batch._tandem_bin_path << endl;
// log, stdout and stderr of the jobs all go into the working directory
*p_out << "Log = " << _p_tmp_dir->path() << "/condor.log" << endl;
*p_out << "Output = " << _p_tmp_dir->path() << "/tandem.$(Process).out" << endl;
*p_out << "Error = " << _p_tmp_dir->path() << "/tandem.$(Process).error" << endl;
/*
Example of the lines written here and in the loop below:
Log = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/submit_condor.log
Output = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/tandem.$(Process).out
Error = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/tandem.$(Process).error
Arguments = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/xtandem_param2054956555872858570.xml
Queue
*/
} else
{
throw pappso::PappsoException(QObject::tr("error : cannot open condor submit file : %1\n").arg(submit_file.fileName()));
}
// one temporary X!Tandem input file per mz file; the QTemporaryFile objects
// are kept in this list and deleted after the loop
std::vector<QTemporaryFile *> input_file_list;
int i=0;
_p_monitor->setProgressMaximumValue(_tandem_run_batch._mz_file_list.size());
for (QString mz_file : _tandem_run_batch._mz_file_list) {
QTemporaryFile * p_xml_input_file = new QTemporaryFile(QString("%1/tandem").arg(_p_tmp_dir->path()));
input_file_list.push_back(p_xml_input_file);
// auto-remove is switched off, so deleting the QTemporaryFile object below
// is not meant to remove the file referenced by the submit file
p_xml_input_file->setAutoRemove(false);
if (p_xml_input_file->open())
{
// NOTE(review): in the visible code p_xml_out is never written to nor
// deleted, and mz_file is unused; presumably the lines writing the X!Tandem
// input and the "Queue" statement are missing from this copy - confirm.
QXmlStreamWriter * p_xml_out = new QXmlStreamWriter();
p_xml_out->setDevice(p_xml_input_file);
*p_out << "Arguments = " << QFileInfo( p_xml_input_file->fileName()).absoluteFilePath() << endl;
} else
{
throw pappso::PappsoException(QObject::tr("error : cannot open the XML X!Tandem input file : %1\n").arg(p_xml_input_file->fileName()));
}
i++;
}
for (QTemporaryFile * p_xml_input_file: input_file_list) {
delete p_xml_input_file;
}
if (p_out != nullptr) {
submit_file.close();
delete p_out;
}
// launch: <condor submit command> <absolute path of submit.txt>
QStringList arguments;
arguments << QFileInfo( submit_file.fileName()).absoluteFilePath();
// NOTE(review): condor_process is only deleted at the end of the normal
// path; every throw below leaves it allocated.
QProcess * condor_process = new QProcess();
//hk_process->setWorkingDirectory(QFileInfo(_hardklor_exe).absolutePath());
qDebug() << "TandemCondorProcess::run command " << _condor_submit_command << " " << arguments.join(" ");
condor_process->start(_condor_submit_command, arguments);
if (!condor_process->waitForStarted()) {
throw pappso::PappsoException(QObject::tr("HTCondor X!Tandem process failed to start"));
}
// the submit command is given at most _max_xt_time_ms to finish
if (!condor_process->waitForFinished(_max_xt_time_ms)) {
throw pappso::PappsoException(QObject::tr("HTCondor X!Tandem process failed to finish"));
}
// any text on standard error is treated as a failure
QString perr = condor_process->readAllStandardError();
if (perr.length()) {
qDebug() << "TandemCondorProcess::run readAllStandardError " << perr;
throw pappso::PappsoException(QObject::tr("HTCondor X!Tandem process failed :\n%1").arg(perr));
}
else {
qDebug() << "TandemCondorProcess::run readAllStandardError OK " << perr;
}
// standard output must not be empty: it carries the job / cluster numbers
QString pjob = condor_process->readAllStandardOutput();
if (pjob.length()) {
qDebug() << "TandemCondorProcess::run readAllStandardOutput OK " << pjob;
}
else {
qDebug() << "TandemCondorProcess::run readAllStandardOutput " << pjob;
throw pappso::PappsoException(QObject::tr("HTCondor X!Tandem process failed :\n%1").arg(pjob));
// NOTE(review): the closing brace of this else branch is not visible in this
// copy of the file.
// example of expected output:
//Submitting job(s).\n1 job(s) submitted to cluster 29.\n
parseCondorJobNumber(pjob);
// progress is now counted in condor jobs
_p_monitor->setProgressMaximumValue(_condor_job_size);
qDebug() << "TandemCondorProcess::run job=" << _condor_cluster_number << " size=" << _condor_job_size;
/*
if (!xt_process->waitForFinished(_max_xt_time_ms)) {
throw pappso::PappsoException(QObject::tr("can't wait for X!Tandem process to finish : timeout at %1").arg(_max_xt_time_ms));
}
*/
QByteArray result = condor_process->readAll();
QProcess::ExitStatus Status = condor_process->exitStatus();
qDebug() << "TandemCondorProcess::run ExitStatus " << Status << result.data();
if (Status != 0)
{
// != QProcess::NormalExit
// NOTE(review): the last .arg(result.data()) is applied to the joined
// argument string, not to the translated message, so "%3" does not receive
// the process output - the closing parenthesis looks misplaced.
throw pappso::PappsoException(QObject::tr("error executing HTCondor Status != 0 : %1 %2\n%3").arg(_tandem_run_batch._tandem_bin_path).arg(arguments.join(" ").arg(result.data())));
}
delete condor_process;
//condor is running job : we have to survey condor job using "condor_q -xml _condor_cluster_number"
surveyCondorJob();
qDebug() << "TandemCondorProcess::run end" ;
// NOTE(review): the end of the preceding function is not visible in this copy of the file.
/**
 * \brief Placeholder for the HTCondor job survey; currently has no statements.
 */
void TandemCondorProcess::surveyCondorJob() {
/*
TODO: not implemented yet. The reference to port is the Java SAX handler
fr.inra.pappso.xtandempipeline.sax_parse.HandlerHTCondorQueue (it extends
SaxBaseHandler and reports to an XtandemCondorAnalysis object). Summary of
that handler:

XML input, one <c> element per job, made of <a n="NAME"> attributes:
    <c>
      <a n="ProcId"><i>0</i></a>
      <a n="JobStatus"><i>2</i></a>
      <a n="RemoteHost"><s>slot1@proteus3</s></a>
      <a n="LastRemoteHost"><s>slot1@proteus4</s></a>
    </c>

Parsing rules:
  - start of document : empty = true
  - start of <c>      : empty = false; reset current ProcId, JobStatus,
                        RemoteHost and LastRemoteHost
  - start of <a>      : remember the value of attribute "n" as current name
  - end of <i>        : integer value; stored when the current name is
                        "ProcId" or "JobStatus"
  - end of <s>        : string value; stored when the current name is
                        "RemoteHost" or "LastRemoteHost"
  - end of <a>        : current name reset to ""
  - end of <c>        : increment the counter matching JobStatus, then log
                        RemoteHost, LastRemoteHost and JobStatus

JobStatus codes and counters:
    0 Unexpanded     (U) -> junexpanded
    1 Idle           (I) -> jidle
    2 Running        (R) -> jrunning
    3 Removed        (X) -> jremoved
    4 Completed      (C) -> jcompleted
    5 Held           (H) -> jheld
    6 Submission_err (E) -> jsubmission_error

End of document:
  - if no <c> element was seen (empty), every submitted job counts as
    completed: reallyCompletedJobs = getCondorJobNumber()
  - otherwise jobs missing from the listing count as completed:
        totalJobs = junexpanded + jidle + jrunning + jremoved
                    + jcompleted + jheld + jsubmission_error
        diffJobs  = getCondorJobNumber() - totalJobs
        reallyCompletedJobs = jcompleted + diffJobs
  - report with setJobCompleted(junexpanded, jidle, jrunning, jremoved,
    reallyCompletedJobs, jheld, jsubmission_error)
  - build, log and publish (setStatusMessage) a status message with one
    line per counter:
        "<n> unexpanded jobs", "<n> idle jobs", "<n> running jobs",
        "<n> removed jobs", "<n> completed jobs" (reallyCompletedJobs),
        "<n> held jobs", "<n> submission_errors"
*/
}
/**
 * \brief Extracts the job count and the cluster number from the text printed
 *        by the condor submit command.
 *
 * Expected text, for example:
 *     Submitting job(s)...
 *     3 job(s) submitted to cluster 3.
 *
 * On success _condor_job_size receives the first number and
 * _condor_cluster_number the second one.
 *
 * \param condor_job text to search
 * \throws pappso::PappsoException if the pattern is not found
 */
void TandemCondorProcess::parseCondorJobNumber(QString condor_job) {
    // "+" instead of "*": both numbers must contain at least one digit, so a
    // line without numbers is rejected instead of being read as 0.
    // No trailing wildcard: an unescaped "." after the greedy digit group
    // forced the engine to give back the last digit when the number ended
    // the text ("cluster 29" was read as cluster 2).
    QRegExp txt_submit("([0-9]+) job\\(s\\) submitted to cluster ([0-9]+)");

    if (txt_submit.indexIn(condor_job, 0) == -1) {
        throw pappso::PappsoException(QObject::tr("unable to find HTCondor job numbers in %1").arg(condor_job));
    }

    _condor_job_size = txt_submit.cap(1).toUInt();
    _condor_cluster_number = txt_submit.cap(2).toUInt();
}