CHUNK_SIZE = 4*1024
- def __init__(self, f):
+ def __init__(self, f, length = None):
stream.FileStream.__init__(self, f)
- self.length = None
+ self.length = length
self.deferred = None
self.available = 0L
self.position = 0L
self.finished = False
- def updateAvaliable(self, newlyAvailable):
+ def updateAvailable(self, newlyAvailable):
"""Update the number of bytes that are available.
Call it with 0 to trigger a pending read once the file has already been fully read.
deferred.callback(b)
else:
# We're done
+ deferred = self.deferred
+ self.deferred = None
deferred.callback(None)
else:
# We're done
+ deferred = self.deferred
+ self.deferred = None
deferred.callback(None)
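A minimal sketch of why the deferred is cleared before it is fired (the bug the two hunks above fix): the callback may synchronously issue another read(), which parks a fresh deferred in self.deferred, and firing the old reference afterwards would clobber it. Illustrative names, assuming only twisted.internet.defer:

    from twisted.internet import defer

    class Handoff:
        """Illustrates the clear-then-fire ordering used above."""
        def __init__(self):
            self.deferred = None

        def wait(self):
            # The consumer parks a deferred until data (or EOF) arrives.
            self.deferred = defer.Deferred()
            return self.deferred

        def fire(self, data):
            if self.deferred:
                # Clear the stored reference first: the callback may
                # call wait() again and park a new deferred, which a
                # fire-then-clear ordering would silently discard.
                deferred = self.deferred
                self.deferred = None
                deferred.callback(data)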
def read(self, sendfile=False):
self.position += bytesRead
return b
-class StreamToFile(defer.Deferred):
+class StreamToFile:
"""Save a stream to a partial file and hash it.
@type stream: L{twisted.web2.stream.IByteStream}
@rtype: L{twisted.internet.defer.Deferred}
"""
+ log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
self.doneDefer = stream.readStream(self.stream, self._gotData)
self.doneDefer.addCallbacks(self._done, self._error)
return self.doneDefer
def _done(self, result):
"""Return the result."""
+ log.msg('Streaming is complete')
return self.hash.digest()
def _error(self, err):
"""Log the error."""
+ log.msg('Streaming error')
log.err(err)
return err
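Since StreamToFile no longer subclasses Deferred, callers now obtain the deferred from run() and attach callbacks to that; a hedged usage sketch (constructor arguments mirror the call in the _getPiece hunk below, other names illustrative):

    # response_stream: an IByteStream; piece_file: an open file object
    df = StreamToFile(response_stream, piece_file, 0, PIECE_SIZE).run()
    df.addCallback(lambda digest: digest)   # _done fires with hash.digest()
    df.addErrback(log.err)                  # _error logs and passes the failure on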
file.restat(False)
if file.exists():
file.remove()
- self.file = file.open('w')
+ self.file = file.open('w+')
def run(self):
"""Start the downloading process."""
+ log.msg('Checking for pieces for %s' % self.path)
self.defer = defer.Deferred()
self.peers = {}
no_pieces = 0
- pieces_string = {}
- pieces_hash = {}
- pieces_dl_hash = {}
+ pieces_string = {0: 0}
+ pieces_hash = {0: 0}
+ pieces_dl_hash = {0: 0}
for compact_peer in self.compact_peers:
# Build a list of all the peers for this download
site = uncompact(compact_peer['c'])
- peer = manager.getPeer(site)
+ peer = self.manager.getPeer(site)
self.peers.setdefault(site, {})['peer'] = peer
# Extract any piece information from the peers list
if max_found == no_pieces:
# The file is not split into pieces
+ log.msg('No pieces were found for the file')
self.pieces = []
self.startDownload()
elif max_found == max(pieces_string.values()):
for pieces, num in pieces_hash.items():
# Find the most popular piece hash to lookup
if num == max_found:
+ log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
self.getDHTPieces(pieces)
break
elif max_found == max(pieces_dl_hash.values()):
for pieces, num in pieces_dl_hash.items():
# Find the most popular piece hash to download
if num == max_found:
+ log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
self.getPeerPieces(pieces)
break
return self.defer
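The `{0: 0}` seeding above guards the later max(...) calls against empty dicts when no peer reported any piece information. The selection itself is a popularity vote across peers; a hedged sketch of the tallying idea (helper name illustrative, not the module's API):

    def most_popular(candidates):
        """Tally identical candidate piece-hash strings and return
        the one advertised by the most peers."""
        counts = {0: 0}    # same empty-dict guard as in run() above
        for candidate in candidates:
            counts[candidate] = counts.get(candidate, 0) + 1
        best = max(counts, key=counts.get)
        return best, counts[best]

    # Three peers agree on 'abc', one dissents: 'abc' wins with 3 votes
    assert most_popular(['abc', 'abc', 'xyz', 'abc']) == ('abc', 3)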
@param key: the key to request from the peers
"""
if failedSite is None:
+ log.msg('Starting the lookup of piece hashes in peers')
self.outstanding = 0
# Remove any peers with the wrong piece hash
#for site in self.peers.keys():
# if self.peers[site].get('l', '') != key:
# del self.peers[site]
else:
+ log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
self.peers[failedSite]['failed'] = True
self.outstanding -= 1
if self.pieces is None:
# Send a request to one or more peers
+ log.msg('Checking for a peer to request piece hashes from')
for site in self.peers:
if self.peers[site].get('failed', False) != True:
+ log.msg('Sending a piece hash request to %r' % (site, ))
path = '/~/' + quote_plus(key)
lookupDefer = self.peers[site]['peer'].get(path)
- lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError,
- callbackArgs=(key, site), errbackArgs=(key, site))
+ reactor.callLater(0, lookupDefer.addCallbacks,
+ *(self._getPeerPieces, self._gotPeerError),
+ **{'callbackArgs': (key, site),
+ 'errbackArgs': (key, site)})
self.outstanding += 1
if self.outstanding >= 3:
break
+ log.msg('Done sending piece hash requests for now, %d outstanding' % self.outstanding)
if self.pieces is None and self.outstanding == 0:
# Continue without the piece hashes
log.msg('Could not retrieve the piece hashes from the peers')
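The reactor.callLater(0, lookupDefer.addCallbacks, ...) indirection above is presumably there because an already-fired deferred runs newly attached callbacks synchronously, i.e. inside this for loop, while self.outstanding is still being updated. A minimal sketch of the difference, assuming only twisted:

    from twisted.internet import defer, reactor

    results = []
    d = defer.succeed('data')      # a deferred that has already fired

    # d.addCallback(results.append) would run the callback right here,
    # inside the caller's frame, before the loop finishes its bookkeeping.
    # Scheduling the attach via the reactor delays it one iteration:
    reactor.callLater(0, d.addCallback, results.append)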
def _getPeerPieces(self, response, key, site):
"""Process the retrieved response from the peer."""
+ log.msg('Got a piece hash response %d from %r' % (response.code, site))
if response.code != 200:
# Request failed, try a different peer
+ log.msg('Did not like response %d from %r' % (response.code, site))
self.getPeerPieces(key, site)
else:
# Read the response stream to a string
self.peers[site]['pieces'] = ''
def _gotPeerPiece(data, self = self, site = site):
+ log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
self.peers[site]['pieces'] += data
+ log.msg('Streaming piece hashes from peer')
df = stream.readStream(response.stream, _gotPeerPiece)
df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
callbackArgs=(key, site), errbackArgs=(key, site))
def _gotPeerError(self, err, key, site):
"""Peer failed, try again."""
+ log.msg('Peer piece hash request failed for %r' % (site, ))
log.err(err)
self.getPeerPieces(key, site)
def _gotPeerPieces(self, result, key, site):
"""Check the retrieved pieces from the peer."""
+ log.msg('Finished streaming piece hashes from peer %r' % (site, ))
if self.pieces is not None:
# Already done
+ log.msg('Already done')
return
try:
result = bdecode(self.peers[site]['pieces'])
except:
+ log.msg('Error bdecoding piece hashes')
log.err()
self.getPeerPieces(key, site)
return
if result_hash == key:
pieces = result['t']
self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
- log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces))
+ log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
self.startDownload()
else:
log.msg('Peer returned a piece string that did not match')
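For reference, the list comprehension in _gotPeerPieces slices the bdecoded 't' value into consecutive 20-byte SHA1 digests; a small worked example of that slicing:

    # Two concatenated 20-byte digests split back into a list of two
    pieces = 'A'*20 + 'B'*20
    split = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
    assert split == ['A'*20, 'B'*20]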
if self.started:
return
+ log.msg('Starting to download %s' % self.path)
self.started = True
assert self.pieces is not None, "You must initialize the piece hashes first"
self.peerlist = [self.peers[site]['peer'] for site in self.peers]
return
# Start sending the return file
- self.stream = GrowingFileStream(self.file)
+ self.stream = GrowingFileStream(self.file, self.hash.expSize)
resp = Response(200, {}, self.stream)
self.defer.callback(resp)
#{ Downloading the pieces
def getPieces(self):
"""Download the next pieces from the peers."""
+ log.msg('Checking for more piece requests to send')
self.sort()
piece = self.nextFinish
while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
+ log.msg('Checking piece %d' % piece)
if self.completePieces[piece] == False:
# Send a request to the highest ranked peer
peer = self.peerlist.pop()
self.completePieces[piece] = peer
+ log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
self.outstanding += 1
if self.pieces:
*(self._getPiece, self._getError),
**{'callbackArgs': (piece, peer),
'errbackArgs': (piece, peer)})
- piece += 1
+ piece += 1
- # Check if we're don
+ log.msg('Finished checking pieces, %d outstanding, next piece %d of %d' % (self.outstanding, self.nextFinish, len(self.completePieces)))
+ # Check if we're done
if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
+ log.msg('We seem to be done with all pieces')
self.stream.allAvailable()
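getPieces keeps a bounded window of requests in flight (at most 4) and walks forward from the first unfinished piece; a stripped-down sketch of that scheduling loop (names illustrative):

    MAX_OUTSTANDING = 4    # the loop above hard-codes this limit

    def schedule(outstanding, peerlist, complete, piece):
        """Assign unfinished pieces to the highest-ranked free peers
        until the window is full or peers/pieces run out."""
        while outstanding < MAX_OUTSTANDING and peerlist and piece < len(complete):
            if complete[piece] == False:
                peer = peerlist.pop()   # highest-ranked peer sorts last
                complete[piece] = peer  # a peer object marks it in flight
                outstanding += 1
            piece += 1
        return outstanding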
def _getPiece(self, response, piece, peer):
"""Process the retrieved headers from the peer."""
+ log.msg('Got response for piece %d from peer %r' % (piece, peer))
if ((len(self.completePieces) > 1 and response.code != 206) or
(response.code not in (200, 206))):
# Request failed, try a different peer
- peer.hashError()
+ log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
+ peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
self.completePieces[piece] = False
if response.stream and response.stream.length:
stream.readAndDiscard(response.stream)
else:
# Read the response stream to the file
+ log.msg('Streaming piece %d from peer %r' % (piece, peer))
if response.code == 206:
df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
else:
def _getError(self, err, piece, peer):
"""Peer failed, try again."""
+ log.msg('Got error for piece %d from peer %r' % (piece, peer))
self.outstanding -= 1
self.peerlist.append(peer)
self.completePieces[piece] = False
def _gotPiece(self, response, piece, peer):
"""Process the retrieved piece from the peer."""
+ log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
if ((self.pieces and response != self.pieces[piece]) or
- (len(self.pieces) == 0 and response == self.hash.expected())):
+ (len(self.pieces) == 0 and response != self.hash.expected())):
# Hash doesn't match
- peer.hashError()
+ log.msg('Hash error for piece %d from peer %r' % (piece, peer))
+ peer.hashError('Piece received from peer does not match expected')
self.completePieces[piece] = False
elif self.pieces:
# Successfully completed one of several pieces
+ log.msg('Finished with piece %d from peer %r' % (piece, peer))
self.completePieces[piece] = True
while (self.nextFinish < len(self.completePieces) and
self.completePieces[self.nextFinish] == True):
self.stream.updateAvailable(PIECE_SIZE)
else:
# Whole download (only one piece) is complete
+ log.msg('Piece %d from peer %r is the last piece' % (piece, peer))
self.completePieces[piece] = True
+ self.nextFinish = 1
self.stream.updateAvailable(2**30)
self.getPieces()
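The nextFinish while loop in _gotPiece releases completed data to the consumer strictly in order: it advances across the contiguous prefix of finished pieces, unlocking PIECE_SIZE bytes for each. A compact sketch of that accounting:

    def advance(complete, next_finish):
        """Return the new prefix pointer and how many PIECE_SIZE
        chunks just became contiguously available."""
        released = 0
        while next_finish < len(complete) and complete[next_finish] == True:
            next_finish += 1
            released += 1
        return next_finish, released

    # Pieces 0 and 1 done, piece 2 still in flight: release two chunks
    assert advance([True, True, False], 0) == (2, 2)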
def _gotError(self, err, piece, peer):
"""Piece download failed, try again."""
+ log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, err))
log.err(err)
self.completePieces[piece] = False
self.getPieces()
and the apt-get commands to run (C{list}).
The bootstrap nodes' keys are integers, which must be in the range 1-9.
- The values are the dictionary of string formatting values for creating
- the apt-p2p configuration file (see L{apt_p2p_conf_template} below).
+ The values are the dictionary of keyword options to pass to the function
+ that starts the bootstrap node (see L{start_bootstrap} below).
The downloaders' keys are also integers in the range 1-99. The values are
- the dictionary of string formatting values for creating the apt-p2p
- configuration file (see L{apt_p2p_conf_template} below).
+ the dictionary of keyword options to pass to the function
+ that starts the downloader node (see L{start_downloader} below).
The apt-get commands' list elements are tuples with 2 elements: the
downloader to run the command on, and the list of command-line
5: {},
6: {}},
[(1, ['update']),
+ (2, ['update']),
+ (3, ['update']),
(1, ['install', 'aboot-base']),
(1, ['install', 'ada-reference-manual']),
(1, ['install', 'fop-doc']),
(1, ['install', 'doc-iana']),
- (2, ['update']),
(2, ['install', 'aboot-base']),
(2, ['install', 'ada-reference-manual']),
(2, ['install', 'fop-doc']),
(2, ['install', 'doc-iana']),
- (3, ['update']),
(3, ['install', 'aboot-base']),
(3, ['install', 'ada-reference-manual']),
(3, ['install', 'fop-doc']),
]),
]),
+ '9': ('Start a single bootstrap and 6 downloaders and test downloading' +
+ ' a very large file.',
+ {1: {}},
+ {1: {'clean': False},
+ 2: {'clean': False},
+ 3: {},
+ 4: {},
+ 5: {},
+ 6: {}},
+ [(1, ['update']),
+ (1, ['install', 'kde-icons-oxygen']),
+ (2, ['update']),
+ (2, ['install', 'kde-icons-oxygen']),
+ (3, ['update']),
+ (3, ['install', 'kde-icons-oxygen']),
+ (4, ['update']),
+ (4, ['install', 'kde-icons-oxygen']),
+ (5, ['update']),
+ (5, ['install', 'kde-icons-oxygen']),
+ (6, ['update']),
+ (6, ['install', 'kde-icons-oxygen']),
+ ]),
+
}
assert 'all' not in tests
NoLocking "false";
Acquire::Ftp "false"; // Show ftp command traffic
Acquire::Http "false"; // Show http command traffic
- Acquire::Debtorrent "false"; // Show http command traffic
Acquire::gpgv "false"; // Show the gpgv traffic
aptcdrom "false"; // Show found package files
IdentCdrom "false";
# Create apt's config files
f = open(join([downloader_dir, 'etc', 'apt', 'sources.list']), 'w')
- f.write('deb http://localhost:1%02d77/%s/ stable %s\n' % (num_down, mirror, suites))
+ f.write('deb http://localhost:1%02d77/%s/ unstable %s\n' % (num_down, mirror, suites))
f.close()
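A quick check of the sources.list line the write above produces, with illustrative mirror and suites values:

    num_down, mirror, suites = 2, 'ftp.us.debian.org/debian', 'main contrib non-free'
    line = 'deb http://localhost:1%02d77/%s/ unstable %s\n' % (num_down, mirror, suites)
    assert line == ('deb http://localhost:10277/'
                    'ftp.us.debian.org/debian/ unstable main contrib non-free\n')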
f = open(join([downloader_dir, 'etc', 'apt', 'apt.conf']), 'w')
bootstrap_addresses += '\n ' + bootstrap_address(boot_keys[i])
for k, v in bootstraps.items():
- running_bootstraps[k] = start_bootstrap(bootstrap_addresses, k, v)
+ running_bootstraps[k] = start_bootstrap(bootstrap_addresses, k, **v)
sleep(5)
for k, v in downloaders.items():
- running_downloaders[k] = start_downloader(bootstrap_addresses, k, v)
+ running_downloaders[k] = start_downloader(bootstrap_addresses, k, **v)
sleep(5)
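The switch from a positional v to **v unpacks each node's option dict into keyword arguments, which is what lets test '9' above pass {'clean': False} through to the start functions. A minimal illustration (function name hypothetical):

    def start_node(addresses, num, clean = True):
        return (addresses, num, clean)

    nodes = {1: {'clean': False}, 2: {}}
    for k, v in nodes.items():
        # **{'clean': False} expands to the keyword argument clean=False
        start_node('boot-addrs', k, **v)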