From aafdc40ed1f4b5355bdf23365dc9d48f74cc8761 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Wed, 23 Apr 2008 20:52:11 -0700 Subject: [PATCH] Fixed an error in downloading source packages from peers. Source packages with no pieces were hash checked with sha1, which would fail. Also added a test for source packages. Also removed some unneeded logging in the peer downloader. --- apt_p2p/Hash.py | 12 ++++++------ apt_p2p/PeerManager.py | 34 ++++++++++++++++++---------------- test.py | 37 ++++++++++++++++++++++++++++++------- 3 files changed, 54 insertions(+), 29 deletions(-) diff --git a/apt_p2p/Hash.py b/apt_p2p/Hash.py index fe26a01..021c84c 100644 --- a/apt_p2p/Hash.py +++ b/apt_p2p/Hash.py @@ -82,11 +82,11 @@ class HashObject: if self.result is None or force: self.result = None self.done = False - self.fileHasher = self._new() + self.fileHasher = self.newHasher() if self.ORDER[self.hashTypeNum]['name'] == 'sha1': self.pieceHasher = None else: - self.pieceHasher = self._newSHA1() + self.pieceHasher = self.newPieceHasher() self.pieceSize = 0 self.fileHash = None self.pieceHash = [] @@ -94,7 +94,7 @@ class HashObject: self.fileHex = None self.fileNormHash = None - def _new(self): + def newHasher(self): """Create a new hashing object according to the hash type.""" if sys.version_info < (2, 5): mod = __import__(self.ORDER[self.hashTypeNum]['old_module'], globals(), locals(), []) @@ -104,7 +104,7 @@ class HashObject: func = getattr(hashlib, self.ORDER[self.hashTypeNum]['hashlib_func']) return func() - def _newSHA1(self): + def newPieceHasher(self): """Create a new SHA1 hashing object.""" if sys.version_info < (2, 5): import sha @@ -130,7 +130,7 @@ class HashObject: # Save the first piece digest and initialize a new piece hasher self.pieceHash.append(self.fileHasher.digest()) - self.pieceHasher = self._newSHA1() + self.pieceHasher = self.newPieceHasher() if self.pieceHasher: # Loop in case the data contains multiple pieces @@ -138,7 +138,7 @@ class HashObject: # Save the piece hash and start a new one self.pieceHasher.update(data[:(PIECE_SIZE - self.pieceSize)]) self.pieceHash.append(self.pieceHasher.digest()) - self.pieceHasher = self._newSHA1() + self.pieceHasher = self.newPieceHasher() # Don't forget to hash the data normally self.fileHasher.update(data[:(PIECE_SIZE - self.pieceSize)]) diff --git a/apt_p2p/PeerManager.py b/apt_p2p/PeerManager.py index 7f63c97..31d9261 100644 --- a/apt_p2p/PeerManager.py +++ b/apt_p2p/PeerManager.py @@ -149,8 +149,8 @@ class StreamToFile: @ivar stream: the input stream being read @type outFile: L{twisted.python.filepath.FilePath} @ivar outFile: the file being written - @type hash: C{sha1} - @ivar hash: the hash object for the data + @type hasher: hashing object, e.g. C{sha1} + @ivar hasher: the hash object for the data @type position: C{int} @ivar position: the current file position to write the next data to @type length: C{int} @@ -159,9 +159,11 @@ class StreamToFile: @ivar doneDefer: the deferred that will fire when done writing """ - def __init__(self, inputStream, outFile, start = 0, length = None): + def __init__(self, hasher, inputStream, outFile, start = 0, length = None): """Initializes the file. + @type hasher: hashing object, e.g. C{sha1} + @param hasher: the hash object for the data @type inputStream: L{twisted.web2.stream.IByteStream} @param inputStream: the input stream to read from @type outFile: L{twisted.python.filepath.FilePath} @@ -175,7 +177,7 @@ class StreamToFile: """ self.stream = inputStream self.outFile = outFile - self.hash = sha.new() + self.hasher = hasher self.position = start self.length = None if length is not None: @@ -187,7 +189,6 @@ class StreamToFile: @rtype: L{twisted.internet.defer.Deferred} """ - log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position)) self.doneDefer = stream.readStream(self.stream, self._gotData) self.doneDefer.addCallbacks(self._done, self._error) return self.doneDefer @@ -207,13 +208,12 @@ class StreamToFile: # Write and hash the streamed data self.outFile.seek(self.position) self.outFile.write(data) - self.hash.update(data) + self.hasher.update(data) self.position += len(data) def _done(self, result): """Return the result.""" - log.msg('Streaming is complete') - return self.hash.digest() + return self.hasher.digest() def _error(self, err): """Log the error.""" @@ -538,10 +538,13 @@ class FileDownload: log.msg('Sending a request for piece %d to peer %r' % (piece, peer)) self.outstanding += 1 + path = self.path if peer.mirror: - df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) + path = self.mirror_path + if len(self.completePieces) > 1: + df = peer.getRange(path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) else: - df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1) + df = peer.get(path) reactor.callLater(0, df.addCallbacks, *(self._getPiece, self._getError), **{'callbackArgs': (piece, peer), @@ -550,12 +553,11 @@ class FileDownload: # Check if we're done if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces): - log.msg('We seem to be done with all pieces') + log.msg('Download is complete for %s' % self.path) self.stream.allAvailable() def _getPiece(self, response, piece, peer): """Process the retrieved headers from the peer.""" - log.msg('Got response for piece %d from peer %r' % (piece, peer)) if ((len(self.completePieces) > 1 and response.code != 206) or (response.code not in (200, 206))): # Request failed, try a different peer @@ -581,10 +583,11 @@ class FileDownload: # Read the response stream to the file log.msg('Streaming piece %d from peer %r' % (piece, peer)) if response.code == 206: - df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, - PIECE_SIZE).run() + df = StreamToFile(self.hash.newPieceHasher(), response.stream, + self.file, piece*PIECE_SIZE, PIECE_SIZE).run() else: - df = StreamToFile(response.stream, self.file).run() + df = StreamToFile(self.hash.newHasher(), response.stream, + self.file).run() reactor.callLater(0, df.addCallbacks, *(self._gotPiece, self._gotError), **{'callbackArgs': (piece, peer), @@ -605,7 +608,6 @@ class FileDownload: def _gotPiece(self, response, piece, peer): """Process the retrieved piece from the peer.""" - log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response)) if self.pieces[piece] and response != self.pieces[piece]: # Hash doesn't match log.msg('Hash error for piece %d from peer %r' % (piece, peer)) diff --git a/test.py b/test.py index d83ccf1..314a21b 100755 --- a/test.py +++ b/test.py @@ -47,7 +47,6 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl (1, ['install', 'ada-reference-manual']), (1, ['install', 'aspectj-doc']), (1, ['install', 'fop-doc']), - (1, ['install', 'jswat-doc']), (1, ['install', 'asis-doc']), (1, ['install', 'bison-doc']), (1, ['install', 'crash-whitepaper']), @@ -64,13 +63,11 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl (1, ['install', 'aap-doc']), (1, ['install', 'ada-reference-manual']), (1, ['install', 'fop-doc']), - (1, ['install', 'jswat-doc']), (1, ['install', 'bison-doc']), (1, ['install', 'crash-whitepaper']), (2, ['install', 'aap-doc']), (2, ['install', 'ada-reference-manual']), (2, ['install', 'fop-doc']), - (2, ['install', 'jswat-doc']), (2, ['install', 'bison-doc']), (2, ['install', 'crash-whitepaper']), ]), @@ -322,6 +319,26 @@ tests = {'1': ('Start a single bootstrap and downloader, test updating and downl ]), ]), + 'b': ('Start 2 downloaders and test source downloads.', + {1: {}}, + {1: {'types': ['deb-src']}, + 2: {'types': ['deb-src']}}, + [(1, ['update']), + (2, ['update']), + (1, ['source', 'aboot-base']), + (2, ['source', 'aboot-base']), + (1, ['source', 'aap-doc']), + (1, ['source', 'ada-reference-manual']), + (1, ['source', 'fop-doc']), + (1, ['source', 'bison-doc']), + (1, ['source', 'crash-whitepaper']), + (2, ['source', 'aap-doc']), + (2, ['source', 'ada-reference-manual']), + (2, ['source', 'fop-doc']), + (2, ['source', 'bison-doc']), + (2, ['source', 'crash-whitepaper']), + ]), + } assert 'all' not in tests @@ -384,7 +401,7 @@ Debug pkgInitialize "false"; // This one will dump the configuration space NoLocking "false"; Acquire::Ftp "false"; // Show ftp command traffic - Acquire::Http "false"; // Show http command traffic + Acquire::Http "true"; // Show http command traffic Acquire::gpgv "false"; // Show the gpgv traffic aptcdrom "false"; // Show found package files IdentCdrom "false"; @@ -558,6 +575,8 @@ def start(cmd, args, work_dir = None): """ new_cmd = [cmd] + args + if work_dir: + os.chdir(work_dir) pid = os.spawnvp(os.P_NOWAIT, new_cmd[0], new_cmd) return pid @@ -627,7 +646,7 @@ def apt_get(num_down, cmd): apt_conf = join([down_dir(num_down), 'etc', 'apt', 'apt.conf']) dpkg_status = join([down_dir(num_down), 'var', 'lib', 'dpkg', 'status']) args = ['-d', '-c', apt_conf, '-o', 'Dir::state::status='+dpkg_status] + cmd - pid = start('apt-get', args) + pid = start('apt-get', args, downloader_dir) return pid def bootstrap_address(num_boot): @@ -667,7 +686,7 @@ def boot_dir(num_boot): return os.path.join(CWD,'bootstrap' + str(num_boot)) def start_downloader(bootstrap_addresses, num_down, options = {}, - mirror = 'ftp.us.debian.org/debian', + types = ['deb'], mirror = 'ftp.us.debian.org/debian', suites = 'main contrib non-free', clean = True): """Initialize a new downloader process. @@ -682,6 +701,9 @@ def start_downloader(bootstrap_addresses, num_down, options = {}, @param options: the dictionary of string formatting values for creating the apt-p2p configuration file (see L{apt_p2p_conf_template} above). (optional, defaults to only using the default arguments) + @type types: C{list} of C{string} + @param types: the type of sources.list line to add + (optional, defaults to only 'deb') @type mirror: C{string} @param mirror: the Debian mirror to use (optional, defaults to 'ftp.us.debian.org/debian') @@ -722,7 +744,8 @@ def start_downloader(bootstrap_addresses, num_down, options = {}, if not exists(join([downloader_dir, 'etc', 'apt', 'sources.list'])): # Create apt's config files f = open(join([downloader_dir, 'etc', 'apt', 'sources.list']), 'w') - f.write('deb http://localhost:1%02d77/%s/ unstable %s\n' % (num_down, mirror, suites)) + for type in types: + f.write('%s http://localhost:1%02d77/%s/ unstable %s\n' % (type, num_down, mirror, suites)) f.close() if not exists(join([downloader_dir, 'etc', 'apt', 'apt.conf'])): -- 2.39.2