Commit dfe93504 authored by Jerome Mariette's avatar Jerome Mariette

put back jflow libraries in source

parent b59de43b
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__version__ = '1.0'
\ No newline at end of file
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import inspect
import tempfile
from jflow.workflows_manager import WorkflowsManager
from jflow.config_reader import JFlowConfigReader
from jflow.dataset import ArrayList
from weaver.util import parse_string_list
class Component(object):
"""
"""
def __init__(self, is_dynamic=False):
"""
@param is_dynamic: if true, this component has dynamic inputs/outputs
"""
self.is_dynamic = is_dynamic
self.prefix = "default"
self.output_directory = None
self.config_reader = JFlowConfigReader()
def get_outputs(self, output_list=None, input_list=None):
"""
If `output_list` is a string template, then it may have the following
fields:
- `{fullpath}`, `{FULL}` -- Full input file path.
- `{basename}`, `{BASE}` -- Base input file name.
- `{fullpath_woext}`, `{FULLWE}` -- Full input file path without extension
- `{basename_woext}`, `{BASEWE}` -- Base input file name without extension
"""
if output_list is None:
return []
if isinstance(output_list, str):
if not input_list or not '{' in str(output_list):
return [output_list]
return [os.path.join(self.output_directory, str(output_list).format(
fullpath = input,
FULL = input,
i = '{0:05X}'.format(i),
NUMBER = '{0:05X}'.format(i),
fullpath_woext = os.path.splitext(input)[0],
FULL_WOEXT = os.path.splitext(input)[0],
basename = os.path.basename(input),
BASE = os.path.basename(input),
basename_woext = os.path.splitext(os.path.basename(input))[0] if os.path.splitext(os.path.basename(input))[1] != ".gz" else os.path.splitext(os.path.splitext(os.path.basename(input))[0])[0],
BASE_WOEXT = os.path.splitext(os.path.basename(input))[0] if os.path.splitext(os.path.basename(input))[1] != ".gz" else os.path.splitext(os.path.splitext(os.path.basename(input))[0])[0]))
for i, input in enumerate(parse_string_list(input_list))]
def execute(self):
# first create the output directory
os.makedirs(self.output_directory, 0751)
# then run the component
self.run()
def run(self):
"""
Run the component, has to be implemented by subclasses
"""
raise NotImplementedError
def get_temporary_file(self, suffix=".txt"):
# first check if tmp directory exists
if not os.path.isdir(self.config_reader.get_tmp_directory()):
os.makedirs(self.config_reader.get_tmp_directory(), 0751)
tempfile_name = os.path.basename(tempfile.NamedTemporaryFile(suffix=suffix).name)
return os.path.join(self.config_reader.get_tmp_directory(), tempfile_name)
def define_parameters(self, *args):
"""
Define the component parameters, has to be implemented by subclasses
"""
raise NotImplementedError
def get_exec_path(self, software):
exec_path = self.config_reader.get_exec(software)
if exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), software)):
exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), software)
elif exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../bin", software)):
exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "../bin", software)
elif exec_path is None and os.path.isfile(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "bin", software)):
exec_path = os.path.join(os.path.dirname(inspect.getfile(self.__class__)), "bin", software)
return exec_path
def __eq__(self, other):
return self.__class__ == other.__class__ and self.prefix == other.prefix
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
\ No newline at end of file
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from jflow.component import Component
from jflow.utils import split_and_rename_seq, get_file_base
from weaver.function import ShellFunction
class FormatDB (Component):
def __init__(self):
Component.__init__(self, is_dynamic=True)
def define_parameters(self, input_fasta, protein=False, nb_seq_per_file=None):
self.input_fasta = input_fasta
self.protein = protein
self.database = os.path.join(self.output_directory, os.path.basename(input_fasta))
def run(self):
# first make the symbolic link
os.symlink(self.input_fasta, self.database)
if self.protein: p = "T"
else: p = "F"
formatdb = ShellFunction(self.get_exec_path("formatdb") + " -p " + p + " -i $1 -n $2", cmd_format='{EXE} {IN} {OUT}')
formatdb(inputs=self.input_fasta, outputs=self.database)
\ No newline at end of file
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from jflow.component import Component
from jflow.utils import split_and_rename_seq, get_file_base
from weaver.function import PythonFunction, ShellFunction
from weaver.abstraction import Merge, Map
class SplitAndRename (Component):
def __init__(self):
Component.__init__(self, is_dynamic=True)
def define_parameters(self, input_file, nb_seq_per_file=None):
self.input_file = input_file
self.nb_seq_per_file = nb_seq_per_file
self.renamed_seq_file = os.path.join(self.output_directory, get_file_base(input_file) + ".numeric_ids.fasta")
def run(self):
rename = PythonFunction(split_and_rename_seq)
rename(outputs=self.renamed_seq_file, arguments=[self.input_file, self.output_directory, self.nb_seq_per_file])
class MergeFiles (Component):
def define_parameters(self, input_files, output_file_name="merged.txt", on_basename=False):
self.input_files = input_files
self.on_basename = on_basename
self.merged_files = []
if self.on_basename:
self.basenames = {}
self.merged_basenames = {}
for file in self.input_files:
basename = os.path.basename(file).split(".")[0]
if self.basenames.has_key(basename):
self.basenames[basename].append(file)
else:
self.basenames[basename] = [file]
out_name = os.path.join(self.output_directory, basename+"_"+output_file_name)
if not self.merged_basenames.has_key(basename):
self.merged_basenames[basename] = out_name
self.merged_files.append(out_name)
else:
self.merged_files.append(os.path.join(self.output_directory, output_file_name))
def run(self):
if self.on_basename and len(self.basenames) < len(self.input_files):
for basename in self.basenames:
merge = Merge(self.basenames[basename], self.merged_basenames[basename], local=False)
else:
merge = Merge(self.input_files, self.merged_files[0], local=False)
class Sort (Component):
def define_parameters(self, input_files, column=1):
self.input_files = input_files
self.column = column
self.sorted_files = self.get_outputs('{basename_woext}.sorted', self.input_files)
def run(self):
sort = ShellFunction("sort -k 1,1 -n $1 > $2", cmd_format='{EXE} {IN} {OUT}')
blast = Map(sort, self.input_files, self.sorted_files)
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import types
import os
import inspect
from ConfigParser import ConfigParser, NoOptionError
from jflow.parameter import Parameter
class WorkflowConfigReader(object):
"""
"""
def __init__(self, path):
"""
"""
self.reader = ConfigParser()
self.reader.read(path)
def get_name(self):
return self.reader.get("global", "name")
def get_description(self):
return self.reader.get("global", "description")
def get_parameters(self):
"""
"""
parameters = []
try:
params = {}
# First grab all parameters name
for param in self.reader.items("parameters"):
cvalue = self.reader.get("parameters", param[0])
cparam = param[0].split(".")[0]
ckey = param[0].split(".")[1]
if not params.has_key(cparam):
params[cparam] = {ckey: cvalue}
else:
params[cparam][ckey] = cvalue
# Then build params
for param in params:
name = params[param]["name"]
flag = params[param]["flag"]
help = params[param]["help"]
try: default = params[param]["default"]
except: default = None
try:
type = eval(params[param]["type"])
metavar = params[param]["type"]
except:
type = types.StringType
metavar = "str"
try: required = params[param]["required"].lower() in ("yes", "y", "true", "t", "1")
except: required = False
try: action = params[param]["action"]
except: action = "store"
try: choices = params[param]["choices"].split("|")
except: choices = None
oparam = Parameter(name=name, flag=flag, help=help, default=default,
type=type, required=required, metavar=metavar,
action=action, choices=choices)
parameters.append(oparam)
except:
pass
return parameters
class JFlowConfigReader(object):
"""
"""
CONFIG_FILE_PATH = "../application.properties"
def __init__(self):
"""
"""
self.reader = ConfigParser()
self.reader.read(os.path.join(os.path.dirname(inspect.getfile(self.__class__)), self.CONFIG_FILE_PATH))
def get_tmp_directory(self):
return self.reader.get("storage", "tmp_directory")
def get_working_directory(self):
return self.reader.get("storage", "working_directory")
def get_exec(self, software):
try:
return self.reader.get("softwares", software)
except NoOptionError, e:
return None
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from weaver.dataset import *
class ArrayList(Dataset):
""" This :class:`Dataset` consists of file paths specified by an ``Array``
expression.
"""
def __init__(self, array):
Dataset.__init__(self)
self.array = array
@cache_generation
def _generate(self):
return (MakeFile(normalize_path(f.strip(), os.curdir), self.nest)
for f in self.array)
\ No newline at end of file
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from weaver.compat import execfile
from weaver.logger import D_SCRIPT, debug, fatal
from weaver.nest import Nest
from weaver.options import Options
from weaver.util import Container
from weaver.script import Script
from weaver.script import ABSTRACTIONS
from weaver.script import DATASETS
from weaver.script import FUNCTIONS
from weaver.script import NESTS
from weaver.script import OPTIONS
from weaver.script import STACKS
import weaver.logger
import collections
import os
import sys
import time
class Wfunction(object):
""" Weaver Script class.
Parses command line environment and sets up run-time configuration.
"""
def __init__(self, function=None, force=False, import_builtins=True, output_directory=None,
execute_dag=False, engine_wrapper=None, engine_arguments=None, args=[]):
self.function = function
self.arguments = args
self.force = force # Ignore warnings
self.import_builtins = True # Load built-ins
if output_directory is None:
self.output_directory = os.curdir # Where to create artifacts
else:
self.output_directory = output_directory
self.start_time = time.time() # Record beginning of compiling
self.options = Options()
self.nested_abstractions = False
self.inline_tasks = 1
self.execute_dag = execute_dag
self.globals = {}
self.engine_wrapper = engine_wrapper
self.engine_arguments = engine_arguments
self.include_symbols = False
debug(D_SCRIPT, 'force = {0}'.format(self.force))
debug(D_SCRIPT, 'import_builtins = {0}'.format(self.import_builtins))
debug(D_SCRIPT, 'output_directory = {0}'.format(self.output_directory))
debug(D_SCRIPT, 'start_time = {0}'.format(self.start_time))
debug(D_SCRIPT, 'options = {0}'.format(self.options))
debug(D_SCRIPT, 'nested_abstractions = {0}'.format(self.nested_abstractions))
debug(D_SCRIPT, 'inline_tasks = {0}'.format(self.inline_tasks))
debug(D_SCRIPT, 'execute_dag = {0}'.format(self.execute_dag))
debug(D_SCRIPT, 'engine_wrapper = {0}'.format(self.engine_wrapper))
debug(D_SCRIPT, 'engine_arguments = {0}'.format(self.engine_arguments))
def _import(self, module, symbols):
""" Import ``symbols`` from ``module`` into global namespace. """
# Import module
m = 'weaver.{0}'.format(module)
m = __import__(m, self.globals, self.globals, symbols, -1)
# Import symbols from module into global namespace, which we store as
# an attribute for later use (i.e. during compile)
for symbol in symbols:
self.globals[symbol] = getattr(m, symbol)
debug(D_SCRIPT, 'Imported {0} from {1}'.format(symbol, module))
def compile(self):
""" Compile script in the specified working directory. """
# Save active script instance and set this one as active
work_dir = self.output_directory
# Add nest path and path to script to Python module path to allow
# for importing modules outside of $PYTHONPATH
sys.path.insert(0, os.path.abspath(os.path.dirname(work_dir)))
# Load built-ins if specified on command line. If built-ins are
# not automatically loaded by the Script object, then the user must
# load them manually in their Weaver scripts using the standard
# Python import facilities.
if self.import_builtins:
self._import('abstraction', ABSTRACTIONS)
self._import('dataset', DATASETS)
self._import('function', FUNCTIONS)
self._import('nest', NESTS)
self._import('options', OPTIONS)
self._import('stack', STACKS)
# Execute nest
with Nest(work_dir, wrapper=self.engine_wrapper) as nest:
with self.options:
try:
self.function(*self.arguments)
nest.compile()
except Exception as e:
fatal(D_SCRIPT, 'Error compiling script: {0}'.format(e), print_traceback=True)
if self.execute_dag:
debug(D_SCRIPT, 'Executing generated DAG {0} with {1}'.format(
nest.dag_path, nest.path))
nest.execute(self.engine_arguments, exit_on_failure=True)
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import types
import os
class Output(object):
def __init__(self):
pass
class OutputFile(Output):
def __init__(self, file):
self.file = file
def __str__(self):
return self.file
class OutputFiles(Output):
def __init__(self, files):
self.files = []
for file in files:
self.files.append(file)
def __str__(self):
return " ".join(self.files)
\ No newline at end of file
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import types
class Parameter(object):
"""
"""
def __init__(self, name, flag, help, default=None, type=types.StringType, choices=None,
required=False, metavar=None, action="store"):
self.name = name
self.flag = flag
self.help = help
self.default = default
self.type = type
self.choices = choices
self.required = required
self.metavar = str(metavar).upper()
self.action = action
def __str__(self):
return self.name + ": " + self.flag + " | " + self.help + " (default=" + \
self.default + ", required=" + str(self.required) + ")"
\ No newline at end of file
This diff is collapsed.
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
def get_file_base (file):
"""
Return the file base
@param file: input file
"""
# First import required libraries
import os
file_base = os.path.splitext(os.path.basename(file))[0]
if os.path.splitext(file)[1] == ".gz":
file_base = os.path.splitext(file_base)[0]
return file_base
def get_nb_string(value, length=6):
"""
Return a string of n caracters.
@param value: the value to get representation
@param length: the final length required
"""
zeros = ""
s_value = str(value)
for i in range(length - len(s_value)):
zeros += "0";
s_value = zeros + s_value
return s_value
def split_and_rename_seq (fasta_file , outdir, nb_seq_per_file=None, outext="fasta"):
"""
Split a multi fasta in multiple fasta with n sequences per file
@param fasta_file: input multifasta file
@param outdir: output directory
@param nb_seq_per_file: number of sequences per file, if None no split will