Newer
Older
from svreader import SVRecord, SVReader, SVWriter
from pysam import VariantFile
generic_name = "generic"
generic_source = set([generic_name])
class VCFRecord(SVRecord):
"""
"""
def __init__(self, record):
""" The required vcf record values and adittional optional values
required: chrom, pos, sropt, id,
optional: reference, quality
"""
super(VCFRecord, self).__init__(generic_name,
record.chrom,
record.pos,
record.stop,
record.id,
ref=record.ref,
qual=record.qual,
filter=record.filter)
self.__record = record
if "SVTYPE" in record.info.keys():
self._sv_type = record.info["SVTYPE"]
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@property
def record(self):
return self.__record
@property
def id(self):
return self.record.id
@property
def pos(self):
return self.record.pos
@property
def chrom(self):
return self.record.chrom
@property
def stop(self):
return self.record.stop
@property
def filter(self):
return self.record.filter
@property
def samples(self):
return self.record.samples
class VCFReader(SVReader):
def __init__(self, file_name, reference_handle=None, svs_to_report=None):
super(VCFReader, self).__init__(file_name,
generic_name,
reference_handle)
self.vcf_reader = VariantFile(file_name)
def __iter__(self):
return self
def __next__(self):
while True:
raw_record = next(self.vcf_reader)
record = VCFRecord(raw_record)
return record
def getHeader(self):
return self.vcf_reader.header
def removeInfoHeaderLine(self, key):
self.vcf_reader.header.info.remove_header(key)
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def addInfo(self, name, number, type, description):
self.vcf_reader.header.info.add(id=name,
number=number,
type=type,
description=description)
def addFilter(self, name, description):
self.vcf_reader.header.filters.add(id=name,
number=None,
type=None,
description=description)
def getOrderedSamples(self):
samples = self.vcf_reader.header.samples
sample_names = [sample.rsplit('.')[0] for sample in samples]
return sample_names
def numSamples(self):
return len(self.vcf_reader.header.samples)
class VCFWriter(SVWriter):
def __init__(self, file_name, template_reader, index=True):
super(VCFWriter, self).__init__(file_name,
template_reader.tool_name,
template_reader)
def _open(self):
self.vcf_writer = VariantFile(self.filename, 'w',
header=self.template_reader.getHeader())
self._isopen = True
def _write(self, record):
self.vcf_writer.write(record.record)
def _close(self):
if self._isopen:
self.vcf_writer.close()
else: # nothing was written
self._dumpemptyvcf()