+ def test_findIndexHash(self):
+ lastDefer = defer.Deferred()
+
+ idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
+ '/var/lib/apt/lists/' + self.releaseFile +
+ ' | grep -E " main/binary-i386/Packages.bz2$"'
+ ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
+ idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'
+
+ d = self.client.findHash(idx_path)
+ d.addCallback(self.verifyHash, idx_path, idx_hash)
+
+ d.addCallback(lastDefer.callback)
+ return lastDefer
+
+ def test_findPkgHash(self):
+ lastDefer = defer.Deferred()
+
+ pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.packagesFile +
+ ' | grep -E "^SHA1:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+ pkg_path = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.packagesFile +
+ ' | grep -E "^Filename:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+
+ d = self.client.findHash(pkg_path)
+ d.addCallback(self.verifyHash, pkg_path, pkg_hash)
+
+ d.addCallback(lastDefer.callback)
+ return lastDefer
+
+ def test_findSrcHash(self):
+ lastDefer = defer.Deferred()
+
+ src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -E "^Directory:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+ src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
+ ' | cut -d\ -f 2').read().split('\n')[:-1]
+ src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
+ ' | cut -d\ -f 4').read().split('\n')[:-1]
+
+ i = random.choice(range(len(src_hashes)))
+ d = self.client.findHash(src_dir + '/' + src_paths[i])
+ d.addCallback(self.verifyHash, src_dir + '/' + src_paths[i], src_hashes[i])
+
+ d.addCallback(lastDefer.callback)
+ return lastDefer
+
+ def test_multipleFindHash(self):
+ lastDefer = defer.Deferred()
+
+ idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
+ '/var/lib/apt/lists/' + self.releaseFile +
+ ' | grep -E " main/binary-i386/Packages.bz2$"'
+ ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
+ idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'
+
+ d = self.client.findHash(idx_path)
+ d.addCallback(self.verifyHash, idx_path, idx_hash)
+
+ pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.packagesFile +
+ ' | grep -E "^SHA1:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+ pkg_path = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.packagesFile +
+ ' | grep -E "^Filename:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+
+ d = self.client.findHash(pkg_path)
+ d.addCallback(self.verifyHash, pkg_path, pkg_hash)
+
+ src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -E "^Directory:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+ src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
+ ' | cut -d\ -f 2').read().split('\n')[:-1]
+ src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
+ ' | cut -d\ -f 4').read().split('\n')[:-1]
+
+ for i in range(len(src_hashes)):
+ d = self.client.findHash(src_dir + '/' + src_paths[i])
+ d.addCallback(self.verifyHash, src_dir + '/' + src_paths[i], src_hashes[i])
+
+ idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
+ '/var/lib/apt/lists/' + self.releaseFile +
+ ' | grep -E " main/source/Sources.bz2$"'
+ ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
+ idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/source/Sources.bz2'
+
+ d = self.client.findHash(idx_path)
+ d.addCallback(self.verifyHash, idx_path, idx_hash)
+
+ d.addCallback(lastDefer.callback)
+ return lastDefer
+
+ def tearDown(self):
+ for p in self.pending_calls:
+ if p.active():
+ p.cancel()
+ self.pending_calls = []
+ self.client.cleanup()
+ self.client = None