From 3ff123185016f8105e4520975ae6dda2bf90dbba Mon Sep 17 00:00:00 2001 From: Robert Bossy <Robert.Bossy@inra.fr> Date: Fri, 19 Feb 2021 02:44:47 +0100 Subject: [PATCH] Python script to replace AlvisNLP plan --- dsmz-match.py | 157 +++++++++++++++++++++++++++++++++++++++++++ dsmz-match.snakefile | 2 +- 2 files changed, 158 insertions(+), 1 deletion(-) create mode 100755 dsmz-match.py diff --git a/dsmz-match.py b/dsmz-match.py new file mode 100755 index 0000000..c0bdee1 --- /dev/null +++ b/dsmz-match.py @@ -0,0 +1,157 @@ +#!/bin/env python3 + + +import xml.etree.ElementTree as ET +import sys +import os +import itertools +import re + + +def read_taxa_id(filename): + result = {} + with open(filename) as f: + for line in f: + cols = line.strip().split('\t') + result[cols[0]] = { + 'taxid': cols[1], + 'rank': cols[5] + } + return result + + +class BacDiveEntry: + def __init__(self, ncbi_taxa, filename): + self.filename = filename + self.root = ET.parse(filename).getroot() + self.strain_number = [] + self.species = None + self.is_subspecies = False + self.genus = None + self.family = None + self.ordo = None + self.class_ = None + self.phylum = None + self.domain = None + self.strain_number = tuple(self._get_strain_numbers()) + self._set_taxonomy() + self.complete_name = tuple('%s %s' % (self.species, sn) for sn in self.strain_number) + self.dispatch, self.ncbi_taxid = self._dispatch(ncbi_taxa) + self.strain_taxid = self._get_strain_taxid() + self.canonical = self._get_canonical() + + def _get_strain_numbers(self): + for sne in self.root.iterfind('./strain_availability/strains/list-item/strain_number'): + if sne.text is None: + continue + yield from (snt.strip() for snt in sne.text.split(',')) + desig = self.root.findtext('./taxonomy_name/strains/list-item/designation') + if desig is not None and desig != '': + for d in re.split('[,;]', desig): + d = d.strip() + if d.startswith('ST') and d.endswith('(HKI)'): + n = d.replace('ST', '', 1).replace('(HKI)', '') + yield 'ST ' + n + yield 'ST' + n + yield d + + def _set_taxonomy(self): + tax = self.root.find('./taxonomy_name/strains/list-item') + if tax is None: + return + self.species = tax.findtext('./species') + self.is_subspecies = bool(tax.findtext('./subspecies_epithet')) + self.genus = tax.findtext('./genus') + self.family = tax.findtext('./family') + self.ordo = tax.findtext('./ordo') + self.class_ = tax.findtext('./class') + self.phylum = tax.findtext('./phylum') + self.domain = tax.findtext('./domain') + + def _get_strain_taxid(self): + for sn in self.strain_number: + if sn.startswith('DSM'): + return sn.lower().replace(' ', ':') + if len(self.strain_number) > 0: + return self.strain_number[0].lower().replace(' ', ':') + return 'bacdive:%s' % os.path.basename(self.filename).replace('.xml', '') + + def _get_canonical(self): + for sn in self.strain_number: + if sn.startswith('DSM'): + return '%s %s' % (self.species, sn) + if len(self.strain_number) > 0: + return '%s %s' % (self.species, sn) + return None + + def _match(self, ncbi_taxa, s, rank=None): + if s in ncbi_taxa: + taxon = ncbi_taxa[s] + if rank is None or taxon['rank'] == rank: + return taxon['taxid'] + return None + + def _dispatch(self, ncbi_taxa): + for n in itertools.chain(self.strain_number, self.complete_name): + taxid = self._match(ncbi_taxa, n, 'no rank') + if taxid is not None: + return 'equivalent', taxid + for n in itertools.chain(self.strain_number, self.complete_name): + taxid = self._match(ncbi_taxa, n) + if taxid is not None: + return 'type material', taxid + rank = 'subspecies' if self.is_subspecies else 'species' + taxid = self._match(ncbi_taxa, self.species, rank) + if taxid is not None: + return 'append', taxid + taxid = self._match(ncbi_taxa, self.genus, 'genus') + if taxid is not None: + return 'append-species', taxid + taxid = self._match(ncbi_taxa, self.family, 'family') + if taxid is not None: + return 'append-species', taxid + taxid = self._match(ncbi_taxa, self.ordo, 'order') + if taxid is not None: + return 'append-species', taxid + taxid = self._match(ncbi_taxa, self.class_, 'class') + if taxid is not None: + return 'append-species', taxid + taxid = self._match(ncbi_taxa, self.phylum, 'phylum') + if taxid is not None: + return 'append-species', taxid + taxid = self._match(ncbi_taxa, self.domain, 'superkingdom') + if taxid is not None: + return 'append-species', taxid + return 'fail', None + + +_, NCBI_TAXA_FILE, INPUT_DIR, OUTPUT_DIR = sys.argv + + +def of(fn): + return open(os.path.join(OUTPUT_DIR, fn), 'w') + + +def line(f, sep, *cols): + f.write(sep.join(str(i) for i in cols)) + f.write('\n') + + +sys.stderr.write('Reading NCBI taxonomy: %s\n' % NCBI_TAXA_FILE) +NCBI_TAXA = read_taxa_id(NCBI_TAXA_FILE) +os.makedirs(OUTPUT_DIR, exist_ok=True) +with of('dispatch-report.txt') as out_dispatch, of('dsmz-nodes.dmp') as out_nodes, of('dsmz-names.dmp') as out_names, of('warnings.txt') as out_warn: + sys.stderr.write('Reading BacDive entries: %s\n' % INPUT_DIR) + for dirpath, _, filenames in os.walk(INPUT_DIR): + for fn in filenames: + if not fn.endswith('.xml'): + continue + e = BacDiveEntry(NCBI_TAXA, os.path.join(dirpath, fn)) + line(out_dispatch, '\t', e.filename, e.dispatch, e.ncbi_taxid) + if e.dispatch == 'append': + line(out_nodes, '\t|\t', e.strain_taxid, e.ncbi_taxid, 'no rank', '', 0, 1, 11, 1, 0, 1, 1, 0) + if e.canonical is None: + line(out_warn, '\t', e.filename, 'no canonical') + else: + for name in itertools.chain(e.strain_number, e.complete_name): + line(out_names, '\t|\t', e.strain_taxid, name, '', 'scientific name' if name == e.canonical else 'strain number') diff --git a/dsmz-match.snakefile b/dsmz-match.snakefile index bb0bc92..a5fba2e 100644 --- a/dsmz-match.snakefile +++ b/dsmz-match.snakefile @@ -13,4 +13,4 @@ rule match: input: config['OUTDIR'] + '/' + config['DSMZ_STRAINS_DIR'] - shell: '''{config[ALVISNLP]} -xalias '<input recursive="true">{input}</input>' -alias taxaFile {config[TAXA_FILE]} -outputDir {output} dsmz-match.plan''' + shell: '''./dsmz-match.py {config[TAXA_FILE]} {input} {output}''' -- GitLab