]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
don't return duplicates if we have a key in our store
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 from xmlrpclib import Binary
25
26 # don't ping unless it's been at least this many seconds since we've heard from a peer
27 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
28
29
30
31 # this is the main class!
32 class Khashmir(xmlrpc.XMLRPC):
33     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
34     def __init__(self, host, port):
35         self.node = Node().init(newID(), host, port)
36         self.table = KTable(self.node)
37         self.app = Application("xmlrpc")
38         self.app.listenTCP(port, server.Site(self))
39         
40         ## these databases may be more suited to on-disk rather than in-memory
41         # h((key, value)) -> (key, value, time) mappings
42         self.store = db.DB()
43         self.store.open(None, None, db.DB_BTREE)
44         
45         # <insert time> -> h((key, value))
46         self.itime = db.DB()
47         self.itime.set_flags(db.DB_DUP)
48         self.itime.open(None, None, db.DB_BTREE)
49
50         # key -> h((key, value))
51         self.kw = db.DB()
52         self.kw.set_flags(db.DB_DUP)
53         self.kw.open(None, None, db.DB_BTREE)
54
55         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56         
57     def render(self, request):
58         """
59             Override the built in render so we can have access to the request object!
60             note, crequest is probably only valid on the initial call (not after deferred!)
61         """
62         self.crequest = request
63         return xmlrpc.XMLRPC.render(self, request)
64
65         
66     #######
67     #######  LOCAL INTERFACE    - use these methods!
68     def addContact(self, host, port):
69         """
70          ping this node and add the contact info to the table on pong!
71         """
72         n =Node().init(" "*20, host, port)  # note, we 
73         self.sendPing(n)
74
75
76     ## this call is async!
77     def findNode(self, id, callback, errback=None):
78         """ returns the contact info for node, or the k closest nodes, from the global table """
79         # get K nodes out of local table/cache, or the node we want
80         nodes = self.table.findNodes(id)
81         d = Deferred()
82         d.addCallbacks(callback, errback)
83         if len(nodes) == 1 and nodes[0].id == id :
84             d.callback(nodes)
85         else:
86             # create our search state
87             state = FindNode(self, id, d.callback)
88             reactor.callFromThread(state.goWithNodes, nodes)
89     
90     
91     ## also async
92     def valueForKey(self, key, callback):
93         """ returns the values found for key in global table """
94         nodes = self.table.findNodes(key)
95
96         # get locals
97         l = self.retrieveValues(key)
98         if len(l) > 0:
99             reactor.callFromThread(callback, l)
100
101         # create our search state
102         state = GetValue(self, key, callback)
103         reactor.callFromThread(state.goWithNodes, nodes, {'found' : l})
104         
105
106
107     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
108     def storeValueForKey(self, key, value, callback=None):
109         """ stores the value for key in the global table, returns immediately, no status 
110             in this implementation, peers respond but don't indicate status to storing values
111             values are stored in peers on a first-come first-served basis
112             this will probably change so more than one value can be stored under a key
113         """
114         def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
115             if not callback:
116                 # default callback - this will get called for each successful store value
117                 def _storedValueHandler(sender):
118                     pass
119                 response=_storedValueHandler
120             for node in nodes:
121                 if node.id != self.node.id:
122                     df = node.storeValue(key, value, self.node.senderDict())
123                     df.addCallbacks(response, default)
124         # this call is asynch
125         self.findNode(key, _storeValueForKey)
126         
127         
128     def insertNode(self, n):
129         """
130         insert a node in our local table, pinging oldest contact in bucket, if necessary
131         
132         If all you have is a host/port, then use addContact, which calls this method after
133         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
134         a node into the table without it's peer-ID.  That means of course the node passed into this
135         method needs to be a properly formed Node object with a valid ID.
136         """
137         old = self.table.insertNode(n)
138         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
139             # the bucket is full, check to see if old node is still around and if so, replace it
140             
141             ## these are the callbacks used when we ping the oldest node in a bucket
142             def _staleNodeHandler(oldnode=old, newnode = n):
143                 """ called if the pinged node never responds """
144                 self.table.replaceStaleNode(old, newnode)
145         
146             def _notStaleNodeHandler(sender, old=old):
147                 """ called when we get a ping from the remote node """
148                 sender = Node().initWithDict(sender)
149                 if sender.id == old.id:
150                     self.table.insertNode(old)
151
152             df = old.ping(self.node.senderDict())
153             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
154
155
156     def sendPing(self, node):
157         """
158             ping a node
159         """
160         df = node.ping(self.node.senderDict())
161         ## these are the callbacks we use when we issue a PING
162         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
163             if id != 20 * ' ' and id != sender['id'].data:
164                 # whoah, got response from different peer than we were expecting
165                 pass
166             else:
167                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
168                 sender['host'] = host
169                 sender['port'] = port
170                 n = Node().initWithDict(sender)
171                 table.insertNode(n)
172             return
173         def _defaultPong(err):
174             # this should probably increment a failed message counter and dump the node if it gets over a threshold
175             return      
176
177         df.addCallbacks(_pongHandler,_defaultPong)
178
179
180     def findCloseNodes(self):
181         """
182             This does a findNode on the ID one away from our own.  
183             This will allow us to populate our table with nodes on our network closest to our own.
184             This is called as soon as we start up with an empty table
185         """
186         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
187         def callback(nodes):
188             pass
189         self.findNode(id, callback)
190
191     def refreshTable(self):
192         """
193             
194         """
195         def callback(nodes):
196             pass
197
198         for bucket in self.table.buckets:
199             if time.time() - bucket.lastAccessed >= 60 * 60:
200                 id = randRange(bucket.min, bucket.max)
201                 self.findNode(id, callback)
202         
203  
204     def retrieveValues(self, key):
205         if self.kw.has_key(key):
206             c = self.kw.cursor()
207             tup = c.set(key)
208             l = []
209             while(tup and tup[0] == key):
210                 h1 = tup[1]
211                 v = loads(self.store[h1])[1]
212                 l.append(v)
213                 tup = c.next()
214             return l
215         return []
216         
217     #####
218     ##### INCOMING MESSAGE HANDLERS
219     
220     def xmlrpc_ping(self, sender):
221         """
222             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
223             returns sender dict
224         """
225         ip = self.crequest.getClientIP()
226         sender['host'] = ip
227         n = Node().initWithDict(sender)
228         self.insertNode(n)
229         return self.node.senderDict()
230                 
231     def xmlrpc_find_node(self, target, sender):
232         nodes = self.table.findNodes(target.data)
233         nodes = map(lambda node: node.senderDict(), nodes)
234         ip = self.crequest.getClientIP()
235         sender['host'] = ip
236         n = Node().initWithDict(sender)
237         self.insertNode(n)
238         return nodes, self.node.senderDict()
239     
240     def xmlrpc_store_value(self, key, value, sender):
241         key = key.data
242         h1 = sha(key+value.data).digest()
243         t = `time.time()`
244         if not self.store.has_key(h1):
245             v = dumps((key, value.data, t))
246             self.store.put(h1, v)
247             self.itime.put(t, h1)
248             self.kw.put(key, h1)
249         else:
250             # update last insert time
251             tup = loads(self.store[h1])
252             self.store[h1] = dumps((tup[0], tup[1], t))
253             self.itime.put(t, h1)
254
255         ip = self.crequest.getClientIP()
256         sender['host'] = ip
257         n = Node().initWithDict(sender)
258         self.insertNode(n)
259         return self.node.senderDict()
260         
261     def xmlrpc_find_value(self, key, sender):
262         ip = self.crequest.getClientIP()
263         key = key.data
264         sender['host'] = ip
265         n = Node().initWithDict(sender)
266         self.insertNode(n)
267
268         l = self.retrieveValues(key)
269         if len(l) > 0:
270             l = map(lambda v: Binary(v), l)
271             return {'values' : l}, self.node.senderDict()
272         else:
273             nodes = self.table.findNodes(key)
274             nodes = map(lambda node: node.senderDict(), nodes)
275             return {'nodes' : nodes}, self.node.senderDict()
276
277
278
279
280
281 #------ testing
282
283 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
284     from whrandom import randrange
285     import thread
286     port = 2001
287     l = []
288         
289     if not quiet:
290         print "Building %s peer table." % peers
291         
292     for i in xrange(peers):
293         a = Khashmir(host, port + i)
294         l.append(a)
295     
296
297     thread.start_new_thread(l[0].app.run, ())
298     time.sleep(1)
299     for peer in l[1:]:
300         peer.app.run()
301         #time.sleep(.25)
302
303     print "adding contacts...."
304
305     for peer in l[1:]:
306         n = l[randrange(0, len(l))].node
307         peer.addContact(host, n.port)
308         n = l[randrange(0, len(l))].node
309         peer.addContact(host, n.port)
310         n = l[randrange(0, len(l))].node
311         peer.addContact(host, n.port)
312         if pause:
313             time.sleep(.30)
314             
315     time.sleep(1)
316     print "finding close nodes...."
317
318     for peer in l:
319         peer.findCloseNodes()
320         if pause:
321             time.sleep(.5)
322     if pause:
323             time.sleep(2)
324 #    for peer in l:
325 #       peer.refreshTable()
326     return l
327         
328 def test_find_nodes(l, quiet=0):
329     import threading, sys
330     from whrandom import randrange
331     flag = threading.Event()
332     
333     n = len(l)
334     
335     a = l[randrange(0,n)]
336     b = l[randrange(0,n)]
337     
338     def callback(nodes, flag=flag, id = b.node.id):
339         if (len(nodes) >0) and (nodes[0].id == id):
340             print "test_find_nodes      PASSED"
341         else:
342             print "test_find_nodes      FAILED"
343         flag.set()
344     a.findNode(b.node.id, callback)
345     flag.wait()
346     
347 def test_find_value(l, quiet=0):
348     from whrandom import randrange
349     from sha import sha
350     from hash import newID
351     import time, threading, sys
352     
353     fa = threading.Event()
354     fb = threading.Event()
355     fc = threading.Event()
356     
357     n = len(l)
358     a = l[randrange(0,n)]
359     b = l[randrange(0,n)]
360     c = l[randrange(0,n)]
361     d = l[randrange(0,n)]
362
363     key = newID()
364     value = newID()
365     if not quiet:
366         print "inserting value..."
367         sys.stdout.flush()
368     a.storeValueForKey(key, value)
369     time.sleep(3)
370     print "finding..."
371     sys.stdout.flush()
372     
373     class cb:
374         def __init__(self, flag, value=value):
375             self.flag = flag
376             self.val = value
377             self.found = 0
378         def callback(self, values):
379             try:
380                 if(len(values) == 0):
381                     if not self.found:
382                         print "find                FAILED"
383                     else:
384                         print "find                FOUND"
385                     sys.stdout.flush()
386
387                 else:
388                     if self.val in values:
389                         self.found = 1
390             finally:
391                 self.flag.set()
392
393     b.valueForKey(key, cb(fa).callback)
394     fa.wait()
395     c.valueForKey(key, cb(fb).callback)
396     fb.wait()
397     d.valueForKey(key, cb(fc).callback)    
398     fc.wait()
399     
400 def test_one(port):
401     import thread
402     k = Khashmir('localhost', port)
403     thread.start_new_thread(k.app.run, ())
404     return k
405     
406 if __name__ == "__main__":
407     l = test_build_net()
408     time.sleep(3)
409     print "finding nodes..."
410     for i in range(10):
411         test_find_nodes(l)
412     print "inserting and fetching values..."
413     for i in range(10):
414         test_find_value(l)