Move the DHT stuff out of the main program and into the new DHTManager module.
diff --git a/apt_p2p/CacheManager.py b/apt_p2p/CacheManager.py
index ccf13c5e308f9d105e29e003a6f43fc9019171db..b991093f61c3266c50f27cb93188854abb7960bf 100644
--- a/apt_p2p/CacheManager.py
+++ b/apt_p2p/CacheManager.py
@@ -19,10 +19,14 @@ from twisted.web2 import stream
 from twisted.web2.http import splitHostPort
 
 from Hash import HashObject
+from apt_p2p_conf import config
 
 DECOMPRESS_EXTS = ['.gz', '.bz2']
 DECOMPRESS_FILES = ['release', 'sources', 'packages']
 
+class CacheError(Exception):
+    """Error occurred downloading a file to the cache."""
+
 class ProxyFileStream(stream.SimpleStream):
     """Saves a stream to a file while providing a new stream.
     
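
The hunk above adds a dedicated CacheError type so download failures can be reported through doneDefer's errback chain rather than disappearing into the success path. A minimal sketch (not part of this patch) of how such an error reaches an errback, using standard Twisted Deferred semantics:

    from twisted.internet import defer
    from twisted.python import failure

    class CacheError(Exception):
        """Error occurred downloading a file to the cache."""

    def on_error(fail):
        fail.trap(CacheError)  # re-raises anything that is not a CacheError
        print('download failed: %s' % fail.getErrorMessage())

    d = defer.Deferred()
    d.addErrback(on_error)
    d.errback(failure.Failure(CacheError('Prematurely closed, all data was not written')))
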
@@ -99,9 +103,14 @@ class ProxyFileStream(stream.SimpleStream):
             if self.bz2file:
                 self.bz2file.close()
                 self.bz2file = None
-                
-            self.doneDefer.callback(self.hash)
     
+    def _error(self, err):
+        """Close all the output files, return the error."""
+        if not self.outFile.closed:
+            self._done()
+            self.stream.close()
+            self.doneDefer.errback(err)
+
     def read(self):
         """Read some data from the stream."""
         if self.outFile.closed:
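
The new _error() mirrors _done(): both are guarded by outFile.closed, so doneDefer fires at most once, via callback() on success or errback() on failure. A toy stand-in (not from this patch) for that fire-once guard:

    from twisted.internet import defer

    class FireOnce(object):
        """Illustrates the guard ProxyFileStream relies on: a closed flag
        ensures doneDefer fires at most once."""
        def __init__(self):
            self.closed = False
            self.doneDefer = defer.Deferred()

        def succeed(self, result):
            if not self.closed:       # mirrors 'if not self.outFile.closed'
                self.closed = True
                self.doneDefer.callback(result)

        def fail(self, err):
            if not self.closed:
                self.closed = True
                self.doneDefer.errback(err)
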
@@ -110,7 +119,7 @@ class ProxyFileStream(stream.SimpleStream):
         # Read data from the stream, deal with the possible deferred
         data = self.stream.read()
         if isinstance(data, defer.Deferred):
-            data.addCallbacks(self._write, self._done)
+            data.addCallbacks(self._write, self._error)
             return data
         
         self._write(data)
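
This one-word change matters because the second argument to addCallbacks() is invoked with a twisted.python.failure.Failure; the old code routed failures to _done(), a success-path cleanup that silently swallowed read errors. Illustrative handlers only:

    from twisted.internet import defer
    from twisted.python import failure

    d = defer.Deferred()
    d.addCallbacks(lambda data: data,                 # success path
                   lambda fail: fail.trap(IOError))   # errback gets a Failure
    d.errback(failure.Failure(IOError('connection lost')))
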
@@ -122,7 +131,9 @@ class ProxyFileStream(stream.SimpleStream):
         Also optionally decompresses it.
         """
         if data is None:
-            self._done()
+            if not self.outFile.closed:
+                self._done()
+                self.doneDefer.callback(self.hash)
             return data
         
         # Write and hash the streamed data
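
A None read is how the web2-style stream signals EOF, so _write(None) is now the single success path that fires doneDefer with the computed hash. A toy consumer showing that contract (assumed from the surrounding code, not defined in this diff):

    from twisted.internet import defer

    def drain(stream, sink):
        """read() returns bytes, a Deferred when data is not ready yet,
        or None to signal EOF."""
        while True:
            data = stream.read()
            if isinstance(data, defer.Deferred):
                def resume(d):
                    if d is None:
                        return None          # EOF arrived asynchronously
                    sink(d)
                    return drain(stream, sink)
                return data.addCallback(resume)
            if data is None:
                return None                  # EOF: doneDefer fires here
            sink(data)
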
@@ -185,8 +196,13 @@ class ProxyFileStream(stream.SimpleStream):
 
     def close(self):
         """Clean everything up and return None to future reads."""
+        if self.hash.size < self.length:
+            log.msg('ProxyFileStream was prematurely closed after only %d/%d bytes' % (self.hash.size, self.length))
+            self._error(CacheError('Prematurely closed, all data was not written'))
+        elif not self.outFile.closed:
+            self._done()
+            self.doneDefer.callback(self.hash)
         self.length = 0
-        self._done()
         self.stream.close()
 
 class CacheManager:
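
close() now distinguishes a premature close (fewer bytes seen than the expected length) from a normal one, converting the former into a CacheError instead of silently reporting success. A self-contained illustration, with a stand-in for HashObject's running size counter:

    class TinyHash(object):
        """Stand-in for HashObject's byte counter (illustrative only)."""
        def __init__(self):
            self.size = 0
        def update(self, data):
            self.size += len(data)

    h, expected = TinyHash(), 1024
    h.update('x' * 512)                # only half the body arrived
    if h.size < expected:
        print('Prematurely closed after only %d/%d bytes' % (h.size, expected))
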
@@ -206,22 +222,19 @@ class CacheManager:
     @ivar scanning: all the directories that are currently being scanned or waiting to be scanned
     """
     
-    def __init__(self, cache_dir, db, other_dirs = [], manager = None):
+    def __init__(self, cache_dir, db, manager = None):
         """Initialize the instance and remove any untracked files from the DB..
         
         @type cache_dir: L{twisted.python.filepath.FilePath}
         @param cache_dir: the directory to use for storing all files
         @type db: L{db.DB}
         @param db: the database to use for tracking files and hashes
-        @type other_dirs: C{list} of L{twisted.python.filepath.FilePath}
-        @param other_dirs: the other directories that have shared files in them
-            (optional, defaults to only using the cache directory)
         @type manager: L{apt_p2p.AptP2P}
         @param manager: the main program object to send requests to
             (optional, defaults to not calling back with cached files)
         """
         self.cache_dir = cache_dir
-        self.other_dirs = other_dirs
+        self.other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
         self.all_dirs = self.other_dirs[:]
         self.all_dirs.insert(0, self.cache_dir)
         self.db = db
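
OTHER_DIRS now comes from the shared config object instead of a constructor argument, so every caller sees the same setting. A minimal sketch of the idea, assuming a whitespace-separated list and Python 2's ConfigParser; the real parsing lives in apt_p2p_conf's getstringlist(), which this diff does not show:

    from ConfigParser import SafeConfigParser   # Python 2, like the codebase
    from StringIO import StringIO
    from twisted.python.filepath import FilePath

    parser = SafeConfigParser()
    parser.readfp(StringIO('[DEFAULT]\nOTHER_DIRS = /var/cache/apt/archives /srv/debs\n'))
    raw = parser.get('DEFAULT', 'OTHER_DIRS')
    other_dirs = [FilePath(f) for f in raw.split()]
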
@@ -232,7 +245,7 @@ class CacheManager:
         self.db.removeUntrackedFiles(self.all_dirs)
         
     #{ Scanning directories
-    def scanDirectories(self):
+    def scanDirectories(self, result = None):
         """Scan the cache directories, hashing new and rehashing changed files."""
         assert not self.scanning, "a directory scan is already under way"
         self.scanning = self.all_dirs[:]
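
The new, ignored result parameter lets scanDirectories() be chained directly onto a Deferred. A runnable sketch with a stand-in function of the same shape:

    from twisted.internet import defer

    def scanDirectories(result=None):
        """Stand-in with the same signature change as CacheManager's method."""
        print('scan started (chained result: %r)' % (result,))

    d = defer.Deferred()
    d.addCallback(scanDirectories)   # the callback result is simply ignored
    d.callback('startup finished')
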
@@ -268,14 +281,12 @@ class CacheManager:
 
         # If it's not a file ignore it
         if not file.isfile():
-            log.msg('entering directory: %s' % file.path)
             reactor.callLater(0, self._scanDirectories, None, walker)
             return
 
         # If it's already properly in the DB, ignore it
         db_status = self.db.isUnchanged(file)
         if db_status:
-            log.msg('file is unchanged: %s' % file.path)
             reactor.callLater(0, self._scanDirectories, None, walker)
             return
         
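
The two deleted log lines were per-file noise; note that the scan stays cooperative by re-scheduling itself with reactor.callLater(0, ...) after each entry instead of walking the tree synchronously. The pattern in miniature (illustrative, not from the patch):

    from twisted.internet import reactor

    def process(items):
        """Handle one item, then yield to the reactor before the next."""
        if not items:
            reactor.stop()
            return
        print('processing %s' % items.pop(0))
        reactor.callLater(0, process, items)

    reactor.callLater(0, process, ['a', 'b', 'c'])
    reactor.run()
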
@@ -294,7 +305,6 @@ class CacheManager:
         hash = HashObject()
         df = hash.hashInThread(file)
         df.addBoth(self._doneHashing, file, walker)
-        df.addErrback(log.err)
     
     def _doneHashing(self, result, file, walker):
         """If successful, add the hashed file to the DB and inform the main program."""
@@ -307,7 +317,8 @@ class CacheManager:
                 url = 'http:/' + file.path[len(self.cache_dir.path):]
                 
             # Store the hashed file in the database
-            new_hash = self.db.storeFile(file, result.digest())
+            new_hash = self.db.storeFile(file, result.digest(), True,
+                                         ''.join(result.pieceDigests()))
             
             # Tell the main program to handle the new cache file
             df = self.manager.new_cached_file(file, result, new_hash, url, True)
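
storeFile() now also records the file's concatenated per-piece digests, making the piece data available for the DHT. Illustrative only; SHA-1 and the piece size below are assumptions, not taken from this diff (Python 2 string semantics, matching the codebase):

    import hashlib

    PIECE_SIZE = 512 * 1024            # assumed piece size, not from this diff
    data = 'x' * (PIECE_SIZE + 1000)   # stand-in for a downloaded file's bytes
    pieces = [hashlib.sha1(data[i:i + PIECE_SIZE]).digest()
              for i in range(0, len(data), PIECE_SIZE)]
    piece_string = ''.join(pieces)     # handed to db.storeFile() with the digest
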
@@ -348,7 +359,7 @@ class CacheManager:
         if destFile.exists():
             log.msg('File already exists, removing: %s' % destFile.path)
             destFile.remove()
-        elif not destFile.parent().exists():
+        if not destFile.parent().exists():
             destFile.parent().makedirs()
 
         # Determine whether it needs to be decompressed and how
@@ -370,7 +381,7 @@ class CacheManager:
         response.stream.doneDefer.addCallback(self._save_complete, url, destFile,
                                               response.headers.getHeader('Last-Modified'),
                                               decFile)
-        response.stream.doneDefer.addErrback(self.save_error, url)
+        response.stream.doneDefer.addErrback(self._save_error, url, destFile, decFile)
 
         # Return the modified response with the new stream
         return response
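
Extra positional arguments to addCallback()/addErrback() are passed to the handler after the result, which is how _save_complete and _save_error receive url, destFile, and decFile here. In isolation:

    from twisted.internet import defer

    def handler(result, url, path):
        print('%s saved to %s: %r' % (url, path, result))

    d = defer.Deferred()
    d.addCallback(handler, 'http://example.com/Packages.gz', '/tmp/Packages.gz')
    d.callback('ok')
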
@@ -390,34 +401,53 @@ class CacheManager:
         @param decFile: the file where the decompressed download was written to
             (optional, defaults to the file not having been compressed)
         """
-        if modtime:
-            os.utime(destFile.path, (modtime, modtime))
-            if decFile:
-                os.utime(decFile.path, (modtime, modtime))
-        
         result = hash.verify()
         if result or result is None:
+            if modtime:
+                os.utime(destFile.path, (modtime, modtime))
+            
             if result:
                 log.msg('Hashes match: %s' % url)
+                dht = True
             else:
                 log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
+                dht = False
                 
-            new_hash = self.db.storeFile(destFile, hash.digest())
-            log.msg('now avaliable: %s' % (url))
+            new_hash = self.db.storeFile(destFile, hash.digest(), dht,
+                                         ''.join(hash.pieceDigests()))
 
             if self.manager:
                 self.manager.new_cached_file(destFile, hash, new_hash, url)
-                if decFile:
-                    ext_len = len(destFile.path) - len(decFile.path)
-                    self.manager.new_cached_file(decFile, None, False, url[:-ext_len])
+
+            if decFile:
+                # Hash the decompressed file and add it to the DB
+                decHash = HashObject()
+                ext_len = len(destFile.path) - len(decFile.path)
+                df = decHash.hashInThread(decFile)
+                df.addCallback(self._save_complete, url[:-ext_len], decFile, modtime)
+                df.addErrback(self._save_error, url[:-ext_len], decFile)
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()
             if decFile:
                 decFile.remove()
 
+    def _save_error(self, failure, url, destFile, decFile = None):
+        """Remove the destination files."""
+        log.msg('Error occurred downloading %s' % url)
+        log.err(failure)
+        destFile.restat(False)
+        if destFile.exists():
+            log.msg('Removing the incomplete file: %s' % destFile.path)
+            destFile.remove()
+        if decFile:
+            decFile.restat(False)
+            if decFile.exists():
+                log.msg('Removing the incomplete file: %s' % decFile.path)
+                decFile.remove()
+
     def save_error(self, failure, url):
-        """An error has occurred in downloadign or saving the file."""
+        """An error has occurred in downloading or saving the file"""
         log.msg('Error occurred downloading %s' % url)
         log.err(failure)
         return failure
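
Finally, _save_error() calls restat(False) before exists() because FilePath caches stat results; refreshing them without raising keeps the cleanup safe even when the file never materialized. The idiom in isolation, with an illustrative path:

    from twisted.python.filepath import FilePath

    f = FilePath('/tmp/apt-p2p-incomplete')  # illustrative path
    f.restat(False)     # refresh cached stat info; don't raise if it is gone
    if f.exists():
        f.remove()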