From 48b7787f8a2c434d67eb92d1d8bc6e97e9ecb755 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Mon, 7 Jan 2008 00:00:57 -0800 Subject: [PATCH] Decompress needed files while downloading them. Decompress the files so they can be tracked by AptPackages and used to generate hashes. --- apt_dht/AptPackages.py | 4 +- apt_dht/MirrorManager.py | 104 +++++++++++++++++++++++++++++++++++---- 2 files changed, 98 insertions(+), 10 deletions(-) diff --git a/apt_dht/AptPackages.py b/apt_dht/AptPackages.py index ce462b3..a2e743e 100644 --- a/apt_dht/AptPackages.py +++ b/apt_dht/AptPackages.py @@ -17,6 +17,8 @@ from apt import OpProgress apt_pkg.init() +TRACKED_FILES = ['release', 'sources', 'packages'] + class PackageFileList(DictMixin): """Manages a list of package files belonging to a backend. @@ -48,7 +50,7 @@ class PackageFileList(DictMixin): fake lists and sources.list. """ filename = cache_path.split('/')[-1] - if filename=="Packages" or filename=="Release" or filename=="Sources": + if filename.lower() in TRACKED_FILES: log.msg("Registering package file: "+cache_path) self.packages[cache_path] = file_path return True diff --git a/apt_dht/MirrorManager.py b/apt_dht/MirrorManager.py index e5b8b1c..efa46d8 100644 --- a/apt_dht/MirrorManager.py +++ b/apt_dht/MirrorManager.py @@ -1,4 +1,7 @@ +from bz2 import BZ2Decompressor +from zlib import decompressobj, MAX_WBITS +from gzip import FCOMMENT, FEXTRA, FHCRC, FNAME, FTEXT from urlparse import urlparse import os @@ -12,22 +15,39 @@ from AptPackages import AptPackages aptpkg_dir='.apt-dht' +DECOMPRESS_EXTS = ['.gz', '.bz2'] +DECOMPRESS_FILES = ['release', 'sources', 'packages'] + class MirrorError(Exception): """Exception raised when there's a problem with the mirror.""" class ProxyFileStream(stream.SimpleStream): """Saves a stream to a file while providing a new stream.""" - def __init__(self, stream, outFile): + def __init__(self, stream, outFile, decompress = None, decFile = None): """Initializes the proxy. @type stream: C{twisted.web2.stream.IByteStream} @param stream: the input stream to read from @type outFile: C{twisted.python.filepath.FilePath} @param outFile: the file to write to + @type decompress: C{string} + @param decompress: also decompress the file as this type + (currently only '.gz' and '.bz2' are supported) + @type decFile: C{twisted.python.filepath.FilePath} + @param decFile: the file to write the decompressed data to """ self.stream = stream self.outFile = outFile.open('w') + self.gzfile = None + self.bz2file = None + if decompress == ".gz": + self.gzheader = True + self.gzfile = decFile.open('w') + self.gzdec = decompressobj(-MAX_WBITS) + elif decompress == ".bz2": + self.bz2file = decFile.open('w') + self.bz2dec = BZ2Decompressor() self.length = self.stream.length self.start = 0 self.doneDefer = defer.Deferred() @@ -36,6 +56,15 @@ class ProxyFileStream(stream.SimpleStream): """Close the output file.""" if not self.outFile.closed: self.outFile.close() + if self.gzfile: + data_dec = self.gzdec.flush() + self.gzfile.write(data_dec) + self.gzfile.close() + self.gzfile = None + if self.bz2file: + self.bz2file.close() + self.bz2file = None + self.doneDefer.callback(1) def read(self): @@ -58,8 +87,53 @@ class ProxyFileStream(stream.SimpleStream): return data self.outFile.write(data) + if self.gzfile: + if self.gzheader: + self.gzheader = False + new_data = self._remove_gzip_header(data) + dec_data = self.gzdec.decompress(new_data) + else: + dec_data = self.gzdec.decompress(data) + self.gzfile.write(dec_data) + if self.bz2file: + dec_data = self.bz2dec.decompress(data) + self.bz2file.write(dec_data) return data + def _remove_gzip_header(self, data): + if data[:2] != '\037\213': + raise IOError, 'Not a gzipped file' + if ord(data[2]) != 8: + raise IOError, 'Unknown compression method' + flag = ord(data[3]) + # modtime = self.fileobj.read(4) + # extraflag = self.fileobj.read(1) + # os = self.fileobj.read(1) + + skip = 10 + if flag & FEXTRA: + # Read & discard the extra field, if present + xlen = ord(data[10]) + xlen = xlen + 256*ord(data[11]) + skip = skip + 2 + xlen + if flag & FNAME: + # Read and discard a null-terminated string containing the filename + while True: + if not data[skip] or data[skip] == '\000': + break + skip += 1 + skip += 1 + if flag & FCOMMENT: + # Read and discard a null-terminated string containing a comment + while True: + if not data[skip] or data[skip] == '\000': + break + skip += 1 + skip += 1 + if flag & FHCRC: + skip += 2 # Read & discard the 16-bit header CRC + return data[skip:] + def close(self): """Clean everything up and return None to future reads.""" self.length = 0 @@ -112,11 +186,6 @@ class MirrorManager: site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_')) self.apt_caches[site][baseDir] = AptPackages(site_cache) - def updatedFile(self, url, file_path): - site, baseDir, path = self.extractPath(url) - self.init(site, baseDir) - self.apt_caches[site][baseDir].file_updated(path, file_path) - def findHash(self, url): site, baseDir, path = self.extractPath(url) if site in self.apt_caches and baseDir in self.apt_caches[site]: @@ -138,22 +207,39 @@ class MirrorManager: destFile.remove() else: destFile.parent().makedirs() + + root, ext = os.path.splitext(destFile.basename()) + if root.lower() in DECOMPRESS_FILES and ext.lower() in DECOMPRESS_EXTS: + ext = ext.lower() + decFile = destFile.sibling(root) + log.msg('Decompressing to: %s' % decFile.path) + if decFile.exists(): + log.msg('File already exists, removing: %s' % decFile.path) + decFile.remove() + else: + ext = None + decFile = None orig_stream = response.stream - response.stream = ProxyFileStream(orig_stream, destFile) + response.stream = ProxyFileStream(orig_stream, destFile, ext, decFile) response.stream.doneDefer.addCallback(self.save_complete, url, destFile, - response.headers.getHeader('Last-Modified')) + response.headers.getHeader('Last-Modified'), + ext, decFile) response.stream.doneDefer.addErrback(self.save_error, url) return response - def save_complete(self, result, url, destFile, modtime = None): + def save_complete(self, result, url, destFile, modtime = None, ext = None, decFile = None): """Update the modification time and AptPackages.""" if modtime: os.utime(destFile.path, (modtime, modtime)) + if ext: + os.utime(decFile.path, (modtime, modtime)) site, baseDir, path = self.extractPath(url) self.init(site, baseDir) self.apt_caches[site][baseDir].file_updated(path, destFile.path) + if ext: + self.apt_caches[site][baseDir].file_updated(path[:-len(ext)], decFile.path) def save_error(self, failure, url): """An error has occurred in downloadign or saving the file.""" -- 2.39.5