A few more documentation fixes.
authorCameron Dale <camrdale@gmail.com>
Sat, 1 Mar 2008 02:46:06 +0000 (18:46 -0800)
committerCameron Dale <camrdale@gmail.com>
Sat, 1 Mar 2008 02:46:06 +0000 (18:46 -0800)
apt_dht/CacheManager.py
apt_dht/apt_dht_conf.py
apt_dht/interfaces.py

index 02021dc34c16a359b360f11c5100af260a5220ca..a0f6b6b48b4fa044252b8bab7fe64afb81a4b453 100644 (file)
@@ -1,4 +1,10 @@
 
+"""Manage a cache of downloaded files.
+
+@var DECOMPRESS_EXTS: a list of file extensions that need to be decompressed
+@var DECOMPRESS_FILES: a list of file names that need to be decompressed
+"""
+
 from bz2 import BZ2Decompressor
 from zlib import decompressobj, MAX_WBITS
 from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT
@@ -14,20 +20,46 @@ from twisted.web2.http import splitHostPort
 
 from Hash import HashObject
 
-aptpkg_dir='apt-packages'
-
 DECOMPRESS_EXTS = ['.gz', '.bz2']
 DECOMPRESS_FILES = ['release', 'sources', 'packages']
 
 class ProxyFileStream(stream.SimpleStream):
-    """Saves a stream to a file while providing a new stream."""
+    """Saves a stream to a file while providing a new stream.
+    
+    Also optionally decompresses the file while it is being downloaded.
+    
+    @type stream: L{twisted.web2.stream.IByteStream}
+    @ivar stream: the input stream being read
+    @type outFile: L{twisted.python.filepath.FilePath}
+    @ivar outFile: the file being written
+    @type hash: L{Hash.HashObject}
+    @ivar hash: the hash object for the file
+    @type gzfile: C{file}
+    @ivar gzfile: the open file to write decompressed gzip data to
+    @type gzdec: L{zlib.decompressobj}
+    @ivar gzdec: the decompressor to use for the compressed gzip data
+    @type gzheader: C{boolean}
+    @ivar gzheader: whether the gzip header still needs to be removed from
+        the zlib compressed data
+    @type bz2file: C{file}
+    @ivar bz2file: the open file to write decompressed bz2 data to
+    @type bz2dec: L{bz2.BZ2Decompressor}
+    @ivar bz2dec: the decompressor to use for the compressed bz2 data
+    @type length: C{int}
+    @ivar length: the length of the original (compressed) file
+    @type doneDefer: L{twisted.internet.defer.Deferred}
+    @ivar doneDefer: the deferred that will fire when done streaming
+    
+    @group Stream implementation: read, close
+    
+    """
     
     def __init__(self, stream, outFile, hash, decompress = None, decFile = None):
         """Initializes the proxy.
         
-        @type stream: C{twisted.web2.stream.IByteStream}
+        @type stream: L{twisted.web2.stream.IByteStream}
         @param stream: the input stream to read from
-        @type outFile: C{twisted.python.FilePath}
+        @type outFile: L{twisted.python.filepath.FilePath}
         @param outFile: the file to write to
         @type hash: L{Hash.HashObject}
         @param hash: the hash object to use for the file
@@ -51,15 +83,15 @@ class ProxyFileStream(stream.SimpleStream):
             self.bz2file = decFile.open('w')
             self.bz2dec = BZ2Decompressor()
         self.length = self.stream.length
-        self.start = 0
         self.doneDefer = defer.Deferred()
 
     def _done(self):
-        """Close the output file."""
+        """Close all the output files, return the result."""
         if not self.outFile.closed:
             self.outFile.close()
             self.hash.digest()
             if self.gzfile:
+                # Finish the decompression
                 data_dec = self.gzdec.flush()
                 self.gzfile.write(data_dec)
                 self.gzfile.close()
@@ -75,6 +107,7 @@ class ProxyFileStream(stream.SimpleStream):
         if self.outFile.closed:
             return None
         
+        # Read data from the stream, deal with the possible deferred
         data = self.stream.read()
         if isinstance(data, defer.Deferred):
             data.addCallbacks(self._write, self._done)
@@ -84,15 +117,22 @@ class ProxyFileStream(stream.SimpleStream):
         return data
     
     def _write(self, data):
-        """Write the stream data to the file and return it for others to use."""
+        """Write the stream data to the file and return it for others to use.
+        
+        Also optionally decompresses it.
+        """
         if data is None:
             self._done()
             return data
         
+        # Write and hash the streamed data
         self.outFile.write(data)
         self.hash.update(data)
+        
         if self.gzfile:
+            # Decompress the zlib portion of the file
             if self.gzheader:
+                # Remove the gzip header junk
                 self.gzheader = False
                 new_data = self._remove_gzip_header(data)
                 dec_data = self.gzdec.decompress(new_data)
@@ -100,11 +140,15 @@ class ProxyFileStream(stream.SimpleStream):
                 dec_data = self.gzdec.decompress(data)
             self.gzfile.write(dec_data)
         if self.bz2file:
+            # Decompress the bz2 file
             dec_data = self.bz2dec.decompress(data)
             self.bz2file.write(dec_data)
+
         return data
     
     def _remove_gzip_header(self, data):
+        """Remove the gzip header from the zlib compressed data."""
+        # Read, check & discard the header fields
         if data[:2] != '\037\213':
             raise IOError, 'Not a gzipped file'
         if ord(data[2]) != 8:
@@ -116,7 +160,7 @@ class ProxyFileStream(stream.SimpleStream):
 
         skip = 10
         if flag & FEXTRA:
-            # Read & discard the extra field, if present
+            # Read & discard the extra field
             xlen = ord(data[10])
             xlen = xlen + 256*ord(data[11])
             skip = skip + 2 + xlen
@@ -136,6 +180,7 @@ class ProxyFileStream(stream.SimpleStream):
             skip += 1
         if flag & FHCRC:
             skip += 2     # Read & discard the 16-bit header CRC
+
         return data[skip:]
 
     def close(self):
@@ -145,9 +190,36 @@ class ProxyFileStream(stream.SimpleStream):
         self.stream.close()
 
 class CacheManager:
-    """Manages all requests for cached objects."""
+    """Manages all downloaded files and requests for cached objects.
+    
+    @type cache_dir: L{twisted.python.filepath.FilePath}
+    @ivar cache_dir: the directory to use for storing all files
+    @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
+    @ivar other_dirs: the other directories that have shared files in them
+    @type all_dirs: C{list} of L{twisted.python.filepath.FilePath}
+    @ivar all_dirs: all the directories that have cached files in them
+    @type db: L{db.DB}
+    @ivar db: the database to use for tracking files and hashes
+    @type manager: L{apt_dht.AptDHT}
+    @ivar manager: the main program object to send requests to
+    @type scanning: C{list} of L{twisted.python.filepath.FilePath}
+    @ivar scanning: all the directories that are currectly being scanned or waiting to be scanned
+    """
     
     def __init__(self, cache_dir, db, other_dirs = [], manager = None):
+        """Initialize the instance and remove any untracked files from the DB..
+        
+        @type cache_dir: L{twisted.python.filepath.FilePath}
+        @param cache_dir: the directory to use for storing all files
+        @type db: L{db.DB}
+        @param db: the database to use for tracking files and hashes
+        @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
+        @param other_dirs: the other directories that have shared files in them
+            (optional, defaults to only using the cache directory)
+        @type manager: L{apt_dht.AptDHT}
+        @param manager: the main program object to send requests to
+            (optional, defaults to not calling back with cached files)
+        """
         self.cache_dir = cache_dir
         self.other_dirs = other_dirs
         self.all_dirs = self.other_dirs[:]
@@ -159,7 +231,7 @@ class CacheManager:
         # Init the database, remove old files
         self.db.removeUntrackedFiles(self.all_dirs)
         
-        
+    #{ Scanning directories
     def scanDirectories(self):
         """Scan the cache directories, hashing new and rehashing changed files."""
         assert not self.scanning, "a directory scan is already under way"
@@ -167,7 +239,14 @@ class CacheManager:
         self._scanDirectories()
 
     def _scanDirectories(self, result = None, walker = None):
-        # Need to start waling a new directory
+        """Walk each directory looking for cached files.
+        
+        @param result: the result of a DHT store request, not used (optional)
+        @param walker: the walker to use to traverse the current directory
+            (optional, defaults to creating a new walker from the first
+            directory in the L{CacheManager.scanning} list)
+        """
+        # Need to start walking a new directory
         if walker is None:
             # If there are any left, get them
             if self.scanning:
@@ -218,41 +297,61 @@ class CacheManager:
         df.addErrback(log.err)
     
     def _doneHashing(self, result, file, walker):
-    
+        """If successful, add the hashed file to the DB and inform the main program."""
         if isinstance(result, HashObject):
             log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
+            
+            # Only set a URL if this is a downloaded file
             url = None
             if self.scanning[0] == self.cache_dir:
                 url = 'http:/' + file.path[len(self.cache_dir.path):]
+                
+            # Store the hashed file in the database
             new_hash = self.db.storeFile(file, result.digest())
+            
+            # Tell the main program to handle the new cache file
             df = self.manager.new_cached_file(file, result, new_hash, url, True)
             if df is None:
                 reactor.callLater(0, self._scanDirectories, None, walker)
             else:
                 df.addBoth(self._scanDirectories, walker)
         else:
+            # Must have returned an error
             log.msg('hash check of %s failed' % file.path)
             log.err(result)
             reactor.callLater(0, self._scanDirectories, None, walker)
 
+    #{ Downloading files
     def save_file(self, response, hash, url):
-        """Save a downloaded file to the cache and stream it."""
+        """Save a downloaded file to the cache and stream it.
+        
+        @type response: L{twisted.web2.http.Response}
+        @param response: the response from the download
+        @type hash: L{Hash.HashObject}
+        @param hash: the hash object containing the expected hash for the file
+        @param url: the URI of the actual mirror request
+        @rtype: L{twisted.web2.http.Response}
+        @return: the final response from the download
+        """
         if response.code != 200:
             log.msg('File was not found (%r): %s' % (response, url))
             return response
         
         log.msg('Returning file: %s' % url)
-        
+
+        # Set the destination path for the file
         parsed = urlparse(url)
         destFile = self.cache_dir.preauthChild(parsed[1] + parsed[2])
         log.msg('Saving returned %r byte file to cache: %s' % (response.stream.length, destFile.path))
         
+        # Make sure there's a free place for the file
         if destFile.exists():
             log.msg('File already exists, removing: %s' % destFile.path)
             destFile.remove()
         elif not destFile.parent().exists():
             destFile.parent().makedirs()
-            
+
+        # Determine whether it needs to be decompressed and how
         root, ext = os.path.splitext(destFile.basename())
         if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS:
             ext = ext.lower()
@@ -265,19 +364,35 @@ class CacheManager:
             ext = None
             decFile = None
             
+        # Create the new stream from the old one.
         orig_stream = response.stream
         response.stream = ProxyFileStream(orig_stream, destFile, hash, ext, decFile)
         response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
                                               response.headers.getHeader('Last-Modified'),
-                                              ext, decFile)
+                                              decFile)
         response.stream.doneDefer.addErrback(self.save_error, url)
+
+        # Return the modified response with the new stream
         return response
 
-    def _save_complete(self, hash, url, destFile, modtime = None, ext = None, decFile = None):
-        """Update the modification time and AptPackages."""
+    def _save_complete(self, hash, url, destFile, modtime = None, decFile = None):
+        """Update the modification time and inform the main program.
+        
+        @type hash: L{Hash.HashObject}
+        @param hash: the hash object containing the expected hash for the file
+        @param url: the URI of the actual mirror request
+        @type destFile: C{twisted.python.FilePath}
+        @param destFile: the file where the download was written to
+        @type modtime: C{int}
+        @param modtime: the modified time of the cached file (seconds since epoch)
+            (optional, defaults to not setting the modification time of the file)
+        @type decFile: C{twisted.python.FilePath}
+        @param decFile: the file where the decompressed download was written to
+            (optional, defaults to the file not having been compressed)
+        """
         if modtime:
             os.utime(destFile.path, (modtime, modtime))
-            if ext:
+            if decFile:
                 os.utime(decFile.path, (modtime, modtime))
         
         result = hash.verify()
@@ -292,12 +407,13 @@ class CacheManager:
 
             if self.manager:
                 self.manager.new_cached_file(destFile, hash, new_hash, url)
-                if ext:
-                    self.manager.new_cached_file(decFile, None, False, url[:-len(ext)])
+                if decFile:
+                    ext_len = len(destFile.path) - len(decFile.path)
+                    self.manager.new_cached_file(decFile, None, False, url[:-ext_len])
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()
-            if ext:
+            if decFile:
                 decFile.remove()
 
     def save_error(self, failure, url):
index 07241723a560c71a40c8dc2955e9b428d3e00d8e..f5d75a528555c166d0a7e53a5ae1c08083d57653 100644 (file)
@@ -123,7 +123,7 @@ DHT_DEFAULTS = {
 class AptDHTConfigParser(SafeConfigParser):
     """Adds 'gettime' and 'getstringlist' to ConfigParser objects.
     
-    @param time_multipliers: the 'gettime' suffixes and the multipliers needed
+    @ivar time_multipliers: the 'gettime' suffixes and the multipliers needed
         to convert them to seconds
     """
     
index 8cb2e69c95e2f421593c452f32d59ccec8b18d66..2c022c5a7d65e6f4b5bf2872bdc23b575c1de76a 100644 (file)
@@ -1,8 +1,5 @@
 
-"""
-Some interfaces that are used by the apt-dht classes.
-
-"""
+"""Some interfaces that are used by the apt-dht classes."""
 
 from zope.interface import Interface