from twisted.trial import unittest
aptpkg_dir='.apt-dht'
-apt_pkg.InitSystem()
+apt_pkg.init()
class AptDpkgInfo(UserDict.UserDict):
"""
Called from apt_proxy.py when files get updated so we can update our
fake lists/ directory and sources.list.
"""
- if filename=="Packages" or filename=="Release":
+ if filename=="Packages" or filename=="Release" or filename=="Sources":
log.msg("Registering package file: "+cache_path)
self.packages[cache_path] = file_path
return True
"""
DEFAULT_APT_CONFIG = {
#'APT' : '',
- 'APT::Architecture' : 'amd64', # TODO: Fix this, see bug #436011 and #285360
+ #'APT::Architecture' : 'amd64', # TODO: Fix this, see bug #436011 and #285360
#'APT::Default-Release' : 'unstable',
'Dir':'.', # /
#os.system('find '+self.status_dir+' -ls ')
#print "status:"+self.apt_config['Dir::State::status']
self.packages = PackageFileList(backendName, cache_dir)
+ self.indexrecords = {}
self.loaded = 0
#print "Loaded aptPackages [%s] %s " % (self.backendName, self.cache_dir)
#print "start aptPackages [%s] %s " % (self.backendName, self.cache_dir)
self.packages.close()
#print "Deleted aptPackages [%s] %s " % (self.backendName, self.cache_dir)
+
+ def addRelease(self, cache_path, file_path):
+ """
+ Dirty hack until python-apt supports apt-pkg/indexrecords.h
+ (see Bug #456141)
+ """
+ self.indexrecords[cache_path] = {}
+
+ read_packages = False
+ f = open(file_path, 'r')
+
+ for line in f:
+ line = line.rstrip()
+
+ if line[:1] != " ":
+ read_packages = False
+ try:
+ # Read the various headers from the file
+ h, v = line.split(":", 1)
+ if h == "MD5Sum" or h == "SHA1" or h == "SHA256":
+ read_packages = True
+ hash_type = h
+ except:
+ # Bad header line, just ignore it
+ log.msg("WARNING: Ignoring badly formatted Release line: %s" % line)
+
+ # Skip to the next line
+ continue
+
+ # Read file names from the multiple hash sections of the file
+ if read_packages:
+ p = line.split()
+ self.indexrecords[cache_path].setdefault(p[2], {})[hash_type] = (p[0], p[1])
+
+ f.close()
+
def file_updated(self, filename, cache_path, file_path):
"""
A file in the backend has changed. If this affects us, unload our apt database
"""
+ if filename == "Release":
+ self.addRelease(cache_path, file_path)
if self.packages.update_file(filename, cache_path, file_path):
self.unload()
# take into account the recorded mtime as optimization
filepath = self.packages[f]
fake_uri='http://apt-dht/'+f
- source_line='deb '+dirname(fake_uri)+'/ /'
+ if f.endswith('Sources'):
+ source_line='deb-src '+dirname(fake_uri)+'/ /'
+ else:
+ source_line='deb '+dirname(fake_uri)+'/ /'
listpath=(self.status_dir+'/apt/lists/'
+apt_pkg.URItoFileName(fake_uri))
sources.write(source_line+'\n')
self.__restore_stdout()
self.records = apt_pkg.GetPkgRecords(self.cache)
+ self.srcrecords = apt_pkg.GetPkgSrcRecords()
#for p in self.cache.Packages:
# print p
#log.debug("%s packages found" % (len(self.cache)),'apt_pkg')
if self.loaded:
del self.cache
del self.records
+ del self.srcrecords
self.loaded = 0
def cleanup(self):
self.unload()
+ self.packages.close()
def get_mirror_path(self, name, version):
"Find the path for version 'version' of package 'name'"
"""Unit tests for the AptPackages cache."""
pending_calls = []
-
- def test_sha1(self):
- a = AptPackages('whatever', '/tmp')
+ client = None
+ packagesFile = ''
+ sourcesFile = ''
+ releaseFile = ''
+
+ def setUp(self):
+ self.client = AptPackages('whatever', '/tmp')
- packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | tail -n 1').read().rstrip('\n')
+ self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Packages$" | tail -n 1').read().rstrip('\n')
+ self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Sources$" | tail -n 1').read().rstrip('\n')
for f in os.walk('/var/lib/apt/lists').next()[2]:
- if f[-7:] == "Release" and packagesFile.startswith(f[:-7]):
- releaseFile = f
+ if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]):
+ self.releaseFile = f
break
- a.file_updated('Release', releaseFile[releaseFile.find('_debian_')+1:].replace('_','/'), '/var/lib/apt/lists/' + releaseFile)
- a.file_updated('Packages', packagesFile[packagesFile.find('_debian_')+1:].replace('_','/'), '/var/lib/apt/lists/' + packagesFile)
-
- a.load()
+ self.client.file_updated('Release',
+ self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/'),
+ '/var/lib/apt/lists/' + self.releaseFile)
+ self.client.file_updated('Packages',
+ self.packagesFile[self.packagesFile.find('_debian_')+1:].replace('_','/'),
+ '/var/lib/apt/lists/' + self.packagesFile)
+ self.client.file_updated('Sources',
+ self.sourcesFile[self.sourcesFile.find('_debian_')+1:].replace('_','/'),
+ '/var/lib/apt/lists/' + self.sourcesFile)
- a.records.Lookup(a.cache['dpkg'].VersionList[0].FileList[0])
+ self.client.load()
+
+ def test_pkg_hash(self):
+ self.client.records.Lookup(self.client.cache['dpkg'].VersionList[0].FileList[0])
- pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + '/var/lib/apt/lists/' + packagesFile + ' | grep -E "^SHA1:" | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
+ pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.packagesFile +
+ ' | grep -E "^SHA1:" | head -n 1' +
+ ' | cut -d\ -f 2').read().rstrip('\n')
+
+ self.failUnless(self.client.records.SHA1Hash == pkg_hash,
+ "Hashes don't match: %s != %s" % (self.client.records.SHA1Hash, pkg_hash))
+
+ def test_src_hash(self):
+ self.client.srcrecords.Lookup('dpkg')
+
+ src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' +
+ '/var/lib/apt/lists/' + self.sourcesFile +
+ ' | grep -A 4 -E "^Files:" | grep -E "^ " ' +
+ ' | cut -d\ -f 2').read().split('\n')[:-1]
+
+ for f in self.client.srcrecords.Files:
+ self.failUnless(f[0] in src_hashes, "Couldn't find %s in: %r" % (f[0], src_hashes))
+
+ def test_index_hash(self):
+ indexhash = self.client.indexrecords[self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')]['main/binary-i386/Packages.bz2']['SHA1'][0]
+
+ idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' +
+ '/var/lib/apt/lists/' + self.releaseFile +
+ ' | grep -E " main/binary-i386/Packages.bz2$"'
+ ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n')
- self.failUnless(a.records.SHA1Hash == pkg_hash)
+ self.failUnless(indexhash == idx_hash, "Hashes don't match: %s != %s" % (indexhash, idx_hash))
def tearDown(self):
for p in self.pending_calls:
if p.active():
p.cancel()
self.pending_calls = []
+ self.client.cleanup()
+ self.client = None