]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/PeerManager.py
Add a final retry of all errback mirror downloads.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
index 0f64e8f3161f2e99e3cee9f16ec9f37fd2803f92..9e3f6daf678d0f3a1f15b0a289be5605630e4f56 100644 (file)
@@ -8,16 +8,21 @@ from binascii import b2a_hex, a2b_hex
 import sha
 
 from twisted.internet import reactor, defer
 import sha
 
 from twisted.internet import reactor, defer
-from twisted.python import log
+from twisted.python import log, filepath
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import Response, splitHostPort
 
 from HTTPDownloader import Peer
 from util import uncompact
 from twisted.trial import unittest
 from twisted.web2 import stream
 from twisted.web2.http import Response, splitHostPort
 
 from HTTPDownloader import Peer
 from util import uncompact
-from hash import PIECE_SIZE
+from Hash import PIECE_SIZE
 from apt_p2p_Khashmir.bencode import bdecode
 from apt_p2p_Khashmir.bencode import bdecode
+from apt_p2p_conf import config
 
 
+
+class PeerError(Exception):
+    """An error occurred downloading from peers."""
+    
 class GrowingFileStream(stream.FileStream):
     """Modified to stream data from a file as it becomes available.
     
 class GrowingFileStream(stream.FileStream):
     """Modified to stream data from a file as it becomes available.
     
@@ -30,15 +35,15 @@ class GrowingFileStream(stream.FileStream):
 
     CHUNK_SIZE = 4*1024
 
 
     CHUNK_SIZE = 4*1024
 
-    def __init__(self, f):
+    def __init__(self, f, length = None):
         stream.FileStream.__init__(self, f)
         stream.FileStream.__init__(self, f)
-        self.length = None
+        self.length = length
         self.deferred = None
         self.available = 0L
         self.position = 0L
         self.finished = False
 
         self.deferred = None
         self.available = 0L
         self.position = 0L
         self.finished = False
 
-    def updateAvaliable(self, newlyAvailable):
+    def updateAvailable(self, newlyAvailable):
         """Update the number of bytes that are available.
         
         Call it with 0 to trigger reading of a fully read file.
         """Update the number of bytes that are available.
         
         Call it with 0 to trigger reading of a fully read file.
@@ -86,9 +91,15 @@ class GrowingFileStream(stream.FileStream):
                     deferred.callback(b)
                 else:
                     # We're done
                     deferred.callback(b)
                 else:
                     # We're done
+                    self._close()
+                    deferred = self.deferred
+                    self.deferred = None
                     deferred.callback(None)
             else:
                 # We're done
                     deferred.callback(None)
             else:
                 # We're done
+                self._close()
+                deferred = self.deferred
+                self.deferred = None
                 deferred.callback(None)
         
     def read(self, sendfile=False):
                 deferred.callback(None)
         
     def read(self, sendfile=False):
@@ -103,6 +114,7 @@ class GrowingFileStream(stream.FileStream):
         # If we don't have any available, we're done or deferred
         if readSize <= 0:
             if self.finished:
         # If we don't have any available, we're done or deferred
         if readSize <= 0:
             if self.finished:
+                self._close()
                 return None
             else:
                 self.deferred = defer.Deferred()
                 return None
             else:
                 self.deferred = defer.Deferred()
@@ -115,6 +127,7 @@ class GrowingFileStream(stream.FileStream):
         if not bytesRead:
             # End of file was reached, we're done or deferred
             if self.finished:
         if not bytesRead:
             # End of file was reached, we're done or deferred
             if self.finished:
+                self._close()
                 return None
             else:
                 self.deferred = defer.Deferred()
                 return None
             else:
                 self.deferred = defer.Deferred()
@@ -123,15 +136,21 @@ class GrowingFileStream(stream.FileStream):
             self.position += bytesRead
             return b
 
             self.position += bytesRead
             return b
 
-class StreamToFile(defer.Deferred):
+    def _close(self):
+        """Close the temporary file and remove it."""
+        self.f.close()
+        filepath.FilePath(self.f.name).remove()
+        self.f = None
+        
+class StreamToFile:
     """Save a stream to a partial file and hash it.
     
     @type stream: L{twisted.web2.stream.IByteStream}
     @ivar stream: the input stream being read
     @type outFile: L{twisted.python.filepath.FilePath}
     @ivar outFile: the file being written
     """Save a stream to a partial file and hash it.
     
     @type stream: L{twisted.web2.stream.IByteStream}
     @ivar stream: the input stream being read
     @type outFile: L{twisted.python.filepath.FilePath}
     @ivar outFile: the file being written
-    @type hash: C{sha1}
-    @ivar hash: the hash object for the data
+    @type hasher: hashing object, e.g. C{sha1}
+    @ivar hasher: the hash object for the data
     @type position: C{int}
     @ivar position: the current file position to write the next data to
     @type length: C{int}
     @type position: C{int}
     @ivar position: the current file position to write the next data to
     @type length: C{int}
@@ -140,9 +159,11 @@ class StreamToFile(defer.Deferred):
     @ivar doneDefer: the deferred that will fire when done writing
     """
     
     @ivar doneDefer: the deferred that will fire when done writing
     """
     
-    def __init__(self, inputStream, outFile, start = 0, length = None):
+    def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
         """Initializes the file.
         
         """Initializes the file.
         
+        @type hasher: hashing object, e.g. C{sha1}
+        @param hasher: the hash object for the data
         @type inputStream: L{twisted.web2.stream.IByteStream}
         @param inputStream: the input stream to read from
         @type outFile: L{twisted.python.filepath.FilePath}
         @type inputStream: L{twisted.web2.stream.IByteStream}
         @param inputStream: the input stream to read from
         @type outFile: L{twisted.python.filepath.FilePath}
@@ -156,7 +177,7 @@ class StreamToFile(defer.Deferred):
         """
         self.stream = inputStream
         self.outFile = outFile
         """
         self.stream = inputStream
         self.outFile = outFile
-        self.hash = sha.new()
+        self.hasher = hasher
         self.position = start
         self.length = None
         if length is not None:
         self.position = start
         self.length = None
         if length is not None:
@@ -175,10 +196,10 @@ class StreamToFile(defer.Deferred):
     def _gotData(self, data):
         """Process the received data."""
         if self.outFile.closed:
     def _gotData(self, data):
         """Process the received data."""
         if self.outFile.closed:
-            raise Exception, "outFile was unexpectedly closed"
+            raise PeerError, "outFile was unexpectedly closed"
         
         if data is None:
         
         if data is None:
-            raise Exception, "Data is None?"
+            raise PeerError, "Data is None?"
         
         # Make sure we don't go too far
         if self.length is not None and self.position + len(data) > self.length:
         
         # Make sure we don't go too far
         if self.length is not None and self.position + len(data) > self.length:
@@ -187,15 +208,16 @@ class StreamToFile(defer.Deferred):
         # Write and hash the streamed data
         self.outFile.seek(self.position)
         self.outFile.write(data)
         # Write and hash the streamed data
         self.outFile.seek(self.position)
         self.outFile.write(data)
-        self.hash.update(data)
+        self.hasher.update(data)
         self.position += len(data)
         
     def _done(self, result):
         """Return the result."""
         self.position += len(data)
         
     def _done(self, result):
         """Return the result."""
-        return self.hash.digest()
+        return self.hasher.digest()
     
     def _error(self, err):
         """Log the error."""
     
     def _error(self, err):
         """Log the error."""
+        log.msg('Streaming error')
         log.err(err)
         return err
     
         log.err(err)
         return err
     
@@ -254,27 +276,30 @@ class FileDownload:
         self.compact_peers = compact_peers
         
         self.path = '/~/' + quote_plus(hash.expected())
         self.compact_peers = compact_peers
         
         self.path = '/~/' + quote_plus(hash.expected())
+        self.defer = None
+        self.mirror_path = None
         self.pieces = None
         self.started = False
         
         file.restat(False)
         if file.exists():
             file.remove()
         self.pieces = None
         self.started = False
         
         file.restat(False)
         if file.exists():
             file.remove()
-        self.file = file.open('w')
+        self.file = file.open('w+')
 
     def run(self):
         """Start the downloading process."""
 
     def run(self):
         """Start the downloading process."""
+        log.msg('Checking for pieces for %s' % self.path)
         self.defer = defer.Deferred()
         self.peers = {}
         no_pieces = 0
         self.defer = defer.Deferred()
         self.peers = {}
         no_pieces = 0
-        pieces_string = {}
-        pieces_hash = {}
-        pieces_dl_hash = {}
+        pieces_string = {0: 0}
+        pieces_hash = {0: 0}
+        pieces_dl_hash = {0: 0}
 
         for compact_peer in self.compact_peers:
             # Build a list of all the peers for this download
             site = uncompact(compact_peer['c'])
 
         for compact_peer in self.compact_peers:
             # Build a list of all the peers for this download
             site = uncompact(compact_peer['c'])
-            peer = manager.getPeer(site)
+            peer = self.manager.getPeer(site)
             self.peers.setdefault(site, {})['peer'] = peer
 
             # Extract any piece information from the peers list
             self.peers.setdefault(site, {})['peer'] = peer
 
             # Extract any piece information from the peers list
@@ -303,7 +328,8 @@ class FileDownload:
 
         if max_found == no_pieces:
             # The file is not split into pieces
 
         if max_found == no_pieces:
             # The file is not split into pieces
-            self.pieces = []
+            log.msg('No pieces were found for the file')
+            self.pieces = [self.hash.expected()]
             self.startDownload()
         elif max_found == max(pieces_string.values()):
             # Small number of pieces in a string
             self.startDownload()
         elif max_found == max(pieces_string.values()):
             # Small number of pieces in a string
@@ -319,13 +345,15 @@ class FileDownload:
             for pieces, num in pieces_hash.items():
                 # Find the most popular piece hash to lookup
                 if num == max_found:
             for pieces, num in pieces_hash.items():
                 # Find the most popular piece hash to lookup
                 if num == max_found:
+                    log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
                     self.getDHTPieces(pieces)
                     break
         elif max_found == max(pieces_dl_hash.values()):
             # Large number of pieces stored in peers
                     self.getDHTPieces(pieces)
                     break
         elif max_found == max(pieces_dl_hash.values()):
             # Large number of pieces stored in peers
-            for pieces, num in pieces_hash.items():
+            for pieces, num in pieces_dl_hash.items():
                 # Find the most popular piece hash to download
                 if num == max_found:
                 # Find the most popular piece hash to download
                 if num == max_found:
+                    log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
                     self.getPeerPieces(pieces)
                     break
         return self.defer
                     self.getPeerPieces(pieces)
                     break
         return self.defer
@@ -342,24 +370,28 @@ class FileDownload:
         #        del self.peers[site]
 
         # Start the DHT lookup
         #        del self.peers[site]
 
         # Start the DHT lookup
-        lookupDefer = self.manager.dht.getValue(key)
-        lookupDefer.addCallback(self._getDHTPieces, key)
+        lookupDefer = self.manager.dht.get(key)
+        lookupDefer.addBoth(self._getDHTPieces, key)
         
     def _getDHTPieces(self, results, key):
         """Check the retrieved values."""
         
     def _getDHTPieces(self, results, key):
         """Check the retrieved values."""
-        for result in results:
-            # Make sure the hash matches the key
-            result_hash = sha.new(result.get('t', '')).digest()
-            if result_hash == key:
-                pieces = result['t']
-                self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
-                log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
-                self.startDownload()
-                return
+        if isinstance(results, list):
+            for result in results:
+                # Make sure the hash matches the key
+                result_hash = sha.new(result.get('t', '')).digest()
+                if result_hash == key:
+                    pieces = result['t']
+                    self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+                    log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+                    self.startDownload()
+                    return
+                
+            log.msg('Could not retrieve the piece hashes from the DHT')
+        else:
+            log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
             
         # Continue without the piece hashes
             
         # Continue without the piece hashes
-        log.msg('Could not retrieve the piece hashes from the DHT')
-        self.pieces = []
+        self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
         self.startDownload()
 
     def getPeerPieces(self, key, failedSite = None):
         self.startDownload()
 
     def getPeerPieces(self, key, failedSite = None):
@@ -368,12 +400,14 @@ class FileDownload:
         @param key: the key to request from the peers
         """
         if failedSite is None:
         @param key: the key to request from the peers
         """
         if failedSite is None:
+            log.msg('Starting the lookup of piece hashes in peers')
             self.outstanding = 0
             # Remove any peers with the wrong piece hash
             #for site in self.peers.keys():
             #    if self.peers[site].get('l', '') != key:
             #        del self.peers[site]
         else:
             self.outstanding = 0
             # Remove any peers with the wrong piece hash
             #for site in self.peers.keys():
             #    if self.peers[site].get('l', '') != key:
             #        del self.peers[site]
         else:
+            log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
             self.peers[failedSite]['failed'] = True
             self.outstanding -= 1
 
             self.peers[failedSite]['failed'] = True
             self.outstanding -= 1
 
@@ -381,22 +415,26 @@ class FileDownload:
             # Send a request to one or more peers
             for site in self.peers:
                 if self.peers[site].get('failed', False) != True:
             # Send a request to one or more peers
             for site in self.peers:
                 if self.peers[site].get('failed', False) != True:
+                    log.msg('Sending a piece hash request to %r' % (site, ))
                     path = '/~/' + quote_plus(key)
                     lookupDefer = self.peers[site]['peer'].get(path)
                     path = '/~/' + quote_plus(key)
                     lookupDefer = self.peers[site]['peer'].get(path)
-                    lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError,
-                                             callbackArgs=(key, site), errbackArgs=(key, site))
+                    reactor.callLater(0, lookupDefer.addCallbacks,
+                                      *(self._getPeerPieces, self._gotPeerError),
+                                      **{'callbackArgs': (key, site),
+                                         'errbackArgs': (key, site)})
                     self.outstanding += 1
                     self.outstanding += 1
-                    if self.outstanding >= 3:
+                    if self.outstanding >= 4:
                         break
         
                         break
         
-        if self.pieces is None and self.outstanding == 0:
+        if self.pieces is None and self.outstanding <= 0:
             # Continue without the piece hashes
             log.msg('Could not retrieve the piece hashes from the peers')
             # Continue without the piece hashes
             log.msg('Could not retrieve the piece hashes from the peers')
-            self.pieces = []
+            self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
             self.startDownload()
         
     def _getPeerPieces(self, response, key, site):
         """Process the retrieved response from the peer."""
             self.startDownload()
         
     def _getPeerPieces(self, response, key, site):
         """Process the retrieved response from the peer."""
+        log.msg('Got a piece hash response %d from %r' % (response.code, site))
         if response.code != 200:
             # Request failed, try a different peer
             self.getPeerPieces(key, site)
         if response.code != 200:
             # Request failed, try a different peer
             self.getPeerPieces(key, site)
@@ -404,25 +442,31 @@ class FileDownload:
             # Read the response stream to a string
             self.peers[site]['pieces'] = ''
             def _gotPeerPiece(data, self = self, site = site):
             # Read the response stream to a string
             self.peers[site]['pieces'] = ''
             def _gotPeerPiece(data, self = self, site = site):
+                log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
                 self.peers[site]['pieces'] += data
                 self.peers[site]['pieces'] += data
+            log.msg('Streaming piece hashes from peer')
             df = stream.readStream(response.stream, _gotPeerPiece)
             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
                             callbackArgs=(key, site), errbackArgs=(key, site))
 
     def _gotPeerError(self, err, key, site):
         """Peer failed, try again."""
             df = stream.readStream(response.stream, _gotPeerPiece)
             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
                             callbackArgs=(key, site), errbackArgs=(key, site))
 
     def _gotPeerError(self, err, key, site):
         """Peer failed, try again."""
+        log.msg('Peer piece hash request failed for %r' % (site, ))
         log.err(err)
         self.getPeerPieces(key, site)
 
     def _gotPeerPieces(self, result, key, site):
         """Check the retrieved pieces from the peer."""
         log.err(err)
         self.getPeerPieces(key, site)
 
     def _gotPeerPieces(self, result, key, site):
         """Check the retrieved pieces from the peer."""
+        log.msg('Finished streaming piece hashes from peer %r' % (site, ))
         if self.pieces is not None:
             # Already done
         if self.pieces is not None:
             # Already done
+            log.msg('Already done')
             return
         
         try:
             result = bdecode(self.peers[site]['pieces'])
         except:
             return
         
         try:
             result = bdecode(self.peers[site]['pieces'])
         except:
+            log.msg('Error bdecoding piece hashes')
             log.err()
             self.getPeerPieces(key, site)
             return
             log.err()
             self.getPeerPieces(key, site)
             return
@@ -431,7 +475,7 @@ class FileDownload:
         if result_hash == key:
             pieces = result['t']
             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
         if result_hash == key:
             pieces = result['t']
             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
-            log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces))
+            log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
             self.startDownload()
         else:
             log.msg('Peer returned a piece string that did not match')
             self.startDownload()
         else:
             log.msg('Peer returned a piece string that did not match')
@@ -455,28 +499,30 @@ class FileDownload:
         if self.started:
             return
         
         if self.started:
             return
         
+        log.msg('Starting to download %s' % self.path)
         self.started = True
         self.started = True
-        assert self.pieces is not None, "You must initialize the piece hashes first"
+        assert self.pieces, "You must initialize the piece hashes first"
         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
         
         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
         
+        # Use the mirror if there are few peers
+        if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+            parsed = urlparse(self.mirror)
+            if parsed[0] == "http":
+                site = splitHostPort(parsed[0], parsed[1])
+                self.mirror_path = urlunparse(('', '') + parsed[2:])
+                peer = self.manager.getPeer(site, mirror = True)
+                self.peerlist.append(peer)
+        
         # Special case if there's only one good peer left
         # Special case if there's only one good peer left
-        if len(self.peerlist) == 1:
-            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
-            self.defer.callback(self.peerlist[0].get(self.path))
-            return
+#        if len(self.peerlist) == 1:
+#            log.msg('Downloading from peer %r' % (self.peerlist[0], ))
+#            self.defer.callback(self.peerlist[0].get(self.path))
+#            return
         
         
-        # Start sending the return file
-        self.stream = GrowingFileStream(self.file)
-        resp = Response(200, {}, self.stream)
-        self.defer.callback(resp)
-
         # Begin to download the pieces
         self.outstanding = 0
         self.nextFinish = 0
         # Begin to download the pieces
         self.outstanding = 0
         self.nextFinish = 0
-        if self.pieces:
-            self.completePieces = [False for piece in self.pieces]
-        else:
-            self.completePieces = [False]
+        self.completePieces = [False for piece in self.pieces]
         self.getPieces()
         
     #{ Downloading the pieces
         self.getPieces()
         
     #{ Downloading the pieces
@@ -489,20 +535,25 @@ class FileDownload:
                 # Send a request to the highest ranked peer
                 peer = self.peerlist.pop()
                 self.completePieces[piece] = peer
                 # Send a request to the highest ranked peer
                 peer = self.peerlist.pop()
                 self.completePieces[piece] = peer
+                log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
                 
                 self.outstanding += 1
                 
                 self.outstanding += 1
-                if self.pieces:
-                    df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                path = self.path
+                if peer.mirror:
+                    path = self.mirror_path
+                if len(self.completePieces) > 1:
+                    df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                 else:
                 else:
-                    df = peer.get(self.path)
+                    df = peer.get(path)
                 reactor.callLater(0, df.addCallbacks,
                                   *(self._getPiece, self._getError),
                                   **{'callbackArgs': (piece, peer),
                                      'errbackArgs': (piece, peer)})
                 reactor.callLater(0, df.addCallbacks,
                                   *(self._getPiece, self._getError),
                                   **{'callbackArgs': (piece, peer),
                                      'errbackArgs': (piece, peer)})
-                piece += 1
+            piece += 1
                 
                 
-        # Check if we're don
+        # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
+            log.msg('Download is complete for %s' % self.path)
             self.stream.allAvailable()
     
     def _getPiece(self, response, piece, peer):
             self.stream.allAvailable()
     
     def _getPiece(self, response, piece, peer):
@@ -510,18 +561,37 @@ class FileDownload:
         if ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
         if ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
-            peer.hashError()
+            log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
+            peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
             self.completePieces[piece] = False
             if response.stream and response.stream.length:
                 stream.readAndDiscard(response.stream)
         else:
             self.completePieces[piece] = False
             if response.stream and response.stream.length:
                 stream.readAndDiscard(response.stream)
         else:
+            if self.defer:
+                # Start sending the return file
+                df = self.defer
+                self.defer = None
+                self.stream = GrowingFileStream(self.file, self.hash.expSize)
+
+                # Get the headers from the peer's response
+                headers = {}
+                if response.headers.hasHeader('last-modified'):
+                    headers['last-modified'] = response.headers.getHeader('last-modified')
+                resp = Response(200, headers, self.stream)
+                df.callback(resp)
+
             # Read the response stream to the file
             # Read the response stream to the file
+            log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
+                df = StreamToFile(self.hash.newPieceHasher(), response.stream,
+                                  self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
             else:
             else:
-                df = StreamToFile(response.stream, self.file).run()
-            df.addCallbacks(self._gotPiece, self._gotError,
-                            callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+                df = StreamToFile(self.hash.newHasher(), response.stream,
+                                  self.file).run()
+            reactor.callLater(0, df.addCallbacks,
+                              *(self._gotPiece, self._gotError),
+                              **{'callbackArgs': (piece, peer),
+                                 'errbackArgs': (piece, peer)})
 
         self.outstanding -= 1
         self.peerlist.append(peer)
 
         self.outstanding -= 1
         self.peerlist.append(peer)
@@ -529,6 +599,7 @@ class FileDownload:
 
     def _getError(self, err, piece, peer):
         """Peer failed, try again."""
 
     def _getError(self, err, piece, peer):
         """Peer failed, try again."""
+        log.msg('Got error for piece %d from peer %r' % (piece, peer))
         self.outstanding -= 1
         self.peerlist.append(peer)
         self.completePieces[piece] = False
         self.outstanding -= 1
         self.peerlist.append(peer)
         self.completePieces[piece] = False
@@ -537,27 +608,25 @@ class FileDownload:
 
     def _gotPiece(self, response, piece, peer):
         """Process the retrieved piece from the peer."""
 
     def _gotPiece(self, response, piece, peer):
         """Process the retrieved piece from the peer."""
-        if ((self.pieces and response != self.pieces[piece]) or
-            (len(self.pieces) == 0 and response == self.hash.expected())):
+        if self.pieces[piece] and response != self.pieces[piece]:
             # Hash doesn't match
             # Hash doesn't match
-            peer.hashError()
+            log.msg('Hash error for piece %d from peer %r' % (piece, peer))
+            peer.hashError('Piece received from peer does not match expected')
             self.completePieces[piece] = False
             self.completePieces[piece] = False
-        elif self.pieces:
+        else:
             # Successfully completed one of several pieces
             # Successfully completed one of several pieces
+            log.msg('Finished with piece %d from peer %r' % (piece, peer))
             self.completePieces[piece] = True
             while (self.nextFinish < len(self.completePieces) and
                    self.completePieces[self.nextFinish] == True):
                 self.nextFinish += 1
                 self.stream.updateAvailable(PIECE_SIZE)
             self.completePieces[piece] = True
             while (self.nextFinish < len(self.completePieces) and
                    self.completePieces[self.nextFinish] == True):
                 self.nextFinish += 1
                 self.stream.updateAvailable(PIECE_SIZE)
-        else:
-            # Whole download (only one piece) is complete
-            self.completePieces[piece] = True
-            self.stream.updateAvailable(2**30)
 
         self.getPieces()
 
     def _gotError(self, err, piece, peer):
         """Piece download failed, try again."""
 
         self.getPieces()
 
     def _gotError(self, err, piece, peer):
         """Piece download failed, try again."""
+        log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
         log.err(err)
         self.completePieces[piece] = False
         self.getPieces()
         log.err(err)
         self.completePieces[piece] = False
         self.getPieces()
@@ -567,19 +636,22 @@ class PeerManager:
     
     @type cache_dir: L{twisted.python.filepath.FilePath}
     @ivar cache_dir: the directory to use for storing all files
     
     @type cache_dir: L{twisted.python.filepath.FilePath}
     @ivar cache_dir: the directory to use for storing all files
-    @type dht: L{interfaces.IDHT}
+    @type dht: L{DHTManager.DHT}
     @ivar dht: the DHT instance
     @ivar dht: the DHT instance
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to record sent data to
     @type clients: C{dictionary}
     @ivar clients: the available peers that have been previously contacted
     """
 
     @type clients: C{dictionary}
     @ivar clients: the available peers that have been previously contacted
     """
 
-    def __init__(self, cache_dir, dht):
+    def __init__(self, cache_dir, dht, stats):
         """Initialize the instance."""
         self.cache_dir = cache_dir
         self.cache_dir.restat(False)
         if not self.cache_dir.exists():
             self.cache_dir.makedirs()
         self.dht = dht
         """Initialize the instance."""
         self.cache_dir = cache_dir
         self.cache_dir.restat(False)
         if not self.cache_dir.exists():
             self.cache_dir.makedirs()
         self.dht = dht
+        self.stats = stats
         self.clients = {}
         
     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
         self.clients = {}
         
     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
@@ -605,26 +677,30 @@ class PeerManager:
             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
             site = splitHostPort(parsed[0], parsed[1])
             path = urlunparse(('', '') + parsed[2:])
             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
             site = splitHostPort(parsed[0], parsed[1])
             path = urlunparse(('', '') + parsed[2:])
-            peer = self.getPeer(site)
+            peer = self.getPeer(site, mirror = True)
             return peer.get(path, method, modtime)
             return peer.get(path, method, modtime)
-        elif len(peers) == 1:
-            site = uncompact(peers[0]['c'])
-            log.msg('Downloading from peer %r' % (site, ))
-            path = '/~/' + quote_plus(hash.expected())
-            peer = self.getPeer(site)
-            return peer.get(path)
+#        elif len(peers) == 1:
+#            site = uncompact(peers[0]['c'])
+#            log.msg('Downloading from peer %r' % (site, ))
+#            path = '/~/' + quote_plus(hash.expected())
+#            peer = self.getPeer(site)
+#            return peer.get(path)
         else:
             tmpfile = self.cache_dir.child(hash.hexexpected())
             return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
         else:
             tmpfile = self.cache_dir.child(hash.hexexpected())
             return FileDownload(self, hash, mirror, peers, tmpfile).run()
         
-    def getPeer(self, site):
+    def getPeer(self, site, mirror = False):
         """Create a new peer if necessary and return it.
         
         @type site: (C{string}, C{int})
         @param site: the IP address and port of the peer
         """Create a new peer if necessary and return it.
         
         @type site: (C{string}, C{int})
         @param site: the IP address and port of the peer
+        @param mirror: whether the peer is actually a mirror
+            (optional, defaults to False)
         """
         if site not in self.clients:
         """
         if site not in self.clients:
-            self.clients[site] = Peer(site[0], site[1])
+            self.clients[site] = Peer(site[0], site[1], self.stats)
+            if mirror:
+                self.clients[site].mirror = True
         return self.clients[site]
     
     def close(self):
         return self.clients[site]
     
     def close(self):
@@ -639,60 +715,6 @@ class TestPeerManager(unittest.TestCase):
     manager = None
     pending_calls = []
     
     manager = None
     pending_calls = []
     
-    def gotResp(self, resp, num, expect):
-        self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
-        if expect is not None:
-            self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
-        def print_(n):
-            pass
-        def printdone(n):
-            pass
-        stream.readStream(resp.stream, print_).addCallback(printdone)
-    
-    def test_download(self):
-        """Tests a normal download."""
-        self.manager = PeerManager()
-        self.timeout = 10
-        
-        host = 'www.ietf.org'
-        d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
-        d.addCallback(self.gotResp, 1, 1070)
-        return d
-        
-    def test_head(self):
-        """Tests a 'HEAD' request."""
-        self.manager = PeerManager()
-        self.timeout = 10
-        
-        host = 'www.ietf.org'
-        d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
-        d.addCallback(self.gotResp, 1, 0)
-        return d
-        
-    def test_multiple_downloads(self):
-        """Tests multiple downloads with queueing and connection closing."""
-        self.manager = PeerManager()
-        self.timeout = 120
-        lastDefer = defer.Deferred()
-        
-        def newRequest(host, path, num, expect, last=False):
-            d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
-            d.addCallback(self.gotResp, num, expect)
-            if last:
-                d.addBoth(lastDefer.callback)
-                
-        newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
-        newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
-        newRequest('www.google.ca', "/", 3, None)
-        self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
-        self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
-        self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
-        self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
-        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
-        self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
-        self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
-        return lastDefer
-        
     def tearDown(self):
         for p in self.pending_calls:
             if p.active():
     def tearDown(self):
         for p in self.pending_calls:
             if p.active():