Newer
Older
import sys
from svrunner_utils import eprint
from svreader import SVReader, SVWriter
from pysam import VariantFile
HOM_REF = (0, 0)
HET_VAR = (0, 1)
HOM_VAR = (1, 1)
class AnnotatedRecord(object):
"""
A lightweight object to annotated the final records
"""
def __init__(self, record):
"""
A pysam VariantRecord wrapper
"""
self.__record = record
if "SVTYPE" in record.info.keys():
self._sv_type = record.info["SVTYPE"]
else:
self._sv_type = record.alts[0][1:-1]
@property
def record(self):
return self.__record
@property
def id(self):
return self.record.id
@id.setter
def id(self, id):
self.__record.id = id
@property
def pos(self):
return self.record.pos
@property
def start(self):
return self.record.pos
@property
def chrom(self):
return self.record.chrom
@property
def stop(self):
return self.record.stop
@property
def end(self):
return self.record.stop
@property
def svtype(self):
return self._sv_type
@property
def filter(self):
return self.record.filter
@property
def samples(self):
return [AnnotatedRecordSample(s) for s in self.record.samples.values()]
def setNewId(self, identifier):
try:
self.record.info['SOURCEID'] = self.id
except KeyError:
eprint("SOURCEID absent from record info keys,")
sys.exit(1)
self.id = identifier
def num_variant_samples(self):
return sum([s.isVariant for s in self.samples])
def variant_read_support(self):
return max([s.AltSupport() for s in self.samples])
def qual(self):
return sum([s.SQ_score() for s in self.samples if s.isVariant])
def setQual(self):
self.record.qual = self.qual()
def AddSupportInfos(self):
supp_reads = self.variant_read_support()
num_supp_samples = self.num_variant_samples()
try:
self.record.info['MAX_SUPP_READS'] = supp_reads
self.record.info['NUM_SUPP_SAMPLES'] = num_supp_samples
except KeyError:
eprint("SUPP_READS or NUM_SUPP_SAMPLES absent from record info keys")
sys.exit(1)
def UnifiedPass(self):
"""
All records passing the filters (PASS, .) ar now labelled PASS
"""
record = self.record
filters = [f for f in record.filter]
# We make the assumption when a "." is present no other filter
# are present
if len(filters) == 0 or "." in filters:
record.filter.clear()
record.filter.add("PASS")
class AnnotatedRecordSample(object):
"""
A lightweight object to VariantRecordSample
"""
def __init__(self, variantsample):
self.variantsample = variantsample
def genotype(self):
return self.variantsample.get('GT')
def SQ_score(self):
return self.variantsample.get('SQ')
def AltSupport(self):
return self.variantsample.get('AO')[0]
@property
def isVariant(self):
if self.genotype() in [HET_VAR, HOM_VAR]:
return True
else:
return False
def __init__(self, file_name, sv_to_report=None):
def __iter__(self):
return self
def __next__(self):
while True:
raw_record = next(self.vcf_reader)
return record
def getHeader(self):
return self.vcf_reader.header
def addInfo(self, name, number, type, description):
self.vcf_reader.header.info.add(id=name,
number=number,
type=type,
description=description)
def addFilter(self, name, description):
self.vcf_reader.header.filters.add(id=name,
number=None,
type=None,
description=description)
def add_metadata(self):
self.addInfo("SOURCEID", 1, "String",
"The source sv identifier")
self.addInfo("MAX_SUPP_READS", 1, "Integer",
"Max number of supporting reads")
self.addInfo("NUM_SUPP_SAMPLES", 1, "Integer",
"Number of supporting samples")
self.addFilter("LOWSUPPORT",
"total supp reads < 5 or supp samples < 2")
def getOrderedSamples(self):
samples = self.vcf_reader.header.samples
sample_names = [sample.rsplit('.')[0] for sample in samples]
return sample_names
def numSamples(self):
return len(self.vcf_reader.header.samples)
class VCFWriter(SVWriter):
def __init__(self, file_name, template_reader, index=True):
super(VCFWriter, self).__init__(file_name,
template_reader.tool_name,
template_reader)
def _open(self):
self.vcf_writer = VariantFile(self.filename, 'w',
header=self.template_reader.getHeader())
self._isopen = True
def _write(self, record):
self.vcf_writer.write(record.record)
def _close(self):
if self._isopen:
self.vcf_writer.close()
else: # nothing was written
self._dumpemptyvcf()