From 40ef1ce5ded865bc9c339d15e667e87fc5775a7c Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Thu, 10 Apr 2008 23:40:04 -0700 Subject: [PATCH] Multiple peer downloading is mostly working now. Many bug fixes, too numerous to mention. Lots of new (probably temporary) logging. New test stuff for the multi-peer downloading. Still fails on large files when retrieving piece hashes from peers. --- apt_p2p/HTTPDownloader.py | 3 ++ apt_p2p/HTTPServer.py | 2 +- apt_p2p/PeerManager.py | 78 ++++++++++++++++++++++++++++++--------- test.py | 42 ++++++++++++++++----- 4 files changed, 96 insertions(+), 29 deletions(-) diff --git a/apt_p2p/HTTPDownloader.py b/apt_p2p/HTTPDownloader.py index e3eb66e..15ee564 100644 --- a/apt_p2p/HTTPDownloader.py +++ b/apt_p2p/HTTPDownloader.py @@ -45,6 +45,9 @@ class Peer(ClientFactory): self._downloadSpeeds = [] self._lastResponse = None self._responseTimes = [] + + def __repr__(self): + return "(%s, %d, %0.5f)" % (self.host, self.port, self.rank) #{ Manage the request queue def connect(self): diff --git a/apt_p2p/HTTPServer.py b/apt_p2p/HTTPServer.py index c3c64b8..f3d6de7 100644 --- a/apt_p2p/HTTPServer.py +++ b/apt_p2p/HTTPServer.py @@ -227,7 +227,7 @@ class TopLevel(resource.Resource): log.msg('Sending torrent string %s to %s' % (b2a_hex(hash), request.remoteAddr)) return static.Data(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), () else: - log.msg('Hash could not be found in database: %s' % hash) + log.msg('Hash could not be found in database: %r' % hash) # Only local requests (apt) get past this point if request.remoteAddr.host != "127.0.0.1": diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index d041504..9bf4b4d 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -30,15 +30,15 @@ class GrowingFileStream(stream.FileStream): CHUNK_SIZE = 4*1024 - def __init__(self, f): + def __init__(self, f, length = None): stream.FileStream.__init__(self, f) - self.length = None + self.length = length 
self.deferred = None self.available = 0L self.position = 0L self.finished = False - def updateAvaliable(self, newlyAvailable): + def updateAvailable(self, newlyAvailable): """Update the number of bytes that are available. Call it with 0 to trigger reading of a fully read file. @@ -86,9 +86,13 @@ class GrowingFileStream(stream.FileStream): deferred.callback(b) else: # We're done + deferred = self.deferred + self.deferred = None deferred.callback(None) else: # We're done + deferred = self.deferred + self.deferred = None deferred.callback(None) def read(self, sendfile=False): @@ -123,7 +127,7 @@ class GrowingFileStream(stream.FileStream): self.position += bytesRead return b -class StreamToFile(defer.Deferred): +class StreamToFile: """Save a stream to a partial file and hash it. @type stream: L{twisted.web2.stream.IByteStream} @@ -168,6 +172,7 @@ class StreamToFile(defer.Deferred): @rtype: L{twisted.internet.defer.Deferred} """ + log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position)) self.doneDefer = stream.readStream(self.stream, self._gotData) self.doneDefer.addCallbacks(self._done, self._error) return self.doneDefer @@ -192,10 +197,12 @@ class StreamToFile(defer.Deferred): def _done(self, result): """Return the result.""" + log.msg('Streaming is complete') return self.hash.digest() def _error(self, err): """Log the error.""" + log.msg('Streaming error') log.err(err) return err @@ -260,21 +267,22 @@ class FileDownload: file.restat(False) if file.exists(): file.remove() - self.file = file.open('w') + self.file = file.open('w+') def run(self): """Start the downloading process.""" + log.msg('Checking for pieces for %s' % self.path) self.defer = defer.Deferred() self.peers = {} no_pieces = 0 - pieces_string = {} - pieces_hash = {} - pieces_dl_hash = {} + pieces_string = {0: 0} + pieces_hash = {0: 0} + pieces_dl_hash = {0: 0} for compact_peer in self.compact_peers: # Build a list of all the peers for this download site = 
uncompact(compact_peer['c']) - peer = manager.getPeer(site) + peer = self.manager.getPeer(site) self.peers.setdefault(site, {})['peer'] = peer # Extract any piece information from the peers list @@ -303,6 +311,7 @@ class FileDownload: if max_found == no_pieces: # The file is not split into pieces + log.msg('No pieces were found for the file') self.pieces = [] self.startDownload() elif max_found == max(pieces_string.values()): @@ -319,6 +328,7 @@ class FileDownload: for pieces, num in pieces_hash.items(): # Find the most popular piece hash to lookup if num == max_found: + log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces) self.getDHTPieces(pieces) break elif max_found == max(pieces_dl_hash.values()): @@ -326,6 +336,7 @@ class FileDownload: for pieces, num in pieces_hash.items(): # Find the most popular piece hash to download if num == max_found: + log.msg('Found a hash for pieces to lookup in peers: %r' % pieces) self.getPeerPieces(pieces) break return self.defer @@ -368,27 +379,34 @@ class FileDownload: @param key: the key to request from the peers """ if failedSite is None: + log.msg('Starting the lookup of piece hashes in peers') self.outstanding = 0 # Remove any peers with the wrong piece hash #for site in self.peers.keys(): # if self.peers[site].get('l', '') != key: # del self.peers[site] else: + log.msg('Piece hash lookup failed for peer %r' % (failedSite, )) self.peers[failedSite]['failed'] = True self.outstanding -= 1 if self.pieces is None: # Send a request to one or more peers + log.msg('Checking for a peer to request piece hashes from') for site in self.peers: if self.peers[site].get('failed', False) != True: + log.msg('Sending a piece hash request to %r' % (site, )) path = '/~/' + quote_plus(key) lookupDefer = self.peers[site]['peer'].get(path) - lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError, - callbackArgs=(key, site), errbackArgs=(key, site)) + reactor.callLater(0, lookupDefer.addCallbacks, + *(self._getPeerPieces, 
self._gotPeerError), + **{'callbackArgs': (key, site), + 'errbackArgs': (key, site)}) self.outstanding += 1 if self.outstanding >= 3: break + log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding) if self.pieces is None and self.outstanding == 0: # Continue without the piece hashes log.msg('Could not retrieve the piece hashes from the peers') @@ -397,32 +415,40 @@ class FileDownload: def _getPeerPieces(self, response, key, site): """Process the retrieved response from the peer.""" + log.msg('Got a piece hash response %d from %r' % (response.code, site)) if response.code != 200: # Request failed, try a different peer + log.msg('Did not like response %d from %r' % (response.code, site)) self.getPeerPieces(key, site) else: # Read the response stream to a string self.peers[site]['pieces'] = '' def _gotPeerPiece(data, self = self, site = site): + log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data))) self.peers[site]['pieces'] += data + log.msg('Streaming piece hashes from peer') df = stream.readStream(response.stream, _gotPeerPiece) df.addCallbacks(self._gotPeerPieces, self._gotPeerError, callbackArgs=(key, site), errbackArgs=(key, site)) def _gotPeerError(self, err, key, site): """Peer failed, try again.""" + log.msg('Peer piece hash request failed for %r' % (site, )) log.err(err) self.getPeerPieces(key, site) def _gotPeerPieces(self, result, key, site): """Check the retrieved pieces from the peer.""" + log.msg('Finished streaming piece hashes from peer %r' % (site, )) if self.pieces is not None: # Already done + log.msg('Already done') return try: result = bdecode(self.peers[site]['pieces']) except: + log.msg('Error bdecoding piece hashes') log.err() self.getPeerPieces(key, site) return @@ -431,7 +457,7 @@ class FileDownload: if result_hash == key: pieces = result['t'] self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)] - log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces)) + 
log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site)) self.startDownload() else: log.msg('Peer returned a piece string that did not match') @@ -455,6 +481,7 @@ class FileDownload: if self.started: return + log.msg('Starting to download %s' % self.path) self.started = True assert self.pieces is not None, "You must initialize the piece hashes first" self.peerlist = [self.peers[site]['peer'] for site in self.peers] @@ -466,7 +493,7 @@ class FileDownload: return # Start sending the return file - self.stream = GrowingFileStream(self.file) + self.stream = GrowingFileStream(self.file, self.hash.expSize) resp = Response(200, {}, self.stream) self.defer.callback(resp) @@ -482,13 +509,16 @@ class FileDownload: #{ Downloading the pieces def getPieces(self): """Download the next pieces from the peers.""" + log.msg('Checking for more piece requests to send') self.sort() piece = self.nextFinish while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces): + log.msg('Checking piece %d' % piece) if self.completePieces[piece] == False: # Send a request to the highest ranked peer peer = self.peerlist.pop() self.completePieces[piece] = peer + log.msg('Sending a request for piece %d to peer %r' % (piece, peer)) self.outstanding += 1 if self.pieces: @@ -499,23 +529,28 @@ class FileDownload: *(self._getPiece, self._getError), **{'callbackArgs': (piece, peer), 'errbackArgs': (piece, peer)}) - piece += 1 + piece += 1 - # Check if we're don + log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces))) + # Check if we're done if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): + log.msg('We seem to be done with all pieces') self.stream.allAvailable() def _getPiece(self, response, piece, peer): """Process the retrieved headers from the peer.""" + log.msg('Got response for piece %d from peer %r' % (piece, peer)) if ((len(self.completePieces) > 1 
and response.code != 206) or (response.code not in (200, 206))): # Request failed, try a different peer - peer.hashError() + log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer)) + peer.hashError('Peer responded with the wrong type of download: %r' % response.code) self.completePieces[piece] = False if response.stream and response.stream.length: stream.readAndDiscard(response.stream) else: # Read the response stream to the file + log.msg('Streaming piece %d from peer %r' % (piece, peer)) if response.code == 206: df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run() else: @@ -529,6 +564,7 @@ class FileDownload: def _getError(self, err, piece, peer): """Peer failed, try again.""" + log.msg('Got error for piece %d from peer %r' % (piece, peer)) self.outstanding -= 1 self.peerlist.append(peer) self.completePieces[piece] = False @@ -537,13 +573,16 @@ class FileDownload: def _gotPiece(self, response, piece, peer): """Process the retrieved piece from the peer.""" + log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response)) if ((self.pieces and response != self.pieces[piece]) or - (len(self.pieces) == 0 and response == self.hash.expected())): + (len(self.pieces) == 0 and response != self.hash.expected())): # Hash doesn't match - peer.hashError() + log.msg('Hash error for piece %d from peer %r' % (piece, peer)) + peer.hashError('Piece received from peer does not match expected') self.completePieces[piece] = False elif self.pieces: # Successfully completed one of several pieces + log.msg('Finished with piece %d from peer %r' % (piece, peer)) self.completePieces[piece] = True while (self.nextFinish < len(self.completePieces) and self.completePieces[self.nextFinish] == True): @@ -551,13 +590,16 @@ class FileDownload: self.stream.updateAvailable(PIECE_SIZE) else: # Whole download (only one piece) is complete + log.msg('Piece %d from peer %r is the last piece' % (piece, peer)) 
self.completePieces[piece] = True + self.nextFinish = 1 self.stream.updateAvailable(2**30) self.getPieces() def _gotError(self, err, piece, peer): """Piece download failed, try again.""" + log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err)) log.err(err) self.completePieces[piece] = False self.getPieces() diff --git a/test.py b/test.py index 3fb7057..f861b65 100755 --- a/test.py +++ b/test.py @@ -14,12 +14,12 @@ the apt-p2p program. and the apt-get commands to run (C{list}). The bootstrap nodes keys are integers, which must be in the range 1-9. - The values are the dictionary of string formatting values for creating - the apt-p2p configuration file (see L{apt_p2p_conf_template} below). + The values are the dictionary of keyword options to pass to the function + that starts the bootstrap node (see L{start_bootstrap} below). The downloaders keys are also integers in the range 1-99. The values are - the dictionary of string formatting values for creating the apt-p2p - configuration file (see L{apt_p2p_conf_template} below). + the dictionary of keyword options to pass to the function + that starts the downloader node (see L{start_downloader} below). 
The apt-get commands' list elements are tuples with 2 elements: the downloader to run the command on, and the list of command-line @@ -86,16 +86,16 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl 5: {}, 6: {}}, [(1, ['update']), + (2, ['update']), + (3, ['update']), (1, ['install', 'aboot-base']), (1, ['install', 'ada-reference-manual']), (1, ['install', 'fop-doc']), (1, ['install', 'doc-iana']), - (2, ['update']), (2, ['install', 'aboot-base']), (2, ['install', 'ada-reference-manual']), (2, ['install', 'fop-doc']), (2, ['install', 'doc-iana']), - (3, ['update']), (3, ['install', 'aboot-base']), (3, ['install', 'ada-reference-manual']), (3, ['install', 'fop-doc']), @@ -242,6 +242,29 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl ]), ]), + '9': ('Start a single bootstrap and 6 downloaders and test downloading' + + ' a very large file.', + {1: {}}, + {1: {'clean': False}, + 2: {'clean': False}, + 3: {}, + 4: {}, + 5: {}, + 6: {}}, + [(1, ['update']), + (1, ['install', 'kde-icons-oxygen']), + (2, ['update']), + (2, ['install', 'kde-icons-oxygen']), + (3, ['update']), + (3, ['install', 'kde-icons-oxygen']), + (4, ['update']), + (4, ['install', 'kde-icons-oxygen']), + (5, ['update']), + (5, ['install', 'kde-icons-oxygen']), + (6, ['update']), + (6, ['install', 'kde-icons-oxygen']), + ]), + } assert 'all' not in tests @@ -305,7 +328,6 @@ Debug NoLocking "false"; Acquire::Ftp "false"; // Show ftp command traffic Acquire::Http "false"; // Show http command traffic - Acquire::Debtorrent "false"; // Show http command traffic Acquire::gpgv "false"; // Show the gpgv traffic aptcdrom "false"; // Show found package files IdentCdrom "false"; @@ -623,7 +645,7 @@ def start_downloader(bootstrap_addresses, num_down, options = {}, # Create apt's config files f = open(join([downloader_dir, 'etc', 'apt', 'sources.list']), 'w') - f.write('deb http://localhost:1%02d77/%s/ stable %s\n' % (num_down, mirror, suites)) + 
f.write('deb http://localhost:1%02d77/%s/ unstable %s\n' % (num_down, mirror, suites)) f.close() f = open(join([downloader_dir, 'etc', 'apt', 'apt.conf']), 'w') @@ -731,12 +753,12 @@ def run_test(bootstraps, downloaders, apt_get_queue): bootstrap_addresses += '\n ' + bootstrap_address(boot_keys[i]) for k, v in bootstraps.items(): - running_bootstraps[k] = start_bootstrap(bootstrap_addresses, k, v) + running_bootstraps[k] = start_bootstrap(bootstrap_addresses, k, **v) sleep(5) for k, v in downloaders.items(): - running_downloaders[k] = start_downloader(bootstrap_addresses, k, v) + running_downloaders[k] = start_downloader(bootstrap_addresses, k, **v) sleep(5) -- 2.39.5