# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-import apt_pkg, apt_inst, sys, os, stat
+import warnings
+warnings.simplefilter("ignore", FutureWarning)
+import apt_pkg, apt_inst, sys, os, stat, random
from os.path import dirname, basename
import re, shelve, shutil, fcntl
-from twisted.internet import process
+from twisted.internet import process, threads, defer
from twisted.python import log
import copy, UserDict
from twisted.trial import unittest
+from apt import OpProgress
aptpkg_dir='.apt-dht'
apt_pkg.init()
self.packages = PackageFileList(backendName, cache_dir)
self.indexrecords = {}
self.loaded = 0
+ self.loading = None
#print "Loaded aptPackages [%s] %s " % (self.backendName, self.cache_dir)
def __del__(self):
if self.packages.update_file(filename, cache_path, file_path):
self.unload()
- def __save_stdout(self):
- self.real_stdout_fd = os.dup(1)
- os.close(1)
-
- def __restore_stdout(self):
- os.dup2(self.real_stdout_fd, 1)
- os.close(self.real_stdout_fd)
- del self.real_stdout_fd
-
def load(self):
+ if self.loading is None:
+ self.loading = threads.deferToThread(self._load)
+ self.loading.addCallback(self.doneLoading)
+ return self.loading
+
+ def doneLoading(self, loadResult):
+ self.loading = None
+ return loadResult
+
+ def _load(self):
"""
Regenerates the fake configuration and load the packages server.
"""
# for I in apt_pkg.Config.keys():
# print "%s \"%s\";"%(I,apt_pkg.Config[I]);
- # apt_pkg prints progress messages to stdout, disable
- self.__save_stdout()
- try:
- self.cache = apt_pkg.GetCache()
- finally:
- self.__restore_stdout()
-
+ self.cache = apt_pkg.GetCache(OpProgress())
self.records = apt_pkg.GetPkgRecords(self.cache)
self.srcrecords = apt_pkg.GetPkgSrcRecords()
#for p in self.cache.Packages:
self.packages.close()
def findHash(self, path):
+ d = defer.Deferred()
+
for release in self.indexrecords:
if path.startswith(release[:-7]):
for indexFile in self.indexrecords[release]:
if release[:-7] + indexFile == path:
- return self.indexrecords[release][indexFile]['SHA1']
+ d.callback(self.indexrecords[release][indexFile]['SHA1'])
+ return d
- if not self.load():
- return (None, None)
+ deferLoad = self.load()
+ deferLoad.addCallback(self._findHash, path, d)
+
+ return d
+
+ def _findHash(self, loadResult, path, d):
+ if not loadResult:
+ d.callback((None, None))
+ return loadResult
package = path.split('/')[-1].split('_')[0]
for verFile in version.FileList:
if self.records.Lookup(verFile):
if self.records.FileName == path:
- return (self.records.SHA1Hash, size)
+ d.callback((self.records.SHA1Hash, size))
+ return loadResult
except KeyError:
pass
if self.srcrecords.Lookup(package):
for f in self.srcrecords.Files:
if path == f[2]:
- return (f[0], f[1])
-
- return (None, None)
+ d.callback((f[0], f[1]))
+ return loadResult
+ d.callback((None, None))
+ return loadResult
def get_mirror_path(self, name, version):
"Find the path for version 'version' of package 'name'"
self.sourcesFile[self.sourcesFile.find('_debian_')+1:].replace('_','/'),
'/var/lib/apt/lists/' + self.sourcesFile)
- self.client.load()
-
def test_pkg_hash(self):
+ self.client._load()
+
self.client.records.Lookup(self.client.cache['dpkg'].VersionList[0].FileList[0])
pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
"Hashes don't match: %s != %s" % (self.client.records.SHA1Hash, pkg_hash))
def test_src_hash(self):
+ self.client._load()
+
self.client.srcrecords.Lookup('dpkg')
src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
self.failUnless(f[0] in src_hashes, "Couldn't find %s in: %r" % (f[0], src_hashes))
def test_index_hash(self):
+ self.client._load()
+
indexhash = self.client.indexrecords[self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')]['main/binary-i386/Packages.bz2']['SHA1'][0]
idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
self.failUnless(indexhash == idx_hash, "Hashes don't match: %s != %s" % (indexhash, idx_hash))
- def test_findHash(self):
+ def verifyHash(self, found_hash, path, true_hash):
+ self.failUnless(found_hash[0] == true_hash,
+ "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash))
+
+ def test_findIndexHash(self):
+ lastDefer = defer.Deferred()
+
+ idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
+ '/var/lib/apt/lists/' + self.releaseFile +
+ ' | grep -E " main/binary-i386/Packages.bz2$"'
+ ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
+ idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'
+
+ d = self.client.findHash(idx_path)
+ d.addCallback(self.verifyHash, idx_path, idx_hash)
+
+ d.addCallback(lastDefer.callback)
+ return lastDefer
+
+ def test_findPkgHash(self):
+ lastDefer = defer.Deferred()
+
+ pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.packagesFile +
+ ' | grep -E "^SHA1:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+ pkg_path = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.packagesFile +
+ ' | grep -E "^Filename:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+
+ d = self.client.findHash(pkg_path)
+ d.addCallback(self.verifyHash, pkg_path, pkg_hash)
+
+ d.addCallback(lastDefer.callback)
+ return lastDefer
+
+ def test_findSrcHash(self):
+ lastDefer = defer.Deferred()
+
+ src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -E "^Directory:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+ src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
+ ' | cut -d\ -f 2').read().split('\n')[:-1]
+ src_paths = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
+ ' | cut -d\ -f 4').read().split('\n')[:-1]
+
+ i = random.choice(range(len(src_hashes)))
+ d = self.client.findHash(src_dir + '/' + src_paths[i])
+ d.addCallback(self.verifyHash, src_dir + '/' + src_paths[i], src_hashes[i])
+
+ d.addCallback(lastDefer.callback)
+ return lastDefer
+
+ def test_multipleFindHash(self):
+ lastDefer = defer.Deferred()
+
idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
'/var/lib/apt/lists/' + self.releaseFile +
' | grep -E " main/binary-i386/Packages.bz2$"'
' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
idx_path = self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2'
- found_hash = self.client.findHash(idx_path)
- self.failUnless(found_hash[0] == idx_hash,
- "Hashes don't match: %s != %s" % (found_hash[0], idx_hash))
+ d = self.client.findHash(idx_path)
+ d.addCallback(self.verifyHash, idx_path, idx_hash)
pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
'/var/lib/apt/lists/' + self.packagesFile +
' | grep -E "^Filename:" | head -n 1' +
' | cut -d\ -f 2').read().rstrip('\n')
- found_hash = self.client.findHash(pkg_path)
- self.failUnless(found_hash[0] == pkg_hash,
- "Hashes don't match: %s != %s" % (found_hash[0], pkg_hash))
+ d = self.client.findHash(pkg_path)
+ d.addCallback(self.verifyHash, pkg_path, pkg_hash)
src_dir = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
'/var/lib/apt/lists/' + self.sourcesFile +
' | cut -d\ -f 4').read().split('\n')[:-1]
for i in range(len(src_hashes)):
- found_hash = self.client.findHash(src_dir + '/' + src_paths[i])
- self.failUnless(found_hash[0] == src_hashes[i],
- "%s hashes don't match: %s != %s" %
- (src_dir + '/' + src_paths[i], found_hash[0], src_hashes[i]))
+ d = self.client.findHash(src_dir + '/' + src_paths[i])
+ d.addCallback(self.verifyHash, src_dir + '/' + src_paths[i], src_hashes[i])
+
+ d.addCallback(lastDefer.callback)
+ return lastDefer
def tearDown(self):
for p in self.pending_calls: