Skip to content
Snippets Groups Projects
build_xls_results.py 15.5 KiB
Newer Older
#! /usr/bin/env python3

"""mergeSV  : merge SV on the basis of their intervals

The merging is done by identifiying connected components in a graph.

The graph is constructing taking the SV a node and an edge connects two nodes
if the conresponding intervals exhibits a reciprocal overlap of a least R0%.

Additional constraints on the precision of breakpoints (left and right) can be specified.

Each connected component is termed a CNVR (Copy Number Variation Region)
and is associated to the corresponding CNVs.

"""

import argparse, sys
import glob
import vcf
import string
import re

from svreader.vcfwrapper import VCFReader
from svinterval import construct_overlap_graph, connected_components

import xlsxwriter

alphabet = list(string.ascii_uppercase)
xlsx_cols = alphabet.copy()
for i in alphabet:
    for j in alphabet:
        xlsx_cols.append(i + j)


def get_args():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description="\
mergeIndividualSV \n \
description: Merge SV based on reciprocal overlap")
    #parser.add_argument('-v', '--vcf', type=str, required=True, help='vcf file(s), comma-separated')
    parser.add_argument('-v', '--vcf', type=str, required=True, help='folder containing all vcf results files')
    parser.add_argument('-t', '--true-vcf', type=str, required=True, help='VCF file containing the simulated deletions')
    parser.add_argument('-f', '--filtered-vcf', type=str, required=False,
                        help='VCF file containing the filtered results')
    parser.add_argument('--overlap_cutoff',  type=float, default=0.5, help='cutoff for reciprocal overlap')
    parser.add_argument('--left_precision',  type=int, default=-1, help='left breakpoint precision')
    parser.add_argument('--right_precision',  type=int, default=-1, help='right breakpoint precision')
Floreal Cabanettes's avatar
Floreal Cabanettes committed
    parser.add_argument('-o', '--output', type=str, default="results.xlsx", help='output Excel file')
    # parse the arguments
    args = parser.parse_args()

    if args.left_precision == -1:
        args.left_precision = sys.maxsize

    if args.right_precision == -1:
        args.right_precision = sys.maxsize

    # send back the user input
    return args

# A small wrapper to print to stderr
def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)


def to_vcf_record(cnv):

        info = { "SVLEN" : cnv.length(),
                 "SVTYPE" : "DEL",
                 "END" : cnv.end,
                 "NUM_CNV" : cnv.NumCNV(),
                 "CNV":",".join(cnv.CNV()),
                 "CALLERS":",".join(cnv.Callers()),
                 "NUM_CALLERS":cnv.NumCallers(),
                 "CIPOS":cnv.cipos(),
                 "CIEND":cnv.ciend(),
                 "PRECISION":",".join(map(str,cnv.precision())),
                 "CALLERSVSAMPLES":",".join(cnv.CallersVsamples())
                  }
        alt = [vcf.model._SV("DEL")]

        if not cnv.IsPrecise():
            info['IMPRECISE'] = True
        else:
            info['REPR_CNV'] = cnv.repr_cnv()


        vcf_record = vcf.model._Record(cnv.chrom,
                                       cnv.start,
                                       cnv.name,
                                       "N",
                                       alt,
                                       ".",
                                       ".",
                                       info,
                                       "",
                                       [0],
                                       [])
        return vcf_record

def passed_variant(record):
        """Did this variant pass?"""
        return record.filter is None or len(record.filter) == 0 or "PASS" in record.filter

# --------------------------------------
# main function


def read_vcf_file(infile):
    SVSet=[]
    ids = []
    for record in VCFReader(infile):
        if not passed_variant(record):
            continue
        SVSet.append(record)
        ids.append(record.id)
    return SVSet, ids


def svsort(sv, records):
    """
    Function to sort regions
    """
    if records[sv]["start"] != "":
        return int(records[sv]["start"])
    first_tool = list(records[sv]["tools"].keys())[0]
    return int(records[sv]["tools"][first_tool]["start"])

def main():
    # parse the command line args
    args = get_args()

    overlap_cutoff   = args.overlap_cutoff
    left_precision   = args.left_precision
    right_precision  = args.right_precision

    color_not_found = "#FE2E2E"
    color_col_filter = "#BEF781"
    color_is_kept = "#81F781"
    color_false_positive = "#FE642E"

    filenames = []

    filenames += glob.glob(args.vcf + "/**/*.vcf", recursive=True)
    filenames += glob.glob(args.vcf + "/**/*.vcf.gz", recursive=True)

    true_ones = args.true_vcf

    # Reading all the vcf files
    SVSet=[]
    for infile in filenames:
        eprint(" Reading file %s" % (infile))
        SVSet += read_vcf_file(infile)[0]

Floreal Cabanettes's avatar
Floreal Cabanettes committed
    eprint(" Reading file %s" % (true_ones))
    SVSet_to, true_ones_records = read_vcf_file(true_ones)
    SVSet += SVSet_to
    filtered_records = None

    if args.filtered_vcf:
Floreal Cabanettes's avatar
Floreal Cabanettes committed
        eprint(" Reading file %s" % (args.filtered_vcf))
        filtered_records = read_vcf_file(args.filtered_vcf)[1]

    eprint("Computing Connected components")
    construct_overlap_graph(SVSet,overlap_cutoff,left_precision,right_precision)

    number = 1
    records = {}
    tools = set()
    orphans = 0
    for components in connected_components(SVSet):
        names = []
        results = {}
        true_one = None

        for node in components:
            names.append(node.id)
            if node.id in true_ones_records:
                true_one = node.id
                records[true_one] = {
                    "start": node.start,
                    "end": node.end,
                    "length": int(node.end) - int(node.start),
                    "tools": {},
                    "orphan": False
                }
            else:
                tool_name = node.id.split("_")[0]
                results[tool_name] = {
                    "start": node.start,
                    "end": node.end,
                    "length": int(node.end) - int(node.start),
                    "filtered": (node.id in filtered_records) if filtered_records is not None else False
                }
                tools.add(tool_name)

        if true_one is not None:
            records[true_one]["tools"] = results
        else:
            orphans += 1
            records["orphan_" + str(orphans)] = {
                "start": "",
                "end": "",
                "length": "",
                "tools": results,
                "orphan": True
            }

        names.sort()

        eprint("Group #%i: %s" % (number, ", ".join(names)))
        number += 1

    nb_records = len(records)

    cells = {
        "A1": {"text": "RESULTS", "format": {"bg_color": "#ffe856"}},
        "A2": {"text": "Deletion", "format": {"bold": True}},
        "A" + str(1+nb_records+3): {"text": "DIFFS", "format": {"bg_color": "#ffe856"}},
        "A" + str(2+nb_records+3): {"text": "Deletion", "format": {"bold": True}}
    }
    tools = sorted(tools)
    nb_tools = len(tools)

    i = 1
    headers = []
    for tool in ["Real data"] + tools + (["Filtered results"] if filtered_records is not None else []):
        #cells[xlsx_cols[i] + "1"] = cells[xlsx_cols[i] + str(1+nb_records+3)] = {"text": tool, "format": ""}
        headers.append(tool)
        l_format = {"bold": True}
        if tool == "Filtered results":
            l_format["bg_color"] = color_col_filter
        cells[xlsx_cols[i] + "2"] = cells[xlsx_cols[i] + str(2+nb_records+3)] = {"text": "Start", "format": l_format}
        cells[xlsx_cols[i + 1] + "2"] = cells[xlsx_cols[i+1] + str(2+nb_records+3)] = {"text": "End",
                                                                                       "format": l_format}
        cells[xlsx_cols[i + 2] + "2"] = cells[xlsx_cols[i+2] + str(2+nb_records+3)] = {"text": "Length",
                                                                                       "format": l_format}
        i += 3

    rec_keys = sorted(records.keys(), key=lambda x:svsort(x, records))

    i = 3
    for id in rec_keys:
        record = records[id]
        cells[xlsx_cols[0] + str(i)] = cells[xlsx_cols[0] + str(i+nb_records+3)] = {"text": id, "format": {}}
        cells[xlsx_cols[1] + str(i)] = cells[xlsx_cols[1] + str(i+nb_records+3)] = {"text": record["start"],
                                                                                    "format": {}}
        my_start = record["start"]
        cells[xlsx_cols[2] + str(i)] = cells[xlsx_cols[2] + str(i+nb_records+3)] = {"text": record["end"],
                                                                                    "format": {}}
        my_end = record["end"]
        cells[xlsx_cols[3] + str(i)] = cells[xlsx_cols[3] + str(i+nb_records+3)] = {"text": record["length"],
                                                                                    "format": {}}
        my_length = record["length"]

        j = 4
        is_kept = False
        for tool in tools:
            if tool in record["tools"]:
                if record["tools"][tool]["filtered"]:
                    is_kept = True
                    sv_format = {"bg_color": color_is_kept}
                    # Raw values:
                    cells[xlsx_cols[1+((nb_tools+1)*3)] + str(i)] = {"text": record["tools"][tool]["start"],
                                                                     "format": {"bg_color": color_col_filter}}
                    cells[xlsx_cols[1+((nb_tools+1)*3)+1] + str(i)] = {"text": record["tools"][tool]["end"],
                                                                       "format": {"bg_color": color_col_filter}}
                    cells[xlsx_cols[1+((nb_tools+1)*3)+2] + str(i)] = {"text": record["tools"][tool]["length"],
                                                                       "format": {"bg_color": color_col_filter}}
                    # Diffs:
                    if my_start != "":
                        cells[xlsx_cols[1+((nb_tools+1)*3)] + str(i+nb_records+3)] = {
                            "text": record["tools"][tool]["start"] - my_start, "format": {"bg_color": color_col_filter}}
                        cells[xlsx_cols[1+((nb_tools+1)*3) + 1] + str(i+nb_records+3)] = {
                            "text": record["tools"][tool]["end"] - my_end, "format": {"bg_color": color_col_filter}}
                        cells[xlsx_cols[1+((nb_tools+1)*3) + 2] + str(i+nb_records+3)] = {
                            "text": record["tools"][tool]["length"] - my_length,
                            "format": {"bg_color": color_col_filter}}
                    else:
                        cells[xlsx_cols[1 + ((nb_tools + 1) * 3)] + str(i + nb_records + 3)] = {
                            "text": "?????", "format": {"bg_color": color_col_filter}}
                        cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 1] + str(i + nb_records + 3)] = {
                            "text": "?????","format": {"bg_color": color_col_filter}}
                        cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 2] + str(i + nb_records + 3)] = {
                            "text": "?????", "format": {"bg_color": color_col_filter}}

                else:
                    sv_format = {}
                # Raw values:
                cells[xlsx_cols[j] + str(i)] = {"text": record["tools"][tool]["start"], "format": sv_format}
                cells[xlsx_cols[j + 1] + str(i)] = {"text": record["tools"][tool]["end"], "format": sv_format}
                cells[xlsx_cols[j + 2] + str(i)] = {"text": record["tools"][tool]["length"], "format": sv_format}

                # Diffs :
                if my_start != "":
                    cells[xlsx_cols[j] + str(i+nb_records+3)] = {"text": record["tools"][tool]["start"] - my_start,
                                                                 "format": sv_format}
                    cells[xlsx_cols[j + 1] + str(i+nb_records+3)] = {"text": record["tools"][tool]["end"] - my_end,
                                                                     "format": sv_format}
                    cells[xlsx_cols[j + 2] + str(i+nb_records+3)] = {"text": record["tools"][tool]["length"] - my_length,
                                                                 "format": sv_format}
                else:
                    cells[xlsx_cols[j] + str(i + nb_records + 3)] = {"text": "?????", "format": sv_format}
                    cells[xlsx_cols[j + 1] + str(i + nb_records + 3)] = {"text": "?????", "format": sv_format}
                    cells[xlsx_cols[j + 2] + str(i + nb_records + 3)] = {"text": "?????", "format": sv_format}

            else:
                for k in range(0,3):
                    cells[xlsx_cols[j + k] + str(i)] = {"text": "", "format": {"bg_color": color_not_found}}
                    cells[xlsx_cols[j + k] + str(i+nb_records+3)] = {"text": "", "format": {"bg_color": color_not_found}}
            j += 3
        if is_kept:
            cells[xlsx_cols[0] + str(i)]["format"]["bg_color"] = \
                cells[xlsx_cols[0] + str(i+nb_records+3)]["format"]["bg_color"] = color_is_kept
        else:
            cells[xlsx_cols[1 + ((nb_tools + 1) * 3)] + str(i)] = {"text": "", "format": {"bg_color": color_not_found}}
            cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 1] + str(i)] = {"text": "", "format": {"bg_color": color_not_found}}
            cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 2] + str(i)] = {"text": "", "format": {"bg_color": color_not_found}}
            cells[xlsx_cols[1 + ((nb_tools + 1) * 3)] + str(i + nb_records + 3)] = {"text": "",
                                                                                    "format": {"bg_color": color_not_found}}
            cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 1] + str(i + nb_records + 3)] = {
                "text": "", "format": {"bg_color": color_not_found}}
            cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 2] + str(i + nb_records + 3)] = {
                "text": "", "format": {"bg_color": color_not_found}}

        # False positives (orphans) in orange:
        if re.match(r"^orphan_\d+$", id):
            cells[xlsx_cols[0] + str(i)]["format"]["bg_color"] = \
                cells[xlsx_cols[0] + str(i+nb_records+3)]["format"]["bg_color"] = color_false_positive

        i += 1

    # Create document:
    with xlsxwriter.Workbook(args.output) as workbook:
        worksheet = workbook.add_worksheet()
        i=0
        for header in headers:
            if i < len(headers) - 1 or filtered_records is None:
                merge_format = workbook.add_format({'align': 'center'})
            else:
                merge_format = workbook.add_format({'align': 'center', 'bg_color': color_col_filter})
            worksheet.merge_range(xlsx_cols[1 + i * 3] + "1:" + xlsx_cols[3 + i * 3] + "1", header, merge_format)
            diff_line = str(1+nb_records+3)
            worksheet.merge_range(xlsx_cols[1 + i * 3] + diff_line + ":" + xlsx_cols[3 + i * 3] + diff_line, header,
                                  merge_format)
            i += 1
        for cell_id, cell_content in cells.items():
            cell_format = workbook.add_format(cell_content["format"])
            worksheet.write(cell_id, cell_content["text"], cell_format)

    print("")
    print("###########")
    print("# RESULTS #")
    print("###########")
    print("")
    print(str(number-1) + " Results found")
    print(str(orphans) + " False Positive")
    print("")
    print("Results saved in " + args.output)
    print("")

# initialize the script
if __name__ == '__main__':
    try:
        sys.exit(main())
    except:
        raise