Did I find the right examples for you? yes no      Crawl my project      Python Jobs

All Samples(54)  |  Call(45)  |  Derive(0)  |  Import(9)

src/t/r/Trail-HEAD/src/protocols/nullAPI.py   Trail(Download)
from Assert import Assert
 
META, DATA, DONE = 'META', 'DATA', 'DONE'
 
class null_access:
    def pollmeta(self):
        Assert(self.state == META)
        return "Ready", 1
 
    def getmeta(self):
        Assert(self.state == META)
    def polldata(self):
        Assert(self.state == DATA)
        return "Ready", 1
 
    def getdata(self, maxbytes):
        Assert(self.state == DATA)

src/t/r/Trail-HEAD/src/Cache.py   Trail(Download)
SharedItemExpired = 'SharedItem Expired'
 
from Assert import Assert
import os
import protocols
    def decref(self):
        Assert(self.refcnt > 0)
        self.refcnt = self.refcnt - 1
        self.cache_update()
        if self.refcnt == 0:
    def getdata(self, offset, maxbytes):
        Assert(offset >= 0)
        Assert(maxbytes > 0)
 
        while self.stage == DATA and offset >= self.datalen:
    def pollmeta(self):
        Assert(self.stage == META)
        return self.item.pollmeta()
 
    def getmeta(self):

src/t/r/Trail-HEAD/src/protocols/fileAPI.py   Trail(Download)
from Assert import Assert
import grailutil
import ht_time
import os
 
    def pollmeta(self):
        Assert(self.state == META)
        return "Ready", 1
 
    def getmeta(self):
        Assert(self.state == META)
    def polldata(self):
        Assert(self.state == DATA)
        return "Ready", 1
 
    def getdata(self, maxbytes):
        Assert(self.state == DATA)

src/t/r/Trail-HEAD/src/protocols/httpsAPI.py   Trail(Download)
from urllib import splithost
import mimetools
from Assert import Assert
import grailutil
import select
    def open(self):
        Assert(self.state == WAIT)
        resturl, method, params, data = self.args
        if data:
            Assert(method=="POST")
        else:
            Assert(method in ("GET", "POST"))
    def pollmeta(self, timeout=0):
        Assert(self.state == META)
 
        sock = self.h._conn.sock
        try:

src/t/r/Trail-HEAD/src/protocols/httpAPI.py   Trail(Download)
from urllib import splithost
import mimetools
from Assert import Assert
import grailutil
import select
    def open(self):
        Assert(self.state == WAIT)
        resturl, method, params, data = self.args
        if data:
            Assert(method=="POST")
        else:
            Assert(method in ("GET", "POST"))
    def pollmeta(self, timeout=0):
        Assert(self.state == META)
 
        sock = self.h._conn.sock
        try:

src/t/r/Trail-HEAD/src/protocols/dataAPI.py   Trail(Download)
from Assert import Assert
import nullAPI
import string
 
 
    def getmeta(self):
        Assert(self.state == nullAPI.META)
        self.state = nullAPI.DATA
        headers = {"content-type": self.__ctype,
                   "content-length": `len(self.__data)`,
    def polldata(self):
        Assert(self.state in (nullAPI.META, nullAPI.DATA))
        return "Ready", 1
 
    def getdata(self, maxbytes):
        Assert(self.state == nullAPI.DATA)

src/t/r/Trail-HEAD/src/protocols/ftpAPI.py   Trail(Download)
from urlparse import urljoin
import mimetools
from Assert import Assert
import grailutil
import socket
    def __init__(self, url, method, params):
        Assert(method == 'GET')
        netloc, path = splithost(url)
        if not netloc: raise IOError, ('ftp error', 'no host given')
        host, port = splitport(netloc)
    def pollmeta(self):
        Assert(self.state == META)
        return "Ready", 1
 
    def getmeta(self):
        Assert(self.state == META)
    def polldata(self):
        Assert(self.state in (EOF, DATA))
        return "Ready", 1
 
    def getdata(self, maxbytes):

src/t/r/Trail-HEAD/src/protocols/gopherAPI.py   Trail(Download)
import mimetypes
 
from Assert import Assert
 
# wrap text lines longer than:
    def pollmeta(self):
        Assert(self.state == META)
        return "Ready", 1
 
    def getmeta(self):
        Assert(self.state == META)
    def polldata(self):
        Assert(self.state in (EOF, DATA))
        return "Ready", 1
 
    def getdata(self, maxbytes):

src/t/r/Trail-HEAD/src/CacheMgr.py   Trail(Download)
from Cache import SharedItem, SharedAPI
from Assert import Assert
import urlparse
import string
import os
    def expire(self,key):
        """Should not be used."""
        Assert('night' == 'day')
        Assert(self.items.has_key(key))
        self.items[key].evict()
                        del self.manager.items[key]
                        self.use_order.remove(key)
                        Assert(not key in self.use_order)
                elif kind == '0': # add
                    newentry = DiskCacheEntry(self)
                                self.size = 0
                            return
                    Assert(ver in self.log_ok_versions)
            except IndexError:
                # ignore this line