From d2e2a977ff11ad786479c362a8ed716fbc10e04c Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Mon, 14 Jan 2008 21:55:22 -0800 Subject: [PATCH] CacheManager scans the cache directory during initialization. Actually the scanning is done after a successful join so that files can be added to the DHT. The HashObjects can now hash a file in a separate thread. The download cache is scanned for new files too, causing the decompressed versions of compressed Packages files to be hashed and added to the DHT. This isn't currently consistent since they are not added when they are first downloaded and decompressed. --- apt_dht/CacheManager.py | 87 +++++++++++++++++++++++++++++++++++++---- apt_dht/Hash.py | 52 ++++++++++++++++++------ apt_dht/apt_dht.py | 12 ++++-- 3 files changed, 130 insertions(+), 21 deletions(-) diff --git a/apt_dht/CacheManager.py b/apt_dht/CacheManager.py index 368714d..4fdcf40 100644 --- a/apt_dht/CacheManager.py +++ b/apt_dht/CacheManager.py @@ -7,12 +7,12 @@ import os from twisted.python import log from twisted.python.filepath import FilePath -from twisted.internet import defer +from twisted.internet import defer, reactor from twisted.trial import unittest from twisted.web2 import stream from twisted.web2.http import splitHostPort -from AptPackages import AptPackages +from Hash import HashObject aptpkg_dir='apt-packages' @@ -151,7 +151,79 @@ class CacheManager: self.cache_dir = cache_dir self.db = db self.manager = manager + self.scanning = [] + + # Init the database, remove old files, init the HTTP dirs + self.db.removeUntrackedFiles([self.cache_dir]) + self.db.reconcileDirectories() + self.manager.setDirectories(self.db.getAllDirectories()) + + + def scanDirectories(self): + """Scan the cache directories, hashing new and rehashing changed files.""" + assert not self.scanning, "a directory scan is already under way" + self.scanning.append(self.cache_dir) + self._scanDirectories() + + def _scanDirectories(self, walker = None): + # Need to start walking 
a new directory + if walker is None: + # If there are any left, get them + if self.scanning: + log.msg('started scanning directory: %s' % self.scanning[0].path) + walker = self.scanning[0].walk() + else: + # Done, just check if the HTTP directories need updating + log.msg('cache directory scan complete') + if self.db.reconcileDirectories(): + self.manager.setDirectories(self.db.getAllDirectories()) + return + + try: + # Get the next file in the directory + file = walker.next() + except StopIteration: + # No files left, go to the next directory + log.msg('done scanning directory: %s' % self.scanning[0].path) + self.scanning.pop(0) + reactor.callLater(0, self._scanDirectories) + return + + # If it's not a file, or it's already properly in the DB, ignore it + if not file.isfile() or self.db.isUnchanged(file): + if not file.isfile(): + log.msg('entering directory: %s' % file.path) + else: + log.msg('file is unchanged: %s' % file.path) + reactor.callLater(0, self._scanDirectories, walker) + return + + # Otherwise hash it + log.msg('start hash checking file: %s' % file.path) + hash = HashObject() + df = hash.hashInThread(file) + df.addBoth(self._doneHashing, file, walker) + df.addErrback(log.err) + + def _doneHashing(self, result, file, walker): + reactor.callLater(0, self._scanDirectories, walker) + if isinstance(result, HashObject): + log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest())) + if self.scanning[0] == self.cache_dir: + mirror_dir = self.cache_dir.child(file.path[len(self.cache_dir.path)+1:].split('/', 1)[0]) + urlpath, newdir = self.db.storeFile(file, result.digest(), mirror_dir) + url = 'http:/' + file.path[len(self.cache_dir.path):] + else: + urlpath, newdir = self.db.storeFile(file, result.digest(), self.scanning[0]) + url = None + if newdir: + self.manager.setDirectories(self.db.getAllDirectories()) + self.manager.new_cached_file(file, result, urlpath, url) + else: + log.msg('hash check of %s failed' % file.path) + 
log.err(result) + def save_file(self, response, hash, url): """Save a downloaded file to the cache and stream it.""" if response.code != 200: @@ -207,14 +279,15 @@ class CacheManager: mirror_dir = self.cache_dir.child(destFile.path[len(self.cache_dir.path)+1:].split('/', 1)[0]) urlpath, newdir = self.db.storeFile(destFile, hash.digest(), mirror_dir) log.msg('now avaliable at %s: %s' % (urlpath, url)) - if newdir and self.manager: - log.msg('A new web directory was created, so enable it') - self.manager.setDirectories(self.db.getAllDirectories()) if self.manager: - self.manager.new_cached_file(url, destFile, hash, urlpath) + if newdir: + log.msg('A new web directory was created, so enable it') + self.manager.setDirectories(self.db.getAllDirectories()) + + self.manager.new_cached_file(destFile, hash, urlpath, url) if ext: - self.manager.new_cached_file(url[:-len(ext)], decFile, None, urlpath) + self.manager.new_cached_file(decFile, None, urlpath, url[:-len(ext)]) else: log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url)) destFile.remove() diff --git a/apt_dht/Hash.py b/apt_dht/Hash.py index bb993f1..2223876 100644 --- a/apt_dht/Hash.py +++ b/apt_dht/Hash.py @@ -2,8 +2,12 @@ from binascii import b2a_hex, a2b_hex import sys +from twisted.internet import threads, defer from twisted.trial import unittest +class HashError(ValueError): + """An error has occurred while hashing a file.""" + class HashObject: """Manages hashes and hashing for a file.""" @@ -53,7 +57,8 @@ class HashObject: if bits is not None: bytes = (bits - 1) // 8 + 1 else: - assert bytes is not None, "you must specify one of bits or bytes" + if bytes is None: + raise HashError, "you must specify one of bits or bytes" if len(hashString) < bytes: hashString = hashString + '\000'*(bytes - len(hashString)) elif len(hashString) > bytes: @@ -102,15 +107,18 @@ class HashObject: def update(self, data): """Add more data to the file hasher.""" if self.result is None: - assert 
self.done == False, "Already done, you can't add more data after calling digest() or verify()" - assert self.fileHasher is not None, "file hasher not initialized" + if self.done: + raise HashError, "Already done, you can't add more data after calling digest() or verify()" + if self.fileHasher is None: + raise HashError, "file hasher not initialized" self.fileHasher.update(data) self.size += len(data) def digest(self): """Get the hash of the added file data.""" if self.fileHash is None: - assert self.fileHasher is not None, "you must hash some data first" + if self.fileHasher is None: + raise HashError, "you must hash some data first" self.fileHash = self.fileHasher.digest() self.done = True return self.fileHash @@ -136,6 +144,28 @@ class HashObject: self.result = (self.fileHash == self.expHash and self.size == self.expSize) return self.result + def hashInThread(self, file): + """Hashes a file in a separate thread, callback with the result.""" + file.restat(False) + if not file.exists(): + df = defer.Deferred() + df.errback(HashError("file not found")) + return df + + df = threads.deferToThread(self._hashInThread, file) + return df + + def _hashInThread(self, file): + """Hashes a file, returning itself as the result.""" + f = file.open() + self.new(force = True) + data = f.read(4096) + while data: + self.update(data) + data = f.read(4096) + self.digest() + return self + #### Methods for setting the expected hash def set(self, hashType, hashHex, size): """Initialize the hash object. 
@@ -213,10 +243,10 @@ class TestHashObject(unittest.TestCase): def test_failure(self): h = HashObject() h.set(h.ORDER[0], b2a_hex('12345678901234567890'), '0') - self.failUnlessRaises(AssertionError, h.normexpected) - self.failUnlessRaises(AssertionError, h.digest) - self.failUnlessRaises(AssertionError, h.hexdigest) - self.failUnlessRaises(AssertionError, h.update, 'gfgf') + self.failUnlessRaises(HashError, h.normexpected) + self.failUnlessRaises(HashError, h.digest) + self.failUnlessRaises(HashError, h.hexdigest) + self.failUnlessRaises(HashError, h.update, 'gfgf') def test_sha1(self): h = HashObject() @@ -230,7 +260,7 @@ class TestHashObject(unittest.TestCase): h.new() h.update('apt-dht is the best') self.failUnless(h.hexdigest() == 'c722df87e1acaa64b27aac4e174077afc3623540') - self.failUnlessRaises(AssertionError, h.update, 'gfgf') + self.failUnlessRaises(HashError, h.update, 'gfgf') self.failUnless(h.verify() == True) def test_md5(self): @@ -245,7 +275,7 @@ class TestHashObject(unittest.TestCase): h.new() h.update('apt-dht is the best') self.failUnless(h.hexdigest() == '2a586bcd1befc5082c872dcd96a01403') - self.failUnlessRaises(AssertionError, h.update, 'gfgf') + self.failUnlessRaises(HashError, h.update, 'gfgf') self.failUnless(h.verify() == True) def test_sha256(self): @@ -260,7 +290,7 @@ class TestHashObject(unittest.TestCase): h.new() h.update('apt-dht is the best') self.failUnless(h.hexdigest() == '55b971f64d9772f733de03f23db39224f51a455cc5ad4c2db9d5740d2ab259a7') - self.failUnlessRaises(AssertionError, h.update, 'gfgf') + self.failUnlessRaises(HashError, h.update, 'gfgf') self.failUnless(h.verify() == True) if sys.version_info < (2, 5): diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py index 1abeaa5..2211c1d 100644 --- a/apt_dht/apt_dht.py +++ b/apt_dht/apt_dht.py @@ -30,12 +30,12 @@ class AptDHT: self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) self.dht.join().addCallbacks(self.joinComplete, self.joinError) self.http_server = 
TopLevel(self.cache_dir.child(download_dir), self) + self.setDirectories = self.http_server.setDirectories self.http_site = server.Site(self.http_server) self.peers = PeerManager() self.mirrors = MirrorManager(self.cache_dir) self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self) self.my_addr = None - self.setDirectories = self.http_server.setDirectories def getSite(self): return self.http_site @@ -44,6 +44,7 @@ class AptDHT: self.my_addr = findMyIPAddr(result, config.getint(config.get('DEFAULT', 'DHT'), 'PORT')) if not self.my_addr: raise RuntimeError, "IP address for this machine could not be found" + self.cache.scanDirectories() def joinError(self, failure): log.msg("joining DHT failed miserably") @@ -113,8 +114,13 @@ class AptDHT: return getDefer return response - def new_cached_file(self, url, file_path, hash, urlpath): - self.mirrors.updatedFile(url, file_path) + def new_cached_file(self, file_path, hash, urlpath, url = None): + """Add a newly cached file to the DHT. + + If the file was downloaded, set url to the path it was downloaded for. + """ + if url: + self.mirrors.updatedFile(url, file_path) if self.my_addr and hash: site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT')) -- 2.30.2