Commit 2db31ce3 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

improve the way workflows info are gathered ...

parent 6dc110a2
...@@ -53,7 +53,7 @@ class Workflow(threading.Thread): ...@@ -53,7 +53,7 @@ class Workflow(threading.Thread):
PROPERTIES_FILE_NAME = "workflow.properties" PROPERTIES_FILE_NAME = "workflow.properties"
MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog" MAKEFLOW_LOG_FILE_NAME = "Makeflow.makeflowlog"
DUMP_FILE_NAME = "workflow.dump" DUMP_FILE_NAME = ".workflow.dump"
WORKING = ".working" WORKING = ".working"
STATUS_STARTED = "started" STATUS_STARTED = "started"
STATUS_COMPLETED = "completed" STATUS_COMPLETED = "completed"
......
...@@ -33,9 +33,11 @@ class WorkflowsManager(object): ...@@ -33,9 +33,11 @@ class WorkflowsManager(object):
""" """
IDS_FILE_NAME = "jflowIDs.txt" IDS_FILE_NAME = "jflowIDs.txt"
WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
WF_DIRECTORY_PREFIX = "wf" WF_DIRECTORY_PREFIX = "wf"
lock_ids_file = threading.Lock() lock_ids_file = threading.Lock()
lock_workflows_file = threading.Lock()
def __init__(self): def __init__(self):
""" """
...@@ -43,8 +45,20 @@ class WorkflowsManager(object): ...@@ -43,8 +45,20 @@ class WorkflowsManager(object):
self.config_reader = JFlowConfigReader() self.config_reader = JFlowConfigReader()
if not os.path.isdir(self.get_output_directory()): if not os.path.isdir(self.get_output_directory()):
os.makedirs(self.get_output_directory(), 0751) os.makedirs(self.get_output_directory(), 0751)
self.dump_file = os.path.join(self.get_output_directory(), self.WORKFLOWS_DUMP_FILE_NAME)
# if the file already exists, load all the workflows dump path
if os.path.isfile(self.dump_file):
self.workflows_dump = pickle.load(open(self.dump_file))
else:
self.workflows_dump = {}
self.ids_file = os.path.join(self.get_output_directory(), self.IDS_FILE_NAME) self.ids_file = os.path.join(self.get_output_directory(), self.IDS_FILE_NAME)
def _add_workflow(self, workflow):
WorkflowsManager.lock_workflows_file.acquire()
self.workflows_dump[utils.get_nb_string(workflow.id)] = workflow.dump_path
pickle.dump(self.workflows_dump, open(self.dump_file, "w"))
WorkflowsManager.lock_workflows_file.release()
def get_available_workflows(self, parameters_section="parameters"): def get_available_workflows(self, parameters_section="parameters"):
wf_instances = [] wf_instances = []
# Load all modules within the workflow module # Load all modules within the workflow module
...@@ -70,23 +84,18 @@ class WorkflowsManager(object): ...@@ -70,23 +84,18 @@ class WorkflowsManager(object):
for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass): for class_name, obj in inspect.getmembers(sys.modules[modname], inspect.isclass):
if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function, parameters_section) if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function, parameters_section)
workflow.start() workflow.start()
# Add the workflow dump path to the workflows dump
self._add_workflow(workflow)
return workflow return workflow
def get_output_directory(self): def get_output_directory(self):
return self.config_reader.get_work_directory() return self.config_reader.get_work_directory()
def get_workflows(self): def get_workflows(self):
from jflow.workflow import Workflow
workflows = [] workflows = []
for wf_name in os.listdir(self.get_output_directory()): for workflow_id in self.workflows_dump:
wf_name_path = os.path.join(self.get_output_directory(), wf_name)
# If it's a workflow directory
if os.path.isdir(wf_name_path):
for wf in os.listdir(wf_name_path):
wf_path = os.path.join(wf_name_path, wf)
workflow_dump_path = os.path.join(wf_path, Workflow.DUMP_FILE_NAME)
try: try:
workflow_dump = open(workflow_dump_path, "rb") workflow_dump = open(self.workflows_dump[workflow_id], "rb")
workflows.append(pickle.load(workflow_dump)) workflows.append(pickle.load(workflow_dump))
workflow_dump.close() workflow_dump.close()
except: pass except: pass
...@@ -103,25 +112,13 @@ class WorkflowsManager(object): ...@@ -103,25 +112,13 @@ class WorkflowsManager(object):
return None return None
def get_workflow(self, workflow_id): def get_workflow(self, workflow_id):
from jflow.workflow import Workflow rworkflow_id = utils.get_nb_string(workflow_id)
search_directory = self.WF_DIRECTORY_PREFIX + utils.get_nb_string(workflow_id) if self.workflows_dump.has_key(rworkflow_id):
workflow_dump_path = None workflow_dump = open(self.workflows_dump[rworkflow_id], "rb")
for wf_name in os.listdir(self.get_output_directory()):
wf_name_path = os.path.join(self.get_output_directory(), wf_name)
# If it's a workflow directory
if os.path.isdir(wf_name_path):
for wf in os.listdir(wf_name_path):
wf_path = os.path.join(wf_name_path, wf)
if wf == search_directory:
if os.path.isfile(os.path.join(wf_path, Workflow.DUMP_FILE_NAME)):
workflow_dump_path = os.path.join(wf_path, Workflow.DUMP_FILE_NAME)
break
if workflow_dump_path:
workflow_dump = open(workflow_dump_path, "rb")
workflow = pickle.load(workflow_dump) workflow = pickle.load(workflow_dump)
workflow_dump.close() workflow_dump.close()
else: else:
raise ValueError("Workflow with id " + str(workflow_id) + " cannot be retrived!") raise ValueError("Workflow with id " + str(rworkflow_id) + " cannot be retrived!")
return workflow return workflow
def get_workflow_directory(self, wname, wid): def get_workflow_directory(self, wname, wid):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment