Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
svlib
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container Registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
SVdetection
svlib
Commits
aa96035d
Commit
aa96035d
authored
5 years ago
by
Thomas Faraut
Browse files
Options
Downloads
Patches
Plain Diff
annotating tools and some corrections
parent
a47a5d83
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
svreader/annotation.py
+151
-9
151 additions, 9 deletions
svreader/annotation.py
with
151 additions
and
9 deletions
svreader/annotation.py
+
151
−
9
View file @
aa96035d
...
@@ -2,6 +2,8 @@
...
@@ -2,6 +2,8 @@
import
sys
import
sys
from
collections
import
defaultdict
from
collections
import
defaultdict
from
networkx
import
Graph
,
connected_components
import
numpy
as
np
import
numpy
as
np
from
math
import
isnan
from
math
import
isnan
...
@@ -40,6 +42,13 @@ def Variant(sample):
...
@@ -40,6 +42,13 @@ def Variant(sample):
return
False
return
False
def
Heterozygote
(
sample
):
if
sample
.
get
(
'
GT
'
)
in
[
HET_VAR
]:
return
True
else
:
return
False
class
AnnotateRecord
(
VCFRecord
):
class
AnnotateRecord
(
VCFRecord
):
"""
"""
A lightweight object to annotated the final records
A lightweight object to annotated the final records
...
@@ -78,8 +87,8 @@ class AnnotateRecord(VCFRecord):
...
@@ -78,8 +87,8 @@ class AnnotateRecord(VCFRecord):
def
num_samples
(
self
):
def
num_samples
(
self
):
return
len
(
self
.
record
.
samples
.
keys
())
return
len
(
self
.
record
.
samples
.
keys
())
def
setNewId
(
self
,
identifier
):
def
setNewId
(
self
,
new_id
):
self
.
new_id
=
identifier
self
.
new_id
=
new_id
def
rename
(
self
):
def
rename
(
self
):
try
:
try
:
...
@@ -93,16 +102,31 @@ class AnnotateRecord(VCFRecord):
...
@@ -93,16 +102,31 @@ class AnnotateRecord(VCFRecord):
return
sum
([
Variant
(
s
)
for
s
in
self
.
samples
.
values
()])
return
sum
([
Variant
(
s
)
for
s
in
self
.
samples
.
values
()])
def
variant_read_support
(
self
):
def
variant_read_support
(
self
):
return
max
([
s
.
get
(
'
AO
'
)[
0
]
for
s
in
self
.
samples
.
values
()])
support
=
[]
for
s
in
self
.
samples
.
values
():
if
s
.
get
(
'
AO
'
)
is
not
None
:
support
.
append
(
s
.
get
(
'
AO
'
)[
0
])
return
max
(
support
)
def
qual
(
self
):
def
qual
(
self
):
return
sum
([
s
.
get
(
'
SQ
'
)
for
s
in
self
.
samples
.
values
()])
variant_qual
=
[]
for
s
in
self
.
samples
.
values
():
if
s
.
get
(
'
SQ
'
)
is
not
None
:
variant_qual
.
append
(
s
.
get
(
'
SQ
'
))
return
sum
(
variant_qual
)
def
GQ_samples
(
self
):
genotype_qual
=
[]
for
s
in
self
.
samples
.
values
():
if
s
.
get
(
'
GQ
'
)
is
not
None
:
genotype_qual
.
append
(
s
.
get
(
'
GQ
'
))
return
genotype_qual
def
GQ_sum_score
(
self
):
def
GQ_sum_score
(
self
):
return
sum
(
[
s
.
get
(
'
GQ
'
)
for
s
in
self
.
samples
.
values
()
]
)
return
sum
(
self
.
GQ_
samples
())
def
maxGQ
(
self
):
def
maxGQ
(
self
):
return
max
(
[
s
.
get
(
'
GQ
'
)
for
s
in
self
.
samples
.
values
()
]
)
return
max
(
self
.
GQ_
samples
())
def
setQual
(
self
):
def
setQual
(
self
):
self
.
record
.
qual
=
self
.
qual
()
self
.
record
.
qual
=
self
.
qual
()
...
@@ -128,12 +152,22 @@ class AnnotateRecord(VCFRecord):
...
@@ -128,12 +152,22 @@ class AnnotateRecord(VCFRecord):
sys
.
exit
(
1
)
sys
.
exit
(
1
)
def
CallRate
(
self
,
cutoff
):
def
CallRate
(
self
,
cutoff
):
num_qual_call
=
sum
([(
s
.
get
(
'
GQ
'
)
>
cutoff
)
for
s
in
self
.
samples
.
values
()])
call_qual
=
[]
for
s
in
self
.
samples
.
values
():
if
s
.
get
(
'
GQ
'
)
is
not
None
:
call_qual
.
append
(
s
.
get
(
'
GQ
'
))
num_qual_call
=
sum
([(
qual
>
cutoff
)
for
qual
in
call_qual
])
return
num_qual_call
/
self
.
num_samples
return
num_qual_call
/
self
.
num_samples
def
VariantCallRate
(
self
,
cutoff
):
def
VariantCallRate
(
self
,
cutoff
):
num_qual_call
=
sum
([(
s
.
get
(
'
GQ
'
)
>
cutoff
)
for
s
in
self
.
samples
.
values
()
if
Variant
(
s
)])
samples
=
self
.
samples
.
values
()
return
num_qual_call
/
self
.
num_variant_samples
()
num_qual_var
=
0
for
s
in
samples
:
if
s
.
get
(
"
GQ
"
)
is
not
None
and
s
.
get
(
"
GQ
"
)
>
cutoff
and
Variant
(
s
):
num_qual_var
+=
1
num_var_samples
=
self
.
num_variant_samples
()
var_call_rate
=
num_qual_var
/
num_var_samples
if
num_var_samples
else
0
return
var_call_rate
def
UnifiedPass
(
self
):
def
UnifiedPass
(
self
):
"""
"""
...
@@ -379,6 +413,9 @@ def add_redundancy_infos_header(reader):
...
@@ -379,6 +413,9 @@ def add_redundancy_infos_header(reader):
# Adding NONDUPLICATEOVERLAP
# Adding NONDUPLICATEOVERLAP
reader
.
addInfo
(
"
NONDUPLICATEOVERLAP
"
,
1
,
"
Float
"
,
reader
.
addInfo
(
"
NONDUPLICATEOVERLAP
"
,
1
,
"
Float
"
,
"
Amount of overlap with a non-duplicate
"
)
"
Amount of overlap with a non-duplicate
"
)
# Adding TOOLSUPPORT
reader
.
addInfo
(
"
TOOLSUPPORT
"
,
"
.
"
,
"
String
"
,
"
Tools supporting (detecting) the sv
"
)
def
GenomeSTRIPLikeRedundancyAnnotator
(
SVSet
,
reader
,
def
GenomeSTRIPLikeRedundancyAnnotator
(
SVSet
,
reader
,
...
@@ -483,6 +520,7 @@ def add_filter_infos_header(reader):
...
@@ -483,6 +520,7 @@ def add_filter_infos_header(reader):
reader
.
addFilter
(
"
MONOMORPH
"
,
"
All samples have the same genotype
"
)
reader
.
addFilter
(
"
MONOMORPH
"
,
"
All samples have the same genotype
"
)
reader
.
addFilter
(
"
DUPLICATE
"
,
"
GSDUPLICATESCORE>0
"
)
reader
.
addFilter
(
"
DUPLICATE
"
,
"
GSDUPLICATESCORE>0
"
)
reader
.
addFilter
(
"
OVERLAP
"
,
"
NONDUPLICATEOVERLAP>0.7
"
)
reader
.
addFilter
(
"
OVERLAP
"
,
"
NONDUPLICATEOVERLAP>0.7
"
)
reader
.
addFilter
(
"
ABFREQ
"
,
"
AB frequency <0.3 for >50% heterosamples
"
)
def
GenomeSTRIPLikefiltering
(
SVSet
,
reader
):
def
GenomeSTRIPLikefiltering
(
SVSet
,
reader
):
...
@@ -511,3 +549,107 @@ def GenomeSTRIPLikefiltering(SVSet, reader):
...
@@ -511,3 +549,107 @@ def GenomeSTRIPLikefiltering(SVSet, reader):
sv
.
filter
.
add
(
"
OVERLAP
"
)
sv
.
filter
.
add
(
"
OVERLAP
"
)
if
"
DUPLICATESCORE
"
in
info
is
not
None
and
info
[
'
DUPLICATESCORE
'
]
>
-
2
:
if
"
DUPLICATESCORE
"
in
info
is
not
None
and
info
[
'
DUPLICATESCORE
'
]
>
-
2
:
sv
.
filter
.
add
(
"
DUPLICATE
"
)
sv
.
filter
.
add
(
"
DUPLICATE
"
)
def
ABFreqFiltering
(
SVSet
):
"""
Filtering the candidate CNVs according to the following criteria
- more than 50% of variant samples should have AB freq > 0.3
"""
for
sv
in
SVSet
:
ABfreqOK
=
[]
for
s
in
sv
.
record
.
samples
.
values
():
if
Heterozygote
(
s
):
ABfreqOK
.
append
((
s
.
get
(
'
AB
'
)[
0
]
>
0.3
))
if
len
(
ABfreqOK
)
>
0
and
sum
(
ABfreqOK
)
<
len
(
ABfreqOK
)
/
2
:
sv
.
filter
.
add
(
"
ABFREQ
"
)
def
GetConnectedDuplicates
(
SVSet
):
"""
Construct connected components of duplicates and rename the variants
"""
undirected
=
Graph
()
variant_dict
=
defaultdict
()
representatives
=
defaultdict
()
for
s
in
SVSet
:
variant_dict
[
s
.
id
]
=
s
if
"
DUPLICATE
"
in
s
.
filter
:
for
dupli_repr
in
s
.
record
.
info
[
"
DUPLICATEOF
"
]:
undirected
.
add_edge
(
s
.
id
,
dupli_repr
)
for
component
in
connected_components
(
undirected
):
for
c
in
component
:
if
"
DUPLICATEOF
"
in
variant_dict
[
c
].
record
.
info
:
# the current variant is a duplicate
continue
rep
=
c
# the representative of the equivalence class
break
duplicates
=
component
duplicates
.
remove
(
rep
)
representatives
[
rep
]
=
duplicates
add_duplicate_infos
(
representatives
,
variant_dict
)
def
add_duplicate_infos
(
representatives
,
sv_dict
):
for
rep
,
elements
in
representatives
.
items
():
for
d
in
elements
:
sv_dict
[
d
].
record
.
info
[
'
DUPLICATEOF
'
]
=
rep
duplicates
=
list
(
elements
)
if
'
DUPLICATES
'
in
sv_dict
[
rep
].
record
.
info
:
print
(
sv_dict
[
rep
].
record
.
info
[
'
DUPLICATES
'
])
duplicates
.
extend
(
sv_dict
[
rep
].
record
.
info
[
'
DUPLICATES
'
])
if
len
(
duplicates
)
>
0
:
sv_dict
[
rep
].
record
.
info
[
'
DUPLICATES
'
]
=
duplicates
def
get_tool_name
(
sv_ident
):
return
sv_ident
.
split
(
"
_
"
)[
0
]
def
SetSupportingTools
(
SVSet
):
for
sv
in
SVSet
:
tools
=
{
get_tool_name
(
sv
.
id
)}
if
"
DUPLICATES
"
in
sv
.
record
.
info
:
duplicates
=
sv
.
record
.
info
[
'
DUPLICATES
'
]
# print(duplicates)
for
dupli
in
duplicates
:
tools
.
add
(
get_tool_name
(
dupli
))
if
'
TOOLSUPPORT
'
in
sv
.
record
.
info
:
supporting
=
set
(
sv
.
record
.
info
[
'
TOOLSUPPORT
'
])
tools
.
union
(
supporting
)
sv
.
record
.
info
[
'
TOOLSUPPORT
'
]
=
list
(
tools
)
def
static_vars
(
**
kwargs
):
def
decorate
(
func
):
for
k
in
kwargs
:
setattr
(
func
,
k
,
kwargs
[
k
])
return
func
return
decorate
@static_vars
(
counter
=
0
)
def
new_id_str
(
sv
):
new_id_str
.
counter
+=
1
return
"
_
"
.
join
([
"
cnvpipeline
"
,
sv
.
chrom
,
sv
.
svtype
,
str
(
new_id_str
.
counter
)])
def
rename_info_field
(
sv
,
key
,
sv_dict
):
if
key
in
sv
.
record
.
info
:
info_oldid
=
sv
.
record
.
info
[
key
]
info_newid
=
[
sv_dict
[
id
]
for
id
in
info_oldid
]
sv
.
record
.
info
[
key
]
=
info_newid
def
RenameSV
(
SVSet
):
sv_dict
=
defaultdict
()
for
sv
in
SVSet
:
new_id
=
new_id_str
(
sv
)
sv
.
setNewId
(
new_id
)
sv_dict
[
sv
.
id
]
=
new_id
for
sv
in
SVSet
:
rename_info_field
(
sv
,
"
DUPLICATEOF
"
,
sv_dict
)
rename_info_field
(
sv
,
"
DUPLICATES
"
,
sv_dict
)
sv
.
record
.
info
[
'
SOURCEID
'
]
=
sv
.
id
sv
.
rename
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment