More multiple peer downloading work, finished but still untested.
[quix0rs-apt-p2p.git] / apt_p2p / PeerManager.py
1
2 """Manage a set of peers and the requests to them."""
3
4 from random import choice
5 from urlparse import urlparse, urlunparse
6 from urllib import quote_plus
7 from binascii import b2a_hex, a2b_hex
8 import sha
9
10 from twisted.internet import reactor, defer
11 from twisted.python import log
12 from twisted.trial import unittest
13 from twisted.web2 import stream
14 from twisted.web2.http import Response, splitHostPort
15
16 from HTTPDownloader import Peer
17 from util import uncompact
18 from hash import PIECE_SIZE
19 from apt_p2p_Khashmir.bencode import bdecode
20
21 class GrowingFileStream(stream.FileStream):
22     """Modified to stream data from a file as it becomes available.
23     
24     @ivar CHUNK_SIZE: the maximum size of chunks of data to send at a time
25     @ivar deferred: waiting for the result of the last read attempt
26     @ivar available: the number of bytes that are currently available to read
27     @ivar position: the current position in the file where the next read will begin
28     @ivar finished: True when no more data will be coming available
29     """
30
31     CHUNK_SIZE = 4*1024
32
33     def __init__(self, f):
34         stream.FileStream.__init__(self, f)
35         self.length = None
36         self.deferred = None
37         self.available = 0L
38         self.position = 0L
39         self.finished = False
40
41     def updateAvaliable(self, newlyAvailable):
42         """Update the number of bytes that are available.
43         
44         Call it with 0 to trigger reading of a fully read file.
45         
46         @param newlyAvailable: the number of bytes that just became available
47         """
48         assert not self.finished
49         self.available += newlyAvailable
50         
51         # If a read is pending, let it go
52         if self.deferred and self.position < self.available:
53             # Try to read some data from the file
54             length = self.available - self.position
55             readSize = min(length, self.CHUNK_SIZE)
56             self.f.seek(self.position)
57             b = self.f.read(readSize)
58             bytesRead = len(b)
59             
60             # Check if end of file was reached
61             if bytesRead:
62                 self.position += bytesRead
63                 deferred = self.deferred
64                 self.deferred = None
65                 deferred.callback(b)
66
67     def allAvailable(self):
68         """Indicate that no more data will be coming available."""
69         self.finished = True
70
71         # If a read is pending, let it go
72         if self.deferred:
73             if self.position < self.available:
74                 # Try to read some data from the file
75                 length = self.available - self.position
76                 readSize = min(length, self.CHUNK_SIZE)
77                 self.f.seek(self.position)
78                 b = self.f.read(readSize)
79                 bytesRead = len(b)
80     
81                 # Check if end of file was reached
82                 if bytesRead:
83                     self.position += bytesRead
84                     deferred = self.deferred
85                     self.deferred = None
86                     deferred.callback(b)
87                 else:
88                     # We're done
89                     deferred.callback(None)
90             else:
91                 # We're done
92                 deferred.callback(None)
93         
94     def read(self, sendfile=False):
95         assert not self.deferred, "A previous read is still deferred."
96
97         if self.f is None:
98             return None
99
100         length = self.available - self.position
101         readSize = min(length, self.CHUNK_SIZE)
102
103         # If we don't have any available, we're done or deferred
104         if readSize <= 0:
105             if self.finished:
106                 return None
107             else:
108                 self.deferred = defer.Deferred()
109                 return self.deferred
110
111         # Try to read some data from the file
112         self.f.seek(self.position)
113         b = self.f.read(readSize)
114         bytesRead = len(b)
115         if not bytesRead:
116             # End of file was reached, we're done or deferred
117             if self.finished:
118                 return None
119             else:
120                 self.deferred = defer.Deferred()
121                 return self.deferred
122         else:
123             self.position += bytesRead
124             return b
125
126 class StreamToFile(defer.Deferred):
127     """Save a stream to a partial file and hash it.
128     
129     @type stream: L{twisted.web2.stream.IByteStream}
130     @ivar stream: the input stream being read
131     @type outFile: L{twisted.python.filepath.FilePath}
132     @ivar outFile: the file being written
133     @type hash: C{sha1}
134     @ivar hash: the hash object for the data
135     @type position: C{int}
136     @ivar position: the current file position to write the next data to
137     @type length: C{int}
138     @ivar length: the position in the file to not write beyond
139     @type doneDefer: L{twisted.internet.defer.Deferred}
140     @ivar doneDefer: the deferred that will fire when done writing
141     """
142     
143     def __init__(self, inputStream, outFile, start = 0, length = None):
144         """Initializes the file.
145         
146         @type inputStream: L{twisted.web2.stream.IByteStream}
147         @param inputStream: the input stream to read from
148         @type outFile: L{twisted.python.filepath.FilePath}
149         @param outFile: the file to write to
150         @type start: C{int}
151         @param start: the file position to start writing at
152             (optional, defaults to the start of the file)
153         @type length: C{int}
154         @param length: the maximum amount of data to write to the file
155             (optional, defaults to not limiting the writing to the file
156         """
157         self.stream = inputStream
158         self.outFile = outFile
159         self.hash = sha.new()
160         self.position = start
161         self.length = None
162         if length is not None:
163             self.length = start + length
164         self.doneDefer = None
165         
166     def run(self):
167         """Start the streaming.
168
169         @rtype: L{twisted.internet.defer.Deferred}
170         """
171         self.doneDefer = stream.readStream(self.stream, self._gotData)
172         self.doneDefer.addCallbacks(self._done, self._error)
173         return self.doneDefer
174
175     def _gotData(self, data):
176         """Process the received data."""
177         if self.outFile.closed:
178             raise Exception, "outFile was unexpectedly closed"
179         
180         if data is None:
181             raise Exception, "Data is None?"
182         
183         # Make sure we don't go too far
184         if self.length is not None and self.position + len(data) > self.length:
185             data = data[:(self.length - self.position)]
186         
187         # Write and hash the streamed data
188         self.outFile.seek(self.position)
189         self.outFile.write(data)
190         self.hash.update(data)
191         self.position += len(data)
192         
193     def _done(self, result):
194         """Return the result."""
195         return self.hash.digest()
196     
197     def _error(self, err):
198         """Log the error."""
199         log.err(err)
200         return err
201     
202 class FileDownload:
203     """Manage a download from a list of peers or a mirror.
204     
205     @type manager: L{PeerManager}
206     @ivar manager: the manager to send requests for peers to
207     @type hash: L{Hash.HashObject}
208     @ivar hash: the hash object containing the expected hash for the file
209     @ivar mirror: the URI of the file on the mirror
210     @type compact_peers: C{list} of C{dictionary}
211     @ivar compact_peers: a list of the peer info where the file can be found
212     @type file: C{file}
213     @ivar file: the open file to right the download to
214     @type path: C{string}
215     @ivar path: the path to request from peers to access the file
216     @type pieces: C{list} of C{string} 
217     @ivar pieces: the hashes of the pieces in the file
218     @type started: C{boolean}
219     @ivar started: whether the download has begun yet
220     @type defer: L{twisted.internet.defer.Deferred}
221     @ivar defer: the deferred that will callback with the result of the download
222     @type peers: C{dictionary}
223     @ivar peers: information about each of the peers available to download from
224     @type outstanding: C{int}
225     @ivar outstanding: the number of requests to peers currently outstanding
226     @type peerlist: C{list} of L{HTTPDownloader.Peer}
227     @ivar peerlist: the sorted list of peers for this download
228     @type stream: L{GrowingFileStream}
229     @ivar stream: the stream of resulting data from the download
230     @type nextFinish: C{int}
231     @ivar nextFinish: the next piece that is needed to finish for the stream
232     @type completePieces: C{list} of C{boolean} or L{HTTPDownloader.Peer}
233     @ivar completePieces: one per piece, will be False if no requests are
234         outstanding for the piece, True if the piece has been successfully
235         downloaded, or the Peer that a request for this piece has been sent  
236     """
237     
238     def __init__(self, manager, hash, mirror, compact_peers, file):
239         """Initialize the instance and check for piece hashes.
240         
241         @type manager: L{PeerManager}
242         @param manager: the manager to send requests for peers to
243         @type hash: L{Hash.HashObject}
244         @param hash: the hash object containing the expected hash for the file
245         @param mirror: the URI of the file on the mirror
246         @type compact_peers: C{list} of C{dictionary}
247         @param compact_peers: a list of the peer info where the file can be found
248         @type file: L{twisted.python.filepath.FilePath}
249         @param file: the temporary file to use to store the downloaded file
250         """
251         self.manager = manager
252         self.hash = hash
253         self.mirror = mirror
254         self.compact_peers = compact_peers
255         
256         self.path = '/~/' + quote_plus(hash.expected())
257         self.pieces = None
258         self.started = False
259         
260         file.restat(False)
261         if file.exists():
262             file.remove()
263         self.file = file.open('w')
264
265     def run(self):
266         """Start the downloading process."""
267         self.defer = defer.Deferred()
268         self.peers = {}
269         no_pieces = 0
270         pieces_string = {}
271         pieces_hash = {}
272         pieces_dl_hash = {}
273
274         for compact_peer in self.compact_peers:
275             # Build a list of all the peers for this download
276             site = uncompact(compact_peer['c'])
277             peer = manager.getPeer(site)
278             self.peers.setdefault(site, {})['peer'] = peer
279
280             # Extract any piece information from the peers list
281             if 't' in compact_peer:
282                 self.peers[site]['t'] = compact_peer['t']['t']
283                 pieces_string.setdefault(compact_peer['t']['t'], 0)
284                 pieces_string[compact_peer['t']['t']] += 1
285             elif 'h' in compact_peer:
286                 self.peers[site]['h'] = compact_peer['h']
287                 pieces_hash.setdefault(compact_peer['h'], 0)
288                 pieces_hash[compact_peer['h']] += 1
289             elif 'l' in compact_peer:
290                 self.peers[site]['l'] = compact_peer['l']
291                 pieces_dl_hash.setdefault(compact_peer['l'], 0)
292                 pieces_dl_hash[compact_peer['l']] += 1
293             else:
294                 no_pieces += 1
295         
296         # Select the most popular piece info
297         max_found = max(no_pieces, max(pieces_string.values()),
298                         max(pieces_hash.values()), max(pieces_dl_hash.values()))
299
300         if max_found < len(self.peers):
301             log.msg('Misleading piece information found, using most popular %d of %d peers' % 
302                     (max_found, len(self.peers)))
303
304         if max_found == no_pieces:
305             # The file is not split into pieces
306             self.pieces = []
307             self.startDownload()
308         elif max_found == max(pieces_string.values()):
309             # Small number of pieces in a string
310             for pieces, num in pieces_string.items():
311                 # Find the most popular piece string
312                 if num == max_found:
313                     self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
314                     log.msg('Peer info contained %d piece hashes' % len(self.pieces))
315                     self.startDownload()
316                     break
317         elif max_found == max(pieces_hash.values()):
318             # Medium number of pieces stored in the DHT
319             for pieces, num in pieces_hash.items():
320                 # Find the most popular piece hash to lookup
321                 if num == max_found:
322                     self.getDHTPieces(pieces)
323                     break
324         elif max_found == max(pieces_dl_hash.values()):
325             # Large number of pieces stored in peers
326             for pieces, num in pieces_hash.items():
327                 # Find the most popular piece hash to download
328                 if num == max_found:
329                     self.getPeerPieces(pieces)
330                     break
331         return self.defer
332
333     #{ Downloading the piece hashes
334     def getDHTPieces(self, key):
335         """Retrieve the piece information from the DHT.
336         
337         @param key: the key to lookup in the DHT
338         """
339         # Remove any peers with the wrong piece hash
340         #for site in self.peers.keys():
341         #    if self.peers[site].get('h', '') != key:
342         #        del self.peers[site]
343
344         # Start the DHT lookup
345         lookupDefer = self.manager.dht.getValue(key)
346         lookupDefer.addCallback(self._getDHTPieces, key)
347         
348     def _getDHTPieces(self, results, key):
349         """Check the retrieved values."""
350         for result in results:
351             # Make sure the hash matches the key
352             result_hash = sha.new(result.get('t', '')).digest()
353             if result_hash == key:
354                 pieces = result['t']
355                 self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
356                 log.msg('Retrieved %d piece hashes from the DHT' % len(self.pieces))
357                 self.startDownload()
358                 return
359             
360         # Continue without the piece hashes
361         log.msg('Could not retrieve the piece hashes from the DHT')
362         self.pieces = []
363         self.startDownload()
364
365     def getPeerPieces(self, key, failedSite = None):
366         """Retrieve the piece information from the peers.
367         
368         @param key: the key to request from the peers
369         """
370         if failedSite is None:
371             self.outstanding = 0
372             # Remove any peers with the wrong piece hash
373             #for site in self.peers.keys():
374             #    if self.peers[site].get('l', '') != key:
375             #        del self.peers[site]
376         else:
377             self.peers[failedSite]['failed'] = True
378             self.outstanding -= 1
379
380         if self.pieces is None:
381             # Send a request to one or more peers
382             for site in self.peers:
383                 if self.peers[site].get('failed', False) != True:
384                     path = '/~/' + quote_plus(key)
385                     lookupDefer = self.peers[site]['peer'].get(path)
386                     lookupDefer.addCallbacks(self._getPeerPieces, self._gotPeerError,
387                                              callbackArgs=(key, site), errbackArgs=(key, site))
388                     self.outstanding += 1
389                     if self.outstanding >= 3:
390                         break
391         
392         if self.pieces is None and self.outstanding == 0:
393             # Continue without the piece hashes
394             log.msg('Could not retrieve the piece hashes from the peers')
395             self.pieces = []
396             self.startDownload()
397         
398     def _getPeerPieces(self, response, key, site):
399         """Process the retrieved response from the peer."""
400         if response.code != 200:
401             # Request failed, try a different peer
402             self.getPeerPieces(key, site)
403         else:
404             # Read the response stream to a string
405             self.peers[site]['pieces'] = ''
406             def _gotPeerPiece(data, self = self, site = site):
407                 self.peers[site]['pieces'] += data
408             df = stream.readStream(response.stream, _gotPeerPiece)
409             df.addCallbacks(self._gotPeerPieces, self._gotPeerError,
410                             callbackArgs=(key, site), errbackArgs=(key, site))
411
412     def _gotPeerError(self, err, key, site):
413         """Peer failed, try again."""
414         log.err(err)
415         self.getPeerPieces(key, site)
416
417     def _gotPeerPieces(self, result, key, site):
418         """Check the retrieved pieces from the peer."""
419         if self.pieces is not None:
420             # Already done
421             return
422         
423         try:
424             result = bdecode(self.peers[site]['pieces'])
425         except:
426             log.err()
427             self.getPeerPieces(key, site)
428             return
429             
430         result_hash = sha.new(result.get('t', '')).digest()
431         if result_hash == key:
432             pieces = result['t']
433             self.pieces = [pieces[x:x+20] for x in xrange(0, len(pieces), 20)]
434             log.msg('Retrieved %d piece hashes from the peer' % len(self.pieces))
435             self.startDownload()
436         else:
437             log.msg('Peer returned a piece string that did not match')
438             self.getPeerPieces(key, site)
439
440     #{ Downloading the file
441     def sort(self):
442         """Sort the peers by their rank (highest ranked at the end)."""
443         def sort(a, b):
444             """Sort peers by their rank."""
445             if a.rank > b.rank:
446                 return 1
447             elif a.rank < b.rank:
448                 return -1
449             return 0
450         self.peerlist.sort(sort)
451
452     def startDownload(self):
453         """Start the download from the peers."""
454         # Don't start twice
455         if self.started:
456             return
457         
458         self.started = True
459         assert self.pieces is not None, "You must initialize the piece hashes first"
460         self.peerlist = [self.peers[site]['peer'] for site in self.peers]
461         
462         # Special case if there's only one good peer left
463         if len(self.peerlist) == 1:
464             log.msg('Downloading from peer %r' % (self.peerlist[0], ))
465             self.defer.callback(self.peerlist[0].get(self.path))
466             return
467         
468         # Start sending the return file
469         self.stream = GrowingFileStream(self.file)
470         resp = Response(200, {}, self.stream)
471         self.defer.callback(resp)
472
473         # Begin to download the pieces
474         self.outstanding = 0
475         self.nextFinish = 0
476         if self.pieces:
477             self.completePieces = [False for piece in self.pieces]
478         else:
479             self.completePieces = [False]
480         self.getPieces()
481         
482     #{ Downloading the pieces
483     def getPieces(self):
484         """Download the next pieces from the peers."""
485         self.sort()
486         piece = self.nextFinish
487         while self.outstanding < 4 and self.peerlist and piece < len(self.completePieces):
488             if self.completePieces[piece] == False:
489                 # Send a request to the highest ranked peer
490                 peer = self.peerlist.pop()
491                 self.completePieces[piece] = peer
492                 
493                 self.outstanding += 1
494                 if self.pieces:
495                     df = peer.getRange(self.path, piece*PIECE_SIZE, (piece+1)*PIECE_SIZE - 1)
496                 else:
497                     df = peer.get(self.path)
498                 reactor.callLater(0, df.addCallbacks,
499                                   *(self._getPiece, self._getError),
500                                   **{'callbackArgs': (piece, peer),
501                                      'errbackArgs': (piece, peer)})
502                 piece += 1
503                 
504         # Check if we're don
505         if self.outstanding <= 0 and self.nextFinish >= len(self.completePieces):
506             self.stream.allAvailable()
507     
508     def _getPiece(self, response, piece, peer):
509         """Process the retrieved headers from the peer."""
510         if ((len(self.completePieces) > 1 and response.code != 206) or
511             (response.code not in (200, 206))):
512             # Request failed, try a different peer
513             peer.hashError()
514             self.completePieces[piece] = False
515             if response.stream and response.stream.length:
516                 stream.readAndDiscard(response.stream)
517         else:
518             # Read the response stream to the file
519             if response.code == 206:
520                 df = StreamToFile(response.stream, self.file, piece*PIECE_SIZE, PIECE_SIZE).run()
521             else:
522                 df = StreamToFile(response.stream, self.file).run()
523             df.addCallbacks(self._gotPiece, self._gotError,
524                             callbackArgs=(piece, peer), errbackArgs=(piece, peer))
525
526         self.outstanding -= 1
527         self.peerlist.append(peer)
528         self.getPieces()
529
530     def _getError(self, err, piece, peer):
531         """Peer failed, try again."""
532         self.outstanding -= 1
533         self.peerlist.append(peer)
534         self.completePieces[piece] = False
535         self.getPieces()
536         log.err(err)
537
538     def _gotPiece(self, response, piece, peer):
539         """Process the retrieved piece from the peer."""
540         if ((self.pieces and response != self.pieces[piece]) or
541             (len(self.pieces) == 0 and response == self.hash.expected())):
542             # Hash doesn't match
543             peer.hashError()
544             self.completePieces[piece] = False
545         elif self.pieces:
546             # Successfully completed one of several pieces
547             self.completePieces[piece] = True
548             while (self.nextFinish < len(self.completePieces) and
549                    self.completePieces[self.nextFinish] == True):
550                 self.nextFinish += 1
551                 self.stream.updateAvailable(PIECE_SIZE)
552         else:
553             # Whole download (only one piece) is complete
554             self.completePieces[piece] = True
555             self.stream.updateAvailable(2**30)
556
557         self.getPieces()
558
559     def _gotError(self, err, piece, peer):
560         """Piece download failed, try again."""
561         log.err(err)
562         self.completePieces[piece] = False
563         self.getPieces()
564         
565 class PeerManager:
566     """Manage a set of peers and the requests to them.
567     
568     @type cache_dir: L{twisted.python.filepath.FilePath}
569     @ivar cache_dir: the directory to use for storing all files
570     @type dht: L{interfaces.IDHT}
571     @ivar dht: the DHT instance
572     @type clients: C{dictionary}
573     @ivar clients: the available peers that have been previously contacted
574     """
575
576     def __init__(self, cache_dir, dht):
577         """Initialize the instance."""
578         self.cache_dir = cache_dir
579         self.cache_dir.restat(False)
580         if not self.cache_dir.exists():
581             self.cache_dir.makedirs()
582         self.dht = dht
583         self.clients = {}
584         
585     def get(self, hash, mirror, peers = [], method="GET", modtime=None):
586         """Download from a list of peers or fallback to a mirror.
587         
588         @type hash: L{Hash.HashObject}
589         @param hash: the hash object containing the expected hash for the file
590         @param mirror: the URI of the file on the mirror
591         @type peers: C{list} of C{string}
592         @param peers: a list of the peer info where the file can be found
593             (optional, defaults to downloading from the mirror)
594         @type method: C{string}
595         @param method: the HTTP method to use, 'GET' or 'HEAD'
596             (optional, defaults to 'GET')
597         @type modtime: C{int}
598         @param modtime: the modification time to use for an 'If-Modified-Since'
599             header, as seconds since the epoch
600             (optional, defaults to not sending that header)
601         """
602         if not peers or method != "GET" or modtime is not None:
603             log.msg('Downloading (%s) from mirror %s' % (method, mirror))
604             parsed = urlparse(mirror)
605             assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
606             site = splitHostPort(parsed[0], parsed[1])
607             path = urlunparse(('', '') + parsed[2:])
608             peer = self.getPeer(site)
609             return peer.get(path, method, modtime)
610         elif len(peers) == 1:
611             site = uncompact(peers[0]['c'])
612             log.msg('Downloading from peer %r' % (site, ))
613             path = '/~/' + quote_plus(hash.expected())
614             peer = self.getPeer(site)
615             return peer.get(path)
616         else:
617             tmpfile = self.cache_dir.child(hash.hexexpected())
618             return FileDownload(self, hash, mirror, peers, tmpfile).run()
619         
620     def getPeer(self, site):
621         """Create a new peer if necessary and return it.
622         
623         @type site: (C{string}, C{int})
624         @param site: the IP address and port of the peer
625         """
626         if site not in self.clients:
627             self.clients[site] = Peer(site[0], site[1])
628         return self.clients[site]
629     
630     def close(self):
631         """Close all the connections to peers."""
632         for site in self.clients:
633             self.clients[site].close()
634         self.clients = {}
635
636 class TestPeerManager(unittest.TestCase):
637     """Unit tests for the PeerManager."""
638     
639     manager = None
640     pending_calls = []
641     
642     def gotResp(self, resp, num, expect):
643         self.failUnless(resp.code >= 200 and resp.code < 300, "Got a non-200 response: %r" % resp.code)
644         if expect is not None:
645             self.failUnless(resp.stream.length == expect, "Length was incorrect, got %r, expected %r" % (resp.stream.length, expect))
646         def print_(n):
647             pass
648         def printdone(n):
649             pass
650         stream.readStream(resp.stream, print_).addCallback(printdone)
651     
652     def test_download(self):
653         """Tests a normal download."""
654         self.manager = PeerManager()
655         self.timeout = 10
656         
657         host = 'www.ietf.org'
658         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt')
659         d.addCallback(self.gotResp, 1, 1070)
660         return d
661         
662     def test_head(self):
663         """Tests a 'HEAD' request."""
664         self.manager = PeerManager()
665         self.timeout = 10
666         
667         host = 'www.ietf.org'
668         d = self.manager.get('', 'http://' + host + '/rfc/rfc0013.txt', method = "HEAD")
669         d.addCallback(self.gotResp, 1, 0)
670         return d
671         
672     def test_multiple_downloads(self):
673         """Tests multiple downloads with queueing and connection closing."""
674         self.manager = PeerManager()
675         self.timeout = 120
676         lastDefer = defer.Deferred()
677         
678         def newRequest(host, path, num, expect, last=False):
679             d = self.manager.get('', 'http://' + host + ':' + str(80) + path)
680             d.addCallback(self.gotResp, num, expect)
681             if last:
682                 d.addBoth(lastDefer.callback)
683                 
684         newRequest('www.ietf.org', "/rfc/rfc0006.txt", 1, 1776)
685         newRequest('www.ietf.org', "/rfc/rfc2362.txt", 2, 159833)
686         newRequest('www.google.ca', "/", 3, None)
687         self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
688         self.pending_calls.append(reactor.callLater(10, newRequest, 'www.ietf.org', '/rfc/rfc0048.txt', 5, 41696))
689         self.pending_calls.append(reactor.callLater(30, newRequest, 'www.ietf.org', '/rfc/rfc0022.txt', 6, 4606))
690         self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
691         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0014.txt', 8, 27))
692         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.ietf.org', '/rfc/rfc0001.txt', 9, 21088))
693         self.pending_calls.append(reactor.callLater(62, newRequest, 'www.google.ca', '/intl/en/options/', 0, None, True))
694         return lastDefer
695         
696     def tearDown(self):
697         for p in self.pending_calls:
698             if p.active():
699                 p.cancel()
700         self.pending_calls = []
701         if self.manager:
702             self.manager.close()
703             self.manager = None