Made the main DB track the top-level URL directories.
author Cameron Dale <camrdale@gmail.com>
Sat, 12 Jan 2008 08:34:48 +0000 (00:34 -0800)
committer Cameron Dale <camrdale@gmail.com>
Sat, 12 Jan 2008 08:34:48 +0000 (00:34 -0800)
Still to do: change HTTPServer, and the main code as well, to use
the DB.

Don't forget to call reconcileDirectories periodically and, if it
reports a change, update the HTTPServer's directories.

apt_dht/db.py

index c451874c62eb8db8c0fb4a47c1c9aa2f11e7a8b6..7f8c4493a2071643f4243fbbb1023bf51aeadafb 100644 (file)
@@ -18,6 +18,7 @@ class khash(str):
 sqlite.register_adapter(khash, b2a_base64)
 sqlite.register_converter("KHASH", a2b_base64)
 sqlite.register_converter("khash", a2b_base64)
+sqlite.enable_callback_tracebacks(True)
 
 class DB:
     """Database access for storing persistent data."""
@@ -43,10 +44,11 @@ class DB:
     def _createNewDB(self, db):
         self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES)
         c = self.conn.cursor()
-        c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urlpath TEXT, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
-#        c.execute("CREATE INDEX files_hash ON files(hash)")
+        c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
+        c.execute("CREATE INDEX files_urldir ON files(urldir)")
         c.execute("CREATE INDEX files_refreshed ON files(refreshed)")
-        c.execute("CREATE TABLE dirs (path TEXT PRIMARY KEY, urlpath TEXT)")
+        c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)")
+        c.execute("CREATE INDEX dirs_path ON dirs(path)")
         c.close()
         self.conn.commit()
 
@@ -66,15 +68,26 @@ class DB:
                 c.close()
         return res
         
-    def storeFile(self, path, hash, urlpath):
+    def storeFile(self, path, hash, directory):
         """Store or update a file in the database."""
         path = os.path.abspath(path)
+        directory = os.path.abspath(directory)
+        assert path.startswith(directory)
         stat = os.stat(path)
         c = self.conn.cursor()
-        c.execute("INSERT OR REPLACE INTO files VALUES (?, ?, ?, ?, ?, ?)", 
-                  (path, khash(hash), urlpath, stat.st_size, stat.st_mtime, datetime.now()))
+        c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (path, ))
+        row = c.fetchone()
+        if row and directory == row['directory']:
+            c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?", 
+                      (khash(hash), stat.st_size, stat.st_mtime, datetime.now()))
+            newdir = False
+        else:
+            urldir, newdir = self.findDirectory(directory)
+            c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?, ?, ?)",
+                      (path, khash(hash), urldir, len(directory), stat.st_size, stat.st_mtime, datetime.now()))
         self.conn.commit()
         c.close()
+        return newdir
         
     def getFile(self, path):
         """Get a file from the database.
@@ -86,13 +99,13 @@ class DB:
         """
         path = os.path.abspath(path)
         c = self.conn.cursor()
-        c.execute("SELECT hash, urlpath, size, mtime FROM files WHERE path = ?", (path, ))
+        c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (path, ))
         row = c.fetchone()
         res = self._removeChanged(path, row)
         if res:
             res = {}
             res['hash'] = row['hash']
-            res['urlpath'] = row['urlpath']
+            res['urlpath'] = '/~' + str(row['urldir']) + path[row['dirlength']:]
         c.close()
         return res
         
@@ -134,13 +147,13 @@ class DB:
         """
         t = datetime.now() - timedelta(seconds=expireAfter)
         c = self.conn.cursor()
-        c.execute("SELECT path, hash, urlpath, size, mtime FROM files WHERE refreshed < ?", (t, ))
+        c.execute("SELECT path, hash, urldir, dirlength, size, mtime FROM files WHERE refreshed < ?", (t, ))
         row = c.fetchone()
         expired = {}
         while row:
             res = self._removeChanged(row['path'], row)
             if res:
-                expired.setdefault(row['hash'], []).append(row['urlpath'])
+                expired.setdefault(row['hash'], []).append('/~' + str(row['urldir']) + row['path'][row['dirlength']:])
             row = c.fetchone()
         c.close()
         return expired
@@ -172,6 +185,46 @@ class DB:
             c.execute("DELETE FROM files " + sql, newdirs)
         self.conn.commit()
         return removed
+    
+    def findDirectory(self, directory):
+        """Store or update a directory in the database.
+        
+        @return: the index of the url directory, and whether it is new or not
+        """
+        directory = os.path.abspath(directory)
+        c = self.conn.cursor()
+        c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory, ))
+        row = c.fetchone()
+        c.close()
+        if row['urldir']:
+            return row['urldir'], False
+
+        # Not found, need to add a new one
+        c = self.conn.cursor()
+        c.execute("INSERT INTO dirs (path) VALUES (?)", (directory, ))
+        self.conn.commit()
+        urldir = c.lastrowid
+        c.close()
+        return urldir, True
+        
+    def getAllDirectories(self):
+        """Get all the current directories available."""
+        c = self.conn.cursor()
+        c.execute("SELECT urldir, path FROM dirs")
+        row = c.fetchone()
+        dirs = {}
+        while row:
+            dirs['~' + str(row['urldir'])] = row['path']
+            row = c.fetchone()
+        c.close()
+        return dirs
+    
+    def reconcileDirectories(self):
+        """Remove any unneeded directories by checking which are used by files."""
+        c = self.conn.cursor()
+        c.execute('DELETE FROM dirs WHERE urldir NOT IN (SELECT DISTINCT urldir FROM files)')
+        self.conn.commit()
+        return bool(c.rowcount)
         
     def close(self):
         self.conn.close()
@@ -183,7 +236,8 @@ class TestDB(unittest.TestCase):
     db = '/tmp/khashmir.db'
     path = '/tmp/khashmir.test'
     hash = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
-    urlpath = '/~1/what/ever/khashmir.test'
+    directory = '/tmp/'
+    urlpath = '/~1/khashmir.test'
     dirs = ['/tmp/apt-dht/top1', '/tmp/apt-dht/top2/sub1', '/tmp/apt-dht/top2/sub2/']
 
     def setUp(self):
@@ -192,7 +246,7 @@ class TestDB(unittest.TestCase):
         f.close()
         os.utime(self.path, None)
         self.store = DB(self.db)
-        self.store.storeFile(self.path, self.hash, self.urlpath)
+        self.store.storeFile(self.path, self.hash, self.directory)
 
     def test_getFile(self):
         res = self.store.getFile(self.path)
@@ -200,6 +254,13 @@ class TestDB(unittest.TestCase):
         self.failUnlessEqual(res['hash'], self.hash)
         self.failUnlessEqual(res['urlpath'], self.urlpath)
         
+    def test_getAllDirectories(self):
+        res = self.store.getAllDirectories()
+        self.failUnless(res)
+        self.failUnlessEqual(len(res.keys()), 1)
+        self.failUnlessEqual(res.keys()[0], '~1')
+        self.failUnlessEqual(res['~1'], os.path.abspath(self.directory))
+        
     def test_isUnchanged(self):
         res = self.store.isUnchanged(self.path)
         self.failUnless(res)
@@ -225,7 +286,7 @@ class TestDB(unittest.TestCase):
         res = self.store.expiredFiles(1)
         self.failUnlessEqual(len(res.keys()), 0)
         
-    def test_removeUntracked(self):
+    def build_dirs(self):
         for dir in self.dirs:
             path = os.path.join(dir, self.path[1:])
             os.makedirs(os.path.dirname(path))
@@ -233,8 +294,10 @@ class TestDB(unittest.TestCase):
             f.write(path)
             f.close()
             os.utime(path, None)
-            self.store.storeFile(path, self.hash, self.urlpath)
-        
+            self.store.storeFile(path, self.hash, dir)
+    
+    def test_removeUntracked(self):
+        self.build_dirs()
         res = self.store.removeUntrackedFiles(self.dirs)
         self.failUnlessEqual(len(res), 1, 'Got removed paths: %r' % res)
         self.failUnlessEqual(res[0], self.path, 'Got removed paths: %r' % res)
@@ -248,6 +311,34 @@ class TestDB(unittest.TestCase):
         self.failUnlessIn(os.path.join(self.dirs[1], self.path[1:]), res, 'Got removed paths: %r' % res)
         self.failUnlessIn(os.path.join(self.dirs[2], self.path[1:]), res, 'Got removed paths: %r' % res)
         
+    def test_reconcileDirectories(self):
+        self.build_dirs()
+        res = self.store.getAllDirectories()
+        self.failUnless(res)
+        self.failUnlessEqual(len(res.keys()), 4)
+        res = self.store.reconcileDirectories()
+        self.failUnlessEqual(res, False)
+        res = self.store.getAllDirectories()
+        self.failUnless(res)
+        self.failUnlessEqual(len(res.keys()), 4)
+        res = self.store.removeUntrackedFiles(self.dirs)
+        res = self.store.reconcileDirectories()
+        self.failUnlessEqual(res, True)
+        res = self.store.getAllDirectories()
+        self.failUnless(res)
+        self.failUnlessEqual(len(res.keys()), 3)
+        res = self.store.removeUntrackedFiles(self.dirs[:1])
+        res = self.store.reconcileDirectories()
+        self.failUnlessEqual(res, True)
+        res = self.store.getAllDirectories()
+        self.failUnless(res)
+        self.failUnlessEqual(len(res.keys()), 1)
+        res = self.store.removeUntrackedFiles(['/what'])
+        res = self.store.reconcileDirectories()
+        self.failUnlessEqual(res, True)
+        res = self.store.getAllDirectories()
+        self.failUnlessEqual(len(res.keys()), 0)
+        
     def tearDown(self):
         for root, dirs, files in os.walk('/tmp/apt-dht', topdown=False):
             for name in files: