Commit 78fdedf5 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

No commit message

No commit message
parent d9c35cb1
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from subprocess import Popen, PIPE
from jflow.component import Component
from jflow.iotypes import OutputFile, InputFile, Formats
from weaver.function import PythonFunction, ShellFunction
def blast_formatdb(exec_path, type, input_fasta, databank, stdout_path, stderr_path):
from subprocess import Popen, PIPE
# first make the symbolic link
os.symlink(input_fasta, databank)
# then execute bwa index
cmd = [exec_path, "-p", type, "-i", databank]
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()
# write down the stdout
stdoh = open(stdout_path, "w")
stdoh.write(stdout)
stdoh.close()
# write down the stderr
stdeh = open(stderr_path, "w")
stdeh.write(stderr)
stdeh.close()
class FormatDB (Component):
def define_parameters(self, input_fasta, protein=False):
self.input_fasta = InputFile(input_fasta, Formats.FASTA)
self.protein = protein
self.databank = OutputFile(os.path.join(self.output_directory, os.path.basename(input_fasta)))
self.stdout = OutputFile(os.path.join(self.output_directory, "formatdb.stdout"))
self.stderr = OutputFile(os.path.join(self.output_directory, "formatdb.stderr"))
def process(self):
if self.protein: p = "T"
else: p = "F"
formatdb = PythonFunction(blast_formatdb, cmd_format="{EXE} {ARG} {IN} {OUT}")
formatdb(inputs=self.input_fasta, outputs=[self.databank, self.stdout, self.stderr], arguments=[self.get_exec_path("formatdb"), p])
def get_version(self):
cmd = [self.get_exec_path("formatdb"), "--help"]
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()
for line in stdout.split("\n"):
if line.startswith("formatdb"):
return line.split()[1]
return "-"
\ No newline at end of file
#
# Copyright (C) 2012 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from jflow.component import Component
from jflow.iotypes import OutputFileList, InputFileList, Formats
from weaver.abstraction import Merge
class MergeFiles (Component):
def define_parameters(self, input_files, output_file_name="merged.txt", on_basename=False):
self.input_files = InputFileList(input_files)
self.on_basename = on_basename
self.merged_files = OutputFileList([])
if self.on_basename:
self.basenames = {}
self.merged_basenames = {}
for file in self.input_files:
basename = os.path.basename(file).split(".")[0]
if self.basenames.has_key(basename):
self.basenames[basename].append(file)
else:
self.basenames[basename] = [file]
out_name = os.path.join(self.output_directory, basename+"_"+output_file_name)
if not self.merged_basenames.has_key(basename):
self.merged_basenames[basename] = out_name
self.merged_files.append(out_name)
else:
self.merged_files.append(os.path.join(self.output_directory, output_file_name))
def process(self):
if self.on_basename and len(self.basenames) < len(self.input_files):
for basename in self.basenames:
merge = Merge(self.basenames[basename], self.merged_basenames[basename], local=False)
else:
merge = Merge(self.input_files, self.merged_files[0], local=False)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment