import os
import errno

from . import FSQTestCase
from .internal import test_type_own_mode, normalize
from .constants import ROOT1, ROOT2, NON_ASCII, NOT_NORMAL, ILLEGAL_NAMES,\
                       ILLEGAL_MODE, ILLEGAL_NAME, ILLEGAL_UID,\
                       ILLEGAL_UNAME, MODES, UID, GID, UNAME, GNAME, NOROOT

# FROM PAPA-BEAR IMPORT THE FOLLOWING
from .. import install, down, up, is_down, trigger, untrigger, trigger_pull,\
               constants as _c, FSQPathError, FSQCoerceError, FSQConfigError,\
               FSQTriggerPullError


########## INTERNAL VALIDATION METHODS
def _absent(path):
    '''Return True if ``path'' cannot be stat'ed because it does not exist,
       False if it exists.  Any stat failure other than ENOENT is re-raised
       with its original traceback.'''
    try:
        os.stat(path)
    except (OSError, IOError) as e:
        if e.errno != errno.ENOENT:
            raise
        return True
    return False


def _valid_up(queue):
    '''Boolean Test if ``up'' Succeeded

       Returns True when the queue's down file is absent; raises ValueError
       when it still exists.'''
    d_path = os.path.join(_c.FSQ_ROOT, queue, _c.FSQ_DOWN)
    if _absent(d_path):
        return True
    raise ValueError('queue is down, should be up: {0}'.format(queue))


def _valid_down(queue, user=None, group=None, mode=None):
    '''Boolean Test if ``down'' Succeeded

       Stats the queue's down file (raising if it is missing) and hands the
       result to test_type_own_mode with type code 'f'.  Whenever a user or
       group was requested -- by id or by name -- the expectation passed on
       is the numeric UID/GID constant; None is passed through unchanged.'''
    d_path = os.path.join(_c.FSQ_ROOT, queue, _c.FSQ_DOWN)
    st = os.stat(d_path)
    test_type_own_mode(st, d_path, 'f', None if user is None else UID,
                       None if group is None else GID, mode)
    return True


def _valid_trigger(queue, user=None, group=None, mode=None):
    '''Boolean Test if ``trigger'' Succeeded

       Same contract as _valid_down, but for the queue's trigger file and
       with type code 'p'.'''
    t_path = os.path.join(_c.FSQ_ROOT, queue, _c.FSQ_TRIGGER)
    st = os.stat(t_path)
    test_type_own_mode(st, t_path, 'p', None if user is None else UID,
                       None if group is None else GID, mode)
    return True


def _valid_untrigger(queue):
    '''Boolean Test if ``untrigger'' Succeeded

       Returns True when the queue's trigger file is absent; raises
       ValueError when it still exists.'''
    t_path = os.path.join(_c.FSQ_ROOT, queue, _c.FSQ_TRIGGER)
    if _absent(t_path):
        return True
    raise ValueError('queue is triggered, should be untriggered:'\
                     ' {0}'.format(queue))


########### EXPOSED TEST CASES
class TestUpDownIsDown(FSQTestCase):
    '''Test the down, up and is_down functions provided by the configure
       module of fsq.'''
    # functions exercised by the generic failure tests below
    _fns = ( down, up, is_down, )
    # exception(s) expected from _fns on a permission-denied queue
    _perm = FSQConfigError
    # name of the fsq constant holding the file name under test
    _var = 'FSQ_DOWN'

    def _cycle(self, queue=None, **kwargs):
        '''Down a queue, verify it, up it again, verify it; returns the
           queue name.  kwargs are forwarded to _down.'''
        queue = self._down(queue, **kwargs)
        self.assertTrue(is_down(queue))
        self._up(queue)
        self.assertFalse(is_down(queue))
        return queue

    def _down(self, queue=None, installqueue=True, is_down=False,
              fsq_down=None, fsq_item_mode=None, fsq_item_user=None,
              fsq_item_group=None, **kwargs):
        '''Optionally install a queue, apply the requested fsq constants,
           call down and validate the resulting down file.

           The fsq_* arguments are written to the matching module constants
           when not None; remaining kwargs (user, group, mode) go straight
           to down().  Returns the queue name.'''
        if queue is None:
            queue = normalize()
        if fsq_down is not None:
            _c.FSQ_DOWN = fsq_down
        if installqueue:
            install(queue, is_down)
        if fsq_item_mode is not None:
            _c.FSQ_ITEM_MODE = fsq_item_mode
        if fsq_item_user is not None:
            _c.FSQ_ITEM_USER = fsq_item_user
        if fsq_item_group is not None:
            _c.FSQ_ITEM_GROUP = fsq_item_group
        down(queue, **kwargs)
        # Mirror configured constants into the validation kwargs; explicit
        # kwargs win.  ``is not None'' (rather than truthiness) so that a
        # configured value of 0 is still validated.
        if fsq_item_user is not None:
            kwargs['user'] = kwargs.get('user', fsq_item_user)
        if fsq_item_group is not None:
            kwargs['group'] = kwargs.get('group', fsq_item_group)
        if fsq_item_mode is not None:
            kwargs['mode'] = kwargs.get('mode', fsq_item_mode)
        self.assertTrue(_valid_down(queue, **kwargs))
        return queue

    def _up(self, queue):
        '''Call up on queue and validate that the down file is gone.'''
        up(queue)
        self.assertTrue(_valid_up(queue))

    def test_basic(self):
        '''Test all permutations of arguments, or most of them anyway,
           combined with permutations of module constants values.'''
        # Capture the default name once: each _cycle call below overwrites
        # the constant, so re-reading it per root would lose the default.
        default_name = getattr(_c, self._var)
        for root in ROOT1, ROOT2:
            # actually switch roots, as the other tests in this class do
            _c.FSQ_ROOT = root
            # test different down file names
            for d in (default_name, NOT_NORMAL[4], NON_ASCII,):
                for is_d in True, False:
                    base = dict(fsq_down=d, is_down=is_d)
                    # test normal cycle with a set down
                    queue = self._cycle(**base)
                    # test that up doesn't throw an error if already up
                    up(queue)
                    # test with user id/group id and user name/group name
                    for uid, gid in ((UID, GID,), (UNAME, GNAME,)):
                        # just user kwarg
                        self._cycle(user=uid, **base)
                        # just user set
                        self._cycle(fsq_item_user=uid, **base)
                        # just group kwarg
                        self._cycle(group=gid, **base)
                        # just group set
                        self._cycle(fsq_item_group=gid, **base)
                        # both kwargs
                        self._cycle(user=uid, group=gid, **base)
                        # both set
                        self._cycle(fsq_item_user=uid, fsq_item_group=gid,
                                    **base)
                        for mode in MODES:
                            # just mode kwarg
                            self._cycle(mode=mode, **base)
                            # just mode set
                            self._cycle(fsq_item_mode=mode, **base)
                            # mode and user kwarg
                            self._cycle(mode=mode, user=uid, **base)
                            # mode and user set
                            self._cycle(fsq_item_mode=mode,
                                        fsq_item_user=uid, **base)
                            # mode and group kwarg
                            self._cycle(mode=mode, group=gid, **base)
                            # mode and group set
                            self._cycle(fsq_item_mode=mode,
                                        fsq_item_group=gid, **base)
                            # mode and group and user kwarg
                            self._cycle(mode=mode, user=uid, group=gid,
                                        **base)
                            # mode and group and user set
                            self._cycle(fsq_item_mode=mode,
                                        fsq_item_user=uid,
                                        fsq_item_group=gid, **base)
                        # END mode LOOP
                    # END uid,gid LOOP
                # END is_down installed LOOP
            # END FSQ_DOWN loop
        # END root LOOP

    def test_invalidmode(self):
        '''Test invalid modes both set and passed'''
        self.assertRaises(TypeError, self._cycle, mode=ILLEGAL_MODE)
        self.assertRaises(TypeError, self._cycle, fsq_item_mode=ILLEGAL_MODE)

    def test_invaliduidgid(self):
        '''Test invalid user and group ids and names'''
        # TEST ILLEGAL UID for User/Group set and passed
        self.assertRaises(FSQConfigError, self._cycle, user=ILLEGAL_UID)
        self.assertRaises(FSQConfigError, self._cycle,
                          fsq_item_user=ILLEGAL_UID)
        self.assertRaises(FSQConfigError, self._cycle, group=ILLEGAL_UID)
        self.assertRaises(FSQConfigError, self._cycle,
                          fsq_item_group=ILLEGAL_UID)
        self.assertRaises(FSQConfigError, self._cycle, user=ILLEGAL_UID,
                          group=ILLEGAL_UID)
        self.assertRaises(FSQConfigError, self._cycle,
                          fsq_item_user=ILLEGAL_UID,
                          fsq_item_group=ILLEGAL_UID)
        # TEST ILLEGAL UNAME for User/Group set and passed
        self.assertRaises(TypeError, self._cycle, user=ILLEGAL_UNAME)
        self.assertRaises(TypeError, self._cycle,
                          fsq_item_user=ILLEGAL_UNAME)
        self.assertRaises(TypeError, self._cycle, group=ILLEGAL_UNAME)
        self.assertRaises(TypeError, self._cycle,
                          fsq_item_group=ILLEGAL_UNAME)
        self.assertRaises(TypeError, self._cycle, user=ILLEGAL_UNAME,
                          group=ILLEGAL_UNAME)
        self.assertRaises(TypeError, self._cycle,
                          fsq_item_user=ILLEGAL_UNAME,
                          fsq_item_group=ILLEGAL_UNAME)

    def test_invalrootorqueue(self):
        '''Test proper failure for non-existent roots and non-existent
           queues'''
        # test invalid root or no queue
        for root in (NOROOT, ROOT1,):
            _c.FSQ_ROOT = root
            for fn in self._fns:
                self.assertRaises(FSQConfigError, fn, 'abc')

    def test_permdenied(self):
        '''Test proper failure for permission denied'''
        # test permission denied root
        queue = self._cycle()
        q_path = os.path.join(_c.FSQ_ROOT, queue)
        os.chmod(q_path, 0)
        try:
            for fn in self._fns:
                self.assertRaises(self._perm, fn, queue)
        finally:
            # restore access so the queue directory can be cleaned up
            os.chmod(q_path, 0o750)

    def test_invalnames(self):
        '''Test failure with invalid or illegal names'''
        for root in (ROOT1, ROOT2,):
            _c.FSQ_ROOT = root
            for name in ILLEGAL_NAMES:
                queue = self._cycle()
                setattr(_c, self._var, name)
                for fn in self._fns:
                    self.assertRaises(FSQPathError, fn, queue)
            queue = self._cycle()
            setattr(_c, self._var, ILLEGAL_NAME)
            for fn in self._fns:
                self.assertRaises(FSQCoerceError, fn, queue)


class TestTriggers(TestUpDownIsDown):
    '''Test the Triggers functions'''
    _fns = ( trigger, untrigger, trigger_pull, )
    _perm = (FSQTriggerPullError, FSQConfigError,)
    _var = 'FSQ_TRIGGER'

    def _cycle(self, queue=None, **kwargs):
        '''Trigger a queue, verify a pull is delivered, untrigger it and
           verify that pulling now fails; returns the queue name.'''
        queue = self._trigger(queue, **kwargs)
        self.assertTrue(self._test_pull(queue))
        self._untrigger(queue)
        self.assertRaises(FSQTriggerPullError, self._test_pull, queue)
        return queue

    def _trigger(self, queue=None, installqueue=True, is_down=False,
                 fsq_down=None, fsq_item_mode=None, fsq_item_user=None,
                 fsq_item_group=None, **kwargs):
        '''Trigger counterpart of TestUpDownIsDown._down.

           The ``fsq_down'' parameter keeps its name so the inherited tests
           can be reused unchanged; here it supplies the trigger file name.
           Returns the queue name.'''
        # hack, as the tests are identical
        fsq_trigger = fsq_down
        del fsq_down
        if queue is None:
            queue = normalize()
        if fsq_trigger is not None:
            _c.FSQ_TRIGGER = fsq_trigger
        if installqueue:
            install(queue, is_down)
        if fsq_item_mode is not None:
            _c.FSQ_ITEM_MODE = fsq_item_mode
        if fsq_item_user is not None:
            _c.FSQ_ITEM_USER = fsq_item_user
        if fsq_item_group is not None:
            _c.FSQ_ITEM_GROUP = fsq_item_group
        trigger(queue, **kwargs)
        # Mirror configured constants into the validation kwargs; explicit
        # kwargs win.  ``is not None'' so a configured 0 is still validated.
        if fsq_item_user is not None:
            kwargs['user'] = kwargs.get('user', fsq_item_user)
        if fsq_item_group is not None:
            kwargs['group'] = kwargs.get('group', fsq_item_group)
        if fsq_item_mode is not None:
            kwargs['mode'] = kwargs.get('mode', fsq_item_mode)
        self.assertTrue(_valid_trigger(queue, **kwargs))
        return queue

    def _untrigger(self, queue):
        '''Call untrigger on queue and validate the trigger file is gone.'''
        untrigger(queue)
        self.assertTrue(_valid_untrigger(queue))

    def _test_pull(self, queue):
        '''Exercise trigger_pull with and without a listener.

           Returns True if a single NUL byte is read from the trigger after
           a pull with a listener attached, False otherwise.'''
        t_path = os.path.join(_c.FSQ_ROOT, queue, _c.FSQ_TRIGGER)
        # test with no listener
        trigger_pull(queue, ignore_listener=True)
        # test raise with no listener
        self.assertRaises(FSQTriggerPullError, trigger_pull, queue)
        # ghetto trigger-listen emulator; man 1 trigger-listen
        fd2 = None
        fd = os.open(t_path, os.O_RDONLY|os.O_NDELAY|os.O_NONBLOCK)
        try:
            # keep the FIFO open even when write side closes
            fd2 = os.open(t_path, os.O_WRONLY|os.O_NDELAY|os.O_NONBLOCK)
            trigger_pull(queue)
            # os.read yields bytes; b'\0' equals '\0' on Python 2
            return os.read(fd, 1) == b'\0'
        finally:
            os.close(fd)
            if fd2 is not None:
                os.close(fd2)