diff --git a/apt_p2p/CacheManager.py b/apt_p2p/CacheManager.py
index a990768..b991093 100644
--- a/apt_p2p/CacheManager.py
+++ b/apt_p2p/CacheManager.py
@@ -19,10 +19,14 @@
 from twisted.web2 import stream
 from twisted.web2.http import splitHostPort
 
 from Hash import HashObject
+from apt_p2p_conf import config
 
 DECOMPRESS_EXTS = ['.gz', '.bz2']
 DECOMPRESS_FILES = ['release', 'sources', 'packages']
 
+class CacheError(Exception):
+    """An error occurred while downloading a file to the cache."""
+
 class ProxyFileStream(stream.SimpleStream):
     """Saves a stream to a file while providing a new stream.
@@ -99,9 +103,14 @@
         if self.bz2file:
             self.bz2file.close()
             self.bz2file = None
-
-        self.doneDefer.callback(self.hash)
+
+    def _error(self, err):
+        """Close all the output files and errback with the error."""
+        if not self.outFile.closed:
+            self._done()
+        self.stream.close()
+        self.doneDefer.errback(err)
 
     def read(self):
         """Read some data from the stream."""
         if self.outFile.closed:
@@ -110,7 +119,7 @@
         # Read data from the stream, deal with the possible deferred
         data = self.stream.read()
         if isinstance(data, defer.Deferred):
-            data.addCallbacks(self._write, self._done)
+            data.addCallbacks(self._write, self._error)
             return data
 
         self._write(data)
@@ -122,7 +131,9 @@
         Also optionally decompresses it.
         """
         if data is None:
-            self._done()
+            if not self.outFile.closed:
+                self._done()
+                self.doneDefer.callback(self.hash)
             return data
 
         # Write and hash the streamed data
@@ -185,8 +196,13 @@
 
     def close(self):
         """Clean everything up and return None to future reads."""
+        if self.hash.size < self.length:
+            log.msg('ProxyFileStream was prematurely closed after only %d/%d bytes' % (self.hash.size, self.length))
+            self._error(CacheError('Prematurely closed, all data was not written'))
+        elif not self.outFile.closed:
+            self._done()
+            self.doneDefer.callback(self.hash)
         self.length = 0
-        self._done()
         self.stream.close()
 
 class CacheManager:
@@ -206,22 +222,19 @@
     @ivar scanning: all the directories that are currently being scanned or waiting to be scanned
     """
     
-    def __init__(self, cache_dir, db, other_dirs = [], manager = None):
+    def __init__(self, cache_dir, db, manager = None):
         """Initialize the instance and remove any untracked files from the DB.
         
         @type cache_dir: L{twisted.python.filepath.FilePath}
         @param cache_dir: the directory to use for storing all files
         @type db: L{db.DB}
         @param db: the database to use for tracking files and hashes
-        @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
-        @param other_dirs: the other directories that have shared files in them
-            (optional, defaults to only using the cache directory)
         @type manager: L{apt_p2p.AptP2P}
         @param manager: the main program object to send requests to
             (optional, defaults to not calling back with cached files)
         """
         self.cache_dir = cache_dir
-        self.other_dirs = other_dirs
+        self.other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
         self.all_dirs = self.other_dirs[:]
         self.all_dirs.insert(0, self.cache_dir)
         self.db = db
@@ -232,7 +245,7 @@
         self.db.removeUntrackedFiles(self.all_dirs)
 
     #{ Scanning directories
-    def scanDirectories(self):
+    def scanDirectories(self, result = None):
         """Scan the cache directories, hashing new and rehashing changed files."""
         assert not self.scanning, "a directory scan is already under way"
         self.scanning = self.all_dirs[:]
@@ -268,14 +281,12 @@
         # If it's not a file ignore it
         if not file.isfile():
-            log.msg('entering directory: %s' % file.path)
             reactor.callLater(0, self._scanDirectories, None, walker)
             return
 
         # If it's already properly in the DB, ignore it
         db_status = self.db.isUnchanged(file)
         if db_status:
-            log.msg('file is unchanged: %s' % file.path)
             reactor.callLater(0, self._scanDirectories, None, walker)
             return
@@ -294,7 +305,6 @@
         hash = HashObject()
         df = hash.hashInThread(file)
         df.addBoth(self._doneHashing, file, walker)
-        df.addErrback(log.err)
 
     def _doneHashing(self, result, file, walker):
         """If successful, add the hashed file to the DB and inform the main program."""
@@ -307,7 +317,7 @@
             url = 'http:/' + file.path[len(self.cache_dir.path):]
 
         # Store the hashed file in the database
-        new_hash = self.db.storeFile(file, result.digest(),
+        new_hash = self.db.storeFile(file, result.digest(), True,
                                      ''.join(result.pieceDigests()))
 
         # Tell the main program to handle the new cache file
@@ -349,7 +359,7 @@
         if destFile.exists():
             log.msg('File already exists, removing: %s' % destFile.path)
             destFile.remove()
-        elif not destFile.parent().exists():
+        if not destFile.parent().exists():
             destFile.parent().makedirs()
 
         # Determine whether it needs to be decompressed and how
@@ -371,7 +381,7 @@
             response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
                                                   response.headers.getHeader('Last-Modified'),
                                                   decFile)
-            response.stream.doneDefer.addErrback(self.save_error, url)
+            response.stream.doneDefer.addErrback(self._save_error, url, destFile, decFile)
 
         # Return the modified response with the new stream
         return response
@@ -391,35 +401,53 @@
         @param decFile: the file where the decompressed download was written to
             (optional, defaults to the file not having been compressed)
         """
-        if modtime:
-            os.utime(destFile.path, (modtime, modtime))
-            if decFile:
-                os.utime(decFile.path, (modtime, modtime))
-
         result = hash.verify()
         if result or result is None:
+            if modtime:
+                os.utime(destFile.path, (modtime, modtime))
+
             if result:
                 log.msg('Hashes match: %s' % url)
+                dht = True
             else:
                 log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
+                dht = False
 
-            new_hash = self.db.storeFile(destFile, hash.digest(),
+            new_hash = self.db.storeFile(destFile, hash.digest(), dht,
                                          ''.join(hash.pieceDigests()))
-
             log.msg('now available: %s' % (url))
             if self.manager:
                 self.manager.new_cached_file(destFile, hash, new_hash, url)
-                if decFile:
-                    ext_len = len(destFile.path) - len(decFile.path)
-                    self.manager.new_cached_file(decFile, None, False, url[:-ext_len])
+
+            if decFile:
+                # Hash the decompressed file and add it to the DB
+                decHash = HashObject()
+                ext_len = len(destFile.path) - len(decFile.path)
+                df = decHash.hashInThread(decFile)
+                df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime)
+                df.addErrback(self._save_error, url[:-ext_len], decFile)
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()
             if decFile:
                 decFile.remove()
 
+    def _save_error(self, failure, url, destFile, decFile = None):
+        """Remove the destination files after a failed download."""
+        log.msg('Error occurred downloading %s' % url)
+        log.err(failure)
+        destFile.restat(False)
+        if destFile.exists():
+            log.msg('Removing the incomplete file: %s' % destFile.path)
+            destFile.remove()
+        if decFile:
+            decFile.restat(False)
+            if decFile.exists():
+                log.msg('Removing the incomplete file: %s' % decFile.path)
+                decFile.remove()
+
     def save_error(self, failure, url):
-        """An error has occurred in downloadign or saving the file."""
+        """An error has occurred in downloading or saving the file."""
        log.msg('Error occurred downloading %s' % url)
        log.err(failure)
        return failure
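
Note on the new doneDefer contract in ProxyFileStream: _write() now fires the callback with the HashObject only once all expected bytes have been written, while a premature close() errbacks with CacheError so the partial files can be removed. A minimal standalone sketch of that contract, assuming only Twisted's deferred machinery (save_complete and save_error below are simplified stand-ins for CacheManager._save_complete and CacheManager._save_error, not the real methods):

    from twisted.internet import defer

    class CacheError(Exception):
        """Error occurred downloading a file to the cache."""

    def save_complete(hash_obj, url):
        # Success path: the full file was streamed and hashed
        print 'saved %s (hash: %s)' % (url, hash_obj)

    def save_error(failure, url):
        # trap() re-raises anything that is not a CacheError
        failure.trap(CacheError)
        print 'cleaning up partial files for %s: %s' % (url, failure.getErrorMessage())

    def make_done_defer(url):
        d = defer.Deferred()
        d.addCallback(save_complete, url)
        d.addErrback(save_error, url)
        return d

    # Completed download: the callback chain runs with the hash
    make_done_defer('http://example.com/Packages.gz').callback('<HashObject>')

    # Premature close: the errback chain runs and cleanup happens instead
    make_done_defer('http://example.com/Release').errback(CacheError('Prematurely closed'))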
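
Similarly, __init__ no longer takes an other_dirs argument; the extra shared directories now come from the OTHER_DIRS option through apt_p2p_conf's config object. A rough stand-in for what that line relies on, assuming a ConfigParser-style file (the getstringlist() helper is sketched here since only apt-p2p's own config object provides it, and the option value is illustrative):

    from ConfigParser import SafeConfigParser  # Python 2, matching the codebase

    from twisted.python.filepath import FilePath

    class Config(SafeConfigParser):
        """Sketch of a config object with a getstringlist() helper."""

        def getstringlist(self, section, option):
            # Split a whitespace-separated option value into a list of strings
            return self.get(section, option).split()

    config = Config()
    # Stands in for config.read('apt-p2p.conf')
    config.set('DEFAULT', 'OTHER_DIRS', '/var/cache/apt/archives /srv/debs')

    # What CacheManager.__init__ now does with it
    other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]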