]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
now search local store for keys in addititon to net
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 from xmlrpclib import Binary
25
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
28
29
30
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34     def __init__(self, host, port):
35         self.node = Node().init(newID(), host, port)
36         self.table = KTable(self.node)
37         self.app = Application("xmlrpc")
38         self.app.listenTCP(port, server.Site(self))
39         
40         ## these databases may be more suited to on-disk rather than in-memory
41         # h((key, value)) -> (key, value, time) mappings
42         self.store = db.DB()
43         self.store.open(None, None, db.DB_BTREE)
44         
45         # <insert time> -> h((key, value))
46         self.itime = db.DB()
47         self.itime.set_flags(db.DB_DUP)
48         self.itime.open(None, None, db.DB_BTREE)
49
50         # key -> h((key, value))
51         self.kw = db.DB()
52         self.kw.set_flags(db.DB_DUP)
53         self.kw.open(None, None, db.DB_BTREE)
54
55         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56         
57     def render(self, request):
58         """
59             Override the built in render so we can have access to the request object!
60             note, crequest is probably only valid on the initial call (not after deferred!)
61         """
62         self.crequest = request
63         return xmlrpc.XMLRPC.render(self, request)
64
65         
66     #######
67     #######  LOCAL INTERFACE    - use these methods!
68     def addContact(self, host, port):
69         """
70          ping this node and add the contact info to the table on pong!
71         """
72         n =Node().init(" "*20, host, port)  # note, we 
73         self.sendPing(n)
74
75
76     ## this call is async!
77     def findNode(self, id, callback, errback=None):
78         """ returns the contact info for node, or the k closest nodes, from the global table """
79         # get K nodes out of local table/cache, or the node we want
80         nodes = self.table.findNodes(id)
81         d = Deferred()
82         d.addCallbacks(callback, errback)
83         if len(nodes) == 1 and nodes[0].id == id :
84             d.callback(nodes)
85         else:
86             # create our search state
87             state = FindNode(self, id, d.callback)
88             reactor.callFromThread(state.goWithNodes, nodes)
89     
90     
91     ## also async
92     def valueForKey(self, key, callback):
93         """ returns the values found for key in global table """
94         nodes = self.table.findNodes(key)
95         # create our search state
96         state = GetValue(self, key, callback)
97         reactor.callFromThread(state.goWithNodes, nodes)
98         
99         # get locals
100         l = self.retrieveValues(key)
101         if len(l) > 0:
102             callback(l)
103
104
105
106     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
107     def storeValueForKey(self, key, value, callback=None):
108         """ stores the value for key in the global table, returns immediately, no status 
109             in this implementation, peers respond but don't indicate status to storing values
110             values are stored in peers on a first-come first-served basis
111             this will probably change so more than one value can be stored under a key
112         """
113         def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
114             if not callback:
115                 # default callback - this will get called for each successful store value
116                 def _storedValueHandler(sender):
117                     pass
118                 response=_storedValueHandler
119             for node in nodes:
120                 if node.id != self.node.id:
121                     df = node.storeValue(key, value, self.node.senderDict())
122                     df.addCallbacks(response, default)
123         # this call is asynch
124         self.findNode(key, _storeValueForKey)
125         
126         
127     def insertNode(self, n):
128         """
129         insert a node in our local table, pinging oldest contact in bucket, if necessary
130         
131         If all you have is a host/port, then use addContact, which calls this method after
132         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
133         a node into the table without it's peer-ID.  That means of course the node passed into this
134         method needs to be a properly formed Node object with a valid ID.
135         """
136         old = self.table.insertNode(n)
137         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
138             # the bucket is full, check to see if old node is still around and if so, replace it
139             
140             ## these are the callbacks used when we ping the oldest node in a bucket
141             def _staleNodeHandler(oldnode=old, newnode = n):
142                 """ called if the pinged node never responds """
143                 self.table.replaceStaleNode(old, newnode)
144         
145             def _notStaleNodeHandler(sender, old=old):
146                 """ called when we get a ping from the remote node """
147                 sender = Node().initWithDict(sender)
148                 if sender.id == old.id:
149                     self.table.insertNode(old)
150
151             df = old.ping(self.node.senderDict())
152             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
153
154
155     def sendPing(self, node):
156         """
157             ping a node
158         """
159         df = node.ping(self.node.senderDict())
160         ## these are the callbacks we use when we issue a PING
161         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
162             if id != 20 * ' ' and id != sender['id'].data:
163                 # whoah, got response from different peer than we were expecting
164                 pass
165             else:
166                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
167                 sender['host'] = host
168                 sender['port'] = port
169                 n = Node().initWithDict(sender)
170                 table.insertNode(n)
171             return
172         def _defaultPong(err):
173             # this should probably increment a failed message counter and dump the node if it gets over a threshold
174             return      
175
176         df.addCallbacks(_pongHandler,_defaultPong)
177
178
179     def findCloseNodes(self):
180         """
181             This does a findNode on the ID one away from our own.  
182             This will allow us to populate our table with nodes on our network closest to our own.
183             This is called as soon as we start up with an empty table
184         """
185         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
186         def callback(nodes):
187             pass
188         self.findNode(id, callback)
189
190     def refreshTable(self):
191         """
192             
193         """
194         def callback(nodes):
195             pass
196
197         for bucket in self.table.buckets:
198             if time.time() - bucket.lastAccessed >= 60 * 60:
199                 id = randRange(bucket.min, bucket.max)
200                 self.findNode(id, callback)
201         
202  
203     def retrieveValues(self, key):
204         if self.kw.has_key(key):
205             c = self.kw.cursor()
206             tup = c.set(key)
207             l = []
208             while(tup and tup[0] == key):
209                 h1 = tup[1]
210                 v = loads(self.store[h1])[1]
211                 l.append(v)
212                 tup = c.next()
213             l = map(lambda v: Binary(v), l)
214             return l
215         return []
216         
217     #####
218     ##### INCOMING MESSAGE HANDLERS
219     
220     def xmlrpc_ping(self, sender):
221         """
222             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
223             returns sender dict
224         """
225         ip = self.crequest.getClientIP()
226         sender['host'] = ip
227         n = Node().initWithDict(sender)
228         self.insertNode(n)
229         return self.node.senderDict()
230                 
231     def xmlrpc_find_node(self, target, sender):
232         nodes = self.table.findNodes(target.data)
233         nodes = map(lambda node: node.senderDict(), nodes)
234         ip = self.crequest.getClientIP()
235         sender['host'] = ip
236         n = Node().initWithDict(sender)
237         self.insertNode(n)
238         return nodes, self.node.senderDict()
239     
240     def xmlrpc_store_value(self, key, value, sender):
241         key = key.data
242         h1 = sha(key+value.data).digest()
243         t = `time.time()`
244         if not self.store.has_key(h1):
245             v = dumps((key, value.data, t))
246             self.store.put(h1, v)
247             self.itime.put(t, h1)
248             self.kw.put(key, h1)
249         else:
250             # update last insert time
251             tup = loads(self.store[h1])
252             self.store[h1] = dumps((tup[0], tup[1], t))
253             self.itime.put(t, h1)
254
255         ip = self.crequest.getClientIP()
256         sender['host'] = ip
257         n = Node().initWithDict(sender)
258         self.insertNode(n)
259         return self.node.senderDict()
260         
261     def xmlrpc_find_value(self, key, sender):
262         ip = self.crequest.getClientIP()
263         key = key.data
264         sender['host'] = ip
265         n = Node().initWithDict(sender)
266         self.insertNode(n)
267
268         l = self.retrieveValues(key)
269         if len(l):
270             return {'values' : l}, self.node.senderDict()
271         else:
272             nodes = self.table.findNodes(key)
273             nodes = map(lambda node: node.senderDict(), nodes)
274             return {'nodes' : nodes}, self.node.senderDict()
275
276
277
278
279
280 #------ testing
281
282 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
283     from whrandom import randrange
284     import thread
285     port = 2001
286     l = []
287         
288     if not quiet:
289         print "Building %s peer table." % peers
290         
291     for i in xrange(peers):
292         a = Khashmir(host, port + i)
293         l.append(a)
294     
295
296     thread.start_new_thread(l[0].app.run, ())
297     time.sleep(1)
298     for peer in l[1:]:
299         peer.app.run()
300         #time.sleep(.25)
301
302     print "adding contacts...."
303
304     for peer in l[1:]:
305         n = l[randrange(0, len(l))].node
306         peer.addContact(host, n.port)
307         n = l[randrange(0, len(l))].node
308         peer.addContact(host, n.port)
309         n = l[randrange(0, len(l))].node
310         peer.addContact(host, n.port)
311         if pause:
312             time.sleep(.30)
313             
314     time.sleep(1)
315     print "finding close nodes...."
316
317     for peer in l:
318         peer.findCloseNodes()
319         if pause:
320             time.sleep(.5)
321     if pause:
322             time.sleep(2)
323 #    for peer in l:
324 #       peer.refreshTable()
325     return l
326         
327 def test_find_nodes(l, quiet=0):
328     import threading, sys
329     from whrandom import randrange
330     flag = threading.Event()
331     
332     n = len(l)
333     
334     a = l[randrange(0,n)]
335     b = l[randrange(0,n)]
336     
337     def callback(nodes, flag=flag, id = b.node.id):
338         if (len(nodes) >0) and (nodes[0].id == id):
339             print "test_find_nodes      PASSED"
340         else:
341             print "test_find_nodes      FAILED"
342         flag.set()
343     a.findNode(b.node.id, callback)
344     flag.wait()
345     
346 def test_find_value(l, quiet=0):
347     from whrandom import randrange
348     from sha import sha
349     from hash import newID
350     import time, threading, sys
351     
352     fa = threading.Event()
353     fb = threading.Event()
354     fc = threading.Event()
355     
356     n = len(l)
357     a = l[randrange(0,n)]
358     b = l[randrange(0,n)]
359     c = l[randrange(0,n)]
360     d = l[randrange(0,n)]
361
362     key = newID()
363     value = newID()
364     if not quiet:
365         print "inserting value..."
366         sys.stdout.flush()
367     a.storeValueForKey(key, value)
368     time.sleep(3)
369     print "finding..."
370     sys.stdout.flush()
371     
372     class cb:
373         def __init__(self, flag, value=value):
374             self.flag = flag
375             self.val = value
376             self.found = 0
377         def callback(self, values):
378             try:
379                 if(len(values) == 0):
380                     if not self.found:
381                         print "find                FAILED"
382                     else:
383                         print "find                FOUND"
384                     sys.stdout.flush()
385
386                 else:
387                     if self.val in values:
388                         self.found = 1
389             finally:
390                 self.flag.set()
391
392     b.valueForKey(key, cb(fa).callback)
393     fa.wait()
394     c.valueForKey(key, cb(fb).callback)
395     fb.wait()
396     d.valueForKey(key, cb(fc).callback)    
397     fc.wait()
398     
399 def test_one(port):
400     import thread
401     k = Khashmir('localhost', port)
402     thread.start_new_thread(k.app.run, ())
403     return k
404     
405 if __name__ == "__main__":
406     l = test_build_net()
407     time.sleep(3)
408     print "finding nodes..."
409     for i in range(10):
410         test_find_nodes(l)
411     print "inserting and fetching values..."
412     for i in range(10):
413         test_find_value(l)