]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
Remove some unnecessary log messages and use better Exceptions.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index 0f64e8f3161f2e99e3cee9f16ec9f37fd2803f92..d4c7aace9ad04dce45fceb1ac944c61803a7cc96 100644 (file)
@@ -15,9 +15,14 @@ from twisted.web2.http import Response, splitHostPort
 
 from HTTPDownloader import Peer
 from util import uncompact
-from hash import PIECE_SIZE
+from Hash import PIECE_SIZE
 from apt_p2p_Khashmir.bencode import bdecode
+from apt_p2p_conf import config
 
+
+class PeerError(Exception):
+    """An error occurred downloading from peers."""
+    
 class GrowingFileStream(stream.FileStream):
     """Modified to stream data from a file as it becomes available.
     
@@ -30,15 +35,15 @@ class GrowingFileStream(stream.FileStream):
 
     CHUNK_SIZE = 4*1024
 
-    def __init__(self, f):
+    def __init__(self, f, length = None):
         stream.FileStream.__init__(self, f)
-        self.length = None
+        self.length = length
         self.deferred = None
         self.available = 0L
         self.position = 0L
         self.finished = False
 
-    def updateAvaliable(self, newlyAvailable):
+    def updateAvailable(self, newlyAvailable):
         """Update the number of bytes that are available.
         
         Call it with 0 to trigger reading of a fully read file.
@@ -86,9 +91,13 @@ class GrowingFileStream(stream.FileStream):
                     deferred.callback(b)
                 else:
                     # We're done
+                    deferred = self.deferred
+                    self.deferred = None
                     deferred.callback(None)
             else:
                 # We're done
+                deferred = self.deferred
+                self.deferred = None
                 deferred.callback(None)
         
     def read(self, sendfile=False):
@@ -123,7 +132,7 @@ class GrowingFileStream(stream.FileStream):
             self.position += bytesRead
             return b
 
-class StreamToFile(defer.Deferred):
+class StreamToFile:
     """Save a stream to a partial file and hash it.
     
     @type stream: L{twisted.web2.stream.IByteStream}
@@ -168,6 +177,7 @@ class StreamToFile(defer.Deferred):
 
         @rtype: L{twisted.internet.defer.Deferred}
         """
+        log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
         self.doneDefer = stream.readStream(self.stream, self._gotData)
         self.doneDefer.addCallbacks(self._done, self._error)
         return self.doneDefer
@@ -175,10 +185,10 @@ class StreamToFile(defer.Deferred):
     def _gotData(self, data):
         """Process the received data."""
         if self.outFile.closed:
-            raise Exception, "outFile was unexpectedly closed"
+            raise PeerError, "outFile was unexpectedly closed"
         
         if data is None:
-            raise Exception, "Data is None?"
+            raise PeerError, "Data is None?"
         
         # Make sure we don't go too far
         if self.length is not None and self.position + len(data) > self.length:
@@ -192,10 +202,12 @@ class StreamToFile(defer.Deferred):
         
     def _done(self, result):
         """Return the result."""
+        log.msg('Streaming is complete')
         return self.hash.digest()
     
     def _error(self, err):
         """Log the error."""
+        log.msg('Streaming error')
         log.err(err)
         return err
     
@@ -254,27 +266,29 @@ class FileDownload:
         self.compact_peers = compact_peers
         
         self.path = '/~/' + quote_plus(hash.expected())
+        self.mirror_path = None
         self.pieces = None
         self.started = False
         
         file.restat(False)
         if file.exists():
             file.remove()
-        self.file = file.open('w')
+        self.file = file.open('w+')
 
     def run(self):
         """Start the downloading process."""
+        log.msg('Checking for pieces for %s' % self.path)
         self.defer = defer.Deferred()
         self.peers = {}
         no_pieces = 0
-        pieces_string = {}
-        pieces_hash = {}
-        pieces_dl_hash = {}
+        pieces_string = {0: 0}
+        pieces_hash = {0: 0}
+        pieces_dl_hash = {0: 0}
 
         for compact_peer in self.compact_peers:
             # Build a list of all the peers for this download
             site = uncompact(compact_peer['c'])
-            peer = manager.getPeer(site)
+            peer = self.manager.getPeer(site)
             self.peers.setdefault(site, {})['peer'] = peer
 
             # Extract any piece information from the peers list
@@ -303,7 +317,8 @@ class FileDownload:
 
         if max_found == no_pieces:
             # The file is not split into pieces
-            self.pieces = []
+            log.msg('No pieces were found for the file')
+            self.pieces = [self.hash.expected()]
             self.startDownload()
         elif max_found == max(pieces_string.values()):
             # Small number of pieces in a string
@@ -319,13 +334,15 @@ class FileDownload:
             for pieces, num in pieces_hash.items():
                 # Find the most popular piece hash to lookup
                 if num == max_found:
+                    log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
                     self.getDHTPieces(pieces)
                     break
         elif max_found == max(pieces_dl_hash.values()):
             # Large number of pieces stored in peers
-            for pieces, num in pieces_hash.items():
+            for pieces, num in pieces_dl_hash.items():
                 # Find the most popular piece hash to download
                 if num == max_found:
+                    log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
                     self.getPeerPieces(pieces)
                     break
         return self.defer
@@ -343,23 +360,27 @@ class FileDownload:
 
         # Start the DHT lookup
         lookupDefer = self.manager.dht.getValue(key)
-        lookupDefer.addCallback(self._getDHTPieces, key)
+        lookupDefer.addBoth(self._getDHTPieces, key)
         
     def _getDHTPieces(self, results, key):
         """Check the retrieved values."""
-        for result in results:
-            # Make sure the hash matches the key
-            result_hash = sha.new(result.get('t', '')).digest()
-            if result_hash == key:
-                pieces = result['t']
-                self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
-                log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
-                self.startDownload()
-                return
+        if isinstance(results, list):
+            for result in results:
+                # Make sure the hash matches the key
+                result_hash = sha.new(result.get('t', '')).digest()
+                if result_hash == key:
+                    pieces = result['t']
+                    self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+                    log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+                    self.startDownload()
+                    return
+                
+            log.msg('Could not retrieve the piece hashes from the DHT')
+        else:
+            log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
             
         # Continue without the piece hashes
-        log.msg('Could not retrieve the piece hashes from the DHT')
-        self.pieces = []
+        self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
         self.startDownload()
 
     def getPeerPieces(self, key, failedSite = None):
@@ -368,12 +389,14 @@ class FileDownload:
         @param key: the key to request from the peers
         """
         if failedSite is None:
+            log.msg('Starting the lookup of piece hashes in peers')
             self.outstanding = 0
             # Remove any peers with the wrong piece hash
             #for site in self.peers.keys():
             #    if self.peers[site].get('l', '') != key:
             #        del self.peers[site]
         else:
+            log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
             self.peers[failedSite]['failed'] = True
             self.outstanding -= 1
 
@@ -381,22 +404,26 @@ class FileDownload:
             # Send a request to one or more peers
             for site in self.peers:
                 if self.peers[site].get('failed', False) != True:
+                    log.msg('Sending a piece hash request to %r' % (site, ))
                     path = '/~/' + quote_plus(key)
                     lookupDefer = self.peers[site]['peer'].get(path)
-                    lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError,
-                                             callbackArgs=(key, site), errbackArgs=(key, site))
+                    reactor.callLater(0, lookupDefer.addCallbacks,
+                                      *(self._getPeerPieces, self._gotPeerError),
+                                      **{'callbackArgs': (key, site),
+                                         'errbackArgs': (key, site)})
                     self.outstanding += 1
-                    if self.outstanding >= 3:
+                    if self.outstanding >= 4:
                         break
         
-        if self.pieces is None and self.outstanding == 0:
+        if self.pieces is None and self.outstanding <= 0:
             # Continue without the piece hashes
             log.msg('Could not retrieve the piece hashes from the peers')
-            self.pieces = []
+            self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
             self.startDownload()
         
     def _getPeerPieces(self, response, key, site):
         """Process the retrieved response from the peer."""
+        log.msg('Got a piece hash response %d from %r' % (response.code, site))
         if response.code != 200:
             # Request failed, try a different peer
             self.getPeerPieces(key, site)
@@ -404,25 +431,31 @@ class FileDownload:
             # Read the response stream to a string
             self.peers[site]['pieces'] = ''
             def _gotPeerPiece(data, self = self, site = site):
+                log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
                 self.peers[site]['pieces'] += data
+            log.msg('Streaming piece hashes from peer')
             df = stream.readStream(response.stream, _gotPeerPiece)
             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
                             callbackArgs=(key, site), errbackArgs=(key, site))
 
     def _gotPeerError(self, err, key, site):
         """Peer failed, try again."""
+        log.msg('Peer piece hash request failed for %r' % (site, ))
         log.err(err)
         self.getPeerPieces(key, site)
 
     def _gotPeerPieces(self, result, key, site):
         """Check the retrieved pieces from the peer."""
+        log.msg('Finished streaming piece hashes from peer %r' % (site, ))
         if self.pieces is not None:
             # Already done
+            log.msg('Already done')
             return
         
         try:
             result = bdecode(self.peers[site]['pieces'])
         except:
+            log.msg('Error bdecoding piece hashes')
             log.err()
             self.getPeerPieces(key, site)
             return
@@ -431,7 +464,7 @@ class FileDownload:
         if result_hash == key:
             pieces = result['t']
             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
-            log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces))
+            log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
             self.startDownload()
         else:
             log.msg('Peer returned a piece string that did not match')
@@ -455,28 +488,35 @@ class FileDownload:
         if self.started:
             return
         
+        log.msg('Starting to download %s' % self.path)
         self.started = True
-        assert self.pieces is not None, "You must initialize the piece hashes first"
+        assert self.pieces, "You must initialize the piece hashes first"
         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
         
+        # Use the mirror if there are few peers
+        if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+            parsed = urlparse(self.mirror)
+            if parsed[0] == "http":
+                site = splitHostPort(parsed[0], parsed[1])
+                self.mirror_path = urlunparse(('', '') + parsed[2:])
+                peer = self.manager.getPeer(site, mirror = True)
+                self.peerlist.append(peer)
+        
         # Special case if there's only one good peer left
-        if len(self.peerlist) == 1:
-            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
-            self.defer.callback(self.peerlist[0].get(self.path))
-            return
+#        if len(self.peerlist) == 1:
+#            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
+#            self.defer.callback(self.peerlist[0].get(self.path))
+#            return
         
         # Start sending the return file
-        self.stream = GrowingFileStream(self.file)
+        self.stream = GrowingFileStream(self.file, self.hash.expSize)
         resp = Response(200, {}, self.stream)
         self.defer.callback(resp)
 
         # Begin to download the pieces
         self.outstanding = 0
         self.nextFinish = 0
-        if self.pieces:
-            self.completePieces = [False for piece in self.pieces]
-        else:
-            self.completePieces = [False]
+        self.completePieces = [False for piece in self.pieces]
         self.getPieces()
         
     #{ Downloading the pieces
@@ -489,39 +529,47 @@ class FileDownload:
                 # Send a request to the highest ranked peer
                 peer = self.peerlist.pop()
                 self.completePieces[piece] = peer
+                log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
                 
                 self.outstanding += 1
-                if self.pieces:
-                    df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                if peer.mirror:
+                    df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                 else:
-                    df = peer.get(self.path)
+                    df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                 reactor.callLater(0, df.addCallbacks,
                                   *(self._getPiece, self._getError),
                                   **{'callbackArgs': (piece, peer),
                                      'errbackArgs': (piece, peer)})
-                piece += 1
+            piece += 1
                 
-        # Check if we're don
+        # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
+            log.msg('We seem to be done with all pieces')
             self.stream.allAvailable()
     
     def _getPiece(self, response, piece, peer):
         """Process the retrieved headers from the peer."""
+        log.msg('Got response for piece %d from peer %r' % (piece, peer))
         if ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
-            peer.hashError()
+            log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
+            peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
             self.completePieces[piece] = False
             if response.stream and response.stream.length:
                 stream.readAndDiscard(response.stream)
         else:
             # Read the response stream to the file
+            log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+                                  PIECE_SIZE).run()
             else:
                 df = StreamToFile(response.stream, self.file).run()
-            df.addCallbacks(self._gotPiece, self._gotError,
-                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+            reactor.callLater(0, df.addCallbacks,
+                              *(self._gotPiece, self._gotError),
+                              **{'callbackArgs': (piece, peer),
+                                 'errbackArgs': (piece, peer)})
 
         self.outstanding -= 1
         self.peerlist.append(peer)
@@ -529,6 +577,7 @@ class FileDownload:
 
     def _getError(self, err, piece, peer):
         """Peer failed, try again."""
+        log.msg('Got error for piece %d from peer %r' % (piece, peer))
         self.outstanding -= 1
         self.peerlist.append(peer)
         self.completePieces[piece] = False
@@ -537,27 +586,26 @@ class FileDownload:
 
     def _gotPiece(self, response, piece, peer):
         """Process the retrieved piece from the peer."""
-        if ((self.pieces and response != self.pieces[piece]) or
-            (len(self.pieces) == 0 and response == self.hash.expected())):
+        log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
+        if self.pieces[piece] and response != self.pieces[piece]:
             # Hash doesn't match
-            peer.hashError()
+            log.msg('Hash error for piece %d from peer %r' % (piece, peer))
+            peer.hashError('Piece received from peer does not match expected')
             self.completePieces[piece] = False
-        elif self.pieces:
+        else:
             # Successfully completed one of several pieces
+            log.msg('Finished with piece %d from peer %r' % (piece, peer))
             self.completePieces[piece] = True
             while (self.nextFinish < len(self.completePieces) and
                    self.completePieces[self.nextFinish] == True):
                 self.nextFinish += 1
                 self.stream.updateAvailable(PIECE_SIZE)
-        else:
-            # Whole download (only one piece) is complete
-            self.completePieces[piece] = True
-            self.stream.updateAvailable(2**30)
 
         self.getPieces()
 
     def _gotError(self, err, piece, peer):
         """Piece download failed, try again."""
+        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
         log.err(err)
         self.completePieces[piece] = False
         self.getPieces()
@@ -569,17 +617,20 @@ class PeerManager:
     @ivar cache_dir: the directory to use for storing all files
     @type dht: L{interfaces.IDHT}
     @ivar dht: the DHT instance
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to record sent data to
     @type clients: C{dictionary}
     @ivar clients: the available peers that have been previously contacted
     """
 
-    def __init__(self, cache_dir, dht):
+    def __init__(self, cache_dir, dht, stats):
         """Initialize the instance."""
         self.cache_dir = cache_dir
         self.cache_dir.restat(False)
         if not self.cache_dir.exists():
             self.cache_dir.makedirs()
         self.dht = dht
+        self.stats = stats
         self.clients = {}
         
     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
@@ -605,26 +656,30 @@ class PeerManager:
             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
             site = splitHostPort(parsed[0], parsed[1])
             path = urlunparse(('', '') + parsed[2:])
-            peer = self.getPeer(site)
+            peer = self.getPeer(site, mirror = True)
             return peer.get(path, method, modtime)
-        elif len(peers) == 1:
-            site = uncompact(peers[0]['c'])
-            log.msg('Downloading from peer %r' % (site, ))
-            path = '/~/' + quote_plus(hash.expected())
-            peer = self.getPeer(site)
-            return peer.get(path)
+#        elif len(peers) == 1:
+#            site = uncompact(peers[0]['c'])
+#            log.msg('Downloading from peer %r' % (site, ))
+#            path = '/~/' + quote_plus(hash.expected())
+#            peer = self.getPeer(site)
+#            return peer.get(path)
         else:
             tmpfile = self.cache_dir.child(hash.hexexpected())
             return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
-    def getPeer(self, site):
+    def getPeer(self, site, mirror = False):
         """Create a new peer if necessary and return it.
         
         @type site: (C{string}, C{int})
         @param site: the IP address and port of the peer
+        @param mirror: whether the peer is actually a mirror
+            (optional, defaults to False)
         """
         if site not in self.clients:
-            self.clients[site] = Peer(site[0], site[1])
+            self.clients[site] = Peer(site[0], site[1], self.stats)
+            if mirror:
+                self.clients[site].mirror = True
         return self.clients[site]
     
     def close(self):
@@ -639,60 +694,6 @@ class TestPeerManager(unittest.TestCase):
     manager = None
     pending_calls = []
     
-    def gotResp(self, resp, num, expect):
-        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
-        if expect is not None:
-            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
-        def print_(n):
-            pass
-        def printdone(n):
-            pass
-        stream.readStream(resp.stream, print_).addCallback(printdone)
-    
-    def test_download(self):
-        """Tests a normal download."""
-        self.manager = PeerManager()
-        self.timeout = 10
-        
-        host = 'www.ietf.org'
-        d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
-        d.addCallback(self.gotResp, 1, 1070)
-        return d
-        
-    def test_head(self):
-        """Tests a 'HEAD' request."""
-        self.manager = PeerManager()
-        self.timeout = 10
-        
-        host = 'www.ietf.org'
-        d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
-        d.addCallback(self.gotResp, 1, 0)
-        return d
-        
-    def test_multiple_downloads(self):
-        """Tests multiple downloads with queueing and connection closing."""
-        self.manager = PeerManager()
-        self.timeout = 120
-        lastDefer = defer.Deferred()
-        
-        def newRequest(host, path, num, expect, last=False):
-            d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
-            d.addCallback(self.gotResp, num, expect)
-            if last:
-                d.addBoth(lastDefer.callback)
-                
-        newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
-        newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
-        newRequest('www.google.ca', "/", 3, None)
-        self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
-        self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
-        self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
-        self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
-        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
-        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
-        self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
-        return lastDefer
-        
     def tearDown(self):
         for p in self.pending_calls:
             if p.active():