]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
moved actions into their own file
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5
6 from ktable import KTable, K
7 from knode import KNode as Node
8
9 from hash import newID
10
11 from actions import FindNode, GetValue
12 from twisted.web import xmlrpc
13 from twisted.internet.defer import Deferred
14 from twisted.python import threadable
15 threadable.init()
16
17 from bsddb3 import db ## find this at http://pybsddb.sf.net/
18 from bsddb3._db import DBNotFoundError
19
20 # don't ping unless it's been at least this many seconds since we've heard from a peer
21 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
22
23
24
25 # this is the main class!
26 class Khashmir(xmlrpc.XMLRPC):
27     __slots__ = ['listener', 'node', 'table', 'store', 'app']
28     def __init__(self, host, port):
29         self.node = Node(newID(), host, port)
30         self.table = KTable(self.node)
31         from twisted.internet.app import Application
32         from twisted.web import server
33         self.app = Application("xmlrpc")
34         self.app.listenTCP(port, server.Site(self))
35         self.store = db.DB()
36         self.store.open(None, None, db.DB_BTREE)
37         
38
39     def render(self, request):
40         """
41             Override the built in render so we can have access to the request object!
42             note, crequest is probably only valid on the initial call (not after deferred!)
43         """
44         self.crequest = request
45         return xmlrpc.XMLRPC.render(self, request)
46
47         
48     #######
49     #######  LOCAL INTERFACE    - use these methods!
50     def addContact(self, host, port):
51         """
52          ping this node and add the contact info to the table on pong!
53         """
54         n =Node(" "*20, host, port)  # note, we 
55         self.sendPing(n)
56
57
58     ## this call is async!
59     def findNode(self, id, callback, errback=None):
60         """ returns the contact info for node, or the k closest nodes, from the global table """
61         # get K nodes out of local table/cache, or the node we want
62         nodes = self.table.findNodes(id)
63         d = Deferred()
64         d.addCallbacks(callback, errback)
65         if len(nodes) == 1 and nodes[0].id == id :
66             d.callback(nodes)
67         else:
68             # create our search state
69             state = FindNode(self, id, d.callback)
70             reactor.callFromThread(state.goWithNodes, nodes)
71     
72     
73     ## also async
74     def valueForKey(self, key, callback):
75         """ returns the values found for key in global table """
76         nodes = self.table.findNodes(key)
77         # create our search state
78         state = GetValue(self, key, callback)
79         reactor.callFromThread(state.goWithNodes, nodes)
80
81
82     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
83     def storeValueForKey(self, key, value):
84         """ stores the value for key in the global table, returns immediately, no status 
85             in this implementation, peers respond but don't indicate status to storing values
86             values are stored in peers on a first-come first-served basis
87             this will probably change so more than one value can be stored under a key
88         """
89         def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
90             for node in nodes:
91                 if node.id != self.node.id:
92                     df = node.storeValue(key, value, self.node.senderDict())
93                     df.addCallbacks(response, default)
94         # this call is asynch
95         self.findNode(key, _storeValueForKey)
96         
97         
98     def insertNode(self, n):
99         """
100         insert a node in our local table, pinging oldest contact in bucket, if necessary
101         
102         If all you have is a host/port, then use addContact, which calls this method after
103         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
104         a node into the table without it's peer-ID.  That means of course the node passed into this
105         method needs to be a properly formed Node object with a valid ID.
106         """
107         old = self.table.insertNode(n)
108         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
109             # the bucket is full, check to see if old node is still around and if so, replace it
110             
111             ## these are the callbacks used when we ping the oldest node in a bucket
112             def _staleNodeHandler(oldnode=old, newnode = n):
113                 """ called if the pinged node never responds """
114                 self.table.replaceStaleNode(old, newnode)
115         
116             def _notStaleNodeHandler(sender, old=old):
117                 """ called when we get a ping from the remote node """
118                 if sender['id'] == old.id:
119                     self.table.insertNode(old)
120
121             df = old.ping()
122             df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
123
124
125     def sendPing(self, node):
126         """
127             ping a node
128         """
129         df = node.ping(self.node.senderDict())
130         ## these are the callbacks we use when we issue a PING
131         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
132             if id != 20 * ' ' and id != sender['id']:
133                 # whoah, got response from different peer than we were expecting
134                 pass
135             else:
136                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
137                 n = Node(sender['id'], host, port)
138                 table.insertNode(n)
139             return
140         def _defaultPong(err):
141             # this should probably increment a failed message counter and dump the node if it gets over a threshold
142             return      
143
144         df.addCallbacks(_pongHandler,_defaultPong)
145
146
147     def findCloseNodes(self):
148         """
149             This does a findNode on the ID one away from our own.  
150             This will allow us to populate our table with nodes on our network closest to our own.
151             This is called as soon as we start up with an empty table
152         """
153         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
154         def callback(nodes):
155             pass
156         self.findNode(id, callback)
157
158     def refreshTable(self):
159         """
160             
161         """
162         def callback(nodes):
163             pass
164
165         for bucket in self.table.buckets:
166             if time.time() - bucket.lastAccessed >= 60 * 60:
167                 id = randRange(bucket.min, bucket.max)
168                 self.findNode(id, callback)
169         
170  
171     #####
172     ##### INCOMING MESSAGE HANDLERS
173     
174     def xmlrpc_ping(self, sender):
175         """
176             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
177             returns sender dict
178         """
179         ip = self.crequest.getClientIP()
180         n = Node(sender['id'], ip, sender['port'])
181         self.insertNode(n)
182         return self.node.senderDict()
183                 
184     def xmlrpc_find_node(self, target, sender):
185         nodes = self.table.findNodes(target)
186         nodes = map(lambda node: node.senderDict(), nodes)
187         ip = self.crequest.getClientIP()
188         n = Node(sender['id'], ip, sender['port'])
189         self.insertNode(n)
190         return nodes, self.node.senderDict()
191     
192     def xmlrpc_store_value(self, key, value, sender):
193         if not self.store.has_key(key):
194             self.store.put(key, value)
195         ip = self.crequest.getClientIP()
196         n = Node(sender['id'], ip, sender['port'])
197         self.insertNode(n)
198         return self.node.senderDict()
199         
200     def xmlrpc_find_value(self, key, sender):
201         ip = self.crequest.getClientIP()
202         n = Node(sender['id'], ip, sender['port'])
203         self.insertNode(n)
204         if self.store.has_key(key):
205             return {'values' : self.store[key]}, self.node.senderDict()
206         else:
207             nodes = self.table.findNodes(msg['key'])
208             nodes = map(lambda node: node.senderDict(), nodes)
209             return {'nodes' : nodes}, self.node.senderDict()
210
211     ###
212     ### message response callbacks
213     # called when we get a response to store value
214     def _storedValueHandler(self, sender):
215         pass
216
217
218 #------
219
220 def test_build_net(quiet=0):
221     from whrandom import randrange
222     import thread
223     port = 2001
224     l = []
225     peers = 16
226     
227     if not quiet:
228         print "Building %s peer table." % peers
229         
230     for i in xrange(peers):
231         a = Khashmir('localhost', port + i)
232         l.append(a)
233     
234     def run(l=l):
235         while(1):
236                 events = 0
237                 for peer in l:
238                         events = events + peer.dispatcher.runOnce()
239                 if events == 0:
240                         time.sleep(.25)
241
242     thread.start_new_thread(l[0].app.run, ())
243     for peer in l[1:]:
244         peer.app.run()
245         
246     for peer in l[1:]:
247         n = l[randrange(0, len(l))].node
248         peer.addContact(n.host, n.port)
249         n = l[randrange(0, len(l))].node
250         peer.addContact(n.host, n.port)
251         n = l[randrange(0, len(l))].node
252         peer.addContact(n.host, n.port)
253         
254     time.sleep(5)
255
256     for peer in l:
257         peer.findCloseNodes()
258     time.sleep(5)
259     for peer in l:
260         peer.refreshTable()
261     return l
262         
263 def test_find_nodes(l, quiet=0):
264     import threading, sys
265     from whrandom import randrange
266     flag = threading.Event()
267     
268     n = len(l)
269     
270     a = l[randrange(0,n)]
271     b = l[randrange(0,n)]
272     
273     def callback(nodes, l=l, flag=flag):
274         if (len(nodes) >0) and (nodes[0].id == b.node.id):
275             print "test_find_nodes      PASSED"
276         else:
277             print "test_find_nodes      FAILED"
278         flag.set()
279     a.findNode(b.node.id, callback)
280     flag.wait()
281     
282 def test_find_value(l, quiet=0):
283     from whrandom import randrange
284     from sha import sha
285     import time, threading, sys
286     
287     fa = threading.Event()
288     fb = threading.Event()
289     fc = threading.Event()
290     
291     n = len(l)
292     a = l[randrange(0,n)]
293     b = l[randrange(0,n)]
294     c = l[randrange(0,n)]
295     d = l[randrange(0,n)]
296
297     key = sha(`randrange(0,100000)`).digest()
298     value = sha(`randrange(0,100000)`).digest()
299     if not quiet:
300         print "inserting value...",
301         sys.stdout.flush()
302     a.storeValueForKey(key, value)
303     time.sleep(3)
304     print "finding..."
305     
306     def mc(flag, value=value):
307         def callback(values, f=flag, val=value):
308             try:
309                 if(len(values) == 0):
310                     print "find                FAILED"
311                 else:
312                     if values[0]['value'] != val:
313                         print "find                FAILED"
314                     else:
315                         print "find                FOUND"
316             finally:
317                 f.set()
318         return callback
319     b.valueForKey(key, mc(fa))
320     c.valueForKey(key, mc(fb))
321     d.valueForKey(key, mc(fc))
322     
323     fa.wait()
324     fb.wait()
325     fc.wait()
326     
327 if __name__ == "__main__":
328     l = test_build_net()
329     time.sleep(3)
330     print "finding nodes..."
331     test_find_nodes(l)
332     test_find_nodes(l)
333     test_find_nodes(l)
334     print "inserting and fetching values..."
335     test_find_value(l)
336     test_find_value(l)
337     test_find_value(l)
338     test_find_value(l)
339     test_find_value(l)
340     test_find_value(l)