]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
updated for base64 encoding of hashes and values
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 from base64 import decodestring as decode
25
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
28
29
30
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34     def __init__(self, host, port):
35         self.node = Node().init(newID(), host, port)
36         self.table = KTable(self.node)
37         self.app = Application("xmlrpc")
38         self.app.listenTCP(port, server.Site(self))
39         
40         ## these databases may be more suited to on-disk rather than in-memory
41         # h((key, value)) -> (key, value, time) mappings
42         self.store = db.DB()
43         self.store.open(None, None, db.DB_BTREE)
44         
45         # <insert time> -> h((key, value))
46         self.itime = db.DB()
47         self.itime.set_flags(db.DB_DUP)
48         self.itime.open(None, None, db.DB_BTREE)
49
50         # key -> h((key, value))
51         self.kw = db.DB()
52         self.kw.set_flags(db.DB_DUP)
53         self.kw.open(None, None, db.DB_BTREE)
54
55         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56         
57     def render(self, request):
58         """
59             Override the built in render so we can have access to the request object!
60             note, crequest is probably only valid on the initial call (not after deferred!)
61         """
62         self.crequest = request
63         return xmlrpc.XMLRPC.render(self, request)
64
65         
66     #######
67     #######  LOCAL INTERFACE    - use these methods!
68     def addContact(self, host, port):
69         """
70          ping this node and add the contact info to the table on pong!
71         """
72         n =Node().init(" "*20, host, port)  # note, we 
73         self.sendPing(n)
74
75
76     ## this call is async!
77     def findNode(self, id, callback, errback=None):
78         """ returns the contact info for node, or the k closest nodes, from the global table """
79         # get K nodes out of local table/cache, or the node we want
80         nodes = self.table.findNodes(id)
81         d = Deferred()
82         d.addCallbacks(callback, errback)
83         if len(nodes) == 1 and nodes[0].id == id :
84             d.callback(nodes)
85         else:
86             # create our search state
87             state = FindNode(self, id, d.callback)
88             reactor.callFromThread(state.goWithNodes, nodes)
89     
90     
91     ## also async
92     def valueForKey(self, key, callback):
93         """ returns the values found for key in global table """
94         nodes = self.table.findNodes(key)
95         # decode values, they will be base64 encoded
96         def cbwrap(values, cb=callback):
97             values = map(lambda x: decode(x), values)
98             callback(values)
99         # create our search state
100         state = GetValue(self, key, cbwrap)
101         reactor.callFromThread(state.goWithNodes, nodes)
102
103
104
105     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
106     def storeValueForKey(self, key, value, callback=None):
107         """ stores the value for key in the global table, returns immediately, no status 
108             in this implementation, peers respond but don't indicate status to storing values
109             values are stored in peers on a first-come first-served basis
110             this will probably change so more than one value can be stored under a key
111         """
112         def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
113             if not callback:
114                 # default callback - this will get called for each successful store value
115                 def _storedValueHandler(sender):
116                     pass
117                 response=_storedValueHandler
118             for node in nodes:
119                 if node.id != self.node.id:
120                     df = node.storeValue(key, value, self.node.senderDict())
121                     df.addCallbacks(response, default)
122         # this call is asynch
123         self.findNode(key, _storeValueForKey)
124         
125         
126     def insertNode(self, n):
127         """
128         insert a node in our local table, pinging oldest contact in bucket, if necessary
129         
130         If all you have is a host/port, then use addContact, which calls this method after
131         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
132         a node into the table without it's peer-ID.  That means of course the node passed into this
133         method needs to be a properly formed Node object with a valid ID.
134         """
135         old = self.table.insertNode(n)
136         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
137             # the bucket is full, check to see if old node is still around and if so, replace it
138             
139             ## these are the callbacks used when we ping the oldest node in a bucket
140             def _staleNodeHandler(oldnode=old, newnode = n):
141                 """ called if the pinged node never responds """
142                 self.table.replaceStaleNode(old, newnode)
143         
144             def _notStaleNodeHandler(sender, old=old):
145                 """ called when we get a ping from the remote node """
146                 if sender['id'] == old.id:
147                     self.table.insertNode(old)
148
149             df = old.ping(self.node.senderDict())
150             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
151
152
153     def sendPing(self, node):
154         """
155             ping a node
156         """
157         df = node.ping(self.node.senderDict())
158         ## these are the callbacks we use when we issue a PING
159         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
160             if id != 20 * ' ' and id != sender['id']:
161                 # whoah, got response from different peer than we were expecting
162                 pass
163             else:
164                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
165                 sender['host'] = host
166                 sender['port'] = port
167                 n = Node().initWithDict(sender)
168                 table.insertNode(n)
169             return
170         def _defaultPong(err):
171             # this should probably increment a failed message counter and dump the node if it gets over a threshold
172             return      
173
174         df.addCallbacks(_pongHandler,_defaultPong)
175
176
177     def findCloseNodes(self):
178         """
179             This does a findNode on the ID one away from our own.  
180             This will allow us to populate our table with nodes on our network closest to our own.
181             This is called as soon as we start up with an empty table
182         """
183         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
184         def callback(nodes):
185             pass
186         self.findNode(id, callback)
187
188     def refreshTable(self):
189         """
190             
191         """
192         def callback(nodes):
193             pass
194
195         for bucket in self.table.buckets:
196             if time.time() - bucket.lastAccessed >= 60 * 60:
197                 id = randRange(bucket.min, bucket.max)
198                 self.findNode(id, callback)
199         
200  
201     #####
202     ##### INCOMING MESSAGE HANDLERS
203     
204     def xmlrpc_ping(self, sender):
205         """
206             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
207             returns sender dict
208         """
209         ip = self.crequest.getClientIP()
210         sender['host'] = ip
211         n = Node().initWithDict(sender)
212         self.insertNode(n)
213         return self.node.senderDict()
214                 
215     def xmlrpc_find_node(self, target, sender):
216         nodes = self.table.findNodes(target)
217         nodes = map(lambda node: node.senderDict(), nodes)
218         ip = self.crequest.getClientIP()
219         sender['host'] = ip
220         n = Node().initWithDict(sender)
221         self.insertNode(n)
222         return nodes, self.node.senderDict()
223     
224     def xmlrpc_store_value(self, key, value, sender):
225         key = decode(key)
226         h1 = sha(key+value).digest()
227         t = `time.time()`
228         if not self.store.has_key(h1):
229             v = dumps((key, value, t))
230             self.store.put(h1, v)
231             self.itime.put(t, h1)
232             self.kw.put(key, h1)
233         else:
234             # update last insert time
235             tup = loads(self.store[h1])
236             self.store[h1] = dumps((tup[0], tup[1], t))
237             self.itime.put(t, h1)
238
239         ip = self.crequest.getClientIP()
240         sender['host'] = ip
241         n = Node().initWithDict(sender)
242         self.insertNode(n)
243         return self.node.senderDict()
244         
245     def xmlrpc_find_value(self, key, sender):
246         ip = self.crequest.getClientIP()
247         key = decode(key)
248         sender['host'] = ip
249         n = Node().initWithDict(sender)
250         self.insertNode(n)
251
252         if self.kw.has_key(key):
253             c = self.kw.cursor()
254             tup = c.set(key)
255             l = []
256             while(tup):
257                 h1 = tup[1]
258                 v = loads(self.store[h1])[1]
259                 l.append(v)
260                 tup = c.next()
261             return {'values' : l}, self.node.senderDict()
262         else:
263             nodes = self.table.findNodes(key)
264             nodes = map(lambda node: node.senderDict(), nodes)
265             return {'nodes' : nodes}, self.node.senderDict()
266
267
268
269
270
271 #------ testing
272
273 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
274     from whrandom import randrange
275     import thread
276     port = 2001
277     l = []
278         
279     if not quiet:
280         print "Building %s peer table." % peers
281         
282     for i in xrange(peers):
283         a = Khashmir(host, port + i)
284         l.append(a)
285     
286
287     thread.start_new_thread(l[0].app.run, ())
288     time.sleep(1)
289     for peer in l[1:]:
290         peer.app.run()
291         #time.sleep(.25)
292
293     print "adding contacts...."
294
295     for peer in l[1:]:
296         n = l[randrange(0, len(l))].node
297         peer.addContact(host, n.port)
298         n = l[randrange(0, len(l))].node
299         peer.addContact(host, n.port)
300         n = l[randrange(0, len(l))].node
301         peer.addContact(host, n.port)
302         if pause:
303             time.sleep(.30)
304             
305     time.sleep(1)
306     print "finding close nodes...."
307
308     for peer in l:
309         peer.findCloseNodes()
310         if pause:
311             time.sleep(.5)
312     if pause:
313             time.sleep(2)
314 #    for peer in l:
315 #       peer.refreshTable()
316     return l
317         
318 def test_find_nodes(l, quiet=0):
319     import threading, sys
320     from whrandom import randrange
321     flag = threading.Event()
322     
323     n = len(l)
324     
325     a = l[randrange(0,n)]
326     b = l[randrange(0,n)]
327     
328     def callback(nodes, flag=flag, id = b.node.id):
329         if (len(nodes) >0) and (nodes[0].id == id):
330             print "test_find_nodes      PASSED"
331         else:
332             print "test_find_nodes      FAILED"
333         flag.set()
334     a.findNode(b.node.id, callback)
335     flag.wait()
336     
337 def test_find_value(l, quiet=0):
338     from whrandom import randrange
339     from sha import sha
340     from hash import newID
341     import time, threading, sys
342     
343     fa = threading.Event()
344     fb = threading.Event()
345     fc = threading.Event()
346     
347     n = len(l)
348     a = l[randrange(0,n)]
349     b = l[randrange(0,n)]
350     c = l[randrange(0,n)]
351     d = l[randrange(0,n)]
352
353     key = newID()
354     value = newID()
355     if not quiet:
356         print "inserting value..."
357         sys.stdout.flush()
358     a.storeValueForKey(key, value)
359     time.sleep(3)
360     print "finding..."
361     sys.stdout.flush()
362     
363     class cb:
364         def __init__(self, flag, value=value):
365             self.flag = flag
366             self.val = value
367             self.found = 0
368         def callback(self, values):
369             try:
370                 if(len(values) == 0):
371                     if not self.found:
372                         print "find                FAILED"
373                     else:
374                         print "find                FOUND"
375                     sys.stdout.flush()
376
377                 else:
378                     if self.val in values:
379                         self.found = 1
380             finally:
381                 self.flag.set()
382
383     b.valueForKey(key, cb(fa).callback)
384     fa.wait()
385     c.valueForKey(key, cb(fb).callback)
386     fb.wait()
387     d.valueForKey(key, cb(fc).callback)    
388     fc.wait()
389     
390 def test_one(port):
391     import thread
392     k = Khashmir('localhost', port)
393     thread.start_new_thread(k.app.run, ())
394     return k
395     
396 if __name__ == "__main__":
397     l = test_build_net()
398     time.sleep(3)
399     print "finding nodes..."
400     for i in range(10):
401         test_find_nodes(l)
402     print "inserting and fetching values..."
403     for i in range(10):
404         test_find_value(l)