Commit dc08c829 authored by Penom Nom

Update jflow to new version and split NG6wf / CasavaWF

parent c0f2136e
This diff is collapsed.
......@@ -78,6 +78,12 @@ if __name__ == '__main__':
required=True, dest="component_name")
sub_parser.set_defaults(cmd_object="reset")
# Add delete workflow availability
sub_parser = subparsers.add_parser("delete", help="Delete a workflow")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be deleted",
required=True, dest="workflow_id")
sub_parser.set_defaults(cmd_object="delete")
# Add execution graph availability
sub_parser = subparsers.add_parser("execution-graph", help="Display the workflow execution graph")
sub_parser.add_argument("--workflow-id", type=str, help="Which workflow should be considered",
......@@ -129,22 +135,54 @@ if __name__ == '__main__':
elif args["cmd_object"] == "rerun":
wfmanager.rerun_workflow(args["workflow_id"])
elif args["cmd_object"] == "reset":
wfmanager.reset_workflow_component(args["workflow_id"], args["component_name"])
try:
wfmanager.reset_workflow_component(args["workflow_id"], args["component_name"])
except Exception as e:
utils.display_error_message(str(e))
elif args["cmd_object"] == "delete":
try:
wfmanager.delete_workflow(args["workflow_id"])
except Exception as e:
utils.display_error_message(str(e))
elif args["cmd_object"] == "execution_graph":
workflow = wfmanager.get_workflow(args["workflow_id"])
try:
workflow = wfmanager.get_workflow(args["workflow_id"])
except Exception as e:
utils.display_error_message(str(e))
gr = workflow.get_execution_graph()
print "nodes: ", gr.nodes()
inputs, components = [], []
for node in gr.nodes():
if Workflow.INPUT_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(node)
elif Workflow.INPUTS_GRAPH_LABEL in gr.node_attributes(node):
inputs.append(node)
elif Workflow.COMPONENT_GRAPH_LABEL in gr.node_attributes(node):
components.append(node)
print "inputs: ", inputs
print "components: ", components
print "edges: ", gr.edges()
elif args["cmd_object"] == "status":
if args["workflow_id"]:
workflow = wfmanager.get_workflow(args["workflow_id"])
print workflow.get_status_under_text_format(True, args["display_errors"])
try:
workflow = wfmanager.get_workflow(args["workflow_id"])
except Exception as e:
utils.display_error_message(str(e))
print Workflow.get_status_under_text_format(workflow, True, args["display_errors"])
else:
workflows = wfmanager.get_workflows()
try:
workflows = wfmanager.get_workflows(use_cache=True)
except Exception as e:
utils.display_error_message(str(e))
if len(workflows) > 0:
workflows_by_id, wfids = {}, []
# first index the workflows by ID
for workflow in workflows:
wfids.append(workflow.id)
workflows_by_id[workflow.id] = workflow
status = "ID\tNAME\tSTATUS\tELAPSED_TIME\tSTART_TIME\tEND_TIME\n"
for i, workflow in enumerate(workflows):
status += workflow.get_status_under_text_format()
for i, wfid in enumerate(sorted(wfids, reverse=True)):
status += Workflow.get_status_under_text_format(workflows_by_id[wfid])
if i<len(workflows)-1: status += "\n"
else: status = "no workflow available"
print status
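The status listing now sorts workflows by identifier in descending order before building the tab-separated table, instead of printing them in load order. A self-contained sketch of that indexing/sorting/formatting step, using a minimal stand-in class and an instance method rather than the real jflow Workflow and its unbound get_status_under_text_format call:

class FakeWorkflow(object):
    # minimal stand-in for jflow.workflow.Workflow
    def __init__(self, id, name, status):
        self.id, self.name, self.status = id, name, status
    def get_status_under_text_format(self):
        return "\t".join([str(self.id), self.name, self.status])

workflows = [FakeWorkflow(1, "ng6", "completed"),
             FakeWorkflow(3, "casava", "failed"),
             FakeWorkflow(2, "ng6", "running")]

# index the workflows by ID, then walk the IDs in descending order
workflows_by_id, wfids = {}, []
for workflow in workflows:
    wfids.append(workflow.id)
    workflows_by_id[workflow.id] = workflow

status = "ID\tNAME\tSTATUS\n"
for i, wfid in enumerate(sorted(wfids, reverse=True)):
    status += workflows_by_id[wfid].get_status_under_text_format()
    if i < len(workflows)-1: status += "\n"
print status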
This diff is collapsed.
......@@ -81,11 +81,6 @@ if __name__ == '__main__':
# Copy nG6 folders
dirutil.copy_tree(os.path.join(ng6_path, "ui", "nG6"), os.path.join(args["web_path"], "typo3conf", "ext", "nG6"))
os.mkdir(os.path.join(args["web_path"], "fileadmin", "templates"))
dirutil.copy_tree(os.path.join(ng6_path, "ui", "nG6template"),
os.path.join(args["web_path"], "fileadmin", "templates", "nG6template"))
os.symlink(os.path.join(ng6reader.get("storage", "save_directory"), "data"),
os.path.join(args["web_path"], "fileadmin", "data"))
......@@ -121,9 +116,9 @@ if __name__ == '__main__':
f.write("?>\n")
#template html
with open( os.path.join(args["web_path"], "fileadmin", "templates", "nG6template","index.html")) as f:
with open( os.path.join(args["web_path"], "typo3conf", "ext", "nG6", "template", "index.html")) as f:
template = f.readlines()
with open(os.path.join(args["web_path"], "fileadmin", "templates", "nG6template","index.html"),"w") as f:
with open(os.path.join(args["web_path"], "typo3conf", "ext", "nG6", "template", "index.html"),"w") as f:
for line in template:
line = re.sub("<li><a href=\"mailto:@\">Contact us</a></li>", "<li><a href=\"mailto:"+args["email"]+"\">Contact us</a></li>" + "\">",line)
f.write(line)
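The template patch above swaps the placeholder contact address for the address given on the command line with a single re.sub. A short sketch of that substitution on one line of the template (the email value is made up):

import re

email = "admin@example.org"  # assumed value of args["email"]
line = '<li><a href="mailto:@">Contact us</a></li>'
line = re.sub("<li><a href=\"mailto:@\">Contact us</a></li>",
              "<li><a href=\"mailto:" + email + "\">Contact us</a></li>", line)
print line  # <li><a href="mailto:admin@example.org">Contact us</a></li>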
......@@ -131,12 +126,7 @@ if __name__ == '__main__':
f.write("?>\n")
with open(db_path,"r") as f:
database = f.readlines()
ng6_tables=[]
ng6_re_table= re.compile ("CREATE TABLE IF NOT EXISTS `(.*)` \(")
for iline in range(0, len(database)):
res = ng6_re_table.match(database[iline])
if res :
ng6_tables.append(res.group(1))
database[iline] = re.sub("class=\"homeitem\"><a href=\"([\w/]+)\">", "class=\"homeitem\"><a href=\"/" + os.path.basename(args["web_path"]) + "\">",database[iline])
database[iline] = re.sub("plugin.tx_nG6_pi1.data=([\w/.-]+)", "plugin.tx_nG6_pi1.data=" + fileadmin, database[iline])
database[iline] = re.sub("plugin.tx_nG6_pi1.FromEmail=[\w\.]+\@[\w\.]+", "plugin.tx_nG6_pi1.FromEmail=" + args["email"], database[iline])
......
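The table-name extraction shown above relies on one regular expression matched line by line against the SQL dump. A standalone sketch of that pattern (the CREATE TABLE lines are made up for illustration):

import re

database = ["CREATE TABLE IF NOT EXISTS `tx_nG6_project` (\n",
            "  `uid` int(11) NOT NULL,\n",
            "CREATE TABLE IF NOT EXISTS `tx_nG6_run` (\n"]

ng6_tables = []
ng6_re_table = re.compile("CREATE TABLE IF NOT EXISTS `(.*)` \(")
for iline in range(0, len(database)):
    res = ng6_re_table.match(database[iline])
    if res:
        # group(1) is the table name between the backticks
        ng6_tables.append(res.group(1))
print ng6_tables  # ['tx_nG6_project', 'tx_nG6_run']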
This diff is collapsed.
......@@ -21,7 +21,7 @@ import inspect
from ConfigParser import ConfigParser, NoOptionError
from jflow.utils import which
from jflow.utils import which, display_error_message
class JFlowConfigReader(object):
......@@ -70,8 +70,8 @@ class JFlowConfigReader(object):
exec_path = None
if exec_path is None: exec_path = "makeflow"
if which(exec_path) == None:
sys.stderr.write("Error: 'makeflow' path connot be retrieved either in the PATH and in the application.properties file!\n")
sys.exit(1)
logging.getLogger("jflow").exception("'makeflow' path connot be retrieved either in the PATH and in the application.properties file!")
raise Exception("'makeflow' path connot be retrieved either in the PATH and in the application.properties file!")
return exec_path
def get_date_format(self):
......
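The configuration reader now logs and raises instead of writing to stderr and calling sys.exit, so the caller (command line or web front end) decides how the failure is reported. A minimal sketch of that check-log-raise pattern, with a simplified which() standing in for jflow.utils.which:

import os
import logging

def which(program):
    # simplified PATH lookup, standing in for jflow.utils.which
    for path in os.environ.get("PATH", "").split(os.pathsep):
        candidate = os.path.join(path, program)
        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate
    return None

def get_exec_path(exec_path=None):
    if exec_path is None: exec_path = "makeflow"
    if which(exec_path) is None:
        msg = "'makeflow' path cannot be retrieved either in the PATH or in the application.properties file!"
        # log for the server-side record, then let the caller report the error
        logging.getLogger("jflow").error(msg)
        raise Exception(msg)
    return exec_path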
This diff is collapsed.
......@@ -16,6 +16,7 @@
#
import re
import sys
import smtplib
import socket
import math
......@@ -27,6 +28,14 @@ except:
DNS = None
class ServerError(Exception): pass
def display_error_message(msg):
sys.stderr.write("\033[91mError: "+msg+"\n\033[0m")
sys.exit(1)
def display_info_message(msg, with_exit=False):
sys.stderr.write("\033[93mInfo: "+msg+"\n\033[0m")
if with_exit: sys.exit(1)
def which(program):
"""
Return whether the asked program exists in the user PATH
......
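These helpers centralise the coloured stderr reporting that jflow.py now performs at the top level: \033[91m and \033[93m are the ANSI escape codes for red and yellow, and \033[0m resets the colour. A short usage sketch of the try/except pattern the CLI adopts (the failing loader is hypothetical, and display_error_message is repeated here only to keep the sketch self-contained):

import sys

def display_error_message(msg):
    # red text on stderr, then exit with a non-zero status
    sys.stderr.write("\033[91mError: " + msg + "\n\033[0m")
    sys.exit(1)

def load_workflow(workflow_id):
    # hypothetical operation that fails the way wfmanager.get_workflow can
    raise Exception("Workflow #" + workflow_id + " cannot be retrieved in the available workflows!")

try:
    load_workflow("000042")
except Exception as e:
    display_error_message(str(e))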
This diff is collapsed.
......@@ -24,13 +24,12 @@ import imp
import os
import pickle
import threading
import logging
import jflow.utils as utils
from jflow.config_reader import JFlowConfigReader
class WorkflowsManager(object):
"""
"""
IDS_FILE_NAME = "jflowIDs.txt"
WORKFLOWS_DUMP_FILE_NAME = ".workflows.dump"
......@@ -40,23 +39,23 @@ class WorkflowsManager(object):
lock_workflows_file = threading.Lock()
def __init__(self):
"""
"""
self.config_reader = JFlowConfigReader()
if not os.path.isdir(self.get_output_directory()):
os.makedirs(self.get_output_directory(), 0751)
self.dump_file = os.path.join(self.get_output_directory(), self.WORKFLOWS_DUMP_FILE_NAME)
self.ids_file = os.path.join(self.get_output_directory(), self.IDS_FILE_NAME)
def _add_workflow(self, workflow):
def _dump_workflows(self, workflows):
WorkflowsManager.lock_workflows_file.acquire()
# first load the existing workflows
try:
workflows_dump = pickle.load(open(self.dump_file))
except:
workflows_dump = {}
# then add the new one
workflows_dump[utils.get_nb_string(workflow.id)] = workflow.dump_path
# then add the new ones
for workflow in workflows:
workflows_dump[utils.get_nb_string(workflow.id)] = {"dump_path": workflow.dump_path,
"object": workflow.minimize()}
# and save them
pickle.dump(workflows_dump, open(self.dump_file, "w"))
WorkflowsManager.lock_workflows_file.release()
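_dump_workflows replaces _add_workflow and now stores, for each workflow, both its dump path and a minimized copy of the object, updating the shared dump file under a class-level lock. A self-contained sketch of that load-update-save pattern (the file location, the stand-in workflow class and its minimize behaviour are all simplifications):

import os
import pickle
import threading

lock_workflows_file = threading.Lock()
DUMP_FILE = os.path.join("/tmp", ".workflows.dump")  # assumed location

class FakeWorkflow(object):
    # minimal stand-in for a jflow Workflow
    def __init__(self, id, dump_path):
        self.id, self.dump_path = id, dump_path
    def minimize(self):
        # the real method returns a lightweight copy; the sketch returns itself
        return self

def dump_workflows(workflows):
    lock_workflows_file.acquire()
    try:
        # first load the existing cache, starting from scratch if it does not exist
        try:
            workflows_dump = pickle.load(open(DUMP_FILE))
        except:
            workflows_dump = {}
        # then add or update the given workflows
        for workflow in workflows:
            workflows_dump[str(workflow.id)] = {"dump_path": workflow.dump_path,
                                                "object": workflow.minimize()}
        # and save the whole dictionary back
        pickle.dump(workflows_dump, open(DUMP_FILE, "w"))
    finally:
        lock_workflows_file.release()

dump_workflows([FakeWorkflow(1, "/tmp/wf000001/.workflow.dump")])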
......@@ -89,11 +88,15 @@ class WorkflowsManager(object):
if hasattr(workflow, "stderr"):
workflow.set_stderr()
workflow.start()
# Update the workflow in the cache
self._dump_workflows([workflow])
return workflow
def reset_workflow_component(self, workflow_id, component_name):
workflow = self.get_workflow(workflow_id)
workflow.reset_component(component_name)
# Update the workflow in the cache
self._dump_workflows([workflow])
return workflow
def run_workflow(self, workflow_class, args, function="process"):
......@@ -105,32 +108,64 @@ class WorkflowsManager(object):
if class_name == workflow_class: workflow = obj(args, self.get_next_id(), function)
workflow.start()
# Add the workflow dump path to the workflows dump
self._add_workflow(workflow)
self._dump_workflows([workflow])
return workflow
def delete_workflow(self, workflow_id):
from jflow.workflow import Workflow
WorkflowsManager.lock_workflows_file.acquire()
try:
workflows_dump = pickle.load(open(self.dump_file))
except:
workflows_dump = {}
rworkflow_id = utils.get_nb_string(workflow_id)
try:
workflow_dump = open(workflows_dump[rworkflow_id]["dump_path"], "rb")
workflow = pickle.load(workflow_dump)
# if workflow is not in a running status
if workflow.get_status() in [Workflow.STATUS_COMPLETED, Workflow.STATUS_FAILED, Workflow.STATUS_ABORTED]:
workflow.delete()
del workflows_dump[rworkflow_id]
workflow_dump.close()
except:
logging.getLogger("jflow").debug("Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!")
raise Exception("Workflow #" + rworkflow_id + " connot be retrieved in the available workflows!")
# and save them
pickle.dump(workflows_dump, open(self.dump_file, "w"))
WorkflowsManager.lock_workflows_file.release()
def get_workflow_errors(self, workflow_id):
workflow = self.get_workflow(workflow_id)
return workflow.get_errors()
def get_workflow_ouputs(self, workflow_id):
workflow = self.get_workflow(workflow_id)
return workflow.get_outputs_per_components()
def get_output_directory(self):
return self.config_reader.get_work_directory()
def get_workflows(self):
def get_workflow_outputs(self, workflow_id):
workflow = self.get_workflow(workflow_id)
return workflow.get_outputs_per_components()
def get_workflows(self, use_cache=False):
from jflow.workflow import Workflow
workflows = []
try:
workflows_dump = pickle.load(open(self.dump_file))
except:
workflows_dump = {}
updated_workflows = []
for workflow_id in workflows_dump:
try:
workflow_dump = open(workflows_dump[workflow_id], "rb")
workflows.append(pickle.load(workflow_dump))
workflow_dump.close()
except: pass
# if the workflow is completed, failed or aborted, use the cached miniworkflow
if use_cache and workflows_dump[workflow_id]["object"].get_status() in [Workflow.STATUS_COMPLETED, Workflow.STATUS_FAILED, Workflow.STATUS_ABORTED]:
workflows.append(workflows_dump[workflow_id]["object"])
else:
try:
workflow_dump = open(workflows_dump[workflow_id]["dump_path"], "rb")
workflow = pickle.load(workflow_dump)
workflows.append(workflow)
updated_workflows.append(workflow)
workflow_dump.close()
except: pass
self._dump_workflows(updated_workflows)
return workflows
def get_workflow_by_class(self, workflow_class):
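With use_cache=True, get_workflows returns the minimized object stored in the dump for workflows that can no longer change state (completed, failed or aborted) and only re-reads, then re-caches, the full dump of the remaining ones; the status listing above calls it with use_cache=True so a long history of finished runs does not force unpickling every full dump. A condensed sketch of that decision (status constants, paths and cache entries are made up):

STATUS_COMPLETED, STATUS_FAILED, STATUS_ABORTED, STATUS_RUNNING = "completed", "failed", "aborted", "running"
FINAL_STATUS = [STATUS_COMPLETED, STATUS_FAILED, STATUS_ABORTED]

class CachedWorkflow(object):
    # stand-in for the minimized object stored in the dump file
    def __init__(self, id, status):
        self.id, self.status = id, status
    def get_status(self):
        return self.status

def load_full_dump(entry):
    # stand-in for unpickling the per-workflow dump at entry["dump_path"]
    return entry["object"]

# the cache maps workflow ids to {"dump_path": ..., "object": minimized workflow}
workflows_dump = {"000001": {"dump_path": "/tmp/wf000001/.workflow.dump",
                             "object": CachedWorkflow(1, STATUS_COMPLETED)},
                  "000002": {"dump_path": "/tmp/wf000002/.workflow.dump",
                             "object": CachedWorkflow(2, STATUS_RUNNING)}}

workflows, updated_workflows = [], []
for workflow_id in workflows_dump:
    entry = workflows_dump[workflow_id]
    # finished workflows cannot change any more: trust the cached minimized object
    if entry["object"].get_status() in FINAL_STATUS:
        workflows.append(entry["object"])
    else:
        # running workflows are re-read from their full dump and re-cached afterwards
        workflow = load_full_dump(entry)
        workflows.append(workflow)
        updated_workflows.append(workflow)
print [w.id for w in workflows]  # both workflows, one from cache and one re-loaded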
......@@ -150,12 +185,12 @@ class WorkflowsManager(object):
except:
workflows_dump = {}
if workflows_dump.has_key(rworkflow_id):
workflow_dump = open(workflows_dump[rworkflow_id], "rb")
workflow_dump = open(workflows_dump[rworkflow_id]["dump_path"], "rb")
workflow = pickle.load(workflow_dump)
workflow_dump.close()
else:
sys.stderr.write("Error: Workflow with id " + str(rworkflow_id) + " cannot be retrived!\n")
sys.exit(1)
logging.getLogger("jflow").debug("Workflow #" + str(rworkflow_id) + " connot be retrieved in the available workflows!")
raise Exception("Workflow #" + str(rworkflow_id) + " connot be retrieved in the available workflows!")
return workflow
def get_workflow_directory(self, wname, wid):
......@@ -180,4 +215,4 @@ class WorkflowsManager(object):
ifh.close()
finally:
WorkflowsManager.lock_ids_file.release()
return cid
\ No newline at end of file
return cid
......@@ -38,6 +38,9 @@ class BasicNG6Workflow (Workflow):
def __init__(self, args={}, id=None, function= "process"):
Workflow.__init__(self, args, id, function)
self.add_parameter("admin_login", "Who is the project administrator", required = True, type = adminlogin, display_name="Admin login")
self.add_parameter("to", "The email address, and optionally name of the message's recipient", group="Email")
self.add_parameter("subject", "A brief summary of the topic of the message", group="Email")
self.add_parameter("message", "The content of the message", group="Email")
def add_component(self, component_name, args=[], kwargs={}, component_prefix="default", parent=None, addto="run"):
# first build and check if this component is OK
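This hunk adds an optional Email parameter group (to, subject, message) to BasicNG6Workflow, and the utils diff above imports smtplib; how the notification is actually sent is not part of this commit. A hedged sketch of what such a send could look like with smtplib (the helper name, SMTP server and sender address are all assumptions):

import smtplib
from email.mime.text import MIMEText

def send_notification(to, subject, message,
                      smtp_server="localhost", from_addr="ng6@example.org"):
    # hypothetical helper: builds a plain-text mail from the three
    # workflow parameters and hands it to a local SMTP server
    mail = MIMEText(message)
    mail["Subject"] = subject
    mail["From"] = from_addr
    mail["To"] = to
    server = smtplib.SMTP(smtp_server)
    try:
        server.sendmail(from_addr, [to], mail.as_string())
    finally:
        server.quit()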
......@@ -104,7 +107,7 @@ class NG6Workflow (BasicNG6Workflow):
def __init__(self, args={}, id=None, function= "process"):
BasicNG6Workflow.__init__(self, args, id, function)
self._add_run_parameters()
self._add_sample_parameters()
self.__add_sample_parameters__()
self.samples = []
self.reads1 = []
......@@ -121,7 +124,7 @@ class NG6Workflow (BasicNG6Workflow):
self.add_parameter("species", "Which species has been sequenced", required = True, display_name="Species", group="Run information")
self.add_parameter("run_type", "What type of data is it (1 lane, 1 region)", flag = "--type", required = True, display_name="Type", group="Run information")
def _add_sample_parameters(self):
def __add_sample_parameters__(self):
self.add_multiple_parameter_list("input_sample", "Definition of a sample", flag="--sample", required = True)
self.add_parameter("sample_id", "The uniq identifier of the sample", add_to = "input_sample")
self.add_parameter("sample_name", "A descriptive name for the sample", add_to = "input_sample")
......@@ -215,33 +218,26 @@ class CasavaNG6Workflow(NG6Workflow):
def __init__(self, args={}, id=None, function= "process"):
NG6Workflow.__init__(self, args, id, function)
self._add_casava_parameters()
self.is_casava = False
self.group_prefix = None
self.undetermined_reads1 = []
self.undetermined_reads2 = []
def _add_casava_parameters(self):
def __add_sample_parameters__(self):
self.add_multiple_parameter("casava", "Definition of a casava directory for sample retrieving", required = True)
self.add_parameter("casava_directory", "Path to the CASAVA directory", required=True, type=casavadir, add_to = "casava")
self.add_parameter("lane", "The lane number", required=True, type=int, add_to = "casava")
self.add_exclusion_rule('casava', 'input_sample')
def __create_samples__(self):
if self.casava and self.casava['casava_directory'] and self.casava['lane'] :
self.__preprocess_casavadir(self.casava['casava_directory'], '_'.join( self.project.get_name().split() ), self.casava['lane'])
else :
NG6Workflow.__create_samples__(self)
def __preprocess_casavadir(self, casava_directory, project_name, lane_number):
"""
Parse the SampleSheet.mk file from a casava directory and return a list of Sample objects
@param casava_directory : path to CASAVA output directory
@param project_name : files in each sample are part of this project
@param lane_number : files in each sample are sequenced on this lane
"""
self.is_casava = True
casava_directory = self.casava['casava_directory']
project_name = '_'.join( self.project.get_name().split() )
lane_number = self.casava['lane']
with open(os.path.join(casava_directory, "SampleSheet.mk")) as fh :
barcodes_list = []
......
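This is the split announced in the commit message: NG6Workflow's constructor now calls self.__add_sample_parameters__() and CasavaNG6Workflow overrides that hook to declare a CASAVA directory and lane instead of explicit --sample blocks (names with double leading and trailing underscores are not name-mangled, so the override is resolved normally). A stripped-down sketch of the arrangement, with print statements standing in for the real add_parameter calls:

class NG6Workflow(object):
    def __init__(self):
        # the constructor delegates sample parameter declaration to a hook
        # that subclasses may override
        self.__add_sample_parameters__()

    def __add_sample_parameters__(self):
        # default behaviour: one --sample multiple-parameter per sample
        print "declaring --sample (sample_id, sample_name, ...)"

class CasavaNG6Workflow(NG6Workflow):
    def __add_sample_parameters__(self):
        # Casava runs describe their samples through a CASAVA directory
        # and a lane number instead of explicit --sample blocks
        print "declaring --casava (casava_directory, lane)"

NG6Workflow()        # declaring --sample (sample_id, sample_name, ...)
CasavaNG6Workflow()  # declaring --casava (casava_directory, lane)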