Skip to content
Snippets Groups Projects
Commit 3ff12318 authored by Robert Bossy's avatar Robert Bossy
Browse files

Python script to replace AlvisNLP plan

parent 58460964
No related branches found
No related tags found
No related merge requests found
#!/bin/env python3
import xml.etree.ElementTree as ET
import sys
import os
import itertools
import re
def read_taxa_id(filename):
result = {}
with open(filename) as f:
for line in f:
cols = line.strip().split('\t')
result[cols[0]] = {
'taxid': cols[1],
'rank': cols[5]
}
return result
class BacDiveEntry:
def __init__(self, ncbi_taxa, filename):
self.filename = filename
self.root = ET.parse(filename).getroot()
self.strain_number = []
self.species = None
self.is_subspecies = False
self.genus = None
self.family = None
self.ordo = None
self.class_ = None
self.phylum = None
self.domain = None
self.strain_number = tuple(self._get_strain_numbers())
self._set_taxonomy()
self.complete_name = tuple('%s %s' % (self.species, sn) for sn in self.strain_number)
self.dispatch, self.ncbi_taxid = self._dispatch(ncbi_taxa)
self.strain_taxid = self._get_strain_taxid()
self.canonical = self._get_canonical()
def _get_strain_numbers(self):
for sne in self.root.iterfind('./strain_availability/strains/list-item/strain_number'):
if sne.text is None:
continue
yield from (snt.strip() for snt in sne.text.split(','))
desig = self.root.findtext('./taxonomy_name/strains/list-item/designation')
if desig is not None and desig != '':
for d in re.split('[,;]', desig):
d = d.strip()
if d.startswith('ST') and d.endswith('(HKI)'):
n = d.replace('ST', '', 1).replace('(HKI)', '')
yield 'ST ' + n
yield 'ST' + n
yield d
def _set_taxonomy(self):
tax = self.root.find('./taxonomy_name/strains/list-item')
if tax is None:
return
self.species = tax.findtext('./species')
self.is_subspecies = bool(tax.findtext('./subspecies_epithet'))
self.genus = tax.findtext('./genus')
self.family = tax.findtext('./family')
self.ordo = tax.findtext('./ordo')
self.class_ = tax.findtext('./class')
self.phylum = tax.findtext('./phylum')
self.domain = tax.findtext('./domain')
def _get_strain_taxid(self):
for sn in self.strain_number:
if sn.startswith('DSM'):
return sn.lower().replace(' ', ':')
if len(self.strain_number) > 0:
return self.strain_number[0].lower().replace(' ', ':')
return 'bacdive:%s' % os.path.basename(self.filename).replace('.xml', '')
def _get_canonical(self):
for sn in self.strain_number:
if sn.startswith('DSM'):
return '%s %s' % (self.species, sn)
if len(self.strain_number) > 0:
return '%s %s' % (self.species, sn)
return None
def _match(self, ncbi_taxa, s, rank=None):
if s in ncbi_taxa:
taxon = ncbi_taxa[s]
if rank is None or taxon['rank'] == rank:
return taxon['taxid']
return None
def _dispatch(self, ncbi_taxa):
for n in itertools.chain(self.strain_number, self.complete_name):
taxid = self._match(ncbi_taxa, n, 'no rank')
if taxid is not None:
return 'equivalent', taxid
for n in itertools.chain(self.strain_number, self.complete_name):
taxid = self._match(ncbi_taxa, n)
if taxid is not None:
return 'type material', taxid
rank = 'subspecies' if self.is_subspecies else 'species'
taxid = self._match(ncbi_taxa, self.species, rank)
if taxid is not None:
return 'append', taxid
taxid = self._match(ncbi_taxa, self.genus, 'genus')
if taxid is not None:
return 'append-species', taxid
taxid = self._match(ncbi_taxa, self.family, 'family')
if taxid is not None:
return 'append-species', taxid
taxid = self._match(ncbi_taxa, self.ordo, 'order')
if taxid is not None:
return 'append-species', taxid
taxid = self._match(ncbi_taxa, self.class_, 'class')
if taxid is not None:
return 'append-species', taxid
taxid = self._match(ncbi_taxa, self.phylum, 'phylum')
if taxid is not None:
return 'append-species', taxid
taxid = self._match(ncbi_taxa, self.domain, 'superkingdom')
if taxid is not None:
return 'append-species', taxid
return 'fail', None
_, NCBI_TAXA_FILE, INPUT_DIR, OUTPUT_DIR = sys.argv
def of(fn):
return open(os.path.join(OUTPUT_DIR, fn), 'w')
def line(f, sep, *cols):
f.write(sep.join(str(i) for i in cols))
f.write('\n')
sys.stderr.write('Reading NCBI taxonomy: %s\n' % NCBI_TAXA_FILE)
NCBI_TAXA = read_taxa_id(NCBI_TAXA_FILE)
os.makedirs(OUTPUT_DIR, exist_ok=True)
with of('dispatch-report.txt') as out_dispatch, of('dsmz-nodes.dmp') as out_nodes, of('dsmz-names.dmp') as out_names, of('warnings.txt') as out_warn:
sys.stderr.write('Reading BacDive entries: %s\n' % INPUT_DIR)
for dirpath, _, filenames in os.walk(INPUT_DIR):
for fn in filenames:
if not fn.endswith('.xml'):
continue
e = BacDiveEntry(NCBI_TAXA, os.path.join(dirpath, fn))
line(out_dispatch, '\t', e.filename, e.dispatch, e.ncbi_taxid)
if e.dispatch == 'append':
line(out_nodes, '\t|\t', e.strain_taxid, e.ncbi_taxid, 'no rank', '', 0, 1, 11, 1, 0, 1, 1, 0)
if e.canonical is None:
line(out_warn, '\t', e.filename, 'no canonical')
else:
for name in itertools.chain(e.strain_number, e.complete_name):
line(out_names, '\t|\t', e.strain_taxid, name, '', 'scientific name' if name == e.canonical else 'strain number')
......@@ -13,4 +13,4 @@ rule match:
input:
config['OUTDIR'] + '/' + config['DSMZ_STRAINS_DIR']
shell: '''{config[ALVISNLP]} -xalias '<input recursive="true">{input}</input>' -alias taxaFile {config[TAXA_FILE]} -outputDir {output} dsmz-match.plan'''
shell: '''./dsmz-match.py {config[TAXA_FILE]} {input} {output}'''
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment