From 044bbf6a49ff9ae7c727f1967f0fcde605e28fbf Mon Sep 17 00:00:00 2001
From: local_comparaison <mathieu.umec@inrae.fr>
Date: Tue, 9 Jan 2024 15:07:18 +0100
Subject: [PATCH] Clean the code and start the unit tests

---
 Mapping_using_the_API.py                      | 421 +++++++++---------
 Tests_unitaires/test_Mapping_using_the_API.py |  65 ++-
 2 files changed, 271 insertions(+), 215 deletions(-)

diff --git a/Mapping_using_the_API.py b/Mapping_using_the_API.py
index 55b1f79..338a8ec 100644
--- a/Mapping_using_the_API.py
+++ b/Mapping_using_the_API.py
@@ -7,12 +7,23 @@ import json
 from urllib import request
 import xmltodict
 import pandas as pd
+FOLDER = "C:\\Users\\mumec\\Desktop\\Mini_codes\\"
+
-### utils
 def recup_all_inf_excel(file):
+    """
+    This function reads all the data of a .xlsx file.
+
+    Arg:
+        file = the file to read
+    Returns:
+        Type of return: 1 list of lists, one per line of the file
+    """
     datas = pd.read_excel(file, header=None, na_filter=False)
     l_datas = datas.values.tolist()
     return l_datas
+
+
 def send_request_to_mapping_api(url, data_json, head, met='POST'):
     """
     This function sends a mapping request (for example to RAMP) and returns the response.
@@ -20,8 +31,6 @@
     ["KEGG:C01157","hmdb:HMDB0000064","hmdb:HMDB0000148","chebi:16015"]
 
     Arg:
-        metabolites_list =
-        outfiles = list of names files to write
         url = the url to use
         data_json = the data to post
         head = headers to use
@@ -33,7 +42,7 @@
     req = request.Request(url, data=data_json, headers=head, method=met)
     with request.urlopen(req) as response:
         result = response.read()
-    out_data=result.decode('utf-8')
+    out_data = result.decode('utf-8')
     return out_data
 
 
@@ -49,12 +58,12 @@
     Returns:
         Type of return: 1 excel file with 5 columns
     """
-    excel_file = pd.ExcelWriter(name_out_file)
-    dataframe.to_excel(excel_file, sheet_name=sheetname,index=False, header=False)
-    excel_file.close()
+    ex_f = pd.ExcelWriter(name_out_file)
+    dataframe.to_excel(ex_f, sheet_name=sheetname, index=False, header=False)
+    ex_f.close()
 
 
-def pre_cut (list):
+def pre_cut(listed):
     """
     Cut the ID-type prefix from every entry, based on the first non-NA entry.
 
...
 
     """
     clean_list = []
     cump = 0
-    while list[cump]=="NA":
-        cump+=1
-    pos_cut = list[cump].index(":")
-    for elem in list:
-        if elem=="NA":
+    while listed[cump] == "NA":
+        cump += 1
+    pos_cut = listed[cump].index(":")
+    for elem in listed:
+        if elem == "NA":
             clean_list.append("NA")
         else:
             clean_list.append(elem[pos_cut+1:])
     return clean_list
 
 
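As a quick reference for reviewers, this is how the renamed pre_cut is meant to behave; the sample IDs below are illustrative, not taken from the repository:

# Hypothetical usage of pre_cut: the prefix length is taken from the
# first non-"NA" entry, so all entries must share one ID type.
ids = ["NA", "chebi:16015", "chebi:15377"]
print(pre_cut(ids))  # -> ['NA', '16015', '15377']
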
-### Mapping Ramp
-def mapping_from_ramp_api(metabolites_list, outfile, infos="optimization"):
+def mapping_ramp_api(metabolites_list, outfile, inf="opti"):
     """
     This function gives the result of mapping a metabolites list from RAMP.
     Here's an example of 4 metabolites giving 505 lines.
@@ -87,73 +95,71 @@
     Arg:
         metabolites_list = a list of metabolites id
         outfile = name of the outfile to write
-        infos = if all give the full information
+        inf = if "all", give the full information
     Returns:
         Type of return: 1 excel file with 5 columns
     """
-    if len(metabolites_list)==0 :
+    if len(metabolites_list) == 0:
         badend = " Your metabolite list is empty. Here's an example"
         badend += "['KEGG:C01157','hmdb:HMDB0000148','chebi:16015']"
         raise ValueError(badend)
-    data_for_request={"analytes": metabolites_list}
+    data_for_request = {"analytes": metabolites_list}
     json_data = json.dumps(data_for_request).encode('utf-8')
-    urlramp ='https://rampdb.nih.gov/api/pathways-from-analytes'
-    dichead={'Accept': '*/*','Content-Type': 'application/json'}
-    api_datas=send_request_to_mapping_api(urlramp, json_data, dichead)
-    len_oa=len(api_datas)
-    index_begin_interest=api_datas.find("[")
-    index_end_interest=api_datas.find("]")
-    datas_to_treat=api_datas[index_begin_interest:index_end_interest+1]
+    urlramp = 'https://rampdb.nih.gov/api/pathways-from-analytes'
+    dichead = {'Accept': '*/*', 'Content-Type': 'application/json'}
+    api_datas = send_request_to_mapping_api(urlramp, json_data, dichead)
+    index_begin_interest = api_datas.find("[")
+    index_end_interest = api_datas.find("]")
+    datas_to_treat = api_datas[index_begin_interest:index_end_interest+1]
     index = datas_to_treat.find("{")
-    index_begin_lines=[index]
+    i_b_l = [index]
     while index != -1:
-        index+=1
-        index = datas_to_treat.find("{",index)
-        index_begin_lines.append(index)
-    index_begin_lines[-1]=len(datas_to_treat)
-    inputid=[]
-    if infos=="optimization":
+        index += 1
+        index = datas_to_treat.find("{", index)
+        i_b_l.append(index)
+    i_b_l[-1] = len(datas_to_treat)
+    inputid = []
+    if inf == "opti":
         l_met_map = []
-        for index_pos in range (len(index_begin_lines)-1):
-            one_l=datas_to_treat[index_begin_lines[index_pos]:index_begin_lines[index_pos+1]]
-            inputid.append(one_l[one_l.find("inputId")+10:one_l.find("commonName")-3])
+        for index_pos in range(len(i_b_l)-1):
+            onel = datas_to_treat[i_b_l[index_pos]:i_b_l[index_pos+1]]
+            inputid.append(onel[onel.find("inputId")+10:onel.find("commonName")-3])
         for meta_map in inputid:
             if meta_map not in l_met_map:
                 l_met_map.append(meta_map)
-            if len(l_met_map)==len(metabolites_list):
+            if len(l_met_map) == len(metabolites_list):
                 break
         print(str(len(l_met_map))+" metabolites were found")
-        return(len(l_met_map),l_met_map)
-
-    elif infos=="all" :
-        pathwaysource=[]
-        pathwayid=[]
-        commonname=[]
-        pathwayname=[]
-        for index_pos in range (len(index_begin_lines)-1):
-            one_l=datas_to_treat[index_begin_lines[index_pos]:index_begin_lines[index_pos+1]]
-            pathwayname.append(one_l[16:one_l.find("pathwaySource")-3])
-            inputid.append(one_l[one_l.find("inputId")+10:one_l.find("commonName")-3])
-            if infos=="all" :
-                pathwaysource.append(one_l[one_l.find("pathwaySource")+16:one_l.find("pathwayId")-3])
-                pathwayid.append(one_l[one_l.find("pathwayId")+12:one_l.find("inputId")-3])
-                commonname.append(one_l[one_l.find("commonName")+13:len(one_l)-3])
-    pathwayname.insert(0,"pathwayName")
-    inputid.insert(0,"inputid")
-    if infos=="all" :
-        pathwaysource.insert(0,"pathway_source")
-        pathwayid.insert(0,"pathwayid")
-        commonname.insert(0,"commonname")
-        list_result=[pathwayname, pathwaysource, pathwayid, inputid, commonname]
+        return (len(l_met_map), l_met_map)
+    if inf == "all":
+        psource = []
+        pathwayid = []
+        commonname = []
+        pathwayname = []
+        for index_pos in range(len(i_b_l)-1):
+            onel = datas_to_treat[i_b_l[index_pos]:i_b_l[index_pos+1]]
+            pathwayname.append(onel[16:onel.find("pathwaySource")-3])
+            inputid.append(onel[onel.find("inputId")+10:onel.find("commonName")-3])
+            if inf == "all":
+                psource.append(onel[onel.find("pathwaySource")+16:onel.find("pathwayId")-3])
+                pathwayid.append(onel[onel.find("pathwayId")+12:onel.find("inputId")-3])
+                commonname.append(onel[onel.find("commonName")+13:len(onel)-3])
+    pathwayname.insert(0, "pathwayName")
+    inputid.insert(0, "inputid")
+    if inf == "all":
+        psource.insert(0, "pathway_source")
+        pathwayid.insert(0, "pathwayid")
+        commonname.insert(0, "commonname")
+        list_result = [pathwayname, psource, pathwayid, inputid, commonname]
     else:
         list_result = [pathwayname, inputid]
-    df_result=pd.DataFrame(data=list_result).transpose()
-    excel_file_writer(df_result,outfile, sheetname="Resultats du mapping")
+    df_result = pd.DataFrame(data=list_result).transpose()
+    excel_file_writer(df_result, outfile, sheetname="Resultats du mapping")
+    return (len(list_result[0]), list_result)
 
 
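A minimal usage sketch of the renamed mapping_ramp_api, for review only; the IDs come from the docstring example and the output path is illustrative:

# Hypothetical call: writes the full mapping to an Excel file and
# returns (number of result rows, list of result columns).
ids = ["KEGG:C01157", "hmdb:HMDB0000148", "chebi:16015"]
n_rows, columns = mapping_ramp_api(ids, FOLDER + "ramp_mapping.xlsx", inf="all")
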
-#### MetaboAnalyst
-def mapping_from_ma_api(metabolites_list):
+def equiv_from_ma_api(metabolites_list):
     """
     Gives the results of name conversion on MetaboAnalyst.
 
@@ -163,21 +169,22 @@
     Returns:
         Type of return: dic
     """
-    if len(metabolites_list)==0 :
+    if len(metabolites_list) == 0:
         bout = "Your metabolite list is empty. Here's an example"
         bout += "['1,3-Diaminopropane','2-Hydroxybutyric acid']"
         return bout
-    headers = {'Content-Type':"application/json",'cache-control':"no-cache"}
-    metabo_data=''
-    for name in metabolites_list :
-        metabo_data=metabo_data+name+';'
-    payload = {"queryList": metabo_data,"inputType": "name"}
+    headers = {'Content-Type': "application/json", 'cache-control': "no-cache"}
+    metabo_data = ''
+    for name in metabolites_list:
+        metabo_data = metabo_data+name+';'
+    payload = {"queryList": metabo_data, "inputType": "name"}
     json_data = json.dumps(payload).encode('utf-8')
     ma_url = "https://www.xialab.ca/api/mapcompounds"
-    api_datas=send_request_to_mapping_api(ma_url, json_data, headers)
+    api_datas = send_request_to_mapping_api(ma_url, json_data, headers)
     print(api_datas)
+    return "The request is complete"
+
 
-### ConsensusPathDB BrgEnrichment
 def get_cpdb_available_fset_types(entity_type):
     """
     function getCpdbAvaibleFsetTypes translation. Gives the allowed fsetTypes.
@@ -220,6 +227,7 @@
     results = {"ID": ids, "description": desc}
     return results
 
+
 def get_cpdb_available_accesion_types(entity_type):
     """
     function getCpdbAccessionTypes translation. Gives the allowed accession types.
@@ -259,7 +267,7 @@
     with request.urlopen(req) as response:
         xml_content = response.read()
     xml_dict = xmltodict.parse(xml_content)
     xmlbod = xml_dict["SOAP-ENV:Envelope"]["SOAP-ENV:Body"]
-    results= xmlbod["ns1:getAvailableAccessionTypesResponse"]["ns1:accType"]
+    results = xmlbod["ns1:getAvailableAccessionTypesResponse"]["ns1:accType"]
     return results
 
 
@@ -307,6 +315,7 @@ def get_cpdb_available_accesion_id(acctype, accnumbers):
     return dic_results
 
 
+
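A short sketch of how this CPDB ID lookup feeds the over-representation analysis below; the accession numbers are illustrative and the exact response shape depends on the SOAP service:

# Hypothetical usage of get_cpdb_available_accesion_id: map KEGG
# accession numbers to CPDB internal IDs before running the ORA.
cpdb_ids = get_cpdb_available_accesion_id("kegg", ["C01157", "C00300"])
print(cpdb_ids["accNumber"], cpdb_ids["cpdbId"])
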
 def get_cpdb_version():
     """
     function getCpdbVersion translation. Returns the CPDB version used.
@@ -314,7 +323,7 @@
     Returns:
         only print the version
     """
-    body = f'''
+    body = '''
     <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:cpd="cpdbns">
        <soapenv:Header/>
        <soapenv:Body>
@@ -334,12 +343,14 @@
     with request.urlopen(req) as response:
         xml_content = response.read()
     xml_dict = xmltodict.parse(xml_content)
-
-    results= xml_dict["SOAP-ENV:Envelope"]["SOAP-ENV:Body"]["ns1:getCpdbVersionResponse"]["ns1:cpdbVersion"]
+    xml_body = xml_dict["SOAP-ENV:Envelope"]["SOAP-ENV:Body"]
+    results = xml_body["ns1:getCpdbVersionResponse"]["ns1:cpdbVersion"]
     print(results)
 
 
-def mapping_ora_cpdb(accnumbers, acctype, cpdbidsbg = None, pthreshold = 0.05, infos="all",ofile="C:\\Users\\mumec\\Desktop\\test_out_cpdb.xlsx"):
+def m_ora_cpdb(accnumbers, acctype, cpdbidsbg=None,
+               pthreshold=0.05, infos="all",
+               ofile="C:\\Users\\mumec\\Desktop\\test_out_cpdb.xlsx"):
     """
     Gives the result of ID mapping on CPDB.
 
@@ -353,20 +364,19 @@
     Returns:
         Type of return: excel file
     """
-    cpdbids = get_cpdb_available_accesion_id(acctype, accnumbers) # dictionnaire
-    tab_cor=[cpdbids['accNumber'],cpdbids['cpdbId']]
-    if len(accnumbers)>1:
+    cpdbids = get_cpdb_available_accesion_id(acctype, accnumbers)
+    tab_cor = [cpdbids['accNumber'], cpdbids['cpdbId']]
+    if len(accnumbers) > 1:
         cpdbids = pd.DataFrame(cpdbids)
     else:
         cpdbids = pd.DataFrame([cpdbids])
     l_id = cpdbids["cpdbId"].tolist()
     l_id_c = []
     for testid in l_id:
-        if testid is not None :
+        if testid is not None:
             l_id_c.append(testid)
-    cpdbidsfg = [f"<cpd:cpdbIdsFg>{''.join(dat)}</cpd:cpdbIdsFg>" for dat in l_id_c]
-
+    idfg = [f"<cpd:cpdbIdsFg>{''.join(dat)}</cpd:cpdbIdsFg>" for dat in l_id_c]
     body = '''
     <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:cpd="cpdbns">
        <soapenv:Header/>
        <soapenv:Body>
          <cpd:overRepresentationAnalysis>
@@ -375,7 +385,7 @@
             <cpd:entityType>{}</cpd:entityType>
             <cpd:fsetType>{}</cpd:fsetType>
             {}
-    '''.format("metabolites", "P", ' '.join(cpdbidsfg))
+    '''.format("metabolites", "P", ' '.join(idfg))
     if cpdbidsbg is not None:
         cpdbidsbg = ["<cpd:cpdbIdsBg>{}</cpd:cpdbIdsBg>".format(" ".join(map(str, dat)))
                      for dat in cpdbids[:, 1]]
@@ -398,121 +408,114 @@
     with request.urlopen(req) as response:
         xml_content = response.read()
     xml_dict = xmltodict.parse(xml_content)
-    xml_dict_body = xml_dict["SOAP-ENV:Envelope"]["SOAP-ENV:Body"]["ns1:overRepresentationAnalysisResponse"]
+    xml_dict_body = xml_dict["SOAP-ENV:Envelope"]["SOAP-ENV:Body"]
+    xml_dict_orar = xml_dict_body["ns1:overRepresentationAnalysisResponse"]
     try:
-        name = str(xml_dict_body["ns1:name"])[2:-2]
+        name = str(xml_dict_orar["ns1:name"])[2:-2]
     except TypeError:
         print("Mapping empty")
-        return([])
-    else:
-        print("CPDB mapping ok")
-        details = str(xml_dict_body["ns1:details"])
-        pval = str(xml_dict_body["ns1:pValue"])[2:-2]
-        qval = str(xml_dict_body["ns1:qValue"])[2:-2]
-        a_e_n = str(xml_dict_body["ns1:allEntitiesNum"])[:-2]
-
-
-
-    if infos=="optimization":
+        return []
+    print("CPDB mapping ok")
+    details = str(xml_dict_orar["ns1:details"])
+    pval = str(xml_dict_orar["ns1:pValue"])[2:-2]
+    qval = str(xml_dict_orar["ns1:qValue"])[2:-2]
+    a_e_n = str(xml_dict_orar["ns1:allEntitiesNum"])[:-2]
+    if infos == "opti":
         splited = details.split("',")
         l_map = []
         l_map_cor = []
         for entree in splited:
             entree_split = entree[2:].split(";;")[-1]
-            if (len(entree_split[7:])>13):
+            if len(entree_split[7:]) > 13:
                 map_p = entree_split[7:].split(",")
-            else :
+            else:
                 map_p = entree_split[7:]
             for meta in map_p:
                 if meta not in l_map:
                     l_map.append(meta)
-                if len(l_map)==len(accnumbers):
+                if len(l_map) == len(accnumbers):
                     break
-        for id in l_map:
-            id_clean = tab_cor[1].index(id.replace("k","kegg").replace("c","chebi").replace("']",""))
-            l_map_cor.append(tab_cor[0][id_clean])
-        return(l_map_cor)
-
-
-
-
-
-
-
-
-
-    if infos=="all":
+        for id_m in l_map:
+            id_t = id_m.replace("k", "kegg").replace("']", "")
+            id_cln = tab_cor[1].index(id_t)
+            l_map_cor.append(tab_cor[0][id_cln])
+        return l_map_cor
+    if infos == "all":
         splited = details.split("',")
         fsetid = ["fsetId"]
         cpdburl = ["URLCPDB"]
         url = ["pathway link"]
         pmids = ["PMIDS"]
-        ovlent = ["menber input"]
-        overlapping =["Input overlapping"]
-        output_1=[fsetid, cpdburl, url, pmids, ovlent, overlapping]
+        ovlent = ["member input"]
+        overlapping = ["Input overlapping"]
+        output_1 = [fsetid, cpdburl, url, pmids, ovlent, overlapping]
         for entree in splited:
             entree_split = entree[2:].split(";;")
-            if len(entree_split)==5:
-                for index,infos in enumerate(entree_split):
-                    output_1[index].append(infos)
+            if len(entree_split) == 5:
+                for index, infos_split in enumerate(entree_split):
+                    output_1[index].append(infos_split)
                 output_1[5].append(entree_split[4].count(','))
-            elif len(entree_split)==4:
-                for index,infos in enumerate(entree_split[:3]):
-                    output_1[index].append(infos)
+            elif len(entree_split) == 4:
+                for index, infos_split in enumerate(entree_split[:3]):
+                    output_1[index].append(infos_split)
                 output_1[3].append("NA")
                 output_1[4].append(entree_split[3])
                 output_1[5].append(entree_split[3].count(','))
-            else :
+            else:
                 print("anomalie")
-        for pos in range(1,len(output_1[0])):
+        for pos in range(1, len(output_1[0])):
             output_1[0][pos] = output_1[0][pos][7:]
             output_1[1][pos] = output_1[1][pos][8:]
-            if len(output_1[3][pos])>3:
+            if len(output_1[3][pos]) > 3:
                 output_1[3][pos] = output_1[3][pos][6:]
             output_1[4][pos] = output_1[4][pos][9:]
         name_splited = name.split("', '")
         source = ["source"]
-        for nam_i,nam in enumerate(name_splited):
-            l_nam = len(nam)
+        for nam_i, nam in enumerate(name_splited):
             p_ind = nam.index("(")
             sub_nam = nam[p_ind+1:-1]
-            while sub_nam.find(" ")!=-1:
-                p_ind+=sub_nam.index("(")
+            while sub_nam.find(" ") != -1:
+                p_ind += sub_nam.index("(")
                 sub_nam = nam[p_ind+2:-1]
             sub_nam = sub_nam.strip("(")
             source.append(sub_nam)
             name_splited[nam_i] = nam[:p_ind-1]
-        name_splited.insert(0,"Pathways") # modifier avec les sources
+        name_splited.insert(0, "Pathways")
         psplited = pval.split("', '")
-        psplited.insert(0,"p-value")
+        psplited.insert(0, "p-value")
         qsplited = qval.split("', '")
-        qsplited.insert(0,"q-value")
+        qsplited.insert(0, "q-value")
         numsplited = a_e_n.split("',")
         size = ["size"]
         e_size = ["effective_size"]
-        for ee in numsplited :
+        for ee in numsplited:
             p_index = ee.index("(")
             e_size.append(ee[2:p_index])
             size.append(ee[p_index+1:-1])
         for links_i, links in enumerate(url):
             url[links_i] = links.replace("url:", "")
-        ovlent_c = ["menber input"]
-        for l_ovlent_index, l_ovlent in enumerate(ovlent[1:]):
+        ovlent_c = ["member input"]
+        for l_ovlent in ovlent[1:]:
             clean_id = []
-            #print(l_ovlent)
             ov_splited = l_ovlent.split(",")
             for o_s_i, o_s in enumerate(ov_splited):
-                if o_s_i==0:
-                    clean_id.append(tab_cor[0][tab_cor[1].index(o_s.replace("C","kegg:C").replace("']",""))])
+                if o_s_i == 0:
+                    o_s_i_temp = o_s.replace("C", "kegg:C").replace("']", "")
+                    i_u = tab_cor[0][tab_cor[1].index(o_s_i_temp)]
+                    clean_id.append(i_u)
                 else:
-                    clean_id.append(tab_cor[0][tab_cor[1].index(o_s.replace("k","kegg").replace("c","chebi").replace("']",""))])
+                    o_s_temp = o_s.replace("k", "kegg").replace("']", "")
+                    i_u2 = tab_cor[0][tab_cor[1].index(o_s_temp)]
+                    clean_id.append(i_u2)
             ovlent_c.append(clean_id)
-        out_f = [psplited,qsplited, name_splited, source, url, ovlent_c, overlapping, size, e_size, fsetid, pmids, cpdburl] # remplacer les ovlent par les input de base fournit par l'utilisateur.
+        out_f = [psplited, qsplited, name_splited, source, url, ovlent_c,
+                 overlapping, size, e_size, fsetid, pmids, cpdburl]
         out_df = pd.DataFrame(data=out_f).transpose()
-        excel_file_writer(out_df,ofile, sheetname="Resultats")
+        excel_file_writer(out_df, ofile, sheetname="Resultats")
+        return ovlent_c
 
 
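A usage sketch of the renamed m_ora_cpdb for reviewers; the accession numbers are illustrative:

# Hypothetical call: with infos="opti" the function only returns the
# input IDs that CPDB could map; with infos="all" it also writes the
# detailed over-representation table to ofile.
mapped = m_ora_cpdb(["C01157", "C00300"], "kegg", infos="opti")
print(mapped)
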
-### Multimapping
-def multimapping_ramp(file ,num_col, outfiles,infpath="Yes"):
+def multimapping_ramp(file, num_col, outfiles, infpath="Yes"):
     """
     Process multimapping from several ID lists.
@@ -530,18 +533,18 @@
     l_o_l = [[] for _ in num_col]
     l_o_d = []
     for elem in read_l_id:
-        for pos,v_id in enumerate(elem):
+        for pos, v_id in enumerate(elem):
             if str(v_id).strip() != "nan":
                 l_o_l[pos].append(str(v_id).strip())
     for pos, val in enumerate(l_o_l):
-        if infpath=="Yes":
-            l_o_d.append(mapping_from_ramp_api(val, outfiles[pos], infos="all"))
+        if infpath == "Yes":
+            l_o_d.append(mapping_ramp_api(val, outfiles[pos], inf="all"))
         else:
-            mapping_from_ramp_api(l_o_l[pos], outfiles[pos])
+            mapping_ramp_api(l_o_l[pos], outfiles[pos])
     return l_o_d
 
 
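A sketch of calling multimapping_ramp; the workbook name and column indices are illustrative and assume one ID list per column:

# Hypothetical call: maps two ID columns of an input workbook and
# writes one RAMP result file per column.
outs = [FOLDER + "ramp_col1.xlsx", FOLDER + "ramp_col2.xlsx"]
results = multimapping_ramp(FOLDER + "ids.xlsx", [1, 2], outs)
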
-def opti_multimapping(file, outfolder, mapping="NO"):
+def opti_multimapping(file, outfolder, mapping="YES"):
     """
     Process the optimal mapping of RAMP and CPDB.
@@ -555,101 +558,99 @@
     inf = recup_all_inf_excel(file)
     to_test = []
-    recap=[[]]
-    ### cas CPDB
-    l_opt_cpdb = []
-    cpdb_outfile = FOLDER+"optimapping_cpdb.xlsx"
+    recap = [[]]
     id_dif = []
-    col_aso_id = []
+    col_id = []
     for line in inf:
         recap[0].append(line[0])
     for ind_head, headers in enumerate(inf[0][1:-1]):
         if headers not in id_dif:
             id_dif.append(headers)
-            col_aso_id.append([ind_head+1])
+            col_id.append([ind_head+1])
         else:
-            col_aso_id[-1].append(ind_head+1)
-    for index_t_i, type_id in enumerate(id_dif):# boucle sur les différents id
+            col_id[-1].append(ind_head+1)
+    for i_t_i, type_id in enumerate(id_dif):
+        # loop over the different ID types
         acctype = type_id
-        meta_mapper = []
-        id_to_search=[i for i in range(len(inf)-1)]
+        id_to_search = list(range(len(inf[1:])))
         to_test.clear()
         l_opti_for_this_id = ["NA" for i in range(len(inf))]
-        for num_col in col_aso_id[index_t_i]:# boucle sur les colonnes d'un même id
+        for n_col in col_id[i_t_i]:
+            # loop over the columns of one ID type
             col_actu = []
-            for l in inf[1:]:
-                col_actu.append(l[num_col])
+            for lig in inf[1:]:
+                col_actu.append(lig[n_col])
             col_actu = pre_cut(col_actu)
             new_id = []
-            for index_to_test, id_to_t in enumerate(col_actu):# boucle sur les id d'une même colonne
-                if index_to_test in id_to_search and id_to_t!="NA":
+            for index_to_test, id_to_t in enumerate(col_actu):
+                # loop over the IDs of one column
+                if index_to_test in id_to_search and id_to_t != "NA":
                     new_id.append(id_to_t)
                     to_test.append(id_to_t)
-            if len(new_id)!=0:
-                cpdb_out_opti = mapping_ora_cpdb(to_test, acctype, infos="optimization")
+            if len(new_id) != 0:
+                cpdb_o_opti = m_ora_cpdb(to_test, acctype, infos="opti")
                 for t_new in new_id:
-                    if t_new in cpdb_out_opti :
+                    if t_new in cpdb_o_opti:
                         id_to_search.remove(col_actu.index(t_new))
-                        l_opti_for_this_id[col_actu.index(t_new)+1]=t_new
+                        l_opti_for_this_id[col_actu.index(t_new)+1] = t_new
                 for index_change in id_to_search:
-                    if col_actu[index_change]!="NA":
-                        to_test.remove(col_actu[index_chang])
-            if len(cpdb_out_opti)==len(inf[1:]) or num_col==col_aso_id[index_t_i][-1]:
-                if mapping=="YES":
-                    outfile = outfolder+acctype+"_mapping_opti_cpdb.xlsx"
-                    mapping_ora_cpdb(cpdb_out_opti, acctype, infos="all", ofile=outfile)
+                    if col_actu[index_change] != "NA":
+                        to_test.remove(col_actu[index_change])
+            if len(cpdb_o_opti) == len(inf[1:]) or n_col == col_id[i_t_i][-1]:
+                if mapping == "YES":
+                    cpdbf = outfolder+acctype+"_mapping_opti_cpdb.xlsx"
+                    m_ora_cpdb(cpdb_o_opti, acctype, infos="all", ofile=cpdbf)
                 l_opti_for_this_id[0] = "CPDB "+acctype
                 recap.append(l_opti_for_this_id)
                 break
-    ### cas ramp
     for line in inf[1:]:
         to_test.append(line[1])
     l_opt_ramp = []
     l_opt_ramp_tri = ["NA" for i in range(len(inf))]
     n_meta_map = 0
-    ramp_outfile = FOLDER+"optimapping_ramp.xlsx"
-    n_meta_map, l_opt_ramp = mapping_from_ramp_api(to_test, ramp_outfile, infos="optimization")
-    if n_meta_map==len(inf)-1:
-        mapping_from_ramp_api(l_opt_ramp, ramp_outfile, infos="all")
-        return("la premiére liste est optimal")
-    else:
-        to_test.clear()
-        index_still=[]
-        for index_l, li in enumerate(inf[1:]):
-            if li[1] not in l_opt_ramp:
-                index_still.append(index_l+1)
-                if li[2]!="NA":
-                    to_test.append(li[2])
-            else:
-                l_opt_ramp_tri[index_l+1] = li[1]
-        input_col = 2
-        while n_meta_map!=len(inf)-1 and input_col!=(len(inf[0])-1): # prend en compte la derniére colones de fold-change
-            if len(to_test)!=0:
-                n_meta_sup, second_map = mapping_from_ramp_api(to_test, ramp_outfile, infos="optimization")
-                n_meta_map+=n_meta_sup
-                l_opt_ramp+=second_map
-                index_to_remove = []
-                for s_ind in index_still:
-                    if inf[s_ind][input_col] in second_map:
-                        index_to_remove.append(s_ind)
-                        l_opt_ramp_tri[s_ind] = inf[s_ind][input_col]
-                for ix in index_to_remove:
-                    index_still.remove(ix)
-            to_test.clear()
-            input_col+=1
-            for ind_ind in index_still:
-                if inf[ind_ind][input_col]!="NA":
-                    to_test.append(inf[ind_ind][input_col])
-    if mapping=="YES":
-        mapping_from_ramp_api(l_opt_ramp, ramp_outfile, infos="all")
+    ramp_outf = FOLDER+"optimapping_ramp.xlsx"
+    n_meta_map, l_opt_ramp = mapping_ramp_api(to_test, ramp_outf, inf="opti")
+    if n_meta_map == len(inf)-1:
+        mapping_ramp_api(l_opt_ramp, ramp_outf, inf="all")
+        return "the first list is optimal"
+    to_test.clear()
+    index_still = []
+    for index_l, li in enumerate(inf[1:]):
+        if li[1] not in l_opt_ramp:
+            index_still.append(index_l+1)
+            if li[2] != "NA":
+                to_test.append(li[2])
+        else:
+            l_opt_ramp_tri[index_l+1] = li[1]
+    input_col = 2
+    while n_meta_map != len(inf)-1 and input_col != (len(inf[0])-1):
+        # accounts for the last fold-change column
+        if len(to_test) != 0:
+            n_sup, s_map = mapping_ramp_api(to_test, ramp_outf, inf="opti")
+            n_meta_map += n_sup
+            l_opt_ramp += s_map
+            index_to_remove = []
+            for s_ind in index_still:
+                if inf[s_ind][input_col] in s_map:
+                    index_to_remove.append(s_ind)
+                    l_opt_ramp_tri[s_ind] = inf[s_ind][input_col]
+            for ix in index_to_remove:
+                index_still.remove(ix)
+        to_test.clear()
+        input_col += 1
+        for ind_ind in index_still:
+            if inf[ind_ind][input_col] != "NA":
+                to_test.append(inf[ind_ind][input_col])
+    if mapping == "YES":
+        mapping_ramp_api(l_opt_ramp, ramp_outf, inf="all")
     l_opt_ramp_tri[0] = "RAMP"
     recap.append(l_opt_ramp_tri)
     recap = pd.DataFrame(data=recap).transpose()
     n_out_f = outfolder+"recap_mapping_opti.xlsx"
     excel_file_writer(recap, n_out_f, sheetname="Resultats")
+    return "all is ok"
+
 
-### Lancement
 if __name__ == "__main__":
-    FOLDER = "C:\\Users\\mumec\\Desktop\\Mini_codes\\"
-    f_enter = FOLDER+"fichier_entree_test_multi_mapping.xlsx"
-    opti_multimapping(f_enter, FOLDER)
+    F_ENTER = FOLDER+"fichier_entree_test_multi_mapping.xlsx"
+    opti_multimapping(F_ENTER, FOLDER)
diff --git a/Tests_unitaires/test_Mapping_using_the_API.py b/Tests_unitaires/test_Mapping_using_the_API.py
index b0b0fd5..222d0fd 100644
--- a/Tests_unitaires/test_Mapping_using_the_API.py
+++ b/Tests_unitaires/test_Mapping_using_the_API.py
@@ -3,7 +3,17 @@ import sys
 sys.path.append('C:\\Users\\mumec\\Desktop\\Dossier_gitlab_local\\traitement_des_données')
 import Mapping_using_the_API
 
-class TestMappingAPI(unittest.TestCase):
+class TestUtils(unittest.TestCase):
+    """
+    recup_all_inf_excel(file)
+
+    send_request_to_mapping_api(url, data_json, head, met='POST')
+
+    excel_file_writer(dataframe, name_out_file, sheetname="Resultats")
+
+    pre_cut(listed)
+    """
+    def test_recup_all_inf_excel(self):
+        pass  # TODO: implement
 
     def test_send_request_to_mapping_api(self):
         result = Mapping_using_the_API.send_request_to_mapping_api('https://rampdb.nih.gov/api/pathways-from-analytes', ["hmdb:HMDB0000064"], {'Accept': '*/*', 'Content-Type': 'application/json'})
 
@@ -13,13 +23,58 @@
         with self.assertRaises(ValueError):
             Mapping_using_the_API.send_request_to_mapping_api('https://rampdb.nih.gov/api/pathways-from-analytes', [], {'Accept': '*/*', 'Content-Type': 'application/json'})
-"""
-    def test_mapping_from_RAMP_API(self):
 
-    def test_excel_file_writer(self):
-"""
+    def test_excel_file_writer(self):
+        pass  # TODO: implement
+
+    def test_pre_cut(self):
+        pass  # TODO: implement
+
+
+class TestMappingAPI(unittest.TestCase):
+    """
+    mapping_ramp_api(metabolites_list, outfile, inf="opti")
+
+    m_ora_cpdb(accnumbers, acctype, cpdbidsbg=None,
+               pthreshold=0.05, infos="all",
+               ofile="C:\\Users\\mumec\\Desktop\\test_out_cpdb.xlsx")
+
+    equiv_from_ma_api(metabolites_list)
+    """
+
+    def test_mapping_ramp_api(self):
+        pass  # TODO: implement
+
+    def test_m_ora_cpdb(self):
+        pass  # TODO: implement
+
+    def test_equiv_from_ma_api(self):
+        pass  # TODO: implement
+
+
+class TestCPDBannexe(unittest.TestCase):
+    """
+    get_cpdb_available_fset_types(entity_type)
+
+    get_cpdb_available_accesion_types(entity_type)
+
+    get_cpdb_available_accesion_id(acctype, accnumbers)
+
+    get_cpdb_version()
+    """
+    def test_get_cpdb_available_fset_types(self):
+        pass  # TODO: implement
+
+    def test_get_cpdb_available_accesion_types(self):
+        pass  # TODO: implement
+
+    def test_get_cpdb_available_accesion_id(self):
+        pass  # TODO: implement
+
+    def test_get_cpdb_version(self):
+        pass  # TODO: implement
+
+
+class TestMultiMapping(unittest.TestCase):
+    """
+    multimapping_ramp(file, num_col, outfiles, infpath="Yes")
+
+    opti_multimapping(file, outfolder, mapping="YES")
+    """
+    def test_multimapping_ramp(self):
+        pass  # TODO: implement
+
+    def test_opti_multimapping(self):
+        pass  # TODO: implement
 
 if __name__ == '__main__':
     unittest.main()
\ No newline at end of file
-- 
GitLab
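As a possible next step for this patch, the pre_cut stub can be filled in without any network access; a minimal sketch, assuming the behavior illustrated earlier:

# Hypothetical body for TestUtils.test_pre_cut:
def test_pre_cut(self):
    ids = ["NA", "chebi:16015", "chebi:15377"]
    self.assertEqual(Mapping_using_the_API.pre_cut(ids),
                     ["NA", "16015", "15377"])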