import os import re import math import vcf from pybedtools import BedTool from pybedtools import create_interval_from_list from pysam import tabix_compress, tabix_index from svreader.vcf_utils import get_template from svrunner_utils import eprint, vcf_to_pybed ''' A Vcf wrapper object, following the specification of VCFv4.2 https://samtools.github.io/hts-specs/VCFv4.2.pdf ''' GT_HOMO_REF = "0/0" GT_HETERO = "0/1" GT_HOMO_VAR = "1/1" class SVInter(object): ''' SVInter object : a lightweight interval object for the purpose of constructing connected components of SV each SVRecord corresponds to a node in the graph ''' def __init__(self, chrom, start, end, id): self.__chrom = chrom self.__start = start self.__end = end self.__id = id self.__links = set() @property def id(self): return self.__id @id.setter def id(self, x): self.__id = x @property def chrom(self): return self.__chrom @property def start(self): return self.__start @property def end(self): return self.__end @start.setter def start(self, x): self.__start = x @end.setter def end(self, x): self.__end = x def length(self): return self.end-self.start+1 # TODO should wee keep this here or have another object for connected # component construction ? @property def links(self): return set(self.__links) def add_link(self, other): self.__links.add(other) other.__links.add(self) def __str__(self): return self.chrom+":"+str(self.start)+"-"+str(self.end) class SVRecord(SVInter): ''' SVRecord object : a generic object for storing SV records Toolo specific SVrecods will inherit from this tool ''' def __init__(self, tool_name, chrom, pos, end, id, ref=None, qual=None, filter=None): super(SVRecord, self).__init__(chrom, pos, end, id) # VCF specific fields self.__ref = ref or "N" self.__qual = qual or "." self.__filter = filter self.__tool_name = tool_name self._sv_type = None @property def tool_name(self): return self.__tool_name @property def sv_len(self): return self.length() @property def sv_type(self): return self._sv_type @property def filter(self): return self.__filter def to_vcf_record(self): return None def __str__(self): return self.chrom+":"+str(self.start)+"-"+str(self.end) def from_vcf_records_to_intervals(vcf_records): intervals = [] for record in vcf_records: chrom = record.chrom start = record.start - 1 end = record.end name = record.id intervals.append(create_interval_from_list( map(str, [chrom, start, end, name]))) return intervals class SVReader(object): ''' SVReader object : virtual object for reading sv-tool output files The actual reading is made by the child objects ''' svs_supported = set(["DEL", "INS", "DUP", "INV"]) def __init__(self, file_name, tool_name, reference_handle=None): self.file_name = file_name self.reference_handle = reference_handle self.__tool_name = tool_name def __iter__(self): return self def next(self): while True: raw_record = self.vcf_reader.next() record = SVRecord(self.tool_name, raw_record.CHROM, raw_record.POS, raw_record.INFO['END'], raw_record.ID, raw_record.REF, raw_record.QUAL, raw_record.FILTER) return record @property def tool_name(self): return self.__tool_name def filtering(self, all_vcf_records, minlen, maxlen, excluded_regions=None, cutoff_proximity=50): """ Filter the vcfrecords according to specified criteria - keep only sv > mnilen and < max_len - specific default filtering criteria for each tool - sv close to gaps (less than cutoff_proximity) :param all_vcf_records: list of VcfRecords :param minlen: minimum SV length :param maxlen: maximum SV length :param excluded_regions: file with regions to exclude (e.g gaps) :param cutoff_proximity: maximum tolerated proximity with exlucded region :return: list of VcfRecords """ # First filtering on size vcf_records = [] eprint("Filtering on size (>%d and <%d)" % (minlen, maxlen)) for record in all_vcf_records: if (self.GeneralFilterPass(record, minlen, maxlen) and self.SpecificFilterPass(record)): vcf_records.append(record) eprint("%d records" % (len(vcf_records))) if excluded_regions is not None: # Convert vcf to intervals eprint("Converting to bedtools intervals") variants = vcf_to_pybed(vcf_records) # Identify SV overlapping regions to exclude eprint("Loading exlusion intervals ") eprint(excluded_regions) toexclude = BedTool(excluded_regions).sort() excluded = dict() if len(toexclude): eprint("Computing distance with gaps: %d gaps" % len(toexclude)) # removing cnv in a close proximity to gaps in the assembly # -d=True In addition to the closest feature in B, # report its distance to A as an extra column for sv in variants.closest(toexclude, d=True): if (sv[4] != "." and sv[4] != "-1" and float(sv[-1]) < cutoff_proximity): excluded[sv[3]] = 1 vcf_records_filtered = [] for record in vcf_records: if record.id not in excluded: vcf_records_filtered.append(record) vcf_records = vcf_records_filtered return vcf_records def GeneralFilterPass(self, record, minlen, maxlen): return (abs(record.sv_len) >= minlen and abs(record.sv_len) <= maxlen) def SpecificFilterPass(self, record): return True def bnd_merge(self, svtype, records): return records # nothing to do in the majority of cases def remove_duplicate(self, records): return records # nothing to do in the majority of tools (see pindel) class SVWriter(object): def __init__(self, file_name, tool_name, template_reader): self.tool_name = tool_name self.template_reader = template_reader self._setFilename(file_name) self._isopen = False def _setFilename(self, filename): if filename.endswith(".gz"): filename = re.sub('\.gz$', '', filename) self.__index = True else: self.__index = False self.__filename = filename @property def filename(self): return self.__filename def write(self, record): if not self._isopen: self._open() self._write(record) def close(self): self._close() if self.__index: tabix_compress(self.__filename, self.__filename+".gz", force=True) tabix_index(self.__filename+".gz", force=True, preset="vcf") if os.path.exists(self.__filename): os.remove(self.__filename) def _dumpemptyvcf(self): vcf_template_reader = get_template(self.template_reader.tool_name) vcf_template_reader.samples = self.template_reader.getOrderedSamples() self.vcf_writer = vcf.Writer(open(self.filename, 'w'), vcf_template_reader) self.vcf_writer.close()