Skip to content
Snippets Groups Projects
tandemcondorprocess.cpp 14.2 KiB
Newer Older
/**
 * \file /core/tandem_run/tandemcondorprocess.cpp
 * \date 5/9/2017
 * \author Olivier Langella
 * \brief handles execution of a batch of X!Tandem processes through HTCondor jobs
 */

/*******************************************************************************
* Copyright (c) 2017 Olivier Langella <olivier.langella@u-psud.fr>.
*
* This file is part of XTPcpp.
*
*     XTPcpp is free software: you can redistribute it and/or modify
*     it under the terms of the GNU General Public License as published by
*     the Free Software Foundation, either version 3 of the License, or
*     (at your option) any later version.
*
*     XTPcpp is distributed in the hope that it will be useful,
*     but WITHOUT ANY WARRANTY; without even the implied warranty of
*     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*     GNU General Public License for more details.
*
*     You should have received a copy of the GNU General Public License
*     along with XTPcpp.  If not, see <http://www.gnu.org/licenses/>.
*
* Contributors:
*     Olivier Langella <olivier.langella@u-psud.fr> - initial API and implementation
******************************************************************************/

Langella Olivier's avatar
Langella Olivier committed
#include "tandemcondorprocess.h"
#include <QDebug>
#include <pappsomspp/pappsoexception.h>
#include <QSettings>
#include <QProcess>
Langella Olivier's avatar
Langella Olivier committed
/**
 * \brief prepares a temporary working directory and the submit command for an HTCondor run
 *
 * Reads two application settings:
 *  - "condor/tmp_dir" (default "/tmp") : parent directory of the temporary working directory
 *  - "condor/submit" (default "/usr/bin/condor_submit") : command used to submit jobs
 *
 * \param p_monitor monitor forwarded to TandemBatchProcess
 * \param tandem_run_batch batch description forwarded to TandemBatchProcess
 * \throws pappso::PappsoException if the temporary directory cannot be created
 */
TandemCondorProcess::TandemCondorProcess(WorkMonitorInterface * p_monitor, const TandemRunBatch & tandem_run_batch) : TandemBatchProcess(p_monitor, tandem_run_batch) {
    /*
    Example of the submit file that run() generates in the temporary directory :

    Universe   = vanilla
    notification   = Error
    Rank       = Mips
    request_memory= 50000
    request_cpus = 1
    Executable = /usr/bin/tandem
    Log        = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/submit_condor.log
    Output        = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/tandem.$(Process).out
    Error        = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/tandem.$(Process).error

    Arguments = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/xtandem_param2054956555872858570.xml
    Queue
    */

    QSettings settings;
    _condor_submit_command = settings.value("condor/submit", "/usr/bin/condor_submit").toString();

    // "<tmp_dir>/xtpcpp" is used as the template path of the temporary directory
    QString condor_tmp_dir = QString("%1/xtpcpp").arg(settings.value("condor/tmp_dir", "/tmp").toString());
    _p_tmp_dir = new QTemporaryDir(condor_tmp_dir);
    _p_tmp_dir->setAutoRemove(true);

    if (!_p_tmp_dir->isValid()) {
        // the destructor is not called when a constructor throws :
        // release the directory object here to avoid leaking it
        delete _p_tmp_dir;
        _p_tmp_dir = nullptr;
        throw pappso::PappsoException(QObject::tr("problem creating condor temporary directory in %1\n").arg(condor_tmp_dir));
    }
}

/**
 * \brief releases the temporary working directory object owned by this process
 *
 * The constructor enables auto-remove on the QTemporaryDir, so destroying it
 * is expected to delete the directory on disk as well.
 */
TandemCondorProcess::~TandemCondorProcess()
{
    delete _p_tmp_dir;
}

void TandemCondorProcess::prepareXmlDatabaseFile() {

    QFile xml_database_file(QString("%1/database.xml").arg(_p_tmp_dir->path()));

    if (xml_database_file.open(QIODevice::WriteOnly))
    {
        _xml_database_file = QFileInfo( xml_database_file.fileName()).absoluteFilePath();
        QXmlStreamWriter * p_out = new QXmlStreamWriter();
        p_out->setDevice(&xml_database_file);
        writeXmlDatabaseFile(p_out);
        xml_database_file.close();
        delete p_out;
    } else
    {
        throw pappso::PappsoException(QObject::tr("error : cannot open the XML database file : %1\n").arg(xml_database_file.fileName()));
    }

Langella Olivier's avatar
Langella Olivier committed
void TandemCondorProcess::run() {
Olivier Langella's avatar
Olivier Langella committed
    qDebug() << "TandemCondorProcess::run begin ";
    QFileInfo preset_info(_tandem_run_batch._preset_file);
    _preset_file = QString("%1/%2").arg(_p_tmp_dir->path()).arg(preset_info.fileName());
    QFile::copy(_tandem_run_batch._preset_file, _preset_file);
    _preset_file = _tandem_run_batch._preset_file;

    prepareXmlDatabaseFile();


    //condor submit file :
    QFile submit_file(QString("%1/submit.txt").arg(_p_tmp_dir->path()));
    QTextStream * p_out = nullptr;

    if (submit_file.open(QIODevice::WriteOnly))
    {
        p_out = new QTextStream();
        p_out->setDevice(&submit_file);

        *p_out <<     "Universe   = vanilla" << endl;
        *p_out <<     "notification   = Error" << endl;
        *p_out <<     "Rank       = Mips" << endl;
        *p_out <<     "request_memory= 50000" << endl;
        *p_out <<     "request_cpus = 1" << endl;
        *p_out <<     "Executable = " << _tandem_run_batch._tandem_bin_path << endl;
Olivier Langella's avatar
Olivier Langella committed
        *p_out <<     "Log        = " << _p_tmp_dir->path() << "/condor.log" << endl;
        *p_out <<     "Output        = " << _p_tmp_dir->path() << "/tandem.$(Process).out" << endl;
        *p_out <<     "Error        = " << _p_tmp_dir->path() << "/tandem.$(Process).error" << endl;
        /*
        Log        = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/submit_condor.log
        Output        = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/tandem.$(Process).out
        Error        = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/tandem.$(Process).error

        Arguments = /gorgone/pappso/tmp/temp_condor_job93294001891239208719639434471283743/xtandem_param2054956555872858570.xml
        Queue
        */
    } else
    {
        throw pappso::PappsoException(QObject::tr("error : cannot open condor submit file : %1\n").arg(submit_file.fileName()));
    }



    std::vector<QTemporaryFile *> input_file_list;

    int i=0;
    _p_monitor->setProgressMaximumValue(_tandem_run_batch._mz_file_list.size());
    for (QString mz_file : _tandem_run_batch._mz_file_list) {


Olivier Langella's avatar
Olivier Langella committed
        QTemporaryFile *  p_xml_input_file = new QTemporaryFile(QString("%1/tandem").arg(_p_tmp_dir->path()));

        input_file_list.push_back(p_xml_input_file);
        p_xml_input_file->setAutoRemove(false);
        if (p_xml_input_file->open())
        {
            QXmlStreamWriter * p_xml_out = new QXmlStreamWriter();
            p_xml_out->setDevice(p_xml_input_file);
            writeXmlInputFile(p_xml_out, mz_file);

            p_xml_input_file->close();
            delete p_xml_out;


Olivier Langella's avatar
Olivier Langella committed
            *p_out <<     "Arguments        = " << QFileInfo( p_xml_input_file->fileName()).absoluteFilePath() << endl;
            *p_out <<     "Queue" << endl;
        } else
        {
            throw pappso::PappsoException(QObject::tr("error : cannot open the XML X!Tandem input file : %1\n").arg(p_xml_input_file->fileName()));
        }
        i++;
    }
    for (QTemporaryFile *  p_xml_input_file: input_file_list) {
        delete p_xml_input_file;
    }


    if (p_out != nullptr) {
        submit_file.close();
        delete p_out;
    }

    //now run condor job on submit_file

    QStringList arguments;

    arguments << QFileInfo( submit_file.fileName()).absoluteFilePath();

    QProcess * condor_process = new QProcess();
    //hk_process->setWorkingDirectory(QFileInfo(_hardklor_exe).absolutePath());
Olivier Langella's avatar
Olivier Langella committed
    qDebug() << "TandemCondorProcess::run command " << _condor_submit_command << " " << arguments.join(" ");
    condor_process->start(_condor_submit_command, arguments);


    if (!condor_process->waitForStarted()) {
        throw pappso::PappsoException(QObject::tr("HTCondor X!Tandem process failed to start"));
    }

Olivier Langella's avatar
Olivier Langella committed
    if (!condor_process->waitForFinished(_max_xt_time_ms)) {
        throw pappso::PappsoException(QObject::tr("HTCondor X!Tandem process failed to finish"));
    }

    QString perr = condor_process->readAllStandardError();
    if (perr.length()) {

        qDebug() << "TandemCondorProcess::run readAllStandardError " << perr;
        throw pappso::PappsoException(QObject::tr("HTCondor X!Tandem process failed :\n%1").arg(perr));
    }
    else {
        qDebug() << "TandemCondorProcess::run readAllStandardError OK " << perr;
    }

    QString pjob = condor_process->readAllStandardOutput();
    if (pjob.length()) {
        qDebug() << "TandemCondorProcess::run readAllStandardOutput OK " << pjob;
    }
    else {
        qDebug() << "TandemCondorProcess::run readAllStandardOutput " << pjob;
        throw pappso::PappsoException(QObject::tr("HTCondor X!Tandem process failed :\n%1").arg(pjob));
Olivier Langella's avatar
Olivier Langella committed

    //Submitting job(s).\n1 job(s) submitted to cluster 29.\n
    parseCondorJobNumber(pjob);
    
    _p_monitor->setProgressMaximumValue(_condor_job_size);
    qDebug() << "TandemCondorProcess::run job=" << _condor_cluster_number << " size=" << _condor_job_size;


    /*
    if (!xt_process->waitForFinished(_max_xt_time_ms)) {
        throw pappso::PappsoException(QObject::tr("can't wait for X!Tandem process to finish : timeout at %1").arg(_max_xt_time_ms));
    }
    */
    QByteArray result = condor_process->readAll();

    QProcess::ExitStatus Status = condor_process->exitStatus();

Olivier Langella's avatar
Olivier Langella committed
    qDebug() << "TandemCondorProcess::run ExitStatus " << Status << result.data();
    if (Status != 0)
    {
        // != QProcess::NormalExit
        throw pappso::PappsoException(QObject::tr("error executing HTCondor Status != 0 : %1 %2\n%3").arg(_tandem_run_batch._tandem_bin_path).arg(arguments.join(" ").arg(result.data())));
    }

    delete condor_process;
Olivier Langella's avatar
Olivier Langella committed


    //condor is running job : we have to survey condor job using "condor_q -xml _condor_cluster_number"

    surveyCondorJob();

    qDebug() << "TandemCondorProcess::run end" ;
Olivier Langella's avatar
Olivier Langella committed

/**
 * \brief placeholder for the monitoring of submitted HTCondor jobs
 *
 * The function currently does nothing : its body only contains, as a comment,
 * a Java SAX handler (HandlerHTCondorQueue) kept as a reference. That handler
 * counts jobs per JobStatus code read from <c>/<a n="..."> XML elements and
 * derives the number of completed jobs from the total number of jobs.
 */
void TandemCondorProcess::surveyCondorJob() {
    // TODO: not implemented yet. Reference Java implementation kept below.
    // The inner Java comment terminator was replaced by " *" so that this
    // C++ block comment only ends on its last line.
    /*
    package fr.inra.pappso.xtandempipeline.sax_parse;

    import org.apache.log4j.Logger;
    import org.xml.sax.Attributes;
    import org.xml.sax.SAXException;

    import fr.inra.pappso.xtandempipeline.MsException.MSMSException;
    import fr.inra.pappso.xtandempipeline.thread.XtandemCondorAnalysis;

    public class HandlerHTCondorQueue extends SaxBaseHandler {

    private static final Logger logger = Logger
    		.getLogger(HandlerHTCondorQueue.class);
    private Object inName;
    private Integer currentProcId;
    private Integer currentJobStatus;
    private String currentRemoteHost;
    private String currentLastRemoteHost;
    private int junexpanded = 0;
    private int jidle = 0;
    private int jrunning = 0;
    private int jremoved = 0;
    private int jcompleted = 0;
    private int jheld = 0;
    private int jsubmission_error = 0;
    private XtandemCondorAnalysis xtandemCondorAnalysis;
    private boolean empty = true;

    public HandlerHTCondorQueue(XtandemCondorAnalysis xtandemCondorAnalysis)
    		throws MSMSException {
    	super();
    	this.xtandemCondorAnalysis = xtandemCondorAnalysis;
    }

    @Override
    public void startDocument() throws SAXException {
    	this.empty = true;
    }

    @Override
    public void endDocument() throws SAXException {
    	int reallyCompletedJobs = 0;
    	if (this.empty) {
    		reallyCompletedJobs = xtandemCondorAnalysis.getCondorJobNumber();
    	} else {
    		int totalJobs = junexpanded + jidle + jrunning + jremoved
    				+ jcompleted + jheld + jsubmission_error;
    		int diffJobs = xtandemCondorAnalysis.getCondorJobNumber()
    				- totalJobs;
    		reallyCompletedJobs = jcompleted + diffJobs;
    	}

    	xtandemCondorAnalysis.setJobCompleted(junexpanded, jidle, jrunning,
    			jremoved, reallyCompletedJobs, jheld, jsubmission_error);

    	String statusMessage = "" + junexpanded + " unexpanded jobs\n";
    	statusMessage += jidle + " idle jobs\n";
    	statusMessage += jrunning + " running jobs\n";
    	statusMessage += jremoved + " removed jobs\n";
    	statusMessage += reallyCompletedJobs + " completed jobs\n";
    	statusMessage += jheld + " held jobs\n";
    	statusMessage += jsubmission_error + " submission_errors\n";

    	logger.debug(statusMessage);
    	xtandemCondorAnalysis.setStatusMessage(statusMessage);

    }

    // <c>
    // <a n="ProcId"><i>0</i></a>
    public void startElementc(String nameSpaceUri, Attributes attrs) {
    	this.empty = false;
    	currentProcId = null;
    	currentJobStatus = null;
    	currentRemoteHost = null;
    	currentLastRemoteHost = null;

    }

    public void startElementa(String nameSpaceUri, Attributes attrs) {
    	// logger.debug("startElementgroup begin");
    	// <group label="performance parameters" type="parameters">
    	this.inName = attrs.getValue("n");

    }

    public void endElementi(String nameSpaceUri, String data)
    		throws MSMSException {
    	if (inName.equals("ProcId")) {
    		this.currentProcId = new Integer(data);
    		// logger.debug(currentProcId);
    	} else if (inName.equals("JobStatus")) {
    		// <a n="JobStatus"><i>2</i></a>
    		this.currentJobStatus = new Integer(data);
    		// logger.debug(currentProcId);
    	}

    }

    public void endElements(String nameSpaceUri, String data)
    		throws MSMSException {
    	if (inName.equals("RemoteHost")) {
    		// <a n="RemoteHost"><s>slot1@proteus3</s></a>
    		this.currentRemoteHost = data;
    		// logger.debug(currentProcId);
    	} else if (inName.equals("LastRemoteHost")) {
    		// <a n="LastRemoteHost"><s>slot1@proteus4</s></a>
    		this.currentLastRemoteHost = data;
    		// logger.debug(currentProcId);
    	}

    }

    public void endElementa(String nameSpaceUri, String data) {
    	this.inName = "";
    }

    public void endElementc(String nameSpaceUri, String data)
    		throws MSMSException {
    	/*
    	 * 0 Unexpanded U 1 Idle I 2 Running R 3 Removed X 4 Completed C 5 Held
    	 * H 6 Submission_err E
    	 *
    	if (currentJobStatus == 0) {
    		this.junexpanded += 1;
    	} else if (currentJobStatus == 1) {
    		this.jidle += 1;
    	} else if (currentJobStatus == 2) {
    		this.jrunning += 1;
    	} else if (currentJobStatus == 3) {
    		this.jremoved += 1;
    	} else if (currentJobStatus == 4) {
    		this.jcompleted += 1;
    	} else if (currentJobStatus == 5) {
    		this.jheld += 1;
    	} else if (currentJobStatus == 6) {
    		this.jsubmission_error += 1;
    	}

    	logger.debug("currentRemoteHost:" + this.currentRemoteHost
    			+ " currentLastRemoteHost" + this.currentLastRemoteHost + " "
    			+ currentJobStatus);
    }

    }
    */
}


/**
 * \brief extracts the job count and the cluster number from the submit command output
 *
 * Expected text, as printed by the submit command :
 *   Submitting job(s)...
 *   3 job(s) submitted to cluster 3.
 *
 * On success _condor_job_size receives the first number and
 * _condor_cluster_number the second one.
 *
 * \param condor_job standard output of the submit command
 * \throws pappso::PappsoException if the expected sentence is not found
 */
void TandemCondorProcess::parseCondorJobNumber(QString condor_job) {
    // Both numbers need at least one digit : an empty capture would silently
    // be converted to 0. No trailing wildcard follows the cluster number, so
    // the greedy capture always takes every digit of it.
    QRegExp txt_submit("([0-9]+) job\\(s\\) submitted to cluster ([0-9]+)");

    if (txt_submit.indexIn(condor_job, 0) == -1) {
        throw pappso::PappsoException(QObject::tr("unable to find HTCondor job numbers in %1").arg(condor_job));
    }

    _condor_job_size = txt_submit.cap(1).toUInt();
    _condor_cluster_number = txt_submit.cap(2).toUInt();
}