X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=AptPackages.py;h=8d8e937f05dddb5639871d5ecbf02ae68f35b237;hb=d1507ff7d3cca77f48dffe460097f6ab3cd5d567;hp=6a01f40c7ecb69e241e39e32cbcabef2102eadb5;hpb=0232c5e783b9de582d131cd925d984dd7b9d0e0c;p=quix0rs-apt-p2p.git diff --git a/AptPackages.py b/AptPackages.py index 6a01f40..8d8e937 100644 --- a/AptPackages.py +++ b/AptPackages.py @@ -23,7 +23,7 @@ import copy, UserDict from twisted.trial import unittest aptpkg_dir='.apt-dht' -apt_pkg.InitSystem() +apt_pkg.init() class AptDpkgInfo(UserDict.UserDict): """ @@ -77,7 +77,7 @@ class PackageFileList(UserDict.DictMixin): Called from apt_proxy.py when files get updated so we can update our fake lists/ directory and sources.list. """ - if filename=="Packages" or filename=="Release": + if filename=="Packages" or filename=="Release" or filename=="Sources": log.msg("Registering package file: "+cache_path) self.packages[cache_path] = file_path return True @@ -107,7 +107,7 @@ class AptPackages: """ DEFAULT_APT_CONFIG = { #'APT' : '', - 'APT::Architecture' : 'amd64', # TODO: Fix this, see bug #436011 and #285360 + #'APT::Architecture' : 'amd64', # TODO: Fix this, see bug #436011 and #285360 #'APT::Default-Release' : 'unstable', 'Dir':'.', # / @@ -170,6 +170,7 @@ class AptPackages: #os.system('find '+self.status_dir+' -ls ') #print "status:"+self.apt_config['Dir::State::status'] self.packages = PackageFileList(backendName, cache_dir) + self.indexrecords = {} self.loaded = 0 #print "Loaded aptPackages [%s] %s " % (self.backendName, self.cache_dir) @@ -178,10 +179,48 @@ class AptPackages: #print "start aptPackages [%s] %s " % (self.backendName, self.cache_dir) self.packages.close() #print "Deleted aptPackages [%s] %s " % (self.backendName, self.cache_dir) + + def addRelease(self, cache_path, file_path): + """ + Dirty hack until python-apt supports apt-pkg/indexrecords.h + (see Bug #456141) + """ + self.indexrecords[cache_path] = {} + + read_packages = False + f = open(file_path, 'r') + + for line in f: + line = line.rstrip() + + if line[:1] != " ": + read_packages = False + try: + # Read the various headers from the file + h, v = line.split(":", 1) + if h == "MD5Sum" or h == "SHA1" or h == "SHA256": + read_packages = True + hash_type = h + except: + # Bad header line, just ignore it + log.msg("WARNING: Ignoring badly formatted Release line: %s" % line) + + # Skip to the next line + continue + + # Read file names from the multiple hash sections of the file + if read_packages: + p = line.split() + self.indexrecords[cache_path].setdefault(p[2], {})[hash_type] = (p[0], p[1]) + + f.close() + def file_updated(self, filename, cache_path, file_path): """ A file in the backend has changed. If this affects us, unload our apt database """ + if filename == "Release": + self.addRelease(cache_path, file_path) if self.packages.update_file(filename, cache_path, file_path): self.unload() @@ -212,7 +251,10 @@ class AptPackages: # take into account the recorded mtime as optimization filepath = self.packages[f] fake_uri='http://apt-dht/'+f - source_line='deb '+dirname(fake_uri)+'/ /' + if f.endswith('Sources'): + source_line='deb-src '+dirname(fake_uri)+'/ /' + else: + source_line='deb '+dirname(fake_uri)+'/ /' listpath=(self.status_dir+'/apt/lists/' +apt_pkg.URItoFileName(fake_uri)) sources.write(source_line+'\n') @@ -247,6 +289,7 @@ class AptPackages: self.__restore_stdout() self.records = apt_pkg.GetPkgRecords(self.cache) + self.srcrecords = apt_pkg.GetPkgSrcRecords() #for p in self.cache.Packages: # print p #log.debug("%s packages found" % (len(self.cache)),'apt_pkg') @@ -258,10 +301,12 @@ class AptPackages: if self.loaded: del self.cache del self.records + del self.srcrecords self.loaded = 0 def cleanup(self): self.unload() + self.packages.close() def get_mirror_path(self, name, version): "Find the path for version 'version' of package 'name'" @@ -439,29 +484,69 @@ class TestAptPackages(unittest.TestCase): """Unit tests for the AptPackages cache.""" pending_calls = [] - - def test_sha1(self): - a = AptPackages('whatever', '/tmp') + client = None + packagesFile = '' + sourcesFile = '' + releaseFile = '' + + def setUp(self): + self.client = AptPackages('whatever', '/tmp') - packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | tail -n 1').read().rstrip('\n') + self.packagesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Packages$" | tail -n 1').read().rstrip('\n') + self.sourcesFile = os.popen('ls -Sr /var/lib/apt/lists/ | grep -E "Sources$" | tail -n 1').read().rstrip('\n') for f in os.walk('/var/lib/apt/lists').next()[2]: - if f[-7:] == "Release" and packagesFile.startswith(f[:-7]): - releaseFile = f + if f[-7:] == "Release" and self.packagesFile.startswith(f[:-7]): + self.releaseFile = f break - a.file_updated('Release', releaseFile[releaseFile.find('_debian_')+1:].replace('_','/'), '/var/lib/apt/lists/' + releaseFile) - a.file_updated('Packages', packagesFile[packagesFile.find('_debian_')+1:].replace('_','/'), '/var/lib/apt/lists/' + packagesFile) - - a.load() + self.client.file_updated('Release', + self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/'), + '/var/lib/apt/lists/' + self.releaseFile) + self.client.file_updated('Packages', + self.packagesFile[self.packagesFile.find('_debian_')+1:].replace('_','/'), + '/var/lib/apt/lists/' + self.packagesFile) + self.client.file_updated('Sources', + self.sourcesFile[self.sourcesFile.find('_debian_')+1:].replace('_','/'), + '/var/lib/apt/lists/' + self.sourcesFile) - a.records.Lookup(a.cache['dpkg'].VersionList[0].FileList[0]) + self.client.load() + + def test_pkg_hash(self): + self.client.records.Lookup(self.client.cache['dpkg'].VersionList[0].FileList[0]) - pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + '/var/lib/apt/lists/' + packagesFile + ' | grep -E "^SHA1:" | head -n 1 | cut -d\ -f 2').read().rstrip('\n') + pkg_hash = os.popen('grep -A 30 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.packagesFile + + ' | grep -E "^SHA1:" | head -n 1' + + ' | cut -d\ -f 2').read().rstrip('\n') + + self.failUnless(self.client.records.SHA1Hash == pkg_hash, + "Hashes don't match: %s != %s" % (self.client.records.SHA1Hash, pkg_hash)) + + def test_src_hash(self): + self.client.srcrecords.Lookup('dpkg') + + src_hashes = os.popen('grep -A 20 -E "^Package: dpkg$" ' + + '/var/lib/apt/lists/' + self.sourcesFile + + ' | grep -A 4 -E "^Files:" | grep -E "^ " ' + + ' | cut -d\ -f 2').read().split('\n')[:-1] + + for f in self.client.srcrecords.Files: + self.failUnless(f[0] in src_hashes, "Couldn't find %s in: %r" % (f[0], src_hashes)) + + def test_index_hash(self): + indexhash = self.client.indexrecords[self.releaseFile[self.releaseFile.find('_debian_')+1:].replace('_','/')]['main/binary-i386/Packages.bz2']['SHA1'][0] + + idx_hash = os.popen('grep -A 3000 -E "^SHA1:" ' + + '/var/lib/apt/lists/' + self.releaseFile + + ' | grep -E " main/binary-i386/Packages.bz2$"' + ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') - self.failUnless(a.records.SHA1Hash == pkg_hash) + self.failUnless(indexhash == idx_hash, "Hashes don't match: %s != %s" % (indexhash, idx_hash)) def tearDown(self): for p in self.pending_calls: if p.active(): p.cancel() self.pending_calls = [] + self.client.cleanup() + self.client = None