Commit 5e5a32b5 authored by Jerome Mariette's avatar Jerome Mariette

better handle of ng6workflow

parent a6eb000d
......@@ -25,6 +25,7 @@ except ImportError:
from jflow.workflows_manager import WorkflowsManager
from jflow.workflow import Workflow
from ng6.ng6workflow import NG6Workflow
if __name__ == '__main__':
......@@ -46,9 +47,14 @@ if __name__ == '__main__':
sub_parser.set_defaults(cmd_object=instance)
args = vars(parser.parse_args())
if isinstance(args["cmd_object"], Workflow):
if isinstance(args["cmd_object"], NG6Workflow):
#try:
args["cmd_object"].init_workflow(args)
args["cmd_object"].execute(args)
#except (IOError, NameError), e:
# sys.exit("Error: " + str(e))
\ No newline at end of file
elif isinstance(args["cmd_object"], Workflow):
#try:
args["cmd_object"].execute(args)
#except (IOError, NameError), e:
# sys.exit("Error: " + str(e))
\ No newline at end of file
......@@ -32,9 +32,32 @@ from ng6.run import Run
from ng6.config_reader import NG6ConfigReader
from ng6.t3MySQLdb import t3MySQLdb
from ng6.utils import Utils
from jflow.component import Component
class Analyse (object):
def add_analysis(analysis_serialized_path, pr_serialized_path, *files):
from ng6.analyse import Analyse
from ng6.project import Project
from ng6.run import Run
import pickle
# load the analysis object
analysis_dump = open(analysis_serialized_path, "rb")
analysis = pickle.load(analysis_dump)
analysis_dump.close()
# load the project or the run
pr_dump = open(pr_serialized_path, "rb")
pr = pickle.load(pr_dump)
pr_dump.close()
# process the parsing of the analysis
analysis.process()
# save files to save
for file in analysis.files_to_save:
analysis._save_file(file, gzip=True)
# add the analysis to the run or the project
pr.add_analyse(analysis)
class Analyse (Component):
"""
Class Analyse: Define an nG6 Analyse
"""
......@@ -42,22 +65,24 @@ class Analyse (object):
# The directories structure into ng6
DIRECTORIES_STRUCTURE = "/data/analyze/"
def __init__(self, name, description, software, options, version, id=None, parent_analysis=None):
def __init__(self, is_dynamic=False, name=None, description=None,
software=None, options=None, id=None, parent_analysis=None):
"""
Build a Analyse object
@param name : the analyse name
@param description : the analyse description
@param software : the software used to run the analyse
@param options : the options used to run the software
@param version : the software version
@param id : the analyse id if not defined
@param parent_analysis : the parent analyse
"""
Component.__init__(self, is_dynamic)
self.name = name
self.description = description
self.software = software
self.options = options
self.version = version
self.version = self.get_version()
self.files_to_save = []
self.results = {}
# Set the temp folder to the ng6 temp folder
......@@ -91,20 +116,30 @@ class Analyse (object):
os.makedirs(work_dir, 0751)
self.directory = os.path.join(self.DIRECTORIES_STRUCTURE, directory_name)
def __str__(self):
def get_version(self):
"""
Return the tool version, has to be implemented by subclasses
"""
Return a full description of the analyse
"""
return "name="+self.name+";description="+self.description+";software="+self.software+";version="+self.version+";options="+self.options
raise NotImplementedError
def define_analysis(self):
"""
Define all analysis attributs, has to be implemented by subclasses
"""
raise NotImplementedError
def process(self, **kargs):
"""
Process the analyse, has to be implemented in the sub class
"""
raise NotImplementedError
def __str__(self):
"""
Return a full description of the analyse
"""
return "name="+str(self.name)+";description="+str(self.description)+";software="+\
str(self.software)+";version="+str(self.version)+";options="+str(self.options)
def sync(self):
"""
......@@ -182,7 +217,7 @@ class Analyse (object):
"""
# First add the run into the database
t3mysql = t3MySQLdb()
self.id = t3mysql.add_analyse(str(self.__class__).split(".")[1][:-2], self.name, self.description,
self.id = t3mysql.add_analyse(self.__class__.__name__, self.name, self.description,
datetime.date.today(), self.directory, self.software, self.version,
self.options, parent_uid=self.parent_uid)
......
......@@ -18,15 +18,48 @@
import inspect
import os
import pickle
import datetime
from jflow.workflow import Workflow
from weaver.function import PythonFunction
from ng6.analyse import add_analysis
from ng6.project import Project
from ng6.run import Run
class NG6Workflow (Workflow):
"""
"""
def init_workflow(self, args):
self.project = None
self.runobj = None
# if starting from an existing project
if args.has_key("project_id"):
self.project = Project.get_from_id(args["project_id"])
# if starting from an existing run
if args.has_key("run_id"):
self.runobj = Run.get_from_id(args["run_id"])
# if starting from a brand new run
elif args.has_key("run_date") and args.has_key("run_name") and args.has_key("species") and args.has_key("data_nature") \
and args.has_key("type") and args.has_key("run_description") and args.has_key("sequencer") and args.has_key("project_id"):
# build the project
self.project = Project.get_from_id(args["project_id"])
run_date = args["run_date"].split("/")
# and the run
self.runobj = Run(args["run_name"], datetime.date(int(run_date[2]), int(run_date[1]), int(run_date[0])),
args["species"], args["data_nature"], args["type"], args["run_description"],
args["sequencer"])
# if it's a run with some index, let's write down description
if args["mids_description"]:
mids_desc_array = {}
for mids_desc in args["mids_description"].split(";"):
parts = mids_desc.split(":")
mids_desc_array[parts[0]] = parts[1]
self.runobj.add_mids_description(mids_desc_array)
# then add the run to the project
self.project.add_run(self.runobj)
def __add_analysis_behaviour(self, fn, input_files, arguments):
def new_run(*args, **kwargs):
fn(*args, **kwargs)
......@@ -34,7 +67,7 @@ class NG6Workflow (Workflow):
add(inputs=input_files, arguments=arguments)
return new_run
def add_component(self, component_name, args, component_prefix="default"):
def add_component(self, component_name, args, component_prefix="default", addto="run"):
# first build and check if this component is OK
myPckge = __import__(self.compPckg[component_name], globals(), locals(), [component_name], -1)
# build the object and define required field
......@@ -56,10 +89,16 @@ class NG6Workflow (Workflow):
analysis_dump = open(analysis_dump_path, "wb")
pickle.dump(cmpt_object, analysis_dump)
analysis_dump.close()
args = [analysis_dump_path]
pr_dump_path = self.get_temporary_file(".dump")
pr_dump = open(pr_dump_path, "wb")
if addto == "project":
pickle.dump(self.project, pr_dump)
elif addto == "run":
pickle.dump(self.runobj, pr_dump)
pr_dump.close()
args = [analysis_dump_path, pr_dump_path]
output_files = []
for table in cmpt_object.dump_files:
output_files.extend(cmpt_object.dump_files[table])
output_files.extend(cmpt_object.files_to_save)
cmpt_object.run = self.__add_analysis_behaviour(cmpt_object.run, output_files, args)
break
......
......@@ -253,7 +253,6 @@ class Run(object):
"""
if self.id != None :
# First define the run
my_analyse.set_run(self)
analyse_id = my_analyse.save()
t3mysql = t3MySQLdb()
t3mysql.add_analyze_to_run(self.id, analyse_id)
......
......@@ -16,25 +16,43 @@
#
import os
from subprocess import Popen, PIPE
from jflow.component import Component
from ng6.analyse import Analyse
from weaver.function import ShellFunction
from weaver.abstraction import Map
class SFFextract (Component):
class SFFextract (Analyse):
def define_parameters(self, input_files):
self.input_files = input_files
self.output_files = self.get_outputs('{basename_woext}.fastq', self.input_files)
def define_analysis(self):
self.name = "Convert"
self.description = "Convert sff file in fastq format."
self.software = "sff_extract.py"
self.options = "-c"
self.files_to_save = self.output_files
def process(self):
pass
def get_version(self):
cmd = [self.get_exec_path("sff_extract.py"), "--version"]
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()
return stdout.split()[1]
def run(self):
sff_extract = ShellFunction(self.get_exec_path("sff_extract.py") + " -c $1 -s $2", cmd_format='{EXE} {IN} {OUT}')
sff_extract = Map(sff_extract, self.input_files, self.output_files)
class SFFfile (Component):
class SFFfile (Analyse):
def __init__(self):
Component.__init__(self, is_dynamic=True)
......@@ -45,7 +63,19 @@ class SFFfile (Component):
self.midscheme = midscheme
self.mids_config_file = mids_config_file
self.stdout = os.path.join(self.output_directory, "sfffile.stdout")
def define_analysis(self):
self.name = "Demultiplexing"
self.description = "Demultiplex sequences per samples."
self.software = "sfffile"
self.options = ""
def process(self):
pass
def get_version(self):
return "v1.0"
def run(self):
sfffile = ShellFunction(self.get_exec_path("sfffile") + " -s $1 -mcf $2 -o $3 $4 > $5", cmd_format='{EXE} {ARG} {IN} {OUT}')
sfffile(inputs=self.sff_file, outputs=self.stdout, arguments=[self.midscheme, self.mids_config_file, self.prefix])
......@@ -15,48 +15,31 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import datetime
import os
import glob
from jflow.workflow import Workflow
from ng6.project import Project
from ng6.run import Run
from ng6.ng6workflow import NG6Workflow
class R454 (Workflow):
class R454 (NG6Workflow):
def run(self):
# first check if file provided exists
if not os.path.isfile(self.args["data_file"]):
sys.exit(2, "error: %s file does not exists\n" % (file))
# build the project and the run
my_project = Project.get_from_id(self.args["project_id"])
run_date = self.args["run_date"].split("/")
my_run = Run(self.args["run_name"], datetime.date(int(run_date[2]), int(run_date[1]), int(run_date[0])),
self.args["species"], self.args["data_nature"], self.args["type"], self.args["run_description"],
self.args["sequencer"])
# if it's a run with some index, let's write down description
# if it's a run with some index, let's split raw file
if self.args["mids_description"]:
mids_desc_array = {}
for mids_desc in self.args["mids_description"].split(";"):
parts = mids_desc.split(":")
mids_desc_array[parts[0]] = parts[1]
my_run.add_mids_description(mids_desc_array)
[mids_config_file, midscheme] = my_run.export_mids_to_newbler_cfg_file(self.args["demux_error"])
[mids_config_file, midscheme] = self.runobj.export_mids_to_newbler_cfg_file(self.args["demux_error"])
# extract sff per mids
sfffile = self.add_component("SFFfile", [self.args["data_file"], my_run.name, midscheme, mids_config_file])
sfffile = self.add_component("SFFfile", [self.args["data_file"], self.runobj.name, midscheme, mids_config_file])
files_to_process = glob.glob(os.path.join(sfffile.output_directory, "*.sff"))
else:
files_to_process = [self.args["data_file"]]
# then add the run to the project
my_project.add_run(my_run)
# add raw files to the run
addrawfiles = self.add_component("AddRawFiles", [my_run, self.args["data_file"], self.args["compression"]])
addrawfiles = self.add_component("AddRawFiles", [self.runobj, self.args["data_file"], self.args["compression"]])
# extract the files to process
sff_extract = self.add_component("SFFextract", [files_to_process])
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment