X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht%2FCacheManager.py;h=c70cee257fda9e0b83be04a085ceb41ecd926b16;hp=3789eae679cc4ee236fa6df885785e29805b9249;hb=2e861b86ef341c92d9a8aebdf54742755c172f2f;hpb=7f4706b8aa3444a73eac587b317b62b68e2405be

diff --git a/apt_dht/CacheManager.py b/apt_dht/CacheManager.py
index 3789eae..c70cee2 100644
--- a/apt_dht/CacheManager.py
+++ b/apt_dht/CacheManager.py
@@ -7,12 +7,12 @@ import os
 
 from twisted.python import log
 from twisted.python.filepath import FilePath
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import splitHostPort
 
-from AptPackages import AptPackages
+from Hash import HashObject
 
 aptpkg_dir='apt-packages'
 
@@ -147,11 +147,80 @@ class ProxyFileStream(stream.SimpleStream):
 class CacheManager:
     """Manages all requests for cached objects."""
     
-    def __init__(self, cache_dir, db, manager = None):
+    def __init__(self, cache_dir, db, other_dirs = [], manager = None):
         self.cache_dir = cache_dir
+        self.other_dirs = other_dirs
+        self.all_dirs = self.other_dirs[:]
+        self.all_dirs.insert(0, self.cache_dir)
         self.db = db
         self.manager = manager
+        self.scanning = []
+        
+        # Init the database, remove old files
+        self.db.removeUntrackedFiles(self.all_dirs)
+        
+    
+    def scanDirectories(self):
+        """Scan the cache directories, hashing new and rehashing changed files."""
+        assert not self.scanning, "a directory scan is already under way"
+        self.scanning = self.all_dirs[:]
+        self._scanDirectories()
+
+    def _scanDirectories(self, result = None, walker = None):
+        # Need to start waling a new directory
+        if walker is None:
+            # If there are any left, get them
+            if self.scanning:
+                log.msg('started scanning directory: %s' % self.scanning[0].path)
+                walker = self.scanning[0].walk()
+            else:
+                log.msg('cache directory scan complete')
+                return
+            
+        try:
+            # Get the next file in the directory
+            file = walker.next()
+        except StopIteration:
+            # No files left, go to the next directory
+            log.msg('done scanning directory: %s' % self.scanning[0].path)
+            self.scanning.pop(0)
+            reactor.callLater(0, self._scanDirectories)
+            return
+
+        # If it's not a file, or it's already properly in the DB, ignore it
+        if not file.isfile() or self.db.isUnchanged(file):
+            if not file.isfile():
+                log.msg('entering directory: %s' % file.path)
+            else:
+                log.msg('file is unchanged: %s' % file.path)
+            reactor.callLater(0, self._scanDirectories, None, walker)
+            return
+        
+        # Otherwise hash it
+        log.msg('start hash checking file: %s' % file.path)
+        hash = HashObject()
+        df = hash.hashInThread(file)
+        df.addBoth(self._doneHashing, file, walker)
+        df.addErrback(log.err)
+    
+    def _doneHashing(self, result, file, walker):
+        if isinstance(result, HashObject):
+            log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
+            url = None
+            if self.scanning[0] == self.cache_dir:
+                url = 'http:/' + file.path[len(self.cache_dir.path):]
+            self.db.storeFile(file, result.digest())
+            df = self.manager.new_cached_file(file, result, url, True)
+            if df is None:
+                reactor.callLater(0, self._scanDirectories, None, walker)
+            else:
+                df.addBoth(self._scanDirectories, walker)
+        else:
+            log.msg('hash check of %s failed' % file.path)
+            log.err(result)
+            reactor.callLater(0, self._scanDirectories, None, walker)
+        
         
     def save_file(self, response, hash, url):
         """Save a downloaded file to the cache and stream it."""
         if response.code != 200:
@@ -204,16 +273,13 @@ class CacheManager:
             else:
                 log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
                 
-            urlpath, newdir = self.db.storeFile(destFile, hash.digest(), self.cache_dir)
-            log.msg('now avaliable at %s: %s' % (urlpath, url))
-            if newdir and self.manager:
-                log.msg('A new web directory was created, so enable it')
-                self.manager.setDirectories(self.db.getAllDirectories())
+            self.db.storeFile(destFile, hash.digest())
+            log.msg('now avaliable: %s' % (url))
 
             if self.manager:
-                self.manager.new_cached_file(url, destFile, hash, urlpath)
+                self.manager.new_cached_file(destFile, hash, url)
                 if ext:
-                    self.manager.new_cached_file(url[:-len(ext)], decFile, None, urlpath)
+                    self.manager.new_cached_file(decFile, None, url[:-len(ext)])
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()
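
The heart of this diff is the cooperative scan in `_scanDirectories`: each call handles a single filesystem entry and then reschedules itself with `reactor.callLater(0, ...)`, so hashing a large cache never blocks the Twisted reactor, while the `FilePath.walk()` generator carries the traversal state between steps and downloads or DHT traffic can interleave with the scan. The standalone sketch below only illustrates that pattern under the same Twisted/Python 2 assumptions; it is not code from the repository, and `scan`, `_step`, and the cache path are made up for the example.

```python
# Illustrative sketch of the callLater-driven directory walk (not project code).
from twisted.internet import reactor
from twisted.python import log
from twisted.python.filepath import FilePath

def scan(dirs):
    """Start a non-blocking scan of a list of FilePath directories."""
    _step(list(dirs), None)

def _step(pending, walker):
    # No walker yet: start the next directory, or finish if none are left.
    if walker is None:
        if not pending:
            log.msg('scan complete')
            return
        log.msg('scanning %s' % pending[0].path)
        walker = pending[0].walk()
    try:
        entry = walker.next()          # Python 2 generator protocol, as in the diff
    except StopIteration:
        # Directory exhausted: move on to the next one on a fresh reactor tick.
        pending.pop(0)
        reactor.callLater(0, _step, pending, None)
        return
    if entry.isfile():
        log.msg('would hash %s here' % entry.path)
    # Yield to the reactor before touching the next entry.
    reactor.callLater(0, _step, pending, walker)

if __name__ == '__main__':
    scan([FilePath('/var/cache/apt-p2p')])   # hypothetical cache path
    reactor.run()
```

Rescheduling through `callLater(0, ...)` instead of looping or recursing is what keeps the event loop responsive; the real `_scanDirectories` additionally hands each changed file to `HashObject.hashInThread` and only continues once that Deferred fires.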