diff --git a/svreader/Annotate.py b/svreader/Annotate.py new file mode 100644 index 0000000000000000000000000000000000000000..5e9d7ec77e113f58391af9519e103f1eba43f770 --- /dev/null +++ b/svreader/Annotate.py @@ -0,0 +1,128 @@ + +from pysam import VariantFile + +HOM_REF = (0, 0) +HET_VAR = (0, 1) +HOM_VAR = (1, 1) + + +class AnnotatedRecord(object): + """ + A lightweight object to annotated the final records + """ + def __init__(self, record): + """ + A pysam VariantRecord wrapper + """ + self.__record = record + if "SVTYPE" in record.info.keys(): + self._sv_type = record.info["SVTYPE"] + else: + self._sv_type = record.alts[0][1:-1] + + @property + def record(self): + return self.__record + + @property + def id(self): + return self.record.id + + @id.setter + def id(self, id): + self.__record.id = id + + @property + def pos(self): + return self.record.pos + + @property + def chrom(self): + return self.record.chrom + + @property + def stop(self): + return self.record.stop + + @property + def svtype(self): + return self._sv_type + + @property + def filter(self): + return self.record.filter + + @property + def samples(self): + return self.record.samples + + +class AnnotatedRecordSample(object): + """ + A lightweight object to VariantRecordSample + """ + def __init__(self, variantsample): + self.variantsample = variantsample + + +class VCFReader(SVReader): + + def __init__(self, file_name, reference_handle=None, svs_to_report=None): + super(VCFReader, self).__init__(file_name, + generic_name, + reference_handle) + self.vcf_reader = VariantFile(file_name) + + def __iter__(self): + return self + + def __next__(self): + while True: + raw_record = next(self.vcf_reader) + record = AnnotatedRecord(raw_record) + return record + + def getHeader(self): + return self.vcf_reader.header + + def addInfo(self, name, number, type, description): + self.vcf_reader.header.info.add(id=name, + number=number, + type=type, + description=description) + + def addFilter(self, name, description): + self.vcf_reader.header.filters.add(id=name, + number=None, + type=None, + description=description) + + def getOrderedSamples(self): + samples = self.vcf_reader.header.samples + sample_names = [sample.rsplit('.')[0] for sample in samples] + return sample_names + + def numSamples(self): + return len(self.vcf_reader.header.samples) + + +class VCFWriter(SVWriter): + + def __init__(self, file_name, template_reader, index=True): + super(VCFWriter, self).__init__(file_name, + template_reader.tool_name, + template_reader) + + def _open(self): + self.vcf_writer = VariantFile(self.filename, 'w', + header=self.template_reader.getHeader()) + self._isopen = True + + def _write(self, record): + self.vcf_writer.write(record.record) + + def _close(self): + if self._isopen: + self.vcf_writer.close() + else: # nothing was written + self._dumpemptyvcf()