Multiple peer downloading is mostly working now.
author Cameron Dale <camrdale@gmail.com>
Fri, 11 Apr 2008 06:40:04 +0000 (23:40 -0700)
committer Cameron Dale <camrdale@gmail.com>
Fri, 11 Apr 2008 06:40:04 +0000 (23:40 -0700)
Many bug fixes, too numerous to mention.
Lots of new (probably temporary) logging.
New test stuff for the multi-peer downloading.
Still fails on large files when retrieving piece hashes from peers.

apt_p2p/HTTPDownloader.py
apt_p2p/HTTPServer.py
apt_p2p/PeerManager.py
test.py

index e3eb66e..15ee564 100644 (file)
@@ -45,6 +45,9 @@ class Peer(ClientFactory):
         self._downloadSpeeds = []
         self._lastResponse = None
         self._responseTimes = []
+    
+    def __repr__(self):
+        return "(%s, %d, %0.5f)" % (self.host, self.port, self.rank)
         
     #{ Manage the request queue
     def connect(self):
index c3c64b8..f3d6de7 100644 (file)
@@ -227,7 +227,7 @@ class TopLevel(resource.Resource):
                     log.msg('Sending torrent string %s to %s' % (b2a_hex(hash), request.remoteAddr))
                     return static.Data(bencode({'t': files[0]['pieces']}), 'application/x-bencoded'), ()
             else:
-                log.msg('Hash could not be found in database: %s' % hash)
+                log.msg('Hash could not be found in database: %r' % hash)
 
         # Only local requests (apt) get past this point
         if request.remoteAddr.host != "127.0.0.1":
index d041504..9bf4b4d 100644 (file)
@@ -30,15 +30,15 @@ class GrowingFileStream(stream.FileStream):
 
     CHUNK_SIZE = 4*1024
 
-    def __init__(self, f):
+    def __init__(self, f, length = None):
         stream.FileStream.__init__(self, f)
-        self.length = None
+        self.length = length
         self.deferred = None
         self.available = 0L
         self.position = 0L
         self.finished = False
 
-    def updateAvaliable(self, newlyAvailable):
+    def updateAvailable(self, newlyAvailable):
         """Update the number of bytes that are available.
         
         Call it with 0 to trigger reading of a fully read file.
@@ -86,9 +86,13 @@ class GrowingFileStream(stream.FileStream):
                     deferred.callback(b)
                 else:
                     # We're done
+                    deferred = self.deferred
+                    self.deferred = None
                     deferred.callback(None)
             else:
                 # We're done
+                deferred = self.deferred
+                self.deferred = None
                 deferred.callback(None)
         
     def read(self, sendfile=False):
@@ -123,7 +127,7 @@ class GrowingFileStream(stream.FileStream):
             self.position += bytesRead
             return b
 
-class StreamToFile(defer.Deferred):
+class StreamToFile:
     """Save a stream to a partial file and hash it.
     
     @type stream: L{twisted.web2.stream.IByteStream}
@@ -168,6 +172,7 @@ class StreamToFile(defer.Deferred):
 
         @rtype: L{twisted.internet.defer.Deferred}
         """
+        log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
         self.doneDefer = stream.readStream(self.stream, self._gotData)
         self.doneDefer.addCallbacks(self._done, self._error)
         return self.doneDefer
@@ -192,10 +197,12 @@ class StreamToFile(defer.Deferred):
         
     def _done(self, result):
         """Return the result."""
+        log.msg('Streaming is complete')
         return self.hash.digest()
     
     def _error(self, err):
         """Log the error."""
+        log.msg('Streaming error')
         log.err(err)
         return err
     
@@ -260,21 +267,22 @@ class FileDownload:
         file.restat(False)
         if file.exists():
             file.remove()
-        self.file = file.open('w')
+        self.file = file.open('w+')
 
     def run(self):
         """Start the downloading process."""
+        log.msg('Checking for pieces for %s' % self.path)
         self.defer = defer.Deferred()
         self.peers = {}
         no_pieces = 0
-        pieces_string = {}
-        pieces_hash = {}
-        pieces_dl_hash = {}
+        pieces_string = {0: 0}
+        pieces_hash = {0: 0}
+        pieces_dl_hash = {0: 0}
 
         for compact_peer in self.compact_peers:
             # Build a list of all the peers for this download
             site = uncompact(compact_peer['c'])
-            peer = manager.getPeer(site)
+            peer = self.manager.getPeer(site)
             self.peers.setdefault(site, {})['peer'] = peer
 
             # Extract any piece information from the peers list
@@ -303,6 +311,7 @@ class FileDownload:
 
         if max_found == no_pieces:
             # The file is not split into pieces
+            log.msg('No pieces were found for the file')
             self.pieces = []
             self.startDownload()
         elif max_found == max(pieces_string.values()):
@@ -319,6 +328,7 @@ class FileDownload:
             for pieces, num in pieces_hash.items():
                 # Find the most popular piece hash to lookup
                 if num == max_found:
+                    log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
                     self.getDHTPieces(pieces)
                     break
         elif max_found == max(pieces_dl_hash.values()):
@@ -326,6 +336,7 @@ class FileDownload:
             for pieces, num in pieces_hash.items():
                 # Find the most popular piece hash to download
                 if num == max_found:
+                    log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
                     self.getPeerPieces(pieces)
                     break
         return self.defer
@@ -368,27 +379,34 @@ class FileDownload:
         @param key: the key to request from the peers
         """
         if failedSite is None:
+            log.msg('Starting the lookup of piece hashes in peers')
             self.outstanding = 0
             # Remove any peers with the wrong piece hash
             #for site in self.peers.keys():
             #    if self.peers[site].get('l', '') != key:
             #        del self.peers[site]
         else:
+            log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
             self.peers[failedSite]['failed'] = True
             self.outstanding -= 1
 
         if self.pieces is None:
             # Send a request to one or more peers
+            log.msg('Checking for a peer to request piece hashes from')
             for site in self.peers:
                 if self.peers[site].get('failed', False) != True:
+                    log.msg('Sending a piece hash request to %r' % (site, ))
                     path = '/~/' + quote_plus(key)
                     lookupDefer = self.peers[site]['peer'].get(path)
-                    lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError,
-                                             callbackArgs=(key, site), errbackArgs=(key, site))
+                    reactor.callLater(0, lookupDefer.addCallbacks,
+                                      *(self._getPeerPieces, self._gotPeerError),
+                                      **{'callbackArgs': (key, site),
+                                         'errbackArgs': (key, site)})
                     self.outstanding += 1
                     if self.outstanding >= 3:
                         break
         
+        log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
         if self.pieces is None and self.outstanding == 0:
             # Continue without the piece hashes
             log.msg('Could not retrieve the piece hashes from the peers')
@@ -397,32 +415,40 @@ class FileDownload:
         
     def _getPeerPieces(self, response, key, site):
         """Process the retrieved response from the peer."""
+        log.msg('Got a piece hash response %d from %r' % (response.code, site))
         if response.code != 200:
             # Request failed, try a different peer
+            log.msg('Did not like response %d from %r' % (response.code, site))
             self.getPeerPieces(key, site)
         else:
             # Read the response stream to a string
             self.peers[site]['pieces'] = ''
             def _gotPeerPiece(data, self = self, site = site):
+                log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
                 self.peers[site]['pieces'] += data
+            log.msg('Streaming piece hashes from peer')
             df = stream.readStream(response.stream, _gotPeerPiece)
             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
                             callbackArgs=(key, site), errbackArgs=(key, site))
 
     def _gotPeerError(self, err, key, site):
         """Peer failed, try again."""
+        log.msg('Peer piece hash request failed for %r' % (site, ))
         log.err(err)
         self.getPeerPieces(key, site)
 
     def _gotPeerPieces(self, result, key, site):
         """Check the retrieved pieces from the peer."""
+        log.msg('Finished streaming piece hashes from peer %r' % (site, ))
         if self.pieces is not None:
             # Already done
+            log.msg('Already done')
             return
         
         try:
             result = bdecode(self.peers[site]['pieces'])
         except:
+            log.msg('Error bdecoding piece hashes')
             log.err()
             self.getPeerPieces(key, site)
             return
@@ -431,7 +457,7 @@ class FileDownload:
         if result_hash == key:
             pieces = result['t']
             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
-            log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces))
+            log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
             self.startDownload()
         else:
             log.msg('Peer returned a piece string that did not match')
@@ -455,6 +481,7 @@ class FileDownload:
         if self.started:
             return
         
+        log.msg('Starting to download %s' % self.path)
         self.started = True
         assert self.pieces is not None, "You must initialize the piece hashes first"
         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
@@ -466,7 +493,7 @@ class FileDownload:
             return
         
         # Start sending the return file
-        self.stream = GrowingFileStream(self.file)
+        self.stream = GrowingFileStream(self.file, self.hash.expSize)
         resp = Response(200, {}, self.stream)
         self.defer.callback(resp)
 
@@ -482,13 +509,16 @@ class FileDownload:
     #{ Downloading the pieces
     def getPieces(self):
         """Download the next pieces from the peers."""
+        log.msg('Checking for more piece requests to send')
         self.sort()
         piece = self.nextFinish
         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
+            log.msg('Checking piece %d' % piece)
             if self.completePieces[piece] == False:
                 # Send a request to the highest ranked peer
                 peer = self.peerlist.pop()
                 self.completePieces[piece] = peer
+                log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
                 
                 self.outstanding += 1
                 if self.pieces:
@@ -499,23 +529,28 @@ class FileDownload:
                                   *(self._getPiece, self._getError),
                                   **{'callbackArgs': (piece, peer),
                                      'errbackArgs': (piece, peer)})
-                piece += 1
+            piece += 1
                 
-        # Check if we're don
+        log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
+        # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
+            log.msg('We seem to be done with all pieces')
             self.stream.allAvailable()
     
     def _getPiece(self, response, piece, peer):
         """Process the retrieved headers from the peer."""
+        log.msg('Got response for piece %d from peer %r' % (piece, peer))
         if ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
-            peer.hashError()
+            log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
+            peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
             self.completePieces[piece] = False
             if response.stream and response.stream.length:
                 stream.readAndDiscard(response.stream)
         else:
             # Read the response stream to the file
+            log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
             else:
@@ -529,6 +564,7 @@ class FileDownload:
 
     def _getError(self, err, piece, peer):
         """Peer failed, try again."""
+        log.msg('Got error for piece %d from peer %r' % (piece, peer))
         self.outstanding -= 1
         self.peerlist.append(peer)
         self.completePieces[piece] = False
@@ -537,13 +573,16 @@ class FileDownload:
 
     def _gotPiece(self, response, piece, peer):
         """Process the retrieved piece from the peer."""
+        log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
         if ((self.pieces and response != self.pieces[piece]) or
-            (len(self.pieces) == 0 and response == self.hash.expected())):
+            (len(self.pieces) == 0 and response != self.hash.expected())):
             # Hash doesn't match
-            peer.hashError()
+            log.msg('Hash error for piece %d from peer %r' % (piece, peer))
+            peer.hashError('Piece received from peer does not match expected')
             self.completePieces[piece] = False
         elif self.pieces:
             # Successfully completed one of several pieces
+            log.msg('Finished with piece %d from peer %r' % (piece, peer))
             self.completePieces[piece] = True
             while (self.nextFinish < len(self.completePieces) and
                    self.completePieces[self.nextFinish] == True):
@@ -551,13 +590,16 @@ class FileDownload:
                 self.stream.updateAvailable(PIECE_SIZE)
         else:
             # Whole download (only one piece) is complete
+            log.msg('Piece %d from peer %r is the last piece' % (piece, peer))
             self.completePieces[piece] = True
+            self.nextFinish = 1
             self.stream.updateAvailable(2**30)
 
         self.getPieces()
 
     def _gotError(self, err, piece, peer):
         """Piece download failed, try again."""
+        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
         log.err(err)
         self.completePieces[piece] = False
         self.getPieces()
diff --git a/test.py b/test.py
index 3fb7057..f861b65 100755 (executable)
--- a/test.py
+++ b/test.py
@@ -14,12 +14,12 @@ the apt-p2p program.
     and the apt-get commands to run (C{list}).
     
     The bootstrap nodes keys are integers, which must be in the range 1-9.
-    The values are the dictionary of string formatting values for creating
-    the apt-p2p configuration file (see L{apt_p2p_conf_template} below).
+    The values are the dictionary of keyword options to pass to the function
+    that starts the bootstrap node (see L{start_bootstrap} below).
     
     The downloaders keys are also integers in the range 1-99. The values are
-    the dictionary of string formatting values for creating the apt-p2p
-    configuration file (see L{apt_p2p_conf_template} below).
+    the dictionary of keyword options to pass to the function
+    that starts the downloader node (see L{start_downloader} below).
     
     The apt-get commands' list elements are tuples with 2 elements: the
     downloader to run the command on, and the list of command-line
@@ -86,16 +86,16 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl
                 5: {},
                 6: {}},
                [(1, ['update']),
+                (2, ['update']),
+                (3, ['update']),
                 (1, ['install', 'aboot-base']),
                 (1, ['install', 'ada-reference-manual']),
                 (1, ['install', 'fop-doc']),
                 (1, ['install', 'doc-iana']),
-                (2, ['update']),
                 (2, ['install', 'aboot-base']),
                 (2, ['install', 'ada-reference-manual']),
                 (2, ['install', 'fop-doc']),
                 (2, ['install', 'doc-iana']),
-                (3, ['update']),
                 (3, ['install', 'aboot-base']),
                 (3, ['install', 'ada-reference-manual']),
                 (3, ['install', 'fop-doc']),
@@ -242,6 +242,29 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl
                    ]),
               ]),
 
+         '9': ('Start a single bootstrap and 6 downloaders and test downloading' +
+               ' a very large file.',
+               {1: {}},
+               {1: {'clean': False},
+                2: {'clean': False},
+                3: {},
+                4: {},
+                5: {},
+                6: {}},
+               [(1, ['update']),
+                (1, ['install', 'kde-icons-oxygen']),
+                (2, ['update']),
+                (2, ['install', 'kde-icons-oxygen']),
+                (3, ['update']),
+                (3, ['install', 'kde-icons-oxygen']),
+                (4, ['update']),
+                (4, ['install', 'kde-icons-oxygen']),
+                (5, ['update']),
+                (5, ['install', 'kde-icons-oxygen']),
+                (6, ['update']),
+                (6, ['install', 'kde-icons-oxygen']),
+                ]),
+
          }
 
 assert 'all' not in tests
@@ -305,7 +328,6 @@ Debug
   NoLocking "false";
   Acquire::Ftp "false";    // Show ftp command traffic
   Acquire::Http "false";   // Show http command traffic
-  Acquire::Debtorrent "false";   // Show http command traffic
   Acquire::gpgv "false";   // Show the gpgv traffic
   aptcdrom "false";        // Show found package files
   IdentCdrom "false";
@@ -623,7 +645,7 @@ def start_downloader(bootstrap_addresses, num_down, options = {},
 
         # Create apt's config files
         f = open(join([downloader_dir, 'etc', 'apt', 'sources.list']), 'w')
-        f.write('deb http://localhost:1%02d77/%s/ stable %s\n' % (num_down, mirror, suites))
+        f.write('deb http://localhost:1%02d77/%s/ unstable %s\n' % (num_down, mirror, suites))
         f.close()
 
         f = open(join([downloader_dir, 'etc', 'apt', 'apt.conf']), 'w')
@@ -731,12 +753,12 @@ def run_test(bootstraps, downloaders, apt_get_queue):
             bootstrap_addresses += '\n      ' + bootstrap_address(boot_keys[i])
             
         for k, v in bootstraps.items():
-            running_bootstraps[k] = start_bootstrap(bootstrap_addresses, k, v)
+            running_bootstraps[k] = start_bootstrap(bootstrap_addresses, k, **v)
         
         sleep(5)
         
         for k, v in downloaders.items():
-            running_downloaders[k] = start_downloader(bootstrap_addresses, k, v)
+            running_downloaders[k] = start_downloader(bootstrap_addresses, k, **v)
     
         sleep(5)