]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
Cleanup AptPackages code.
authorCameron Dale <camrdale@gmail.com>
Fri, 14 Dec 2007 23:53:54 +0000 (15:53 -0800)
committerCameron Dale <camrdale@gmail.com>
Fri, 14 Dec 2007 23:53:54 +0000 (15:53 -0800)
Cleanup and order the imports.
Remove unnnecessary parts.
Add comments and docstrings.

AptPackages.py

index c2be90b476f1af3ee33dd75c085dcfcfc0bffdbf..d0e9c0ad768815636d6f1aef92711c0bcc0ae3f5 100644 (file)
@@ -1,63 +1,27 @@
-#
-# Copyright (C) 2002 Manuel Estrada Sainz <ranty@debian.org>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of version 2.1 of the GNU Lesser General Public
-# License as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
+# Disable the FutureWarning from the apt module
 import warnings
 warnings.simplefilter("ignore", FutureWarning)
-import apt_pkg, apt_inst, sys, os, stat, random
-from os.path import dirname, basename
-import re, shelve, shutil, fcntl
-from twisted.internet import process, threads, defer
+
+import os, stat, random, re, shelve, shutil, fcntl, copy, UserDict
+from os.path import dirname
+
+from twisted.internet import threads, defer
 from twisted.python import log
-import copy, UserDict
 from twisted.trial import unittest
+
+import apt_pkg, apt_inst
 from apt import OpProgress
 
 aptpkg_dir='.apt-dht'
 apt_pkg.init()
 
-class AptDpkgInfo(UserDict.UserDict):
-    """
-    Gets control fields from a .deb file.
-
-    And then behaves like a regular python dictionary.
-
-    See AptPackages.get_mirror_path
-    """
-
-    def __init__(self, filename):
-        UserDict.UserDict.__init__(self)
-        try:
-            filehandle = open(filename);
-            try:
-                self.control = apt_inst.debExtractControl(filehandle)
-            finally:
-                # Make sure that file is always closed.
-                filehandle.close()
-        except SystemError:
-            log.msg("Had problems reading: %s"%(filename))
-            raise
-        for line in self.control.split('\n'):
-            if line.find(': ') != -1:
-                key, value = line.split(': ', 1)
-                self.data[key] = value
-
 class PackageFileList(UserDict.DictMixin):
+    """Manages a list of package files belonging to a backend.
+    
+    @type packages: C{shelve dictionary}
+    @ivar packages: the files stored for this backend
     """
-    Manages a list of package files belonging to a backend
-    """
+    
     def __init__(self, backendName, cache_dir):
         self.cache_dir = cache_dir
         self.packagedb_dir = cache_dir+'/'+ aptpkg_dir + \
@@ -68,17 +32,20 @@ class PackageFileList(UserDict.DictMixin):
         self.open()
 
     def open(self):
+        """Open the persistent dictionary of files in this backend."""
         if self.packages is None:
             self.packages = shelve.open(self.packagedb_dir+'/packages.db')
 
     def close(self):
+        """Close the persistent dictionary."""
         if self.packages is not None:
             self.packages.close()
 
     def update_file(self, filename, cache_path, file_path):
-        """
-        Called from apt_proxy.py when files get updated so we can update our
-        fake lists/ directory and sources.list.
+        """Check if an updated file needs to be tracked.
+
+        Called from the mirror manager when files get updated so we can update our
+        fake lists and sources.list.
         """
         if filename=="Packages" or filename=="Release" or filename=="Sources":
             log.msg("Registering package file: "+cache_path)
@@ -87,32 +54,29 @@ class PackageFileList(UserDict.DictMixin):
         return False
 
     def check_files(self):
-        """
-        Check all files in the database to make sure it exists.
-        """
+        """Check all files in the database to make sure they exist."""
         files = self.packages.keys()
-        #print self.packages.keys()
         for f in files:
             if not os.path.exists(self.packages[f]):
                 log.msg("File in packages database has been deleted: "+f)
                 del self.packages[f]
-                
+
+    # Standard dictionary implementation so this class can be used like a dictionary.
     def __getitem__(self, key): return self.packages[key]
     def __setitem__(self, key, item): self.packages[key] = item
     def __delitem__(self, key): del self.packages[key]
     def keys(self): return self.packages.keys()
 
 class AptPackages:
-    """
-    Uses AptPackagesServer to answer queries about packages.
+    """Uses python-apt to answer queries about packages.
 
     Makes a fake configuration for python-apt for each backend.
     """
+
     DEFAULT_APT_CONFIG = {
         #'APT' : '',
-        #'APT::Architecture' : 'amd64',  # TODO: Fix this, see bug #436011 and #285360
+        #'APT::Architecture' : 'i386',  # Commented so the machine's config will set this
         #'APT::Default-Release' : 'unstable',
-   
         'Dir':'.', # /
         'Dir::State' : 'apt/', # var/lib/apt/
         'Dir::State::Lists': 'lists/', # lists/
@@ -146,10 +110,10 @@ class AptPackages:
     essential_files = ('apt/dpkg/status', 'apt/etc/sources.list',)
         
     def __init__(self, backendName, cache_dir):
-        """
-        Construct new packages manager
-        backend: Name of backend associated with this packages file
-        cache_dir: cache directory from config file
+        """Construct a new packages manager.
+
+        @ivar backendName: name of backend associated with this packages file
+        @ivar cache_dir: cache directory from config file
         """
         self.backendName = backendName
         self.cache_dir = cache_dir
@@ -170,23 +134,17 @@ class AptPackages:
                 
         self.apt_config['Dir'] = self.status_dir
         self.apt_config['Dir::State::status'] = self.status_dir + '/apt/dpkg/status'
-        #os.system('find '+self.status_dir+' -ls ')
-        #print "status:"+self.apt_config['Dir::State::status']
         self.packages = PackageFileList(backendName, cache_dir)
         self.indexrecords = {}
         self.loaded = 0
         self.loading = None
-        #print "Loaded aptPackages [%s] %s " % (self.backendName, self.cache_dir)
         
     def __del__(self):
         self.cleanup()
-        #print "start aptPackages [%s] %s " % (self.backendName, self.cache_dir)
         self.packages.close()
-        #print "Deleted aptPackages [%s] %s " % (self.backendName, self.cache_dir)
         
     def addRelease(self, cache_path, file_path):
-        """
-        Dirty hack until python-apt supports apt-pkg/indexrecords.h
+        """Dirty hack until python-apt supports apt-pkg/indexrecords.h
         (see Bug #456141)
         """
         self.indexrecords[cache_path] = {}
@@ -220,8 +178,9 @@ class AptPackages:
         f.close()
 
     def file_updated(self, filename, cache_path, file_path):
-        """
-        A file in the backend has changed.  If this affects us, unload our apt database
+        """A file in the backend has changed, manage it.
+        
+        If this affects us, unload our apt database
         """
         if filename == "Release":
             self.addRelease(cache_path, file_path)
@@ -229,22 +188,22 @@ class AptPackages:
             self.unload()
 
     def load(self):
+        """Make sure the package is initialized and loaded."""
         if self.loading is None:
             self.loading = threads.deferToThread(self._load)
             self.loading.addCallback(self.doneLoading)
         return self.loading
         
     def doneLoading(self, loadResult):
+        """Cache is loaded."""
         self.loading = None
+        # Must pass on the result for the next callback
         return loadResult
         
     def _load(self):
-        """
-        Regenerates the fake configuration and load the packages server.
-        """
+        """Regenerates the fake configuration and load the packages cache."""
         if self.loaded: return True
         apt_pkg.InitSystem()
-        #print "Load:", self.status_dir
         shutil.rmtree(self.status_dir+'/apt/lists/')
         os.makedirs(self.status_dir+'/apt/lists/partial')
         sources_filename = self.status_dir+'/'+'apt/etc/sources.list'
@@ -271,7 +230,7 @@ class AptPackages:
                 os.unlink(listpath)
             except:
                 pass
-            os.symlink(self.packages[f], listpath)
+            os.symlink(filepath, listpath)
         sources.close()
 
         if sources_count == 0:
@@ -279,24 +238,18 @@ class AptPackages:
             return False
 
         log.msg("Loading Packages database for "+self.status_dir)
-        #apt_pkg.Config = apt_pkg.newConfiguration(); #-- this causes unit tests to fail!
         for key, value in self.apt_config.items():
             apt_pkg.Config[key] = value
-#         print "apt_pkg config:"
-#         for I in apt_pkg.Config.keys():
-#            print "%s \"%s\";"%(I,apt_pkg.Config[I]);
 
         self.cache = apt_pkg.GetCache(OpProgress())
         self.records = apt_pkg.GetPkgRecords(self.cache)
         self.srcrecords = apt_pkg.GetPkgSrcRecords()
-        #for p in self.cache.Packages:
-        #    print p
-        #log.debug("%s packages found" % (len(self.cache)),'apt_pkg')
+
         self.loaded = 1
         return True
 
     def unload(self):
-        "Tries to make the packages server quit."
+        """Tries to make the packages server quit."""
         if self.loaded:
             del self.cache
             del self.records
@@ -304,12 +257,18 @@ class AptPackages:
             self.loaded = 0
 
     def cleanup(self):
+        """Cleanup and close any loaded caches."""
         self.unload()
         self.packages.close()
         
     def findHash(self, path):
+        """Find the hash for a given path in this mirror.
+        
+        Returns a deferred so it can make sure the cache is loaded first.
+        """
         d = defer.Deferred()
 
+        # First look for the path in the cache of index files
         for release in self.indexrecords:
             if path.startswith(release[:-7]):
                 for indexFile in self.indexrecords[release]:
@@ -323,12 +282,18 @@ class AptPackages:
         return d
 
     def _findHash(self, loadResult, path, d):
+        """Really find the hash for a path.
+        
+        Have to pass the returned loadResult on in case other calls to this
+        function are pending.
+        """
         if not loadResult:
             d.callback((None, None))
             return loadResult
         
         package = path.split('/')[-1].split('_')[0]
-        
+
+        # Check the binary packages
         try:
             for version in self.cache[package].VersionList:
                 size = version.Size
@@ -339,7 +304,8 @@ class AptPackages:
                             return loadResult
         except KeyError:
             pass
-        
+
+        # Check the source packages' files
         self.srcrecords.Restart()
         if self.srcrecords.Lookup(package):
             for f in self.srcrecords.Files:
@@ -350,178 +316,6 @@ class AptPackages:
         d.callback((None, None))
         return loadResult
 
-    def get_mirror_path(self, name, version):
-        "Find the path for version 'version' of package 'name'"
-        if not self.load(): return None
-        try:
-            for pack_vers in self.cache[name].VersionList:
-                if(pack_vers.VerStr == version):
-                    file, index = pack_vers.FileList[0]
-                    self.records.Lookup((file,index))
-                    path = self.records.FileName
-                    if len(path)>2 and path[0:2] == './': 
-                        path = path[2:] # Remove any leading './'
-                    return path
-
-        except KeyError:
-            pass
-        return None
-      
-
-    def get_mirror_versions(self, package_name):
-        """
-        Find the available versions of the package name given
-        @type package_name: string
-        @param package_name: package name to search for e.g. ;apt'
-        @return: A list of mirror versions available
-
-        """
-        vers = []
-        if not self.load(): return vers
-        try:
-            for pack_vers in self.cache[package_name].VersionList:
-                vers.append(pack_vers.VerStr)
-        except KeyError:
-            pass
-        return vers
-
-
-def cleanup(factory):
-    for backend in factory.backends.values():
-        backend.get_packages_db().cleanup()
-
-def get_mirror_path(factory, file):
-    """
-    Look for the path of 'file' in all backends.
-    """
-    info = AptDpkgInfo(file)
-    paths = []
-    for backend in factory.backends.values():
-        path = backend.get_packages_db().get_mirror_path(info['Package'],
-                                                info['Version'])
-        if path:
-            paths.append('/'+backend.base+'/'+path)
-    return paths
-
-def get_mirror_versions(factory, package):
-    """
-    Look for the available version of a package in all backends, given
-    an existing package name
-    """
-    all_vers = []
-    for backend in factory.backends.values():
-        vers = backend.get_packages_db().get_mirror_versions(package)
-        for ver in vers:
-            path = backend.get_packages_db().get_mirror_path(package, ver)
-            all_vers.append((ver, "%s/%s"%(backend.base,path)))
-    return all_vers
-
-def closest_match(info, others):
-    def compare(a, b):
-        return apt_pkg.VersionCompare(a[0], b[0])
-
-    others.sort(compare)
-    version = info['Version']
-    match = None
-    for ver,path in others:
-        if version <= ver:
-            match = path
-            break
-    if not match:
-        if not others:
-            return None
-        match = others[-1][1]
-
-    dirname=re.sub(r'/[^/]*$', '', match)
-    version=re.sub(r'^[^:]*:', '', info['Version'])
-    if dirname.find('/pool/') != -1:
-        return "/%s/%s_%s_%s.deb"%(dirname, info['Package'],
-                                  version, info['Architecture'])
-    else:
-        return "/%s/%s_%s.deb"%(dirname, info['Package'], version)
-
-def import_directory(factory, dir, recursive=0):
-    """
-    Import all files in a given directory into the cache
-    This is used by apt-proxy-import to import new files
-    into the cache
-    """
-    imported_count  = 0
-
-    if not os.path.exists(dir):
-        log.err('Directory ' + dir + ' does not exist')
-        return
-
-    if recursive:    
-        log.msg("Importing packages from directory tree: " + dir)
-        for root, dirs, files in os.walk(dir):
-            for file in files:
-                imported_count += import_file(factory, root, file)
-    else:
-        log.msg("Importing packages from directory: " + dir)
-        for file in os.listdir(dir):
-            mode = os.stat(dir + '/' + file)[stat.ST_MODE]
-            if not stat.S_ISDIR(mode):
-                imported_count += import_file(factory, dir, file)
-
-    for backend in factory.backends.values():
-        backend.get_packages_db().unload()
-
-    log.msg("Imported %s files" % (imported_count))
-    return imported_count
-
-def import_file(factory, dir, file):
-    """
-    Import a .deb or .udeb into cache from given filename
-    """
-    if file[-4:]!='.deb' and file[-5:]!='.udeb':
-        log.msg("Ignoring (unknown file type):"+ file)
-        return 0
-    
-    log.msg("considering: " + dir + '/' + file)
-    try:
-        paths = get_mirror_path(factory, dir+'/'+file)
-    except SystemError:
-        log.msg(file + ' skipped - wrong format or corrupted')
-        return 0
-    if paths:
-        if len(paths) != 1:
-            log.msg("WARNING: multiple ocurrences")
-            log.msg(str(paths), 'import')
-        cache_path = paths[0]
-    else:
-        log.msg("Not found, trying to guess")
-        info = AptDpkgInfo(dir+'/'+file)
-        cache_path = closest_match(info,
-                                get_mirror_versions(factory, info['Package']))
-    if cache_path:
-        log.msg("MIRROR_PATH:"+ cache_path)
-        src_path = dir+'/'+file
-        dest_path = factory.config.cache_dir+cache_path
-        
-        if not os.path.exists(dest_path):
-            log.msg("IMPORTING:" + src_path)
-            dest_path = re.sub(r'/\./', '/', dest_path)
-            if not os.path.exists(dirname(dest_path)):
-                os.makedirs(dirname(dest_path))
-            f = open(dest_path, 'w')
-            fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
-            f.truncate(0)
-            shutil.copy2(src_path, dest_path)
-            f.close()
-            if hasattr(factory, 'access_times'):
-                atime = os.stat(src_path)[stat.ST_ATIME]
-                factory.access_times[cache_path] = atime
-            log.msg(file + ' imported')
-            return 1
-        else:
-            log.msg(file + ' skipped - already in cache')
-            return 0
-
-    else:
-        log.msg(file + ' skipped - no suitable backend found')
-        return 0
-            
 class TestAptPackages(unittest.TestCase):
     """Unit tests for the AptPackages cache."""