1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from bsddb3 import db ## find this at http://pybsddb.sf.net/
4 from bsddb3._db import DBNotFoundError
8 from bencode import bencode, bdecode
9 #from threading import RLock
11 # max number of incoming or outgoing messages to process at a time
15 __slots__ = ['responseTemplate', 'id', 'dispatcher', 'target', 'payload', 'response', 'default', 'timeout']
16 def __init__(self, dispatcher, node, response_handler, default_handler, id = None, payload = None, timeout=60):
20 self.dispatcher = dispatcher
22 self.payload = payload
23 self.response = response_handler
24 self.default = default_handler
25 self.timeout = time.time() + timeout
27 def setPayload(self, payload):
28 self.payload = payload
30 def setResponseTemplate(self, t):
31 self.responseTemplate = t
33 def responseHandler(self, msg):
34 if self.responseTemplate and callable(self.responseTemplate):
36 self.responseTemplate(msg)
37 except ValueError, reason:
38 print "response %s" % (reason)
39 print `msg['id'], self.target.id`
41 self.response(self, msg)
43 def defaultHandler(self):
47 if callable(self.response) and callable(self.default):
48 self.dispatcher.initiate(self)
50 self.dispatchNoResponse()
51 def dispatchNoResponse(self):
52 self.dispatcher.initiateNoResponse(self)
57 def __init__(self, listener, base_template, id):
59 self.listener = listener
60 self.transactions = {}
62 self.timeout = db.DB()
63 self.timeout.set_flags(db.DB_DUP)
64 self.timeout.open(None, None, db.DB_BTREE)
65 self.BASE = base_template
69 def registerHandler(self, key, handler, template):
70 assert(callable(handler))
71 assert(callable(template))
72 self.handlers[key] = (handler, template)
74 def initiate(self, transaction):
76 #ignore messages to ourself
77 if transaction.target.id == self.id:
79 self.transactions[transaction.id] = transaction
80 self.timeout.put(`transaction.timeout`, transaction.id)
82 self.listener.qMsg(transaction.payload, transaction.target.host, transaction.target.port)
85 def initiateNoResponse(self, transaction):
86 #ignore messages to ourself
87 if transaction.target.id == self.id:
90 self.listener.qMsg(transaction.payload, transaction.target.host, transaction.target.port)
93 def postEvent(self, callback, delay, extras=None):
95 t = Transaction(self, None, None, callback, timeout=delay)
97 self.transactions[t.id] = t
98 self.timeout.put(`t.timeout`, t.id)
101 def flushExpiredEvents(self):
103 tstamp = `time.time()`
104 #self.tlock.acquire()
105 c = self.timeout.cursor()
107 while e and e[0] < tstamp:
110 t = self.transactions[e[1]]
111 del(self.transactions[e[1]])
113 # transaction must have completed or was otherwise cancelled
119 # handle duplicates in a silly way
121 self.timeout.delete(e[0])
123 #self.tlock.release()
126 def flushOutgoing(self):
128 n = self.listener.qLen()
132 self.listener.dispatchMsg()
136 def handleIncoming(self):
138 #self.tlock.acquire()
139 for i in range(NUM_EVENTS):
141 msg, addr = self.listener.receiveMsg()
145 ## decode message, handle message!
149 # wrongly encoded message?
150 print "Bogus message received: %s" % msg
153 # check base template for correctness
155 except ValueError, reason:
157 print "Incoming message: %s" % reason
160 # check to see if we already know about this transaction
161 t = self.transactions[msg['tid']]
162 if msg['id'] != t.target.id and t.target.id != " "*20:
163 # we're expecting a response from someone else
164 if msg['id'] == self.id:
165 print "received our own response! " + `self.id`
167 print "response from wrong peer! "+ `msg['id'],t.target.id`
169 del(self.transactions[msg['tid']])
170 self.timeout.delete(`t.timeout`)
172 # call transaction response handler
173 t.responseHandler(msg)
175 # we don't know about it, must be unsolicited
176 n = Node(msg['id'], addr[0], addr[1])
177 t = Transaction(self, n, None, None, msg['tid'])
178 if self.handlers.has_key(msg['type']):
179 ## handle this transaction
181 # check handler template
182 self.handlers[msg['type']][1](msg)
183 except ValueError, reason:
184 print "BAD MESSAGE: %s" % reason
186 self.handlers[msg['type']][0](t, msg)
188 ## no transaction, no handler, drop it on the floor!
191 #self.tlock.release()
199 while(not self.stopped):
200 events = self.runOnce()
207 ## handle some incoming messages
208 events = events + self.handleIncoming()
209 ## process some outstanding events
210 events = events + self.flushExpiredEvents()
211 ## send outgoing messages
212 events = events + self.flushOutgoing()