]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
PeerManager keeps a list of site names, peers are kept in a dictionary.
authorCameron Dale <camrdale@gmail.com>
Fri, 9 May 2008 00:28:07 +0000 (17:28 -0700)
committerCameron Dale <camrdale@gmail.com>
Fri, 9 May 2008 00:28:07 +0000 (17:28 -0700)
apt_p2p/PeerManager.py

index cfc0f78103e5a2ce8fed2f0084d71cf2c8f253c3..0cbee1f6d45720111787f0214a26bc225341c42e 100644 (file)
@@ -288,12 +288,12 @@ class FileDownload:
         """Sort the peers by their rank (highest ranked at the end)."""
         def sort(a, b):
             """Sort peers by their rank."""
-            if a.rank > b.rank:
+            if self.peers[a]['peer'].rank > self.peers[b]['peer'].rank:
                 return 1
-            elif a.rank < b.rank:
+            elif self.peers[a]['peer'].rank < self.peers[b]['peer'].rank:
                 return -1
             return 0
-        self.peerlist.sort(sort)
+        self.sitelist.sort(sort)
 
     def startDownload(self):
         """Start the download from the peers."""
@@ -304,21 +304,23 @@ class FileDownload:
         log.msg('Starting to download %s' % self.path)
         self.started = True
         assert self.pieces, "You must initialize the piece hashes first"
-        self.peerlist = [self.peers[site]['peer'] for site in self.peers]
         
         # Use the mirror if there are few peers
-        if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+        if len(self.peers) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
             parsed = urlparse(self.mirror)
             if parsed[0] == "http":
                 site = splitHostPort(parsed[0], parsed[1])
                 self.mirror_path = urlunparse(('', '') + parsed[2:])
                 peer = self.manager.getPeer(site, mirror = True)
-                self.peerlist.append(peer)
+                self.peers[site] = {}
+                self.peers[site]['peer'] = peer
+        
+        self.sitelist = self.peers.keys()
         
         # Special case if there's only one good peer left
-#        if len(self.peerlist) == 1:
-#            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
-#            self.defer.callback(self.peerlist[0].get(self.path))
+#        if len(self.sitelist) == 1:
+#            log.msg('Downloading from peer %r' % (self.peers[self.sitelist[0]]['peer'], ))
+#            self.defer.callback(self.peers[self.sitelist[0]]['peer'].get(self.path))
 #            return
         
         # Begin to download the pieces
@@ -337,25 +339,25 @@ class FileDownload:
             
         self.sort()
         piece = self.nextFinish
-        while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
+        while self.outstanding < 4 and self.sitelist and piece < len(self.completePieces):
             if self.completePieces[piece] == False:
                 # Send a request to the highest ranked peer
-                peer = self.peerlist.pop()
-                self.completePieces[piece] = peer
-                log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
+                site = self.sitelist.pop()
+                self.completePieces[piece] = site
+                log.msg('Sending a request for piece %d to peer %r' % (piece, self.peers[site]['peer']))
                 
                 self.outstanding += 1
                 path = self.path
-                if peer.mirror:
+                if self.peers[site]['peer'].mirror:
                     path = self.mirror_path
                 if len(self.completePieces) > 1:
-                    df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                    df = self.peers[site]['peer'].getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                 else:
-                    df = peer.get(path)
+                    df = self.peers[site]['peer'].get(path)
                 reactor.callLater(0, df.addCallbacks,
                                   *(self._getPiece, self._getError),
-                                  **{'callbackArgs': (piece, peer),
-                                     'errbackArgs': (piece, peer)})
+                                  **{'callbackArgs': (piece, site),
+                                     'errbackArgs': (piece, site)})
             piece += 1
                 
         # Check if we're done
@@ -363,13 +365,13 @@ class FileDownload:
             log.msg('Download is complete for %s' % self.path)
             self.stream.allAvailable(remove = True)
     
-    def _getPiece(self, response, piece, peer):
+    def _getPiece(self, response, piece, site):
         """Process the retrieved headers from the peer."""
         if ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
-            log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
-            peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
+            log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, self.peers[site]['peer']))
+            self.peers[site]['peer'].hashError('Peer responded with the wrong type of download: %r' % response.code)
             self.completePieces[piece] = False
             if response.stream and response.stream.length:
                 stream.readAndDiscard(response.stream)
@@ -388,7 +390,7 @@ class FileDownload:
                 df.callback(resp)
 
             # Read the response stream to the file
-            log.msg('Streaming piece %d from peer %r' % (piece, peer))
+            log.msg('Streaming piece %d from peer %r' % (piece, self.peers[site]['peer']))
             if response.code == 206:
                 df = StreamToFile(self.hash.newPieceHasher(), response.stream,
                                   self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
@@ -397,32 +399,32 @@ class FileDownload:
                                   self.file).run()
             reactor.callLater(0, df.addCallbacks,
                               *(self._gotPiece, self._gotError),
-                              **{'callbackArgs': (piece, peer),
-                                 'errbackArgs': (piece, peer)})
+                              **{'callbackArgs': (piece, site),
+                                 'errbackArgs': (piece, site)})
 
         self.outstanding -= 1
-        self.peerlist.append(peer)
+        self.sitelist.append(site)
         self.getPieces()
 
-    def _getError(self, err, piece, peer):
+    def _getError(self, err, piece, site):
         """Peer failed, try again."""
-        log.msg('Got error for piece %d from peer %r' % (piece, peer))
+        log.msg('Got error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
         self.outstanding -= 1
-        self.peerlist.append(peer)
+        self.sitelist.append(site)
         self.completePieces[piece] = False
         self.getPieces()
         log.err(err)
 
-    def _gotPiece(self, hash, piece, peer):
+    def _gotPiece(self, hash, piece, site):
         """Process the retrieved piece from the peer."""
         if self.pieces[piece] and hash.digest() != self.pieces[piece]:
             # Hash doesn't match
-            log.msg('Hash error for piece %d from peer %r' % (piece, peer))
-            peer.hashError('Piece received from peer does not match expected')
+            log.msg('Hash error for piece %d from peer %r' % (piece, self.peers[site]['peer']))
+            self.peers[site]['peer'].hashError('Piece received from peer does not match expected')
             self.completePieces[piece] = False
         else:
             # Successfully completed one of several pieces
-            log.msg('Finished with piece %d from peer %r' % (piece, peer))
+            log.msg('Finished with piece %d from peer %r' % (piece, self.peers[site]['peer']))
             self.completePieces[piece] = True
             while (self.nextFinish < len(self.completePieces) and
                    self.completePieces[self.nextFinish] == True):
@@ -431,9 +433,9 @@ class FileDownload:
 
         self.getPieces()
 
-    def _gotError(self, err, piece, peer):
+    def _gotError(self, err, piece, site):
         """Piece download failed, try again."""
-        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
+        log.msg('Error streaming piece %d from peer %r: %r' % (piece, self.peers[site]['peer'], err))
         log.err(err)
         self.completePieces[piece] = False
         self.getPieces()