BinaryTree inter-engine communication class
use from bintree_script.py
Provides parallel [all]reduce functionality
import cPickle as pickle
import re
import socket
import uuid
import zmq
from IPython.parallel.util import disambiguate_url
# bintree-related construction/printing helpers
def bintree(ids, parent=None):
    """construct {child:parent} dict representation of a binary tree
    keys are the nodes in the tree, and values are the parent of each node.
    The root node has parent `parent`, default: None.
    >>> tree = bintree(range(7))
    >>> tree
    {0: None, 1: 0, 2: 1, 3: 1, 4: 0, 5: 4, 6: 4}
    >>> print_bintree(tree)
    parents = {}
    n = len(ids)
    if n == 0:
        return parents
    root = ids[0]
    parents[root] = parent
    if len(ids) == 1:
        return parents
        ids = ids[1:]
        n = len(ids)
        left = bintree(ids[:n/2], parent=root)
        right = bintree(ids[n/2:], parent=root)
    return parents
def reverse_bintree(parents):
    """construct {parent:[children]} dict from {child:parent}
    keys are the nodes in the tree, and values are the lists of children
    of that node in the tree.
    reverse_tree[None] is the root node
    >>> tree = bintree(range(7))
    >>> reverse_bintree(tree)
    {None: 0, 0: [1, 4], 4: [5, 6], 1: [2, 3]}
    children = {}
    for child,parent in parents.iteritems():
        if parent is None:
            children[None] = child
        elif parent not in children:
            children[parent] = []
    return children
def depth(n, tree):
    """get depth of an element in the tree"""
    d = 0
    parent = tree[n]
    while parent is not None:
        d += 1
        parent = tree[parent]
    return d
def print_bintree(tree, indent='  '):
    """print a binary tree"""
    for n in sorted(tree.keys()):
        print "%s%s" % (indent * depth(n,tree), n)
# Communicator class for a binary-tree map
ip_pat = re.compile(r'^\d+\.\d+\.\d+\.\d+$')
def disambiguate_dns_url(url, location):
    """accept either IP address or dns name, and return IP"""
    if not ip_pat.match(location):
        location = socket.gethostbyname(location)
    return disambiguate_url(url, location)
class BinaryTreeCommunicator(object):
    id          = None
    pub         = None
    sub         = None
    downstream  = None
    upstream    = None
    pub_url     = None
    tree_url    = None
    def __init__(self, id, interface='tcp://*', root=False):
        self.id = id
        self.root = root
        # create context and sockets
        self._ctx = zmq.Context()
        if root:
            self.pub = self._ctx.socket(zmq.PUB)
            self.sub = self._ctx.socket(zmq.SUB)
            self.sub.setsockopt(zmq.SUBSCRIBE, b'')
        self.downstream = self._ctx.socket(zmq.PULL)
        self.upstream = self._ctx.socket(zmq.PUSH)
        # bind to ports
        interface_f = interface + ":%i"
        if self.root:
            pub_port = self.pub.bind_to_random_port(interface)
            self.pub_url = interface_f % pub_port
        tree_port = self.downstream.bind_to_random_port(interface)
        self.tree_url = interface_f % tree_port
        self.downstream_poller = zmq.Poller()
        self.downstream_poller.register(self.downstream, zmq.POLLIN)
        # guess first public IP from socket
        self.location = socket.gethostbyname_ex(socket.gethostname())[-1][0]
    def __del__(self):
        if self.root:
    def info(self):
        """return the connection info for this object's sockets."""
        return (self.tree_url, self.location)
    def connect(self, peers, btree, pub_url, root_id=0):
        """connect to peers.  `peers` will be a dict of 4-tuples, keyed by name.
        {peer : (ident, addr, pub_addr, location)}
        where peer is the name, ident is the XREP identity, addr,pub_addr are the
        # count the number of children we have
        self.nchildren = btree.values().count(self.id)
        if self.root:
            return # root only binds
        root_location = peers[root_id][-1]
        self.sub.connect(disambiguate_dns_url(pub_url, root_location))
        parent = btree[self.id]
        tree_url, location = peers[parent]
        self.upstream.connect(disambiguate_dns_url(tree_url, location))
    def serialize(self, obj):
        """serialize objects.
        Must return list of sendable buffers.
        Can be extended for more efficient/noncopying serialization of numpy arrays, etc.
        return [pickle.dumps(obj)]
    def unserialize(self, msg):
        """inverse of serialize"""
        return pickle.loads(msg[0])
    def publish(self, value):
        assert self.root
    def consume(self):
        assert not self.root
        return self.unserialize(self.sub.recv_multipart())
    def send_upstream(self, value, flags=0):
        assert not self.root
        self.upstream.send_multipart(self.serialize(value), flags=flags|zmq.NOBLOCK)
    def recv_downstream(self, flags=0, timeout=2000.):
        # wait for a message, so we won't block if there was a bug
        msg = self.downstream.recv_multipart(zmq.NOBLOCK|flags)
        return self.unserialize(msg)
    def reduce(self, f, value, flat=True, all=False):
        """parallel reduce on binary tree
        if flat:
            value is an entry in the sequence
            value is a list of entries in the sequence
        if all:
            broadcast final result to all nodes
            only root gets final result
        if not flat:
            value = reduce(f, value)
        for i in range(self.nchildren):
            value = f(value, self.recv_downstream())
        if not self.root:
        if all:
            if self.root:
                value = self.consume()
        return value
    def allreduce(self, f, value, flat=True):
        """parallel reduce followed by broadcast of the result"""
        return self.reduce(f, value, flat=flat, all=True)