self.db.parent().makedirs()
self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
c = self.conn.cursor()
- c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
+ c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
c.execute("CREATE INDEX files_hash ON files(hash)")
- c.execute("CREATE INDEX files_urldir ON files(urldir)")
c.execute("CREATE INDEX files_refreshed ON files(refreshed)")
- c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)")
- c.execute("CREATE INDEX dirs_path ON dirs(path)")
c.close()
self.conn.commit()
c.close()
return res
- def storeFile(self, file, hash, directory):
+ def storeFile(self, file, hash):
"""Store or update a file in the database.
- @return: the urlpath to access the file, and whether a
- new url top-level directory was needed
+ @return: True if the hash was not in the database before
+ (so it needs to be added to the DHT)
"""
+ new_hash = True
+ refreshTime = datetime.now()
+ c = self.conn.cursor()
+ c.execute("SELECT MAX(refreshed) AS max_refresh FROM files WHERE hash = ?", (khash(hash), ))
+ row = c.fetchone()
+ if row and row['max_refresh']:
+ new_hash = False
+ refreshTime = row['max_refresh']
+ c.close()
+
file.restat()
c = self.conn.cursor()
- c.execute("SELECT dirs.urldir AS urldir, dirs.path AS directory FROM dirs JOIN files USING (urldir) WHERE files.path = ?", (file.path, ))
+ c.execute("SELECT path FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
- if row and directory == row['directory']:
+ if row:
c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?",
- (khash(hash), file.getsize(), file.getmtime(), datetime.now()))
- newdir = False
- urldir = row['urldir']
+ (khash(hash), file.getsize(), file.getmtime(), refreshTime))
else:
- urldir, newdir = self.findDirectory(directory)
- c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?, ?, ?)",
- (file.path, khash(hash), urldir, len(directory.path), file.getsize(), file.getmtime(), datetime.now()))
+ c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?)",
+ (file.path, khash(hash), file.getsize(), file.getmtime(), refreshTime))
self.conn.commit()
c.close()
- return '/~' + str(urldir) + file.path[len(directory.path):], newdir
+
+ return new_hash
def getFile(self, file):
"""Get a file from the database.
None if not in database or missing
"""
c = self.conn.cursor()
- c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, ))
+ c.execute("SELECT hash, size, mtime FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
res = None
if row:
res = {}
res['hash'] = row['hash']
res['size'] = row['size']
- res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
c.close()
return res
@return: list of dictionaries of info for the found files
"""
c = self.conn.cursor()
- c.execute("SELECT path, urldir, dirlength, size, mtime FROM files WHERE hash = ? ORDER BY urldir", (khash(hash), ))
+ c.execute("SELECT path, size, mtime, refreshed FROM files WHERE hash = ?", (khash(hash), ))
row = c.fetchone()
files = []
while row:
res = {}
res['path'] = file
res['size'] = row['size']
- res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
+ res['refreshed'] = row['refreshed']
files.append(res)
row = c.fetchone()
c.close()
row = c.fetchone()
return self._removeChanged(file, row)
- def refreshFile(self, file):
- """Refresh the publishing time of a file.
-
- If it has changed or is missing, it is removed from the table.
-
- @return: True if unchanged, False if changed, None if not in database
- """
+ def refreshHash(self, hash):
+ """Refresh the publishing time of all files with a hash."""
+ refreshTime = datetime.now()
c = self.conn.cursor()
- c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
- row = c.fetchone()
- res = None
- if row:
- res = self._removeChanged(file, row)
- if res:
- c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
- return res
+ c.execute("UPDATE files SET refreshed = ? WHERE hash = ?", (refreshTime, khash(hash)))
+ c.close()
def expiredFiles(self, expireAfter):
"""Find files that need refreshing after expireAfter seconds.
- Also removes any entries from the table that no longer exist.
+ For each hash that needs refreshing, finds all the files with that hash.
+ If the file has changed or is missing, it is removed from the table.
- @return: dictionary with keys the hashes, values a list of url paths
+ @return: dictionary with keys the hashes, values a list of FilePaths
"""
t = datetime.now() - timedelta(seconds=expireAfter)
+
+ # First find the hashes that need refreshing
c = self.conn.cursor()
- c.execute("SELECT path, hash, urldir, dirlength, size, mtime FROM files WHERE refreshed < ?", (t, ))
+ c.execute("SELECT DISTINCT hash FROM files WHERE refreshed < ?", (t, ))
row = c.fetchone()
expired = {}
while row:
- res = self._removeChanged(FilePath(row['path']), row)
- if res:
- expired.setdefault(row['hash'], []).append('/~' + str(row['urldir']) + row['path'][row['dirlength']:])
+ expired.setdefault(row['hash'], [])
row = c.fetchone()
c.close()
+
+ # Now find the files for each hash
+ for hash in expired.keys():
+ c = self.conn.cursor()
+ c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), ))
+ row = c.fetchone()
+ while row:
+ res = self._removeChanged(FilePath(row['path']), row)
+ if res:
+ expired[hash].append(FilePath(row['path']))
+ row = c.fetchone()
+ if len(expired[hash]) == 0:
+ del expired[hash]
+ c.close()
+
return expired
def removeUntrackedFiles(self, dirs):
self.conn.commit()
return removed
- def findDirectory(self, directory):
- """Store or update a directory in the database.
-
- @return: the index of the url directory, and whether it is new or not
- """
- c = self.conn.cursor()
- c.execute("SELECT min(urldir) AS urldir FROM dirs WHERE path = ?", (directory.path, ))
- row = c.fetchone()
- c.close()
- if row['urldir']:
- return row['urldir'], False
-
- # Not found, need to add a new one
- c = self.conn.cursor()
- c.execute("INSERT INTO dirs (path) VALUES (?)", (directory.path, ))
- self.conn.commit()
- urldir = c.lastrowid
- c.close()
- return urldir, True
-
- def getAllDirectories(self):
- """Get all the current directories avaliable."""
- c = self.conn.cursor()
- c.execute("SELECT urldir, path FROM dirs")
- row = c.fetchone()
- dirs = {}
- while row:
- dirs['~' + str(row['urldir'])] = FilePath(row['path'])
- row = c.fetchone()
- c.close()
- return dirs
-
- def reconcileDirectories(self):
- """Remove any unneeded directories by checking which are used by files."""
- c = self.conn.cursor()
- c.execute('DELETE FROM dirs WHERE urldir NOT IN (SELECT DISTINCT urldir FROM files)')
- self.conn.commit()
- return bool(c.rowcount)
-
def close(self):
self.conn.close()
timeout = 5
db = FilePath('/tmp/khashmir.db')
- file = FilePath('/tmp/apt-dht/khashmir.test')
hash = '\xca\xec\xb8\x0c\x00\xe7\x07\xf8~])\x8f\x9d\xe5_B\xff\x1a\xc4!'
directory = FilePath('/tmp/apt-dht/')
- urlpath = '/~1/khashmir.test'
+ file = FilePath('/tmp/apt-dht/khashmir.test')
testfile = 'tmp/khashmir.test'
dirs = [FilePath('/tmp/apt-dht/top1'),
FilePath('/tmp/apt-dht/top2/sub1'),
self.file.setContent('fgfhds')
self.file.touch()
self.store = DB(self.db)
- self.store.storeFile(self.file, self.hash, self.directory)
+ self.store.storeFile(self.file, self.hash)
def test_openExistsingDB(self):
self.store.close()
res = self.store.getFile(self.file)
self.failUnless(res)
self.failUnlessEqual(res['hash'], self.hash)
- self.failUnlessEqual(res['urlpath'], self.urlpath)
- def test_getAllDirectories(self):
- res = self.store.getAllDirectories()
+ def test_lookupHash(self):
+ res = self.store.lookupHash(self.hash)
self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 1)
- self.failUnlessEqual(res.keys()[0], '~1')
- self.failUnlessEqual(res['~1'], self.directory)
+ self.failUnlessEqual(len(res), 1)
+ self.failUnlessEqual(res[0]['path'].path, self.file.path)
def test_isUnchanged(self):
res = self.store.isUnchanged(self.file)
self.failUnlessEqual(len(res.keys()), 1)
self.failUnlessEqual(res.keys()[0], self.hash)
self.failUnlessEqual(len(res[self.hash]), 1)
- self.failUnlessEqual(res[self.hash][0], self.urlpath)
- res = self.store.refreshFile(self.file)
- self.failUnless(res)
+ self.store.refreshHash(self.hash)
res = self.store.expiredFiles(1)
self.failUnlessEqual(len(res.keys()), 0)
file.parent().makedirs()
file.setContent(file.path)
file.touch()
- self.store.storeFile(file, self.hash, dir)
+ self.store.storeFile(file, self.hash)
+
+ def test_multipleHashes(self):
+ self.build_dirs()
+ res = self.store.expiredFiles(1)
+ self.failUnlessEqual(len(res.keys()), 0)
+ res = self.store.lookupHash(self.hash)
+ self.failUnless(res)
+ self.failUnlessEqual(len(res), 4)
+ self.failUnlessEqual(res[0]['refreshed'], res[1]['refreshed'])
+ self.failUnlessEqual(res[0]['refreshed'], res[2]['refreshed'])
+ self.failUnlessEqual(res[0]['refreshed'], res[3]['refreshed'])
+ sleep(2)
+ res = self.store.expiredFiles(1)
+ self.failUnlessEqual(len(res.keys()), 1)
+ self.failUnlessEqual(res.keys()[0], self.hash)
+ self.failUnlessEqual(len(res[self.hash]), 4)
+ self.store.refreshHash(self.hash)
+ res = self.store.expiredFiles(1)
+ self.failUnlessEqual(len(res.keys()), 0)
def test_removeUntracked(self):
self.build_dirs()
self.failUnlessIn(self.dirs[1].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
self.failUnlessIn(self.dirs[2].preauthChild(self.testfile), res, 'Got removed paths: %r' % res)
- def test_reconcileDirectories(self):
- self.build_dirs()
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 4)
- res = self.store.reconcileDirectories()
- self.failUnlessEqual(res, False)
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 4)
- res = self.store.removeUntrackedFiles(self.dirs)
- res = self.store.reconcileDirectories()
- self.failUnlessEqual(res, True)
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 3)
- res = self.store.removeUntrackedFiles(self.dirs[:1])
- res = self.store.reconcileDirectories()
- self.failUnlessEqual(res, True)
- res = self.store.getAllDirectories()
- self.failUnless(res)
- self.failUnlessEqual(len(res.keys()), 1)
- res = self.store.removeUntrackedFiles([FilePath('/what')])
- res = self.store.reconcileDirectories()
- self.failUnlessEqual(res, True)
- res = self.store.getAllDirectories()
- self.failUnlessEqual(len(res.keys()), 0)
-
def tearDown(self):
self.directory.remove()
self.store.close()