]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
48319132b6a848b38fe5407164119b21e37c78dd
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID, newIDInRange
14
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
21 threadable.init()
22
23 import sqlite  ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
25
26 KhashmirDBExcept = "KhashmirDBExcept"
27
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31     def __init__(self, host, port, db='khashmir.db'):
32         self.node = Node().init(newID(), host, port)
33         self.table = KTable(self.node)
34         self.app = Application("xmlrpc")
35         self.app.listenTCP(port, server.Site(self))
36         self.findDB(db)
37         self.last = time.time()
38         KeyExpirer(store=self.store)
39
40     def findDB(self, db):
41         import os
42         try:
43             os.stat(db)
44         except OSError:
45             self.createNewDB(db)
46         else:
47             self.loadDB(db)
48             
49     def loadDB(self, db):
50         try:
51             self.store = sqlite.connect(db=db)
52         except:
53             import traceback
54             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
55             
56     def createNewDB(self, db):
57         self.store = sqlite.connect(db=db)
58         s = """
59             create table kv (key text, value text, time timestamp, primary key (key, value));
60             create index kv_key on kv(key);
61             create index kv_timestamp on kv(time);
62             
63             create table nodes (id text primary key, host text, port number);
64             """
65         c = self.store.cursor()
66         c.execute(s)
67         self.store.commit()
68                 
69     def render(self, request):
70         """
71             Override the built in render so we can have access to the request object!
72             note, crequest is probably only valid on the initial call (not after deferred!)
73         """
74         self.crequest = request
75         return xmlrpc.XMLRPC.render(self, request)
76
77         
78     #######
79     #######  LOCAL INTERFACE    - use these methods!
80     def addContact(self, host, port):
81         """
82          ping this node and add the contact info to the table on pong!
83         """
84         n =Node().init(const.NULL_ID, host, port)  # note, we 
85         self.sendPing(n)
86
87
88     ## this call is async!
89     def findNode(self, id, callback, errback=None):
90         """ returns the contact info for node, or the k closest nodes, from the global table """
91         # get K nodes out of local table/cache, or the node we want
92         nodes = self.table.findNodes(id)
93         d = Deferred()
94         if errback:
95             d.addCallbacks(callback, errback)
96         else:
97             d.addCallback(callback)
98         if len(nodes) == 1 and nodes[0].id == id :
99             d.callback(nodes)
100         else:
101             # create our search state
102             state = FindNode(self, id, d.callback)
103             reactor.callFromThread(state.goWithNodes, nodes)
104     
105     
106     ## also async
107     def valueForKey(self, key, callback):
108         """ returns the values found for key in global table
109             callback will be called with a list of values for each peer that returns unique values
110             final callback will be an empty list - probably should change to 'more coming' arg
111         """
112         nodes = self.table.findNodes(key)
113
114         # get locals
115         l = self.retrieveValues(key)
116         if len(l) > 0:
117             reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
118
119         # create our search state
120         state = GetValue(self, key, callback)
121         reactor.callFromThread(state.goWithNodes, nodes, l)
122         
123
124
125     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
126     def storeValueForKey(self, key, value, callback=None):
127         """ stores the value for key in the global table, returns immediately, no status 
128             in this implementation, peers respond but don't indicate status to storing values
129             a key can have many values
130         """
131         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
132             if not callback:
133                 # default callback
134                 def _storedValueHandler(sender):
135                     pass
136                 response=_storedValueHandler
137         
138             for node in nodes[:const.STORE_REDUNDANCY]:
139                 def cb(t, table = table, node=node, resp=response):
140                     self.table.insertNode(node)
141                     response(t)
142                 if node.id != self.node.id:
143                     def default(err, node=node, table=table):
144                         table.nodeFailed(node)
145                     df = node.storeValue(key, value, self.node.senderDict())
146                     df.addCallbacks(cb, lambda x: None)
147         # this call is asynch
148         self.findNode(key, _storeValueForKey)
149         
150         
151     def insertNode(self, n, contacted=1):
152         """
153         insert a node in our local table, pinging oldest contact in bucket, if necessary
154         
155         If all you have is a host/port, then use addContact, which calls this method after
156         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
157         a node into the table without it's peer-ID.  That means of course the node passed into this
158         method needs to be a properly formed Node object with a valid ID.
159         """
160         old = self.table.insertNode(n, contacted=contacted)
161         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
162             # the bucket is full, check to see if old node is still around and if so, replace it
163             
164             ## these are the callbacks used when we ping the oldest node in a bucket
165             def _staleNodeHandler(oldnode=old, newnode = n):
166                 """ called if the pinged node never responds """
167                 self.table.replaceStaleNode(old, newnode)
168         
169             def _notStaleNodeHandler(sender, old=old):
170                 """ called when we get a pong from the old node """
171                 sender = Node().initWithDict(sender)
172                 if sender.id == old.id:
173                     self.table.justSeenNode(old)
174
175             df = old.ping(self.node.senderDict())
176             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
177
178
179     def sendPing(self, node):
180         """
181             ping a node
182         """
183         df = node.ping(self.node.senderDict())
184         ## these are the callbacks we use when we issue a PING
185         def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
186             l, sender = args
187             if id != const.NULL_ID and id != sender['id'].decode('base64'):
188                 # whoah, got response from different peer than we were expecting
189                 pass
190             else:
191                 sender['host'] = host
192                 sender['port'] = port
193                 n = Node().initWithDict(sender)
194                 table.insertNode(n)
195             return
196         def _defaultPong(err, node=node, table=self.table):
197                 table.nodeFailed(node)
198
199         df.addCallbacks(_pongHandler,_defaultPong)
200
201
202     def findCloseNodes(self, callback=lambda a: None):
203         """
204             This does a findNode on the ID one away from our own.  
205             This will allow us to populate our table with nodes on our network closest to our own.
206             This is called as soon as we start up with an empty table
207         """
208         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
209         self.findNode(id, callback)
210
211     def refreshTable(self):
212         """
213             
214         """
215         def callback(nodes):
216             pass
217
218         for bucket in self.table.buckets:
219             if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
220                 id = newIDInRange(bucket.min, bucket.max)
221                 self.findNode(id, callback)
222         
223  
224     def retrieveValues(self, key):
225         s = "select value from kv where key = '%s';" % key.encode('base64')
226         c = self.store.cursor()
227         c.execute(s)
228         t = c.fetchone()
229         l = []
230         while t:
231             l.append(t['value'])
232             t = c.fetchone()
233         return l
234         
235     #####
236     ##### INCOMING MESSAGE HANDLERS
237     
238     def xmlrpc_ping(self, sender):
239         """
240             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
241             returns sender dict
242         """
243         ip = self.crequest.getClientIP()
244         sender['host'] = ip
245         n = Node().initWithDict(sender)
246         self.insertNode(n, contacted=0)
247         return (), self.node.senderDict()
248                 
249     def xmlrpc_find_node(self, target, sender):
250         nodes = self.table.findNodes(target.decode('base64'))
251         nodes = map(lambda node: node.senderDict(), nodes)
252         ip = self.crequest.getClientIP()
253         sender['host'] = ip
254         n = Node().initWithDict(sender)
255         self.insertNode(n, contacted=0)
256         return nodes, self.node.senderDict()
257             
258     def xmlrpc_store_value(self, key, value, sender):
259         t = "%0.6f" % time.time()
260         s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
261         c = self.store.cursor()
262         try:
263             c.execute(s)
264         except pysqlite_exceptions.IntegrityError, reason:
265             # update last insert time
266             s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
267             c.execute(s)
268         self.store.commit()
269         ip = self.crequest.getClientIP()
270         sender['host'] = ip
271         n = Node().initWithDict(sender)
272         self.insertNode(n, contacted=0)
273         return (), self.node.senderDict()
274         
275     def xmlrpc_find_value(self, key, sender):
276         ip = self.crequest.getClientIP()
277         key = key.decode('base64')
278         sender['host'] = ip
279         n = Node().initWithDict(sender)
280         self.insertNode(n, contacted=0)
281
282         l = self.retrieveValues(key)
283         if len(l) > 0:
284             return {'values' : l}, self.node.senderDict()
285         else:
286             nodes = self.table.findNodes(key)
287             nodes = map(lambda node: node.senderDict(), nodes)
288             return {'nodes' : nodes}, self.node.senderDict()
289
290
291
292
293
294 #------ testing
295
296 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
297     from whrandom import randrange
298     import threading
299     import thread
300     port = 2001
301     l = []
302         
303     if not quiet:
304         print "Building %s peer table." % peers
305         
306     for i in xrange(peers):
307         a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
308         l.append(a)
309     
310
311     thread.start_new_thread(l[0].app.run, ())
312     time.sleep(1)
313     for peer in l[1:]:
314         peer.app.run()
315     time.sleep(10)
316
317     print "adding contacts...."
318
319     for peer in l[1:]:
320         n = l[randrange(0, len(l))].node
321         peer.addContact(host, n.port)
322         n = l[randrange(0, len(l))].node
323         peer.addContact(host, n.port)
324         n = l[randrange(0, len(l))].node
325         peer.addContact(host, n.port)
326         if pause:
327             time.sleep(.33)
328         
329     time.sleep(10)
330     print "finding close nodes...."
331
332     for peer in l:
333         flag = threading.Event()
334         def cb(nodes, f=flag):
335             f.set()
336         peer.findCloseNodes(cb)
337         flag.wait()
338
339 #    for peer in l:
340 #       peer.refreshTable()
341     return l
342         
343 def test_find_nodes(l, quiet=0):
344     import threading, sys
345     from whrandom import randrange
346     flag = threading.Event()
347     
348     n = len(l)
349     
350     a = l[randrange(0,n)]
351     b = l[randrange(0,n)]
352     
353     def callback(nodes, flag=flag, id = b.node.id):
354         if (len(nodes) >0) and (nodes[0].id == id):
355             print "test_find_nodes      PASSED"
356         else:
357             print "test_find_nodes      FAILED"
358         flag.set()
359     a.findNode(b.node.id, callback)
360     flag.wait()
361     
362 def test_find_value(l, quiet=0):
363     from whrandom import randrange
364     from sha import sha
365     from hash import newID
366     import time, threading, sys
367     
368     fa = threading.Event()
369     fb = threading.Event()
370     fc = threading.Event()
371     
372     n = len(l)
373     a = l[randrange(0,n)]
374     b = l[randrange(0,n)]
375     c = l[randrange(0,n)]
376     d = l[randrange(0,n)]
377
378     key = newID()
379     value = newID()
380     if not quiet:
381         print "inserting value..."
382         sys.stdout.flush()
383     a.storeValueForKey(key, value)
384     time.sleep(3)
385     print "finding..."
386     sys.stdout.flush()
387     
388     class cb:
389         def __init__(self, flag, value=value):
390             self.flag = flag
391             self.val = value
392             self.found = 0
393         def callback(self, values):
394             try:
395                 if(len(values) == 0):
396                     if not self.found:
397                         print "find                NOT FOUND"
398                     else:
399                         print "find                FOUND"
400                     sys.stdout.flush()
401
402                 else:
403                     if self.val in values:
404                         self.found = 1
405             finally:
406                 self.flag.set()
407
408     b.valueForKey(key, cb(fa).callback)
409     fa.wait()
410     c.valueForKey(key, cb(fb).callback)
411     fb.wait()
412     d.valueForKey(key, cb(fc).callback)    
413     fc.wait()
414     
415 def test_one(host, port, db='/tmp/test'):
416     import thread
417     k = Khashmir(host, port, db)
418     thread.start_new_thread(k.app.run, ())
419     return k
420     
421 if __name__ == "__main__":
422     import sys
423     n = 8
424     if len(sys.argv) > 1:
425         n = int(sys.argv[1])
426     l = test_build_net(peers=n)
427     time.sleep(3)
428     print "finding nodes..."
429     for i in range(10):
430         test_find_nodes(l)
431     print "inserting and fetching values..."
432     for i in range(10):
433         test_find_value(l)