import os
import sys
import pickle
import struct
import uuid
import time
 
from threading import Event
 
from sleekxmpp import ClientXMPP
from sleekxmpp.jid import JID
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.xmlstream.tostring import tostring
from sleekxmpp.exceptions import IqError
from sleekxmpp.plugins.xep_0060.stanza.pubsub_event import EventItem
 
from baboon.baboon.monitor import FileEvent
from baboon.baboon.config import config
from baboon.common import proxy_socket
from baboon.common.eventbus import eventbus
from baboon.common.logger import logger
from baboon.common import pyrsync
from baboon.common.stanza import rsync
from baboon.common.errors.baboon_exception import BaboonException
 
 
@logger
class CommonTransport(ClientXMPP):
 
    def __init__(self):
        """ Initializes the CommonTranport with the XEP-0060 support.
        """
 
        ClientXMPP.__init__(self, config['user']['jid'], config['user'][
            'passwd'])
 
        self.connected = Event()
        self.disconnected = Event()
        self.rsync_running = Event()
        self.rsync_finished = Event()
        self.wait_close = False
        self.failed_auth = False
 
        # Register and configure pubsub plugin.
        self.register_plugin('xep_0060')
        self.register_handler(Callback('Pubsub event', StanzaPath(
            'message/pubsub_event'), self._pubsub_event))
 
        # Shortcut to access to the xep_0060 plugin.
        self.pubsub = self.plugin["xep_0060"]
 
        # Register and configure data form plugin.
        self.register_plugin('xep_0004')
 
        # Shortcuts to access to the xep_0004 plugin.
        self.form = self.plugin['xep_0004']
 
        # Shortcuts to access to the config server information
        self.pubsub_addr = config['server']['pubsub']
        self.server_addr = config['server']['master']
 
        # Register events
        self.add_event_handler('session_start', self.start)
        self.add_event_handler('failed_auth', self._on_failed_auth)
        self.add_event_handler('stream_error', self.stream_err)
        self.add_event_handler('message', self.message)
        self.add_event_handler('message_form', self.message_form)
        self.add_event_handler('message_xform', self.message_form)
        self.register_handler(Callback('RsyncFinished Handler',
                                       StanzaPath('iq@type=set/rsyncfinished'),
                                       self._handle_rsync_finished))
 
        eventbus.register('new-rsync', self._on_new_rsync)
 
    def __enter__(self):
        """ Adds the support of with statement with all CommonTransport
        classes. A new XMPP connection is instantiated and returned when the
        connection is established.
        """
 
        # Open a new connection.
        self.open()
 
        # Wait until the connection is established. Raise a BaboonException if
        # there's an authentication error.
        while not self.connected.is_set():
            if self.failed_auth:
                raise BaboonException("Authentication failed.")
            self.connected.wait(1)
 
        # Return the instance itself.
        return self
 
    def __exit__(self, type, value, traceback):
        """ Disconnects the transport at the end of the with statement.
        """
 
        self.close()
 
    def open(self, block=False):
        """ Connects to the XMPP server.
        """
 
        self.logger.debug("Connecting to XMPP...")
        self.use_ipv6 = False
        if self.connect(use_ssl=False, use_tls=False):
            self.logger.debug("Connected to XMPP")
            self.disconnected.clear()
            self.process(block=block)
        else:
            self.logger.error("Unable to connect.")
 
    def stream_err(self, iq):
        """ Called when a StreamError is received.
        """
 
        self.logger.error(iq['text'])
 
    def  _on_failed_auth(self, event):
        """ Called when authentication failed.
        """
 
        self.logger.error("Authentication failed.")
        eventbus.fire('failed-auth')
        self.failed_auth = True
        self.close()
 
    def start(self, event):
        """ Handler for the session_start sleekxmpp event.
        """
 
        self.send_presence()
        self.get_roster()
 
        self.connected.set()
        self.logger.debug('Connected')
 
    def close(self):
        """ Closes the XMPP connection.
        """
 
        self.connected.clear()
        self.logger.debug('Closing the XMPP connection...')
        self.disconnect(wait=True)
        self.disconnected.set()
        self.logger.debug('XMPP connection closed.')
 
    def _pubsub_event(self, msg):
        """ Called when a pubsub event is received.
        """
 
        if msg['type'] in ('normal', 'headline'):
            self.logger.debug("Received pubsub item(s): \n%s" %
                              msg['pubsub_event'])
 
            items = msg['pubsub_event']['items']['substanzas']
            for item in items:
                notif_msg = ""
                if isinstance(item, EventItem):
                    self.logger.info(item['payload'].get('status'))
                    notif_msg += item['payload'].get('status')
 
                    for err_f in item['payload']:
                        if err_f.text:
                            err = "> %s" % err_f.text
                            self.logger.warning(err)
                            notif_msg = "%s\n%s" % (notif_msg, err)
                    if item['payload'].get('type') == 'error':
                        eventbus.fire('conflict-result', notif_msg)
        else:
            self.logger.debug("Received pubsub event: \n%s" %
                              msg['pubsub_event'])
 
    def _on_new_rsync(self, project, files, **kwargs):
        """ Called when a new rsync needs to be started.
        """
 
        self.connected.wait()
 
        self.rsync(project, files=files)
        eventbus.fire('rsync-finished-success', project, files)
 
    def _handle_rsync_finished(self, iq):
        """ Called when a rsync is finished.
        """
 
        # Retrieve the project context.
        node = iq['rsyncfinished']['node']
 
        # Reply to the iq.
        self.logger.debug("[%s] Sync finished." % node)
        iq.reply().send()
 
        # Set the rsync flags.
        self.rsync_running.clear()
        self.rsync_finished.set()
 
        # It's time to verify if there's a conflict or not.
        if not self.wait_close:
            self.merge_verification(node)
 
    def message_form(self, form):
        self.logger.debug("Received a form message: %s" % form)
        try:
            expected_type = \
                'http://jabber.org/protocol/pubsub#subscribe_authorization'
            if expected_type in form['form']['fields']['FORM_TYPE']['value']:
                node = form['form']['fields']['pubsub#node']['value']
                jid = form['form']['fields']['pubsub#subscriber_jid']['value']
                user = JID(jid).user
 
                self.logger.info(">> %s wants to join the %s project ! <<" % (user, node))
                self.logger.info(" $ baboon accept %s %s" % (node, user))
                self.logger.info(" $ baboon reject %s %s" % (node, user))
        except KeyError:
            pass
 
    def message(self, msg):
        self.logger.info("Received: %s" % msg)
 
    def rsync_error(self, msg):
        """ On rsync error.
        """
 
        self.logger.error(msg)
 
        # Set the rsync flags.
        self.rsync_running.clear()
        self.rsync_finished.set()
 
 
@logger
class WatchTransport(CommonTransport):
    """ The transport has the responsability to communicate via HTTP
    with the baboon server and to subscribe with XMPP 0060 with the
    Baboon XMPP server.
    """
 
    def __init__(self):
        """ WatchTransport initializes all SleekXMPP stuff like plugins,
        events and more.
        """
 
        super(WatchTransport, self).__init__()
 
        # Shortcuts to access to the config server information
        self.streamer_addr = config['server']['streamer']
 
        self.register_plugin('xep_0050')  # Ad-hoc command
        self.register_plugin('xep_0065')  # Socks5 Bytestreams
 
        self.add_event_handler('socks_connected', self._on_socks_connected)
 
    def start(self, event):
        """ Handler for the session_start sleekxmpp event.
        """
 
        self.send_presence()
        self.get_roster()
 
        # Shortcut to access to the xep_0050 plugin.
        self.adhoc = self.plugin["xep_0050"]
 
        # Shortcut to access to the xep_0065 plugin.
        self.streamer = self.plugin["xep_0065"]
 
        # Negotiates the bytestream
        try:
            streamhost_used = self.streamer.handshake(self.server_addr,
                                                      self.streamer_addr)
        except IqError as e:
            self.logger.error("Cannot established the proxy_socket connection. "
                              "Exiting...")
            # If the socks5 bytestream can't be established, disconnect the
            # XMPP connection clearly.
            self.close()
            return
 
        # Registers the SID to retrieve later to send/recv data to the
        # good proxy_socket stored in self.streamer.proxy_threads dict.
        self.sid = streamhost_used['socks']['sid']
 
    def _on_socks_connected(self, event):
        """
        """
 
        proxy_sock = self.streamer.get_socket(self.sid)
        proxy_listener = proxy_socket.listen(self.sid, proxy_sock,
                                             self._on_socks5_data)
 
        self.logger.debug("Connected.")
        self.connected.set()
 
        # Retrieve the list of pending users.
        for project in config['projects']:
            self._get_pending_users(project)
 
    def close(self):
        """ Closes the XMPP connection.
        """
 
        # Wait until all syncs are finished.
        self.wait_close = True
        if self.rsync_running.is_set():
            self.logger.info("A sync task is currently running...")
            self.rsync_finished.wait()
            self.logger.info("Ok, all syncs are now finished.")
 
        # Close the proxy proxy_socket.
        if hasattr(self, 'streamer') and self.streamer:
            self.streamer.close()
 
        # Disconnect...
        super(WatchTransport, self).close()
 
    def rsync(self, project, files=None):
        """ Starts a rsync transaction, rsync and stop the
        transaction.
 
        Raises a BaboonException if there's a problem.
        """
 
        # Verify if the connection is established. Otherwise, wait...
        if not self.connected.is_set():
            self.connected.wait()
 
        # Set the rsync flags.
        self.rsync_running.set()
        self.rsync_finished.clear()
 
        #TODO: make this an int while checking config file
        max_stanza_size = int(config['server']['max_stanza_size'])
 
        # Build first stanza
        iq = self._build_iq(project, files)
 
        try:
            # Get the size of the stanza
            to_xml = tostring(iq.xml)
            size = sys.getsizeof(to_xml)
 
            # If it's bigger than the max_stanza_size, split it !
            if size >= max_stanza_size:
                iqs = self._split_iq(size, project, files)
                self.logger.warning('The xml stanza has been split %s stanzas.'
                                    % len(iqs))
            else:
                # Else the original iq will be the only element to send
                iqs = [iq]
 
            # Send elements in list
            for iq in iqs:
                iq.send()
                self.logger.debug('Sent (%d/%d)!' %
                                  (iqs.index(iq) + 1, len(iqs)))
 
        except IqError as e:
            self.rsync_error(e.iq['error']['text'])
        except Exception as e:
            self.rsync_error(e)
 
    def _build_iq(self, project, files):
        """Build a single rsync stanza.
        """
        iq = self.Iq(sto=self.server_addr, stype='set')
 
        # Generate a new rsync ID.
        iq['rsync']['sid'] = self.sid
        iq['rsync']['rid'] = str(uuid.uuid4())
        iq['rsync']['node'] = project
 
        for f in files:
            if f.event_type == FileEvent.MODIF:
                iq['rsync'].add_file(f.src_path)
            elif f.event_type == FileEvent.CREATE:
                iq['rsync'].add_create_file(f.src_path)
            elif f.event_type == FileEvent.DELETE:
                iq['rsync'].add_delete_file(f.src_path)
 
        return iq
 
    def _split_iq(self, size, project, files):
        """Splits a stanza into multiple stanzas whith size < max_stanza_size.
        Returns a list a stanzas
        """
 
        iqs = []
 
        # We don't need the exact result of the division. Let's add 1 to
        # overcome "round" issues. How many chunks do we need ?
        chunk_num = size / int(config['server']['max_stanza_size']) + 1
 
        # How many files per chunk then ?
        step = len(files) / chunk_num
 
        # Get the splitted files list
        chunks = list(self._get_chunks(files, step))
 
        # Build a stanza for each of them
        for chunk in chunks:
            iqs.append(self._build_iq(project, chunk))
 
        return iqs
 
    def _get_chunks(self, files, step):
        """ Generate the chunks from the files list.
        """
        for i in xrange(0, len(files), step):
            yield files[i:i + step]
 
    def _on_socks5_data(self, sid, data, **kwargs):
        """ Called when receiving data over the socks5 proxy_socket (xep
        0065).
        """
 
        deltas = []  # The list of delta.
 
        # Sets the future proxy_socket response dict.
        ret = {'from': self.boundjid.bare}
 
        # Gets the current project.
        ret['node'] = data['node']
 
        # Gets the RID.
        ret['rid'] = data['rid']
 
        # Gets the list of hashes.
        all_hashes = data['hashes']
 
        for elem in all_hashes:
            # 'elem' is a tuple. The first element is the relative
            # path to the current file. The second is the server-side
            # hashes associated to this path.
            relpath = elem[0]
            hashes = elem[1]
 
            # TODO: Handle the possible AttributeError.
            project_path = config['projects'][data['node']]['path']
            project_path = os.path.expanduser(project_path)
 
            fullpath = os.path.join(project_path, relpath)
            if os.path.exists(fullpath) and os.path.isfile(fullpath):
                # Computes the local delta of the current file.
                patchedfile = open(fullpath, 'rb')
                delta = pyrsync.rsyncdelta(patchedfile, hashes,
                                           blocksize=8192)
                delta = (relpath, delta)
 
                # Appends the result to the list of delta.
                deltas.append(delta)
            else:
                # TODO: Handle this error ?
                pass
 
        # Adds the list of deltas in the response dict.
        ret['delta'] = deltas
 
        # Sends the result over the proxy_socket.
        self.streamer.send(sid, proxy_socket.pack(ret))
 
    def merge_verification(self, project):
        """ Sends an IQ to verify if there's a conflict or not.
        """
 
        iq = self.Iq(sto=self.server_addr, stype='set')
        iq['merge']['node'] = project
 
        try:
            iq.send()
        except IqError as e:
            self.logger.error(e.iq['error']['text'])
 
    def _get_pending_users(self, node):
        """ Build and send the message to get the list of pending users on the
        node.
        """
 
        # Build the IQ.
        iq = self.Iq(sto=self.pubsub_addr, stype='set')
        iq['command']['action'] = 'execute'
        iq['command']['sessionid'] = 'pubsub-get-pending:20031021T150901Z-600'
        iq['command']['node'] = 'http://jabber.org/protocol/pubsub#get-pending'
        iq['command']['form'].add_field(var='pubsub#node', value=node)
 
        # Send the IQ to the pubsub server !
        try:
            iq.send()
        except IqError:
            pass
 
 
@logger
class AdminTransport(CommonTransport):
 
    def __init__(self, logger_enabled=True):
 
        super(AdminTransport, self).__init__()
        self.logger.disabled = not logger_enabled
 
    def create_project(self, project):
        """ Creates a node on the XMPP server with the name project. Sets also
        the correct subscriptions and affiliations.
        """
 
        try:
            # Update the default configuration to have 'Authorize' node access
            # model.
            node_config = self.pubsub.get_node_config(self.pubsub_addr)
            node_config_form = node_config['pubsub_owner']['default']['config']
            node_config_form.field['pubsub#access_model'].set_value(
                'authorize')
            node_config_form.field['pubsub#notify_delete'].set_value(True)
 
            # Create the node (name == project).
            self.pubsub.create_node(self.pubsub_addr, project,
                                    config=node_config_form)
 
            # The owner must subscribe to the node to receive the alerts.
            self.pubsub.modify_subscriptions(self.pubsub_addr, project,
                                             [(config['user']['jid'],
                                               'subscribed')])
 
            # The admin must have the owner affiliation to publish alerts
            # into the node.
            self.pubsub.modify_affiliations(self.pubsub_addr, project,
                                            [(self.server_addr, 'owner')])
 
            return (200, 'The project %s is successfuly created.' % project)
        except IqError as e:
            status_code = int(e.iq['error']['code'])
            msg = "Something went wrong during the creation of the project " \
                "%s." % project
 
            if status_code == 409:
                msg = 'The project %s already exists.' % project
 
            return (status_code, msg)
 
    def delete_project(self, project):
 
        try:
            self.pubsub.delete_node(self.pubsub_addr, project)
            return (200, 'The project %s is successfuly deleted.' % project)
        except IqError as e:
            status_code = int(e.iq['error']['code'])
            msg = "Something went wrong during the deletion of the project " \
                "%s." % project
 
            if status_code == 403:
                msg = 'You are not authorized to delete %s project.' % project
            elif status_code == 404:
                msg = 'The project %s does not exist.' % project
 
            return (status_code, msg)
 
    def join_project(self, project):
        try:
            # TODO: Before the subscription, we need to verify if the user is
            # not already subscribed in order to have a correct message.
            # Otherwise, the status code is 202. Strange behavior.
            ret_iq = self.pubsub.subscribe(self.pubsub_addr, project)
            status = ret_iq['pubsub']['subscription']['subscription']
 
            if status == 'pending':
                return (202, "Invitation sent. You need to wait until the "
                        "owner accepts your invitation.")
            elif status == 'subscribed':
                return (200, "You are now a contributor of the %s project." %
                        project)
            else:
                return (500, "Something went wrong. Cannot join the %s "
                        "project." % project)
 
        except IqError as e:
            status_code = int(e.iq['error']['code'])
            msg = "Something went wrong. Cannot join the %s project." % project
 
            if status_code == 404:
                msg = "The %s project does not exist." % project
 
            return (status_code, msg)
 
    def unjoin_project(self, project):
 
        try:
            self.pubsub.unsubscribe(self.pubsub_addr, project)
            return (200, "Successfully unjoin the %s project." % project)
        except IqError as e:
            status_code = int(e.iq['error']['code'])
            msg = "Something went wrong. Cannot unjoin the %s project." \
                  % project
 
            if status_code == 401:
                msg = "You are not a contributor of the %s project." % project
            elif status_code == 404:
                msg = "The %s project does not exist." % project
 
            return (status_code, msg)
 
    def get_project_users(self, project):
        try:
            ret = self.pubsub.get_node_subscriptions(self.pubsub_addr, project)
            return ret['pubsub_owner']['subscriptions']
        except IqError:
            # TODO: Handle this error.
            pass
 
    def accept_pending(self, project, user):
        self._allow_pending(project, user, 'true')
        return (200, "%s is now successfuly subscribed on %s." % (user,
                                                                  project))
 
    def reject(self, project, user):
        self._allow_pending(project, user, 'false')
        return (200, "%s is now successfuly rejected on %s." % (user,
                                                                project))
 
    def kick(self, project, user):
 
        subscriptions = [(user, 'none')]
        try:
            self.pubsub.modify_subscriptions(self.pubsub_addr, project,
                                             subscriptions=subscriptions)
            return (200, "%s is now successfuly kicked from %s." % (user,
                                                                    project))
        except IqError as e:
            status_code = int(e.iq['error']['code'])
            msg = "Something went wrong."
 
            if status_code == 403:
                msg = "You don't have the permission to do this."
            elif status_code == 404:
                msg = "The %s project does not exist." % project
 
            return (status_code, msg)
 
    def first_git_init(self, project, url):
 
        iq = self.Iq(sto=self.server_addr, stype='set')
        iq['git-init']['node'] = project
        iq['git-init']['url'] = url
 
        try:
            iq.send(timeout=240)
            return (200, "The project %s is now correctly initialized." %
                    project)
        except IqError as e:
            status_code = int(e.iq['error']['code'])
            msg = "Something went wrong."
 
            if status_code == 503:
                msg = e.iq['error']['text']
 
            return (status_code, msg)
 
    def _allow_pending(self, project, user, allow):
        """ Build and send the message to accept/reject the user on the node
        project depending on allow boolean.
        """
 
        # Build the data form.
        payload = self.form.make_form(ftype='submit')
        payload.add_field(var='FORM_TYPE', ftype='hidden',
                          value='http://jabber.org/protocol/pubsub'
                          '#subscribe_authorization')
        payload.add_field(var='pubsub#subid', value='ididid')
        payload.add_field(var='pubsub#node', value=project)
        payload.add_field(var='pubsub#subscriber_jid', value=user)
        payload.add_field(var='pubsub#allow', value=allow)
 
        # Build the message.
        message = self.make_message(self.pubsub_addr)
        message.appendxml(payload.xml)
 
        # Send the message to the pubsub server !
        message.send()
 
 
class RegisterTransport(CommonTransport):
 
    def __init__(self, callback=None):
 
        super(RegisterTransport, self).__init__()
 
        self.callback = callback
 
        self.register_plugin('xep_0077')  # In-band Registration
        self.add_event_handler('register', self.register)
 
    def register(self, iq):
        """ Handler for the register event.
        """
 
        resp = self.Iq()
        resp['type'] = 'set'
        resp['register']['username'] = self.boundjid.user
        resp['register']['password'] = self.password
 
        try:
            resp.send(now=True)
 
            if self.callback:
                self.callback(200, 'You are now registered as %s.' %
                              config['user']['jid'])
        except IqError as e:
            if self.callback:
                status_code = int(e.iq['error']['code'])
                msg = "Something went wrong during the registration."
 
                if status_code == 409:
                    msg = "This username is already use. Please choose " \
                          "another one."
                elif status_code == 500:
                    # Often, registration limit exception.
                    msg = e.iq['error']['text']
 
                self.callback(status_code, msg, fatal=True)
 
        self.close()