]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
test harness update
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import time
5
6 from ktable import KTable, K
7 from knode import KNode as Node
8
9 from hash import newID
10
11 from actions import FindNode, GetValue
12 from twisted.web import xmlrpc
13 from twisted.internet.defer import Deferred
14 from twisted.python import threadable
15 from twisted.internet.app import Application
16 from twisted.web import server
17 threadable.init()
18
19 from bsddb3 import db ## find this at http://pybsddb.sf.net/
20 from bsddb3._db import DBNotFoundError
21
22 # don't ping unless it's been at least this many seconds since we've heard from a peer
23 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
24
25
26
27 # this is the main class!
28 class Khashmir(xmlrpc.XMLRPC):
29     __slots__ = ['listener', 'node', 'table', 'store', 'app']
30     def __init__(self, host, port):
31         self.node = Node(newID(), host, port)
32         self.table = KTable(self.node)
33         self.app = Application("xmlrpc")
34         self.app.listenTCP(port, server.Site(self))
35         self.store = db.DB()
36         self.store.open(None, None, db.DB_BTREE)
37         
38
39     def render(self, request):
40         """
41             Override the built in render so we can have access to the request object!
42             note, crequest is probably only valid on the initial call (not after deferred!)
43         """
44         self.crequest = request
45         return xmlrpc.XMLRPC.render(self, request)
46
47         
48     #######
49     #######  LOCAL INTERFACE    - use these methods!
50     def addContact(self, host, port):
51         """
52          ping this node and add the contact info to the table on pong!
53         """
54         n =Node(" "*20, host, port)  # note, we 
55         self.sendPing(n)
56
57
58     ## this call is async!
59     def findNode(self, id, callback, errback=None):
60         """ returns the contact info for node, or the k closest nodes, from the global table """
61         # get K nodes out of local table/cache, or the node we want
62         nodes = self.table.findNodes(id)
63         d = Deferred()
64         d.addCallbacks(callback, errback)
65         if len(nodes) == 1 and nodes[0].id == id :
66             d.callback(nodes)
67         else:
68             # create our search state
69             state = FindNode(self, id, d.callback)
70             reactor.callFromThread(state.goWithNodes, nodes)
71     
72     
73     ## also async
74     def valueForKey(self, key, callback):
75         """ returns the values found for key in global table """
76         nodes = self.table.findNodes(key)
77         # create our search state
78         state = GetValue(self, key, callback)
79         reactor.callFromThread(state.goWithNodes, nodes)
80
81
82     ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
83     def storeValueForKey(self, key, value):
84         """ stores the value for key in the global table, returns immediately, no status 
85             in this implementation, peers respond but don't indicate status to storing values
86             values are stored in peers on a first-come first-served basis
87             this will probably change so more than one value can be stored under a key
88         """
89         def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
90             for node in nodes:
91                 if node.id != self.node.id:
92                     df = node.storeValue(key, value, self.node.senderDict())
93                     df.addCallbacks(response, default)
94         # this call is asynch
95         self.findNode(key, _storeValueForKey)
96         
97         
98     def insertNode(self, n):
99         """
100         insert a node in our local table, pinging oldest contact in bucket, if necessary
101         
102         If all you have is a host/port, then use addContact, which calls this method after
103         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
104         a node into the table without it's peer-ID.  That means of course the node passed into this
105         method needs to be a properly formed Node object with a valid ID.
106         """
107         old = self.table.insertNode(n)
108         if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
109             # the bucket is full, check to see if old node is still around and if so, replace it
110             
111             ## these are the callbacks used when we ping the oldest node in a bucket
112             def _staleNodeHandler(oldnode=old, newnode = n):
113                 """ called if the pinged node never responds """
114                 self.table.replaceStaleNode(old, newnode)
115         
116             def _notStaleNodeHandler(sender, old=old):
117                 """ called when we get a ping from the remote node """
118                 if sender['id'] == old.id:
119                     self.table.insertNode(old)
120
121             df = old.ping()
122             df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
123
124
125     def sendPing(self, node):
126         """
127             ping a node
128         """
129         df = node.ping(self.node.senderDict())
130         ## these are the callbacks we use when we issue a PING
131         def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
132             if id != 20 * ' ' and id != sender['id']:
133                 # whoah, got response from different peer than we were expecting
134                 pass
135             else:
136                 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
137                 n = Node(sender['id'], host, port)
138                 table.insertNode(n)
139             return
140         def _defaultPong(err):
141             # this should probably increment a failed message counter and dump the node if it gets over a threshold
142             return      
143
144         df.addCallbacks(_pongHandler,_defaultPong)
145
146
147     def findCloseNodes(self):
148         """
149             This does a findNode on the ID one away from our own.  
150             This will allow us to populate our table with nodes on our network closest to our own.
151             This is called as soon as we start up with an empty table
152         """
153         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
154         def callback(nodes):
155             pass
156         self.findNode(id, callback)
157
158     def refreshTable(self):
159         """
160             
161         """
162         def callback(nodes):
163             pass
164
165         for bucket in self.table.buckets:
166             if time.time() - bucket.lastAccessed >= 60 * 60:
167                 id = randRange(bucket.min, bucket.max)
168                 self.findNode(id, callback)
169         
170  
171     #####
172     ##### INCOMING MESSAGE HANDLERS
173     
174     def xmlrpc_ping(self, sender):
175         """
176             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
177             returns sender dict
178         """
179         ip = self.crequest.getClientIP()
180         n = Node(sender['id'], ip, sender['port'])
181         self.insertNode(n)
182         return self.node.senderDict()
183                 
184     def xmlrpc_find_node(self, target, sender):
185         nodes = self.table.findNodes(target)
186         nodes = map(lambda node: node.senderDict(), nodes)
187         ip = self.crequest.getClientIP()
188         n = Node(sender['id'], ip, sender['port'])
189         self.insertNode(n)
190         return nodes, self.node.senderDict()
191     
192     def xmlrpc_store_value(self, key, value, sender):
193         if not self.store.has_key(key):
194             self.store.put(key, value)
195         ip = self.crequest.getClientIP()
196         n = Node(sender['id'], ip, sender['port'])
197         self.insertNode(n)
198         return self.node.senderDict()
199         
200     def xmlrpc_find_value(self, key, sender):
201         ip = self.crequest.getClientIP()
202         n = Node(sender['id'], ip, sender['port'])
203         self.insertNode(n)
204         if self.store.has_key(key):
205             return {'values' : self.store[key]}, self.node.senderDict()
206         else:
207             nodes = self.table.findNodes(key)
208             nodes = map(lambda node: node.senderDict(), nodes)
209             return {'nodes' : nodes}, self.node.senderDict()
210
211     ###
212     ### message response callbacks
213     # called when we get a response to store value
214     def _storedValueHandler(self, sender):
215         pass
216
217
218
219
220
221
222 #------ testing
223
224 def test_build_net(quiet=0, peers=256, pause=1):
225     from whrandom import randrange
226     import thread
227     port = 2001
228     l = []
229         
230     if not quiet:
231         print "Building %s peer table." % peers
232         
233     for i in xrange(peers):
234         a = Khashmir('localhost', port + i)
235         l.append(a)
236     
237
238     thread.start_new_thread(l[0].app.run, ())
239     time.sleep(1)
240     for peer in l[1:]:
241         peer.app.run()
242         #time.sleep(.25)
243
244     print "adding contacts...."
245
246     for peer in l[1:]:
247         n = l[randrange(0, len(l))].node
248         peer.addContact(n.host, n.port)
249         n = l[randrange(0, len(l))].node
250         peer.addContact(n.host, n.port)
251         n = l[randrange(0, len(l))].node
252         peer.addContact(n.host, n.port)
253         if pause:
254             time.sleep(.30)
255             
256     time.sleep(5)
257     print "finding close nodes...."
258
259     for peer in l:
260         peer.findCloseNodes()
261         if pause:
262             time.sleep(1)
263 #    for peer in l:
264 #       peer.refreshTable()
265     return l
266         
267 def test_find_nodes(l, quiet=0):
268     import threading, sys
269     from whrandom import randrange
270     flag = threading.Event()
271     
272     n = len(l)
273     
274     a = l[randrange(0,n)]
275     b = l[randrange(0,n)]
276     
277     def callback(nodes, l=l, flag=flag):
278         if (len(nodes) >0) and (nodes[0].id == b.node.id):
279             print "test_find_nodes      PASSED"
280         else:
281             print "test_find_nodes      FAILED"
282         flag.set()
283     a.findNode(b.node.id, callback)
284     flag.wait()
285     
286 def test_find_value(l, quiet=0):
287     from whrandom import randrange
288     from sha import sha
289     import time, threading, sys
290     
291     fa = threading.Event()
292     fb = threading.Event()
293     fc = threading.Event()
294     
295     n = len(l)
296     a = l[randrange(0,n)]
297     b = l[randrange(0,n)]
298     c = l[randrange(0,n)]
299     d = l[randrange(0,n)]
300
301     key = sha(`randrange(0,100000)`).digest()
302     value = sha(`randrange(0,100000)`).digest()
303     if not quiet:
304         print "inserting value...",
305         sys.stdout.flush()
306     a.storeValueForKey(key, value)
307     time.sleep(3)
308     print "finding..."
309     
310     def mc(flag, value=value):
311         def callback(values, f=flag, val=value):
312             try:
313                 if(len(values) == 0):
314                     print "find                FAILED"
315                 else:
316                     if values != val:
317                         print "find                FAILED"
318                     else:
319                         print "find                FOUND"
320             finally:
321                 f.set()
322         return callback
323     b.valueForKey(key, mc(fa))
324     fa.wait()
325     c.valueForKey(key, mc(fb))
326     fb.wait()
327     d.valueForKey(key, mc(fc))    
328     fc.wait()
329     
330 if __name__ == "__main__":
331     l = test_build_net()
332     time.sleep(3)
333     print "finding nodes..."
334     test_find_nodes(l)
335     test_find_nodes(l)
336     test_find_nodes(l)
337     print "inserting and fetching values..."
338     test_find_value(l)
339     test_find_value(l)
340     test_find_value(l)
341     test_find_value(l)
342     test_find_value(l)
343     test_find_value(l)