]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
Remove some unnecessary log messages and use better Exceptions.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index e19c7b40dc0e5b691f2eea00d01928dab3590e37..d4c7aace9ad04dce45fceb1ac944c61803a7cc96 100644 (file)
@@ -17,7 +17,12 @@ from HTTPDownloader import Peer
 from util import uncompact
 from Hash import PIECE_SIZE
 from apt_p2p_Khashmir.bencode import bdecode
+from apt_p2p_conf import config
 
+
+class PeerError(Exception):
+    """An error occurred downloading from peers."""
+    
 class GrowingFileStream(stream.FileStream):
     """Modified to stream data from a file as it becomes available.
     
@@ -180,10 +185,10 @@ class StreamToFile:
     def _gotData(self, data):
         """Process the received data."""
         if self.outFile.closed:
-            raise Exception, "outFile was unexpectedly closed"
+            raise PeerError, "outFile was unexpectedly closed"
         
         if data is None:
-            raise Exception, "Data is None?"
+            raise PeerError, "Data is None?"
         
         # Make sure we don't go too far
         if self.length is not None and self.position + len(data) > self.length:
@@ -261,6 +266,7 @@ class FileDownload:
         self.compact_peers = compact_peers
         
         self.path = '/~/' + quote_plus(hash.expected())
+        self.mirror_path = None
         self.pieces = None
         self.started = False
         
@@ -354,22 +360,26 @@ class FileDownload:
 
         # Start the DHT lookup
         lookupDefer = self.manager.dht.getValue(key)
-        lookupDefer.addCallback(self._getDHTPieces, key)
+        lookupDefer.addBoth(self._getDHTPieces, key)
         
     def _getDHTPieces(self, results, key):
         """Check the retrieved values."""
-        for result in results:
-            # Make sure the hash matches the key
-            result_hash = sha.new(result.get('t', '')).digest()
-            if result_hash == key:
-                pieces = result['t']
-                self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
-                log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
-                self.startDownload()
-                return
+        if isinstance(results, list):
+            for result in results:
+                # Make sure the hash matches the key
+                result_hash = sha.new(result.get('t', '')).digest()
+                if result_hash == key:
+                    pieces = result['t']
+                    self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+                    log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+                    self.startDownload()
+                    return
+                
+            log.msg('Could not retrieve the piece hashes from the DHT')
+        else:
+            log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
             
         # Continue without the piece hashes
-        log.msg('Could not retrieve the piece hashes from the DHT')
         self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
         self.startDownload()
 
@@ -392,7 +402,6 @@ class FileDownload:
 
         if self.pieces is None:
             # Send a request to one or more peers
-            log.msg('Checking for a peer to request piece hashes from')
             for site in self.peers:
                 if self.peers[site].get('failed', False) != True:
                     log.msg('Sending a piece hash request to %r' % (site, ))
@@ -406,7 +415,6 @@ class FileDownload:
                     if self.outstanding >= 4:
                         break
         
-        log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
         if self.pieces is None and self.outstanding <= 0:
             # Continue without the piece hashes
             log.msg('Could not retrieve the piece hashes from the peers')
@@ -418,7 +426,6 @@ class FileDownload:
         log.msg('Got a piece hash response %d from %r' % (response.code, site))
         if response.code != 200:
             # Request failed, try a different peer
-            log.msg('Did not like response %d from %r' % (response.code, site))
             self.getPeerPieces(key, site)
         else:
             # Read the response stream to a string
@@ -486,6 +493,15 @@ class FileDownload:
         assert self.pieces, "You must initialize the piece hashes first"
         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
         
+        # Use the mirror if there are few peers
+        if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+            parsed = urlparse(self.mirror)
+            if parsed[0] == "http":
+                site = splitHostPort(parsed[0], parsed[1])
+                self.mirror_path = urlunparse(('', '') + parsed[2:])
+                peer = self.manager.getPeer(site, mirror = True)
+                self.peerlist.append(peer)
+        
         # Special case if there's only one good peer left
 #        if len(self.peerlist) == 1:
 #            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
@@ -506,11 +522,9 @@ class FileDownload:
     #{ Downloading the pieces
     def getPieces(self):
         """Download the next pieces from the peers."""
-        log.msg('Checking for more piece requests to send')
         self.sort()
         piece = self.nextFinish
         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
-            log.msg('Checking piece %d' % piece)
             if self.completePieces[piece] == False:
                 # Send a request to the highest ranked peer
                 peer = self.peerlist.pop()
@@ -518,14 +532,16 @@ class FileDownload:
                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
                 
                 self.outstanding += 1
-                df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                if peer.mirror:
+                    df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                else:
+                    df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                 reactor.callLater(0, df.addCallbacks,
                                   *(self._getPiece, self._getError),
                                   **{'callbackArgs': (piece, peer),
                                      'errbackArgs': (piece, peer)})
             piece += 1
                 
-        log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
         # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
             log.msg('We seem to be done with all pieces')
@@ -546,11 +562,14 @@ class FileDownload:
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+                                  PIECE_SIZE).run()
             else:
                 df = StreamToFile(response.stream, self.file).run()
-            df.addCallbacks(self._gotPiece, self._gotError,
-                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+            reactor.callLater(0, df.addCallbacks,
+                              *(self._gotPiece, self._gotError),
+                              **{'callbackArgs': (piece, peer),
+                                 'errbackArgs': (piece, peer)})
 
         self.outstanding -= 1
         self.peerlist.append(peer)
@@ -598,17 +617,20 @@ class PeerManager:
     @ivar cache_dir: the directory to use for storing all files
     @type dht: L{interfaces.IDHT}
     @ivar dht: the DHT instance
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to record sent data to
     @type clients: C{dictionary}
     @ivar clients: the available peers that have been previously contacted
     """
 
-    def __init__(self, cache_dir, dht):
+    def __init__(self, cache_dir, dht, stats):
         """Initialize the instance."""
         self.cache_dir = cache_dir
         self.cache_dir.restat(False)
         if not self.cache_dir.exists():
             self.cache_dir.makedirs()
         self.dht = dht
+        self.stats = stats
         self.clients = {}
         
     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
@@ -634,7 +656,7 @@ class PeerManager:
             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
             site = splitHostPort(parsed[0], parsed[1])
             path = urlunparse(('', '') + parsed[2:])
-            peer = self.getPeer(site)
+            peer = self.getPeer(site, mirror = True)
             return peer.get(path, method, modtime)
 #        elif len(peers) == 1:
 #            site = uncompact(peers[0]['c'])
@@ -646,14 +668,18 @@ class PeerManager:
             tmpfile = self.cache_dir.child(hash.hexexpected())
             return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
-    def getPeer(self, site):
+    def getPeer(self, site, mirror = False):
         """Create a new peer if necessary and return it.
         
         @type site: (C{string}, C{int})
         @param site: the IP address and port of the peer
+        @param mirror: whether the peer is actually a mirror
+            (optional, defaults to False)
         """
         if site not in self.clients:
-            self.clients[site] = Peer(site[0], site[1])
+            self.clients[site] = Peer(site[0], site[1], self.stats)
+            if mirror:
+                self.clients[site].mirror = True
         return self.clients[site]
     
     def close(self):