# Copyright 2007 by Tiago Antao <tiagoantao@gmail.com>.  All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.
This modules allows for asynchronous execution of Fdist and
  spliting of loads.
FDistAsync Allows for the execution of FDist.
SplitFDist splits a single Fdist execution in several, taking advantage
    of multi-core architectures.
import os
import thread
from time import sleep
from Bio.PopGen.Async import Local
from Bio.PopGen.FDist.Controller import FDistController
class FDistAsync(FDistController):
    """Asynchronous FDist execution.
    def __init__(self, fdist_dir = "", ext = None):
        fdist_dir - Where fdist can be found, if = "", then it
            should be on the path.
        ext - Extension of binary names (e.g. nothing on Unix,
              ".exe" on Windows
        FDistController.__init__(self, fdist_dir, ext)
    def run_job(self, parameters, input_files):
        """Runs FDist asynchronously.
           Gets typical Fdist parameters from a dictionary and
           makes a "normal" call. This is run, normally, inside
           a separate thread.
        npops = parameters['npops']
        nsamples = parameters['nsamples']
        fst = parameters['fst']
        sample_size = parameters['sample_size']
        mut = parameters.get('mut', 0)
        num_sims = parameters.get('num_sims', 20000)
        data_dir = parameters.get('data_dir', '.')
        fst = self.run_fdist(npops, nsamples, fst, sample_size,
            mut, num_sims, data_dir)
        output_files = {}
        output_files['out.dat'] = open(data_dir + os.sep + 'out.dat', 'r')
        return fst, output_files
class SplitFDist:
    """Splits a FDist run.
       The idea is to split a certain number of simulations in smaller
       numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
       allows to run simulations in parallel, thus taking advantage
       of multi-core CPUs.
       Each SplitFDist object can only be used to run a single FDist
    def __init__(self, report_fun = None,
        num_thr = 2, split_size = 1000, fdist_dir = '', ext = None):
           report_fun - Function that is called when a single packet is
               run, it should have a single parameter: Fst.
           num_thr - Number of desired threads, typically the number
               of cores.
           split_size - Size that a full simulation will be split in.
           ext - Binary extension name (e.g. nothing on Unix, '.exe' on
        self.async = Local.Local(num_thr)
        self.async.hooks['fdist'] = FDistAsync(fdist_dir, ext)
        self.report_fun = report_fun
        self.split_size = split_size
    #There might be races when reporting...
    def monitor(self):
        """Monitors and reports (using report_fun) execution.
           Every time a partial simulation ends, calls report_fun.
           IMPORTANT: monitor calls can be concurrent with other
           events, ie, a tasks might end while report_fun is being
           called. This means that report_fun should be consider that
           other events might be happening while it is running (it
           can call acquire/release if necessary).
            keys =  self.async.done.keys()[:]
            for done in keys:
                fst, files = self.async.done[done]
                del self.async.done[done]
                out_dat = files['out.dat']
                f = open(self.data_dir + os.sep + 'out.dat','a')
                for file in os.listdir(self.parts[done]):
                    os.remove (self.parts[done] + os.sep + file)
                #print fst, out_dat
                if self.report_fun:
            if len(self.async.waiting) == 0 and len(self.async.running) == 0 \
               and len(self.async.done) == 0:
            #print 'R', self.async.running
            #print 'W', self.async.waiting
            #print 'R', self.async.running
    def acquire(self):
        """Allows the external acquisition of the lock.
    def release(self):
        """Allows the external release of the lock.
    def stop(self):
        """Stops all jobs.
        For now it only dequeues waiting jobs.
        self.waiting = [] #No more waiting jobs
    #You can only run a fdist case at a time
    def run_fdist(self, npops, nsamples, fst, sample_size,
        mut = 0, num_sims = 20000, data_dir='.'):
        """Runs FDist.
           Parameters can be seen on FDistController.run_fdist.
           It will split a single execution in several parts and
           create separated data directories.
        num_parts = num_sims/self.split_size
        self.parts = {}
        self.data_dir = data_dir
        for directory in range(num_parts):
           full_path = data_dir + os.sep + str(directory)
           except OSError:
               pass #Its ok, if it is already there
           id = self.async.run_program('fdist', {
               'npops'       : npops,
               'nsamples'    : nsamples,
               'fst'         : fst,
               'sample_size' : sample_size,
               'mut'         : mut,
               'num_sims'    : self.split_size,
               'data_dir'    : full_path
           }, {})
           self.parts[id] = full_path
        thread.start_new_thread(self.monitor, ())