X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht%2Fdb.py;h=7b225a19391fe7591c88a6e4a756ef894c3a7112;hp=9725aa88f10c36e0bd137de1cb5051850d7cde91;hb=ef16875cd9e8b3452deeff8b13768d4c10b0e9ad;hpb=07893ed1ffd3cdcc1a8b51d0854599d61b2b5fac diff --git a/apt_dht/db.py b/apt_dht/db.py index 9725aa8..7b225a1 100644 --- a/apt_dht/db.py +++ b/apt_dht/db.py @@ -5,6 +5,7 @@ from binascii import a2b_base64, b2a_base64 from time import sleep import os +from twisted.python.filepath import FilePath from twisted.trial import unittest assert sqlite.version_info >= (2, 1) @@ -25,76 +26,61 @@ class DB: def __init__(self, db): self.db = db - try: - os.stat(db) - except OSError: - self._createNewDB(db) + self.db.restat(False) + if self.db.exists(): + self._loadDB() else: - self._loadDB(db) + self._createNewDB() self.conn.text_factory = str self.conn.row_factory = sqlite.Row - def _loadDB(self, db): + def _loadDB(self): try: - self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES) + self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES) except: import traceback raise DBExcept, "Couldn't open DB", traceback.format_exc() - def _createNewDB(self, db): - self.conn = sqlite.connect(database=db, detect_types=sqlite.PARSE_DECLTYPES) + def _createNewDB(self): + if not self.db.parent().exists(): + self.db.parent().makedirs() + self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES) c = self.conn.cursor() - c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)") - c.execute("CREATE INDEX files_urldir ON files(urldir)") + c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)") + c.execute("CREATE INDEX files_hash ON files(hash)") c.execute("CREATE INDEX files_refreshed ON files(refreshed)") - c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)") - c.execute("CREATE INDEX dirs_path ON dirs(path)") c.close() self.conn.commit() - def _removeChanged(self, path, row): + def _removeChanged(self, file, row): res = None if row: - try: - stat = os.stat(path) - except: - stat = None - if stat: - res = (row['size'] == stat.st_size and row['mtime'] == stat.st_mtime) + file.restat(False) + if file.exists(): + res = (row['size'] == file.getsize() and row['mtime'] == file.getmtime()) if not res: c = self.conn.cursor() - c.execute("DELETE FROM files WHERE path = ?", (path, )) + c.execute("DELETE FROM files WHERE path = ?", (file.path, )) self.conn.commit() c.close() return res - def storeFile(self, path, hash, directory): - """Store or update a file in the database. - - @return: the urlpath to access the file, and whether a - new url top-level directory was needed - """ - path = os.path.abspath(path) - directory = os.path.abspath(directory) - assert path.startswith(directory) - stat = os.stat(path) + def storeFile(self, file, hash): + """Store or update a file in the database.""" + file.restat() c = self.conn.cursor() - c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (path, )) + c.execute("SELECT path FROM files WHERE path = ?", (file.path, )) row = c.fetchone() - if row and directory == row['directory']: + if row: c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?", - (khash(hash), stat.st_size, stat.st_mtime, datetime.now())) - newdir = False - urldir = row['urldir'] + (khash(hash), file.getsize(), file.getmtime(), datetime.now())) else: - urldir, newdir = self.findDirectory(directory) - c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?, ?, ?)", - (path, khash(hash), urldir, len(directory), stat.st_size, stat.st_mtime, datetime.now())) + c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?)", + (file.path, khash(hash), file.getsize(), file.getmtime(), datetime.now())) self.conn.commit() c.close() - return '/~' + str(urldir) + path[len(directory):], newdir - def getFile(self, path): + def getFile(self, file): """Get a file from the database. If it has changed or is missing, it is removed from the database. @@ -102,45 +88,70 @@ class DB: @return: dictionary of info for the file, False if changed, or None if not in database or missing """ - path = os.path.abspath(path) c = self.conn.cursor() - c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (path, )) + c.execute("SELECT hash, size, mtime FROM files WHERE path = ?", (file.path, )) row = c.fetchone() - res = self._removeChanged(path, row) - if res: - res = {} - res['hash'] = row['hash'] - res['urlpath'] = '/~' + str(row['urldir']) + path[row['dirlength']:] + res = None + if row: + res = self._removeChanged(file, row) + if res: + res = {} + res['hash'] = row['hash'] + res['size'] = row['size'] c.close() return res - def isUnchanged(self, path): + def lookupHash(self, hash): + """Find a file by hash in the database. + + If any found files have changed or are missing, they are removed + from the database. + + @return: list of dictionaries of info for the found files + """ + c = self.conn.cursor() + c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), )) + row = c.fetchone() + files = [] + while row: + file = FilePath(row['path']) + res = self._removeChanged(file, row) + if res: + res = {} + res['path'] = file + res['size'] = row['size'] + files.append(res) + row = c.fetchone() + c.close() + return files + + def isUnchanged(self, file): """Check if a file in the file system has changed. If it has changed, it is removed from the table. @return: True if unchanged, False if changed, None if not in database """ - path = os.path.abspath(path) c = self.conn.cursor() - c.execute("SELECT size, mtime FROM files WHERE path = ?", (path, )) + c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, )) row = c.fetchone() - return self._removeChanged(path, row) + return self._removeChanged(file, row) - def refreshFile(self, path): + def refreshFile(self, file): """Refresh the publishing time of a file. If it has changed or is missing, it is removed from the table. @return: True if unchanged, False if changed, None if not in database """ - path = os.path.abspath(path) c = self.conn.cursor() - c.execute("SELECT size, mtime FROM files WHERE path = ?", (path, )) + c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, )) row = c.fetchone() - res = self._removeChanged(path, row) - if res: - c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), path)) + res = None + if row: + res = self._removeChanged(file, row) + if res: + c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path)) return res def expiredFiles(self, expireAfter): @@ -148,17 +159,17 @@ class DB: Also removes any entries from the table that no longer exist. - @return: dictionary with keys the hashes, values a list of url paths + @return: dictionary with keys the hashes, values a list of FilePaths """ t = datetime.now() - timedelta(seconds=expireAfter) c = self.conn.cursor() - c.execute("SELECT path, hash, urldir, dirlength, size, mtime FROM files WHERE refreshed < ?", (t, )) + c.execute("SELECT path, hash, size, mtime FROM files WHERE refreshed < ?", (t, )) row = c.fetchone() expired = {} while row: - res = self._removeChanged(row['path'], row) + res = self._removeChanged(FilePath(row['path']), row) if res: - expired.setdefault(row['hash'], []).append('/~' + str(row['urldir']) + row['path'][row['dirlength']:]) + expired.setdefault(row['hash'], []).append(FilePath(row['path'])) row = c.fetchone() c.close() return expired @@ -174,7 +185,7 @@ class DB: newdirs = [] sql = "WHERE" for dir in dirs: - newdirs.append(os.path.abspath(dir) + os.sep + '*') + newdirs.append(dir.child('*').path) sql += " path NOT GLOB ? AND" sql = sql[:-4] @@ -183,7 +194,7 @@ class DB: row = c.fetchone() removed = [] while row: - removed.append(row['path']) + removed.append(FilePath(row['path'])) row = c.fetchone() if removed: @@ -191,46 +202,6 @@ class DB: self.conn.commit() return removed - def findDirectory(self, directory): - """Store or update a directory in the database. - - @return: the index of the url directory, and whether it is new or not - """ - directory = os.path.abspath(directory) - c = self.conn.cursor() - c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory, )) - row = c.fetchone() - c.close() - if row['urldir']: - return row['urldir'], False - - # Not found, need to add a new one - c = self.conn.cursor() - c.execute("INSERT INTO dirs (path) VALUES (?)", (directory, )) - self.conn.commit() - urldir = c.lastrowid - c.close() - return urldir, True - - def getAllDirectories(self): - """Get all the current directories avaliable.""" - c = self.conn.cursor() - c.execute("SELECT urldir, path FROM dirs") - row = c.fetchone() - dirs = {} - while row: - dirs['~' + str(row['urldir'])] = row['path'] - row = c.fetchone() - c.close() - return dirs - - def reconcileDirectories(self): - """Remove any unneeded directories by checking which are used by files.""" - c = self.conn.cursor() - c.execute('DELETE FROM dirs WHERE urldir NOT IN (SELECT DISTINCT urldir FROM files)') - self.conn.commit() - return bool(c.rowcount) - def close(self): self.conn.close() @@ -238,43 +209,45 @@ class TestDB(unittest.TestCase): """Tests for the khashmir database.""" timeout = 5 - db = '/tmp/khashmir.db' - path = '/tmp/khashmir.test' + db = FilePath('/tmp/khashmir.db') hash = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!' - directory = '/tmp/' - urlpath = '/~1/khashmir.test' - dirs = ['/tmp/apt-dht/top1', '/tmp/apt-dht/top2/sub1', '/tmp/apt-dht/top2/sub2/'] + directory = FilePath('/tmp/apt-dht/') + file = FilePath('/tmp/apt-dht/khashmir.test') + testfile = 'tmp/khashmir.test' + dirs = [FilePath('/tmp/apt-dht/top1'), + FilePath('/tmp/apt-dht/top2/sub1'), + FilePath('/tmp/apt-dht/top2/sub2/')] def setUp(self): - f = open(self.path, 'w') - f.write('fgfhds') - f.close() - os.utime(self.path, None) + if not self.file.parent().exists(): + self.file.parent().makedirs() + self.file.setContent('fgfhds') + self.file.touch() + self.store = DB(self.db) + self.store.storeFile(self.file, self.hash) + + def test_openExistsingDB(self): + self.store.close() + self.store = None + sleep(1) self.store = DB(self.db) - self.store.storeFile(self.path, self.hash, self.directory) + res = self.store.isUnchanged(self.file) + self.failUnless(res) def test_getFile(self): - res = self.store.getFile(self.path) + res = self.store.getFile(self.file) self.failUnless(res) self.failUnlessEqual(res['hash'], self.hash) - self.failUnlessEqual(res['urlpath'], self.urlpath) - - def test_getAllDirectories(self): - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 1) - self.failUnlessEqual(res.keys()[0], '~1') - self.failUnlessEqual(res['~1'], os.path.abspath(self.directory)) def test_isUnchanged(self): - res = self.store.isUnchanged(self.path) + res = self.store.isUnchanged(self.file) self.failUnless(res) sleep(2) - os.utime(self.path, None) - res = self.store.isUnchanged(self.path) + self.file.touch() + res = self.store.isUnchanged(self.file) self.failUnless(res == False) - os.unlink(self.path) - res = self.store.isUnchanged(self.path) + self.file.remove() + res = self.store.isUnchanged(self.file) self.failUnless(res == None) def test_expiry(self): @@ -285,70 +258,36 @@ class TestDB(unittest.TestCase): self.failUnlessEqual(len(res.keys()), 1) self.failUnlessEqual(res.keys()[0], self.hash) self.failUnlessEqual(len(res[self.hash]), 1) - self.failUnlessEqual(res[self.hash][0], self.urlpath) - res = self.store.refreshFile(self.path) + res = self.store.refreshFile(self.file) self.failUnless(res) res = self.store.expiredFiles(1) self.failUnlessEqual(len(res.keys()), 0) def build_dirs(self): for dir in self.dirs: - path = os.path.join(dir, self.path[1:]) - os.makedirs(os.path.dirname(path)) - f = open(path, 'w') - f.write(path) - f.close() - os.utime(path, None) - self.store.storeFile(path, self.hash, dir) + file = dir.preauthChild(self.testfile) + if not file.parent().exists(): + file.parent().makedirs() + file.setContent(file.path) + file.touch() + self.store.storeFile(file, self.hash) def test_removeUntracked(self): self.build_dirs() res = self.store.removeUntrackedFiles(self.dirs) self.failUnlessEqual(len(res), 1, 'Got removed paths: %r' % res) - self.failUnlessEqual(res[0], self.path, 'Got removed paths: %r' % res) + self.failUnlessEqual(res[0], self.file, 'Got removed paths: %r' % res) res = self.store.removeUntrackedFiles(self.dirs) self.failUnlessEqual(len(res), 0, 'Got removed paths: %r' % res) res = self.store.removeUntrackedFiles(self.dirs[1:]) self.failUnlessEqual(len(res), 1, 'Got removed paths: %r' % res) - self.failUnlessEqual(res[0], os.path.join(self.dirs[0], self.path[1:]), 'Got removed paths: %r' % res) + self.failUnlessEqual(res[0], self.dirs[0].preauthChild(self.testfile), 'Got removed paths: %r' % res) res = self.store.removeUntrackedFiles(self.dirs[:1]) self.failUnlessEqual(len(res), 2, 'Got removed paths: %r' % res) - self.failUnlessIn(os.path.join(self.dirs[1], self.path[1:]), res, 'Got removed paths: %r' % res) - self.failUnlessIn(os.path.join(self.dirs[2], self.path[1:]), res, 'Got removed paths: %r' % res) - - def test_reconcileDirectories(self): - self.build_dirs() - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 4) - res = self.store.reconcileDirectories() - self.failUnlessEqual(res, False) - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 4) - res = self.store.removeUntrackedFiles(self.dirs) - res = self.store.reconcileDirectories() - self.failUnlessEqual(res, True) - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 3) - res = self.store.removeUntrackedFiles(self.dirs[:1]) - res = self.store.reconcileDirectories() - self.failUnlessEqual(res, True) - res = self.store.getAllDirectories() - self.failUnless(res) - self.failUnlessEqual(len(res.keys()), 1) - res = self.store.removeUntrackedFiles(['/what']) - res = self.store.reconcileDirectories() - self.failUnlessEqual(res, True) - res = self.store.getAllDirectories() - self.failUnlessEqual(len(res.keys()), 0) + self.failUnlessIn(self.dirs[1].preauthChild(self.testfile), res, 'Got removed paths: %r' % res) + self.failUnlessIn(self.dirs[2].preauthChild(self.testfile), res, 'Got removed paths: %r' % res) def tearDown(self): - for root, dirs, files in os.walk('/tmp/apt-dht', topdown=False): - for name in files: - os.remove(os.path.join(root, name)) - for name in dirs: - os.rmdir(os.path.join(root, name)) + self.directory.remove() self.store.close() - os.unlink(self.db) + self.db.remove()