Distribute thrift requests between workers.
from __future__ import absolute_import
import logging
from pyuv import Loop
from thrift.protocol import TBinaryProtocol
from .transports.base import Acceptors
from .state import set_current_app, get_current_app
from .listener import Listener, Listeners
from .hub import Hub
from .services import Services
from .utils.decorators import cached_property
from .utils.mixin import SubclassMixin
from .utils.stats import Counters, Timers
logger = logging.getLogger(__name__)
class ThriftWorker(SubclassMixin):
    """Store global application state. Also acts as factory for whole
    acceptor_cls = 'thriftworker.transports.framed:FramedAcceptor'
    def __init__(self, loop=None, protocol_factory=None, port_range=None,
                 pool_size=None, shutdown_timeout=None):
        self.counters = Counters()
        self.timeouts = Timers()
        self.execution_timers = Timers()
        self.dispatching_timers = Timers()
        # Set provided instance if we can.
        if loop is not None:
            self.loop = loop
        if protocol_factory is not None:
            self.protocol_factory = protocol_factory
        self.port_range = port_range
        self.pool_size = pool_size
        self.shutdown_timeout = shutdown_timeout or 30.0
        super(ThriftWorker, self).__init__()
    def pool_size(self):
        """Return default pool size."""
        return 1
    def pool_size(self, value):
        if value is not None and value < 0:
            raise ValueError('Pool size can not be negative.')
        return int(value or 1) or 1
    def port_range(self):
        """Return range from which we allowed to allocate ports."""
        return None
    def port_range(self, value):
        if value is None:
            return None
            start, stop = int(value[0]), int(value[1])
        except (IndexError, ValueError):
            raise ValueError('Port range must be tuple of two integers.')
        return (start, stop)
    def protocol_factory(self):
        """Specify which protocol should be used."""
        return TBinaryProtocol.TBinaryProtocolAcceleratedFactory()
    def default(cls):
        """Return default application instance."""
            app = cls.instance()
        except RuntimeError:
            app = cls()
        return app
    def instance():
        """Return global instance."""
        return get_current_app()
    def loop(self):
        """Create event loop. Should be running in separate thread."""
        return Loop()
    def Hub(self):
        """Create bounded :class:`Hub` class."""
        return self.subclass_with_self(Hub)
    def hub(self):
        """Instance of bounded :class:`LoopContainer`."""
        return self.Hub()
    def Services(self):
        """Create bounded :class:`Processor` class."""
        return self.subclass_with_self(Services)
    def services(self):
        """Create global request processor instance."""
        return self.Services()
    def Listener(self):
        """Create bounded :class:`Listener` class."""
        return self.subclass_with_self(Listener)
    def Listeners(self):
        """Create bounded :class:`Listeners` class."""
        return self.subclass_with_self(Listeners)
    def listeners(self):
        """Create pool of listeners."""
        return self.Listeners()
    def Acceptor(self):
        return self.subclass_with_self(self.acceptor_cls, reverse='Acceptor')
    def Acceptors(self):
        return self.subclass_with_self(Acceptors)
    def acceptors(self):
        """Create pool of acceptors."""
        return self.Acceptors()
    def worker_cls(self):
        if self.pool_size == 1:
            return 'thriftworker.workers.sync:SyncWorker'
            return 'thriftworker.workers.threads:ThreadsWorker'
    def Worker(self):
        return self.subclass_with_self(self.worker_cls, reverse='Worker')
    def worker(self):
        """Create some worker routine."""
        logger.debug('Using {0!r} worker'.format(self.Worker))
        return self.Worker(self.pool_size)