import sha
from twisted.internet import reactor, defer
-from twisted.python import log
+from twisted.python import log, filepath
from twisted.trial import unittest
from twisted.web2 import stream
-from twisted.web2.http import splitHostPort
+from twisted.web2.http import Response, splitHostPort
from HTTPDownloader import Peer
from util import uncompact
-from hash import PIECE_SIZE
+from Hash import PIECE_SIZE
from apt_p2p_Khashmir.bencode import bdecode
+from apt_p2p_conf import config
+
+class PeerError(Exception):
+ """An error occurred downloading from peers."""
+
class GrowingFileStream(stream.FileStream):
"""Modified to stream data from a file as it becomes available.
CHUNK_SIZE = 4*1024
- def __init__(self, f):
+ def __init__(self, f, length = None):
stream.FileStream.__init__(self, f)
- self.length = None
+ self.length = length
self.deferred = None
self.available = 0L
self.position = 0L
self.finished = False
- def updateAvaliable(self, newlyAvailable):
+ def updateAvailable(self, newlyAvailable):
"""Update the number of bytes that are available.
Call it with 0 to trigger reading of a fully read file.
deferred.callback(b)
def allAvailable(self):
- """Indicate that no more data is coming available."""
+ """Indicate that no more data will be coming available."""
self.finished = True
# If a read is pending, let it go
deferred.callback(b)
else:
# We're done
+ self._close()
+ deferred = self.deferred
+ self.deferred = None
deferred.callback(None)
else:
# We're done
+ self._close()
+ deferred = self.deferred
+ self.deferred = None
deferred.callback(None)
def read(self, sendfile=False):
# If we don't have any available, we're done or deferred
if readSize <= 0:
if self.finished:
+ self._close()
return None
else:
self.deferred = defer.Deferred()
if not bytesRead:
# End of file was reached, we're done or deferred
if self.finished:
+ self._close()
return None
else:
self.deferred = defer.Deferred()
self.position += bytesRead
return b
-class StreamToFile(defer.Deferred):
- """Saves a stream to a file.
+ def _close(self):
+ """Close the temporary file and remove it."""
+ self.f.close()
+ filepath.FilePath(self.f.name).remove()
+ self.f = None
+
+class StreamToFile:
+ """Save a stream to a partial file and hash it.
@type stream: L{twisted.web2.stream.IByteStream}
@ivar stream: the input stream being read
@type outFile: L{twisted.python.filepath.FilePath}
@ivar outFile: the file being written
- @type hash: L{Hash.HashObject}
- @ivar hash: the hash object for the file
+ @type hash: C{sha1}
+ @ivar hash: the hash object for the data
+ @type position: C{int}
+ @ivar position: the current file position to write the next data to
@type length: C{int}
- @ivar length: the length of the original (compressed) file
+ @ivar length: the position in the file to not write beyond
@type doneDefer: L{twisted.internet.defer.Deferred}
- @ivar doneDefer: the deferred that will fire when done streaming
+ @ivar doneDefer: the deferred that will fire when done writing
"""
- def __init__(self, inputStream, outFile, hash, start, length):
+ def __init__(self, inputStream, outFile, start = 0, length = None):
"""Initializes the file.
@type inputStream: L{twisted.web2.stream.IByteStream}
@param inputStream: the input stream to read from
@type outFile: L{twisted.python.filepath.FilePath}
@param outFile: the file to write to
- @type hash: L{Hash.HashObject}
- @param hash: the hash object to use for the file
+ @type start: C{int}
+ @param start: the file position to start writing at
+ (optional, defaults to the start of the file)
+ @type length: C{int}
+ @param length: the maximum amount of data to write to the file
+ (optional, defaults to not limiting the writing to the file
"""
self.stream = inputStream
- self.outFile = outFile.open('w')
- self.hash = hash
- self.hash.new()
- self.length = self.stream.length
+ self.outFile = outFile
+ self.hash = sha.new()
+ self.position = start
+ self.length = None
+ if length is not None:
+ self.length = start + length
+ self.doneDefer = None
def run(self):
- """Start the streaming."""
- self.doneDefer = stream.readStream(self.stream, _gotData)
+ """Start the streaming.
+
+ @rtype: L{twisted.internet.defer.Deferred}
+ """
+ log.msg('Started streaming %r bytes to file at position %d' % (self.length, self.position))
+ self.doneDefer = stream.readStream(self.stream, self._gotData)
self.doneDefer.addCallbacks(self._done, self._error)
return self.doneDefer
- def _done(self):
- """Close all the output files, return the result."""
- if not self.outFile.closed:
- self.outFile.close()
- self.hash.digest()
- self.doneDefer.callback(self.hash)
-
def _gotData(self, data):
- self.peers[site]['pieces'] += data
-
- def read(self):
- """Read some data from the stream."""
+ """Process the received data."""
if self.outFile.closed:
- return None
+ raise PeerError, "outFile was unexpectedly closed"
- # Read data from the stream, deal with the possible deferred
- data = self.stream.read()
- if isinstance(data, defer.Deferred):
- data.addCallbacks(self._write, self._done)
- return data
-
- self._write(data)
- return data
-
- def _write(self, data):
- """Write the stream data to the file and return it for others to use.
-
- Also optionally decompresses it.
- """
if data is None:
- self._done()
- return data
+ raise PeerError, "Data is None?"
+
+ # Make sure we don't go too far
+ if self.length is not None and self.position + len(data) > self.length:
+ data = data[:(self.length - self.position)]
# Write and hash the streamed data
+ self.outFile.seek(self.position)
self.outFile.write(data)
self.hash.update(data)
+ self.position += len(data)
- return data
+ def _done(self, result):
+ """Return the result."""
+ log.msg('Streaming is complete')
+ return self.hash.digest()
+
+ def _error(self, err):
+ """Log the error."""
+ log.msg('Streaming error')
+ log.err(err)
+ return err
- def close(self):
- """Clean everything up and return None to future reads."""
- self.length = 0
- self._done()
- self.stream.close()
-
-
class FileDownload:
"""Manage a download from a list of peers or a mirror.
-
+ @type manager: L{PeerManager}
+ @ivar manager: the manager to send requests for peers to
+ @type hash: L{Hash.HashObject}
+ @ivar hash: the hash object containing the expected hash for the file
+ @ivar mirror: the URI of the file on the mirror
+ @type compact_peers: C{list} of C{dictionary}
+ @ivar compact_peers: a list of the peer info where the file can be found
+ @type file: C{file}
+ @ivar file: the open file to right the download to
+ @type path: C{string}
+ @ivar path: the path to request from peers to access the file
+ @type pieces: C{list} of C{string}
+ @ivar pieces: the hashes of the pieces in the file
+ @type started: C{boolean}
+ @ivar started: whether the download has begun yet
+ @type defer: L{twisted.internet.defer.Deferred}
+ @ivar defer: the deferred that will callback with the result of the download
+ @type peers: C{dictionary}
+ @ivar peers: information about each of the peers available to download from
+ @type outstanding: C{int}
+ @ivar outstanding: the number of requests to peers currently outstanding
+ @type peerlist: C{list} of L{HTTPDownloader.Peer}
+ @ivar peerlist: the sorted list of peers for this download
+ @type stream: L{GrowingFileStream}
+ @ivar stream: the stream of resulting data from the download
+ @type nextFinish: C{int}
+ @ivar nextFinish: the next piece that is needed to finish for the stream
+ @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
+ @ivar completePieces: one per piece, will be False if no requests are
+ outstanding for the piece, True if the piece has been successfully
+ downloaded, or the Peer that a request for this piece has been sent
"""
def __init__(self, manager, hash, mirror, compact_peers, file):
"""Initialize the instance and check for piece hashes.
+ @type manager: L{PeerManager}
+ @param manager: the manager to send requests for peers to
@type hash: L{Hash.HashObject}
@param hash: the hash object containing the expected hash for the file
@param mirror: the URI of the file on the mirror
- @type compact_peers: C{list} of C{string}
+ @type compact_peers: C{list} of C{dictionary}
@param compact_peers: a list of the peer info where the file can be found
@type file: L{twisted.python.filepath.FilePath}
@param file: the temporary file to use to store the downloaded file
self.compact_peers = compact_peers
self.path = '/~/' + quote_plus(hash.expected())
+ self.mirror_path = None
self.pieces = None
self.started = False
file.restat(False)
if file.exists():
file.remove()
- self.file = file.open('w')
+ self.file = file.open('w+')
def run(self):
"""Start the downloading process."""
+ log.msg('Checking for pieces for %s' % self.path)
self.defer = defer.Deferred()
self.peers = {}
no_pieces = 0
- pieces_string = {}
- pieces_hash = {}
- pieces_dl_hash = {}
+ pieces_string = {0: 0}
+ pieces_hash = {0: 0}
+ pieces_dl_hash = {0: 0}
for compact_peer in self.compact_peers:
# Build a list of all the peers for this download
site = uncompact(compact_peer['c'])
- peer = manager.getPeer(site)
+ peer = self.manager.getPeer(site)
self.peers.setdefault(site, {})['peer'] = peer
# Extract any piece information from the peers list
if max_found == no_pieces:
# The file is not split into pieces
- self.pieces = []
+ log.msg('No pieces were found for the file')
+ self.pieces = [self.hash.expected()]
self.startDownload()
elif max_found == max(pieces_string.values()):
# Small number of pieces in a string
# Find the most popular piece string
if num == max_found:
self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+ log.msg('Peer info contained %d piece hashes' % len(self.pieces))
self.startDownload()
break
elif max_found == max(pieces_hash.values()):
for pieces, num in pieces_hash.items():
# Find the most popular piece hash to lookup
if num == max_found:
+ log.msg('Found a hash for pieces to lookup in the DHT: %r' % pieces)
self.getDHTPieces(pieces)
break
elif max_found == max(pieces_dl_hash.values()):
# Large number of pieces stored in peers
- for pieces, num in pieces_hash.items():
+ for pieces, num in pieces_dl_hash.items():
# Find the most popular piece hash to download
if num == max_found:
+ log.msg('Found a hash for pieces to lookup in peers: %r' % pieces)
self.getPeerPieces(pieces)
break
return self.defer
# Start the DHT lookup
lookupDefer = self.manager.dht.getValue(key)
- lookupDefer.addCallback(self._getDHTPieces, key)
+ lookupDefer.addBoth(self._getDHTPieces, key)
def _getDHTPieces(self, results, key):
"""Check the retrieved values."""
- for result in results:
- # Make sure the hash matches the key
- result_hash = sha.new(result.get('t', '')).digest()
- if result_hash == key:
- pieces = result['t']
- self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
- log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
- self.startDownload()
- return
+ if isinstance(results, list):
+ for result in results:
+ # Make sure the hash matches the key
+ result_hash = sha.new(result.get('t', '')).digest()
+ if result_hash == key:
+ pieces = result['t']
+ self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
+ log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
+ self.startDownload()
+ return
+
+ log.msg('Could not retrieve the piece hashes from the DHT')
+ else:
+ log.msg('Looking up piece hashes in the DHT resulted in an error: %r' % (result, ))
# Continue without the piece hashes
- log.msg('Could not retrieve the piece hashes from the DHT')
- self.pieces = []
+ self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
self.startDownload()
def getPeerPieces(self, key, failedSite = None):
@param key: the key to request from the peers
"""
if failedSite is None:
+ log.msg('Starting the lookup of piece hashes in peers')
self.outstanding = 0
# Remove any peers with the wrong piece hash
#for site in self.peers.keys():
# if self.peers[site].get('l', '') != key:
# del self.peers[site]
else:
+ log.msg('Piece hash lookup failed for peer %r' % (failedSite, ))
self.peers[failedSite]['failed'] = True
self.outstanding -= 1
# Send a request to one or more peers
for site in self.peers:
if self.peers[site].get('failed', False) != True:
+ log.msg('Sending a piece hash request to %r' % (site, ))
path = '/~/' + quote_plus(key)
lookupDefer = self.peers[site]['peer'].get(path)
- lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError,
- callbackArgs=(key, site), errbackArgs=(key, site))
+ reactor.callLater(0, lookupDefer.addCallbacks,
+ *(self._getPeerPieces, self._gotPeerError),
+ **{'callbackArgs': (key, site),
+ 'errbackArgs': (key, site)})
self.outstanding += 1
- if self.outstanding >= 3:
+ if self.outstanding >= 4:
break
- if self.pieces is None and self.outstanding == 0:
+ if self.pieces is None and self.outstanding <= 0:
# Continue without the piece hashes
log.msg('Could not retrieve the piece hashes from the peers')
- self.pieces = []
+ self.pieces = [None for x in xrange(0, self.hash.expSize, PIECE_SIZE)]
self.startDownload()
def _getPeerPieces(self, response, key, site):
"""Process the retrieved response from the peer."""
+ log.msg('Got a piece hash response %d from %r' % (response.code, site))
if response.code != 200:
# Request failed, try a different peer
self.getPeerPieces(key, site)
# Read the response stream to a string
self.peers[site]['pieces'] = ''
def _gotPeerPiece(data, self = self, site = site):
+ log.msg('Peer %r got %d bytes of piece hashes' % (site, len(data)))
self.peers[site]['pieces'] += data
+ log.msg('Streaming piece hashes from peer')
df = stream.readStream(response.stream, _gotPeerPiece)
df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
callbackArgs=(key, site), errbackArgs=(key, site))
def _gotPeerError(self, err, key, site):
"""Peer failed, try again."""
+ log.msg('Peer piece hash request failed for %r' % (site, ))
log.err(err)
self.getPeerPieces(key, site)
def _gotPeerPieces(self, result, key, site):
"""Check the retrieved pieces from the peer."""
+ log.msg('Finished streaming piece hashes from peer %r' % (site, ))
if self.pieces is not None:
# Already done
+ log.msg('Already done')
return
try:
result = bdecode(self.peers[site]['pieces'])
except:
+ log.msg('Error bdecoding piece hashes')
log.err()
self.getPeerPieces(key, site)
return
if result_hash == key:
pieces = result['t']
self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
- log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces))
+ log.msg('Retrieved %d piece hashes from the peer %r' % (len(self.pieces), site))
self.startDownload()
else:
log.msg('Peer returned a piece string that did not match')
if self.started:
return
+ log.msg('Starting to download %s' % self.path)
self.started = True
- assert self.pieces is not None, "You must initialize the piece hashes first"
+ assert self.pieces, "You must initialize the piece hashes first"
self.peerlist = [self.peers[site]['peer'] for site in self.peers]
+ # Use the mirror if there are few peers
+ if len(self.peerlist) < config.getint('DEFAULT', 'MIN_DOWNLOAD_PEERS'):
+ parsed = urlparse(self.mirror)
+ if parsed[0] == "http":
+ site = splitHostPort(parsed[0], parsed[1])
+ self.mirror_path = urlunparse(('', '') + parsed[2:])
+ peer = self.manager.getPeer(site, mirror = True)
+ self.peerlist.append(peer)
+
# Special case if there's only one good peer left
- if len(self.peerlist) == 1:
- log.msg('Downloading from peer %r' % (self.peerlist[0], ))
- self.defer.callback(self.peerlist[0].get(self.path))
- return
+# if len(self.peerlist) == 1:
+# log.msg('Downloading from peer %r' % (self.peerlist[0], ))
+# self.defer.callback(self.peerlist[0].get(self.path))
+# return
- self.sort()
+ # Start sending the return file
+ self.stream = GrowingFileStream(self.file, self.hash.expSize)
+ resp = Response(200, {}, self.stream)
+ self.defer.callback(resp)
+
+ # Begin to download the pieces
self.outstanding = 0
- self.next_piece = 0
+ self.nextFinish = 0
+ self.completePieces = [False for piece in self.pieces]
+ self.getPieces()
- while self.outstanding < 3 and self.peerlist and self.next_piece < len(self.pieces):
- peer = self.peerlist.pop()
- piece = self.next_piece
- self.next_piece += 1
-
- self.outstanding += 1
- df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
- df.addCallbacks(self._gotPiece, self._gotError,
- callbackArgs=(piece, peer), errbackArgs=(piece, peer))
+ #{ Downloading the pieces
+ def getPieces(self):
+ """Download the next pieces from the peers."""
+ self.sort()
+ piece = self.nextFinish
+ while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
+ if self.completePieces[piece] == False:
+ # Send a request to the highest ranked peer
+ peer = self.peerlist.pop()
+ self.completePieces[piece] = peer
+ log.msg('Sending a request for piece %d to peer %r' % (piece, peer))
+
+ self.outstanding += 1
+ if peer.mirror:
+ df = peer.getRange(self.mirror_path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+ else:
+ df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
+ reactor.callLater(0, df.addCallbacks,
+ *(self._getPiece, self._getError),
+ **{'callbackArgs': (piece, peer),
+ 'errbackArgs': (piece, peer)})
+ piece += 1
+
+ # Check if we're done
+ if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
+ log.msg('We seem to be done with all pieces')
+ self.stream.allAvailable()
- def _gotPiece(self, response, piece, peer):
- """Process the retrieved piece from the peer."""
- if response.code != 206:
+ def _getPiece(self, response, piece, peer):
+ """Process the retrieved headers from the peer."""
+ log.msg('Got response for piece %d from peer %r' % (piece, peer))
+ if ((len(self.completePieces) > 1 and response.code != 206) or
+ (response.code not in (200, 206))):
# Request failed, try a different peer
- self.getPeerPieces(key, site)
+ log.msg('Wrong response type %d for piece %d from peer %r' % (response.code, piece, peer))
+ peer.hashError('Peer responded with the wrong type of download: %r' % response.code)
+ self.completePieces[piece] = False
+ if response.stream and response.stream.length:
+ stream.readAndDiscard(response.stream)
else:
# Read the response stream to the file
- df = StreamToFile(response.stream, self.file, self.hash, piece*PIECE_SIZE, PIECE_SIZE).run()
- df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
- callbackArgs=(key, site), errbackArgs=(key, site))
+ log.msg('Streaming piece %d from peer %r' % (piece, peer))
+ if response.code == 206:
+ df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE,
+ PIECE_SIZE).run()
+ else:
+ df = StreamToFile(response.stream, self.file).run()
+ reactor.callLater(0, df.addCallbacks,
+ *(self._gotPiece, self._gotError),
+ **{'callbackArgs': (piece, peer),
+ 'errbackArgs': (piece, peer)})
- def _gotError(self, err, piece, peer):
+ self.outstanding -= 1
+ self.peerlist.append(peer)
+ self.getPieces()
+
+ def _getError(self, err, piece, peer):
"""Peer failed, try again."""
+ log.msg('Got error for piece %d from peer %r' % (piece, peer))
+ self.outstanding -= 1
+ self.peerlist.append(peer)
+ self.completePieces[piece] = False
+ self.getPieces()
log.err(err)
+ def _gotPiece(self, response, piece, peer):
+ """Process the retrieved piece from the peer."""
+ log.msg('Finished streaming piece %d from peer %r: %r' % (piece, peer, response))
+ if self.pieces[piece] and response != self.pieces[piece]:
+ # Hash doesn't match
+ log.msg('Hash error for piece %d from peer %r' % (piece, peer))
+ peer.hashError('Piece received from peer does not match expected')
+ self.completePieces[piece] = False
+ else:
+ # Successfully completed one of several pieces
+ log.msg('Finished with piece %d from peer %r' % (piece, peer))
+ self.completePieces[piece] = True
+ while (self.nextFinish < len(self.completePieces) and
+ self.completePieces[self.nextFinish] == True):
+ self.nextFinish += 1
+ self.stream.updateAvailable(PIECE_SIZE)
+
+ self.getPieces()
+
+ def _gotError(self, err, piece, peer):
+ """Piece download failed, try again."""
+ log.msg('Error streaming piece %d from peer %r: %r' % (piece, peer, response))
+ log.err(err)
+ self.completePieces[piece] = False
+ self.getPieces()
class PeerManager:
"""Manage a set of peers and the requests to them.
+ @type cache_dir: L{twisted.python.filepath.FilePath}
+ @ivar cache_dir: the directory to use for storing all files
+ @type dht: L{interfaces.IDHT}
+ @ivar dht: the DHT instance
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to record sent data to
@type clients: C{dictionary}
@ivar clients: the available peers that have been previously contacted
"""
- def __init__(self, cache_dir, dht):
+ def __init__(self, cache_dir, dht, stats):
"""Initialize the instance."""
self.cache_dir = cache_dir
self.cache_dir.restat(False)
if not self.cache_dir.exists():
self.cache_dir.makedirs()
self.dht = dht
+ self.stats = stats
self.clients = {}
def get(self, hash, mirror, peers = [], method="GET", modtime=None):
assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
site = splitHostPort(parsed[0], parsed[1])
path = urlunparse(('', '') + parsed[2:])
- peer = self.getPeer(site)
+ peer = self.getPeer(site, mirror = True)
return peer.get(path, method, modtime)
- elif len(peers) == 1:
- site = uncompact(peers[0]['c'])
- log.msg('Downloading from peer %r' % (site, ))
- path = '/~/' + quote_plus(hash.expected())
- peer = self.getPeer(site)
- return peer.get(path)
+# elif len(peers) == 1:
+# site = uncompact(peers[0]['c'])
+# log.msg('Downloading from peer %r' % (site, ))
+# path = '/~/' + quote_plus(hash.expected())
+# peer = self.getPeer(site)
+# return peer.get(path)
else:
tmpfile = self.cache_dir.child(hash.hexexpected())
return FileDownload(self, hash, mirror, peers, tmpfile).run()
- def getPeer(self, site):
+ def getPeer(self, site, mirror = False):
"""Create a new peer if necessary and return it.
@type site: (C{string}, C{int})
@param site: the IP address and port of the peer
+ @param mirror: whether the peer is actually a mirror
+ (optional, defaults to False)
"""
if site not in self.clients:
- self.clients[site] = Peer(site[0], site[1])
+ self.clients[site] = Peer(site[0], site[1], self.stats)
+ if mirror:
+ self.clients[site].mirror = True
return self.clients[site]
def close(self):
manager = None
pending_calls = []
- def gotResp(self, resp, num, expect):
- self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
- if expect is not None:
- self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
- def print_(n):
- pass
- def printdone(n):
- pass
- stream.readStream(resp.stream, print_).addCallback(printdone)
-
- def test_download(self):
- """Tests a normal download."""
- self.manager = PeerManager()
- self.timeout = 10
-
- host = 'www.ietf.org'
- d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
- d.addCallback(self.gotResp, 1, 1070)
- return d
-
- def test_head(self):
- """Tests a 'HEAD' request."""
- self.manager = PeerManager()
- self.timeout = 10
-
- host = 'www.ietf.org'
- d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
- d.addCallback(self.gotResp, 1, 0)
- return d
-
- def test_multiple_downloads(self):
- """Tests multiple downloads with queueing and connection closing."""
- self.manager = PeerManager()
- self.timeout = 120
- lastDefer = defer.Deferred()
-
- def newRequest(host, path, num, expect, last=False):
- d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
- d.addCallback(self.gotResp, num, expect)
- if last:
- d.addBoth(lastDefer.callback)
-
- newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
- newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
- newRequest('www.google.ca', "/", 3, None)
- self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
- self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
- self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
- self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
- self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
- self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
- self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
- return lastDefer
-
def tearDown(self):
for p in self.pending_calls:
if p.active():