Commit 224d72a4 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

back from master submission option

parent 6d253284
......@@ -20,8 +20,6 @@ makeflow = /usr/bin/makeflow
batch_system_type = local
# add these options to all batch submit files
batch_options =
# should the master job be submitted on the cluster (False|True)
submit_master_job = False
# on which socket host should run the web server
server_socket_host = 127.0.0.1
# on which socket port should run the web server
......
......@@ -80,9 +80,4 @@ class JFlowConfigReader(object):
return self.reader.get("components", component_class+".batch_options")
except:
return ""
def submit_master_job(self):
try:
return self.reader.get("global", "submit_master_job") == "True"
except:
return False
\ No newline at end of file
\ No newline at end of file
......@@ -24,25 +24,10 @@ import imp
import os
import pickle
import threading
import tempfile
import jflow.utils as utils
from jflow.config_reader import JFlowConfigReader
from weaver.function import PythonFunction
from weaver.nest import Nest
from weaver.options import Options
def run_workflow_wrapper(workflow_serial_path):
import pickle
# load the workflow object
workflow_dump = open(workflow_serial_path, "rb")
workflow = pickle.load(workflow_dump)
workflow_dump.close()
# and start it
workflow.start()
class WorkflowsManager(object):
"""
"""
......@@ -56,13 +41,6 @@ class WorkflowsManager(object):
"""
"""
self.config_reader = JFlowConfigReader()
self.options = Options()
# try to parse engine arguments
try:
type, options = self.jflow_config_reader.get_batch()
if type: self.engine_arguments += ' -T ' + type
if options : self.engine_arguments += ' -B "' + options + '"'
except: self.engine_arguments = None
if not os.path.isdir(self.get_output_directory()):
os.makedirs(self.get_output_directory(), 0751)
self.ids_file = os.path.join(self.get_output_directory(), self.IDS_FILE_NAME)
......@@ -82,49 +60,16 @@ class WorkflowsManager(object):
def rerun_workflow(self, workflow_id):
workflow = self.get_workflow(workflow_id)
if self.config_reader.submit_master_job():
workflow_dump_path = self.get_temporary_file(".dump")
workflow_dump = open(workflow_dump_path, "wb")
pickle.dump(workflow, workflow_dump)
workflow_dump.close()
# Execute nest
with Nest(self.get_temporary_directory(), path=self.config_reader.get_makeflow_path()) as nest:
with self.options:
rw = PythonFunction(run_workflow_wrapper, cmd_format="{EXE} {IN} > {OUT}")
rw(inputs=workflow_dump_path, outputs=self.get_temporary_file(".stdout"))
try:
nest.compile()
nest.execute(self.engine_arguments, exit_on_failure=True)
except:
raise
else:
workflow.start()
return workflow
def run_workflow(self, workflow_class, args):
workflow.start()
def run_workflow(self, workflow_class, args):
# Load all modules within the workflow module
for importer, modname, ispkg in pkgutil.iter_modules(workflows.__path__, workflows.__name__ + "."):
__import__(modname)
# Search for Workflow classes
for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if class_name == workflow_class: workflow = obj(args, id=self.get_next_id())
if self.config_reader.submit_master_job():
workflow_dump_path = self.get_temporary_file(".dump")
workflow_dump = open(workflow_dump_path, "wb")
pickle.dump(workflow, workflow_dump)
workflow_dump.close()
# Execute nest
with Nest(self.get_temporary_directory(), path=self.config_reader.get_makeflow_path()) as nest:
with self.options:
rw = PythonFunction(run_workflow_wrapper, cmd_format="{EXE} {IN} > {OUT}")
rw(inputs=workflow_dump_path, outputs=self.get_temporary_file(".stdout"))
try:
nest.compile()
nest.execute(self.engine_arguments, exit_on_failure=True)
except:
raise
else:
workflow.start()
workflow.start()
return workflow
def get_output_directory(self):
......@@ -202,20 +147,4 @@ class WorkflowsManager(object):
finally:
WorkflowsManager.lock_ids_file.release()
return cid
def get_temporary_file(self, suffix=".txt"):
# first check if tmp directory exists
if not os.path.isdir(self.config_reader.get_tmp_directory()):
os.makedirs(self.config_reader.get_tmp_directory(), 0751)
tempfile_name = os.path.basename(tempfile.NamedTemporaryFile(suffix=suffix).name)
return os.path.join(self.config_reader.get_tmp_directory(), tempfile_name)
def get_temporary_directory(self):
# first check if tmp directory exists
if not os.path.isdir(self.config_reader.get_tmp_directory()):
os.makedirs(self.config_reader.get_tmp_directory(), 0751)
tempdir_name = os.path.basename(tempfile.NamedTemporaryFile().name)
tempdir = os.path.join(self.config_reader.get_tmp_directory(), tempdir_name)
os.makedirs(tempdir, 0751)
return tempdir
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment