import unittest import doctest import Milter import bms from Milter.test import TestBase import mime import rfc822 import StringIO import email import sys #import pdb def DNSLookup(name,qtype,strict=True,timeout=None): return [] try: import spf except: spf = None class TestMilter(TestBase,bms.bmsMilter): def __init__(self): TestBase.__init__(self) bms.config = bms.Config() bms.config.access_file = 'test/access.db' bms.bmsMilter.__init__(self) self.setsymval('j','test.milter.org') # disable SPF for now if spf: spf.DNSLookup = DNSLookup class BMSMilterTestCase(unittest.TestCase): ## Test SMTP AUTH feature. def testAuth(self): milter = TestMilter() # Try a SMTP authorized user from an unauthorized IP, that is # authorized to use example.com milter.setsymval('{auth_authen}','good') milter.setsymval('{cipher_bits}','256') milter.setsymval('{auth_ssf}','0') rc = milter.connect('testAuth',ip='192.0.3.1') self.assertEqual(rc,Milter.CONTINUE) rc = milter.feedMsg('test1',sender='grief@example.com') self.assertEqual(rc,Milter.ACCEPT) # Try a user *not* authorized to use example.com milter.setsymval('{auth_authen}','bad') rc = milter.connect('testAuth',ip='192.0.2.1') self.assertEqual(rc,Milter.CONTINUE) rc = milter.feedMsg('test1',sender='good@example.com') self.assertEqual(rc,Milter.REJECT) # Try to break it by using an implicit domain milter.setsymval('{auth_authen}','bad') rc = milter.connect('testAuth',ip='192.0.2.1') self.assertEqual(rc,Milter.CONTINUE) rc = milter.feedMsg('test1',sender='good') # unlike spfmilter, bms milter doesn't check AUTH for implicit domain self.assertEqual(rc,Milter.ACCEPT) milter.close() def testDefang(self,fname='virus1'): milter = TestMilter() rc = milter.connect('testDefang') self.assertEqual(rc,Milter.CONTINUE) rc = milter.feedMsg(fname) self.assertEqual(rc,Milter.ACCEPT) self.failUnless(milter._bodyreplaced,"Message body not replaced") fp = milter._body open('test/'+fname+".tstout","w").write(fp.getvalue()) #self.failUnless(fp.getvalue() == open("test/virus1.out","r").read()) fp.seek(0) msg = mime.message_from_file(fp) str = msg.get_payload(1).get_payload() milter.log(str) milter.close() # test some spams that crashed our parser def testParse(self,fname='spam7'): milter = TestMilter() milter.connect('testParse') rc = milter.feedMsg(fname) self.assertEqual(rc,Milter.ACCEPT) self.failIf(milter._bodyreplaced,"Milter needlessly replaced body.") fp = milter._body open('test/'+fname+".tstout","w").write(fp.getvalue()) milter.connect('pro-send.com') rc = milter.feedMsg('spam8') self.assertEqual(rc,Milter.ACCEPT) self.failIf(milter._bodyreplaced,"Milter needlessly replaced body.") rc = milter.feedMsg('bounce') self.assertEqual(rc,Milter.ACCEPT) self.failIf(milter._bodyreplaced,"Milter needlessly replaced body.") rc = milter.feedMsg('bounce1') self.assertEqual(rc,Milter.ACCEPT) self.failIf(milter._bodyreplaced,"Milter needlessly replaced body.") milter.close() def testDefang2(self): milter = TestMilter() milter.connect('testDefang2') rc = milter.feedMsg('samp1') self.assertEqual(rc,Milter.ACCEPT) self.failIf(milter._bodyreplaced,"Milter needlessly replaced body.") rc = milter.feedMsg("virus3") self.assertEqual(rc,Milter.ACCEPT) self.failUnless(milter._bodyreplaced,"Message body not replaced") fp = milter._body open("test/virus3.tstout","w").write(fp.getvalue()) #self.failUnless(fp.getvalue() == open("test/virus3.out","r").read()) rc = milter.feedMsg("virus6") self.assertEqual(rc,Milter.ACCEPT) self.failUnless(milter._bodyreplaced,"Message body not replaced") self.failUnless(milter._headerschanged,"Message headers not adjusted") fp = milter._body open("test/virus6.tstout","w").write(fp.getvalue()) milter.close() def testDefang3(self): milter = TestMilter() milter.connect('testDefang3') # test script removal on complex HTML attachment rc = milter.feedMsg('amazon') self.assertEqual(rc,Milter.ACCEPT) self.failUnless(milter._bodyreplaced,"Message body not replaced") fp = milter._body open("test/amazon.tstout","w").write(fp.getvalue()) # test defanging Klez virus rc = milter.feedMsg("virus13") self.assertEqual(rc,Milter.ACCEPT) self.failUnless(milter._bodyreplaced,"Message body not replaced") fp = milter._body open("test/virus13.tstout","w").write(fp.getvalue()) # test script removal on quoted-printable HTML attachment # sgmllib can't handle the <![if cond]> syntax rc = milter.feedMsg('spam44') self.assertEqual(rc,Milter.ACCEPT) self.failIf(milter._bodyreplaced,"Message body replaced") fp = milter._body open("test/spam44.tstout","w").write(fp.getvalue()) milter.close() def testRFC822(self): milter = TestMilter() milter.connect('testRFC822') # test encoded rfc822 attachment #pdb.set_trace() rc = milter.feedMsg('test8') self.assertEqual(rc,Milter.ACCEPT) # python2.4 doesn't scan encoded message attachments if sys.hexversion < 0x02040000: self.failUnless(milter._bodyreplaced,"Message body not replaced") #self.failIf(milter._bodyreplaced,"Message body replaced") fp = milter._body open("test/test8.tstout","w").write(fp.getvalue()) rc = milter.feedMsg('virus7') self.assertEqual(rc,Milter.ACCEPT) self.failUnless(milter._bodyreplaced,"Message body not replaced") #self.failIf(milter._bodyreplaced,"Message body replaced") fp = milter._body open("test/virus7.tstout","w").write(fp.getvalue()) def testSmartAlias(self): milter = TestMilter() milter.connect('testSmartAlias') # test smart alias feature key = ('foo@example.com','baz@bat.com') bms.smart_alias[key] = ['ham@eggs.com'] rc = milter.feedMsg('test8',key[0],key[1]) self.assertEqual(rc,Milter.ACCEPT) self.failUnless(milter._delrcpt == ['<baz@bat.com>']) self.failUnless(milter._addrcpt == ['<ham@eggs.com>']) # python2.4 email does not decode message attachments, so script # is not replaced if sys.hexversion < 0x02040000: self.failUnless(milter._bodyreplaced,"Message body not replaced") def testBadBoundary(self): milter = TestMilter() milter.connect('testBadBoundary') # test rfc822 attachment with invalid boundaries #pdb.set_trace() rc = milter.feedMsg('bound') if sys.hexversion < 0x02040000: # python2.4 adds invalid boundaries to decects list and makes # payload a str self.assertEqual(rc,Milter.REJECT) self.assertEqual(milter._reply[0],'554') #self.failUnless(milter._bodyreplaced,"Message body not replaced") self.failIf(milter._bodyreplaced,"Message body replaced") fp = milter._body open("test/bound.tstout","w").write(fp.getvalue()) def testCompoundFilename(self): milter = TestMilter() milter.connect('testCompoundFilename') # test rfc822 attachment with invalid boundaries #pdb.set_trace() rc = milter.feedMsg('test1') self.assertEqual(rc,Milter.ACCEPT) #self.failUnless(milter._bodyreplaced,"Message body not replaced") self.failIf(milter._bodyreplaced,"Message body replaced") fp = milter._body open("test/test1.tstout","w").write(fp.getvalue()) def testFindsrs(self): if not bms.srs: import SRS bms.srs = SRS.new(secret='test') sender = bms.srs.forward('foo@bar.com','mail.example.com') sndr = bms.findsrs(StringIO.StringIO( """Received: from [1.16.33.86] (helo=mail.example.com) by bastion4.mail.zen.co.uk with smtp (Exim 4.50) id 1H3IBC-00013b-O9 for foo@bar.com; Sat, 06 Jan 2007 20:30:17 +0000 X-Mailer: "PyMilter-0.8.5" <%s> foo MIME-Version: 1.0 Content-Type: text/plain To: foo@bar.com From: postmaster@mail.example.com """ % sender )) self.assertEqual(sndr,'foo@bar.com') def testBanned(self): bd = set(('*.foo.bar','*.info','baz.bar')) self.assertTrue(bms.isbanned('bif.foo.bar',bd)) self.assertFalse(bms.isbanned('bif.foo.com',bd)) self.assertTrue(bms.isbanned('foo.info',bd)) self.assertFalse(bms.isbanned('foo.baz.bar',bd)) self.assertTrue(bms.isbanned('baz.bar',bd)) # def testReject(self): # "Test content based spam rejection." # milter = TestMilter() # milter.connect('gogo-china.com') # rc = milter.feedMsg('big5'); # self.failUnless(rc == Milter.REJECT) # milter.close(); def suite(): s = unittest.makeSuite(BMSMilterTestCase,'test') s.addTest(doctest.DocTestSuite(bms)) return s if __name__ == '__main__': if len(sys.argv) > 1: for fname in sys.argv[1:]: milter = TestMilter() milter.connect('main') fp = open(fname,'r') rc = milter.feedFile(fp) fp = milter._body sys.stdout.write(fp.getvalue()) else: #unittest.main() unittest.TextTestRunner().run(suite())