# Copyright 2002-2011 Nick Mathewson. See LICENSE for licensing information. """mixminion.Filestore Common code for directory-based, security conscious, threadsafe unordered file stores. Also contains common code for journalable DB-backed threadsafe stores. """ # Formerly, this was all in mixminion.server.ServerQueue. But # ClientMain had already grown a minimal version for client file # queues, and we needed another for fragment stores anyway. So it was # time to refactor the common code. import anydbm import binascii import cPickle import dumbdbm import errno import os import stat import threading import time import types import whichdb from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \ createPrivateDir, readFile, replaceFile, tryUnlink, writePickled from mixminion.Crypto import getCommonPRNG __all__ = [ "StringStore", "StringMetadataStore", "ObjectStore", "ObjectMetadataStore", "MixedStore", "MixedMetadataStore", "DBBase", "JournaledDBBase", "BooleanJournaledDBBase", "CorruptedFile", ] class CorruptedFile(MixError): """Raised when a pickled object cannot be properly decoded.""" pass # ====================================================================== # Filestores. # Any inp_* files older than INPUT_TIMEOUT seconds old are assumed to be # trash. INPUT_TIMEOUT = 6000 class BaseStore: """A BaseStore is an unordered collection of files with secure insert, move, and delete operations. This class is not for direct use; combine it with one of the mixin classes below. Abstractly, a BaseStore is a consistent collection of 'things' with (optional) persistent metadata. The 'things' support insert, move, and delete operations. The metadata supports modification. Implementation: a BaseStore is a directory of 'message' files. Each filename in the directory has a name in one of the following formats: rmv_HANDLE (A message waiting to be deleted) msg_HANDLE (A message waiting in the queue. inp_HANDLE (An incomplete message being created.) crp_HANDLE (A corrupted message awaiting debugging analysis) (Where HANDLE is a randomly chosen 8-character string of characters chosen from 'A-Za-z0-9+-'. [Collision probability is negligible, and collisions are detected.]) If metadata is present, is has a names with the analogous rmvm_HANDLE meta_HANDLE inpm_HANDLE crpm_HANDLE Threading notes: Although BaseStore itself is threadsafe, you'll want to synchronize around any multistep operations that you want to run atomically. Use BaseStore.lock() and BaseStore.unlock() for this. In the Mixminion server, no queue currently has more than one producer or more than one consumer ... so synchronization turns out to be fairly easy. """ # Fields: dir--the location of the file store. # n_entries: the number of complete messages in the queue. # <0 if we haven't counted yet. # _lock: A lock that must be held while modifying or accessing # the queue object. Filesystem operations are allowed # without holding the lock, but they must not be visible # to users of the queue. def __init__(self, location, create=0, scrub=0): """Creates a file store object for a given directory, 'location'. If 'create' is true, creates the directory if necessary. If 'scrub' is true, removes any incomplete or invalidated messages from the store.""" secureDelete([]) # Make sure secureDelete is configured. HACK! self._lock = threading.RLock() self.dir = location if not os.path.isabs(location): LOG.warn("Directory path %s isn't absolute.", location) if os.path.exists(location) and not os.path.isdir(location): raise MixFatalError("%s is not a directory" % location) createPrivateDir(location, nocreate=(not create)) if scrub: self.cleanQueue() # Count messages on first time through. self.n_entries = -1 def lock(self): """Prevent access to this filestore from other threads.""" self._lock.acquire() def unlock(self): """Release the lock on this filestore.""" self._lock.release() def count(self, recount=0): """Returns the number of complete messages in the filestore.""" try: self._lock.acquire() if self.n_entries >= 0 and not recount: return self.n_entries else: res = 0 for fn in os.listdir(self.dir): if fn.startswith("msg_"): res += 1 self.n_entries = res return res finally: self._lock.release() def pickRandom(self, count=None): """Returns a list of 'count' handles to messages in this filestore. The messages are chosen randomly, and returned in a random order. If there are fewer than 'count' messages in the filestore, all the messages will be included.""" handles = self.getAllMessages() # handles locking return getCommonPRNG().shuffle(handles, count) def getAllMessages(self): """Returns handles for all messages currently in the filestore. Note: this ordering is not guaranteed to be random.""" self._lock.acquire() hs = [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")] self._lock.release() return hs def messageExists(self, handle): """Return true iff this filestore contains a message with the handle 'handle'.""" return os.path.exists(os.path.join(self.dir, "msg_"+handle)) def _doRemove(self, handle, newState): self._changeState(handle, "msg", newState) def _preserveCorrupted(self, handle): """Given a handle, change the message state to 'crp'.""" self._doRemove(handle, "crp") def removeMessage(self, handle): """Given a handle, removes the corresponding message from the filestore. """ self._doRemove(handle, "rmv") # handles locking. def removeAll(self, secureDeleteFn=None): """Removes all messages from this filestore.""" try: self._lock.acquire() for m in os.listdir(self.dir): if m[:4] in ('inp_', 'msg_'): self._changeState(m[4:], m[:3], "rmv") elif m[:4] in ('inpm_', 'meta_'): self._changeState(m[5:], m[:4], "rmvm") self.n_entries = 0 self.cleanQueue(secureDeleteFn) finally: self._lock.release() def getMessagePath(self, handle): """Given a handle for an existing message, return the name of the file that contains that message.""" # We don't need to lock here: the handle is still valid, or it isn't. return os.path.join(self.dir, "msg_"+handle) def openMessage(self, handle): """Given a handle for an existing message, returns a file descriptor open to read that message.""" # We don't need to lock here; the handle is still valid, or it isn't. return open(os.path.join(self.dir, "msg_"+handle), 'rb') def openNewMessage(self): """Returns (file, handle) tuple to create a new message. Once you're done writing, you must call finishMessage to commit your changes, or abortMessage to reject them.""" while 1: f, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1, "msg_") return f, handle raise AssertionError # unreached; appease pychecker def finishMessage(self, f, handle, _ismeta=0): """Given a file and a corresponding handle, closes the file commits the corresponding message.""" # if '_ismeta' is true, we're finishing not a message, but the # metadata for a message f.close() if _ismeta: self._changeState(handle, "inpm", "meta") else: self._changeState(handle, "inp", "msg") def abortMessage(self, f, handle, _ismeta=0): """Given a file and a corresponding handle, closes the file rejects the corresponding message.""" # if '_ismeta' is true, we're finishing not a message, but the # metadata for a message f.close() if _ismeta: self._changeState(handle, "inpm", "rmvm") else: self._changeState(handle, "inp", "rmv") def cleanQueue(self, secureDeleteFn=None): """Removes all timed-out or trash messages from the filestore. If secureDeleteFn is provided, it is called with a list of filenames to be removed. Otherwise, files are removed using secureDelete. Returns 1 if a clean is already in progress; otherwise returns 0. """ # We don't need to hold the lock here; we synchronize via the # filesystem. rmv = [] allowedTime = int(time.time()) - INPUT_TIMEOUT for m in os.listdir(self.dir): if m.startswith("rmv_") or m.startswith("rmvm_"): rmv.append(os.path.join(self.dir, m)) elif m.startswith("inp_"): try: s = os.stat(m) if s[stat.ST_MTIME] < allowedTime: self._changeState(m[4:], "inp", "rmv") rmv.append(os.path.join(self.dir, m)) except OSError: pass if secureDeleteFn: secureDeleteFn(rmv) else: secureDelete(rmv, blocking=1) def _changeState(self, handle, s1, s2): """Helper method: changes the state of message 'handle' from 's1' to 's2', and changes the internal count.""" try: self._lock.acquire() try: replaceFile(os.path.join(self.dir, s1+"_"+handle), os.path.join(self.dir, s2+"_"+handle)) except OSError, e: contents = os.listdir(self.dir) LOG.error("Error while trying to change %s from %s to %s: %s", handle, s1, s2, e) LOG.error("Directory %s contains: %s", self.dir, contents) self.count(1) return if self.n_entries < 0: return if s1 == 'msg' and s2 != 'msg': self.n_entries -= 1 elif s1 != 'msg' and s2 == 'msg': self.n_entries += 1 finally: self._lock.release() class StringStoreMixin: """Combine the 'StringStoreMixin' class with a BaseStore in order to implement a BaseStore that stores strings. """ def __init__(self): pass def messageContents(self, handle): """Given a message handle, returns the contents of the corresponding message.""" try: self._lock.acquire() return readFile(os.path.join(self.dir, "msg_"+handle), 1) finally: self._lock.release() def queueMessage(self, contents): """Creates a new message in the filestore whose contents are 'contents', and returns a handle to that message.""" f, handle = self.openNewMessage() f.write(contents) self.finishMessage(f, handle) # handles locking return handle class ObjectStoreMixin: """Combine the 'ObjectStoreMixin' class with a BaseStore in order to implement a BaseStore that stores strings. """ def __init__(self): pass def getObject(self, handle): """Given a message handle, read and unpickle the contents of the corresponding message. In rare error cases, raises CorruptedFile. """ try: self._lock.acquire() f = open(os.path.join(self.dir, "msg_"+handle), 'rb') try: res = cPickle.load(f) f.close() return res except (cPickle.UnpicklingError, EOFError, IOError), e: LOG.error("Found damaged object %s in filestore %s: %s", handle, self.dir, str(e)) self._preserveCorrupted(handle) raise CorruptedFile() finally: self._lock.release() def queueObject(self, object): """Queue an object using cPickle, and return a handle to that object.""" f, handle = self.openNewMessage() cPickle.dump(object, f, 1) self.finishMessage(f, handle) # handles locking return handle class BaseMetadataStore(BaseStore): """A BaseMetadataStore is a BaseStore that stores a metadata object for every object in the store. We assume metadata to be relatively volitile compared to the underlying stored objects. Metadata is not always wiped before removal. The representation of a store with metadata is the same as that of a simple store, except that: 1) The meta_, rmvm_, and inpm_ tags are used. 2) For every file in msg_ state, there is a corresponding meta_ file. """ ##Fields: # _metadata_cache: map from handle to cached metadata object. This is # a write-through cache. def __init__(self, location, create=0, scrub=0): """Create a new BaseMetadataStore to store files in 'location'. The 'create' and 'scrub' arguments are as for BaseStore(...).""" BaseStore.__init__(self, location=location, create=create, scrub=scrub) self._metadata_cache = {} if scrub: self.cleanMetadata() def cleanMetadata(self,secureDeleteFn=None): """Find all orphaned metadata files and remove them.""" hSet = {} for h in self.getAllMessages(): hSet[h] = 1 rmv = [] for h in [fn[5:] for fn in os.listdir(self.dir) if fn.startswith("meta_")]: if not hSet.get(h): rmv.append("meta_"+h) if rmv: LOG.warn("Removing %s orphaned metadata files from %s", len(rmv), self.dir) if secureDeleteFn: secureDeleteFn(rmv) else: secureDelete(rmv, blocking=1) def loadAllMetadata(self, newDataFn): """For all objects in the store, load their metadata into the internal cache. If any object is missing its metadata, create metadata for it by invoking newDataFn(handle).""" try: self._lock.acquire() self._metadata_cache = {} for h in self.getAllMessages(): try: self.getMetadata(h) except KeyError: LOG.warn("Missing metadata for file %s",h) self.setMetadata(h, newDataFn(h)) except CorruptedFile: continue finally: self._lock.release() def getMetadata(self, handle): """Return the metadata associated with a given handle. If the metadata is damaged, may raise CorruptedFile.""" fname = os.path.join(self.dir, "meta_"+handle) if not os.path.exists(fname): raise KeyError(handle) try: self._lock.acquire() try: return self._metadata_cache[handle] except KeyError: pass f = open(fname, 'rb') try: res = cPickle.load(f) except cPickle.UnpicklingError, e: LOG.error("Found damaged metadata for %s in filestore %s: %s", handle, self.dir, str(e)) self._preserveCorrupted(handle) raise CorruptedFile() f.close() self._metadata_cache[handle] = res return res finally: self._lock.release() def setMetadata(self, handle, object): """Change the metadata associated with a given handle.""" # On windows or (old-school) mac, binary != text. O_BINARY = getattr(os, 'O_BINARY', 0) flags = os.O_WRONLY|os.O_CREAT|os.O_TRUNC|O_BINARY try: self._lock.acquire() fname = os.path.join(self.dir, "inpm_"+handle) f = os.fdopen(os.open(fname, flags, 0600), "wb") cPickle.dump(object, f, 1) self.finishMessage(f, handle, _ismeta=1) self._metadata_cache[handle] = object return handle finally: self._lock.release() def _doRemove(self, handle, newState): try: self._lock.acquire() # Remove the message before the metadata, so we don't have # a message without metadata. BaseStore._doRemove(self, handle, newState) if os.path.exists(os.path.join(self.dir, "meta_"+handle)): self._changeState(handle, "meta", newState+"m") try: del self._metadata_cache[handle] except KeyError: pass finally: self._lock.release() class StringMetadataStoreMixin(StringStoreMixin): """Add this mixin class to a BaseMetadataStore in order to get a filestore that stores strings with metadata.""" def __init__(self): StringStoreMixin.__init__(self) def queueMessage(self, message): LOG.warn("Called 'queueMessage' on a metadata store.") return self.queueMessageAndMetadata(message, None) def queueMessageAndMetadata(self, message, metadata): f, handle = self.openNewMessage() f.write(message) self.setMetadata(handle, metadata) self.finishMessage(f, handle) # handles locking return handle class ObjectMetadataStoreMixin(ObjectStoreMixin): """Add this mixin class to a BaseMetadataStore in order to get a filestore that stores objects with metadata.""" def __init__(self): ObjectStoreMixin.__init__(self) def queueObject(self, object): LOG.warn("Called 'queueObject' on a metadata store.") return self.queueObjectAndMetadata(object, None) def queueObjectAndMetadata(self, object, metadata): f, handle = self.openNewMessage() cPickle.dump(object, f, 1) self.setMetadata(handle, metadata) self.finishMessage(f, handle) # handles locking return handle class StringStore(BaseStore, StringStoreMixin): def __init__(self, location, create=0, scrub=0): BaseStore.__init__(self, location, create, scrub) StringStoreMixin.__init__(self) class StringMetadataStore(BaseMetadataStore, StringMetadataStoreMixin): def __init__(self, location, create=0, scrub=0): BaseMetadataStore.__init__(self, location, create, scrub) StringMetadataStoreMixin.__init__(self) class ObjectStore(BaseStore, ObjectStoreMixin): def __init__(self, location, create=0, scrub=0): BaseStore.__init__(self, location, create, scrub) ObjectStoreMixin.__init__(self) class ObjectMetadataStore(BaseMetadataStore, ObjectMetadataStoreMixin): def __init__(self, location, create=0, scrub=0): BaseMetadataStore.__init__(self, location, create, scrub) ObjectMetadataStoreMixin.__init__(self) class MixedStore(BaseStore, StringStoreMixin, ObjectStoreMixin): def __init__(self, location, create=0, scrub=0): BaseStore.__init__(self, location, create, scrub) StringStoreMixin.__init__(self) ObjectStoreMixin.__init__(self) class MixedMetadataStore(BaseMetadataStore, StringMetadataStoreMixin, ObjectMetadataStoreMixin): def __init__(self, location, create=0, scrub=0): BaseMetadataStore.__init__(self, location, create, scrub) StringMetadataStoreMixin.__init__(self) ObjectMetadataStoreMixin.__init__(self) # ====================================================================== # Database wrappers def _openDBHash(filename,flag,mode=0666): """Open a Berkeley DB hash database. Equivalent to dbhash.open, but when possible, reaches into bsddb.db and uses the DB_RECOVER* flag(s) to handle possible corruption from crashing without closing the database. """ if 1: #XXXX008 This function is borked. Fix it or remove it. return anydbm.open(filename, flag, mode) try: import bsddb except ImportError: # Fallback to anydbm, which delegates to dbhash return anydbm.open(filename, flag, mode) # Adapted from bsddb.hashopen e = bsddb.db.DBEnv(bsddb.db.DB_PRIVATE| bsddb.db.DB_CREATE | bsddb.db.DB_THREAD | bsddb.db.DB_INIT_LOCK | bsddb.db.DB_INIT_MPOOL | bsddb.db.DB_RECOVER ) flags = bsddb.db.DB_CREATE | bsddb.db.DB_THREAD flags |= getattr(bsddb.db, "DB_AUTO_COMMIT", 0) #flags |= getattr(bsddb.db, "DB_RECOVER", 0) #flags |= getattr(bsddb.db, "DB_RECOVER_FATAL", 0) d = bsddb.db.DB(e) d.open(filename, bsddb.db.DB_HASH, flags, mode) return bsddb._DBWithCursor(d) def openDB(filename, purpose): """Replacement for anydbm.open. Open a database stored in 'filename', using the best available database implementation. The string 'purpose' is used to indicate which database has succeeded or failed in any messages. Changes from anydbm.open: - Create parent directory if it doesn't exist. - Bail with sane error messages if file is non-readable. - Handle the error case where the database file is created but never filled. - Always create the database if it doesn't exist. - Warn if using a dumbdbm database. - Return a 2-tuple of the database object and a no-arguments callable that flushes the database's contents to disk. """ parent = os.path.split(filename)[0] createPrivateDir(parent) # If the file exists, but can't be read, bail. try: st = os.stat(filename) except OSError, e: if e.errno != errno.ENOENT: raise st = None # If the file is empty, delete it and start over. if st and st[stat.ST_SIZE] == 0: LOG.warn("Half-created database %s found; cleaning up.", filename) tryUnlink(filename) dbtype = whichdb.whichdb(filename) LOG.debug("Opening %s database at %s", purpose, filename) try: if dbtype != 'dbhash': db = _openDBHash(filename, 'c', 0600) else: db = anydbm.open(filename, 'c', 0600) except anydbm.error, e: raise MixFatalError("Can't open %s database: %s"%(purpose,e)) except ImportError: raise MixFatalError("Unsupported type for %s database: %s" %(purpose, dbtype)) if hasattr(db, 'sync'): syncLog = db.sync elif hasattr(db, '_commit'): # Workaround for dumbdbm to allow syncing. (Standard in # Python 2.3.) syncLog = db._commit else: # Otherwise, force a no-op sync method. syncLog = lambda : None if isinstance(db, dumbdbm._Database): LOG.warn("Warning: using a flat file for %s database", purpose) return db, syncLog class DBBase: """A DBBase is a persistent store that maps keys to values, using a Python anydbm object. It differs from the standard python 'shelve' module: - by handling broken databases files, - by warning when using dumbdbm, - by providing a 'sync' feature, - by bypassing the pickle module's overhead, - by providing thread-safety To use this class for non-string keys or values, override the _{en|de}code{Key|Value} methods.""" ## Fields: # _lock -- A threading.RLock to protect access to database. # filename -- The name of the underlying database file. Note that some # database implementations (such as dumdbm) create multiple files, # using <filename> as a prefix. # log -- The underlying anydbm object. # _syncLog -- no-arguments function to flush self.log to disk. def __init__(self, filename, purpose=""): """Create a DBBase object for a database stored in 'filename', creating the underlying database if needed.""" self._lock = threading.RLock() self.filename = filename self.log, self._syncLog = openDB(filename, purpose) # Subclasses may want to check whether this is the right database, # flush the journal, and so on. def _encodeKey(self, k): """Given a key for this mapping (a Python object), return a string usable as a key by the underlying databse.""" return k def _encodeVal(self, v): """Given a value for this mapping (a Python object), return a string usable as a value by the underlying databse.""" return v def _decodeVal(self, v): """Given a string-encoded value as used in the underlying database, return the original Python object.""" return v def has_key(self, k): try: _ = self[k] return 1 except KeyError: return 0 def __getitem__(self, k): return self.getItem(k) def keys(self): return map(self._decodeKey, self.log.keys()) def get(self, k, default=None): try: return self[k] except KeyError: return default def __setitem__(self, k, v): self.setItem(k, v) def __delitem__(self, k): self.delItem(k) def getItem(self, k): try: self._lock.acquire() return self._decodeVal(self.log[self._encodeKey(k)]) finally: self._lock.release() def setItem(self, k, v): self._lock.acquire() try: self.log[self._encodeKey(k)] = self._encodeVal(v) finally: self._lock.release() def delItem(self, k): try: self._lock.acquire() del self.log[self._encodeKey(k)] finally: self._lock.release() def sync(self): """Flush all pending changes to disk""" self._lock.acquire() try: self._syncLog() finally: self._lock.release() def close(self): """Release resources associated with this database.""" self._lock.acquire() try: self.log.close() self.log = None finally: self._lock.release() # Flags for use when opening the journal. _JOURNAL_OPEN_FLAGS = os.O_WRONLY|os.O_CREAT|getattr(os,'O_SYNC',0)|getattr(os,'O_BINARY',0) class JournaledDBBase(DBBase): """Optimized version of DBBase that requires fewer sync() operations. Uses a journal file to cache keys and values until they can be written to the underlying database. Keys and values must all encode to stings of the same length.""" # Largest allowed number of journal entries before we flush the journal # to disk. MAX_JOURNAL = 128 ## Fields: # klen -- required length of journal-encoded keys # vlen -- required length of journal-encoded values # vdflt -- If vlen is 0, default value used when reading journaled value # from disk. # journal -- map from journal-encoded key to journal-encoded value. # journalFileName -- filename to use for journal file. # journalFile -- fd for the journal file def __init__(self, location, purpose, klen, vlen, vdflt): """Create a new JournaledDBBase that stores its files to match the pattern 'location*', whose journal-encoded keys are all of length klen, whose journal-encoded values are all of length vlen.""" DBBase.__init__(self, location, purpose) self.klen = klen self.vlen = vlen self.vdefault = vdflt self.journalFileName = location+"_jrnl" self.journal = {} # If there's a journal file, snarf it into memory. if os.path.exists(self.journalFileName): j = readFile(self.journalFileName, 1) for i in xrange(0, len(j), klen+vlen): if vlen: self.journal[j[i:i+klen]] = j[i+klen:i+klen+vlen] else: self.journal[j[i:i+klen]] = self.vdefault self.journalFile = os.open(self.journalFileName, _JOURNAL_OPEN_FLAGS|os.O_APPEND, 0600) self.sync() getItemNoJournal = DBBase.getItem setItemNoJournal = DBBase.setItem def _jEncodeKey(self, k): return k def _jDecodeKey(self, k): return k def _jEncodeVal(self, v): return v def _jDecodeVal(self, v): return v def getItem(self, k): jk = self._jEncodeKey(k) assert len(jk) == self.klen self._lock.acquire() try: if self.journal.has_key(jk): return self._jDecodeVal(self.journal[jk]) return self.getItemNoJournal(k) finally: self._lock.release() def keys(self): return map(self._decodeKey, self.log.keys()) + \ map(self._jDecodeKey, self.journal.keys()) def setItem(self, k, v): jk = self._jEncodeKey(k) jv = self._jEncodeVal(v) assert len(jk) == self.klen if self.vlen: assert len(jv) == self.vlen self._lock.acquire() try: self.journal[jk] = jv os.write(self.journalFile, jk) if self.vlen: os.write(self.journalFile, jv) if len(self.journal) > self.MAX_JOURNAL: self.sync() finally: self._lock.release() def delItem(self, k): deletedOne = 0 try: del self.journal[k] self.sync() deletedOne = 1 except KeyError: pass try: del self.log[k] deletedOne = 1 except KeyError: pass if not deletedOne: raise KeyError def sync(self): self._lock.acquire() try: for jk in self.journal.keys(): ek = self._encodeKey(self._jDecodeKey(jk)) ev = self._encodeVal(self._jDecodeVal(self.journal[jk])) self.log[ek] = ev self._syncLog() os.close(self.journalFile) self.journalFile = os.open(self.journalFileName, _JOURNAL_OPEN_FLAGS|os.O_TRUNC, 0600) self.journal = {} finally: self._lock.release() def close(self): try: self._lock.acquire() self.sync() self.log.close() self.log = None os.close(self.journalFile) finally: self._lock.release() class BooleanJournaledDBBase(JournaledDBBase): """Specialization of JournaledDBBase that encodes a set of keys, mapping each key to the value '1'. (By default, constant-length string keys are accepted, and are hex-encoded when stored in the database, in case the database isn't 8-bit clean.) """ def __init__(self, location, purpose, klen): JournaledDBBase.__init__(self,location,purpose,klen,0,"1") def _encodeKey(self, k): return binascii.b2a_hex(k) def _jEncodeVal(self, v): return "" def _jDecodeVal(self, k): return 1 def _encodeVal(self, v): return "1" def _decodeVal(self, v): return 1 class WritethroughDict: """A persistent mapping from string to pickleable object. The entire mapping is cached in memory, but all modifications are written through to disk immediately. """ ## Fields: # db: A Python database object, as returned by openDB. # _syncLog: A function to call to flush the database to disk, if possible. # cache: A dictionary mapping strings to the objects in this mapping. def __init__(self, filename, purpose): """Open a WritethroughDict to store a mapping in the file 'filename'. Use the string 'purpose' in log and messages about this object.""" self.db, self._syncLog = openDB(filename,purpose) self.cache = {} self.load() def __setitem__(self, k, v): assert type(k) == types.StringType self.cache[k] = v self.db[k] = cPickle.dumps(v,1) def __getitem__(self, k): assert type(k) == types.StringType return self.cache[k] def get(self, k, vOther=None): try: return self.cache[k] except KeyError: return vOther def __delitem__(self, k): del self.cache[k] del self.db[k] def has_key(self, k): return self.cache.has_key(k) def sync(self): """Flush changes in the underlying database to disk.""" self._syncLog() def close(self): """Release all resources held by this object. Users of this class should call this method before exiting if at all possible.""" self._syncLog() self.db.close() del self.cache del self.db del self._syncLog def keys(self): return self.cache.keys() def values(self): return self.cache.values() def items(self): return self.cache.items() def load(self): """Rescan the underlying database for this mapping.""" keys = self.db.keys() self.cache = cache = {} for k in keys: cache[k] = cPickle.loads(self.db[k]) class PickleCache: """DOCDOC""" def __init__(self, fname_base, fname_cache): self._fname_base = fname_base self._fname_cache = fname_cache self._dirty = 0 def _setFromPickle(self, p): raise NotImplemented def _getForPickle(self, p): raise NotImplemented def _reload(self): raise NotImplemented def _loadFromCache(self): # raise OSError or return false on can't/shouldn't load. try: cache_mtime = os.stat(self._fname_cache)[stat.ST_MTIME] file_mtime = os.stat(self._fname_base)[stat.ST_MTIME] except OSError: return 0 if file_mtime >= cache_mtime: return 0 try: p = readPickled(self._fname_cache) except (OSError, cPickle.UnpicklingError), _: return 0 if not self._setFromPickle(p): return 0 self._dirty = 0 return 1 def load(self): if not self._loadFromCache(): self._reload() self._dirty = 1 def save(self, mode): writePickled(self._fname_cache, self._getForPickle(), mode=mode) self._dirty = 0