Initial revision
[quix0rs-apt-p2p.git] / dispatcher.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from bsddb3 import db ## find this at http://pybsddb.sf.net/
4 from bsddb3._db import DBNotFoundError
5 import time
6 import hash
7 from node import Node
8 from bencode import bencode, bdecode
9 #from threading import RLock
10
# Upper bound on how many incoming messages, and how many outgoing messages,
# the dispatcher handles in a single pass.
NUM_EVENTS = 5
13
class Transaction:
    """One outstanding exchange with a node, or a timed event, tracked by a dispatcher.

    Attributes:
        id: transaction id; generated with hash.newID() when not supplied.
        dispatcher: the dispatcher used by dispatch()/dispatchNoResponse().
        target: the node the payload is addressed to (None for timed events).
        payload: the message handed to the dispatcher when dispatched.
        response: callable invoked as response(transaction, msg) on a reply.
        default: callable invoked as default(transaction) by defaultHandler().
        timeout: absolute expiry time (time.time() at creation + timeout).
        responseTemplate: optional callable validating a reply; it signals a
            bad reply by raising ValueError.
        extras: assigned by Dispatcher.postEvent.
        addr: assigned by Dispatcher.handleIncoming before the response
            handler runs.
    """
    # 'extras' and 'addr' are listed because the dispatcher assigns them on
    # instances; without them the assignments only work while __slots__ is
    # inert (i.e. on a classic class).
    __slots__ = ['responseTemplate', 'id', 'dispatcher', 'target', 'payload',
                 'response', 'default', 'timeout', 'extras', 'addr']

    def __init__(self, dispatcher, node, response_handler, default_handler, id = None, payload = None, timeout=60):
        """Create a transaction expiring `timeout` seconds from now."""
        if id is None:
            id = hash.newID()
        self.id = id
        self.dispatcher = dispatcher
        self.target = node
        self.payload = payload
        self.response = response_handler
        self.default = default_handler
        self.timeout = time.time() + timeout
        # Initialise the optional attributes so that reading them before
        # they are assigned never raises AttributeError.
        self.responseTemplate = None
        self.extras = None
        self.addr = None

    def setPayload(self, payload):
        """Replace the message that will be sent when dispatched."""
        self.payload = payload

    def setResponseTemplate(self, t):
        """Set the callable used to validate a reply before it is handled."""
        self.responseTemplate = t

    def responseHandler(self, msg):
        """Validate `msg` against the response template, then pass it on.

        If a callable template is set and rejects the message with
        ValueError, the problem is printed and the response callback is
        not invoked.
        """
        template = self.responseTemplate
        if template and callable(template):
            try:
                template(msg)
            except ValueError as reason:
                print("response %s" % (reason,))
                print(repr((msg['id'], self.target.id)))
                return
        self.response(self, msg)

    def defaultHandler(self):
        """Invoke the default callback with this transaction."""
        self.default(self)

    def dispatch(self):
        """Send via the dispatcher, tracking a reply only if both callbacks are callable."""
        if callable(self.response) and callable(self.default):
            self.dispatcher.initiate(self)
        else:
            self.dispatchNoResponse()

    def dispatchNoResponse(self):
        """Send via the dispatcher without registering for a reply."""
        self.dispatcher.initiateNoResponse(self)
53         
54         
55         
class Dispatcher:
    """Matches incoming messages to transactions/handlers and fires timeouts.

    State:
        transactions: dict mapping transaction id -> pending transaction.
        handlers: dict mapping message type -> (handler, template).
        timeout: a bsddb BTREE opened with DB_DUP, used as an index from
            repr(expiry time) -> transaction id.  Keys are compared as
            strings, both by the database and in flushExpiredEvents().
        BASE: callable applied to every decoded message; it rejects a
            message by raising ValueError.
        stopped: flag polled by run().

    No locking is performed by this class.
    """

    def __init__(self, listener, base_template, id):
        """Set up empty tables and an in-memory timeout index.

        listener must provide qMsg, qLen, dispatchMsg and receiveMsg;
        id is this node's own id, used to ignore messages to ourself.
        """
        self.id = id
        self.listener = listener
        self.transactions = {}
        self.handlers = {}
        self.timeout = db.DB()
        self.timeout.set_flags(db.DB_DUP)
        self.timeout.open(None, None, db.DB_BTREE)
        self.BASE = base_template
        self.stopped = 0

    def registerHandler(self, key, handler, template):
        """Register handler(transaction, msg) for unsolicited messages of type `key`.

        template(msg) is called first and must raise ValueError to reject.
        """
        assert(callable(handler))
        assert(callable(template))
        self.handlers[key] = (handler, template)

    def initiate(self, transaction):
        """Track `transaction` for a reply/timeout and queue its payload."""
        # ignore messages to ourself
        if transaction.target.id == self.id:
            return
        self.transactions[transaction.id] = transaction
        self.timeout.put(repr(transaction.timeout), transaction.id)
        ## queue the message!
        self.listener.qMsg(transaction.payload, transaction.target.host, transaction.target.port)

    def initiateNoResponse(self, transaction):
        """Queue the payload of `transaction` without tracking a reply."""
        # ignore messages to ourself
        if transaction.target.id == self.id:
            return
        self.listener.qMsg(transaction.payload, transaction.target.host, transaction.target.port)

    def postEvent(self, callback, delay, extras=None):
        """Schedule callback(transaction) to run once `delay` seconds have passed.

        `extras` is stored on the transaction handed to the callback.
        """
        t = Transaction(self, None, None, callback, timeout=delay)
        t.extras = extras
        self.transactions[t.id] = t
        self.timeout.put(repr(t.timeout), t.id)

    def flushExpiredEvents(self):
        """Fire the default handler of every transaction whose time has passed.

        Returns the number of expired index entries processed, including
        entries whose transaction had already completed or been cancelled.
        """
        tstamp = repr(time.time())
        # Collect the expired entries first and release the cursor before
        # touching the index, so that nothing is deleted underneath it.
        expired = []
        c = self.timeout.cursor()
        try:
            e = c.first()
            while e and e[0] < tstamp:
                expired.append(e)
                e = c.next()
        finally:
            c.close()

        last_key = None
        for key, tid in expired:
            # delete(key) removes every duplicate stored under that key, so
            # it is issued once per distinct key.  Entries arrive in key
            # order, so duplicates are adjacent.
            if key != last_key:
                self.timeout.delete(key)
                last_key = key
            t = self.transactions.pop(tid, None)
            if t is not None:
                ## default callback!
                t.defaultHandler()
            # else: transaction must have completed or was otherwise cancelled
        return len(expired)

    def flushOutgoing(self):
        """Ask the listener to send up to NUM_EVENTS queued messages.

        Returns the number of messages dispatched.
        """
        events = 0
        for i in range(min(self.listener.qLen(), NUM_EVENTS)):
            self.listener.dispatchMsg()
            events = events + 1
        return events

    def handleIncoming(self):
        """Receive, validate and route up to NUM_EVENTS incoming messages.

        Stops early when the listener's receiveMsg() raises ValueError.
        Messages that fail to decode or fail the base template are reported
        and skipped.  Returns the number of messages that were routed.
        """
        events = 0
        for i in range(NUM_EVENTS):
            try:
                msg, addr = self.listener.receiveMsg()
            except ValueError:
                break

            ## decode message, handle message!
            try:
                msg = bdecode(msg)
            except ValueError:
                # wrongly encoded message?
                print("Bogus message received: %s" % (msg,))
                continue
            try:
                # check base template for correctness
                self.BASE(msg)
            except ValueError as reason:
                # bad message!
                print("Incoming message: %s" % (reason,))
                continue

            # Only the lookup decides whether the transaction is known; a
            # KeyError raised later (e.g. inside a response handler) must
            # not be mistaken for an unknown transaction.
            t = self.transactions.get(msg['tid'])
            if t is None:
                # we don't know about it, must be unsolicited
                self._handleUnsolicited(msg, addr)
            else:
                self._handleResponse(t, msg, addr)
            events = events + 1
        return events

    def _handleResponse(self, t, msg, addr):
        """Complete the pending transaction `t` with `msg` if the sender is acceptable."""
        # A target id of twenty spaces accepts a reply from any sender.
        if msg['id'] != t.target.id and t.target.id != " "*20:
            # we're expecting a response from someone else
            if msg['id'] == self.id:
                print("received our own response!  " + repr(self.id))
            else:
                print("response from wrong peer!  " + repr((msg['id'], t.target.id)))
            return
        del self.transactions[msg['tid']]
        try:
            self.timeout.delete(repr(t.timeout))
        except DBNotFoundError:
            # The index entry is already gone; deleting by key removes all
            # duplicates, so another transaction sharing this exact expiry
            # time may have removed it.
            pass
        t.addr = addr
        # call transaction response handler
        t.responseHandler(msg)

    def _handleUnsolicited(self, msg, addr):
        """Route a message with an unknown transaction id to its registered handler."""
        n = Node(msg['id'], addr[0], addr[1])
        t = Transaction(self, n, None, None, msg['tid'])
        entry = self.handlers.get(msg['type'])
        if entry is None:
            ## no transaction, no handler, drop it on the floor!
            return
        handler, template = entry
        try:
            # check handler template
            template(msg)
        except ValueError as reason:
            print("BAD MESSAGE: %s" % (reason,))
        else:
            ## handle this transaction
            handler(t, msg)

    def stop(self):
        """Ask run() to return after its current pass."""
        self.stopped = 1

    def run(self):
        """Call runOnce() until stop() is called, sleeping 0.1s after an idle pass."""
        self.stopped = 0
        while not self.stopped:
            if self.runOnce() == 0:
                time.sleep(0.1)

    def runOnce(self):
        """Do one pass: incoming messages, expired events, outgoing messages.

        Returns the total number of events handled in the pass.
        """
        events = 0
        ## handle some incoming messages
        events = events + self.handleIncoming()
        ## process some outstanding events
        events = events + self.flushExpiredEvents()
        ## send outgoing messages
        events = events + self.flushOutgoing()
        return events
214
215
216