from util import uncompact
from Hash import PIECE_SIZE
from apt_p2p_Khashmir.bencode import bdecode
+from apt_p2p_conf import config
+
+class PeerError(Exception):
+ """An error occurred downloading from peers."""
+
class GrowingFileStream(stream.FileStream):
"""Modified to stream data from a file as it becomes available.
def _gotData(self, data):
"""Process the received data."""
if self.outFile.closed:
- raise Exception, "outFile was unexpectedly closed"
+ raise PeerError, "outFile was unexpectedly closed"
if data is None:
- raise Exception, "Data is None?"
+ raise PeerError, "Data is None?"
# Make sure we don't go too far
if self.length is not None and self.position + len(data) > self.length:
self.compact_peers = compact_peers
self.path = '/~/' + quote_plus(hash.expected())
+ self.mirror_path = None
self.pieces = None
self.started = False
if max_found == no_pieces:
# The file is not split into pieces
log.msg('No pieces were found for the file')
- self.pieces = []
+ self.pieces = [self.hash.expected()]
self.startDownload()
elif max_found == max(pieces_string.values()):
# Small number of pieces in a string
# Start the DHT lookup
lookupDefer = self.manager.dht.getValue(key)
- lookupDefer.addCallback(self._getDHTPieces, key)
+ lookupDefer.addBoth(self._getDHTPieces, key)
def _getDHTPieces(self, results, key):
"""Check the retrieved values."""
- for result in results:
- # Make sure the hash matches the key
- result_hash = sha.new(result.get('t', '')).digest()
- if result_hash == key:
- pieces = result['t']
- self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
- log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
- self.startDownload()
- return
+ if isinstance(results, list):
+ for result in results:
+ # Make sure the hash matches the key
+ result_hash = sha.new(result.get('t', '')).digest()
+ if result_hash == key:
+ pieces = result['t']
+ self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+ log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+ self.startDownload()
+ return
+
+ log.msg('Could not retrieve the piece hashes from the DHT')
+ else:
+ log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
# Continue without the piece hashes
- log.msg('Could not retrieve the piece hashes from the DHT')
- self.pieces = []
+ self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
self.startDownload()
def getPeerPieces(self, key, failedSite = None):
if self.pieces is None:
# Send a request to one or more peers
- log.msg('Checking for a peer to request piece hashes from')
for site in self.peers:
if self.peers[site].get('failed', False) != True:
log.msg('Sending a piece hash request to %r' % (site, ))
**{'callbackArgs': (key, site),
'errbackArgs': (key, site)})
self.outstanding += 1
- if self.outstanding >= 3:
+ if self.outstanding >= 4:
break
- log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
if self.pieces is None and self.outstanding <= 0:
# Continue without the piece hashes
log.msg('Could not retrieve the piece hashes from the peers')
- self.pieces = []
+ self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
self.startDownload()
def _getPeerPieces(self, response, key, site):
log.msg('Got a piece hash response %d from %r' % (response.code, site))
if response.code != 200:
# Request failed, try a different peer
- log.msg('Did not like response %d from %r' % (response.code, site))
self.getPeerPieces(key, site)
else:
# Read the response stream to a string
log.msg('Starting to download %s' % self.path)
self.started = True
- assert self.pieces is not None, "You must initialize the piece hashes first"
+ assert self.pieces, "You must initialize the piece hashes first"
self.peerlist = [self.peers[site]['peer'] for site in self.peers]
+ # Use the mirror if there are few peers
+ if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+ parsed = urlparse(self.mirror)
+ if parsed[0] == "http":
+ site = splitHostPort(parsed[0], parsed[1])
+ self.mirror_path = urlunparse(('', '') + parsed[2:])
+ peer = self.manager.getPeer(site, mirror = True)
+ self.peerlist.append(peer)
+
# Special case if there's only one good peer left
- if len(self.peerlist) == 1:
- log.msg('Downloading from peer %r' % (self.peerlist[0], ))
- self.defer.callback(self.peerlist[0].get(self.path))
- return
+# if len(self.peerlist) == 1:
+# log.msg('Downloading from peer %r' % (self.peerlist[0], ))
+# self.defer.callback(self.peerlist[0].get(self.path))
+# return
# Start sending the return file
self.stream = GrowingFileStream(self.file, self.hash.expSize)
# Begin to download the pieces
self.outstanding = 0
self.nextFinish = 0
- if self.pieces:
- self.completePieces = [False for piece in self.pieces]
- else:
- self.completePieces = [False]
+ self.completePieces = [False for piece in self.pieces]
self.getPieces()
#{ Downloading the pieces
def getPieces(self):
"""Download the next pieces from the peers."""
- log.msg('Checking for more piece requests to send')
self.sort()
piece = self.nextFinish
while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
- log.msg('Checking piece %d' % piece)
if self.completePieces[piece] == False:
# Send a request to the highest ranked peer
peer = self.peerlist.pop()
log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
self.outstanding += 1
- if self.pieces:
- df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+ if peer.mirror:
+ df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
else:
- df = peer.get(self.path)
+ df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
reactor.callLater(0, df.addCallbacks,
*(self._getPiece, self._getError),
**{'callbackArgs': (piece, peer),
'errbackArgs': (piece, peer)})
piece += 1
- log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
# Check if we're done
if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
log.msg('We seem to be done with all pieces')
# Read the response stream to the file
log.msg('Streaming piece %d from peer %r' % (piece, peer))
if response.code == 206:
- df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+ df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+ PIECE_SIZE).run()
else:
df = StreamToFile(response.stream, self.file).run()
- df.addCallbacks(self._gotPiece, self._gotError,
- callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+ reactor.callLater(0, df.addCallbacks,
+ *(self._gotPiece, self._gotError),
+ **{'callbackArgs': (piece, peer),
+ 'errbackArgs': (piece, peer)})
self.outstanding -= 1
self.peerlist.append(peer)
def _gotPiece(self, response, piece, peer):
"""Process the retrieved piece from the peer."""
log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
- if ((self.pieces and response != self.pieces[piece]) or
- (len(self.pieces) == 0 and response != self.hash.expected())):
+ if self.pieces[piece] and response != self.pieces[piece]:
# Hash doesn't match
log.msg('Hash error for piece %d from peer %r' % (piece, peer))
peer.hashError('Piece received from peer does not match expected')
self.completePieces[piece] = False
- elif self.pieces:
+ else:
# Successfully completed one of several pieces
log.msg('Finished with piece %d from peer %r' % (piece, peer))
self.completePieces[piece] = True
self.completePieces[self.nextFinish] == True):
self.nextFinish += 1
self.stream.updateAvailable(PIECE_SIZE)
- else:
- # Whole download (only one piece) is complete
- log.msg('Piece %d from peer %r is the last piece' % (piece, peer))
- self.completePieces[piece] = True
- self.nextFinish = 1
- self.stream.updateAvailable(2**30)
self.getPieces()
@ivar cache_dir: the directory to use for storing all files
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to record sent data to
@type clients: C{dictionary}
@ivar clients: the available peers that have been previously contacted
"""
- def __init__(self, cache_dir, dht):
+ def __init__(self, cache_dir, dht, stats):
"""Initialize the instance."""
self.cache_dir = cache_dir
self.cache_dir.restat(False)
if not self.cache_dir.exists():
self.cache_dir.makedirs()
self.dht = dht
+ self.stats = stats
self.clients = {}
def get(self, hash, mirror, peers = [], method="GET", modtime=None):
assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
site = splitHostPort(parsed[0], parsed[1])
path = urlunparse(('', '') + parsed[2:])
- peer = self.getPeer(site)
+ peer = self.getPeer(site, mirror = True)
return peer.get(path, method, modtime)
- elif len(peers) == 1:
- site = uncompact(peers[0]['c'])
- log.msg('Downloading from peer %r' % (site, ))
- path = '/~/' + quote_plus(hash.expected())
- peer = self.getPeer(site)
- return peer.get(path)
+# elif len(peers) == 1:
+# site = uncompact(peers[0]['c'])
+# log.msg('Downloading from peer %r' % (site, ))
+# path = '/~/' + quote_plus(hash.expected())
+# peer = self.getPeer(site)
+# return peer.get(path)
else:
tmpfile = self.cache_dir.child(hash.hexexpected())
return FileDownload(self, hash, mirror, peers, tmpfile).run()
- def getPeer(self, site):
+ def getPeer(self, site, mirror = False):
"""Create a new peer if necessary and return it.
@type site: (C{string}, C{int})
@param site: the IP address and port of the peer
+ @param mirror: whether the peer is actually a mirror
+ (optional, defaults to False)
"""
if site not in self.clients:
- self.clients[site] = Peer(site[0], site[1])
+ self.clients[site] = Peer(site[0], site[1], self.stats)
+ if mirror:
+ self.clients[site].mirror = True
return self.clients[site]
def close(self):