From f83e37b26a473ed99b57b99a424e6cfba7ea9978 Mon Sep 17 00:00:00 2001
From: Cameron Dale
Date: Fri, 29 Feb 2008 18:46:06 -0800
Subject: [PATCH] A few more documentation fixes.

---
 apt_dht/CacheManager.py | 162 ++++++++++++++++++++++++++++++++++------
 apt_dht/apt_dht_conf.py |   2 +-
 apt_dht/interfaces.py   |   5 +-
 3 files changed, 141 insertions(+), 28 deletions(-)

diff --git a/apt_dht/CacheManager.py b/apt_dht/CacheManager.py
index 02021dc..a0f6b6b 100644
--- a/apt_dht/CacheManager.py
+++ b/apt_dht/CacheManager.py
@@ -1,4 +1,10 @@
+"""Manage a cache of downloaded files.
+
+@var DECOMPRESS_EXTS: a list of file extensions that need to be decompressed
+@var DECOMPRESS_FILES: a list of file names that need to be decompressed
+"""
+
 from bz2 import BZ2Decompressor
 from zlib import decompressobj, MAX_WBITS
 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
@@ -14,20 +20,46 @@ from twisted.web2.http import splitHostPort
 
 from Hash import HashObject
 
-aptpkg_dir='apt-packages'
-
 DECOMPRESS_EXTS = ['.gz', '.bz2']
 DECOMPRESS_FILES = ['release', 'sources', 'packages']
 
 class ProxyFileStream(stream.SimpleStream):
-    """Saves a stream to a file while providing a new stream."""
+    """Saves a stream to a file while providing a new stream.
+
+    Also optionally decompresses the file while it is being downloaded.
+
+    @type stream: L{twisted.web2.stream.IByteStream}
+    @ivar stream: the input stream being read
+    @type outFile: L{twisted.python.filepath.FilePath}
+    @ivar outFile: the file being written
+    @type hash: L{Hash.HashObject}
+    @ivar hash: the hash object for the file
+    @type gzfile: C{file}
+    @ivar gzfile: the open file to write decompressed gzip data to
+    @type gzdec: L{zlib.decompressobj}
+    @ivar gzdec: the decompressor to use for the compressed gzip data
+    @type gzheader: C{boolean}
+    @ivar gzheader: whether the gzip header still needs to be removed from
+        the zlib compressed data
+    @type bz2file: C{file}
+    @ivar bz2file: the open file to write decompressed bz2 data to
+    @type bz2dec: L{bz2.BZ2Decompressor}
+    @ivar bz2dec: the decompressor to use for the compressed bz2 data
+    @type length: C{int}
+    @ivar length: the length of the original (compressed) file
+    @type doneDefer: L{twisted.internet.defer.Deferred}
+    @ivar doneDefer: the deferred that will fire when done streaming
+
+    @group Stream implementation: read, close
+
+    """
 
     def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
         """Initializes the proxy.
 
-        @type stream: C{twisted.web2.stream.IByteStream}
+        @type stream: L{twisted.web2.stream.IByteStream}
         @param stream: the input stream to read from
-        @type outFile: C{twisted.python.FilePath}
+        @type outFile: L{twisted.python.filepath.FilePath}
         @param outFile: the file to write to
         @type hash: L{Hash.HashObject}
         @param hash: the hash object to use for the file
@@ -51,15 +83,15 @@ class ProxyFileStream(stream.SimpleStream):
             self.bz2file = decFile.open('w')
             self.bz2dec = BZ2Decompressor()
         self.length = self.stream.length
-        self.start = 0
         self.doneDefer = defer.Deferred()
 
     def _done(self):
-        """Close the output file."""
+        """Close all the output files, return the result."""
         if not self.outFile.closed:
             self.outFile.close()
             self.hash.digest()
             if self.gzfile:
+                # Finish the decompression
                 data_dec = self.gzdec.flush()
                 self.gzfile.write(data_dec)
                 self.gzfile.close()
@@ -75,6 +107,7 @@ class ProxyFileStream(stream.SimpleStream):
         if self.outFile.closed:
             return None
 
+        # Read data from the stream, deal with the possible deferred
         data = self.stream.read()
         if isinstance(data, defer.Deferred):
            data.addCallbacks(self._write, self._done)
@@ -84,15 +117,22 @@ class ProxyFileStream(stream.SimpleStream):
         return data
 
     def _write(self, data):
-        """Write the stream data to the file and return it for others to use."""
+        """Write the stream data to the file and return it for others to use.
+
+        Also optionally decompresses it.
+        """
         if data is None:
             self._done()
             return data
 
+        # Write and hash the streamed data
         self.outFile.write(data)
         self.hash.update(data)
+
         if self.gzfile:
+            # Decompress the zlib portion of the file
             if self.gzheader:
+                # Remove the gzip header junk
                 self.gzheader = False
                 new_data = self._remove_gzip_header(data)
                 dec_data = self.gzdec.decompress(new_data)
@@ -100,11 +140,15 @@ class ProxyFileStream(stream.SimpleStream):
                 dec_data = self.gzdec.decompress(data)
             self.gzfile.write(dec_data)
         if self.bz2file:
+            # Decompress the bz2 file
             dec_data = self.bz2dec.decompress(data)
             self.bz2file.write(dec_data)
+
         return data
 
     def _remove_gzip_header(self, data):
+        """Remove the gzip header from the zlib compressed data."""
+        # Read, check & discard the header fields
         if data[:2] != '\037\213':
             raise IOError, 'Not a gzipped file'
         if ord(data[2]) != 8:
@@ -116,7 +160,7 @@ class ProxyFileStream(stream.SimpleStream):
 
         skip = 10
         if flag & FEXTRA:
-            # Read & discard the extra field, if present
+            # Read & discard the extra field
             xlen = ord(data[10])
             xlen = xlen + 256*ord(data[11])
             skip = skip + 2 + xlen
@@ -136,6 +180,7 @@ class ProxyFileStream(stream.SimpleStream):
             skip += 1
         if flag & FHCRC:
             skip += 2     # Read & discard the 16-bit header CRC
+
         return data[skip:]
 
     def close(self):
@@ -145,9 +190,36 @@ class ProxyFileStream(stream.SimpleStream):
         self.stream.close()
 
 class CacheManager:
-    """Manages all requests for cached objects."""
+    """Manages all downloaded files and requests for cached objects.
+
+    @type cache_dir: L{twisted.python.filepath.FilePath}
+    @ivar cache_dir: the directory to use for storing all files
+    @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
+    @ivar other_dirs: the other directories that have shared files in them
+    @type all_dirs: C{list} of L{twisted.python.filepath.FilePath}
+    @ivar all_dirs: all the directories that have cached files in them
+    @type db: L{db.DB}
+    @ivar db: the database to use for tracking files and hashes
+    @type manager: L{apt_dht.AptDHT}
+    @ivar manager: the main program object to send requests to
+    @type scanning: C{list} of L{twisted.python.filepath.FilePath}
+    @ivar scanning: all the directories that are currently being scanned or waiting to be scanned
+    """
 
     def __init__(self, cache_dir, db, other_dirs = [], manager = None):
+        """Initialize the instance and remove any untracked files from the DB.
+
+        @type cache_dir: L{twisted.python.filepath.FilePath}
+        @param cache_dir: the directory to use for storing all files
+        @type db: L{db.DB}
+        @param db: the database to use for tracking files and hashes
+        @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
+        @param other_dirs: the other directories that have shared files in them
+            (optional, defaults to only using the cache directory)
+        @type manager: L{apt_dht.AptDHT}
+        @param manager: the main program object to send requests to
+            (optional, defaults to not calling back with cached files)
+        """
         self.cache_dir = cache_dir
         self.other_dirs = other_dirs
         self.all_dirs = self.other_dirs[:]
@@ -159,7 +231,7 @@ class CacheManager:
 
         # Init the database, remove old files
         self.db.removeUntrackedFiles(self.all_dirs)
-
+    #{ Scanning directories
     def scanDirectories(self):
         """Scan the cache directories, hashing new and rehashing changed files."""
         assert not self.scanning, "a directory scan is already under way"
@@ -167,7 +239,14 @@ class CacheManager:
         self._scanDirectories()
 
     def _scanDirectories(self, result = None, walker = None):
-        # Need to start waling a new directory
+        """Walk each directory looking for cached files.
+
+        @param result: the result of a DHT store request, not used (optional)
+        @param walker: the walker to use to traverse the current directory
+            (optional, defaults to creating a new walker from the first
+            directory in the L{CacheManager.scanning} list)
+        """
+        # Need to start walking a new directory
         if walker is None:
             # If there are any left, get them
             if self.scanning:
@@ -218,41 +297,61 @@ class CacheManager:
             df.addErrback(log.err)
 
     def _doneHashing(self, result, file, walker):
-
+        """If successful, add the hashed file to the DB and inform the main program."""
         if isinstance(result, HashObject):
             log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
+
+            # Only set a URL if this is a downloaded file
             url = None
             if self.scanning[0] == self.cache_dir:
                 url = 'http:/' + file.path[len(self.cache_dir.path):]
+
+            # Store the hashed file in the database
             new_hash = self.db.storeFile(file, result.digest())
+
+            # Tell the main program to handle the new cache file
             df = self.manager.new_cached_file(file, result, new_hash, url, True)
             if df is None:
                 reactor.callLater(0, self._scanDirectories, None, walker)
             else:
                 df.addBoth(self._scanDirectories, walker)
         else:
+            # Must have returned an error
             log.msg('hash check of %s failed' % file.path)
             log.err(result)
             reactor.callLater(0, self._scanDirectories, None, walker)
 
+    #{ Downloading files
     def save_file(self, response, hash, url):
-        """Save a downloaded file to the cache and stream it."""
+        """Save a downloaded file to the cache and stream it.
+
+        @type response: L{twisted.web2.http.Response}
+        @param response: the response from the download
+        @type hash: L{Hash.HashObject}
+        @param hash: the hash object containing the expected hash for the file
+        @param url: the URI of the actual mirror request
+        @rtype: L{twisted.web2.http.Response}
+        @return: the final response from the download
+        """
         if response.code != 200:
             log.msg('File was not found (%r): %s' % (response, url))
             return response
 
         log.msg('Returning file: %s' % url)
-
+
+        # Set the destination path for the file
         parsed = urlparse(url)
         destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
         log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
 
+        # Make sure there's a free place for the file
         if destFile.exists():
             log.msg('File already exists, removing: %s' % destFile.path)
             destFile.remove()
         elif not destFile.parent().exists():
             destFile.parent().makedirs()
-
+
+        # Determine whether it needs to be decompressed and how
         root, ext = os.path.splitext(destFile.basename())
         if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
             ext = ext.lower()
@@ -265,19 +364,35 @@ class CacheManager:
             ext = None
             decFile = None
 
+        # Create the new stream from the old one.
         orig_stream = response.stream
         response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
         response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
                                               response.headers.getHeader('Last-Modified'),
-                                              ext, decFile)
+                                              decFile)
        response.stream.doneDefer.addErrback(self.save_error, url)
+
+        # Return the modified response with the new stream
         return response
 
-    def _save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
-        """Update the modification time and AptPackages."""
+    def _save_complete(self, hash, url, destFile, modtime = None, decFile = None):
+        """Update the modification time and inform the main program.
+
+        @type hash: L{Hash.HashObject}
+        @param hash: the hash object containing the expected hash for the file
+        @param url: the URI of the actual mirror request
+        @type destFile: L{twisted.python.filepath.FilePath}
+        @param destFile: the file where the download was written to
+        @type modtime: C{int}
+        @param modtime: the modified time of the cached file (seconds since epoch)
+            (optional, defaults to not setting the modification time of the file)
+        @type decFile: L{twisted.python.filepath.FilePath}
+        @param decFile: the file where the decompressed download was written to
+            (optional, defaults to the file not having been compressed)
+        """
         if modtime:
             os.utime(destFile.path, (modtime, modtime))
-            if ext:
+            if decFile:
                 os.utime(decFile.path, (modtime, modtime))
 
         result = hash.verify()
@@ -292,12 +407,13 @@ class CacheManager:
 
             if self.manager:
                 self.manager.new_cached_file(destFile, hash, new_hash, url)
-                if ext:
-                    self.manager.new_cached_file(decFile, None, False, url[:-len(ext)])
+                if decFile:
+                    ext_len = len(destFile.path) - len(decFile.path)
+                    self.manager.new_cached_file(decFile, None, False, url[:-ext_len])
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()
-            if ext:
+            if decFile:
                 decFile.remove()
 
     def save_error(self, failure, url):
diff --git a/apt_dht/apt_dht_conf.py b/apt_dht/apt_dht_conf.py
index 0724172..f5d75a5 100644
--- a/apt_dht/apt_dht_conf.py
+++ b/apt_dht/apt_dht_conf.py
@@ -123,7 +123,7 @@ DHT_DEFAULTS = {
 class AptDHTConfigParser(SafeConfigParser):
     """Adds 'gettime' and 'getstringlist' to ConfigParser objects.
 
-    @param time_multipliers: the 'gettime' suffixes and the multipliers needed
+    @ivar time_multipliers: the 'gettime' suffixes and the multipliers needed
         to convert them to seconds
     """
 
diff --git a/apt_dht/interfaces.py b/apt_dht/interfaces.py
index 8cb2e69..2c022c5 100644
--- a/apt_dht/interfaces.py
+++ b/apt_dht/interfaces.py
@@ -1,8 +1,5 @@
-"""
-Some interfaces that are used by the apt-dht classes.
-
-"""
+"""Some interfaces that are used by the apt-dht classes."""
 
 from zope.interface import Interface
 
-- 
2.39.5
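
A note on the pattern the new ProxyFileStream docstring describes: the class tees each chunk of the HTTP response into the cache file and the running hash while handing the same bytes back to the requester. Below is a minimal sketch of that tee-and-hash idea without the twisted.web2 stream machinery; the function name tee_chunks and the use of sha1 are illustrative only, not part of apt-dht.

import hashlib

def tee_chunks(chunks, out_path, hasher=None):
    """Yield each chunk unchanged while writing it to out_path and hashing it.

    'chunks' is any iterable of byte strings, standing in for the
    response stream that ProxyFileStream._write() consumes.
    """
    if hasher is None:
        hasher = hashlib.sha1()  # example hash; apt indexes carry several kinds
    out = open(out_path, 'wb')
    try:
        for data in chunks:
            out.write(data)      # save the chunk to the cache file
            hasher.update(data)  # feed the running hash
            yield data           # pass the same bytes on to the consumer
    finally:
        out.close()

Once the iterable is exhausted, hasher.hexdigest() can be compared with the expected digest, which is roughly the point at which ProxyFileStream fires doneDefer and _save_complete() verifies the hash.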
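
The _remove_gzip_header() docstring added above hints at why the method exists: judging by the module's imports, the gzip stream is inflated with a raw-DEFLATE decompressor (a decompressobj(-MAX_WBITS)-style object), which cannot parse the gzip wrapper, so the FTEXT/FHCRC/FEXTRA/FNAME/FCOMMENT header fields have to be skipped by hand. For comparison, here is a sketch of streaming gzip decompression that lets zlib consume the header and trailer itself, assuming zlib 1.2 or later; this is an alternative, not what the patched code does.

import zlib

def gunzip_chunks(chunks):
    """Decompress an iterable of gzip-compressed chunks as they arrive."""
    # wbits = 16 + MAX_WBITS makes inflate expect a gzip wrapper, so the
    # header is skipped and the trailing CRC is checked automatically.
    dec = zlib.decompressobj(16 + zlib.MAX_WBITS)
    for data in chunks:
        yield dec.decompress(data)
    yield dec.flush()  # emit any bytes still buffered at end of stream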
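
One behavioural tweak rides along with the docstring work: _save_complete() no longer receives an ext argument; the length of the stripped extension is recomputed as the difference between the compressed and decompressed paths. A worked instance of that arithmetic, with hypothetical paths and URL chosen only to illustrate the ext_len math:

dest_path = '/cache/ftp.us.debian.org/debian/dists/sid/main/binary-i386/Packages.gz'
dec_path = '/cache/ftp.us.debian.org/debian/dists/sid/main/binary-i386/Packages'
url = 'http://ftp.us.debian.org/debian/dists/sid/main/binary-i386/Packages.gz'

ext_len = len(dest_path) - len(dec_path)  # 3, the length of '.gz'
print(url[:-ext_len])  # http://ftp.us.debian.org/debian/dists/sid/main/binary-i386/Packages

The subtraction recovers the extension length on the assumption, which save_file() arranges, that the decompressed file's path is the cache file's path with its extension stripped.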