Commit a40f987f authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Add writing of output GTF file

parent 14b056db
......@@ -2,20 +2,22 @@
import os
import pysam
import re
import sys
import yaml
import subprocess
from collections import OrderedDict
PRG_PATH = os.path.dirname(os.path.realpath(__file__))
class Miniannotator:
def __init__(self, reference, assembly, map=None):
def __init__(self, reference, assembly, qoverlap=0.9, map=None):
self.conf = self._load_config()
self.reference = reference
self.assembly = assembly
self.afasta = pysam.FastaFile(self.assembly)
self.qoverlap = qoverlap
self.map = map
@staticmethod
......@@ -55,20 +57,211 @@ class Miniannotator:
self.map = map_f
return rcode
def search_genes(self):
def _query_overlap(self, read):
"""
Get query mapping length
:param read: read pysam object
:type read: pysam.libcalignedsegment.AlignedSegment
:return: read mapping length on target percent [0-1]
:rtype: float
"""
return (read.query_alignment_length / read.infer_query_length()) if read.cigarstring is not None else -1
# Remove reads without cigarstring
def _read_pass(self, read):
"""
Check if read pass the mapping size filter
:param read: read pysam object
:type read: pysam.libcalignedsegment.AlignedSegment
:return: True if pass the filter, else False
:rtype: float
"""
return self._query_overlap(read) >= self.qoverlap
def _read_tposition(self, read):
"""
Get mapping position of read on the reference
:param read: read pysam object
:type read: pysam.libcalignedsegment.AlignedSegment
:return: position (start, end)
:rtype: int, int
"""
return read.reference_start, read.reference_end
def _get_query_sequence(self, contig, start, end):
return str(self.afasta.fetch(reference=contig, start=start, end=end))
def _exons(self, read, start):
"""
Define exons on reference with read split, and get indels
:param read: read pysam object
:type read: pysam.libcalignedsegment.AlignedSegment
:param start: start position of the read on the reference
:type start: int
:return:
"""
exons = []
indels = []
exon_start = start
exon_end = start
qstart = 0
complete = False
for cigar in read.cigartuples:
operation = cigar[0]
length = cigar[1]
if operation == 0: # MATCH
exon_end += length
complete = False
qstart += length
elif operation == 1: # INSERTION
indels.append({"type": "ins", "start": exon_end, "end": exon_end + length,
"seq": self._get_query_sequence(read.query_name, qstart, qstart+length),
"contig": "%s:%d-%d" % (read.query_name, qstart+1, qstart+length)}) # 1-based coords
qstart += length
elif operation == 2: # DELETION
indels.append({"type": "del", "start": exon_end, "end": exon_end + length})
exon_end += length
elif operation == 3: # SPLIT
exons.append((exon_start, exon_end))
complete = True
exon_start = exon_end + length
exon_end = exon_start
elif operation == 4: # SOFT CLIP
exon_start += length
qstart += length
elif operation == 5: # HARD CLIP
pass
else:
print("Operation not expected: %d" % operation)
if not complete:
exons.append((exon_start, exon_end))
return exons, indels
@staticmethod
def _sort_genes_ids(item):
match = re.match(r"([^:]+):(\d+)-(\d+)([+-])(_(\d))?", item)
return match.group(1), int(match.group(2)), int(match.group(3)), match.group(4), \
int(match.group(6)) if match.group(6) is not None else 0
def _write_gene(self, gtf, gene, exons, indels):
"""
Write gene into GTF file
:param gtf: GTF file handler
:type gtf: _io.TextIOWrapper
:param gene: dictionnary describing gene. Required keys : id, seqname, start, end, strand
:type gene: dict
:param exons: list of exons. Each exon is a tuple (start, end)
:type exons: list
:param indels: list of indels. Each indel is a dictionnary with keys type (ins or del), start and end
:type indels: list
:return:
"""
line = '{seqname}\tminiannotator\t{feature}\t{start}\t{end}\t.\t{strand}\t.\tgene_id "{gene_id}" {attrs}\n'
gene_line = line.format(
seqname=gene["seqname"],
feature="gene",
start=gene["start"],
end=gene["end"],
strand=gene["strand"],
gene_id=gene["id"],
attrs=""
)
gtf.write(gene_line)
for exon in sorted(exons):
exon_line = line.format(
seqname=gene["seqname"],
feature="exon",
start=exon[0] + 1, # Transform 0-based => 1-based
end=exon[1],
strand=gene["strand"],
gene_id=gene["id"],
attrs=""
)
gtf.write(exon_line)
if len(indels) > 0:
for indel in sorted(indels, key=lambda x: (x["start"], x["end"])):
indel_line = line.format(
seqname=gene["seqname"],
feature=indel["type"],
start=indel["start"] + 1, # Transform 0-based => 1-based
end=indel["end"],
strand=gene["strand"],
gene_id=gene["id"],
attrs="" if indel["type"] == "del" else ('sequence "%s" contig_pos "%s"' % (indel["seq"],
indel["contig"]))
)
gtf.write(indel_line)
def search_genes(self, gtf_file):
"""
Parse BAM file to search genes and exons positions
Query match position on the reference defines gene position
Splice reads define exons: introns are the "N" in the cigarline
Save also DEL/INS events in genes
:return:
:param gtf_file: output GTF file path
:type gtf_file: str
"""
genes = OrderedDict()
exons = OrderedDict()
indels = OrderedDict()
print("Searching genes in genome...", flush=True)
genes = {}
exons = {}
indels = {}
align = pysam.AlignmentFile(self.map)
for read in align:
if self._read_pass(read):
start, end = self._read_tposition(read)
g_exons, g_indels = self._exons(read, start)
reference = read.reference_name
gene_id = "%s:%d-%d" % (reference, start, end) + ("-" if read.is_reverse else "+")
if gene_id in genes:
it = 2
new_gene_id = gene_id + "_" + str(it)
while new_gene_id in genes:
it += 1
new_gene_id = gene_id + "_" + str(it)
else:
genes[gene_id] = None
exons[gene_id] = g_exons
indels[gene_id] = g_indels
print("Sorting genes...", flush=True)
genes_order = sorted(genes.keys(), key=lambda x: self._sort_genes_ids(x))
print("Writing genes in GTF file...", flush=True)
gene_id_nb = 0
with open(os.path.join(gtf_file), "w") as gtf:
for gene in genes_order:
match = re.match(r"([^:]+):(\d+)-(\d+)([+-])(_(\d))?", gene)
if match.group(6) is None:
gene_id_nb += 1
gene_id = "GENE%0.10d"
else:
gene_id = "GENE%0.10d_" + match.group(6)
gene_c = {
"id": gene_id % gene_id_nb,
"seqname": match.group(1),
"start": int(match.group(2)) + 1, # Transform 0-based => 1-based
"end": int(match.group(3)),
"strand": match.group(4)
}
g_exons = exons[gene]
g_indels = indels[gene]
self._write_gene(gtf=gtf,
gene=gene_c,
exons=g_exons,
indels=g_indels)
if __name__ == "__main__":
......@@ -79,6 +272,8 @@ if __name__ == "__main__":
parser.add_argument("-a", "--assembly", help="De novo assembly fasta file", required=True)
parser.add_argument("-m", "--map", help="BAM map file build with minimap2 (if not given, will be computed now)",
required=False)
parser.add_argument("-q", "--min-qoverlap", help="Minimal query overlap [0-100]", type=int, default=90,
required=False)
parser.add_argument("-o", "--output-dir", help="Output folder path", required=False, default=".")
args = parser.parse_args()
......@@ -88,12 +283,20 @@ if __name__ == "__main__":
elif not os.path.isdir(args.output_dir):
print("Error: output folder %s exists but is not a folder" % args.output_dir, file=sys.stderr)
annotator = Miniannotator(args.reference, args.assembly)
# Map:
if args.map is None:
annotator = Miniannotator(reference=args.reference,
assembly=args.assembly,
qoverlap=args.min_qoverlap / 100)
map_file = os.path.join(args.output_dir, "map.bam")
annotator.launch_minimap(map_file)
else:
print("Using map from %s..." % args.map)
map_file = args.map
print("Using map from %s..." % args.map, flush=True)
annotator = Miniannotator(reference=args.reference,
assembly=args.assembly,
qoverlap=args.min_qoverlap / 100,
map=args.map)
# Search genes
annotator.search_genes(os.path.join(args.output_dir, "annotations.gtf"))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment