Commit 5ef8d333 authored by Jerome Mariette's avatar Jerome Mariette
Browse files

No commit message

No commit message
parent 73a6a738
#
# Copyright (C) 2012 INRA
# Copyright (C) 2009 INRA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -15,345 +15,718 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__author__ = 'Marcel Martin'
__author__ = 'Plateforme bioinformatique Midi Pyrenees'
__copyright__ = 'Copyright (C) 2009 INRA'
__license__ = 'GNU General Public License'
__version__ = '1.0'
__email__ = 'support.genopole@toulouse.inra.fr'
__status__ = 'beta'
from collections import namedtuple
import sys
import struct
import os
if sys.version_info[0] < 3:
from itertools import izip as zip
from itertools import izip as zip
else:
basestring = str
from .xopen import xopen
basestring = str
from codecs import getreader, getwriter
from os.path import splitext
import sys
def _shorten(s, n=20):
"""Shorten string s to at most n characters, appending "..." if necessary."""
if s is None:
return None
if len(s) > n:
s = s[:n-3] + '...'
return s
class Sequence:
"""qualities is a string and it contains the qualities encoded as ascii(qual+33)."""
def __init__(self, name, sequence, qualities):
"""Set qualities to None if there are no quality values"""
self.name = name
self.sequence = sequence
self.qualities = qualities
def __getitem__(self, key):
"""slicing"""
return self.__class__(self.name, self.sequence[key], self.qualities[key] if self.qualities is not None else None)
def __repr__(self):
qstr = ''
if self.qualities is not None:
qstr = '\', qualities="{0}"'.format(_shorten(self.qualities))
return 'Sequence(name="{0}", sequence="{1}"{2})'.format(_shorten(self.name), _shorten(self.sequence), qstr)
def __len__(self):
return len(self.sequence)
def __eq__(self, other):
return self.name == other.name and \
self.sequence == other.sequence and \
self.qualities == other.qualities
def __ne__(self, other):
return not self.__eq__(other)
import sys, gzip
def xopen(filename, mode='r'):
"""
Replacement for the "open" function that can also open
files that have been compressed with gzip. If the filename ends with .gz,
the file is opened with gzip.open(). If it doesn't, the regular open()
is used. If the filename is '-', standard output (mode 'w') or input
(mode 'r') is returned.
"""
assert isinstance(filename, basestring)
if filename == '-':
return sys.stdin if 'r' in mode else sys.stdout
if filename.endswith('.gz'):
if sys.version_info[0] < 3:
return gzip.open(filename, mode)
else:
if 'r' in mode:
return getreader('ascii')(gzip.open(filename, mode))
else:
return getwriter('ascii')(gzip.open(filename, mode))
else:
return open(filename, mode)
Sequence = namedtuple("Sequence", "name description sequence qualities")
class FormatError(Exception):
"""
Raised when an input file (FASTA or FASTQ) is malformatted.
"""
pass
"""
Raised when an input file (FASTA or FASTQ) is malformatted.
"""
pass
class FileWithPrependedLine(object):
"""
A file-like object that allows to "prepend" a single
line to an already opened file. That is, further
reads on the file will return the provided line and
only then the actual content. This is needed to solve
the problem of autodetecting input from a stream:
As soon as the first line has been read, we know
the file type, but also that line is "gone" and
unavailable for further processing.
"""
def __init__(self, file, line):
"""
file is an already opened file-like object.
line is a single string (newline will be appended if not included)
"""
if not line.endswith('\n'):
line += '\n'
self.first_line = line
self.file = file
def __iter__(self):
yield self.first_line
for line in self.file:
yield line
"""
A file-like object that allows to "prepend" a single
line to an already opened file. That is, further
reads on the file will return the provided line and
only then the actual content. This is needed to solve
the problem of autodetecting input from a stream:
As soon as the first line has been read, we know
the file type, but also that line is "gone" and
unavailable for further processing.
"""
def __init__(self, file, line):
"""
file is an already opened file-like object.
line is a single string (newline will be appended if not included)
"""
if not line.endswith('\n'):
line += '\n'
self.first_line = line
self.file = file
def __iter__(self):
yield self.first_line
for line in self.file:
yield line
class UnknownFileType(Exception):
"""
Raised when SequenceReader could not autodetect the file type.
"""
pass
"""
Raised when SequenceReader could not autodetect the file type.
"""
pass
def SequenceReader(file, colorspace=False, fileformat=None):
"""
Reader for FASTA and FASTQ files that autodetects the file format.
Returns either an instance of FastaReader or of FastqReader,
depending on file type.
The autodetection can be skipped by setting fileformat to the string
'fasta' or 'fastq'
file is a filename or a file-like object.
If file is a filename, then .gz files are supported.
If the file name is available, the file type is detected
by looking at the file name.
If the file name is not available (for example, reading
from standard input), then the file is read and the file
type determined from the content.
"""
if fileformat is not None:
fileformat = fileformat.lower()
if fileformat == 'fasta':
return FastaReader(file)
elif fileformat == 'fastq':
return FastqReader(file, colorspace)
elif fileformat == 'sra-fastq' and colorspace:
return FastqReader(file, colorspace, skip_color=1)
else:
raise UnknownFileType("File format {0} is unknown (expected 'sra-fastq' (only for colorspace), 'fasta' or 'fastq').".format(fileformat))
name = None
if file == "-":
file = sys.stdin
elif isinstance(file, basestring):
name = file
elif hasattr(file, "name"):
name = file.name
if name is not None:
if name.endswith('.gz'):
name = name[:-3]
name, ext = splitext(name)
ext = ext.lower()
if ext in ['.fasta', '.fa', '.fna', '.csfasta', '.csfa']:
return FastaReader(file)
elif ext in ['.fastq', '.fq'] or (ext == '.txt' and name.endswith('_sequence')):
return FastqReader(file, colorspace)
else:
raise UnknownFileType("Could not determine whether this is FASTA or FASTQ: file name extension {0} not recognized".format(ext))
# No name available.
# Assume that 'file' is an open file
# and autodetect its type by reading from it.
for line in file:
if line.startswith('#'):
# Skip comment lines (needed for csfasta)
continue
if line.startswith('>'):
return FastaReader(FileWithPrependedLine(file, line))
if line.startswith('@'):
return FastqReader(FileWithPrependedLine(file, line), colorspace)
raise UnknownFileType("File is neither FASTQ nor FASTA.")
"""
Reader for FASTA, FASTQ, and SFF files that autodetects the file format.
Returns either an instance of FastaReader, FastqReader, SFFReader
depending on file type.
The autodetection can be skipped by setting fileformat to the string
'fasta', 'fastq' or sff
file is a filename or a file-like object.
If file is a filename, then .gz files are supported.
If the file name is available, the file type is detected
by looking at the file name.
If the file name is not available (for example, reading
from standard input), then the file is read and the file
type determined from the content.
"""
if fileformat is not None:
fileformat = fileformat.lower()
if fileformat == 'fasta':
return FastaReader(file)
elif fileformat == 'fastq':
return FastqReader(file)
elif fileformat == "sff":
return SFFReader(file)
else:
raise UnknownFileType("File format {} is unknown (expected 'fasta', 'fastq' or 'sff').".format(fileformat))
name = None
if file == "-":
file = sys.stdin
elif isinstance(file, basestring):
name = file
elif hasattr(file, "name"):
name = file.name
if name is not None:
if name.endswith('.gz'):
name = name[:-3]
name, ext = splitext(name)
ext = ext.lower()
if ext in ['.fasta', '.fa', '.fna', '.csfasta', '.csfa']:
return FastaReader(file)
elif ext in ['.fastq', '.fq']:
return FastqReader(file, colorspace)
elif ext in ['.sff']:
return SFFReader(file)
else:
raise UnknownFileType("Could not determine whether this is FASTA, FASTQ or SFF: file name extension {} not recognized".format(ext))
# No name available.
# Assume that 'file' is an open file
# and autodetect its type by reading from it.
# TODO: test if binarie file for SFFReader
for line in file:
if line.startswith('#'):
# Skip comment lines (needed for csfasta)
continue
if line.startswith('>'):
return FastaReader(FileWithPrependedLine(file, line))
if line.startswith('@'):
return FastqReader(FileWithPrependedLine(file, line), colorspace)
raise UnknownFileType("File is neither FASTQ nor FASTA.")
class FastaReader(object):
"""
Reader for FASTA files.
"""
def __init__(self, file, wholefile=False, keep_linebreaks=False):
"""
file is a filename or a file-like object.
If file is a filename, then .gz files are supported.
If wholefile is True, then it is ok to read the entire file
into memory. This is faster when there are many newlines in
the file, but may obviously need a lot of memory.
keep_linebreaks -- whether to keep the newline characters in the sequence
"""
if isinstance(file, basestring):
file = xopen(file, "r")
self.fp = file
self.wholefile = wholefile
self.keep_linebreaks = keep_linebreaks
assert not (wholefile and keep_linebreaks), "not supported"
def __iter__(self):
"""
Return instances of the Sequence class.
The qualities attribute is always None.
"""
return self._wholefile_iter() if self.wholefile else self._streaming_iter()
def _streaming_iter(self):
"""
Read next entry from the file (single entry at a time).
# TODO this can be quadratic since += is used for the sequence string
"""
name = None
seq = ""
appendchar = '\n' if self.keep_linebreaks else ''
for line in self.fp:
# strip() should also take care of DOS line breaks
line = line.strip()
if line and line[0] == ">":
if name is not None:
assert self.keep_linebreaks or seq.find('\n') == -1
yield Sequence(name, seq, None)
name = line[1:]
seq = ""
else:
seq += line + appendchar
if name is not None:
assert self.keep_linebreaks or seq.find('\n') == -1
yield Sequence(name, seq, None)
def _wholefile_iter(self):
"""
This reads in the entire file at once, but is faster than the above code when there are lots of newlines.
The idea comes from the TAMO package (http://fraenkel.mit.edu/TAMO/), module TAMO.seq.Fasta (author is
David Benjamin Gordon).
"""
wholefile = self.fp.read()
assert '\r' not in wholefile, "Sorry, currently don't know how to deal with files that contain \\r linebreaks"
assert len(wholefile) == 0 or wholefile[0] == '>', "FASTA file must start with '>'"
parts = wholefile.split('\n>')
# first part has '>' in front
parts[0] = parts[0][1:]
for part in parts:
lines = part.split('\n', 1)
yield Sequence(name=lines[0], sequence=lines[1].replace('\n', ''), qualities=None)
def __enter__(self):
if self.fp is None:
raise ValueError("I/O operation on closed FastaReader")
return self
def __exit__(self, *args):
self.fp.close()
"""
Reader for FASTA files.
"""
def __init__(self, file, wholefile=False, keep_linebreaks=False):
"""
file is a filename or a file-like object.
If file is a filename, then .gz files are supported.
If wholefile is True, then it is ok to read the entire file
into memory. This is faster when there are many newlines in
the file, but may obviously need a lot of memory.
keep_linebreaks -- whether to keep the newline characters in the sequence
"""
if isinstance(file, basestring):
file = xopen(file, "r")
self.fp = file
self.wholefile = wholefile
self.keep_linebreaks = keep_linebreaks
assert not (wholefile and keep_linebreaks), "not supported"
def __iter__(self):
"""
Return instances of the Sequence class.
The qualities attribute is always None.
"""
return self._wholefile_iter() if self.wholefile else self._streaming_iter()
def _streaming_iter(self):
"""
Read next entry from the file (single entry at a time).
# TODO this can be quadratic since += is used for the sequence string
"""
name = None
seq = ""
appendchar = '\n' if self.keep_linebreaks else ''
for line in self.fp:
# strip() should also take care of DOS line breaks
line = line.strip()
if line and line[0] == ">":
if name is not None:
assert self.keep_linebreaks or seq.find('\n') == -1
id = name.split()[0]
desc = " ".join(name.split()[1:])
yield Sequence(id, desc, seq, None)
name = line[1:]
seq = ""
else:
seq += line + appendchar
if name is not None:
assert self.keep_linebreaks or seq.find('\n') == -1
id = name.split()[0]
desc = " ".join(name.split()[1:])
yield Sequence(id, desc, seq, None)
def _wholefile_iter(self):
"""
This reads in the entire file at once, but is faster than the above code when there are lots of newlines.
The idea comes from the TAMO package (http://fraenkel.mit.edu/TAMO/), module TAMO.seq.Fasta (author is
David Benjamin Gordon).
"""
wholefile = self.fp.read()
assert '\r' not in wholefile, "Sorry, currently don't know how to deal with files that contain \\r linebreaks"
assert len(wholefile) == 0 or wholefile[0] == '>', "FASTA file must start with '>'"
parts = wholefile.split('\n>')
# first part has '>' in front
parts[0] = parts[0][1:]
for part in parts:
lines = part.split('\n', 1)
id = lines[0].split()[0]
desc = " ".join(lines[0].split()[1:])
yield Sequence(id, desc, lines[1].replace('\n', ''), None)
def __enter__(self):
if self.fp is None:
raise ValueError("I/O operation on closed FastaReader")
return self
def __exit__(self, *args):
self.fp.close()
class FastqReader(object):
"""
Reader for FASTQ files. Does not support multi-line FASTQ files.
"""
def __init__(self, file, colorspace=False, skip_color=0):
"""
file is a filename or a file-like object.
If file is a filename, then .gz files are supported.
colorspace -- Usually (when this is False), there must be n characters in the sequence and
n quality values. When this is True, there must be n+1 characters in the sequence and n quality values.
"""
if isinstance(file, basestring):
file = xopen(file, "r")
self.fp = file
self.colorspace = colorspace
self.skip_color = skip_color
self.twoheaders = False
def __iter__(self):
"""
Return tuples: (name, sequence, qualities).
qualities is a string and it contains the unmodified, encoded qualities.
"""
lengthdiff = 1 if self.colorspace else 0
for i, line in enumerate(self.fp):
if i % 4 == 0:
if not line.startswith('@'):
raise FormatError("at line {0}, expected a line starting with '+'".format(i+1))
name = line.strip()[1:]
elif i % 4 == 1:
sequence = line.strip()
elif i % 4 == 2:
line = line.strip()
if not line.startswith('+'):
raise FormatError("at line {0}, expected a line starting with '+'".format(i+1))
if len(line) > 1:
self.twoheaders = True
if not line[1:] == name:
raise FormatError(
"At line {0}: Two sequence descriptions are given in "
"the FASTQ file, but they don't match "
"('{1}' != '{2}')"
" perhaps you should try the 'sra-fastq' format?".format(i+1, name, line.rstrip()[1:]))
elif i % 4 == 3:
qualities = line.rstrip("\n\r")[self.skip_color:]
if len(qualities) + lengthdiff != len(sequence):
raise ValueError("Length of quality sequence and length of read do not match (%d+%d!=%d)" % (len(qualities), lengthdiff, len(sequence)))
yield Sequence(name, sequence, qualities)
def __enter__(self):
if self.fp is None:
raise ValueError("I/O operation on closed FastqReader")
return self
def __exit__(self, *args):
self.fp.close()
"""
Reader for FASTQ files. Does not support multi-line FASTQ files.
"""
def __init__(self, file, colorspace=False):
"""
file is a filename or a file-like object.
If file is a filename, then .gz files are supported.
colorspace -- Usually (when this is False), there must be n characters in the sequence and
n quality values. When this is True, there must be n+1 characters in the sequence and n quality values.
"""
if isinstance(file, basestring):
file = xopen(file, "r")
self.fp = file
self.colorspace = colorspace
self.twoheaders = False
def __iter__(self):
"""
Return tuples: (name, sequence, qualities).
qualities is a string and it contains the unmodified, encoded qualities.
"""
lengthdiff = 1 if self.colorspace else 0
for i, line in enumerate(self.fp):
if i % 4 == 0:
if not line.startswith('@'):
raise FormatError("at line {}, expected a line starting with '+'".format(i+1))
name = line.strip()[1:]
elif i % 4 == 1:
sequence = line.strip()
elif i % 4 == 2:
line = line.strip()
if not line.startswith('+'):
raise FormatError("at line {}, expected a line starting with '+'".format(i+1))
if len(line) > 1:
self.twoheaders = True
if not line[1:] == name:
raise FormatError(
"At line {}: Two sequence descriptions are given in "
"the FASTQ file, but they don't match "
"('{}' != '{}')".format(i+1, name, line.rstrip()[1:]))
elif i % 4 == 3:
qualities = line.rstrip("\n\r")
if len(qualities) + lengthdiff != len(sequence):
raise ValueError("Length of quality sequence and length of read do not match (%d+%d!=%d)" % (len(qualities), lengthdiff, len(sequence)))
id = name.split()[0]
desc = " ".join(name.split()[1:])
yield Sequence(id, desc, sequence, qualities)
def __enter__(self):
if self.fp is None:
raise ValueError("I/O operation on closed FastqReader")
return self
def __exit__(self, *args):
self.fp.close()
class SFFReader(object):
"""
Reader for SFF files.
"""
def __init__(self, file):
"""
file is a filename or a file-like object.
If file is a filename, then .gz files are supported.
"""
if isinstance(file, basestring):
file = xopen(file, "r")
self.fp = file
self.header_data = self.read_header(file)
def read_bin_fragment(self, struct_def, fileh, offset=0, data=None, byte_padding=None):
'''It reads a chunk of a binary file.
You have to provide the struct, a file object, the offset (where to start
reading).
Also you can provide an optional dict that will be populated with the
extracted data.
If a byte_padding is given the number of bytes read will be a multiple of
that number, adding the required pad at the end.
It returns the number of bytes reads and the data dict.
'''
if data is None:
data = {}
#we read each item
bytes_read = 0
for item in struct_def:
#we go to the place and read
fileh.seek(offset + bytes_read)
n_bytes = struct.calcsize(item[1])
buffer = fileh.read(n_bytes)
read = struct.unpack('>' + item[1], buffer)
if len(read) == 1:
read = read[0]
data[item[0]] = read
bytes_read += n_bytes
#if there is byte_padding the bytes_to_read should be a multiple of the
#byte_padding
if byte_padding is not None:
pad = byte_padding
bytes_read = ((bytes_read + pad - 1) // pad) * pad
return (bytes_read, data)