#!/usr/bin/env python """Tests of BLAST parser. """ from string import split, strip from cogent.util.unit_test import TestCase, main from cogent.parse.blast import iter_finder, query_finder, iteration_set_finder,\ is_blast_junk, is_blat_junk, make_label, PsiBlastQueryFinder, \ TableToValues, \ PsiBlastTableParser, PsiBlastFinder, GenericBlastParser9, \ PsiBlastParser9, LastProteinIds9, QMEBlast9, QMEPsiBlast9, \ fastacmd_taxonomy_splitter, FastacmdTaxonomyParser __author__ = "Micah Hamady" __copyright__ = "Copyright 2007-2012, The Cogent Project" __credits__ = ["Micah Hamady", "Rob Knight"] __license__ = "GPL" __version__ = "1.5.3" __maintainer__ = "Micah Hamady" __email__ = "hamady@colorado.edu" __status__ = "Production" class BlastTests(TestCase): """Tests of top-level functions""" def setUp(self): """Define some standard data""" self.rec = """# BLASTP 2.2.10 [Oct-19-2004] # Iteration: 1 # Query: ece:Z4181 # Database: db/everything.faa # Fields: Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score ece:Z4181 ece:Z4181 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4181 ecs:ECs3717 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4181 cvi:CV2421 41.67 72 42 0 39 110 29 100 2e-06 52.8 # BLASTP 2.2.10 [Oct-19-2004] # Iteration: 2 # Query: ece:Z4181 # Database: db/everything.faa # Fields: Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score ece:Z4181 ece:Z4181 100.00 110 0 0 1 110 1 110 3e-54 211 ece:Z4181 ecs:ECs3717 100.00 110 0 0 1 110 1 110 3e-54 211 ece:Z4181 cvi:CV2421 41.67 72 42 0 39 110 29 100 2e-08 59.0 ece:Z4181 sfl:CP0138 33.98 103 57 2 8 110 6 97 6e-06 50.5 ece:Z4181 spt:SPA2730 37.50 72 45 0 39 110 30 101 1e-05 49.8 ece:Z4181 sec:SC2804 37.50 72 45 0 39 110 30 101 1e-05 49.8 ece:Z4181 stm:STM2872 37.50 72 45 0 39 110 30 101 1e-05 49.8""".split('\n') self.rec2 = """# BLASTP 2.2.10 [Oct-19-2004] # Iteration: 1 # Query: ece:Z4181 # Database: db/everything.faa # Fields: Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score ece:Z4181 ece:Z4181 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4181 ecs:ECs3717 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4181 cvi:CV2421 41.67 72 42 0 39 110 29 100 2e-06 52.8 # BLASTP 2.2.10 [Oct-19-2004] # Iteration: 2 # Query: ece:Z4181 # Database: db/everything.faa # Fields: Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score ece:Z4181 ece:Z4181 100.00 110 0 0 1 110 1 110 3e-54 211 ece:Z4181 ecs:ECs3717 100.00 110 0 0 1 110 1 110 3e-54 211 ece:Z4181 cvi:CV2421 41.67 72 42 0 39 110 29 100 2e-08 59.0 ece:Z4181 sfl:CP0138 33.98 103 57 2 8 110 6 97 6e-06 50.5 ece:Z4181 spt:SPA2730 37.50 72 45 0 39 110 30 101 1e-05 49.8 ece:Z4181 sec:SC2804 37.50 72 45 0 39 110 30 101 1e-05 49.8 ece:Z4181 stm:STM2872 37.50 72 45 0 39 110 30 101 1e-05 49.8 # BLASTP 2.2.10 [Oct-19-2004] # Iteration: 1 # Query: ece:Z4182 # Database: db/everything.faa # Fields: Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score ece:Z4182 ece:Z4182 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4182 ecs:ECs3718 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4182 cvi:CV2422 41.67 72 42 0 39 110 29 100 2e-06 52.8""".split('\n') self.rec3 = """# BLASTP 2.2.10 [Oct-19-2004] # Iteration: 1 # Query: ece:Z4181 # Database: db/everything.faa # Fields: Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score ece:Z4181 ece:Z4181 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4181 ecs:ECs3717 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4181 spt:SPA2730 37.50 72 45 0 39 110 30 101 1e-05 49.8 # BLASTP 2.2.10 [Oct-19-2004] # Iteration: 2 # Query: ece:Z4181 # Database: db/everything.faa # Fields: Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score ece:Z4181 ecs:ECs3717 100.00 110 0 0 1 110 1 110 3e-54 211 ece:Z4181 cvi:CV2421 41.67 72 42 0 39 110 29 100 2e-08 59.0 # BLASTP 2.2.10 [Oct-19-2004] # Iteration: 1 # Query: ece:Z4182 # Database: db/everything.faa # Fields: Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score ece:Z4182 ece:Z4182 100.00 110 0 0 1 110 1 110 3e-47 187 ece:Z4182 cvi:CV2422 41.67 72 42 0 39 110 29 100 2e-06 52.8""".split('\n') def test_iter_finder(self): """iter_finder should split on lines starting with '# Iteration:'""" lines = 'abc\n# Iteration: 3\ndef'.split('\n') self.assertEqual(map(iter_finder,lines), [False, True, False]) def test_query_finder(self): """query_finder should split on lines starting with '# Query:'""" lines = 'abc\n# Query: dfdsffsd\ndef'.split('\n') self.assertEqual(map(query_finder,lines), [False, True, False]) def test_iteration_set_finder(self): """iter_finder should split on lines starting with '# Iteration:'""" lines = 'abc\n# Iteration: 3\ndef\n# Iteration: 1'.split('\n') self.assertEqual(map(iteration_set_finder,lines), \ [False, False, False, True]) def test_is_junk(self): """is_junk should reject an assortment of invalid lines""" #Note: testing two functions that call it instead of function itself lines = 'abc\n# BLAST blah blah\n \n# BLAT blah\n123'.split('\n') self.assertEqual(map(is_blast_junk, lines), \ [False, True, True, False, False]) self.assertEqual(map(is_blat_junk, lines), \ [False, False, True, True, False]) def test_make_label(self): """make_label should turn comment lines into (key, val) pairs""" a = 'this test will fail: no # at start' b = '#this test will fail because no colon' c = '# Iteration: 1' d = '# Query: ece:Z4147 ygdP; putative invasion protein [EC:3.6.1.-]' e = '#Iteration: 1' #no space after the hash self.assertRaises(ValueError, make_label, a) self.assertRaises(ValueError, make_label, b) #Note that we _do_ map the data type of known values value, so the #value of the iteration will be 1, not '1' self.assertEqual(make_label(c), ('ITERATION', 1)) self.assertEqual(make_label(d), ('QUERY', \ 'ece:Z4147 ygdP; putative invasion protein [EC:3.6.1.-]')) self.assertEqual(make_label(e), ('ITERATION', 1)) def test_TableToValues(self): """TableToValues should convert itself into the correct type.""" constructors = {'a':int, 'b':float, 'c':str} table=[['c','b','a','d'], ['1.5', '3.5', '2', '2.5'],['1','2','3','4']] self.assertEqual(TableToValues(table, constructors), \ ([['1.5',3.5,2,'2.5'],['1',2.0,3,'4']], ['c','b','a','d'])) #check that it works with supplied header self.assertEqual(TableToValues(table[1:], constructors, list('cbad')), \ ([['1.5',3.5,2,'2.5'],['1',2.0,3,'4']], ['c','b','a','d'])) def test_PsiBlastTableParser(self): """PsiBlastTableParser should wrap values in table.""" fields = map(strip, 'Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score'.split(',')) table = map(split, """ece:Z4147 ece:Z4147 100.00 176 0 0 1 176 1 176 2e-89 328 ece:Z4147 ecs:ECs3687 100.00 176 0 0 1 176 1 176 2e-89 328 ece:Z4147 ecc:c3425 100.00 176 0 0 1 176 1 176 2e-89 328 ece:Z4147 sfl:SF2840 100.00 176 0 0 1 176 1 176 2e-89 328""".split('\n')) headed_table = [fields] + table new_table, new_fields = PsiBlastTableParser(headed_table) self.assertEqual(new_fields, fields) self.assertEqual(len(new_table), 4) self.assertEqual(new_table[1], ['ece:Z4147', 'ecs:ECs3687', 100.0, \ 176, 0, 0, 1, 176, 1, 176, 2e-89, 328]) def test_GenericBlastParser9(self): """GenericBlastParser9 should read blast's tabular format (#9).""" rec = self.rec p = GenericBlastParser9(rec, PsiBlastFinder) result = list(p) self.assertEqual(len(result), 2) first, second = result self.assertEqual(first[0], {'ITERATION':1,'QUERY':'ece:Z4181',\ 'DATABASE':'db/everything.faa', 'FIELDS':'Query id, Subject id, % identity, alignment length, mismatches, gap openings, q. start, q. end, s. start, s. end, e-value, bit score'}) self.assertEqual(len(first[1]), 3) self.assertEqual(second[0]['ITERATION'], 2) self.assertEqual(len(second[1]), 7) self.assertEqual(second[1][-1], \ 'ece:Z4181 stm:STM2872 37.50 72 45 0 39 110 30 101 1e-05 49.8'.split()) def test_PsiBlastParser9(self): """PsiBlastParser9 should provide convenient results for format #9.""" result = PsiBlastParser9(self.rec2) self.assertEqual(len(result), 2) assert 'ece:Z4181' in result assert 'ece:Z4182' in result first = result['ece:Z4181'] second = result['ece:Z4182'] self.assertEqual(len(first), 2) self.assertEqual(len(second), 1) iter_1 = first[0] iter_2 = first[1] self.assertEqual(len(iter_1), 3) self.assertEqual(len(iter_2), 7) iter_1_2 = second[0] self.assertEqual(len(iter_1_2), 3) self.assertEqual(len(result['ece:Z4181'][1][3]), 12) self.assertEqual(result['ece:Z4181'][1][3]['ALIGNMENT LENGTH'], 103) def test_LastProteinIds9(self): """LastProteinIds9 should give last protein ids in iter""" result = LastProteinIds9(self.rec) self.assertEqual(result, ['ece:Z4181', 'ecs:ECs3717', 'cvi:CV2421',\ 'sfl:CP0138', 'spt:SPA2730', 'sec:SC2804', 'stm:STM2872']) #should also work if threshold set result = LastProteinIds9(self.rec, False, threshold=8e-6) self.assertEqual(result, ['ece:Z4181', 'ecs:ECs3717', 'cvi:CV2421',\ 'sfl:CP0138']) #should work on multiple records result = map(LastProteinIds9, PsiBlastQueryFinder(self.rec2)) self.assertEqual(len(result), 2) self.assertEqual(result[0], ['ece:Z4181', 'ecs:ECs3717', 'cvi:CV2421',\ 'sfl:CP0138', 'spt:SPA2730', 'sec:SC2804', 'stm:STM2872']) self.assertEqual(result[1], ['ece:Z4182','ecs:ECs3718','cvi:CV2422']) def test_QMEBlast9(self): """QMEBlast9 should return expected lines from all iterations""" self.assertFloatEqual(QMEBlast9(self.rec3), [\ ('ece:Z4181','ece:Z4181',3e-47), ('ece:Z4181','ecs:ECs3717',3e-47), ('ece:Z4181','spt:SPA2730', 1e-5), ('ece:Z4181','ecs:ECs3717',3e-54), #WARNING: allows duplicates ('ece:Z4181','cvi:CV2421',2e-8), ('ece:Z4182','ece:Z4182',3e-47), ('ece:Z4182','cvi:CV2422',2e-6), ]) def test_QMEPsiBlast9(self): """QMEPsiBlast9 should only return items from last iterations""" self.assertFloatEqual(QMEPsiBlast9(self.rec3), [\ ('ece:Z4181','ecs:ECs3717',3e-54), ('ece:Z4181','cvi:CV2421',2e-8), ('ece:Z4182','ece:Z4182',3e-47), ('ece:Z4182','cvi:CV2422',2e-6), ]) def test_fastacmd_taxonomy_splitter(self): """fastacmd_taxonomy_splitter should split records into groups""" text = """NCBI sequence id: gi|3021565|emb|AJ223314.1|PSAJ3314 NCBI taxonomy id: 3349 Common name: Scots pine Scientific name: Pinus sylvestris NCBI sequence id: gi|37777029|dbj|AB108787.1| NCBI taxonomy id: 228610 Common name: cf. Acremonium sp. KR21-2 Scientific name: cf. Acremonium sp. KR21-2 """.splitlines() recs = list(fastacmd_taxonomy_splitter(text)) self.assertEqual(len(recs), 2) self.assertEqual(recs[0], text[:5]) #includes trailing blank def test_FastaCmdTaxonomyParser(self): """FastaCmdTaxonomyParser should parse taxonomy record to dict""" text = """NCBI sequence id: gi|3021565|emb|AJ223314.1|PSAJ3314 NCBI taxonomy id: 3349 Common name: Scots pine Scientific name: Pinus sylvestris NCBI sequence id: gi|37777029|dbj|AB108787.1| NCBI taxonomy id: 228610 Common name: cf. Acremonium sp. KR21-2 Scientific name: cf. Acremonium sp. KR21-2 """.splitlines() recs = list(FastacmdTaxonomyParser(text)) self.assertEqual(len(recs), 2) for r in recs: self.assertEqual(sorted(r.keys()), ['common_name','scientific_name', 'seq_id', 'tax_id']) r0, r1 = recs self.assertEqual(r0['tax_id'], '3349') self.assertEqual(r0['common_name'], 'Scots pine') self.assertEqual(r0['scientific_name'], 'Pinus sylvestris') self.assertEqual(r0['seq_id'], 'gi|3021565|emb|AJ223314.1|PSAJ3314') self.assertEqual(r1['tax_id'], '228610') if __name__ == "__main__": main()