]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
509725bdfcd8ffe3563e3e2efd50359201d9f5a2
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 from xmlrpclib import Binary
25
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
28
29
30
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34     def __init__(self, host, port):
35         self.node = Node().init(newID(), host, port)
36         self.table = KTable(self.node)
37         self.app = Application("xmlrpc")
38         self.app.listenTCP(port, server.Site(self))
39         
40         ## these databases may be more suited to on-disk rather than in-memory
41         # h((key, value)) -> (key, value, time) mappings
42         self.store = db.DB()
43         self.store.open(None, None, db.DB_BTREE)
44         
45         # <insert time> -> h((key, value))
46         self.itime = db.DB()
47         self.itime.set_flags(db.DB_DUP)
48         self.itime.open(None, None, db.DB_BTREE)
49
50         # key -> h((key, value))
51         self.kw = db.DB()
52         self.kw.set_flags(db.DB_DUP)
53         self.kw.open(None, None, db.DB_BTREE)
54
55         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56         
57     def render(self, request):
58         """
59             Override the built in render so we can have access to the request object!
60             note, crequest is probably only valid on the initial call (not after deferred!)
61         """
62         self.crequest = request
63         return xmlrpc.XMLRPC.render(self, request)
64
65         
66     #######
67     #######  LOCAL INTERFACE    - use these methods!
68     def addContact(self, host, port):
69         """
70          ping this node and add the contact info to the table on pong!
71         """
72         n =Node().init(" "*20, host, port)  # note, we 
73         self.sendPing(n)
74
75
76     ## this call is async!
77     def findNode(self, id, callback, errback=None):
78         """ returns the contact info for node, or the k closest nodes, from the global table """
79         # get K nodes out of local table/cache, or the node we want
80         nodes = self.table.findNodes(id)
81         d = Deferred()
82         d.addCallbacks(callback, errback)
83         if len(nodes) == 1 and nodes[0].id == id :
84             d.callback(nodes)
85         else:
86             # create our search state
87             state = FindNode(self, id, d.callback)
88             reactor.callFromThread(state.goWithNodes, nodes)
89     
90     
91     ## also async
92     def valueForKey(self, key, callback):
93         """ returns the values found for key in global table """
94         nodes = self.table.findNodes(key)
95         # create our search state
96         state = GetValue(self, key, callback)
97         reactor.callFromThread(state.goWithNodes, nodes)
98         
99         # get locals
100         l = self.retrieveValues(key)
101         if len(l) > 0:
102             reactor.callFromThread(callback, l)
103
104
105     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
106     def storeValueForKey(self, key, value, callback=None):
107         """ stores the value for key in the global table, returns immediately, no status 
108             in this implementation, peers respond but don't indicate status to storing values
109             values are stored in peers on a first-come first-served basis
110             this will probably change so more than one value can be stored under a key
111         """
112         def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
113             if not callback:
114                 # default callback - this will get called for each successful store value
115                 def _storedValueHandler(sender):
116                     pass
117                 response=_storedValueHandler
118             for node in nodes:
119                 if node.id != self.node.id:
120                     df = node.storeValue(key, value, self.node.senderDict())
121                     df.addCallbacks(response, default)
122         # this call is asynch
123         self.findNode(key, _storeValueForKey)
124         
125         
126     def insertNode(self, n):
127         """
128         insert a node in our local table, pinging oldest contact in bucket, if necessary
129         
130         If all you have is a host/port, then use addContact, which calls this method after
131         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
132         a node into the table without it's peer-ID.  That means of course the node passed into this
133         method needs to be a properly formed Node object with a valid ID.
134         """
135         old = self.table.insertNode(n)
136         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
137             # the bucket is full, check to see if old node is still around and if so, replace it
138             
139             ## these are the callbacks used when we ping the oldest node in a bucket
140             def _staleNodeHandler(oldnode=old, newnode = n):
141                 """ called if the pinged node never responds """
142                 self.table.replaceStaleNode(old, newnode)
143         
144             def _notStaleNodeHandler(sender, old=old):
145                 """ called when we get a ping from the remote node """
146                 sender = Node().initWithDict(sender)
147                 if sender.id == old.id:
148                     self.table.insertNode(old)
149
150             df = old.ping(self.node.senderDict())
151             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
152
153
154     def sendPing(self, node):
155         """
156             ping a node
157         """
158         df = node.ping(self.node.senderDict())
159         ## these are the callbacks we use when we issue a PING
160         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
161             if id != 20 * ' ' and id != sender['id'].data:
162                 # whoah, got response from different peer than we were expecting
163                 pass
164             else:
165                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
166                 sender['host'] = host
167                 sender['port'] = port
168                 n = Node().initWithDict(sender)
169                 table.insertNode(n)
170             return
171         def _defaultPong(err):
172             # this should probably increment a failed message counter and dump the node if it gets over a threshold
173             return      
174
175         df.addCallbacks(_pongHandler,_defaultPong)
176
177
178     def findCloseNodes(self):
179         """
180             This does a findNode on the ID one away from our own.  
181             This will allow us to populate our table with nodes on our network closest to our own.
182             This is called as soon as we start up with an empty table
183         """
184         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
185         def callback(nodes):
186             pass
187         self.findNode(id, callback)
188
189     def refreshTable(self):
190         """
191             
192         """
193         def callback(nodes):
194             pass
195
196         for bucket in self.table.buckets:
197             if time.time() - bucket.lastAccessed >= 60 * 60:
198                 id = randRange(bucket.min, bucket.max)
199                 self.findNode(id, callback)
200         
201  
202     def retrieveValues(self, key):
203         if self.kw.has_key(key):
204             c = self.kw.cursor()
205             tup = c.set(key)
206             l = []
207             while(tup and tup[0] == key):
208                 h1 = tup[1]
209                 v = loads(self.store[h1])[1]
210                 l.append(v)
211                 tup = c.next()
212             l = map(lambda v: Binary(v), l)
213             return l
214         return []
215         
216     #####
217     ##### INCOMING MESSAGE HANDLERS
218     
219     def xmlrpc_ping(self, sender):
220         """
221             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
222             returns sender dict
223         """
224         ip = self.crequest.getClientIP()
225         sender['host'] = ip
226         n = Node().initWithDict(sender)
227         self.insertNode(n)
228         return self.node.senderDict()
229                 
230     def xmlrpc_find_node(self, target, sender):
231         nodes = self.table.findNodes(target.data)
232         nodes = map(lambda node: node.senderDict(), nodes)
233         ip = self.crequest.getClientIP()
234         sender['host'] = ip
235         n = Node().initWithDict(sender)
236         self.insertNode(n)
237         return nodes, self.node.senderDict()
238     
239     def xmlrpc_store_value(self, key, value, sender):
240         key = key.data
241         h1 = sha(key+value.data).digest()
242         t = `time.time()`
243         if not self.store.has_key(h1):
244             v = dumps((key, value.data, t))
245             self.store.put(h1, v)
246             self.itime.put(t, h1)
247             self.kw.put(key, h1)
248         else:
249             # update last insert time
250             tup = loads(self.store[h1])
251             self.store[h1] = dumps((tup[0], tup[1], t))
252             self.itime.put(t, h1)
253
254         ip = self.crequest.getClientIP()
255         sender['host'] = ip
256         n = Node().initWithDict(sender)
257         self.insertNode(n)
258         return self.node.senderDict()
259         
260     def xmlrpc_find_value(self, key, sender):
261         ip = self.crequest.getClientIP()
262         key = key.data
263         sender['host'] = ip
264         n = Node().initWithDict(sender)
265         self.insertNode(n)
266
267         l = self.retrieveValues(key)
268         if len(l):
269             return {'values' : l}, self.node.senderDict()
270         else:
271             nodes = self.table.findNodes(key)
272             nodes = map(lambda node: node.senderDict(), nodes)
273             return {'nodes' : nodes}, self.node.senderDict()
274
275
276
277
278
279 #------ testing
280
281 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
282     from whrandom import randrange
283     import thread
284     port = 2001
285     l = []
286         
287     if not quiet:
288         print "Building %s peer table." % peers
289         
290     for i in xrange(peers):
291         a = Khashmir(host, port + i)
292         l.append(a)
293     
294
295     thread.start_new_thread(l[0].app.run, ())
296     time.sleep(1)
297     for peer in l[1:]:
298         peer.app.run()
299         #time.sleep(.25)
300
301     print "adding contacts...."
302
303     for peer in l[1:]:
304         n = l[randrange(0, len(l))].node
305         peer.addContact(host, n.port)
306         n = l[randrange(0, len(l))].node
307         peer.addContact(host, n.port)
308         n = l[randrange(0, len(l))].node
309         peer.addContact(host, n.port)
310         if pause:
311             time.sleep(.30)
312             
313     time.sleep(1)
314     print "finding close nodes...."
315
316     for peer in l:
317         peer.findCloseNodes()
318         if pause:
319             time.sleep(.5)
320     if pause:
321             time.sleep(2)
322 #    for peer in l:
323 #       peer.refreshTable()
324     return l
325         
326 def test_find_nodes(l, quiet=0):
327     import threading, sys
328     from whrandom import randrange
329     flag = threading.Event()
330     
331     n = len(l)
332     
333     a = l[randrange(0,n)]
334     b = l[randrange(0,n)]
335     
336     def callback(nodes, flag=flag, id = b.node.id):
337         if (len(nodes) >0) and (nodes[0].id == id):
338             print "test_find_nodes      PASSED"
339         else:
340             print "test_find_nodes      FAILED"
341         flag.set()
342     a.findNode(b.node.id, callback)
343     flag.wait()
344     
345 def test_find_value(l, quiet=0):
346     from whrandom import randrange
347     from sha import sha
348     from hash import newID
349     import time, threading, sys
350     
351     fa = threading.Event()
352     fb = threading.Event()
353     fc = threading.Event()
354     
355     n = len(l)
356     a = l[randrange(0,n)]
357     b = l[randrange(0,n)]
358     c = l[randrange(0,n)]
359     d = l[randrange(0,n)]
360
361     key = newID()
362     value = newID()
363     if not quiet:
364         print "inserting value..."
365         sys.stdout.flush()
366     a.storeValueForKey(key, value)
367     time.sleep(3)
368     print "finding..."
369     sys.stdout.flush()
370     
371     class cb:
372         def __init__(self, flag, value=value):
373             self.flag = flag
374             self.val = value
375             self.found = 0
376         def callback(self, values):
377             try:
378                 if(len(values) == 0):
379                     if not self.found:
380                         print "find                FAILED"
381                     else:
382                         print "find                FOUND"
383                     sys.stdout.flush()
384
385                 else:
386                     if self.val in values:
387                         self.found = 1
388             finally:
389                 self.flag.set()
390
391     b.valueForKey(key, cb(fa).callback)
392     fa.wait()
393     c.valueForKey(key, cb(fb).callback)
394     fb.wait()
395     d.valueForKey(key, cb(fc).callback)    
396     fc.wait()
397     
398 def test_one(port):
399     import thread
400     k = Khashmir('localhost', port)
401     thread.start_new_thread(k.app.run, ())
402     return k
403     
404 if __name__ == "__main__":
405     l = test_build_net()
406     time.sleep(3)
407     print "finding nodes..."
408     for i in range(10):
409         test_find_nodes(l)
410     print "inserting and fetching values..."
411     for i in range(10):
412         test_find_value(l)