Fixed an error in downloading source packages from peers.
author Cameron Dale <camrdale@gmail.com>
Thu, 24 Apr 2008 03:52:11 +0000 (20:52 -0700)
committer Cameron Dale <camrdale@gmail.com>
Thu, 24 Apr 2008 03:52:11 +0000 (20:52 -0700)
Source packages with no pieces were hash-checked with SHA1 even when their expected hash used a different algorithm, so the check would fail.
Also added a test for source packages.
Also removed some unneeded logging in the peer downloader.
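
A minimal sketch of the corrected verification logic, not taken from apt-p2p itself: files large enough to have piece hashes are still checked piece-by-piece with SHA1, while files with no pieces (typical for the small files in a source package) are now checked whole with their own hash type. The verify_download() helper, the 512 KiB piece size, and the choice of md5 in the example are illustrative assumptions.

import hashlib

PIECE_SIZE = 512 * 1024  # illustrative; see the real PIECE_SIZE in apt-p2p

def verify_download(data, piece_hashes, file_hash, hash_type):
    """Check downloaded data against its expected hashes (sketch only)."""
    if len(piece_hashes) > 1:
        # Piece hashes are always SHA1, one per PIECE_SIZE chunk.
        for i, expected in enumerate(piece_hashes):
            piece = data[i * PIECE_SIZE:(i + 1) * PIECE_SIZE]
            if hashlib.sha1(piece).digest() != expected:
                return False
        return True
    # No pieces: use the file's own hash type (previously this path
    # always used SHA1, which is what broke source downloads).
    return hashlib.new(hash_type, data).digest() == file_hash

# Example: a small file with no pieces, indexed by an MD5 sum.
blob = b'fake .dsc contents'
print(verify_download(blob, [], hashlib.md5(blob).digest(), 'md5'))  # True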

apt_p2p/Hash.py
apt_p2p/PeerManager.py
test.py

diff --git a/apt_p2p/Hash.py b/apt_p2p/Hash.py
index fe26a0129de9924ec39b54e0bf16fea9d3b088dc..021c84cf11dec62110eed41155e80bc7768fc48f 100644 (file)
--- a/apt_p2p/Hash.py
+++ b/apt_p2p/Hash.py
@@ -82,11 +82,11 @@ class HashObject:
         if self.result is None or force:
             self.result = None
             self.done = False
-            self.fileHasher = self._new()
+            self.fileHasher = self.newHasher()
             if self.ORDER[self.hashTypeNum]['name'] == 'sha1':
                 self.pieceHasher = None
             else:
-                self.pieceHasher = self._newSHA1()
+                self.pieceHasher = self.newPieceHasher()
                 self.pieceSize = 0
             self.fileHash = None
             self.pieceHash = []
@@ -94,7 +94,7 @@ class HashObject:
             self.fileHex = None
             self.fileNormHash = None
 
-    def _new(self):
+    def newHasher(self):
         """Create a new hashing object according to the hash type."""
         if sys.version_info < (2, 5):
             mod = __import__(self.ORDER[self.hashTypeNum]['old_module'], globals(), locals(), [])
@@ -104,7 +104,7 @@ class HashObject:
             func = getattr(hashlib, self.ORDER[self.hashTypeNum]['hashlib_func'])
             return func()
 
-    def _newSHA1(self):
+    def newPieceHasher(self):
         """Create a new SHA1 hashing object."""
         if sys.version_info < (2, 5):
             import sha
@@ -130,7 +130,7 @@ class HashObject:
 
                 # Save the first piece digest and initialize a new piece hasher
                 self.pieceHash.append(self.fileHasher.digest())
-                self.pieceHasher = self._newSHA1()
+                self.pieceHasher = self.newPieceHasher()
 
             if self.pieceHasher:
                 # Loop in case the data contains multiple pieces
@@ -138,7 +138,7 @@ class HashObject:
                     # Save the piece hash and start a new one
                     self.pieceHasher.update(data[:(PIECE_SIZE - self.pieceSize)])
                     self.pieceHash.append(self.pieceHasher.digest())
-                    self.pieceHasher = self._newSHA1()
+                    self.pieceHasher = self.newPieceHasher()
                     
                     # Don't forget to hash the data normally
                     self.fileHasher.update(data[:(PIECE_SIZE - self.pieceSize)])
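
The renamed methods above are now part of HashObject's public interface so that PeerManager can create matching hashers. A condensed sketch of the dispatch they perform, assuming a simplified hash-type table (the real ORDER list also records module names for the pre-hashlib fallback on Python < 2.5):

import hashlib

# Illustrative stand-in for HashObject.ORDER.
ORDER = [{'name': 'sha1',   'hashlib_func': 'sha1'},
         {'name': 'sha256', 'hashlib_func': 'sha256'}]

def new_hasher(hash_type_num):
    """Create a hasher of the file's own type (cf. newHasher)."""
    return getattr(hashlib, ORDER[hash_type_num]['hashlib_func'])()

def new_piece_hasher():
    """Pieces are always hashed with SHA1 (cf. newPieceHasher)."""
    return hashlib.sha1()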
diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py
index 7f63c974aadb7c48b12298433627611a20394a0b..31d926168954c953c4ef3a6a314462c34a8a4693 100644 (file)
--- a/apt_p2p/PeerManager.py
+++ b/apt_p2p/PeerManager.py
@@ -149,8 +149,8 @@ class StreamToFile:
     @ivar stream: the input stream being read
     @type outFile: L{twisted.python.filepath.FilePath}
     @ivar outFile: the file being written
-    @type hash: C{sha1}
-    @ivar hash: the hash object for the data
+    @type hasher: hashing object, e.g. C{sha1}
+    @ivar hasher: the hash object for the data
     @type position: C{int}
     @ivar position: the current file position to write the next data to
     @type length: C{int}
@@ -159,9 +159,11 @@ class StreamToFile:
     @ivar doneDefer: the deferred that will fire when done writing
     """
     
-    def __init__(self, inputStream, outFile, start = 0, length = None):
+    def __init__(self, hasher, inputStream, outFile, start = 0, length = None):
         """Initializes the file.
         
+        @type hasher: hashing object, e.g. C{sha1}
+        @param hasher: the hash object for the data
         @type inputStream: L{twisted.web2.stream.IByteStream}
         @param inputStream: the input stream to read from
         @type outFile: L{twisted.python.filepath.FilePath}
@@ -175,7 +177,7 @@ class StreamToFile:
         """
         self.stream = inputStream
         self.outFile = outFile
-        self.hash = sha.new()
+        self.hasher = hasher
         self.position = start
         self.length = None
         if length is not None:
@@ -187,7 +189,6 @@ class StreamToFile:
 
         @rtype: L{twisted.internet.defer.Deferred}
         """
-        log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
         self.doneDefer = stream.readStream(self.stream, self._gotData)
         self.doneDefer.addCallbacks(self._done, self._error)
         return self.doneDefer
@@ -207,13 +208,12 @@ class StreamToFile:
         # Write and hash the streamed data
         self.outFile.seek(self.position)
         self.outFile.write(data)
-        self.hash.update(data)
+        self.hasher.update(data)
         self.position += len(data)
         
     def _done(self, result):
         """Return the result."""
-        log.msg('Streaming is complete')
-        return self.hash.digest()
+        return self.hasher.digest()
     
     def _error(self, err):
         """Log the error."""
@@ -538,10 +538,13 @@ class FileDownload:
                 log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
                 
                 self.outstanding += 1
+                path = self.path
                 if peer.mirror:
-                    df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                    path = self.mirror_path
+                if len(self.completePieces) > 1:
+                    df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
                 else:
-                    df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+                    df = peer.get(path)
                 reactor.callLater(0, df.addCallbacks,
                                   *(self._getPiece, self._getError),
                                   **{'callbackArgs': (piece, peer),
@@ -550,12 +553,11 @@ class FileDownload:
                 
         # Check if we're done
         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
-            log.msg('We seem to be done with all pieces')
+            log.msg('Download is complete for %s' % self.path)
             self.stream.allAvailable()
     
     def _getPiece(self, response, piece, peer):
         """Process the retrieved headers from the peer."""
-        log.msg('Got response for piece %d from peer %r' % (piece, peer))
         if ((len(self.completePieces) > 1 and response.code != 206) or
             (response.code not in (200, 206))):
             # Request failed, try a different peer
@@ -581,10 +583,11 @@ class FileDownload:
             # Read the response stream to the file
             log.msg('Streaming piece %d from peer %r' % (piece, peer))
             if response.code == 206:
-                df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
-                                  PIECE_SIZE).run()
+                df = StreamToFile(self.hash.newPieceHasher(), response.stream,
+                                  self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
             else:
-                df = StreamToFile(response.stream, self.file).run()
+                df = StreamToFile(self.hash.newHasher(), response.stream,
+                                  self.file).run()
             reactor.callLater(0, df.addCallbacks,
                               *(self._gotPiece, self._gotError),
                               **{'callbackArgs': (piece, peer),
@@ -605,7 +608,6 @@ class FileDownload:
 
     def _gotPiece(self, response, piece, peer):
         """Process the retrieved piece from the peer."""
-        log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
         if self.pieces[piece] and response != self.pieces[piece]:
             # Hash doesn't match
             log.msg('Hash error for piece %d from peer %r' % (piece, peer))
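
The net effect in PeerManager is that StreamToFile no longer hard-codes sha.new(): the caller injects whichever hasher matches the response. A hedged sketch of the selection made in FileDownload._getPiece, where file_hash stands for the download's HashObject:

def choose_hasher(file_hash, status_code):
    """Pick the hasher for a peer response (sketch, not apt-p2p code)."""
    if status_code == 206:
        # Partial content: a single piece, always verified with SHA1.
        return file_hash.newPieceHasher()
    # Whole file (status 200): verified with the file's own hash type.
    return file_hash.newHasher()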
diff --git a/test.py b/test.py
index d83ccf18b7bc8959b4cdece4cfba901ac29ab191..314a21b961ae408a4ea69980cd401a8bb6529eac 100755 (executable)
--- a/test.py
+++ b/test.py
@@ -47,7 +47,6 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl
               (1, ['install', 'ada-reference-manual']),
               (1, ['install', 'aspectj-doc']),
               (1, ['install', 'fop-doc']),
-              (1, ['install', 'jswat-doc']),
               (1, ['install', 'asis-doc']),
               (1, ['install', 'bison-doc']),
               (1, ['install', 'crash-whitepaper']),
@@ -64,13 +63,11 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl
                 (1, ['install', 'aap-doc']),
                 (1, ['install', 'ada-reference-manual']),
                 (1, ['install', 'fop-doc']),
-                (1, ['install', 'jswat-doc']),
                 (1, ['install', 'bison-doc']),
                 (1, ['install', 'crash-whitepaper']),
                 (2, ['install', 'aap-doc']),
                 (2, ['install', 'ada-reference-manual']),
                 (2, ['install', 'fop-doc']),
-                (2, ['install', 'jswat-doc']),
                 (2, ['install', 'bison-doc']),
                 (2, ['install', 'crash-whitepaper']),
                 ]),
@@ -322,6 +319,26 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl
                    ]),
               ]),
 
+         'b': ('Start 2 downloaders and test source downloads.',
+               {1: {}},
+               {1: {'types': ['deb-src']},
+                2: {'types': ['deb-src']}},
+               [(1, ['update']),
+                (2, ['update']),
+                (1, ['source', 'aboot-base']),
+                (2, ['source', 'aboot-base']),
+                (1, ['source', 'aap-doc']),
+                (1, ['source', 'ada-reference-manual']),
+                (1, ['source', 'fop-doc']),
+                (1, ['source', 'bison-doc']),
+                (1, ['source', 'crash-whitepaper']),
+                (2, ['source', 'aap-doc']),
+                (2, ['source', 'ada-reference-manual']),
+                (2, ['source', 'fop-doc']),
+                (2, ['source', 'bison-doc']),
+                (2, ['source', 'crash-whitepaper']),
+                ]),
+                
          }
 
 assert 'all' not in tests
@@ -384,7 +401,7 @@ Debug
   pkgInitialize "false";   // This one will dump the configuration space
   NoLocking "false";
   Acquire::Ftp "false";    // Show ftp command traffic
-  Acquire::Http "false";   // Show http command traffic
+  Acquire::Http "true";   // Show http command traffic
   Acquire::gpgv "false";   // Show the gpgv traffic
   aptcdrom "false";        // Show found package files
   IdentCdrom "false";
@@ -558,6 +575,8 @@ def start(cmd, args, work_dir = None):
     """
     
     new_cmd = [cmd] + args
+    if work_dir:
+        os.chdir(work_dir)
     pid = os.spawnvp(os.P_NOWAIT, new_cmd[0], new_cmd)
     return pid
 
@@ -627,7 +646,7 @@ def apt_get(num_down, cmd):
     apt_conf = join([down_dir(num_down), 'etc', 'apt', 'apt.conf'])
     dpkg_status = join([down_dir(num_down), 'var', 'lib', 'dpkg', 'status'])
     args = ['-d', '-c', apt_conf, '-o', 'Dir::state::status='+dpkg_status] + cmd
-    pid = start('apt-get', args)
+    pid = start('apt-get', args, downloader_dir)
     return pid
 
 def bootstrap_address(num_boot):
@@ -667,7 +686,7 @@ def boot_dir(num_boot):
     return os.path.join(CWD,'bootstrap' + str(num_boot))
 
 def start_downloader(bootstrap_addresses, num_down, options = {},
-                     mirror = 'ftp.us.debian.org/debian', 
+                     types = ['deb'], mirror = 'ftp.us.debian.org/debian', 
                      suites = 'main contrib non-free', clean = True):
     """Initialize a new downloader process.
 
@@ -682,6 +701,9 @@ def start_downloader(bootstrap_addresses, num_down, options = {},
     @param options: the dictionary of string formatting values for creating
         the apt-p2p configuration file (see L{apt_p2p_conf_template} above).
         (optional, defaults to only using the default arguments)
+    @type types: C{list} of C{string}
+    @param types: the type of sources.list line to add
+        (optional, defaults to only 'deb')
     @type mirror: C{string}
     @param mirror: the Debian mirror to use
         (optional, defaults to 'ftp.us.debian.org/debian')
@@ -722,7 +744,8 @@ def start_downloader(bootstrap_addresses, num_down, options = {},
     if not exists(join([downloader_dir, 'etc', 'apt', 'sources.list'])):
         # Create apt's config files
         f = open(join([downloader_dir, 'etc', 'apt', 'sources.list']), 'w')
-        f.write('deb http://localhost:1%02d77/%s/ unstable %s\n' % (num_down, mirror, suites))
+        for type in types:
+            f.write('%s http://localhost:1%02d77/%s/ unstable %s\n' % (type, num_down, mirror, suites))
         f.close()
 
     if not exists(join([downloader_dir, 'etc', 'apt', 'apt.conf'])):