it would send the "get_value" method to the nodes that have the most
values. The "get_value" query could also have a new parameter "number",
which is the maximum number of values to return.
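+
+A rough sketch of what such a request might look like (the message
+layout below is illustrative only, not the actual krpc wire format;
+the key names are assumptions):
+
+    # Hypothetical "get_value" query with the proposed "number" cap.
+    target_key = '\xab' * 20          # the hash being looked up
+    request = {'q': 'get_value',
+               'a': {'key': target_key,
+                     'number': 10}}   # return at most 10 values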
-
-
-Missing Kademlia implementation details are needed.
-
-The current implementation is missing some important features, mostly
-focussed on storing values:
- - values need to be republished (every hour?)
# to reload when a new request arrives.
UNLOAD_PACKAGES_CACHE = 5m
+# Refresh the DHT keys after this much time has passed.
+# This should be a time slightly less than the DHT's KEY_EXPIRE value.
+KEY_REFRESH = 57m
+
# Which DHT implementation to use.
# It must be possible to do "from <DHT>.DHT import DHT" to get a class that
# implements the IDHT interface. There should also be a similarly named
url = None
if self.scanning[0] == self.cache_dir:
url = 'http:/' + file.path[len(self.cache_dir.path):]
- self.db.storeFile(file, result.digest())
- df = self.manager.new_cached_file(file, result, url, True)
+ new_hash = self.db.storeFile(file, result.digest())
+ df = self.manager.new_cached_file(file, result, new_hash, url, True)
if df is None:
reactor.callLater(0, self._scanDirectories, None, walker)
else:
else:
log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
- self.db.storeFile(destFile, hash.digest())
+ new_hash = self.db.storeFile(destFile, hash.digest())
log.msg('now available: %s' % (url))
if self.manager:
- self.manager.new_cached_file(destFile, hash, url)
+ self.manager.new_cached_file(destFile, hash, new_hash, url)
if ext:
- self.manager.new_cached_file(decFile, None, url[:-len(ext)])
+ self.manager.new_cached_file(decFile, None, False, url[:-len(ext)])
else:
log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
destFile.remove()
},
]
- def __init__(self):
+ def __init__(self, digest = None, size = None):
self.hashTypeNum = 0 # Use the first if nothing else matters
self.expHash = None
self.expHex = None
self.expSize = None
self.expNormHash = None
self.fileHasher = None
- self.fileHash = None
+ self.fileHash = digest
+ self.size = size
self.fileHex = None
self.fileNormHash = None
self.done = True
from urlparse import urlunparse
import os, re
-from twisted.internet import defer
+from twisted.internet import defer, reactor
from twisted.web2 import server, http, http_headers, static
from twisted.python import log, failure
from twisted.python.filepath import FilePath
self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
- self.my_addr = None
+ self.my_contact = None
def joinComplete(self, result):
- self.my_addr = findMyIPAddr(result,
- config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
- config.getboolean('DEFAULT', 'LOCAL_OK'))
- if not self.my_addr:
+ my_addr = findMyIPAddr(result,
+ config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
+ config.getboolean('DEFAULT', 'LOCAL_OK'))
+ if not my_addr:
raise RuntimeError, "IP address for this machine could not be found"
+ self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
self.cache.scanDirectories()
+ reactor.callLater(60, self.refreshFiles)
def joinError(self, failure):
log.msg("joining DHT failed miserably")
log.err(failure)
raise RuntimeError, "joining the DHT failed"
+ def refreshFiles(self):
+ """Refresh any files in the DHT that are about to expire."""
+ expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
+ hashes = self.db.expiredFiles(expireAfter)
+ if len(hashes.keys()) > 0:
+ log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
+ for raw_hash in hashes:
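+ # Bump the refresh time in the database, then re-store our
+ # contact info under this hash's key in the DHT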
+ self.db.refreshHash(raw_hash)
+ hash = HashObject(raw_hash)
+ key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+ value = {'c': self.my_contact}
+ storeDefer = self.dht.storeValue(key, value)
+ storeDefer.addCallback(self.store_done, hash)
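+ # Check again in a minute for any other keys nearing expiration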
+ reactor.callLater(60, self.refreshFiles)
+
def check_freshness(self, req, path, modtime, resp):
log.msg('Checking if %s is still fresh' % path)
d = self.peers.get('', path, method = "HEAD", modtime = modtime)
return getDefer
return response
- def new_cached_file(self, file_path, hash, url = None, forceDHT = False):
+ def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
"""Add a newly cached file to the DHT.
If the file was downloaded, set url to the path it was downloaded for.
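+ @param new_hash: whether the file's hash was not already in the
+ database (the value is only stored in the DHT for new hashes)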
if url:
self.mirrors.updatedFile(url, file_path)
- if self.my_addr and hash and (hash.expected() is not None or forceDHT):
- contact = compact(self.my_addr, config.getint('DEFAULT', 'PORT'))
- value = {'c': contact}
+ if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+ value = {'c': self.my_contact}
storeDefer = self.dht.storeValue(key, value)
storeDefer.addCallback(self.store_done, hash)
return storeDefer
# to reload when a new request arrives.
'UNLOAD_PACKAGES_CACHE': '5m',
+ # Refresh the DHT keys after this much time has passed.
+ # This should be a time slightly less than the DHT's KEY_EXPIRE value.
+ 'KEY_REFRESH': '57m',
+
# Which DHT implementation to use.
# It must be possible to do "from <DHT>.DHT import DHT" to get a class that
# implements the IDHT interface.
return res
def storeFile(self, file, hash):
- """Store or update a file in the database."""
+ """Store or update a file in the database.
+
+ @return: True if the hash was not in the database before
+ (so it needs to be added to the DHT)
+ """
+ new_hash = True
+ refreshTime = datetime.now()
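+ # Reuse the most recent refresh time of any existing file with this hash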
+ c = self.conn.cursor()
+ c.execute("SELECT MAX(refreshed) AS max_refresh FROM files WHERE hash = ?", (khash(hash), ))
+ row = c.fetchone()
+ if row and row['max_refresh']:
+ new_hash = False
+ refreshTime = row['max_refresh']
+ c.close()
+
file.restat()
c = self.conn.cursor()
c.execute("SELECT path FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
if row:
- c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?",
- (khash(hash), file.getsize(), file.getmtime(), datetime.now()))
+ c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ? WHERE path = ?",
+ (khash(hash), file.getsize(), file.getmtime(), refreshTime, file.path))
else:
c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?)",
- (file.path, khash(hash), file.getsize(), file.getmtime(), datetime.now()))
+ (file.path, khash(hash), file.getsize(), file.getmtime(), refreshTime))
self.conn.commit()
c.close()
+ return new_hash
+
def getFile(self, file):
"""Get a file from the database.
@return: list of dictionaries of info for the found files
"""
c = self.conn.cursor()
- c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), ))
+ c.execute("SELECT path, size, mtime, refreshed FROM files WHERE hash = ?", (khash(hash), ))
row = c.fetchone()
files = []
while row:
res = {}
res['path'] = file
res['size'] = row['size']
+ res['refreshed'] = row['refreshed']
files.append(res)
row = c.fetchone()
c.close()
row = c.fetchone()
return self._removeChanged(file, row)
- def refreshFile(self, file):
- """Refresh the publishing time of a file.
-
- If it has changed or is missing, it is removed from the table.
-
- @return: True if unchanged, False if changed, None if not in database
- """
+ def refreshHash(self, hash):
+ """Refresh the publishing time of all files with a hash."""
+ refreshTime = datetime.now()
c = self.conn.cursor()
- c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
- row = c.fetchone()
- res = None
- if row:
- res = self._removeChanged(file, row)
- if res:
- c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
- return res
+ c.execute("UPDATE files SET refreshed = ? WHERE hash = ?", (refreshTime, khash(hash)))
+ self.conn.commit()
+ c.close()
def expiredFiles(self, expireAfter):
"""Find files that need refreshing after expireAfter seconds.
- Also removes any entries from the table that no longer exist.
+ For each hash that needs refreshing, finds all the files with that hash.
+ If the file has changed or is missing, it is removed from the table.
@return: dictionary with keys the hashes, values a list of FilePaths
"""
t = datetime.now() - timedelta(seconds=expireAfter)
+
+ # First find the hashes that need refreshing
c = self.conn.cursor()
- c.execute("SELECT path, hash, size, mtime FROM files WHERE refreshed < ?", (t, ))
+ c.execute("SELECT DISTINCT hash FROM files WHERE refreshed < ?", (t, ))
row = c.fetchone()
expired = {}
while row:
- res = self._removeChanged(FilePath(row['path']), row)
- if res:
- expired.setdefault(row['hash'], []).append(FilePath(row['path']))
+ expired.setdefault(row['hash'], [])
row = c.fetchone()
c.close()
+
+ # Now find the files for each hash
+ for hash in expired.keys():
+ c = self.conn.cursor()
+ c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), ))
+ row = c.fetchone()
+ while row:
+ res = self._removeChanged(FilePath(row['path']), row)
+ if res:
+ expired[hash].append(FilePath(row['path']))
+ row = c.fetchone()
+ if len(expired[hash]) == 0:
+ del expired[hash]
+ c.close()
+
return expired
def removeUntrackedFiles(self, dirs):
self.failUnless(res)
self.failUnlessEqual(res['hash'], self.hash)
+ def test_lookupHash(self):
+ res = self.store.lookupHash(self.hash)
+ self.failUnless(res)
+ self.failUnlessEqual(len(res), 1)
+ self.failUnlessEqual(res[0]['path'].path, self.file.path)
+
def test_isUnchanged(self):
res = self.store.isUnchanged(self.file)
self.failUnless(res)
self.failUnlessEqual(len(res.keys()), 1)
self.failUnlessEqual(res.keys()[0], self.hash)
self.failUnlessEqual(len(res[self.hash]), 1)
- res = self.store.refreshFile(self.file)
- self.failUnless(res)
+ self.store.refreshHash(self.hash)
res = self.store.expiredFiles(1)
self.failUnlessEqual(len(res.keys()), 0)
file.touch()
self.store.storeFile(file, self.hash)
+ def test_multipleHashes(self):
+ self.build_dirs()
+ res = self.store.expiredFiles(1)
+ self.failUnlessEqual(len(res.keys()), 0)
+ res = self.store.lookupHash(self.hash)
+ self.failUnless(res)
+ self.failUnlessEqual(len(res), 4)
+ self.failUnlessEqual(res[0]['refreshed'], res[1]['refreshed'])
+ self.failUnlessEqual(res[0]['refreshed'], res[2]['refreshed'])
+ self.failUnlessEqual(res[0]['refreshed'], res[3]['refreshed'])
+ sleep(2)
+ res = self.store.expiredFiles(1)
+ self.failUnlessEqual(len(res.keys()), 1)
+ self.failUnlessEqual(res.keys()[0], self.hash)
+ self.failUnlessEqual(len(res[self.hash]), 4)
+ self.store.refreshHash(self.hash)
+ res = self.store.expiredFiles(1)
+ self.failUnlessEqual(len(res.keys()), 0)
+
def test_removeUntracked(self):
self.build_dirs()
res = self.store.removeUntrackedFiles(self.dirs)
to reload when a new request arrives. (Default is 5 minutes.)</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>KEY_REFRESH = <replaceable>time</replaceable></option></term>
+ <listitem>
+ <para>The <replaceable>time</replaceable> after which to refresh DHT keys.
+ This should be a time slightly less than the DHT's KEY_EXPIRE value.
+ (Default is 57 minutes.)</para>
+ </listitem>
+ </varlistentry>
<varlistentry>
<term><option>DHT = <replaceable>string</replaceable></option></term>
<listitem>
# to reload when a new request arrives.
UNLOAD_PACKAGES_CACHE = 5m
+# Refresh the DHT keys after this much time has passed.
+# This should be a time slightly less than the DHT's KEY_EXPIRE value.
+KEY_REFRESH = 57m
+
# Which DHT implementation to use.
# It must be possible to do "from <DHT>.DHT import DHT" to get a class that
# implements the IDHT interface.