#! /usr/bin/env python3 """mergeSV : merge SV on the basis of their intervals The merging is done by identifiying connected components in a graph. The graph is constructing taking the SV a node and an edge connects two nodes if the conresponding intervals exhibits a reciprocal overlap of a least R0%. Additional constraints on the precision of breakpoints (left and right) can be specified. Each connected component is termed a CNVR (Copy Number Variation Region) and is associated to the corresponding CNVs. """ import argparse, sys import glob import vcf import string import re from svreader.vcfwrapper import VCFReader from svinterval import construct_overlap_graph, connected_components import xlsxwriter alphabet = list(string.ascii_uppercase) xlsx_cols = alphabet.copy() for i in alphabet: for j in alphabet: xlsx_cols.append(i + j) def get_args(): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, description="\ mergeIndividualSV \n \ description: Merge SV based on reciprocal overlap") #parser.add_argument('-v', '--vcf', type=str, required=True, help='vcf file(s), comma-separated') parser.add_argument('-v', '--vcf', type=str, required=True, help='folder containing all vcf results files') parser.add_argument('-t', '--true-vcf', type=str, required=True, help='VCF file containing the simulated deletions') parser.add_argument('-f', '--filtered-vcf', type=str, required=False, help='VCF file containing the filtered results') parser.add_argument('--overlap_cutoff', type=float, default=0.5, help='cutoff for reciprocal overlap') parser.add_argument('--left_precision', type=int, default=-1, help='left breakpoint precision') parser.add_argument('--right_precision', type=int, default=-1, help='right breakpoint precision') parser.add_argument('-o', '--output', type=str, default="results.xlsx", help='output Excel file') # parse the arguments args = parser.parse_args() if args.left_precision == -1: args.left_precision = sys.maxsize if args.right_precision == -1: args.right_precision = sys.maxsize # send back the user input return args # A small wrapper to print to stderr def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) def to_vcf_record(cnv): info = { "SVLEN" : cnv.length(), "SVTYPE" : "DEL", "END" : cnv.end, "NUM_CNV" : cnv.NumCNV(), "CNV":",".join(cnv.CNV()), "CALLERS":",".join(cnv.Callers()), "NUM_CALLERS":cnv.NumCallers(), "CIPOS":cnv.cipos(), "CIEND":cnv.ciend(), "PRECISION":",".join(map(str,cnv.precision())), "CALLERSVSAMPLES":",".join(cnv.CallersVsamples()) } alt = [vcf.model._SV("DEL")] if not cnv.IsPrecise(): info['IMPRECISE'] = True else: info['REPR_CNV'] = cnv.repr_cnv() vcf_record = vcf.model._Record(cnv.chrom, cnv.start, cnv.name, "N", alt, ".", ".", info, "", [0], []) return vcf_record def passed_variant(record): """Did this variant pass?""" return record.filter is None or len(record.filter) == 0 or "PASS" in record.filter # -------------------------------------- # main function def read_vcf_file(infile): SVSet=[] ids = [] for record in VCFReader(infile): if not passed_variant(record): continue SVSet.append(record) ids.append(record.id) return SVSet, ids def svsort(sv, records): """ Function to sort regions """ if records[sv]["start"] != "": return int(records[sv]["start"]) first_tool = list(records[sv]["tools"].keys())[0] return int(records[sv]["tools"][first_tool]["start"]) def main(): # parse the command line args args = get_args() overlap_cutoff = args.overlap_cutoff left_precision = args.left_precision right_precision = args.right_precision color_not_found = "#FE2E2E" color_col_filter = "#BEF781" color_is_kept = "#81F781" color_false_positive = "#FE642E" filenames = [] filenames += glob.glob(args.vcf + "/**/*.vcf", recursive=True) filenames += glob.glob(args.vcf + "/**/*.vcf.gz", recursive=True) true_ones = args.true_vcf # Reading all the vcf files SVSet=[] for infile in filenames: eprint(" Reading file %s" % (infile)) SVSet += read_vcf_file(infile)[0] eprint(" Reading file %s" % (true_ones)) SVSet_to, true_ones_records = read_vcf_file(true_ones) SVSet += SVSet_to filtered_records = None if args.filtered_vcf: eprint(" Reading file %s" % (args.filtered_vcf)) filtered_records = read_vcf_file(args.filtered_vcf)[1] eprint("Computing Connected components") construct_overlap_graph(SVSet,overlap_cutoff,left_precision,right_precision) number = 1 records = {} tools = set() orphans = 0 for components in connected_components(SVSet): names = [] results = {} true_one = None for node in components: names.append(node.id) if node.id in true_ones_records: true_one = node.id records[true_one] = { "start": node.start, "end": node.end, "length": int(node.end) - int(node.start), "tools": {}, "orphan": False } else: tool_name = node.id.split("_")[0] results[tool_name] = { "start": node.start, "end": node.end, "length": int(node.end) - int(node.start), "filtered": (node.id in filtered_records) if filtered_records is not None else False } tools.add(tool_name) if true_one is not None: records[true_one]["tools"] = results else: orphans += 1 records["orphan_" + str(orphans)] = { "start": "", "end": "", "length": "", "tools": results, "orphan": True } names.sort() eprint("Group #%i: %s" % (number, ", ".join(names))) number += 1 nb_records = len(records) cells = { "A1": {"text": "RESULTS", "format": {"bg_color": "#ffe856"}}, "A2": {"text": "Deletion", "format": {"bold": True}}, "A" + str(1+nb_records+3): {"text": "DIFFS", "format": {"bg_color": "#ffe856"}}, "A" + str(2+nb_records+3): {"text": "Deletion", "format": {"bold": True}} } tools = sorted(tools) nb_tools = len(tools) i = 1 headers = [] for tool in ["Real data"] + tools + (["Filtered results"] if filtered_records is not None else []): #cells[xlsx_cols[i] + "1"] = cells[xlsx_cols[i] + str(1+nb_records+3)] = {"text": tool, "format": ""} headers.append(tool) l_format = {"bold": True} if tool == "Filtered results": l_format["bg_color"] = color_col_filter cells[xlsx_cols[i] + "2"] = cells[xlsx_cols[i] + str(2+nb_records+3)] = {"text": "Start", "format": l_format} cells[xlsx_cols[i + 1] + "2"] = cells[xlsx_cols[i+1] + str(2+nb_records+3)] = {"text": "End", "format": l_format} cells[xlsx_cols[i + 2] + "2"] = cells[xlsx_cols[i+2] + str(2+nb_records+3)] = {"text": "Length", "format": l_format} i += 3 rec_keys = sorted(records.keys(), key=lambda x:svsort(x, records)) i = 3 for id in rec_keys: record = records[id] cells[xlsx_cols[0] + str(i)] = cells[xlsx_cols[0] + str(i+nb_records+3)] = {"text": id, "format": {}} cells[xlsx_cols[1] + str(i)] = cells[xlsx_cols[1] + str(i+nb_records+3)] = {"text": record["start"], "format": {}} my_start = record["start"] cells[xlsx_cols[2] + str(i)] = cells[xlsx_cols[2] + str(i+nb_records+3)] = {"text": record["end"], "format": {}} my_end = record["end"] cells[xlsx_cols[3] + str(i)] = cells[xlsx_cols[3] + str(i+nb_records+3)] = {"text": record["length"], "format": {}} my_length = record["length"] j = 4 is_kept = False for tool in tools: if tool in record["tools"]: if record["tools"][tool]["filtered"]: is_kept = True sv_format = {"bg_color": color_is_kept} # Raw values: cells[xlsx_cols[1+((nb_tools+1)*3)] + str(i)] = {"text": record["tools"][tool]["start"], "format": {"bg_color": color_col_filter}} cells[xlsx_cols[1+((nb_tools+1)*3)+1] + str(i)] = {"text": record["tools"][tool]["end"], "format": {"bg_color": color_col_filter}} cells[xlsx_cols[1+((nb_tools+1)*3)+2] + str(i)] = {"text": record["tools"][tool]["length"], "format": {"bg_color": color_col_filter}} # Diffs: if my_start != "": cells[xlsx_cols[1+((nb_tools+1)*3)] + str(i+nb_records+3)] = { "text": record["tools"][tool]["start"] - my_start, "format": {"bg_color": color_col_filter}} cells[xlsx_cols[1+((nb_tools+1)*3) + 1] + str(i+nb_records+3)] = { "text": record["tools"][tool]["end"] - my_end, "format": {"bg_color": color_col_filter}} cells[xlsx_cols[1+((nb_tools+1)*3) + 2] + str(i+nb_records+3)] = { "text": record["tools"][tool]["length"] - my_length, "format": {"bg_color": color_col_filter}} else: cells[xlsx_cols[1 + ((nb_tools + 1) * 3)] + str(i + nb_records + 3)] = { "text": "?????", "format": {"bg_color": color_col_filter}} cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 1] + str(i + nb_records + 3)] = { "text": "?????","format": {"bg_color": color_col_filter}} cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 2] + str(i + nb_records + 3)] = { "text": "?????", "format": {"bg_color": color_col_filter}} else: sv_format = {} # Raw values: cells[xlsx_cols[j] + str(i)] = {"text": record["tools"][tool]["start"], "format": sv_format} cells[xlsx_cols[j + 1] + str(i)] = {"text": record["tools"][tool]["end"], "format": sv_format} cells[xlsx_cols[j + 2] + str(i)] = {"text": record["tools"][tool]["length"], "format": sv_format} # Diffs : if my_start != "": cells[xlsx_cols[j] + str(i+nb_records+3)] = {"text": record["tools"][tool]["start"] - my_start, "format": sv_format} cells[xlsx_cols[j + 1] + str(i+nb_records+3)] = {"text": record["tools"][tool]["end"] - my_end, "format": sv_format} cells[xlsx_cols[j + 2] + str(i+nb_records+3)] = {"text": record["tools"][tool]["length"] - my_length, "format": sv_format} else: cells[xlsx_cols[j] + str(i + nb_records + 3)] = {"text": "?????", "format": sv_format} cells[xlsx_cols[j + 1] + str(i + nb_records + 3)] = {"text": "?????", "format": sv_format} cells[xlsx_cols[j + 2] + str(i + nb_records + 3)] = {"text": "?????", "format": sv_format} else: for k in range(0,3): cells[xlsx_cols[j + k] + str(i)] = {"text": "", "format": {"bg_color": color_not_found}} cells[xlsx_cols[j + k] + str(i+nb_records+3)] = {"text": "", "format": {"bg_color": color_not_found}} j += 3 if is_kept: cells[xlsx_cols[0] + str(i)]["format"]["bg_color"] = \ cells[xlsx_cols[0] + str(i+nb_records+3)]["format"]["bg_color"] = color_is_kept else: cells[xlsx_cols[1 + ((nb_tools + 1) * 3)] + str(i)] = {"text": "", "format": {"bg_color": color_not_found}} cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 1] + str(i)] = {"text": "", "format": {"bg_color": color_not_found}} cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 2] + str(i)] = {"text": "", "format": {"bg_color": color_not_found}} cells[xlsx_cols[1 + ((nb_tools + 1) * 3)] + str(i + nb_records + 3)] = {"text": "", "format": {"bg_color": color_not_found}} cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 1] + str(i + nb_records + 3)] = { "text": "", "format": {"bg_color": color_not_found}} cells[xlsx_cols[1 + ((nb_tools + 1) * 3) + 2] + str(i + nb_records + 3)] = { "text": "", "format": {"bg_color": color_not_found}} # False positives (orphans) in orange: if re.match(r"^orphan_\d+$", id): cells[xlsx_cols[0] + str(i)]["format"]["bg_color"] = \ cells[xlsx_cols[0] + str(i+nb_records+3)]["format"]["bg_color"] = color_false_positive i += 1 # Create document: with xlsxwriter.Workbook(args.output) as workbook: worksheet = workbook.add_worksheet() i=0 for header in headers: if i < len(headers) - 1 or filtered_records is None: merge_format = workbook.add_format({'align': 'center'}) else: merge_format = workbook.add_format({'align': 'center', 'bg_color': color_col_filter}) worksheet.merge_range(xlsx_cols[1 + i * 3] + "1:" + xlsx_cols[3 + i * 3] + "1", header, merge_format) diff_line = str(1+nb_records+3) worksheet.merge_range(xlsx_cols[1 + i * 3] + diff_line + ":" + xlsx_cols[3 + i * 3] + diff_line, header, merge_format) i += 1 for cell_id, cell_content in cells.items(): cell_format = workbook.add_format(cell_content["format"]) worksheet.write(cell_id, cell_content["text"], cell_format) print("") print("###########") print("# RESULTS #") print("###########") print("") print(str(number-1) + " Results found") print(str(orphans) + " False Positive") print("") print("Results saved in " + args.output) print("") # initialize the script if __name__ == '__main__': try: sys.exit(main()) except: raise