]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
Massive work on the peer downloading, not working or complete yet.
authorCameron Dale <camrdale@gmail.com>
Fri, 4 Apr 2008 23:48:48 +0000 (16:48 -0700)
committerCameron Dale <camrdale@gmail.com>
Fri, 4 Apr 2008 23:48:48 +0000 (16:48 -0700)
TODO
apt_p2p/HTTPServer.py
apt_p2p/PeerManager.py
apt_p2p/apt_p2p.py

diff --git a/TODO b/TODO
index 67eda4e90424ef2150f9b796e83993576b3e6ce6..49a33d8fc8227cc202a32a6e072f68ea6be12283 100644 (file)
--- a/TODO
+++ b/TODO
@@ -15,6 +15,20 @@ distributions. They need to be dealt with properly by
 adding them to the tracking done by the AptPackages module.
 
 
+Retransmit DHT requests before timeout occurs.
+
+Currently, only a single transmission to a peer is ever attempted. If
+that request is lost, a timeout will occur after 20 seconds, the peer
+will be declared unreachable and the action will move on to the next
+peer. Instead, try to resend the request periodically using exponential
+backoff to make sure that lost packets don't delay the action so much.
+For example, send the request, wait 2 seconds and send again, wait 4
+seconds and send again, wait 8 seconds (14 seconds have now passed) and
+then declare the host unreachable. The same TID should be used in each
+retransmission, so receiving multiple responses should not be a problem
+as the extra ones will be ignored. 
+
+
 PeerManager needs to download large files from multiple peers.
 
 The PeerManager currently chooses a peer at random from the list of 
index b5a858d455b94f6e49749dcfc87cfac291c05497..c3c64b86b5434dcf21ff1a3d769e28733fffcf12 100644 (file)
@@ -56,7 +56,7 @@ class FileDownloader(static.File):
 class FileUploaderStream(stream.FileStream):
     """Modified to make it suitable for streaming to peers.
     
-    Streams the file is small chunks to make it easier to throttle the
+    Streams the file in small chunks to make it easier to throttle the
     streaming to peers.
     
     @ivar CHUNK_SIZE: the size of chunks of data to send at a time
index a670bee8eb6ecf7f140bed757c8f25f397607fa1..df003993cf80676c50df08f3ea57e36a424917f3 100644 (file)
 from random import choice
 from urlparse import urlparse, urlunparse
 from urllib import quote_plus
+from binascii import b2a_hex, a2b_hex
+import sha
 
 from twisted.internet import reactor, defer
 from twisted.python import log
 from twisted.trial import unittest
-from twisted.web2 import stream as stream_mod
+from twisted.web2 import stream
 from twisted.web2.http import splitHostPort
 
 from HTTPDownloader import Peer
 from util import uncompact
+from hash import PIECE_SIZE
+from apt_p2p_Khashmir.bencode import bdecode
 
-class FileDownload(defer.Deferred):
+class GrowingFileStream(stream.FileStream):
+    """Modified to stream data from a file as it becomes available.
+    
+    @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
+    @ivar deferred: waiting for the result of the last read attempt
+    @ivar available: the number of bytes that are currently available to read
+    @ivar position: the current position in the file where the next read will begin
+    @ivar finished: True when no more data will be coming available
+    """
+
+    CHUNK_SIZE = 4*1024
+
+    def __init__(self, f):
+        stream.FileStream.__init__(self, f)
+        self.length = None
+        self.deferred = None
+        self.available = 0L
+        self.position = 0L
+        self.finished = False
+
+    def updateAvaliable(self, newlyAvailable):
+        """Update the number of bytes that are available.
+        
+        Call it with 0 to trigger reading of a fully read file.
+        
+        @param newlyAvailable: the number of bytes that just became available
+        """
+        assert not self.finished
+        self.available += newlyAvailable
+        
+        # If a read is pending, let it go
+        if self.deferred and self.position < self.available:
+            # Try to read some data from the file
+            length = self.available - self.position
+            readSize = min(length, self.CHUNK_SIZE)
+            self.f.seek(self.position)
+            b = self.f.read(readSize)
+            bytesRead = len(b)
+            
+            # Check if end of file was reached
+            if bytesRead:
+                self.position += bytesRead
+                deferred = self.deferred
+                self.deferred = None
+                deferred.callback(b)
+
+    def allAvailable(self):
+        """Indicate that no more data is coming available."""
+        self.finished = True
+
+        # If a read is pending, let it go
+        if self.deferred:
+            if self.position < self.available:
+                # Try to read some data from the file
+                length = self.available - self.position
+                readSize = min(length, self.CHUNK_SIZE)
+                self.f.seek(self.position)
+                b = self.f.read(readSize)
+                bytesRead = len(b)
+    
+                # Check if end of file was reached
+                if bytesRead:
+                    self.position += bytesRead
+                    deferred = self.deferred
+                    self.deferred = None
+                    deferred.callback(b)
+                else:
+                    # We're done
+                    deferred.callback(None)
+            else:
+                # We're done
+                deferred.callback(None)
+        
+    def read(self, sendfile=False):
+        assert not self.deferred, "A previous read is still deferred."
+
+        if self.f is None:
+            return None
+
+        length = self.available - self.position
+        readSize = min(length, self.CHUNK_SIZE)
+
+        # If we don't have any available, we're done or deferred
+        if readSize <= 0:
+            if self.finished:
+                return None
+            else:
+                self.deferred = defer.Deferred()
+                return self.deferred
+
+        # Try to read some data from the file
+        self.f.seek(self.position)
+        b = self.f.read(readSize)
+        bytesRead = len(b)
+        if not bytesRead:
+            # End of file was reached, we're done or deferred
+            if self.finished:
+                return None
+            else:
+                self.deferred = defer.Deferred()
+                return self.deferred
+        else:
+            self.position += bytesRead
+            return b
+
+class StreamToFile(defer.Deferred):
+    """Saves a stream to a file.
+    
+    @type stream: L{twisted.web2.stream.IByteStream}
+    @ivar stream: the input stream being read
+    @type outFile: L{twisted.python.filepath.FilePath}
+    @ivar outFile: the file being written
+    @type hash: L{Hash.HashObject}
+    @ivar hash: the hash object for the file
+    @type length: C{int}
+    @ivar length: the length of the original (compressed) file
+    @type doneDefer: L{twisted.internet.defer.Deferred}
+    @ivar doneDefer: the deferred that will fire when done streaming
+    """
+    
+    def __init__(self, inputStream, outFile, hash, start, length):
+        """Initializes the file.
+        
+        @type inputStream: L{twisted.web2.stream.IByteStream}
+        @param inputStream: the input stream to read from
+        @type outFile: L{twisted.python.filepath.FilePath}
+        @param outFile: the file to write to
+        @type hash: L{Hash.HashObject}
+        @param hash: the hash object to use for the file
+        """
+        self.stream = inputStream
+        self.outFile = outFile.open('w')
+        self.hash = hash
+        self.hash.new()
+        self.length = self.stream.length
+        
+    def run(self):
+        """Start the streaming."""
+        self.doneDefer = stream.readStream(self.stream, _gotData)
+        self.doneDefer.addCallbacks(self._done, self._error)
+        return self.doneDefer
+
+    def _done(self):
+        """Close all the output files, return the result."""
+        if not self.outFile.closed:
+            self.outFile.close()
+            self.hash.digest()
+            self.doneDefer.callback(self.hash)
+    
+    def _gotData(self, data):
+        self.peers[site]['pieces'] += data
+
+    def read(self):
+        """Read some data from the stream."""
+        if self.outFile.closed:
+            return None
+        
+        # Read data from the stream, deal with the possible deferred
+        data = self.stream.read()
+        if isinstance(data, defer.Deferred):
+            data.addCallbacks(self._write, self._done)
+            return data
+        
+        self._write(data)
+        return data
+    
+    def _write(self, data):
+        """Write the stream data to the file and return it for others to use.
+        
+        Also optionally decompresses it.
+        """
+        if data is None:
+            self._done()
+            return data
+        
+        # Write and hash the streamed data
+        self.outFile.write(data)
+        self.hash.update(data)
+        
+        return data
+    
+    def close(self):
+        """Clean everything up and return None to future reads."""
+        self.length = 0
+        self._done()
+        self.stream.close()
+
+
+class FileDownload:
     """Manage a download from a list of peers or a mirror.
     
     
     """
     
-    def __init__(self, manager, hash, mirror, compact_peers):
-        """Initialize the instance.
+    def __init__(self, manager, hash, mirror, compact_peers, file):
+        """Initialize the instance and check for piece hashes.
         
         @type hash: L{Hash.HashObject}
         @param hash: the hash object containing the expected hash for the file
         @param mirror: the URI of the file on the mirror
         @type compact_peers: C{list} of C{string}
         @param compact_peers: a list of the peer info where the file can be found
+        @type file: L{twisted.python.filepath.FilePath}
+        @param file: the temporary file to use to store the downloaded file
         """
-        defer.Deferred.__init__(self)
         self.manager = manager
         self.hash = hash
         self.mirror = mirror
+        self.compact_peers = compact_peers
+        
+        self.path = '/~/' + quote_plus(hash.expected())
+        self.pieces = None
+        self.started = False
+        
+        file.restat(False)
+        if file.exists():
+            file.remove()
+        self.file = file.open('w')
 
+    def run(self):
+        """Start the downloading process."""
+        self.defer = defer.Deferred()
         self.peers = {}
         no_pieces = 0
         pieces_string = {}
         pieces_hash = {}
         pieces_dl_hash = {}
 
-        for compact_peer in compact_peers:
+        for compact_peer in self.compact_peers:
             # Build a list of all the peers for this download
             site = uncompact(compact_peer['c'])
             peer = manager.getPeer(site)
-            self.peers[site] = peer
+            self.peers.setdefault(site, {})['peer'] = peer
 
             # Extract any piece information from the peers list
             if 't' in compact_peer:
+                self.peers[site]['t'] = compact_peer['t']['t']
                 pieces_string.setdefault(compact_peer['t']['t'], 0)
                 pieces_string[compact_peer['t']['t']] += 1
             elif 'h' in compact_peer:
+                self.peers[site]['h'] = compact_peer['h']
                 pieces_hash.setdefault(compact_peer['h'], 0)
                 pieces_hash[compact_peer['h']] += 1
             elif 'l' in compact_peer:
+                self.peers[site]['l'] = compact_peer['l']
                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
                 pieces_dl_hash[compact_peer['l']] += 1
             else:
                 no_pieces += 1
         
+        # Select the most popular piece info
         max_found = max(no_pieces, max(pieces_string.values()),
                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
 
+        if max_found < len(self.peers):
+            log.msg('Misleading piece information found, using most popular %d of %d peers' % 
+                    (max_found, len(self.peers)))
+
         if max_found == no_pieces:
-            self.sort()
-            pieces = []
-            if max_found < len(self.peers):
-                pass
+            # The file is not split into pieces
+            self.pieces = []
+            self.startDownload()
         elif max_found == max(pieces_string.values()):
-            pass
+            # Small number of pieces in a string
+            for pieces, num in pieces_string.items():
+                # Find the most popular piece string
+                if num == max_found:
+                    self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+                    self.startDownload()
+                    break
+        elif max_found == max(pieces_hash.values()):
+            # Medium number of pieces stored in the DHT
+            for pieces, num in pieces_hash.items():
+                # Find the most popular piece hash to lookup
+                if num == max_found:
+                    self.getDHTPieces(pieces)
+                    break
+        elif max_found == max(pieces_dl_hash.values()):
+            # Large number of pieces stored in peers
+            for pieces, num in pieces_hash.items():
+                # Find the most popular piece hash to download
+                if num == max_found:
+                    self.getPeerPieces(pieces)
+                    break
+        return self.defer
+
+    #{ Downloading the piece hashes
+    def getDHTPieces(self, key):
+        """Retrieve the piece information from the DHT.
+        
+        @param key: the key to lookup in the DHT
+        """
+        # Remove any peers with the wrong piece hash
+        #for site in self.peers.keys():
+        #    if self.peers[site].get('h', '') != key:
+        #        del self.peers[site]
+
+        # Start the DHT lookup
+        lookupDefer = self.manager.dht.getValue(key)
+        lookupDefer.addCallback(self._getDHTPieces, key)
+        
+    def _getDHTPieces(self, results, key):
+        """Check the retrieved values."""
+        for result in results:
+            # Make sure the hash matches the key
+            result_hash = sha.new(result.get('t', '')).digest()
+            if result_hash == key:
+                pieces = result['t']
+                self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+                log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+                self.startDownload()
+                return
+            
+        # Continue without the piece hashes
+        log.msg('Could not retrieve the piece hashes from the DHT')
+        self.pieces = []
+        self.startDownload()
+
+    def getPeerPieces(self, key, failedSite = None):
+        """Retrieve the piece information from the peers.
+        
+        @param key: the key to request from the peers
+        """
+        if failedSite is None:
+            self.outstanding = 0
+            # Remove any peers with the wrong piece hash
+            #for site in self.peers.keys():
+            #    if self.peers[site].get('l', '') != key:
+            #        del self.peers[site]
+        else:
+            self.peers[failedSite]['failed'] = True
+            self.outstanding -= 1
+
+        if self.pieces is None:
+            # Send a request to one or more peers
+            for site in self.peers:
+                if self.peers[site].get('failed', False) != True:
+                    path = '/~/' + quote_plus(key)
+                    lookupDefer = self.peers[site]['peer'].get(path)
+                    lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError,
+                                             callbackArgs=(key, site), errbackArgs=(key, site))
+                    self.outstanding += 1
+                    if self.outstanding >= 3:
+                        break
         
+        if self.pieces is None and self.outstanding == 0:
+            # Continue without the piece hashes
+            log.msg('Could not retrieve the piece hashes from the peers')
+            self.pieces = []
+            self.startDownload()
+        
+    def _getPeerPieces(self, response, key, site):
+        """Process the retrieved response from the peer."""
+        if response.code != 200:
+            # Request failed, try a different peer
+            self.getPeerPieces(key, site)
+        else:
+            # Read the response stream to a string
+            self.peers[site]['pieces'] = ''
+            def _gotPeerPiece(data, self = self, site = site):
+                self.peers[site]['pieces'] += data
+            df = stream.readStream(response.stream, _gotPeerPiece)
+            df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
+                            callbackArgs=(key, site), errbackArgs=(key, site))
+
+    def _gotPeerError(self, err, key, site):
+        """Peer failed, try again."""
+        log.err(err)
+        self.getPeerPieces(key, site)
+
+    def _gotPeerPieces(self, result, key, site):
+        """Check the retrieved pieces from the peer."""
+        if self.pieces is not None:
+            # Already done
+            return
+        
+        try:
+            result = bdecode(self.peers[site]['pieces'])
+        except:
+            log.err()
+            self.getPeerPieces(key, site)
+            return
+            
+        result_hash = sha.new(result.get('t', '')).digest()
+        if result_hash == key:
+            pieces = result['t']
+            self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+            log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces))
+            self.startDownload()
+        else:
+            log.msg('Peer returned a piece string that did not match')
+            self.getPeerPieces(key, site)
+
+    #{ Downloading the file
     def sort(self):
+        """Sort the peers by their rank (highest ranked at the end)."""
         def sort(a, b):
             """Sort peers by their rank."""
             if a.rank > b.rank:
@@ -78,8 +421,54 @@ class FileDownload(defer.Deferred):
             elif a.rank < b.rank:
                 return -1
             return 0
-        self.peers.sort(sort)
+        self.peerlist.sort(sort)
 
+    def startDownload(self):
+        """Start the download from the peers."""
+        # Don't start twice
+        if self.started:
+            return
+        
+        self.started = True
+        assert self.pieces is not None, "You must initialize the piece hashes first"
+        self.peerlist = [self.peers[site]['peer'] for site in self.peers]
+        
+        # Special case if there's only one good peer left
+        if len(self.peerlist) == 1:
+            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
+            self.defer.callback(self.peerlist[0].get(self.path))
+            return
+        
+        self.sort()
+        self.outstanding = 0
+        self.next_piece = 0
+        
+        while self.outstanding < 3 and self.peerlist and self.next_piece < len(self.pieces):
+            peer = self.peerlist.pop()
+            piece = self.next_piece
+            self.next_piece += 1
+            
+            self.outstanding += 1
+            df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+            df.addCallbacks(self._gotPiece, self._gotError,
+                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+    
+    def _gotPiece(self, response, piece, peer):
+        """Process the retrieved piece from the peer."""
+        if response.code != 206:
+            # Request failed, try a different peer
+            self.getPeerPieces(key, site)
+        else:
+            # Read the response stream to the file
+            df = StreamToFile(response.stream, self.file, self.hash, piece*PIECE_SIZE, PIECE_SIZE).run()
+            df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
+                            callbackArgs=(key, site), errbackArgs=(key, site))
+
+    def _gotError(self, err, piece, peer):
+        """Peer failed, try again."""
+        log.err(err)
+
+        
 class PeerManager:
     """Manage a set of peers and the requests to them.
     
@@ -87,8 +476,13 @@ class PeerManager:
     @ivar clients: the available peers that have been previously contacted
     """
 
-    def __init__(self):
+    def __init__(self, cache_dir, dht):
         """Initialize the instance."""
+        self.cache_dir = cache_dir
+        self.cache_dir.restat(False)
+        if not self.cache_dir.exists():
+            self.cache_dir.makedirs()
+        self.dht = dht
         self.clients = {}
         
     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
@@ -123,8 +517,8 @@ class PeerManager:
             peer = self.getPeer(site)
             return peer.get(path)
         else:
-            FileDownload(self, hash, mirror, peers)
-            
+            tmpfile = self.cache_dir.child(hash.hexexpected())
+            return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
     def getPeer(self, site):
         """Create a new peer if necessary and return it.
@@ -156,7 +550,7 @@ class TestPeerManager(unittest.TestCase):
             pass
         def printdone(n):
             pass
-        stream_mod.readStream(resp.stream, print_).addCallback(printdone)
+        stream.readStream(resp.stream, print_).addCallback(printdone)
     
     def test_download(self):
         """Tests a normal download."""
index 887f136a52c1453c5b3e35751cfae6c7823a1ff4..0063e9fffd2ebb08373be003c346c4d90c950d63 100644 (file)
@@ -80,7 +80,7 @@ class AptP2P:
         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self,
                                     config.getint('DEFAULT', 'UPLOAD_LIMIT'))
         self.getHTTPFactory = self.http_server.getHTTPFactory
-        self.peers = PeerManager()
+        self.peers = PeerManager(self.cache_dir, self.dht)
         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)