addrawfiles.py 11.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#
# Copyright (C) 2012 INRA
# 
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import pickle
20
import logging
21
22
23

from jflow.component import Component

24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def extract_stats_from_seq_file(input_file, output_file_stat):
    """
    Count the sequences of a sequence file and write "<nb_seq>:<full_size>"
    into a stats file.

    Runs as a standalone jflow job, hence the function-local imports.

    @param input_file       : path of the sequence file to inspect
    @param output_file_stat : path of the stats file to write ("nb_seq:full_size")
    """
    import jflow.seqio as seqio
    import logging
    logging.getLogger("AddRawFiles").debug("extract_stats_from_seq_files. Entering, working on "+input_file)
    nb_seq, full_size = 0, 0
    try:
        # Accumulate the number of sequences and the total number of bases.
        reader = seqio.SequenceReader(input_file)
        for id, desc, seq, qualities in reader:
            nb_seq += 1
            full_size += len(seq)
        # Context manager guarantees the stats file is closed even on error.
        with open(output_file_stat, 'w') as f:
            f.write(str(nb_seq)+":"+str(full_size))
    except Exception:
        # Best effort kept from the original, but the handler used the
        # undefined name `file` (NameError in Python 3) instead of the
        # actual input path, and a bare `except:` also swallowed
        # SystemExit/KeyboardInterrupt.
        logging.getLogger("AddRawFiles").debug(input_file + " seems to not be a seq file")

    logging.getLogger("AddRawFiles").debug("extract_stats_from_seq_files. finished")

def concatenate_stats_file(total_file_stat, *files_to_sync):
    """
    Write one md5 checksum line ("<md5>  <basename>") per synchronized file
    into total_file_stat.

    Runs as a standalone jflow job, hence the function-local imports.

    @param total_file_stat : path of the md5 summary file to write
    @param files_to_sync   : paths of the files to checksum
    """
    import logging
    import os
    from subprocess import Popen, PIPE
    logger = logging.getLogger("AddRawFiles")
    work_dir = os.path.dirname(total_file_stat)
    logger.debug("concatenate_stats_file. before md5sum Does work_directory of the run ("+work_dir+")exists?" + str(os.path.isdir(work_dir)))
    logger.debug("concatenate_stats_file. content of work_directory of the run ("+work_dir+") = " + ",".join(os.listdir(work_dir)))
    logger.debug("concatenate_stats_file. md5sum in " + str(total_file_stat))
    # NOTE(review): shell=True with interpolated paths is shell-injection
    # prone; acceptable only because the paths are produced by the workflow
    # itself (no spaces/metacharacters) — confirm before reusing elsewhere.
    # The awk filter keeps the checksum ($1) and the file basename ($NF).
    p = Popen("md5sum " + " ".join(files_to_sync) +"|awk -F/ {'print $1,$NF'} > "+ total_file_stat   ,shell=True,stdout=PIPE,stderr=PIPE,universal_newlines=True)
    stdout,stderr = p.communicate()
    logger.debug("concatenate_stats_file. after md5sum Does work_directory of the run ("+work_dir+")exists?" + str(os.path.isdir(work_dir)))
    logger.debug("concatenate_stats_file. Content of the folder ("+work_dir+" = " + ",".join(os.listdir(work_dir)))
    logger.debug("concatenate_stats_file. finished")
    
def add_raw_files (run_dump_path,*files):
    """
    Sum the per-file stats produced by extract_stats_from_seq_file
    ("nb_seq:size" text files), then load the pickled run object and register
    the totals on it via run.process_raw_files().

    Runs as a standalone jflow job, hence the function-local imports.

    @param run_dump_path : path of the pickled run object
    @param files         : paths of the per-file stat files
    """
    import pickle
    import logging
    import os  # fixed: os.path/os.listdir are used below but `os` was never imported locally
    total_nb_seq = 0
    total_size = 0
    for curr_stat_file in files:
        logging.getLogger("AddRawFiles").debug("add_raw_files. Content of " + curr_stat_file)
        with open(curr_stat_file, 'r') as myfile:
            nb_seq,size=myfile.read().split(':')
            logging.getLogger("AddRawFiles").debug("add_raw_files. Content of " + curr_stat_file + " nb_seq = " + str(nb_seq) + ", size = " + str(size))
        size= int(size)
        nb_seq= int(nb_seq)
        total_size+=size
        total_nb_seq+=nb_seq

    # Load the analysis object (trusted, workflow-produced pickle; never feed
    # pickle.load untrusted data). `with` replaces the manual open/close pair.
    with open(run_dump_path, "rb") as run_dump:
        my_run = pickle.load(run_dump)

    logging.getLogger("AddRawFiles").debug("add_raw_files. Before my_run.process_raw_files total_nb_seq = " + str(total_nb_seq) + ", total_size = " + str(total_size))
    logging.getLogger("AddRawFiles").debug("add_raw_files. work_directory of the run "+ my_run.get_work_directory())
    my_run.process_raw_files(total_size,total_nb_seq)
    logging.getLogger("AddRawFiles").debug("add_raw_files. after my_run.process_raw_files, does the work dir ("+my_run.get_work_directory()+") exist? "+ str(os.path.isdir(my_run.get_work_directory())))
    logging.getLogger("AddRawFiles").debug("add_raw_files. Content of the folder ("+my_run.get_work_directory()+" = " + ",".join(os.listdir(my_run.get_work_directory())))

def copy_file (file,output):
    """
    Copy `file` to `output`, logging (but deliberately not raising) on failure.

    Runs as a standalone jflow job, hence the function-local imports.

    @param file   : source path
    @param output : destination path
    """
    import os
    import logging
    from shutil import copyfile
    logger = logging.getLogger("AddRawFiles")
    logger.debug("copy_file. copy " + str(file) +" to " + str(output))

    try:
        logger.debug("copy_file. Does work_directory of the run ("+os.path.dirname(output)+")exists?" + str(os.path.isdir(os.path.dirname(output))))
        copyfile(file, output)
        logger.debug("copy_file. Content of the folder ("+os.path.dirname(output)+" = " + ",".join(os.listdir(os.path.dirname(output))))
    except Exception:
        # Best-effort behavior kept from the original, but the bare `except:`
        # (which also caught SystemExit/KeyboardInterrupt) is narrowed and the
        # traceback is preserved in the log for diagnosis.
        logger.exception("copy_file. Error raised while copying "+file +" to " + output)

def zip_file (file,output):
    """
    Gzip-compress `file` into the directory that contains `output`.

    Runs as a standalone jflow job, hence the function-local imports.

    @param file   : path of the file to compress
    @param output : expected output path; only its directory part is used
    """
    import os
    import logging
    from ng6.utils import Utils

    target_dir = os.path.dirname(output)
    logging.getLogger("AddRawFiles").debug("zip_file. zip " + str(file) +" to " + target_dir)
    Utils.gzip(file, target_dir)
    
def bzip2_file (file,output):
    """
    Bzip2-compress `file` into the directory that contains `output`.

    Runs as a standalone jflow job, hence the function-local imports.

    @param file   : path of the file to compress
    @param output : expected output path; only its directory part is used
    """
    import os
    import logging
    from ng6.utils import Utils

    # Fixed copy/paste in the log message: this compresses with bzip2, not zip.
    logging.getLogger("AddRawFiles").debug("bzip2_file. bzip2 " + str(file) +" to " + os.path.dirname(output))
    Utils.bz2(file, os.path.dirname(output))
    
def tar_zip_files (output,files):
    """
    Bundle `files` into ng6_archive.tar next to `output`, then gzip the tar.

    Runs as a standalone jflow job, hence the function-local imports.

    @param output : expected output path; only its directory part is used
    @param files  : paths of the files to archive
    """
    import os
    import logging
    from ng6.utils import Utils

    logging.getLogger("AddRawFiles").debug("tar_zip_files.")
    # Fixed: `archive_name` and `delete` were undefined names, so every call
    # raised NameError. We keep the source files (delete=False) — TODO(review):
    # confirm against Utils.tar_files's expected semantics.
    archive_name = "ng6_archive.tar"
    work_dir = os.path.dirname(output)
    Utils.tar_files(files, os.path.join(work_dir, archive_name), False)
    Utils.gzip(os.path.join(work_dir, archive_name), work_dir, False)
120

121
122
123
124
125
126
127
128
129
130
    
def tar_bz2_files (output, files):
    """
    Bundle `files` into ng6_archive.tar next to `output`, then bzip2 the tar.

    Runs as a standalone jflow job, hence the function-local imports.

    @param output : expected output path; only its directory part is used
    @param files  : paths of the files to archive
    """
    import os
    import logging
    from ng6.utils import Utils

    logging.getLogger("AddRawFiles").debug("tar_bz2_files.")
    # Fixed: the original referenced `self`, `archive_name` and `delete`, none
    # of which exist in a free function (copy/paste from a method), so every
    # call raised NameError. Mirror tar_zip_files instead.
    archive_name = "ng6_archive.tar"
    work_dir = os.path.dirname(output)
    Utils.tar_files(files, os.path.join(work_dir, archive_name), False)
    Utils.bz2(os.path.join(work_dir, archive_name), work_dir, False)
     
131
132
class AddRawFiles (Component):
    """
    Store the raw sequence files of a run in its work directory.

    The component computes per-file sequence statistics, registers the totals
    on the (pickled) run object, copies/compresses/archives the raw files into
    the run work directory and writes an md5sum summary of the synced files.
    """

    def define_parameters(self, runobj, files_to_save, compression):
        """
        @param runobj        : the run object the raw files belong to
        @param files_to_save : paths of the raw files to store
        @param compression   : "none", "gz", "bz2", "tar.gz" or "tar.bz2"
        """
        self.runobj = runobj
        self.add_input_file_list( "files_to_save", "File to be saved as raw files", default=files_to_save, required=True)
        self.add_parameter("compression", "How should data be compressed once archived", default=compression, required = True)
        self.add_output_file("stdout", "AddRawfiles stdout", filename="addRawFiles.stdout")
        self.add_output_file("total_stat_file", "total_stat", filename="total_stat.stdout")
        self.add_output_file_list("files_to_save_stats", "Files containing stat of the corresponding read file", items=self.files_to_save)
        self.add_output_file_list("files_to_sync", "Files in the work directory of the run, to be synchronized", items=self.files_to_save)

    def process(self):
        logger = logging.getLogger("AddRawFiles")
        self.runobj.raw_files = self.files_to_save
        logger.debug("process. work_directory of the run "+ self.runobj.get_work_directory())

        # Dump the run so the add_raw_files job can reload and update it.
        run_dump_path = self.get_temporary_file(".dump")
        with open(run_dump_path, "wb") as run_dump:
            pickle.dump(self.runobj, run_dump)

        # One "<nb_seq>:<size>" stat file per raw file.
        files_to_save_stats = self.get_outputs( '{basename_woext}.count', self.files_to_save)
        for seq_file, stat_file in zip(self.files_to_save, files_to_save_stats):
            self.add_python_execution(extract_stats_from_seq_file, cmd_format="{EXE} {IN} {OUT}",
                                      inputs = seq_file, outputs = stat_file, map=False)

        # Aggregate the stats and register them on the pickled run.
        self.add_python_execution(add_raw_files, cmd_format='{EXE} {ARG} {IN} > {OUT}', map=False,
                                  outputs=self.stdout, inputs=files_to_save_stats, includes=self.files_to_save, arguments=[run_dump_path])

        work_dir = self.runobj.get_work_directory()
        if self.compression == "none":
            files_to_sync = [os.path.join(work_dir, os.path.basename(f)) for f in self.files_to_save]
            for idx, file in enumerate(self.files_to_save):
                self.add_python_execution(copy_file, cmd_format="{EXE} {IN} {OUT}",
                                          inputs = file, outputs = files_to_sync[idx], map=False)
        elif self.compression == "gz":
            files_to_sync = [os.path.join(work_dir, os.path.basename(f)) + ".gz" for f in self.files_to_save]
            for idx, file in enumerate(self.files_to_save):
                self.add_python_execution(zip_file, cmd_format="{EXE} {IN} {OUT}",
                                          inputs = file, outputs = files_to_sync[idx], map=False)
        elif self.compression == "bz2":
            files_to_sync = [os.path.join(work_dir, os.path.basename(f)) + ".bz2" for f in self.files_to_save]
            for idx, file in enumerate(self.files_to_save):
                # Fixed: the original scheduled zip_file here, producing gzip
                # data inside files named *.bz2.
                self.add_python_execution(bzip2_file, cmd_format="{EXE} {IN} {OUT}",
                                          inputs = file, outputs = files_to_sync[idx], map=False)
        elif self.compression == "tar.gz":
            # Fixed: the original tested the undefined name `mode` instead of
            # self.compression and indexed with the undefined `idx` in this
            # branch and the next one (NameError on every run).
            files_to_sync = [os.path.join(work_dir, "ng6_archive.tar.gz")]
            self.add_python_execution(tar_zip_files, cmd_format="{EXE} {OUT} {IN}",
                                      inputs = self.files_to_save, outputs = files_to_sync[0], map=False)
        elif self.compression == "tar.bz2":
            files_to_sync = [os.path.join(work_dir, "ng6_archive.tar.bz2")]
            self.add_python_execution(tar_bz2_files, cmd_format="{EXE} {OUT} {IN}",
                                      inputs = self.files_to_save, outputs = files_to_sync[0], map=False)
        else:
            # Fixed: an unsupported mode previously fell through silently and
            # crashed later with a NameError on files_to_sync.
            raise ValueError("Unknown compression mode: " + str(self.compression))

        # md5sum summary of everything that ends up in the work directory.
        total_stat_file = os.path.join(work_dir, "md5sum.txt")
        self.add_python_execution(concatenate_stats_file, cmd_format="{EXE} {OUT} {IN}",
                                  inputs = files_to_sync, outputs = total_stat_file, map=False)

    def post_process(self):
        # Fixed: the original log line announced the existence check but never
        # included its result.
        logging.getLogger("AddRawFiles").debug("post_process. does the work dir ("+self.runobj.get_work_directory()+") exist? " + str(os.path.isdir(self.runobj.get_work_directory())))
204