]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
call back from GetValues each time we get some new values
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 # don't ping unless it's been at least this many seconds since we've heard from a peer
25 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
26
27
28
29 # this is the main class!
30 class Khashmir(xmlrpc.XMLRPC):
31     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
32     def __init__(self, host, port):
33         self.node = Node(newID(), host, port)
34         self.table = KTable(self.node)
35         self.app = Application("xmlrpc")
36         self.app.listenTCP(port, server.Site(self))
37         
38         ## these databases may be more suited to on-disk rather than in-memory
39         # h((key, value)) -> (key, value, time) mappings
40         self.store = db.DB()
41         self.store.open(None, None, db.DB_BTREE)
42         
43         # <insert time> -> h((key, value))
44         self.itime = db.DB()
45         self.itime.set_flags(db.DB_DUP)
46         self.itime.open(None, None, db.DB_BTREE)
47
48         # key -> h((key, value))
49         self.kw = db.DB()
50         self.kw.set_flags(db.DB_DUP)
51         self.kw.open(None, None, db.DB_BTREE)
52
53
54     def render(self, request):
55         """
56             Override the built in render so we can have access to the request object!
57             note, crequest is probably only valid on the initial call (not after deferred!)
58         """
59         self.crequest = request
60         return xmlrpc.XMLRPC.render(self, request)
61
62         
63     #######
64     #######  LOCAL INTERFACE    - use these methods!
65     def addContact(self, host, port):
66         """
67          ping this node and add the contact info to the table on pong!
68         """
69         n =Node(" "*20, host, port)  # note, we 
70         self.sendPing(n)
71
72
73     ## this call is async!
74     def findNode(self, id, callback, errback=None):
75         """ returns the contact info for node, or the k closest nodes, from the global table """
76         # get K nodes out of local table/cache, or the node we want
77         nodes = self.table.findNodes(id)
78         d = Deferred()
79         d.addCallbacks(callback, errback)
80         if len(nodes) == 1 and nodes[0].id == id :
81             d.callback(nodes)
82         else:
83             # create our search state
84             state = FindNode(self, id, d.callback)
85             reactor.callFromThread(state.goWithNodes, nodes)
86     
87     
88     ## also async
89     def valueForKey(self, key, callback):
90         """ returns the values found for key in global table """
91         nodes = self.table.findNodes(key)
92         # create our search state
93         state = GetValue(self, key, callback)
94         reactor.callFromThread(state.goWithNodes, nodes)
95
96
97     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
98     def storeValueForKey(self, key, value):
99         """ stores the value for key in the global table, returns immediately, no status 
100             in this implementation, peers respond but don't indicate status to storing values
101             values are stored in peers on a first-come first-served basis
102             this will probably change so more than one value can be stored under a key
103         """
104         def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
105             for node in nodes:
106                 if node.id != self.node.id:
107                     df = node.storeValue(key, value, self.node.senderDict())
108                     df.addCallbacks(response, default)
109         # this call is asynch
110         self.findNode(key, _storeValueForKey)
111         
112         
113     def insertNode(self, n):
114         """
115         insert a node in our local table, pinging oldest contact in bucket, if necessary
116         
117         If all you have is a host/port, then use addContact, which calls this method after
118         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
119         a node into the table without it's peer-ID.  That means of course the node passed into this
120         method needs to be a properly formed Node object with a valid ID.
121         """
122         old = self.table.insertNode(n)
123         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
124             # the bucket is full, check to see if old node is still around and if so, replace it
125             
126             ## these are the callbacks used when we ping the oldest node in a bucket
127             def _staleNodeHandler(oldnode=old, newnode = n):
128                 """ called if the pinged node never responds """
129                 self.table.replaceStaleNode(old, newnode)
130         
131             def _notStaleNodeHandler(sender, old=old):
132                 """ called when we get a ping from the remote node """
133                 if sender['id'] == old.id:
134                     self.table.insertNode(old)
135
136             df = old.ping(self.node.senderDict())
137             df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
138
139
140     def sendPing(self, node):
141         """
142             ping a node
143         """
144         df = node.ping(self.node.senderDict())
145         ## these are the callbacks we use when we issue a PING
146         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
147             if id != 20 * ' ' and id != sender['id']:
148                 # whoah, got response from different peer than we were expecting
149                 pass
150             else:
151                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
152                 n = Node(sender['id'], host, port)
153                 table.insertNode(n)
154             return
155         def _defaultPong(err):
156             # this should probably increment a failed message counter and dump the node if it gets over a threshold
157             return      
158
159         df.addCallbacks(_pongHandler,_defaultPong)
160
161
162     def findCloseNodes(self):
163         """
164             This does a findNode on the ID one away from our own.  
165             This will allow us to populate our table with nodes on our network closest to our own.
166             This is called as soon as we start up with an empty table
167         """
168         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
169         def callback(nodes):
170             pass
171         self.findNode(id, callback)
172
173     def refreshTable(self):
174         """
175             
176         """
177         def callback(nodes):
178             pass
179
180         for bucket in self.table.buckets:
181             if time.time() - bucket.lastAccessed >= 60 * 60:
182                 id = randRange(bucket.min, bucket.max)
183                 self.findNode(id, callback)
184         
185  
186     #####
187     ##### INCOMING MESSAGE HANDLERS
188     
189     def xmlrpc_ping(self, sender):
190         """
191             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
192             returns sender dict
193         """
194         ip = self.crequest.getClientIP()
195         n = Node(sender['id'], ip, sender['port'])
196         self.insertNode(n)
197         return self.node.senderDict()
198                 
199     def xmlrpc_find_node(self, target, sender):
200         nodes = self.table.findNodes(target)
201         nodes = map(lambda node: node.senderDict(), nodes)
202         ip = self.crequest.getClientIP()
203         n = Node(sender['id'], ip, sender['port'])
204         self.insertNode(n)
205         return nodes, self.node.senderDict()
206     
207     def xmlrpc_store_value(self, key, value, sender):
208         h1 = sha(key+value).digest()
209         t = `time.time()`
210         if not self.store.has_key(h1):
211             v = dumps((key, value, t))
212             self.store.put(h1, v)
213             self.itime.put(t, h1)
214             self.kw.put(key, h1)
215         else:
216             # update last insert time
217             tup = loads(self.store[h1])
218             self.store[h1] = dumps((tup[0], tup[1], t))
219             self.itime.put(t, h1)
220
221         ip = self.crequest.getClientIP()
222         n = Node(sender['id'], ip, sender['port'])
223         self.insertNode(n)
224         return self.node.senderDict()
225         
226     def xmlrpc_find_value(self, key, sender):
227         ip = self.crequest.getClientIP()
228         n = Node(sender['id'], ip, sender['port'])
229         self.insertNode(n)
230         if self.kw.has_key(key):
231             c = self.kw.cursor()
232             tup = c.set_range(key)
233             l = []
234             while(tup):
235                 h1 = tup[1]
236                 v = loads(self.store[h1])[1]
237                 l.append(v)
238                 tup = c.next()
239             return {'values' : l}, self.node.senderDict()
240         else:
241             nodes = self.table.findNodes(key)
242             nodes = map(lambda node: node.senderDict(), nodes)
243             return {'nodes' : nodes}, self.node.senderDict()
244
245     ###
246     ### message response callbacks
247     # called when we get a response to store value
248     def _storedValueHandler(self, sender):
249         pass
250
251
252
253
254
255
256 #------ testing
257
258 def test_build_net(quiet=0, peers=64, pause=1):
259     from whrandom import randrange
260     import thread
261     port = 2001
262     l = []
263         
264     if not quiet:
265         print "Building %s peer table." % peers
266         
267     for i in xrange(peers):
268         a = Khashmir('localhost', port + i)
269         l.append(a)
270     
271
272     thread.start_new_thread(l[0].app.run, ())
273     time.sleep(1)
274     for peer in l[1:]:
275         peer.app.run()
276         #time.sleep(.25)
277
278     print "adding contacts...."
279
280     for peer in l[1:]:
281         n = l[randrange(0, len(l))].node
282         peer.addContact(n.host, n.port)
283         n = l[randrange(0, len(l))].node
284         peer.addContact(n.host, n.port)
285         n = l[randrange(0, len(l))].node
286         peer.addContact(n.host, n.port)
287         if pause:
288             time.sleep(.30)
289             
290     time.sleep(1)
291     print "finding close nodes...."
292
293     for peer in l:
294         peer.findCloseNodes()
295         if pause:
296             time.sleep(.5)
297     if pause:
298             time.sleep(10)
299 #    for peer in l:
300 #       peer.refreshTable()
301     return l
302         
303 def test_find_nodes(l, quiet=0):
304     import threading, sys
305     from whrandom import randrange
306     flag = threading.Event()
307     
308     n = len(l)
309     
310     a = l[randrange(0,n)]
311     b = l[randrange(0,n)]
312     
313     def callback(nodes, flag=flag):
314         if (len(nodes) >0) and (nodes[0].id == b.node.id):
315             print "test_find_nodes      PASSED"
316         else:
317             print "test_find_nodes      FAILED"
318         flag.set()
319     a.findNode(b.node.id, callback)
320     flag.wait()
321     
322 def test_find_value(l, quiet=0):
323     from whrandom import randrange
324     from sha import sha
325     import time, threading, sys
326     
327     fa = threading.Event()
328     fb = threading.Event()
329     fc = threading.Event()
330     
331     n = len(l)
332     a = l[randrange(0,n)]
333     b = l[randrange(0,n)]
334     c = l[randrange(0,n)]
335     d = l[randrange(0,n)]
336
337     key = sha(`randrange(0,100000)`).digest()
338     value = sha(`randrange(0,100000)`).digest()
339     if not quiet:
340         print "inserting value..."
341         sys.stdout.flush()
342     a.storeValueForKey(key, value)
343     time.sleep(3)
344     print "finding..."
345     sys.stdout.flush()
346     
347     class cb:
348         def __init__(self, flag, value=value):
349             self.flag = flag
350             self.val = value
351             self.found = 0
352         def callback(self, values):
353             try:
354                 if(len(values) == 0):
355                     if not self.found:
356                         print "find                FAILED"
357                     else:
358                         print "find                FOUND"
359                     sys.stdout.flush()
360
361                 else:
362                     if self.val in values:
363                         self.found = 1
364             finally:
365                 self.flag.set()
366
367     b.valueForKey(key, cb(fa).callback)
368     fa.wait()
369     c.valueForKey(key, cb(fb).callback)
370     fb.wait()
371     d.valueForKey(key, cb(fc).callback)    
372     fc.wait()
373     
374 def test_one(port):
375     import thread
376     k = Khashmir('localhost', port)
377     thread.start_new_thread(k.app.run, ())
378     return k
379     
380 if __name__ == "__main__":
381     l = test_build_net()
382     time.sleep(3)
383     print "finding nodes..."
384     for i in range(10):
385         test_find_nodes(l)
386     print "inserting and fetching values..."
387     for i in range(10):
388         test_find_value(l)