Skip to content
Snippets Groups Projects
Commit 4f118e1d authored by Thomas Faraut's avatar Thomas Faraut
Browse files

new implementation of annotation using vcfwrapper

parent 5eff6941
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,20 @@ GENOMESTRIP_GENO_LIKELI_TAG = "GP"
Geno_likeli_tag = SVTYPER_GENO_LIKELI_TAG
class Representative(object):
def __init__(self, variant):
self._variant = variant
self.duplicates = []
@property
def variant(self):
return self._variant
def add_duplicates(self, duplicates):
self.duplicates.extend(duplicates)
def getNonVariantSamples(sv, variants, samples):
return set([s for s in samples if s not in variants])
......@@ -291,6 +305,7 @@ def GenomeSTRIPLikeRedundancyAnnotator(SVSet, reader,
overlapping = defaultdict(tuple)
reference = defaultdict()
for o in self_overlap:
print("Here ?", o)
if o[3] == o[7]:
continue
a, b = ordered(o[3], o[7])
......@@ -324,12 +339,21 @@ def GenomeSTRIPLikeRedundancyAnnotator(SVSet, reader,
for u in SVSet:
if u.id in duplicates:
# print("tagged as duplicate %s" % u.id)
print("tagged as duplicate %s" % u.id)
duplis = sorted([a for (a, s, o) in duplicates[u.id]])
score = max([s for (a, s, o) in duplicates[u.id]])
overlap = max([o for (a, s, o) in duplicates[u.id]])
add_duplicate_info_sv(u, overlap, score, duplis)
# representatives = defaultdict()
# for u in SVSet:
# u_id = u.variant.id
# duplicates = []
# if "DUPLICATES" in u.record.info:
# duplicates = sv.record.info['DUPLICATES']
# if u_id in representatives:
for o in overlapping:
info = overlapping[o]
u, v = o
......
import sys
from svrunner_utils import eprint
from svreader import SVReader, SVWriter
from collections import defaultdict
import numpy as np
from math import isnan
from pysam import VariantFile
from svrunner_utils import eprint, warn, vcf_to_pybed
from svreader.vcfwrapper import VCFRecord, SVReader, SVWriter
HOM_REF = (0, 0)
HET_VAR = (0, 1)
HOM_VAR = (1, 1)
SVTYPER_GENO_LIKELI_TAG = "GL"
GENOMESTRIP_GENO_LIKELI_TAG = "GP"
Geno_likeli_tag = SVTYPER_GENO_LIKELI_TAG
def Variant(sample):
if sample.get('GT') in [HET_VAR, HOM_VAR]:
return True
else:
return False
class AnnotatedRecord(object):
class VariantRecord(VCFRecord):
"""
A lightweight object to annotated the final records
"""
......@@ -18,40 +35,12 @@ class AnnotatedRecord(object):
"""
A pysam VariantRecord wrapper
"""
self.__record = record
if "SVTYPE" in record.info.keys():
self._sv_type = record.info["SVTYPE"]
else:
self._sv_type = record.alts[0][1:-1]
@property
def record(self):
return self.__record
@property
def id(self):
return self.record.id
@id.setter
def id(self, id):
self.__record.id = id
@property
def pos(self):
return self.record.pos
super(VariantRecord, self).__init__(record)
@property
def start(self):
return self.record.pos
@property
def chrom(self):
return self.record.chrom
@property
def stop(self):
return self.record.stop
@property
def end(self):
return self.record.stop
......@@ -61,12 +50,15 @@ class AnnotatedRecord(object):
return self._sv_type
@property
def filter(self):
return self.record.filter
def svlen(self):
return abs(self.stop - self.start)
@property
def samples(self):
return [AnnotatedRecordSample(s) for s in self.record.samples.values()]
def passed(self):
if "PASS" in self.record.filter:
return True
else:
return False
def setNewId(self, identifier):
try:
......@@ -77,13 +69,16 @@ class AnnotatedRecord(object):
self.id = identifier
def num_variant_samples(self):
return sum([s.isVariant for s in self.samples])
return sum([Variant(s) for s in self.samples.values()])
def variant_read_support(self):
return max([s.AltSupport() for s in self.samples])
return max([s.get('AO')[0] for s in self.samples.values()])
def qual(self):
return sum([s.SQ_score() for s in self.samples if s.isVariant])
return sum([s.get('SQ') for s in self.samples.values()])
def GQ_sum_score(self):
return sum([s.get('SQ') for s in self.samples.values()])
def setQual(self):
self.record.qual = self.qual()
......@@ -111,7 +106,7 @@ class AnnotatedRecord(object):
record.filter.add("PASS")
class AnnotatedRecordSample(object):
class VariantRecordSample(object):
"""
A lightweight object to VariantRecordSample
"""
......@@ -122,8 +117,20 @@ class AnnotatedRecordSample(object):
return self.variantsample.get('GT')
def SQ_score(self):
# Phred-scaled probability that this site is variant
# (non-reference in this sample)
return self.variantsample.get('SQ')
def GQ_score(self):
# Genotype quality
return self.variantsample.get('GQ')
def GL_score(self):
# Genotype Likelihood, log10-scaled likelihoods of the data given the
# called genotype for each possible genotype generated from the
# reference and alternate alleles given the sample ploidy
return self.variantsample.get('GL')
def AltSupport(self):
return self.variantsample.get('AO')[0]
......@@ -151,7 +158,7 @@ class VCFReader(SVReader):
def __next__(self):
while True:
raw_record = next(self.vcf_reader)
record = AnnotatedRecord(raw_record)
record = VariantRecord(raw_record)
return record
def getHeader(self):
......@@ -209,3 +216,278 @@ class VCFWriter(SVWriter):
self.vcf_writer.close()
else: # nothing was written
self._dumpemptyvcf()
def ordered(a, b):
# simply ordering the two string
return (a, b) if a < b else (b, a)
def setLikeliTag(genotyper):
global Geno_likeli_tag
if genotyper == "svtyper":
Geno_likeli_tag = SVTYPER_GENO_LIKELI_TAG
warn("Assuming genotypes provided by svtyper software" +
" hence tag is %s" % (Geno_likeli_tag))
elif genotyper == "genomestrip":
Geno_likeli_tag = GENOMESTRIP_GENO_LIKELI_TAG
warn("Assuming genotypes provided by genomestrip software" +
" hence tag is %s" % (Geno_likeli_tag))
else:
print("Unknown genotyping software")
exit(1)
def getlikelihoods(sample):
return sample.get('GL')
def probas(likelihoods):
# transform log likelihoods into likelihoods
return np.exp(np.array(likelihoods))
def getprobas(sample):
# Transforming likelihods into probabilities
return probas(getlikelihoods(sample))/np.sum(probas(getlikelihoods(sample)))
def ondiagonal(u_s, v_s):
# Probability that, for that individual, the two SVs are identically
# genotyped P1(0/0)*P2(0/0) + ... P1(1/1)*P2(1/1)
# in the same way :
p = getprobas(u_s)
q = getprobas(v_s)
proba = 0
for a, b in zip(p, q):
proba += a*b
# print("Proba on %3.5f" %(proba))
return proba
def offdiagonal(u_s, v_s):
# Probability that, for that individual, the two SVs are not identically
# in the same way, complement of the previous one
p = getprobas(u_s)
q = getprobas(v_s)
proba = 0
for i, a in enumerate(p):
for j, b in enumerate(q):
if i != j:
proba += a*b
# print("Proba off %3.2f" %(proba))
return proba
def duplicatescore(u, v):
# For the two SVs, max discordant genotype log-ratio proba
# same genotypes against discordant against (worse individual)
# u_s is the sample of id s of u (idem for v_s)
# Valid samples
valid_samples = []
for s in u.samples:
u_s = u.samples[s]
v_s = v.samples[s]
if (u_s.get('GQ') is not None
and v_s.get('GQ') is not None):
valid_samples.append(s)
max_disc = 0
computed = float('NaN')
for s in valid_samples:
# ondiago is not used, we keep it just for comprehension
# ondiago = ondiagonal(s, u, v)
u_s = u.samples[s]
v_s = v.samples[s]
offdiago = offdiagonal(u_s, v_s)
if offdiago > max_disc:
max_disc = offdiago
if max_disc > 0 and max_disc < 1:
ratio = (1-max_disc)/max_disc
computed = np.log(ratio)
return computed
def gstrength(u):
# Sum of phred-like genotype qualities provides a measure of the
# combined genotype quality of the site
# np.sum([s['GQ'] if s['GQ'] is not None else 0 for s in u.samples.values()])
return u.GQ_sum_score()
def variantstrength(u):
# maximum SQ, where SQ stands for
# Phred-scaled probability that this site is variant (non-reference)
# in this sample)
# QUAL = -10 * log(P(locus is reference in all samples)), which is
# equal to the sum of the SQ scores.
# see https://github.com/hall-lab/svtyper/issues/10
# sum([s['SQ'] if s['SQ'] is not None else 0 for s in u.samples.values()])
return u.qual()
# max([s['SQ'] if s['SQ'] is not None else 0 for s in u.samples.values()])
def getduplicates_GQ(u, v):
# select the prefered duplicate on the basis of the
# Sum of phred-like genotype qualities
# see gstrength
# returns prefered, discarded, strength of both
if gstrength(u) > gstrength(v):
return (u, v, gstrength(u), gstrength(v))
else:
return (v, u, gstrength(v), gstrength(u))
def getduplicates_QUAL(u, v):
# select the prefered duplicate on the basis of
# Phred-scaled probability that this site is a variant
# see variantstrength
# returns prefered, discarded, strength of both
if variantstrength(u) > variantstrength(v):
return (u, v, variantstrength(u), variantstrength(v))
else:
return (v, u, variantstrength(v), variantstrength(u))
def getoverlap(u, osize):
# percentage overlap given the size of the overlap
return 100*osize/u.svlen
def add_redundancy_infos_header(reader):
# Adding DUPLICATESCORE info (equivalent to GSDUPLICATESCORE)
reader.addInfo("DUPLICATESCORE", 1, "Float",
"LOD score that the events are distinct based on the "
"genotypes of the most discordant sample")
# Adding DUPLICATES info (equivalent to GSDUPLICATEOVERLAP)
reader.addInfo("DUPLICATEOVERLAP", 1, "Float",
"Highest overlap with a duplicate event")
# Adding DUPLICATEOVERLAP info (equivalent to GSDUPLICATEOVERLAP)
reader.addInfo("DUPLICATES", ".", "String",
"List of duplicate events preferred to this one")
# Adding NONDUPLICATEOVERLAP
reader.addInfo("NONDUPLICATEOVERLAP", 1, "Float",
"Amount of overlap with a non-duplicate")
def GenomeSTRIPLikeRedundancyAnnotator(SVSet, reader,
duplicatescore_threshold=-2,
genotyper="svtyper"):
""" Annotating duplicate candidates based on the genotype likelihoods
- genotype likelihoods can be provided by svtyper or genomestrip
"""
add_redundancy_infos_header(reader)
setLikeliTag(genotyper)
variants = defaultdict()
for sv in SVSet:
variants[sv.id] = sv
pybed_variants = vcf_to_pybed(SVSet)
self_overlap = pybed_variants.intersect(pybed_variants,
f=0.5, r=True, wo=True)
seen = defaultdict(tuple)
duplicates = defaultdict(list)
overlapping = defaultdict(tuple)
reference = defaultdict()
for o in self_overlap:
print("Here ?", o)
if o[3] == o[7]:
continue
a, b = ordered(o[3], o[7])
if seen[(a, b)]:
continue
seen[(a, b)] = True
u = variants[a]
v = variants[b]
score = duplicatescore(u, v)
# print("Comparing %s and %s : %3.8f" % (u.id, v.id, score))
if score > duplicatescore_threshold:
ref, dupli, s1, s2 = getduplicates_GQ(u, v)
# print("%s prefered to %s %3.8f" % (ref.id, dupli.id, score))
reference[ref] = 1
overlap_size = int(o[-1])
overlap = getoverlap(dupli, overlap_size)
if maxGQ(ref) > 0 and passed(dupli):
# are dismissed
# - reference variant with 0 genotype quality for all markers
# - duplicate that are already tagged as filtered out
# dupli.id is considered as a duplicate of ref.id
duplicates[dupli.id].append((ref.id, score, overlap))
else:
overlap_size = int(o[-1])
overlap_u = getoverlap(u, overlap_size)
overlap_v = getoverlap(v, overlap_size)
overlapping[(u, v)] = {'dupli_score': score,
'overlap_left': overlap_u,
'overlap_right': overlap_v,
}
for u in SVSet:
if u.id in duplicates:
print("tagged as duplicate %s" % u.id)
duplis = sorted([a for (a, s, o) in duplicates[u.id]])
score = max([s for (a, s, o) in duplicates[u.id]])
overlap = max([o for (a, s, o) in duplicates[u.id]])
add_duplicate_info_sv(u, overlap, score, duplis)
def add_duplicate_info_sv(sv, duplicateoverlap, duplicatescore, duplicates):
"""
simply adding two information to the sv infos
"""
if isnan(duplicatescore):
sv.record.info['DUPLICATESCORE'] = float('nan')
else:
sv.record.info['DUPLICATESCORE'] = duplicatescore
sv.record.info['DUPLICATEOVERLAP'] = duplicateoverlap
sv.record.info['DUPLICATES'] = duplicates
def add_overlap_info_sv(sv, overlap, duplicatescore):
"""
simply adding two information to the sv infos
"""
if isnan(duplicatescore):
sv.record.info['DUPLICATESCORE'] = float('nan')
else:
sv.record.info['DUPLICATESCORE'] = duplicatescore
sv.record.info['NONDUPLICATEOVERLAP'] = overlap
def add_filter_infos_header(reader):
# FILTERS
# Adding specific filters
reader.addFilter("CALLRATE", "Call rate <0.75")
reader.addFilter("MONOMORPH", "All samples have the same genotype")
reader.addFilter("DUPLICATE", "GSDUPLICATESCORE>0")
reader.addFilter("OVERLAP", "NONDUPLICATEOVERLAP>0.7")
def GenomeSTRIPLikefiltering(SVSet, reader):
""" Filtering the candidate CNVs according to the following criteria
- non duplicate sites
- variant sites
- call rate > 0.8
- at least one variant (homozygous or heterozygote) has a genotype
quality > 20
- the variant is not everywhere heterozygote or homozygote
(use NONVARIANTSCORE in both cases)
"""
add_filter_infos_header(reader)
for sv in SVSet:
info = sv.record.info
if callRate(sv) < 0.75:
sv.filter.add("CALLRATE")
if not Polymorph(sv):
sv.filter.add("MONOMORPH")
if 'NONDUPLICATEOVERLAP' in info and info['NONDUPLICATEOVERLAP'] > 0.7:
sv.filter.add("OVERLAP")
if "DUPLICATESCORE" in info is not None and info['DUPLICATESCORE'] > -2:
sv.filter.add("DUPLICATE")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment