Skip to content
Snippets Groups Projects
annotation.py 5.67 KiB
Newer Older
Thomas Faraut's avatar
Thomas Faraut committed

Thomas Faraut's avatar
Thomas Faraut committed
import sys
from svrunner_utils import eprint
from svreader import SVReader, SVWriter
Thomas Faraut's avatar
Thomas Faraut committed
from pysam import VariantFile

HOM_REF = (0, 0)
HET_VAR = (0, 1)
HOM_VAR = (1, 1)

Thomas Faraut's avatar
Thomas Faraut committed
class AnnotatedRecord(object):
    """
    A lightweight object to annotated the final records
    """
    def __init__(self, record):
        """
        A pysam VariantRecord wrapper
        """
        self.__record = record
        if "SVTYPE" in record.info.keys():
            self._sv_type = record.info["SVTYPE"]
        else:
            self._sv_type = record.alts[0][1:-1]

    @property
    def record(self):
        return self.__record

    @property
    def id(self):
        return self.record.id

    @id.setter
    def id(self, id):
        self.__record.id = id

    @property
    def pos(self):
        return self.record.pos

    @property
    def start(self):
        return self.record.pos

Thomas Faraut's avatar
Thomas Faraut committed
    @property
    def chrom(self):
        return self.record.chrom

    @property
    def stop(self):
        return self.record.stop

    @property
    def end(self):
        return self.record.stop

Thomas Faraut's avatar
Thomas Faraut committed
    @property
    def svtype(self):
        return self._sv_type

    @property
    def filter(self):
        return self.record.filter

    @property
    def samples(self):
Thomas Faraut's avatar
Thomas Faraut committed
        return [AnnotatedRecordSample(s) for s in self.record.samples.values()]

    def setNewId(self, identifier):
        try:
            self.record.info['SOURCEID'] = self.id
        except KeyError:
            eprint("SOURCEID absent from record info keys,")
            sys.exit(1)
        self.id = identifier

    def num_variant_samples(self):
        return sum([s.isVariant for s in self.samples])

    def variant_read_support(self):
        return max([s.AltSupport() for s in self.samples])
Thomas Faraut's avatar
Thomas Faraut committed

    def qual(self):
        return sum([s.SQ_score() for s in self.samples if s.isVariant])

    def setQual(self):
        self.record.qual = self.qual()

    def AddSupportInfos(self):
        supp_reads = self.variant_read_support()
        num_supp_samples = self.num_variant_samples()
        try:
            self.record.info['MAX_SUPP_READS'] = supp_reads
Thomas Faraut's avatar
Thomas Faraut committed
            self.record.info['NUM_SUPP_SAMPLES'] = num_supp_samples
        except KeyError:
            eprint("SUPP_READS or NUM_SUPP_SAMPLES absent from record info keys")
            sys.exit(1)

    def UnifiedPass(self):
        """
           All records passing the filters (PASS, .) ar now labelled PASS
        """
        record = self.record
        filters = [f for f in record.filter]
        # We make the assumption when a "." is present no other filter
        # are present
        if len(filters) == 0 or "." in filters:
            record.filter.clear()
            record.filter.add("PASS")
Thomas Faraut's avatar
Thomas Faraut committed


class AnnotatedRecordSample(object):
    """
    A lightweight object to VariantRecordSample
    """
    def __init__(self, variantsample):
        self.variantsample = variantsample

Thomas Faraut's avatar
Thomas Faraut committed
    def genotype(self):
        return self.variantsample.get('GT')

    def SQ_score(self):
        return self.variantsample.get('SQ')

    def AltSupport(self):
        return self.variantsample.get('AO')[0]

    @property
    def isVariant(self):
        if self.genotype() in [HET_VAR, HOM_VAR]:
            return True
        else:
            return False

Thomas Faraut's avatar
Thomas Faraut committed

class VCFReader(SVReader):

Thomas Faraut's avatar
Thomas Faraut committed
    def __init__(self, file_name, sv_to_report=None):
Thomas Faraut's avatar
Thomas Faraut committed
        super(VCFReader, self).__init__(file_name)
Thomas Faraut's avatar
Thomas Faraut committed
        self.filename = file_name
Thomas Faraut's avatar
Thomas Faraut committed
        self.sv_to_report = sv_to_report
Thomas Faraut's avatar
Thomas Faraut committed
        self.vcf_reader = VariantFile(file_name)
Thomas Faraut's avatar
Thomas Faraut committed

Thomas Faraut's avatar
Thomas Faraut committed
        self.add_metadata()

Thomas Faraut's avatar
Thomas Faraut committed
    def __iter__(self):
        return self

    def __next__(self):
        while True:
            raw_record = next(self.vcf_reader)
Thomas Faraut's avatar
Thomas Faraut committed
            record = AnnotatedRecord(raw_record)
Thomas Faraut's avatar
Thomas Faraut committed
            return record

    def getHeader(self):
        return self.vcf_reader.header

    def addInfo(self, name, number, type, description):
        self.vcf_reader.header.info.add(id=name,
                                        number=number,
                                        type=type,
                                        description=description)

    def addFilter(self, name, description):
        self.vcf_reader.header.filters.add(id=name,
                                           number=None,
                                           type=None,
                                           description=description)

Thomas Faraut's avatar
Thomas Faraut committed
    def add_metadata(self):
        self.addInfo("SOURCEID", 1, "String",
                     "The source sv identifier")
        self.addInfo("MAX_SUPP_READS", 1, "Integer",
                     "Max number of supporting reads")
Thomas Faraut's avatar
Thomas Faraut committed
        self.addInfo("NUM_SUPP_SAMPLES", 1, "Integer",
                     "Number of supporting samples")
        self.addFilter("LOWSUPPORT",
                       "total supp reads < 5 or supp samples < 2")

Thomas Faraut's avatar
Thomas Faraut committed
    def getOrderedSamples(self):
        samples = self.vcf_reader.header.samples
        sample_names = [sample.rsplit('.')[0] for sample in samples]
        return sample_names

    def numSamples(self):
        return len(self.vcf_reader.header.samples)


class VCFWriter(SVWriter):

    def __init__(self, file_name,  template_reader, index=True):
Thomas Faraut's avatar
Thomas Faraut committed
        super(VCFWriter, self).__init__(file_name,
                                        template_reader.tool_name,
                                        template_reader)

    def _open(self):
        self.vcf_writer = VariantFile(self.filename, 'w',
                                      header=self.template_reader.getHeader())
        self._isopen = True

    def _write(self, record):
        self.vcf_writer.write(record.record)

    def _close(self):
        if self._isopen:
            self.vcf_writer.close()
        else:   # nothing was written
            self._dumpemptyvcf()