Commit 7b26f1fa authored by MARTIN Pierre

September 13 fixes to merge_abundance_and_functional_annot.py; main.nf; some documentation

parent 5b92898f
......@@ -89,21 +89,21 @@ concat_diamond_files = pd.DataFrame()
# Concatenate diamond files.
for (diamond_idx,diamond_path) in enumerate(diamond_files):
diamond_file = pd.read_csv(diamond_path, delimiter='\t', decimal='.', header=None)
diamond_file.loc[:,1] = 'https://www.ncbi.nlm.nih.gov/protein/' + diamond_file.loc[:,1]
group_diamond_file = diamond_file.groupby(diamond_file.columns[0])\
.agg({diamond_file.columns[14] : ';'.join, diamond_file.columns[1] : ','.join})\
.reset_index()\
.reindex(columns=diamond_file.columns)
res_diamond_file = group_diamond_file.iloc[:,[0,1,14]]
diamond_columns = ["qseqid","sseqid","pident","length","mismatch","gapopen","qstart","qend","sstart","send","evalue","bitscore","qlen","slen","stitle"]
diamond_file = pd.read_csv(diamond_path, delimiter='\t', decimal='.', header=None, names=diamond_columns)
diamond_file.loc[:,"sseqid"] = 'https://www.ncbi.nlm.nih.gov/protein/' + diamond_file.loc[:,"sseqid"]
group_diamond_file = diamond_file.groupby("qseqid")\
.agg({"stitle" : ';'.join, "sseqid" : ','.join})\
.reset_index()\
.reindex(columns=diamond_file.columns)
res_diamond_file = group_diamond_file.loc[:,["qseqid","sseqid","stitle"]]
concat_diamond_files = pd.concat([concat_diamond_files, res_diamond_file])
# Merge counts, annotation and diamond results.
merge_annot = pd.merge(counts_file,concat_eggnog_mapper_files,left_on="seed_cluster",right_on='#query_name', how='left')
merge = pd.merge(merge_annot,concat_diamond_files,left_on="seed_cluster",right_on=concat_diamond_files.columns[0], how='left')
merge = pd.merge(merge_annot,concat_diamond_files,left_on="seed_cluster",right_on="qseqid", how='left')
merge.drop('#query_name', inplace=True, axis=1)
merge.drop(merge.columns[28], inplace=True, axis=1)
res_merge = merge.rename(columns = {1: 'diamond_db_id', 14: 'diamond_db_description'})
merge.drop("qseqid", inplace=True, axis=1)
# Write merge data frame in output file.
res_merge.to_csv(args.output_file, sep="\t", index=False)
merge.to_csv(args.output_file, sep="\t", index=False)
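Naming the diamond columns makes the aggregation self-documenting instead of relying on positional indices. A minimal sketch of the same groupby/agg pattern on toy data (column names are taken from the diff; the sample values are hypothetical):

```python
import pandas as pd

# Toy BLAST-style hits: one query matched by two subjects (hypothetical values).
diamond_file = pd.DataFrame({
    "qseqid": ["contig1_gene1", "contig1_gene1"],
    "sseqid": ["WP_000001.1", "WP_000002.1"],
    "stitle": ["hypothetical protein A", "hypothetical protein B"],
})
diamond_file["sseqid"] = "https://www.ncbi.nlm.nih.gov/protein/" + diamond_file["sseqid"]

# Collapse all hits of a query into one row: comma-separated ids,
# semicolon-separated descriptions.
group = (diamond_file.groupby("qseqid")
         .agg({"stitle": ";".join, "sseqid": ",".join})
         .reset_index())
print(group)  # one row per qseqid
```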
......@@ -71,14 +71,19 @@ percontig = pd.read_csv(args.percontig_file, delimiter='\t', dtype=str)
# Merge idxstats and .percontig.tsv files.
merge = pd.merge(idxstats,percontig,left_on='contig',right_on='#contig', how='outer')
#add depth
# Add depth
merge = pd.merge(merge,mosdepth,left_on='contig',right_on='contig', how='outer')
# Fill NaN values to keep unmapped contigs.
merge['consensus_lineage'] = merge['consensus_lineage'].fillna('Unknown')
merge['tax_id_by_level'] = merge['tax_id_by_level'].fillna(1)
merge['consensus_tax_id'] = merge['consensus_tax_id'].fillna(1)
# Group by lineage and sum number of reads and contigs.
res = merge.groupby(['consensus_lineage','consensus_tax_id', 'tax_id_by_level']).agg({'contig' : [';'.join, 'count'], 'mapped': 'sum', 'depth': 'mean'}).reset_index()
res.columns=['lineage_by_level', 'consensus_tax_id', 'tax_id_by_level', 'name_contigs', 'nb_contigs', 'nb_reads', 'depth']
# Fill the NaN by 0.
# Fill NaN values with 0.
res.fillna(0, inplace=True)
# Split by taxonomic level
......
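A condensed sketch of the contig-to-lineage aggregation this hunk performs: three per-contig tables are outer-merged, unmapped contigs get placeholder taxonomy, and counts are summed per lineage. Column names come from the diff; the frames and values below are hypothetical:

```python
import pandas as pd

# Hypothetical per-contig inputs: read counts, taxonomy, and mean depth.
idxstats = pd.DataFrame({"contig": ["c1", "c2"], "mapped": [10, 5]})
percontig = pd.DataFrame({"#contig": ["c1"], "consensus_lineage": ["Bacteria"],
                          "consensus_tax_id": ["2"], "tax_id_by_level": ["2"]})
mosdepth = pd.DataFrame({"contig": ["c1", "c2"], "depth": [4.0, 2.0]})

merge = pd.merge(idxstats, percontig, left_on="contig", right_on="#contig", how="outer")
merge = pd.merge(merge, mosdepth, left_on="contig", right_on="contig", how="outer")

# Unmapped contigs keep a placeholder lineage instead of NaN.
merge["consensus_lineage"] = merge["consensus_lineage"].fillna("Unknown")
merge["tax_id_by_level"] = merge["tax_id_by_level"].fillna(1)
merge["consensus_tax_id"] = merge["consensus_tax_id"].fillna(1)

# One row per lineage: joined contig names, contig count, read sum, mean depth.
res = (merge.groupby(["consensus_lineage", "consensus_tax_id", "tax_id_by_level"])
       .agg({"contig": [";".join, "count"], "mapped": "sum", "depth": "mean"})
       .reset_index())
res.columns = ["lineage_by_level", "consensus_tax_id", "tax_id_by_level",
               "name_contigs", "nb_contigs", "nb_reads", "depth"]
```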
......@@ -57,7 +57,7 @@ with open(args.list_of_kaiju_files) as fkaiju_list:
kaiju_files = fkaiju_list.read().split()
# Merge kaiju results for all samples.
for (kaiju_idx,kaiju_path) in enumerate(kaiju_files):
for (kaiju_idx,kaiju_path) in enumerate(sorted(kaiju_files)):
print(kaiju_idx)
if(kaiju_idx==0):
merge = pd.read_csv(kaiju_path, delimiter='\t', dtype=str)
......
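Iterating over `sorted(kaiju_files)` makes the merge order, and therefore the output column order, deterministic across runs. A hedged sketch of the whole loop; only the first branch appears in the hunk, so the incremental-merge branch below is an assumption about the omitted code, and the file paths are hypothetical:

```python
import pandas as pd

kaiju_files = ["sampleB_kaiju.tsv", "sampleA_kaiju.tsv"]  # hypothetical paths

merge = None
for kaiju_idx, kaiju_path in enumerate(sorted(kaiju_files)):
    if kaiju_idx == 0:
        # First (alphabetically) sample initializes the merged table.
        merge = pd.read_csv(kaiju_path, delimiter="\t", dtype=str)
    else:
        # Assumed continuation (not shown in the hunk): outer-merge each
        # further sample so taxa missing from one sample are still kept.
        sample = pd.read_csv(kaiju_path, delimiter="\t", dtype=str)
        merge = pd.merge(merge, sample, how="outer")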
......@@ -76,7 +76,7 @@ merge.rename(columns = {'name_contigs': 'name_contigs_' + sample_name, \
'nb_contigs': 'nb_contigs_' + sample_name,\
'nb_reads': 'nb_reads_' + sample_name},inplace=True)
# Fill the NaN by 0.
# Fill NaN values with 0.
merge.fillna(0, inplace=True)
# Write merge data frame in output file.
......
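Suffixing the per-sample count columns with the sample name keeps them distinct once several samples sit side by side in one table. A toy illustration (`sample_name` and the values are hypothetical):

```python
import pandas as pd

sample_name = "sample1"  # hypothetical
merge = pd.DataFrame({"lineage_by_level": ["Bacteria"],
                      "name_contigs": ["c1;c2"], "nb_contigs": [2], "nb_reads": [15]})

merge.rename(columns={"name_contigs": "name_contigs_" + sample_name,
                      "nb_contigs": "nb_contigs_" + sample_name,
                      "nb_reads": "nb_reads_" + sample_name}, inplace=True)

# Lineages absent from this sample become 0 instead of NaN after merging.
merge.fillna(0, inplace=True)
```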
......@@ -44,14 +44,10 @@ process {
memory = { 50.GB * task.attempt }
cpus = 25
}
withName: metaspades {
withName: assembly {
memory = { 110.GB * task.attempt }
cpus = 20
}
withName: megahit {
cpus = 20
memory = { 100.GB * task.attempt }
}
withName: quast {
cpus = 4
memory = { 8.GB * task.attempt }
......
......@@ -45,14 +45,10 @@ process {
memory = { 2.GB * task.attempt }
cpus = 20
}
withName: metaspades {
withName: assembly {
memory = { 60.GB * task.attempt }
cpus = 14
}
withName: megahit {
cpus = 20
memory = { 60.GB * task.attempt }
}
withName: quast {
cpus = 3
memory = { 8.GB * task.attempt }
......
......@@ -41,14 +41,10 @@ process {
memory = { 36.GB * task.attempt }
cpus = 4
}
withName: metaspades {
withName: assembly {
memory = { 10.GB * task.attempt }
cpus = 8
}
withName: megahit {
cpus = 8
memory = { 10.GB * task.attempt }
}
withName: quast {
cpus = 2
memory = { 2.GB * task.attempt }
......
......@@ -38,17 +38,13 @@ process {
cpus = 6
}
withName: kaiju {
memory = { 100.GB * task.attempt }
memory = { 50.GB * task.attempt }
cpus = 4
}
withName: metaspades {
withName: assembly {
memory = { 10.GB * task.attempt }
cpus = 8
}
withName: megahit {
cpus = 8
memory = { 10.GB * task.attempt }
}
withName: quast {
cpus = 2
memory = { 2.GB * task.attempt }
......
......@@ -37,14 +37,10 @@ process {
memory = { 10.GB * task.attempt }
cpus = 2
}
withName: metaspades {
withName: assembly {
memory = { 2.GB * task.attempt }
cpus = 3
}
withName: megahit {
cpus = 3
memory = { 2.GB * task.attempt }
}
withName: quast {
cpus = 2
memory = { 2.GB * task.attempt }
......
# Functional tests: Usage
## I. Pre-requisites
1. metagWGS is still under development: you need to use the `dev-test` branch of the metagwgs repository.
Run:
```bash
cd metagwgs
git checkout dev-test
git pull
cd functional_tests
```
2. Make sure you are in the directory where you downloaded the `metagwgs` source files and have added the three mandatory Singularity images to `metagwgs/env`.
3. Make sure you have downloaded all the data files metagwgs requires; if not, the pipeline will download them again each time you run it in a different folder.
4. Download the expected results directory for the test files from [link-to-exp-dir].
## II. Launch test
The test script can be used alongside a homemade script containing the command that launches metagWGS on the computational cluster of your choice.
You can launch the tests either with or without such a script, as described below.
### Launch with script
### Launch without script
\ No newline at end of file
......@@ -87,11 +87,13 @@ def check_files(exp_dir, obs_dir, step, methods, verbose):
not_tested_log = 'ft_{}.not_tested'.format(step)
if path.exists(not_tested_log):
    os.remove(not_tested_log)
log = open('ft_{}.log'.format(step), 'w+')
# Create a new log for this functional test
tested_log = 'ft_{}.log'.format(step)
log = open(tested_log, 'w+')
# Paths to expected and observed parent directories
expected_prefix = path.join(path.abspath(exp_dir), step)
observed_prefix = path.join(path.abspath(obs_dir), step)
log.write('Expected directory: {a}\nvs\nObserved directory: {b}\n'.format(a = expected_prefix, b = observed_prefix))
# Passed and failed tests count initialization
......@@ -103,38 +105,31 @@ def check_files(exp_dir, obs_dir, step, methods, verbose):
if verbose: print('\nLaunching {}...\n'.format(step))
# Test each file
for file_path in files_list:
# Metadata on file to find them and know which test to perform
expected_path = path.join(expected_prefix, file_path)
observed_path = path.join(observed_prefix, file_path)
file_name = path.basename(file_path)
file_extension = path.splitext(file_name)[1]
if re.search(r"taxo_affi_reads_.*\.tsv", file_name):
method = "taxo_diff"
elif re.search(r".*_sickle\.log", file_name):
method = "diff"
elif re.search(r".*_cutadapt\.log", file_name):
method = "cut_diff"
# Find which test to perform on given file (exceptions being "taxo_diff" and "cut_diff")
method = ''
for test in methods:
if type(methods[test]) != list and re.search(methods[test], file_name):
method = test
break
elif file_extension in methods["diff"]:
method = "diff"
elif type(methods[test]) == list and file_extension in methods[test]:
method = test
break
elif file_extension in methods["not_empty"]:
method = "not_empty"
if method == '':
sys.exit('No matching test method for {} in {}'.format(file_name, expected_path))
elif file_extension in methods["no_header_diff"]:
method = "no_header_diff"
elif file_extension in methods["zdiff"]:
method = "zdiff"
else:
sys.exit("Non existing method for {} with ext {}".format(file_name, file_extension))
# Make log of non existing files
# Non existing files
if not path.exists(observed_path):
nt = open(not_tested_log, 'a')
nt.write(observed_path + '\n')
......@@ -151,10 +146,13 @@ Not tested (list in {b})
log.write(file_out)
if verbose: print(file_out)
# Existing files
if path.exists(expected_path) and path.exists(observed_path):
test, out = test_file(expected_path, observed_path, method)
# Test failed:
if test == False:
file_out = '''
------------------------------------------------------------------------------
......@@ -168,6 +166,7 @@ Test method: {b}
log.write(out)
if verbose: print(out)
# Test passed:
elif test == True:
file_out = '''
------------------------------------------------------------------------------
......@@ -207,9 +206,11 @@ Passed: {c} ({d}%)
Missed: {e} ({f}%)
Not tested: {g}
Find more details in {h}
-----------------------------------------
=========================================
'''.format(a = step, b = max_cnt, c = true_cnt, d = true_perc, e = false_cnt, f = false_perc, g = nt_cnt)
'''.format(a = step, b = max_cnt, c = true_cnt, d = true_perc, e = false_cnt, f = false_perc, g = nt_cnt, h = tested_log)
log.write(out)
log.close()
......@@ -230,13 +231,15 @@ def test_file(exp_path, obs_path, method):
command = 'diff <(tail -n+6 {}) <(tail -n+6 {})'.format(exp_path, obs_path)
elif method == 'no_header_diff':
command = 'diff <(grep -w "^#" {}) <(grep -w "^#" {})'.format(exp_path, obs_path)
command = 'diff <(grep -v "^#" {}) <(grep -v "^#" {})'.format(exp_path, obs_path)
elif method == 'zdiff':
command = 'zdiff {} {}'.format(exp_path, obs_path)
elif method == 'taxo_diff':
command = 'diff <(cut -f1 {} | sort) <(cut -f1 {} | sort)'.format(exp_path, obs_path)
command = 'diff {} {}'.format(exp_path, obs_path)
# command = 'diff <(sort {}) <(sort {})'.format(exp_path, obs_path)
# command = 'diff <(cut -f1 {} | sort) <(cut -f1 {} | sort)'.format(exp_path, obs_path)
process = subprocess.Popen(command, stdout = subprocess.PIPE, shell = True, executable = '/bin/bash')
diff_out, error = process.communicate()
......
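Process substitution (`<(...)`) is a bash feature, hence the explicit `executable = '/bin/bash'`. A minimal, hedged sketch of the same pattern, including the diff exit-code convention (0 = identical, 1 = differences found, >1 = error); the compared file names are hypothetical:

```python
import subprocess

def run_diff(command):
    """Run a bash command and report whether the compared files matched."""
    process = subprocess.Popen(command, stdout=subprocess.PIPE,
                               shell=True, executable='/bin/bash')
    diff_out, _ = process.communicate()
    # diff exits 0 when files are identical, 1 when they differ, >1 on error.
    return process.returncode == 0, diff_out.decode()

# Hypothetical files; skip the first five lines, as the cut_diff method does.
same, out = run_diff('diff <(tail -n+6 a.log) <(tail -n+6 b.log)')
```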
......@@ -35,7 +35,8 @@ steps_list = OrderedDict([
])
global methods
methods = dict([
methods = OrderedDict([
("cut_diff", r".*_cutadapt\.log"),
("diff", [".flagstat",".idxstats",".fasta",".fa",".faa",".ffn",".fna",".gff",".len",".bed",".m8",".clstr",".txt",".summary",".best_hit", ".log", ".bam", ".tsv"]),
("not_empty", [".zip",".html",".pdf", ".bai"]),
("no_header_diff", [".annotations",".seed_orthologs"]),
......
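Switching to `OrderedDict` guarantees that the regex entries are tried before the catch-all extension lists in the dispatch loop above: with an unordered (pre-3.7) dict, a file such as `s1_cutadapt.log` could match `.log` under `diff` before `cut_diff` was ever checked. A toy illustration of that dispatch (filenames hypothetical):

```python
import re
from collections import OrderedDict
from os import path

methods = OrderedDict([
    ("cut_diff", r".*_cutadapt\.log"),  # regex entries first
    ("diff", [".log", ".tsv"]),         # then extension lists
])

def find_method(file_name):
    file_extension = path.splitext(file_name)[1]
    for test in methods:
        # Regex methods match on the full file name.
        if not isinstance(methods[test], list) and re.search(methods[test], file_name):
            return test
        # List methods match on the extension alone.
        if isinstance(methods[test], list) and file_extension in methods[test]:
            return test
    return None

print(find_method("s1_cutadapt.log"))  # cut_diff, not diff
print(find_method("s1.tsv"))           # diff
```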
......@@ -532,52 +532,35 @@ else {
* ASSEMBLY.
*/
// Metaspades.
if(params.assembly=='metaspades') {
// megahit_ch = Channel.from(false)
process metaspades {
tag "${sampleId}"
publishDir "${params.outdir}/02_assembly", mode: 'copy'
input:
set sampleId, file(preprocessed_reads_R1), file(preprocessed_reads_R2) from input_reads_for_assembly_ch
val spades_mem from metaspades_mem_ch
output:
set sampleId, file("${sampleId}_metaspades/${sampleId}_scaffolds.fasta") into assembly_for_quast_ch, assembly_for_dedup_ch, assembly_for_filter_ch, assembly_no_filter_ch
set sampleId, file("${sampleId}_metaspades/${sampleId}_spades_log.txt"), file("${sampleId}_metaspades/params.txt") into logs_metaspades_ch
when: ('02_assembly' in step || '03_filtering' in step || '04_structural_annot' in step || '05_alignment' in step || '06_func_annot' in step || '07_taxo_affi' in step || '08_binning' in step)
script:
"""
metaspades.py -t ${task.cpus} -m ${spades_mem} -1 ${preprocessed_reads_R1} -2 ${preprocessed_reads_R2} -o ${sampleId}_metaspades
mv ${sampleId}_metaspades/scaffolds.fasta ${sampleId}_metaspades/${sampleId}_scaffolds.fasta
mv ${sampleId}_metaspades/spades.log ${sampleId}_metaspades/${sampleId}_spades_log.txt
"""
}
} else { // Megahit.
if(params.assembly=='megahit') {
// metaspades_ch = Channel.from(false)
process megahit {
tag "${sampleId}"
publishDir "${params.outdir}/02_assembly", mode: 'copy'
// Assembly (metaspades or megahit).
process assembly {
tag "${sampleId}"
publishDir "${params.outdir}/02_assembly", mode: 'copy'
label 'assembly'
input:
set sampleId, file(preprocessed_reads_R1), file(preprocessed_reads_R2) from input_reads_for_assembly_ch
input:
set sampleId, file(preprocessed_reads_R1), file(preprocessed_reads_R2) from input_reads_for_assembly_ch
val spades_mem from metaspades_mem_ch
output:
set sampleId, file("${sampleId}_megahit/${sampleId}.contigs.fa") into assembly_for_quast_ch, assembly_for_dedup_ch, assembly_for_filter_ch, assembly_no_filter_ch
set sampleId, file("${sampleId}_megahit/${sampleId}.log"), file("${sampleId}_megahit/options.json") into logs_megahit_ch
output:
set sampleId, file("${sampleId}_assembly/${sampleId}.contigs.fa") into assembly_for_quast_ch, assembly_for_dedup_ch, assembly_for_filter_ch, assembly_no_filter_ch
set sampleId, file("${sampleId}_assembly/${sampleId}.log"), file("${sampleId}_assembly/params.txt") into logs_assembly_ch
when: ('02_assembly' in step || '03_filtering' in step || '04_structural_annot' in step || '05_alignment' in step || '06_func_annot' in step || '07_taxo_affi' in step || '08_binning' in step)
when: ('02_assembly' in step || '03_filtering' in step || '04_structural_annot' in step || '05_alignment' in step || '06_func_annot' in step || '07_taxo_affi' in step || '08_binning' in step)
script:
"""
megahit -t ${task.cpus} -1 ${preprocessed_reads_R1} -2 ${preprocessed_reads_R2} -o ${sampleId}_megahit --out-prefix "${sampleId}"
"""
}
}
script:
if(params.assembly=='metaspades')
"""
metaspades.py -t ${task.cpus} -m ${spades_mem} -1 ${preprocessed_reads_R1} -2 ${preprocessed_reads_R2} -o ${sampleId}_assembly
mv ${sampleId}_assembly/scaffolds.fasta ${sampleId}_assembly/${sampleId}.contigs.fa
mv ${sampleId}_assembly/spades.log ${sampleId}_assembly/${sampleId}.log
"""
else if(params.assembly=='megahit')
"""
megahit -t ${task.cpus} -1 ${preprocessed_reads_R1} -2 ${preprocessed_reads_R2} -o ${sampleId}_assembly --out-prefix "${sampleId}"
"""
else
error "Invalid parameter: ${params.assembly}"
}
// Assembly metrics.
......@@ -1082,7 +1065,7 @@ else if(params.taxonomy_dir) {
taxonomy_ch = accession2taxid_ch.combine(taxdump_ch)
}
else {
exit 1, "You must specify --accession2taxid and --taxdump or --taxonomy_dir"
exit 1, "You must specify [--accession2taxid and --taxdump] or --taxonomy_dir"
}
/*
......@@ -1100,8 +1083,8 @@ process diamond_parser {
when: ('07_taxo_affi' in step)
input:
set file(accession2taxid), file(taxdump) from taxonomy_ch
set sampleId, file(diamond_file), file(prot_len), file(idxstats), file(depth) from diamond_parser_input_ch
set file(accession2taxid), file(taxdump) from taxonomy_ch.collect()
set sampleId, file(diamond_file), file(prot_len), file(idxstats), file(depth) from diamond_parser_input_ch
output:
set sampleId, file("${sampleId}.percontig.tsv") into taxo_percontig_ch
......