Added support for source package hashes to AptPackages.
authorCameron Dale <camrdale@gmail.com>
Fri, 14 Dec 2007 02:57:23 +0000 (18:57 -0800)
committerCameron Dale <camrdale@gmail.com>
Fri, 14 Dec 2007 02:57:23 +0000 (18:57 -0800)
Also added a test for it.

AptPackages.py

index 27d34bfdfa019905e82c40de45609d7ca04ba0f2..d75bc405d2e6ebe989c2a3ef80b7d34e0bb94188 100644 (file)
@@ -77,7 +77,7 @@ class PackageFileList(UserDict.DictMixin):
         Called from apt_proxy.py when files get updated so we can update our
         fake lists/ directory and sources.list.
         """
-        if filename=="Packages" or filename=="Release":
+        if filename=="Packages" or filename=="Release" or filename=="Sources":
             log.msg("Registering package file: "+cache_path)
             self.packages[cache_path] = file_path
             return True
@@ -212,7 +212,10 @@ class AptPackages:
             # take into account the recorded mtime as optimization
             filepath = self.packages[f]
             fake_uri='http://apt-dht/'+f
-            source_line='deb '+dirname(fake_uri)+'/ /'
+            if f.endswith('Sources'):
+                source_line='deb-src '+dirname(fake_uri)+'/ /'
+            else:
+                source_line='deb '+dirname(fake_uri)+'/ /'
             listpath=(self.status_dir+'/apt/lists/'
                     +apt_pkg.URItoFileName(fake_uri))
             sources.write(source_line+'\n')
@@ -247,6 +250,7 @@ class AptPackages:
             self.__restore_stdout()
 
         self.records = apt_pkg.GetPkgRecords(self.cache)
+        self.srcrecords = apt_pkg.GetPkgSrcRecords()
         #for p in self.cache.Packages:
         #    print p
         #log.debug("%s packages found" % (len(self.cache)),'apt_pkg')
@@ -258,10 +262,12 @@ class AptPackages:
         if self.loaded:
             del self.cache
             del self.records
+            del self.srcrecords
             self.loaded = 0
 
     def cleanup(self):
         self.unload()
+        self.packages.close()
 
     def get_mirror_path(self, name, version):
         "Find the path for version 'version' of package 'name'"
@@ -439,29 +445,58 @@ class TestAptPackages(unittest.TestCase):
     """Unit tests for the AptPackages cache."""
     
     pending_calls = []
-
-    def test_sha1(self):
-        a = AptPackages('whatever', '/tmp')
+    client = None
+    packagesFile = ''
+    sourcesFile = ''
+    releaseFile = ''
     
-        packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | tail -n 1').read().rstrip('\n')
+    def setUp(self):
+        self.client = AptPackages('whatever', '/tmp')
+    
+        self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Packages$" | tail -n 1').read().rstrip('\n')
+        self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Sources$" | tail -n 1').read().rstrip('\n')
         for f in os.walk('/var/lib/apt/lists').next()[2]:
-            if f[-7:] == "Release" and packagesFile.startswith(f[:-7]):
-                releaseFile = f
+            if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]):
+                self.releaseFile = f
                 break
         
-        a.file_updated('Release', releaseFile[releaseFile.find('_debian_')+1:].replace('_','/'), '/var/lib/apt/lists/' + releaseFile)
-        a.file_updated('Packages', packagesFile[packagesFile.find('_debian_')+1:].replace('_','/'), '/var/lib/apt/lists/' + packagesFile)
-    
-        a.load()
+        self.client.file_updated('Release', 
+                                 self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/'), 
+                                 '/var/lib/apt/lists/' + self.releaseFile)
+        self.client.file_updated('Packages', 
+                                 self.packagesFile[self.packagesFile.find('_debian_')+1:].replace('_','/'), 
+                                 '/var/lib/apt/lists/' + self.packagesFile)
+        self.client.file_updated('Sources', 
+                                 self.sourcesFile[self.sourcesFile.find('_debian_')+1:].replace('_','/'), 
+                                 '/var/lib/apt/lists/' + self.sourcesFile)
     
-        a.records.Lookup(a.cache['dpkg'].VersionList[0].FileList[0])
+        self.client.load()
+
+    def test_pkg_hash(self):
+        self.client.records.Lookup(self.client.cache['dpkg'].VersionList[0].FileList[0])
         
-        pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + '/var/lib/apt/lists/' + packagesFile + ' | grep -E "^SHA1:" | head -n 1 | cut -d\  -f 2').read().rstrip('\n')
+        pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + 
+                            '/var/lib/apt/lists/' + self.packagesFile + 
+                            ' | grep -E "^SHA1:" | head -n 1' + 
+                            ' | cut -d\  -f 2').read().rstrip('\n')
+
+        self.failUnless(self.client.records.SHA1Hash == pkg_hash)
+
+    def test_src_hash(self):
+        self.client.srcrecords.Lookup('dpkg')
+
+        src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + 
+                            '/var/lib/apt/lists/' + self.sourcesFile + 
+                            ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + 
+                            ' | cut -d\  -f 2').read().split('\n')[:-1]
 
-        self.failUnless(a.records.SHA1Hash == pkg_hash)
+        for f in self.client.srcrecords.Files:
+            self.failUnless(f[0] in src_hashes)
 
     def tearDown(self):
         for p in self.pending_calls:
             if p.active():
                 p.cancel()
         self.pending_calls = []
+        self.client.cleanup()
+        self.client = None