Did I find the right examples for you? yes no      Crawl my project      Python Jobs

All Samples(57)  |  Call(45)  |  Derive(0)  |  Import(12)

src/k/a/kamaelia-HEAD/Code/Python/Axon/Axon/debugConfigFile.py   kamaelia(Download)
import re
# from string import atoi
from Axon.util import next
 
debugConfig = dict()
   def nextLine(lines,filterFuncs=(comment)):
      """Return next line, filtering out undesirable lines, eg. comments"""
      data = next(lines)
      if data:
         while applyFuncs(data,filterFuncs):
            data = next(lines)

src/k/a/kamaelia-HEAD/Code/Python/Axon/Axon/AdaptiveCommsComponent.py   kamaelia(Download)
import Axon.idGen as idGen
from Axon.Box import makeInbox, makeOutbox
from Axon.util import next
 
class _AdaptiveCommsable(object):
      """
      while name in self.inboxes:
          name =name+str(next(idGen.idGen()))
      return name
   #
      """
      while name in self.outboxes:
         name =name+str(next(idGen.idGen()))
      return name
 

src/k/a/kamaelia-HEAD/Code/Python/Axon/Axon/Scheduler.py   kamaelia(Download)
    vrange = range
 
from Axon.util import next
 
def _sort(somelist):
                   try:
#                       result = mprocess.next()
                       result = next(mprocess)
 
                       if isinstance(result, newComponent):

src/k/a/kamaelia-HEAD/Code/Python/Axon/Axon/Microprocess.py   kamaelia(Download)
import Axon.Base
import Axon.CoordinatingAssistantTracker as cat
from Axon.util import next
 
class _NullScheduler(object):

src/k/a/kamaelia-HEAD/Tests/Python/Axon/test_ThreadedComponent.py   kamaelia(Download)
# import thread,Queue,threading
import threading
from Axon.util import next
 
try:
        sched=scheduler()
        t=Test().activate(Scheduler=sched)
        next(t)            # get the thread started
        self.assert_(t.threadid.get() != thread.get_ident(), "main() returns a different value for thread.get_ident()")
 
        msg = "hello!"
        o=OneShotTo( (r,"inbox"), msg).activate(Scheduler=sched)
        next(r)
        next(o)
        next(r)

src/k/a/kamaelia-HEAD/Sketches/MPS/BugReports/FixTests/Axon/Axon/debugConfigFile.py   kamaelia(Download)
import re
# from string import atoi
from Axon.util import next
 
debugConfig = dict()
   def nextLine(lines,filterFuncs=(comment)):
      """Return next line, filtering out undesirable lines, eg. comments"""
      data = next(lines)
      if data:
         while applyFuncs(data,filterFuncs):
            data = next(lines)

src/k/a/kamaelia-HEAD/Sketches/MPS/BugReports/FixTests/Axon/Axon/AdaptiveCommsComponent.py   kamaelia(Download)
import Axon.idGen as idGen
from Axon.Box import makeInbox, makeOutbox
from Axon.util import next
 
class _AdaptiveCommsable(object):
      """
      while name in self.inboxes:
          name =name+str(next(idGen.idGen()))
      return name
   #
      """
      while name in self.outboxes:
         name =name+str(next(idGen.idGen()))
      return name
 

src/k/a/kamaelia-HEAD/Sketches/MPS/BugReports/FixTests/Axon/Axon/Scheduler.py   kamaelia(Download)
    vrange = range
 
from Axon.util import next
 
def _sort(somelist):
                   try:
#                       result = mprocess.next()
                       result = next(mprocess)
 
                       if isinstance(result, newComponent):

src/k/a/kamaelia-HEAD/Tests/Python/Axon/test_Postoffice.py   kamaelia(Download)
from Axon.AxonExceptions import noSpaceInBox
from Axon.AxonExceptions import BoxAlreadyLinkedToDestination
from Axon.util import next,vrange
 
class Dummybox(list):
            try:
                for _ in range(0,waitcycles):
                    next(execute)
            except StopIteration:
                self.fail("Scheduler terminated unexpectedly")
    def runForAWhile(self, cycles=100):
        """Call ``next()`` on ``self.schedthread`` exactly ``cycles`` times.

        cycles -- number of steps to take (default 100); zero or a negative
                  value results in no steps at all.

        Nothing is caught here, so any exception raised while stepping
        propagates straight to the caller.
        """
        for _step in range(cycles):
            next(self.schedthread)
 
    def test_smoketest(self):

src/k/a/kamaelia-HEAD/Tests/Python/Axon/test_Component.py   kamaelia(Download)
import re
from Axon.Component import *
from Axon.util import next,vrange
import Axon.Linkage as Linkage
 
    def runForAWhile(self, cycles=100):
        """Step ``self.schedthread`` forward ``cycles`` times via ``next()``.

        cycles -- how many times to step (default 100); a value of zero or
                  below means the loop body never runs.

        No exception handling is done here: whatever ``next()`` raises is
        passed on to the caller unchanged.
        """
        for _tick in range(cycles):
            next(self.schedthread)
 
    def test_NothingHappensIfNothingHappens(self):

  1 | 2  Next