Did I find the right examples for you? yes no      Crawl my project      Python Jobs

All Samples(5)  |  Call(5)  |  Derive(0)  |  Import(0)
Process the 'ping' control message.

:param daemon: The control daemon; used to get at the
               configuration and the database.
:param channel: The publish channel to which to send the
                response.
:param data: Optional extra data.  Will be returned as the
             second argument of the response.

Responds to the named channel with a command of 'pong' and(more...)

        @register('ping')
def ping(daemon, channel, data=None):
    """
    Process the 'ping' control message.

    :param daemon: The control daemon; used to get at the
                   configuration and the database.
    :param channel: The publish channel to which to send the
                    response.
    :param data: Optional extra data.  Will be returned as the
                 second argument of the response.

    Responds to the named channel with a command of 'pong' and
    with the node_name (if configured) and provided data as
    arguments.
    """

    if not channel:
        # No place to reply to
        return

    # Get our configured node name
    node_name = daemon.config['control'].get('node_name')

    # Format the response
    reply = ['pong']
    if node_name or data:
        reply.append(node_name or '')
    if data:
        reply.append(data)

    # And send it
    with utils.ignore_except():
        daemon.db.publish(channel, ':'.join(reply))
        


src/t/u/turnstile-HEAD/tests/unit/test_control.py   turnstile(Download)
        daemon = mock.Mock(config=conf, db=db)
 
        control.ping(daemon, '')
 
        self.assertFalse(db.publish.called)
        daemon = mock.Mock(config=conf, db=db)
 
        control.ping(daemon, 'reply')
 
        db.publish.assert_called_once_with('reply', 'pong')
        daemon = mock.Mock(config=conf, db=db)
 
        control.ping(daemon, 'reply', 'data')
 
        db.publish.assert_called_once_with('reply', 'pong::data')
        daemon = mock.Mock(config=conf, db=db)
 
        control.ping(daemon, 'reply')
 
        db.publish.assert_called_once_with('reply', 'pong:node')
        daemon = mock.Mock(config=conf, db=db)
 
        control.ping(daemon, 'reply', 'data')
 
        db.publish.assert_called_once_with('reply', 'pong:node:data')