from binascii import b2a_hex
from urlparse import urlunparse
+from urllib import unquote
import os, re, sha
from twisted.internet import defer, reactor
from twisted.python import log, failure
from twisted.python.filepath import FilePath
+from interfaces import IDHT, IDHTStats
from apt_p2p_conf import config
from PeerManager import PeerManager
from HTTPServer import TopLevel
Contains all of the sub-components that do all the low-level work, and
coordinates communication between them.
+ @type dhtClass: L{interfaces.IDHT}
+ @ivar dhtClass: the DHT class to use
@type cache_dir: L{twisted.python.filepath.FilePath}
@ivar cache_dir: the directory to use for storing all files
@type db: L{db.DB}
@ivar db: the database to use for tracking files and hashes
@type dht: L{interfaces.IDHT}
- @ivar dht: the DHT instance to use
+ @ivar dht: the DHT instance
@type http_server: L{HTTPServer.TopLevel}
@ivar http_server: the web server that will handle all requests from apt
and from other peers
download information (IP address and port)
"""
- def __init__(self, dht):
+ def __init__(self, dhtClass):
"""Initialize all the sub-components.
- @type dht: L{interfaces.IDHT}
- @param dht: the DHT instance to use
+ @type dhtClass: L{interfaces.IDHT}
+ @param dhtClass: the DHT class to use
"""
log.msg('Initializing the main apt_p2p application')
- self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
+ self.dhtClass = dhtClass
+ self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
if not self.cache_dir.child(download_dir).exists():
self.cache_dir.child(download_dir).makedirs()
self.db = DB(self.cache_dir.child('apt-p2p.db'))
- self.dht = dht
+ self.dht = dhtClass()
self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
self.dht.join().addCallbacks(self.joinComplete, self.joinError)
self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
self.getHTTPFactory = self.http_server.getHTTPFactory
- self.peers = PeerManager()
- self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
- other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
- self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
+ self.peers = PeerManager(self.cache_dir, self.dht)
+ self.mirrors = MirrorManager(self.cache_dir)
+ self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
self.my_contact = None
#{ DHT maintenance
storeDefer.addBoth(self._refreshFiles, hashes)
else:
reactor.callLater(60, self.refreshFiles)
+
+ def getStats(self):
+ """Retrieve and format the statistics for the program.
+
+ @rtype: C{string}
+ @return: the formatted HTML page containing the statistics
+ """
+ out = '<html><body>\n\n'
+ if IDHTStats.implementedBy(self.dhtClass):
+ out += self.dht.getStats()
+ out += '\n</body></html>\n'
+ return out
#{ Main workflow
def check_freshness(self, req, url, modtime, resp):
d = defer.Deferred()
log.msg('Trying to find hash for %s' % url)
- findDefer = self.mirrors.findHash(url)
+ findDefer = self.mirrors.findHash(unquote(url))
findDefer.addCallbacks(self.findHash_done, self.findHash_error,
callbackArgs=(req, url, d), errbackArgs=(req, url, d))
log.msg('Looking up hash in DHT for file: %s' % url)
key = hash.expected()
lookupDefer = self.dht.getValue(key)
- lookupDefer.addCallback(self.lookupHash_done, hash, url, d)
+ lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
def lookupHash_done(self, values, hash, url, d):
"""Start the download of the file.
@param values: the returned values from the DHT containing peer
download information
"""
- if not values:
- log.msg('Peers for %s were not found' % url)
+ if not isinstance(values, list) or not values:
+ if not isinstance(values, list):
+ log.msg('DHT lookup for %s failed with error %r' % (url, values))
+ else:
+ log.msg('Peers for %s were not found' % url)
getDefer = self.peers.get(hash, url)
getDefer.addCallback(self.cache.save_file, hash, url)
getDefer.addErrback(self.cache.save_error, url)
value['l'] = sha.new(''.join(pieces)).digest()
storeDefer = self.dht.storeValue(key, value)
- storeDefer.addCallback(self.store_done, hash)
+ storeDefer.addCallbacks(self.store_done, self.store_error,
+ callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
return storeDefer
def store_done(self, result, hash):
value = {'t': ''.join(pieces)}
storeDefer = self.dht.storeValue(key, value)
- storeDefer.addCallback(self.store_torrent_done, key)
+ storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
+ callbackArgs = (key, ), errbackArgs = (key, ))
return storeDefer
return result
"""Adding the file to the DHT is complete, and so is the workflow."""
log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
return result
+
+ def store_error(self, err, key):
+ """Adding to the DHT failed."""
+ log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
+ return err
\ No newline at end of file