]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
Better identification of peer uploads by the HTTP server.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index e9d1ecb590702de992bec32c14f3803fa0d6b6b9..737669751da78ee9a3242996aed50e41dc5aa18b 100644 (file)
@@ -356,22 +356,26 @@ class FileDownload:
 
         # Start the DHT lookup
         lookupDefer = self.manager.dht.getValue(key)
-        lookupDefer.addCallback(self._getDHTPieces, key)
+        lookupDefer.addBoth(self._getDHTPieces, key)
         
     def _getDHTPieces(self, results, key):
         """Check the retrieved values."""
-        for result in results:
-            # Make sure the hash matches the key
-            result_hash = sha.new(result.get('t', '')).digest()
-            if result_hash == key:
-                pieces = result['t']
-                self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
-                log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
-                self.startDownload()
-                return
+        if isinstance(results, list):
+            for result in results:
+                # Make sure the hash matches the key
+                result_hash = sha.new(result.get('t', '')).digest()
+                if result_hash == key:
+                    pieces = result['t']
+                    self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+                    log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+                    self.startDownload()
+                    return
+                
+            log.msg('Could not retrieve the piece hashes from the DHT')
+        else:
+            log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
             
         # Continue without the piece hashes
-        log.msg('Could not retrieve the piece hashes from the DHT')
         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
         self.startDownload()
 
@@ -494,8 +498,7 @@ class FileDownload:
             if parsed[0] == "http":
                 site = splitHostPort(parsed[0], parsed[1])
                 self.mirror_path = urlunparse(('', '') + parsed[2:])
-                peer = self.manager.getPeer(site)
-                peer.mirror = True
+                peer = self.manager.getPeer(site, mirror = True)
                 self.peerlist.append(peer)
         
         # Special case if there's only one good peer left
@@ -561,11 +564,14 @@ class FileDownload:
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+                                  PIECE_SIZE).run()
             else:
                 df = StreamToFile(response.stream, self.file).run()
-            df.addCallbacks(self._gotPiece, self._gotError,
-                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+            reactor.callLater(0, df.addCallbacks,
+                              *(self._gotPiece, self._gotError),
+                              **{'callbackArgs': (piece, peer),
+                                 'errbackArgs': (piece, peer)})
 
         self.outstanding -= 1
         self.peerlist.append(peer)
@@ -613,17 +619,20 @@ class PeerManager:
     @ivar cache_dir: the directory to use for storing all files
     @type dht: L{interfaces.IDHT}
     @ivar dht: the DHT instance
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to record sent data to
     @type clients: C{dictionary}
     @ivar clients: the available peers that have been previously contacted
     """
 
-    def __init__(self, cache_dir, dht):
+    def __init__(self, cache_dir, dht, stats):
         """Initialize the instance."""
         self.cache_dir = cache_dir
         self.cache_dir.restat(False)
         if not self.cache_dir.exists():
             self.cache_dir.makedirs()
         self.dht = dht
+        self.stats = stats
         self.clients = {}
         
     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
@@ -649,8 +658,7 @@ class PeerManager:
             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
             site = splitHostPort(parsed[0], parsed[1])
             path = urlunparse(('', '') + parsed[2:])
-            peer = self.getPeer(site)
-            peer.mirror = True
+            peer = self.getPeer(site, mirror = True)
             return peer.get(path, method, modtime)
 #        elif len(peers) == 1:
 #            site = uncompact(peers[0]['c'])
@@ -662,14 +670,18 @@ class PeerManager:
             tmpfile = self.cache_dir.child(hash.hexexpected())
             return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
-    def getPeer(self, site):
+    def getPeer(self, site, mirror = False):
         """Create a new peer if necessary and return it.
         
         @type site: (C{string}, C{int})
         @param site: the IP address and port of the peer
+        @param mirror: whether the peer is actually a mirror
+            (optional, defaults to False)
         """
         if site not in self.clients:
-            self.clients[site] = Peer(site[0], site[1])
+            self.clients[site] = Peer(site[0], site[1], self.stats)
+            if mirror:
+                self.clients[site].mirror = True
         return self.clients[site]
     
     def close(self):