Commit 392c7ce0 authored by Floreal Cabanettes's avatar Floreal Cabanettes
Browse files

Add transcripts + exon ID + indel exon association

parent b7badedd
#!/usr/bin/env python3
from collections import OrderedDict
import multiprocessing
import os
import pysam
import re
import subprocess
import sys
import yaml
import subprocess
PRG_PATH = os.path.dirname(os.path.realpath(__file__))
......@@ -17,6 +19,7 @@ class Miniannotator:
self.reference = reference
self.assembly = assembly
self.afasta = pysam.FastaFile(self.assembly)
self.chr_order = pysam.FastaFile(self.reference).references
self.qoverlap = qoverlap
self.map = map
......@@ -110,6 +113,7 @@ class Miniannotator:
exon_end = start
qstart = 0
complete = False
exon_nb = 1
for cigar in read.cigartuples:
operation = cigar[0]
length = cigar[1]
......@@ -118,16 +122,18 @@ class Miniannotator:
complete = False
qstart += length
elif operation == 1: # INSERTION
indels.append({"type": "ins", "start": exon_end, "end": exon_end + length,
indels.append({"type": "ins", "start": exon_end + 1, "end": exon_end + length, # 1-based coords
"seq": self._get_query_sequence(read.query_name, qstart, qstart+length),
"contig": "%s:%d-%d" % (read.query_name, qstart+1, qstart+length)}) # 1-based coords
"contig": "%s:%d-%d" % (read.query_name, qstart+1, qstart+length), # 1-based coords
"exon": exon_nb})
qstart += length
elif operation == 2: # DELETION
indels.append({"type": "del", "start": exon_end, "end": exon_end + length})
indels.append({"type": "del", "start": exon_end + 1, "end": exon_end + length, "exon": exon_nb}) # 1-based coords
exon_end += length
elif operation == 3: # SPLIT
exons.append((exon_start, exon_end))
exons.append({"start": exon_start + 1, "end": exon_end, "nb": exon_nb}) # 1-based coords
complete = True
exon_nb += 1
exon_start = exon_end + length
exon_end = exon_start
elif operation == 4: # SOFT CLIP
......@@ -138,7 +144,7 @@ class Miniannotator:
else:
print("Operation not expected: %d" % operation)
if not complete:
exons.append((exon_start, exon_end))
exons.append({"start": exon_start + 1, "end": exon_end, "nb": exon_nb}) # 1-based coords
return exons, indels
@staticmethod
......@@ -147,58 +153,216 @@ class Miniannotator:
return match.group(1), int(match.group(2)), int(match.group(3)), match.group(4), \
int(match.group(6)) if match.group(6) is not None else 0
def _write_gene(self, gtf, gene, exons, indels):
def _build_genes_objects(self, genes):
"""
Write gene into GTF file
:param gtf: GTF file handler
:type gtf: _io.TextIOWrapper
:param gene: dictionnary describing gene. Required keys : id, seqname, start, end, strand
:type gene: dict
:param exons: list of exons. Each exon is a tuple (start, end)
:type exons: list
:param indels: list of indels. Each indel is a dictionnary with keys type (ins or del), start and end
:type indels: list
:return:
Build genes candidates by chromosomes
:param genes: genes definition, by position. Key: position of the gene (chr:start-stop{strand}), value:
other genes with same coordinates (if any)
:type genes: dict
:return: genes dict object. For each chromosome, on each strand, each gene (or gene transcript,
not distinguishable yet) is described by it start end and strand
"""
genes_by_chr = {}
for gene in genes:
match = re.match(r"([^:]+):(\d+)-(\d+)([+-])(_(\d))?", gene)
seqname = match.group(1)
strand = match.group(4)
gene_c = {
"seqname": seqname,
"start": int(match.group(2)) + 1, # Transform 0-based => 1-based
"end": int(match.group(3)),
"strand": strand,
"gene_id": gene
}
gene_c["length"] = gene_c["end"] - gene_c["start"]
keydict = seqname + strand
if keydict not in genes_by_chr:
genes_by_chr[keydict] = []
genes_by_chr[keydict].append(gene_c)
return genes_by_chr
@staticmethod
def _build_genes_transcripts_by_chr(genes):
"""
Parallelise staff by chromosome and strand of chromosomes
:param genes: genes object list. Each gene is a dict with 3 keys: start, end and strand. All genes
are located on the same strand
:type genes: list
:return: dictionnary: {seqname => chromosome name, strand => strand of genes, genes => dict of genes}
:rtype: dict
"""
line = '{seqname}\tminiannotator\t{feature}\t{start}\t{end}\t.\t{strand}\t.\tgene_id "{gene_id}" {attrs}\n'
gene_line = line.format(
seqname=gene["seqname"],
feature="gene",
start=gene["start"],
end=gene["end"],
strand=gene["strand"],
gene_id=gene["id"],
attrs=""
)
gtf.write(gene_line)
for exon in sorted(exons):
exon_line = line.format(
seqname=gene["seqname"],
feature="exon",
start=exon[0] + 1, # Transform 0-based => 1-based
end=exon[1],
strand=gene["strand"],
gene_id=gene["id"],
attrs=""
)
gtf.write(exon_line)
if len(indels) > 0:
for indel in sorted(indels, key=lambda x: (x["start"], x["end"])):
indel_line = line.format(
genes.sort(key=lambda x: x["start"])
seqname = genes[0]["seqname"]
strand = genes[0]["strand"]
all_genes = {}
interval = None
interval_genes = []
for gene in genes:
if interval is None:
interval = [gene["start"], gene["end"]]
interval_genes.append(gene)
else:
# If gene overlaps with the interval
if interval[0] < gene["start"] < interval[1] or interval[0] < gene["end"] < interval[1] \
or gene["start"] < interval[0] < gene["end"] or gene["start"] < interval[1] < gene["end"] \
or (gene["start"] == interval[0] and gene["end"] == interval[1]):
interval_genes.append(gene)
interval[0] = min(interval[0], gene["start"])
interval[1] = max(interval[1], gene["end"])
else:
all_genes[tuple(interval + [strand])] = interval_genes
interval = [gene["start"], gene["end"]]
interval_genes = [gene]
if interval is not None:
all_genes[tuple(interval + [strand])] = interval_genes
return {"seqname": seqname, "strand": strand, "genes": all_genes}
def _merge_by_chromosomes(self, genes_pool):
"""
Merge genes detection build by chromosome and by strand to make it only by chromosome
:param genes_pool: list of genes by chromosome and by strand
:type genes_pool: list
:return: genes by chromosome
:rtype: dict
"""
all_genes = {}
for pool in genes_pool:
seqname = pool["seqname"]
if seqname not in all_genes:
all_genes[seqname] = pool["genes"]
else:
all_genes[seqname] = {**all_genes[seqname], **pool["genes"]}
return all_genes
def _build_genes_transcripts(self, genes, exons, indels):
"""
Build genes and transcripts using overlap.
Two genes that share at least one base are considered as two alternative transcripts of the same gene
:param genes: genes definition, by position. Key: position of the gene (chr:start-stop{strand}), value:
other genes with same coordinates (if any)
:type genes: dict
:return: genes with associated transcripts, and exons and indels associated to them
"""
print("Computing transcripts...", flush=True)
genes_by_chr = self._build_genes_objects(genes)
pool = multiprocessing.Pool(processes=self.conf["threads"])
all_genes = self._merge_by_chromosomes(pool.map(Miniannotator._build_genes_transcripts_by_chr,
genes_by_chr.values()))
print("Sorting genes...", flush=True)
chr_order = sorted(all_genes.keys())
gene_id_nb = 1
full_genes = OrderedDict()
print("Building final genes object...", flush=True)
for chrm in chr_order:
chr_genes = all_genes[chrm]
chr_genes_list = sorted(chr_genes.keys())
for c_gene in chr_genes_list:
gene_id = "GENE%0.10d" % gene_id_nb
full_genes[gene_id] = {
"seqname": chrm,
"start": c_gene[0],
"end": c_gene[1],
"strand": c_gene[2],
"transcripts": OrderedDict()
}
transcripts = chr_genes[c_gene]
tr_nb = 1
for transcript in transcripts:
tr_id = "TR%0.10d_%0.3d" % (gene_id_nb, tr_nb)
tr = {
"start": transcript["start"],
"end": transcript["end"],
"exons": OrderedDict(),
"indels": []
}
tr_exons = exons[transcript["gene_id"]]
for tr_exon in tr_exons:
ex_id = tr_id + ".e%d" % tr_exon["nb"]
tr["exons"][ex_id] = {
"start": tr_exon["start"],
"end": tr_exon["end"]
}
tr_indels = indels[transcript["gene_id"]]
for tr_indel in tr_indels:
tr_indel["exon"] = tr_id + ".e%d" % tr_indel["exon"]
tr["indels"].append(tr_indel)
full_genes[gene_id]["transcripts"][tr_id] = tr
tr_nb += 1
gene_id_nb += 1
return full_genes
def write_gtf_file(self, gtf_file, genes):
"""
Write genes, transcripts, exons and indels to a GTF file
:param gtf_file: GTF file path
:type gtf_file: str
:param genes: final genes object. Each gene has position and transcripts of this gene. Each transcript contains
associated exons and list of indels
:type genes: dict
"""
with open(gtf_file, "w") as gtf:
for gene_id, gene in genes.items():
line = '{seqname}\tminiannotator\t{feature}\t{start}\t{end}\t.\t{strand}\t.\tgene_id "{gene_id}"' \
' {attrs}\n'
gtf.write(line.format(
seqname=gene["seqname"],
feature=indel["type"],
start=indel["start"] + 1, # Transform 0-based => 1-based
end=indel["end"],
feature="gene",
start=gene["start"],
end=gene["end"],
strand=gene["strand"],
gene_id=gene["id"],
attrs="" if indel["type"] == "del" else ('sequence "%s" contig_pos "%s"' % (indel["seq"],
indel["contig"]))
)
gtf.write(indel_line)
gene_id=gene_id,
attrs=""
))
for tr_id, transcript in gene["transcripts"].items():
attrs = 'transcript_id "{tr_id}"'.format(tr_id=tr_id)
gtf.write(line.format(
seqname=gene["seqname"],
feature="transcript",
start=transcript["start"],
end=transcript["end"],
strand=gene["strand"],
gene_id=gene_id,
attrs=attrs
))
for ex_id, exon in transcript["exons"].items():
attrs_ex = attrs + ' exon_id "{ex_id}"'.format(ex_id=ex_id)
gtf.write(line.format(
seqname=gene["seqname"],
feature="exon",
start=exon["start"],
end=exon["end"],
strand=gene["strand"],
gene_id=gene_id,
attrs=attrs_ex
))
for indel in transcript["indels"]:
attrs_ex = attrs + ' exon_id "{ex_id}"'.format(ex_id=indel["exon"])
if indel["type"] == "ins":
attrs_ex += ' sequence "%s" contig_pos "%s"' % (indel["seq"], indel["contig"])
gtf.write(line.format(
seqname=gene["seqname"],
feature=indel["type"],
start=indel["start"],
end=indel["end"],
strand=gene["strand"],
gene_id=gene_id,
attrs=attrs_ex
))
def search_genes(self, gtf_file):
"""
......@@ -234,34 +398,10 @@ class Miniannotator:
exons[gene_id] = g_exons
indels[gene_id] = g_indels
print("Sorting genes...", flush=True)
genes_order = sorted(genes.keys(), key=lambda x: self._sort_genes_ids(x))
print("Writing genes in GTF file...", flush=True)
full_genes = self._build_genes_transcripts(genes=genes, exons=exons, indels=indels)
gene_id_nb = 0
with open(os.path.join(gtf_file), "w") as gtf:
for gene in genes_order:
match = re.match(r"([^:]+):(\d+)-(\d+)([+-])(_(\d))?", gene)
if match.group(6) is None:
gene_id_nb += 1
gene_id = "GENE%0.10d"
else:
gene_id = "GENE%0.10d_" + match.group(6)
gene_c = {
"id": gene_id % gene_id_nb,
"seqname": match.group(1),
"start": int(match.group(2)) + 1, # Transform 0-based => 1-based
"end": int(match.group(3)),
"strand": match.group(4)
}
g_exons = exons[gene]
g_indels = indels[gene]
self._write_gene(gtf=gtf,
gene=gene_c,
exons=g_exons,
indels=g_indels)
print("Writing to GTF file...", flush=True)
self.write_gtf_file(gtf_file=gtf_file, genes=full_genes)
if __name__ == "__main__":
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment