CacheManager scans the cache directory during initialization.
[quix0rs-apt-p2p.git] / apt_dht / CacheManager.py
index 368714d017d6856de43f8fb69f4d1ddb35f6e5d2..4fdcf40c0a1069ec548d48bffa8b5bac7fe89ca2 100644 (file)
@@ -7,12 +7,12 @@ import os
 
 from twisted.python import log
 from twisted.python.filepath import FilePath
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import splitHostPort
 
-from AptPackages import AptPackages
+from Hash import HashObject
 
 aptpkg_dir='apt-packages'
 
@@ -151,7 +151,79 @@ class CacheManager:
         self.cache_dir = cache_dir
         self.db = db
         self.manager = manager
+        self.scanning = []
+        
+        # Init the database, remove old files, init the HTTP dirs
+        self.db.removeUntrackedFiles([self.cache_dir])
+        self.db.reconcileDirectories()
+        self.manager.setDirectories(self.db.getAllDirectories())
+        
+        
+    def scanDirectories(self):
+        """Scan the cache directories, hashing new and rehashing changed files."""
+        assert not self.scanning, "a directory scan is already under way"
+        self.scanning.append(self.cache_dir)
+        self._scanDirectories()
+
+    def _scanDirectories(self, walker = None):
+        # Need to start waling a new directory
+        if walker is None:
+            # If there are any left, get them
+            if self.scanning:
+                log.msg('started scanning directory: %s' % self.scanning[0].path)
+                walker = self.scanning[0].walk()
+            else:
+                # Done, just check if the HTTP directories need updating
+                log.msg('cache directory scan complete')
+                if self.db.reconcileDirectories():
+                    self.manager.setDirectories(self.db.getAllDirectories())
+                return
+            
+        try:
+            # Get the next file in the directory
+            file = walker.next()
+        except StopIteration:
+            # No files left, go to the next directory
+            log.msg('done scanning directory: %s' % self.scanning[0].path)
+            self.scanning.pop(0)
+            reactor.callLater(0, self._scanDirectories)
+            return
+
+        # If it's not a file, or it's already properly in the DB, ignore it
+        if not file.isfile() or self.db.isUnchanged(file):
+            if not file.isfile():
+                log.msg('entering directory: %s' % file.path)
+            else:
+                log.msg('file is unchanged: %s' % file.path)
+            reactor.callLater(0, self._scanDirectories, walker)
+            return
+
+        # Otherwise hash it
+        log.msg('start hash checking file: %s' % file.path)
+        hash = HashObject()
+        df = hash.hashInThread(file)
+        df.addBoth(self._doneHashing, file, walker)
+        df.addErrback(log.err)
+    
+    def _doneHashing(self, result, file, walker):
+        reactor.callLater(0, self._scanDirectories, walker)
     
+        if isinstance(result, HashObject):
+            log.msg('hash check of %s completed with hash: %s' % (file.path, result.hexdigest()))
+            if self.scanning[0] == self.cache_dir:
+                mirror_dir = self.cache_dir.child(file.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
+                urlpath, newdir = self.db.storeFile(file, result.digest(), mirror_dir)
+                url = 'http:/' + file.path[len(self.cache_dir.path):]
+            else:
+                urlpath, newdir = self.db.storeFile(file, result.digest(), self.scanning[0])
+                url = None
+            if newdir:
+                self.manager.setDirectories(self.db.getAllDirectories())
+            self.manager.new_cached_file(file, result, urlpath, url)
+        else:
+            log.msg('hash check of %s failed' % file.path)
+            log.err(result)
+
     def save_file(self, response, hash, url):
         """Save a downloaded file to the cache and stream it."""
         if response.code != 200:
@@ -207,14 +279,15 @@ class CacheManager:
             mirror_dir = self.cache_dir.child(destFile.path[len(self.cache_dir.path)+1:].split('/', 1)[0])
             urlpath, newdir = self.db.storeFile(destFile, hash.digest(), mirror_dir)
             log.msg('now avaliable at %s: %s' % (urlpath, url))
-            if newdir and self.manager:
-                log.msg('A new web directory was created, so enable it')
-                self.manager.setDirectories(self.db.getAllDirectories())
 
             if self.manager:
-                self.manager.new_cached_file(url, destFile, hash, urlpath)
+                if newdir:
+                    log.msg('A new web directory was created, so enable it')
+                    self.manager.setDirectories(self.db.getAllDirectories())
+    
+                self.manager.new_cached_file(destFile, hash, urlpath, url)
                 if ext:
-                    self.manager.new_cached_file(url[:-len(ext)], decFile, None, urlpath)
+                    self.manager.new_cached_file(decFile, None, urlpath, url[:-len(ext)])
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()