From 90357051e0769aea188a30fb3341733e97b37d33 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Sun, 6 Jan 2008 18:55:31 -0800 Subject: [PATCH] Added caching of downloaded files to the MirrorManager. Also made the MirrorManager work with full URLs (http://...). Also added some log messages to the HTTPDownloader. --- apt_dht/HTTPDownloader.py | 3 + apt_dht/MirrorManager.py | 127 +++++++++++++++++++++++++++++++------- apt_dht/apt_dht.py | 17 ++--- 3 files changed, 117 insertions(+), 30 deletions(-) diff --git a/apt_dht/HTTPDownloader.py b/apt_dht/HTTPDownloader.py index cd3352f..f43b6e1 100644 --- a/apt_dht/HTTPDownloader.py +++ b/apt_dht/HTTPDownloader.py @@ -2,6 +2,7 @@ from twisted.internet import reactor, defer, protocol from twisted.internet.protocol import ClientFactory from twisted import version as twisted_version +from twisted.python import log from twisted.web2.client.interfaces import IHTTPClientManager from twisted.web2.client.http import ProtocolError, ClientRequest, HTTPClientProtocol from twisted.web2 import stream as stream_mod, http_headers @@ -80,10 +81,12 @@ class HTTPClientManager(ClientFactory): def requestComplete(self, resp): req = self.response_queue.pop(0) + log.msg('Download of %s completed with code %d' % (req.uri, resp.code)) req.deferRequest.callback(resp) def requestError(self, error): req = self.response_queue.pop(0) + log.msg('Download of %s generated error %r' % (req.uri, error)) req.deferRequest.errback(error) def clientBusy(self, proto): diff --git a/apt_dht/MirrorManager.py b/apt_dht/MirrorManager.py index 4f1c32f..e39e99b 100644 --- a/apt_dht/MirrorManager.py +++ b/apt_dht/MirrorManager.py @@ -1,9 +1,12 @@ +from urlparse import urlparse import os -from twisted.python import log +from twisted.python import log, filepath from twisted.internet import defer from twisted.trial import unittest +from twisted.web2 import stream +from twisted.web2.http import splitHostPort from AptPackages import AptPackages @@ -12,22 +15,67 @@ 
aptpkg_dir='.apt-dht' class MirrorError(Exception): """Exception raised when there's a problem with the mirror.""" +class ProxyFileStream(stream.SimpleStream): + """Saves a stream to a file while providing a new stream.""" + + def __init__(self, stream, outFile): + """Initializes the proxy. + + @type stream: C{twisted.web2.stream.IByteStream} + @param stream: the input stream to read from + @type outFile: C{twisted.python.filepath.FilePath} + @param outFile: the file to write to + """ + self.stream = stream + self.outFile = outFile.open('w') + self.length = self.stream.length + self.start = 0 + + def _done(self, result=None): + """Close the output file, passing through any Deferred result or failure.""" + self.outFile.close() + return result + + def read(self): + """Read some data from the stream.""" + if self.outFile.closed: + return None + + data = self.stream.read() + if isinstance(data, defer.Deferred): + data.addCallbacks(self._write, self._done) + return data + + self._write(data) + return data + + def _write(self, data): + """Write the stream data to the file and return it for others to use.""" + if data is None: + self._done() + return data + + self.outFile.write(data) + return data + + def close(self): + """Clean everything up and return None to future reads.""" + self.length = 0 + self._done() + self.stream.close() + class MirrorManager: """Manages all requests for mirror objects.""" def __init__(self, cache_dir): self.cache_dir = cache_dir + self.cache = filepath.FilePath(self.cache_dir) self.apt_caches = {} - def extractPath(self, path): - site, path = path.split('/',1) - if not site: - site, path = path.split('/',1) - path = '/'+path - - # Make sure a port is included for consistency - if site.find(':') < 0: - site = site + ":80" + def extractPath(self, url): + parsed = urlparse(url) + host, port = splitHostPort(parsed[0], parsed[1]) + site = host + ":" + str(port) + path = parsed[2] i = max(path.rfind('/dists/'), path.rfind('/pool/')) if i >= 0: @@ -51,6 +99,7 @@ class MirrorManager: baseDir = base_match log.msg("Settled on 
baseDir: %s" % baseDir) + log.msg("Parsing '%s' gave '%s', '%s', '%s'" % (url, site, baseDir, path)) return site, baseDir, path def init(self, site, baseDir): @@ -61,19 +110,46 @@ class MirrorManager: site_cache = os.path.join(self.cache_dir, aptpkg_dir, 'mirrors', site + baseDir.replace('/', '_')) self.apt_caches[site][baseDir] = AptPackages(site_cache) - def updatedFile(self, path, file_path): - site, baseDir, path = self.extractPath(path) + def updatedFile(self, url, file_path): + site, baseDir, path = self.extractPath(url) self.init(site, baseDir) self.apt_caches[site][baseDir].file_updated(path, file_path) - def findHash(self, path): - site, baseDir, path = self.extractPath(path) + def findHash(self, url): + log.msg('Trying to find hash for %s' % url) + site, baseDir, path = self.extractPath(url) if site in self.apt_caches and baseDir in self.apt_caches[site]: return self.apt_caches[site][baseDir].findHash(path) d = defer.Deferred() d.errback(MirrorError("Site Not Found")) return d + def save_file(self, response, hash, size, url): + """Save a downloaded file to the cache and stream it.""" + log.msg('Returning file: %s' % url) + + parsed = urlparse(url) + destFile = self.cache.preauthChild(parsed[1] + parsed[2]) + log.msg('Cache file: %s' % destFile.path) + + if destFile.exists(): + log.err('File already exists: %s' % destFile.path) + return response + + destFile.parent().makedirs() + log.msg('Saving returned %i byte file to: %s' % (response.stream.length, destFile.path)) + + orig_stream = response.stream + response.stream = ProxyFileStream(orig_stream, destFile) + return response + + def save_error(self, failure, url): + """An error has occurred in downloading or saving the file.""" + log.msg('Error occurred downloading %s' % url) + log.err(failure) + return failure + class TestMirrorManager(unittest.TestCase): """Unit tests for the mirror manager.""" @@ -85,16 +161,21 @@ class TestMirrorManager(unittest.TestCase): self.client = 
MirrorManager('/tmp') def test_extractPath(self): - site, baseDir, path = self.client.extractPath('/ftp.us.debian.org/debian/dists/unstable/Release') + site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org/debian/dists/unstable/Release') self.failUnless(site == "ftp.us.debian.org:80", "no match: %s" % site) self.failUnless(baseDir == "/debian", "no match: %s" % baseDir) self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path) - site, baseDir, path = self.client.extractPath('/ftp.us.debian.org:16999/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz') + site, baseDir, path = self.client.extractPath('http://ftp.us.debian.org:16999/debian/pool/d/dpkg/dpkg_1.2.1-1.tar.gz') self.failUnless(site == "ftp.us.debian.org:16999", "no match: %s" % site) self.failUnless(baseDir == "/debian", "no match: %s" % baseDir) self.failUnless(path == "/pool/d/dpkg/dpkg_1.2.1-1.tar.gz", "no match: %s" % path) + site, baseDir, path = self.client.extractPath('http://debian.camrdale.org/dists/unstable/Release') + self.failUnless(site == "debian.camrdale.org:80", "no match: %s" % site) + self.failUnless(baseDir == "", "no match: %s" % baseDir) + self.failUnless(path == "/dists/unstable/Release", "no match: %s" % path) + def verifyHash(self, found_hash, path, true_hash): self.failUnless(found_hash[0] == true_hash, "%s hashes don't match: %s != %s" % (path, found_hash[0], true_hash)) @@ -107,12 +188,12 @@ class TestMirrorManager(unittest.TestCase): self.releaseFile = f break - self.client.updatedFile('/' + self.releaseFile.replace('_','/'), + self.client.updatedFile('http://' + self.releaseFile.replace('_','/'), '/var/lib/apt/lists/' + self.releaseFile) - self.client.updatedFile('/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + + self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + self.packagesFile[self.packagesFile.find('_dists_')+1:].replace('_','/'), '/var/lib/apt/lists/' + 
self.packagesFile) - self.client.updatedFile('/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + + self.client.updatedFile('http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + self.sourcesFile[self.sourcesFile.find('_dists_')+1:].replace('_','/'), '/var/lib/apt/lists/' + self.sourcesFile) @@ -122,7 +203,7 @@ class TestMirrorManager(unittest.TestCase): '/var/lib/apt/lists/' + self.releaseFile + ' | grep -E " main/binary-i386/Packages.bz2$"' ' | head -n 1 | cut -d\ -f 2').read().rstrip('\n') - idx_path = '/' + self.releaseFile.replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2' + idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/binary-i386/Packages.bz2' d = self.client.findHash(idx_path) d.addCallback(self.verifyHash, idx_path, idx_hash) @@ -131,7 +212,7 @@ class TestMirrorManager(unittest.TestCase): '/var/lib/apt/lists/' + self.packagesFile + ' | grep -E "^SHA1:" | head -n 1' + ' | cut -d\ -f 2').read().rstrip('\n') - pkg_path = '/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + \ + pkg_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + \ os.popen('grep -A 30 -E "^Package: dpkg$" ' + '/var/lib/apt/lists/' + self.packagesFile + ' | grep -E "^Filename:" | head -n 1' + @@ -154,7 +235,7 @@ class TestMirrorManager(unittest.TestCase): ' | cut -d\ -f 4').read().split('\n')[:-1] for i in range(len(src_hashes)): - src_path = '/' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + src_dir + '/' + src_paths[i] + src_path = 'http://' + self.releaseFile[:self.releaseFile.find('_dists_')+1].replace('_','/') + src_dir + '/' + src_paths[i] d = self.client.findHash(src_path) d.addCallback(self.verifyHash, src_path, src_hashes[i]) @@ -162,7 +243,7 @@ class TestMirrorManager(unittest.TestCase): '/var/lib/apt/lists/' + self.releaseFile + ' | grep -E " main/source/Sources.bz2$"' ' | head -n 1 | cut -d\ -f 
2').read().rstrip('\n') - idx_path = '/' + self.releaseFile.replace('_','/')[:-7] + 'main/source/Sources.bz2' + idx_path = 'http://' + self.releaseFile.replace('_','/')[:-7] + 'main/source/Sources.bz2' d = self.client.findHash(idx_path) d.addCallback(self.verifyHash, idx_path, idx_hash) diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py index 83c48b8..2e55d8e 100644 --- a/apt_dht/apt_dht.py +++ b/apt_dht/apt_dht.py @@ -45,13 +45,13 @@ class AptDHT: return d def findHash_error(self, failure, path, d): + log.err(failure) self.findHash_done((None, None), path, d) def findHash_done(self, (hash, size), path, d): if hash is None: log.msg('Hash for %s was not found' % path) - getDefer = self.peers.get([path]) - getDefer.addCallback(d.callback) + self.download_file([path], hash, size, path, d) else: log.msg('Found hash %s for %s' % (hash, path)) # Lookup hash from DHT @@ -61,11 +61,14 @@ class AptDHT: def lookupHash_done(self, locations, hash, size, path, d): if not locations: log.msg('Peers for %s were not found' % path) - getDefer = self.peers.get([path]) - getDefer.addCallback(d.callback) + self.download_file([path], hash, size, path, d) else: log.msg('Found peers for %s: %r' % (path, locations)) # Download from the found peers - getDefer = self.peers.get(locations) - getDefer.addCallback(d.callback) - \ No newline at end of file + self.download_file(locations, hash, size, path, d) + + def download_file(self, locations, hash, size, path, d): + getDefer = self.peers.get(locations) + getDefer.addCallback(self.mirrors.save_file, hash, size, path) + getDefer.addErrback(self.mirrors.save_error, path) + getDefer.addCallbacks(d.callback, d.errback) -- 2.39.2