]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
8321cf7ce08d3d83a22c92c2e343871e2176ec6b
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 from xmlrpclib import Binary
25
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
28
29
30
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34     def __init__(self, host, port):
35         self.node = Node().init(newID(), host, port)
36         self.table = KTable(self.node)
37         self.app = Application("xmlrpc")
38         self.app.listenTCP(port, server.Site(self))
39         
40         ## these databases may be more suited to on-disk rather than in-memory
41         # h((key, value)) -> (key, value, time) mappings
42         self.store = db.DB()
43         self.store.open(None, None, db.DB_BTREE)
44         
45         # <insert time> -> h((key, value))
46         self.itime = db.DB()
47         self.itime.set_flags(db.DB_DUP)
48         self.itime.open(None, None, db.DB_BTREE)
49
50         # key -> h((key, value))
51         self.kw = db.DB()
52         self.kw.set_flags(db.DB_DUP)
53         self.kw.open(None, None, db.DB_BTREE)
54
55         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56         
57     def render(self, request):
58         """
59             Override the built in render so we can have access to the request object!
60             note, crequest is probably only valid on the initial call (not after deferred!)
61         """
62         self.crequest = request
63         return xmlrpc.XMLRPC.render(self, request)
64
65         
66     #######
67     #######  LOCAL INTERFACE    - use these methods!
68     def addContact(self, host, port):
69         """
70          ping this node and add the contact info to the table on pong!
71         """
72         n =Node().init(" "*20, host, port)  # note, we 
73         self.sendPing(n)
74
75
76     ## this call is async!
77     def findNode(self, id, callback, errback=None):
78         """ returns the contact info for node, or the k closest nodes, from the global table """
79         # get K nodes out of local table/cache, or the node we want
80         nodes = self.table.findNodes(id)
81         d = Deferred()
82         d.addCallbacks(callback, errback)
83         if len(nodes) == 1 and nodes[0].id == id :
84             d.callback(nodes)
85         else:
86             # create our search state
87             state = FindNode(self, id, d.callback)
88             reactor.callFromThread(state.goWithNodes, nodes)
89     
90     
91     ## also async
92     def valueForKey(self, key, callback):
93         """ returns the values found for key in global table """
94         nodes = self.table.findNodes(key)
95         # decode values, they will be base64 encoded
96         # create our search state
97         state = GetValue(self, key, callback)
98         reactor.callFromThread(state.goWithNodes, nodes)
99
100
101
102     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
103     def storeValueForKey(self, key, value, callback=None):
104         """ stores the value for key in the global table, returns immediately, no status 
105             in this implementation, peers respond but don't indicate status to storing values
106             values are stored in peers on a first-come first-served basis
107             this will probably change so more than one value can be stored under a key
108         """
109         def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
110             if not callback:
111                 # default callback - this will get called for each successful store value
112                 def _storedValueHandler(sender):
113                     pass
114                 response=_storedValueHandler
115             for node in nodes:
116                 if node.id != self.node.id:
117                     df = node.storeValue(key, value, self.node.senderDict())
118                     df.addCallbacks(response, default)
119         # this call is asynch
120         self.findNode(key, _storeValueForKey)
121         
122         
123     def insertNode(self, n):
124         """
125         insert a node in our local table, pinging oldest contact in bucket, if necessary
126         
127         If all you have is a host/port, then use addContact, which calls this method after
128         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
129         a node into the table without it's peer-ID.  That means of course the node passed into this
130         method needs to be a properly formed Node object with a valid ID.
131         """
132         old = self.table.insertNode(n)
133         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
134             # the bucket is full, check to see if old node is still around and if so, replace it
135             
136             ## these are the callbacks used when we ping the oldest node in a bucket
137             def _staleNodeHandler(oldnode=old, newnode = n):
138                 """ called if the pinged node never responds """
139                 self.table.replaceStaleNode(old, newnode)
140         
141             def _notStaleNodeHandler(sender, old=old):
142                 """ called when we get a ping from the remote node """
143                 sender = Node().initWithSenderDict(sender)
144                 if sender.id == old.id:
145                     self.table.insertNode(old)
146
147             df = old.ping(self.node.senderDict())
148             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
149
150
151     def sendPing(self, node):
152         """
153             ping a node
154         """
155         df = node.ping(self.node.senderDict())
156         ## these are the callbacks we use when we issue a PING
157         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
158             sender = Node().initWithSenderDict(sender)
159             if id != 20 * ' ' and id != sender.id:
160                 # whoah, got response from different peer than we were expecting
161                 pass
162             else:
163                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
164                 sender['host'] = host
165                 sender['port'] = port
166                 n = Node().initWithDict(sender)
167                 table.insertNode(n)
168             return
169         def _defaultPong(err):
170             # this should probably increment a failed message counter and dump the node if it gets over a threshold
171             return      
172
173         df.addCallbacks(_pongHandler,_defaultPong)
174
175
176     def findCloseNodes(self):
177         """
178             This does a findNode on the ID one away from our own.  
179             This will allow us to populate our table with nodes on our network closest to our own.
180             This is called as soon as we start up with an empty table
181         """
182         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
183         def callback(nodes):
184             pass
185         self.findNode(id, callback)
186
187     def refreshTable(self):
188         """
189             
190         """
191         def callback(nodes):
192             pass
193
194         for bucket in self.table.buckets:
195             if time.time() - bucket.lastAccessed >= 60 * 60:
196                 id = randRange(bucket.min, bucket.max)
197                 self.findNode(id, callback)
198         
199  
200     #####
201     ##### INCOMING MESSAGE HANDLERS
202     
203     def xmlrpc_ping(self, sender):
204         """
205             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
206             returns sender dict
207         """
208         ip = self.crequest.getClientIP()
209         sender['host'] = ip
210         n = Node().initWithDict(sender)
211         self.insertNode(n)
212         return self.node.senderDict()
213                 
214     def xmlrpc_find_node(self, target, sender):
215         nodes = self.table.findNodes(target)
216         nodes = map(lambda node: node.senderDict(), nodes)
217         ip = self.crequest.getClientIP()
218         sender['host'] = ip
219         n = Node().initWithDict(sender)
220         self.insertNode(n)
221         return nodes, self.node.senderDict()
222     
223     def xmlrpc_store_value(self, key, value, sender):
224         key = key.data
225         h1 = sha(key+value.data).digest()
226         t = `time.time()`
227         if not self.store.has_key(h1):
228             v = dumps((key, value.data, t))
229             self.store.put(h1, v)
230             self.itime.put(t, h1)
231             self.kw.put(key, h1)
232         else:
233             # update last insert time
234             tup = loads(self.store[h1])
235             self.store[h1] = dumps((tup[0], tup[1], t))
236             self.itime.put(t, h1)
237
238         ip = self.crequest.getClientIP()
239         sender['host'] = ip
240         n = Node().initWithDict(sender)
241         self.insertNode(n)
242         return self.node.senderDict()
243         
244     def xmlrpc_find_value(self, key, sender):
245         ip = self.crequest.getClientIP()
246         key = key.data
247         sender['host'] = ip
248         n = Node().initWithDict(sender)
249         self.insertNode(n)
250
251         if self.kw.has_key(key):
252             c = self.kw.cursor()
253             tup = c.set(key)
254             l = []
255             while(tup):
256                 h1 = tup[1]
257                 v = loads(self.store[h1])[1]
258                 l.append(v)
259                 tup = c.next()
260             l = map(lambda v: Binary(v), l)
261             return {'values' : l}, self.node.senderDict()
262         else:
263             nodes = self.table.findNodes(key)
264             nodes = map(lambda node: node.senderDict(), nodes)
265             return {'nodes' : nodes}, self.node.senderDict()
266
267
268
269
270
271 #------ testing
272
273 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
274     from whrandom import randrange
275     import thread
276     port = 2001
277     l = []
278         
279     if not quiet:
280         print "Building %s peer table." % peers
281         
282     for i in xrange(peers):
283         a = Khashmir(host, port + i)
284         l.append(a)
285     
286
287     thread.start_new_thread(l[0].app.run, ())
288     time.sleep(1)
289     for peer in l[1:]:
290         peer.app.run()
291         #time.sleep(.25)
292
293     print "adding contacts...."
294
295     for peer in l[1:]:
296         n = l[randrange(0, len(l))].node
297         peer.addContact(host, n.port)
298         n = l[randrange(0, len(l))].node
299         peer.addContact(host, n.port)
300         n = l[randrange(0, len(l))].node
301         peer.addContact(host, n.port)
302         if pause:
303             time.sleep(.30)
304             
305     time.sleep(1)
306     print "finding close nodes...."
307
308     for peer in l:
309         peer.findCloseNodes()
310         if pause:
311             time.sleep(.5)
312     if pause:
313             time.sleep(2)
314 #    for peer in l:
315 #       peer.refreshTable()
316     return l
317         
318 def test_find_nodes(l, quiet=0):
319     import threading, sys
320     from whrandom import randrange
321     flag = threading.Event()
322     
323     n = len(l)
324     
325     a = l[randrange(0,n)]
326     b = l[randrange(0,n)]
327     
328     def callback(nodes, flag=flag, id = b.node.id):
329         if (len(nodes) >0) and (nodes[0].id == id):
330             print "test_find_nodes      PASSED"
331         else:
332             print "test_find_nodes      FAILED"
333         flag.set()
334     a.findNode(b.node.id, callback)
335     flag.wait()
336     
337 def test_find_value(l, quiet=0):
338     from whrandom import randrange
339     from sha import sha
340     from hash import newID
341     import time, threading, sys
342     
343     fa = threading.Event()
344     fb = threading.Event()
345     fc = threading.Event()
346     
347     n = len(l)
348     a = l[randrange(0,n)]
349     b = l[randrange(0,n)]
350     c = l[randrange(0,n)]
351     d = l[randrange(0,n)]
352
353     key = newID()
354     value = newID()
355     if not quiet:
356         print "inserting value..."
357         sys.stdout.flush()
358     a.storeValueForKey(key, value)
359     time.sleep(3)
360     print "finding..."
361     sys.stdout.flush()
362     
363     class cb:
364         def __init__(self, flag, value=value):
365             self.flag = flag
366             self.val = value
367             self.found = 0
368         def callback(self, values):
369             try:
370                 if(len(values) == 0):
371                     if not self.found:
372                         print "find                FAILED"
373                     else:
374                         print "find                FOUND"
375                     sys.stdout.flush()
376
377                 else:
378                     if self.val in values:
379                         self.found = 1
380             finally:
381                 self.flag.set()
382
383     b.valueForKey(key, cb(fa).callback)
384     fa.wait()
385     c.valueForKey(key, cb(fb).callback)
386     fb.wait()
387     d.valueForKey(key, cb(fc).callback)    
388     fc.wait()
389     
390 def test_one(port):
391     import thread
392     k = Khashmir('localhost', port)
393     thread.start_new_thread(k.app.run, ())
394     return k
395     
396 if __name__ == "__main__":
397     l = test_build_net()
398     time.sleep(3)
399     print "finding nodes..."
400     for i in range(10):
401         test_find_nodes(l)
402     print "inserting and fetching values..."
403     for i in range(10):
404         test_find_value(l)