]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - AptPackages.py
Add a findHash function to AptPackages that searches everywhere for a hash.
[quix0rs-apt-p2p.git] / AptPackages.py
index 8d8e937f05dddb5639871d5ecbf02ae68f35b237..80899c5f32ae7f239e04819a85ad6e3688d54940 100644 (file)
@@ -307,6 +307,37 @@ class AptPackages:
     def cleanup(self):
         self.unload()
         self.packages.close()
+        
+    def findHash(self, path):
+        for release in self.indexrecords:
+            if path.startswith(release[:-7]):
+                for indexFile in self.indexrecords[release]:
+                    if release[:-7] + indexFile == path:
+                        return self.indexrecords[release][indexFile]['SHA1']
+        
+        if not self.load():
+            return (None, None)
+        
+        package = path.split('/')[-1].split('_')[0]
+        
+        try:
+            for version in self.cache[package].VersionList:
+                size = version.Size
+                for verFile in version.FileList:
+                    if self.records.Lookup(verFile):
+                        if self.records.FileName == path:
+                            return (self.records.SHA1Hash, size)
+        except KeyError:
+            pass
+        
+        self.srcrecords.Restart()
+        if self.srcrecords.Lookup(package):
+            for f in self.srcrecords.Files:
+                if path == f[2]:
+                    return (f[0], f[1])
+        
+        return (None, None)
+        
 
     def get_mirror_path(self, name, version):
         "Find the path for version 'version' of package 'name'"
@@ -543,6 +574,49 @@ class TestAptPackages(unittest.TestCase):
 
         self.failUnless(indexhash == idx_hash, "Hashes don't match: %s != %s" % (indexhash, idx_hash))
 
+    def test_findHash(self):
+        idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + 
+                            '/var/lib/apt/lists/' + self.releaseFile + 
+                            ' | grep -E " main/binary-i386/Packages.bz2$"'
+                            ' | head -n 1 | cut -d\  -f 2').read().rstrip('\n')
+        idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'
+
+        found_hash = self.client.findHash(idx_path)
+        self.failUnless(found_hash[0] == idx_hash, 
+                        "Hashes don't match: %s != %s" % (found_hash[0], idx_hash))
+
+        pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + 
+                            '/var/lib/apt/lists/' + self.packagesFile + 
+                            ' | grep -E "^SHA1:" | head -n 1' + 
+                            ' | cut -d\  -f 2').read().rstrip('\n')
+        pkg_path = os.popen('grep -A 30 -E "^Package: dpkg$" ' + 
+                            '/var/lib/apt/lists/' + self.packagesFile + 
+                            ' | grep -E "^Filename:" | head -n 1' + 
+                            ' | cut -d\  -f 2').read().rstrip('\n')
+
+        found_hash = self.client.findHash(pkg_path)
+        self.failUnless(found_hash[0] == pkg_hash, 
+                        "Hashes don't match: %s != %s" % (found_hash[0], pkg_hash))
+
+        src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' + 
+                            '/var/lib/apt/lists/' + self.sourcesFile + 
+                            ' | grep -E "^Directory:" | head -n 1' + 
+                            ' | cut -d\  -f 2').read().rstrip('\n')
+        src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + 
+                            '/var/lib/apt/lists/' + self.sourcesFile + 
+                            ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + 
+                            ' | cut -d\  -f 2').read().split('\n')[:-1]
+        src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' + 
+                            '/var/lib/apt/lists/' + self.sourcesFile + 
+                            ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + 
+                            ' | cut -d\  -f 4').read().split('\n')[:-1]
+
+        for i in range(len(src_hashes)):
+            found_hash = self.client.findHash(src_dir + '/' + src_paths[i])
+            self.failUnless(found_hash[0] == src_hashes[i],
+                            "%s hashes don't match: %s != %s" % 
+                            (src_dir + '/' + src_paths[i], found_hash[0], src_hashes[i]))
+
     def tearDown(self):
         for p in self.pending_calls:
             if p.active():