]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
fix typo
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5 from pickle import loads, dumps
6 from sha import sha
7
8 from ktable import KTable, K
9 from knode import KNode as Node
10
11 from hash import newID
12
13 from actions import FindNode, GetValue, KeyExpirer
14 from twisted.web import xmlrpc
15 from twisted.internet.defer import Deferred
16 from twisted.python import threadable
17 from twisted.internet.app import Application
18 from twisted.web import server
19 threadable.init()
20
21 from bsddb3 import db ## find this at http://pybsddb.sf.net/
22 from bsddb3._db import DBNotFoundError
23
24 # don't ping unless it's been at least this many seconds since we've heard from a peer
25 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
26
27
28
29 # this is the main class!
30 class Khashmir(xmlrpc.XMLRPC):
31     __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
32     def __init__(self, host, port):
33         self.node = Node(newID(), host, port)
34         self.table = KTable(self.node)
35         self.app = Application("xmlrpc")
36         self.app.listenTCP(port, server.Site(self))
37         
38         ## these databases may be more suited to on-disk rather than in-memory
39         # h((key, value)) -> (key, value, time) mappings
40         self.store = db.DB()
41         self.store.open(None, None, db.DB_BTREE)
42         
43         # <insert time> -> h((key, value))
44         self.itime = db.DB()
45         self.itime.set_flags(db.DB_DUP)
46         self.itime.open(None, None, db.DB_BTREE)
47
48         # key -> h((key, value))
49         self.kw = db.DB()
50         self.kw.set_flags(db.DB_DUP)
51         self.kw.open(None, None, db.DB_BTREE)
52
53         KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
54         
55     def render(self, request):
56         """
57             Override the built in render so we can have access to the request object!
58             note, crequest is probably only valid on the initial call (not after deferred!)
59         """
60         self.crequest = request
61         return xmlrpc.XMLRPC.render(self, request)
62
63         
64     #######
65     #######  LOCAL INTERFACE    - use these methods!
66     def addContact(self, host, port):
67         """
68          ping this node and add the contact info to the table on pong!
69         """
70         n =Node(" "*20, host, port)  # note, we 
71         self.sendPing(n)
72
73
74     ## this call is async!
75     def findNode(self, id, callback, errback=None):
76         """ returns the contact info for node, or the k closest nodes, from the global table """
77         # get K nodes out of local table/cache, or the node we want
78         nodes = self.table.findNodes(id)
79         d = Deferred()
80         d.addCallbacks(callback, errback)
81         if len(nodes) == 1 and nodes[0].id == id :
82             d.callback(nodes)
83         else:
84             # create our search state
85             state = FindNode(self, id, d.callback)
86             reactor.callFromThread(state.goWithNodes, nodes)
87     
88     
89     ## also async
90     def valueForKey(self, key, callback):
91         """ returns the values found for key in global table """
92         nodes = self.table.findNodes(key)
93         # create our search state
94         state = GetValue(self, key, callback)
95         reactor.callFromThread(state.goWithNodes, nodes)
96
97
98
99     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
100     def storeValueForKey(self, key, value, callback=None):
101         """ stores the value for key in the global table, returns immediately, no status 
102             in this implementation, peers respond but don't indicate status to storing values
103             values are stored in peers on a first-come first-served basis
104             this will probably change so more than one value can be stored under a key
105         """
106         def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
107             if not callback:
108                 # default callback - this will get called for each successful store value
109                 def _storedValueHandler(sender):
110                     pass
111                 response=_storedValueHandler
112             for node in nodes:
113                 if node.id != self.node.id:
114                     df = node.storeValue(key, value, self.node.senderDict())
115                     df.addCallbacks(response, default)
116         # this call is asynch
117         self.findNode(key, _storeValueForKey)
118         
119         
120     def insertNode(self, n):
121         """
122         insert a node in our local table, pinging oldest contact in bucket, if necessary
123         
124         If all you have is a host/port, then use addContact, which calls this method after
125         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
126         a node into the table without it's peer-ID.  That means of course the node passed into this
127         method needs to be a properly formed Node object with a valid ID.
128         """
129         old = self.table.insertNode(n)
130         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
131             # the bucket is full, check to see if old node is still around and if so, replace it
132             
133             ## these are the callbacks used when we ping the oldest node in a bucket
134             def _staleNodeHandler(oldnode=old, newnode = n):
135                 """ called if the pinged node never responds """
136                 self.table.replaceStaleNode(old, newnode)
137         
138             def _notStaleNodeHandler(sender, old=old):
139                 """ called when we get a ping from the remote node """
140                 if sender['id'] == old.id:
141                     self.table.insertNode(old)
142
143             df = old.ping(self.node.senderDict())
144             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
145
146
147     def sendPing(self, node):
148         """
149             ping a node
150         """
151         df = node.ping(self.node.senderDict())
152         ## these are the callbacks we use when we issue a PING
153         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
154             if id != 20 * ' ' and id != sender['id']:
155                 # whoah, got response from different peer than we were expecting
156                 pass
157             else:
158                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
159                 n = Node(sender['id'], host, port)
160                 table.insertNode(n)
161             return
162         def _defaultPong(err):
163             # this should probably increment a failed message counter and dump the node if it gets over a threshold
164             return      
165
166         df.addCallbacks(_pongHandler,_defaultPong)
167
168
169     def findCloseNodes(self):
170         """
171             This does a findNode on the ID one away from our own.  
172             This will allow us to populate our table with nodes on our network closest to our own.
173             This is called as soon as we start up with an empty table
174         """
175         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
176         def callback(nodes):
177             pass
178         self.findNode(id, callback)
179
180     def refreshTable(self):
181         """
182             
183         """
184         def callback(nodes):
185             pass
186
187         for bucket in self.table.buckets:
188             if time.time() - bucket.lastAccessed >= 60 * 60:
189                 id = randRange(bucket.min, bucket.max)
190                 self.findNode(id, callback)
191         
192  
193     #####
194     ##### INCOMING MESSAGE HANDLERS
195     
196     def xmlrpc_ping(self, sender):
197         """
198             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
199             returns sender dict
200         """
201         ip = self.crequest.getClientIP()
202         n = Node(sender['id'], ip, sender['port'])
203         self.insertNode(n)
204         return self.node.senderDict()
205                 
206     def xmlrpc_find_node(self, target, sender):
207         nodes = self.table.findNodes(target)
208         nodes = map(lambda node: node.senderDict(), nodes)
209         ip = self.crequest.getClientIP()
210         n = Node(sender['id'], ip, sender['port'])
211         self.insertNode(n)
212         return nodes, self.node.senderDict()
213     
214     def xmlrpc_store_value(self, key, value, sender):
215         h1 = sha(key+value).digest()
216         t = `time.time()`
217         if not self.store.has_key(h1):
218             v = dumps((key, value, t))
219             self.store.put(h1, v)
220             self.itime.put(t, h1)
221             self.kw.put(key, h1)
222         else:
223             # update last insert time
224             tup = loads(self.store[h1])
225             self.store[h1] = dumps((tup[0], tup[1], t))
226             self.itime.put(t, h1)
227
228         ip = self.crequest.getClientIP()
229         n = Node(sender['id'], ip, sender['port'])
230         self.insertNode(n)
231         return self.node.senderDict()
232         
233     def xmlrpc_find_value(self, key, sender):
234         ip = self.crequest.getClientIP()
235         n = Node(sender['id'], ip, sender['port'])
236         self.insertNode(n)
237         if self.kw.has_key(key):
238             c = self.kw.cursor()
239             tup = c.set(key)
240             l = []
241             while(tup):
242                 h1 = tup[1]
243                 v = loads(self.store[h1])[1]
244                 l.append(v)
245                 tup = c.next()
246             return {'values' : l}, self.node.senderDict()
247         else:
248             nodes = self.table.findNodes(key)
249             nodes = map(lambda node: node.senderDict(), nodes)
250             return {'nodes' : nodes}, self.node.senderDict()
251
252
253
254
255
256 #------ testing
257
258 def test_build_net(quiet=0, peers=8, pause=1):
259     from whrandom import randrange
260     import thread
261     port = 2001
262     l = []
263         
264     if not quiet:
265         print "Building %s peer table." % peers
266         
267     for i in xrange(peers):
268         a = Khashmir('localhost', port + i)
269         l.append(a)
270     
271
272     thread.start_new_thread(l[0].app.run, ())
273     time.sleep(1)
274     for peer in l[1:]:
275         peer.app.run()
276         #time.sleep(.25)
277
278     print "adding contacts...."
279
280     for peer in l[1:]:
281         n = l[randrange(0, len(l))].node
282         peer.addContact(n.host, n.port)
283         n = l[randrange(0, len(l))].node
284         peer.addContact(n.host, n.port)
285         n = l[randrange(0, len(l))].node
286         peer.addContact(n.host, n.port)
287         if pause:
288             time.sleep(.30)
289             
290     time.sleep(1)
291     print "finding close nodes...."
292
293     for peer in l:
294         peer.findCloseNodes()
295         if pause:
296             time.sleep(.5)
297     if pause:
298             time.sleep(2)
299 #    for peer in l:
300 #       peer.refreshTable()
301     return l
302         
303 def test_find_nodes(l, quiet=0):
304     import threading, sys
305     from whrandom import randrange
306     flag = threading.Event()
307     
308     n = len(l)
309     
310     a = l[randrange(0,n)]
311     b = l[randrange(0,n)]
312     
313     def callback(nodes, flag=flag):
314         if (len(nodes) >0) and (nodes[0].id == b.node.id):
315             print "test_find_nodes      PASSED"
316         else:
317             print "test_find_nodes      FAILED"
318         flag.set()
319     a.findNode(b.node.id, callback)
320     flag.wait()
321     
322 def test_find_value(l, quiet=0):
323     from whrandom import randrange
324     from sha import sha
325     from hash import newID
326     import time, threading, sys
327     
328     fa = threading.Event()
329     fb = threading.Event()
330     fc = threading.Event()
331     
332     n = len(l)
333     a = l[randrange(0,n)]
334     b = l[randrange(0,n)]
335     c = l[randrange(0,n)]
336     d = l[randrange(0,n)]
337
338     key = newID()
339     value = newID()
340     if not quiet:
341         print "inserting value..."
342         sys.stdout.flush()
343     a.storeValueForKey(key, value)
344     time.sleep(3)
345     print "finding..."
346     sys.stdout.flush()
347     
348     class cb:
349         def __init__(self, flag, value=value):
350             self.flag = flag
351             self.val = value
352             self.found = 0
353         def callback(self, values):
354             try:
355                 if(len(values) == 0):
356                     if not self.found:
357                         print "find                FAILED"
358                     else:
359                         print "find                FOUND"
360                     sys.stdout.flush()
361
362                 else:
363                     if self.val in values:
364                         self.found = 1
365             finally:
366                 self.flag.set()
367
368     b.valueForKey(key, cb(fa).callback)
369     fa.wait()
370     c.valueForKey(key, cb(fb).callback)
371     fb.wait()
372     d.valueForKey(key, cb(fc).callback)    
373     fc.wait()
374     
375 def test_one(port):
376     import thread
377     k = Khashmir('localhost', port)
378     thread.start_new_thread(k.app.run, ())
379     return k
380     
381 if __name__ == "__main__":
382     l = test_build_net()
383     time.sleep(3)
384     print "finding nodes..."
385     for i in range(10):
386         test_find_nodes(l)
387     print "inserting and fetching values..."
388     for i in range(10):
389         test_find_value(l)