]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
canonicalize time for accurate sorting
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
2
3 from const import reactor
4 import const
5
6 import time
7
8 from sha import sha
9
10 from ktable import KTable, K
11 from knode import KNode as Node
12
13 from hash import newID, newIDInRange
14
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
21 threadable.init()
22
23 import sqlite  ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
25
26 KhashmirDBExcept = "KhashmirDBExcept"
27
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31     def __init__(self, host, port, db='khashmir.db'):
32         self.node = Node().init(newID(), host, port)
33         self.table = KTable(self.node)
34         self.app = Application("xmlrpc")
35         self.app.listenTCP(port, server.Site(self))
36         self.findDB(db)
37         self.last = time.time()
38         KeyExpirer(store=self.store)
39
40     def findDB(self, db):
41         import os
42         try:
43             os.stat(db)
44         except OSError:
45             self.createNewDB(db)
46         else:
47             self.loadDB(db)
48             
49     def loadDB(self, db):
50         try:
51             self.store = sqlite.connect(db=db)
52         except:
53             import traceback
54             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
55             
56     def createNewDB(self, db):
57         self.store = sqlite.connect(db=db)
58         s = """
59             create table kv (key text, value text, time timestamp, primary key (key, value));
60             create index kv_key on kv(key);
61             create index kv_timestamp on kv(time);
62             
63             create table nodes (id text primary key, host text, port number);
64             """
65         c = self.store.cursor()
66         c.execute(s)
67         self.store.commit()
68                 
69     def render(self, request):
70         """
71             Override the built in render so we can have access to the request object!
72             note, crequest is probably only valid on the initial call (not after deferred!)
73         """
74         self.crequest = request
75         return xmlrpc.XMLRPC.render(self, request)
76
77         
78     #######
79     #######  LOCAL INTERFACE    - use these methods!
80     def addContact(self, host, port):
81         """
82          ping this node and add the contact info to the table on pong!
83         """
84         n =Node().init(const.NULL_ID, host, port)  # note, we 
85         self.sendPing(n)
86
87
88     ## this call is async!
89     def findNode(self, id, callback, errback=None):
90         """ returns the contact info for node, or the k closest nodes, from the global table """
91         # get K nodes out of local table/cache, or the node we want
92         nodes = self.table.findNodes(id)
93         d = Deferred()
94         if errback:
95             d.addCallbacks(callback, errback)
96         else:
97             d.addCallback(callback)
98         if len(nodes) == 1 and nodes[0].id == id :
99             d.callback(nodes)
100         else:
101             # create our search state
102             state = FindNode(self, id, d.callback)
103             reactor.callFromThread(state.goWithNodes, nodes)
104     
105     
106     ## also async
107     def valueForKey(self, key, callback):
108         """ returns the values found for key in global table
109             callback will be called with a list of values for each peer that returns unique values
110             final callback will be an empty list - probably should change to 'more coming' arg
111         """
112         nodes = self.table.findNodes(key)
113
114         # get locals
115         l = self.retrieveValues(key)
116         if len(l) > 0:
117             reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
118
119         # create our search state
120         state = GetValue(self, key, callback)
121         reactor.callFromThread(state.goWithNodes, nodes, l)
122         
123
124
125     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
126     def storeValueForKey(self, key, value, callback=None):
127         """ stores the value for key in the global table, returns immediately, no status 
128             in this implementation, peers respond but don't indicate status to storing values
129             a key can have many values
130         """
131         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
132             if not callback:
133                 # default callback
134                 def _storedValueHandler(sender):
135                     pass
136                 response=_storedValueHandler
137         
138             for node in nodes[:const.STORE_REDUNDANCY]:
139                 def cb(t, table = table, node=node, resp=response):
140                     self.table.insertNode(node)
141                     response(t)
142                 if node.id != self.node.id:
143                     def default(err, node=node, table=table):
144                         table.nodeFailed(node)
145                     df = node.storeValue(key, value, self.node.senderDict())
146                     df.addCallbacks(cb, lambda x: None)
147         # this call is asynch
148         self.findNode(key, _storeValueForKey)
149         
150         
151     def insertNode(self, n, contacted=1):
152         """
153         insert a node in our local table, pinging oldest contact in bucket, if necessary
154         
155         If all you have is a host/port, then use addContact, which calls this method after
156         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
157         a node into the table without it's peer-ID.  That means of course the node passed into this
158         method needs to be a properly formed Node object with a valid ID.
159         """
160         old = self.table.insertNode(n, contacted=contacted)
161         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
162             # the bucket is full, check to see if old node is still around and if so, replace it
163             
164             ## these are the callbacks used when we ping the oldest node in a bucket
165             def _staleNodeHandler(oldnode=old, newnode = n):
166                 """ called if the pinged node never responds """
167                 self.table.replaceStaleNode(old, newnode)
168         
169             def _notStaleNodeHandler(sender, old=old):
170                 """ called when we get a pong from the old node """
171                 sender = Node().initWithDict(sender)
172                 if sender.id == old.id:
173                     self.table.justSeenNode(old)
174
175             df = old.ping(self.node.senderDict())
176             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
177
178
179     def sendPing(self, node):
180         """
181             ping a node
182         """
183         df = node.ping(self.node.senderDict())
184         ## these are the callbacks we use when we issue a PING
185         def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
186             l, sender = args
187             if id != const.NULL_ID and id != sender['id'].decode('base64'):
188                 # whoah, got response from different peer than we were expecting
189                 pass
190             else:
191                 sender['host'] = host
192                 sender['port'] = port
193                 n = Node().initWithDict(sender)
194                 table.insertNode(n)
195             return
196         def _defaultPong(err, node=node, table=self.table):
197                 table.nodeFailed(node)
198
199         df.addCallbacks(_pongHandler,_defaultPong)
200
201
202     def findCloseNodes(self):
203         """
204             This does a findNode on the ID one away from our own.  
205             This will allow us to populate our table with nodes on our network closest to our own.
206             This is called as soon as we start up with an empty table
207         """
208         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
209         def callback(nodes):
210             pass
211         self.findNode(id, callback)
212
213     def refreshTable(self):
214         """
215             
216         """
217         def callback(nodes):
218             pass
219
220         for bucket in self.table.buckets:
221             if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
222                 id = newIDInRange(bucket.min, bucket.max)
223                 self.findNode(id, callback)
224         
225  
226     def retrieveValues(self, key):
227         s = "select value from kv where key = '%s';" % key.encode('base64')
228         c = self.store.cursor()
229         c.execute(s)
230         t = c.fetchone()
231         l = []
232         while t:
233             l.append(t['value'])
234             t = c.fetchone()
235         return l
236         
237     #####
238     ##### INCOMING MESSAGE HANDLERS
239     
240     def xmlrpc_ping(self, sender):
241         """
242             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
243             returns sender dict
244         """
245         ip = self.crequest.getClientIP()
246         sender['host'] = ip
247         n = Node().initWithDict(sender)
248         self.insertNode(n, contacted=0)
249         return (), self.node.senderDict()
250                 
251     def xmlrpc_find_node(self, target, sender):
252         nodes = self.table.findNodes(target.decode('base64'))
253         nodes = map(lambda node: node.senderDict(), nodes)
254         ip = self.crequest.getClientIP()
255         sender['host'] = ip
256         n = Node().initWithDict(sender)
257         self.insertNode(n, contacted=0)
258         return nodes, self.node.senderDict()
259             
260     def xmlrpc_store_value(self, key, value, sender):
261         t = "%0.6f" % time.time()
262         s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
263         c = self.store.cursor()
264         try:
265             c.execute(s)
266         except pysqlite_exceptions.IntegrityError, reason:
267             if reason == 'constraint failed':
268                 # update last insert time
269                 s = "update kv set time = '%s' where key = '%s' and value = %s; commit;" % (t, key, value)
270                 c.execute(s)
271             else:
272                 raise pysqlite_exceptions.IntegrityError, reason
273         self.store.commit()
274         ip = self.crequest.getClientIP()
275         sender['host'] = ip
276         n = Node().initWithDict(sender)
277         self.insertNode(n, contacted=0)
278         return (), self.node.senderDict()
279         
280     def xmlrpc_find_value(self, key, sender):
281         ip = self.crequest.getClientIP()
282         key = key.decode('base64')
283         sender['host'] = ip
284         n = Node().initWithDict(sender)
285         self.insertNode(n, contacted=0)
286
287         l = self.retrieveValues(key)
288         if len(l) > 0:
289             return {'values' : l}, self.node.senderDict()
290         else:
291             nodes = self.table.findNodes(key)
292             nodes = map(lambda node: node.senderDict(), nodes)
293             return {'nodes' : nodes}, self.node.senderDict()
294
295
296
297
298
299 #------ testing
300
301 def test_build_net(quiet=0, peers=24, host='localhost',  pause=1):
302     from whrandom import randrange
303     import thread
304     port = 2001
305     l = []
306         
307     if not quiet:
308         print "Building %s peer table." % peers
309         
310     for i in xrange(peers):
311         a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
312         l.append(a)
313     
314
315     thread.start_new_thread(l[0].app.run, ())
316     time.sleep(1)
317     for peer in l[1:]:
318         peer.app.run()
319         #time.sleep(.25)
320
321     print "adding contacts...."
322
323     for peer in l[1:]:
324         n = l[randrange(0, len(l))].node
325         peer.addContact(host, n.port)
326         n = l[randrange(0, len(l))].node
327         peer.addContact(host, n.port)
328         n = l[randrange(0, len(l))].node
329         peer.addContact(host, n.port)
330         if pause:
331             time.sleep(.5)
332             
333     time.sleep(1)
334     print "finding close nodes...."
335
336     for peer in l:
337         peer.findCloseNodes()
338         if pause:
339             time.sleep(.5)
340     if pause:
341             time.sleep(1)
342 #    for peer in l:
343 #       peer.refreshTable()
344     return l
345         
346 def test_find_nodes(l, quiet=0):
347     import threading, sys
348     from whrandom import randrange
349     flag = threading.Event()
350     
351     n = len(l)
352     
353     a = l[randrange(0,n)]
354     b = l[randrange(0,n)]
355     
356     def callback(nodes, flag=flag, id = b.node.id):
357         if (len(nodes) >0) and (nodes[0].id == id):
358             print "test_find_nodes      PASSED"
359         else:
360             print "test_find_nodes      FAILED"
361         flag.set()
362     a.findNode(b.node.id, callback)
363     flag.wait()
364     
365 def test_find_value(l, quiet=0):
366     from whrandom import randrange
367     from sha import sha
368     from hash import newID
369     import time, threading, sys
370     
371     fa = threading.Event()
372     fb = threading.Event()
373     fc = threading.Event()
374     
375     n = len(l)
376     a = l[randrange(0,n)]
377     b = l[randrange(0,n)]
378     c = l[randrange(0,n)]
379     d = l[randrange(0,n)]
380
381     key = newID()
382     value = newID()
383     if not quiet:
384         print "inserting value..."
385         sys.stdout.flush()
386     a.storeValueForKey(key, value)
387     time.sleep(3)
388     print "finding..."
389     sys.stdout.flush()
390     
391     class cb:
392         def __init__(self, flag, value=value):
393             self.flag = flag
394             self.val = value
395             self.found = 0
396         def callback(self, values):
397             try:
398                 if(len(values) == 0):
399                     if not self.found:
400                         print "find                NOT FOUND"
401                     else:
402                         print "find                FOUND"
403                     sys.stdout.flush()
404
405                 else:
406                     if self.val in values:
407                         self.found = 1
408             finally:
409                 self.flag.set()
410
411     b.valueForKey(key, cb(fa).callback)
412     fa.wait()
413     c.valueForKey(key, cb(fb).callback)
414     fb.wait()
415     d.valueForKey(key, cb(fc).callback)    
416     fc.wait()
417     
418 def test_one(host, port, db='/tmp/test'):
419     import thread
420     k = Khashmir(host, port, db)
421     thread.start_new_thread(k.app.run, ())
422     return k
423     
424 if __name__ == "__main__":
425     import sys
426     n = 8
427     if len(sys.argv) > 1:
428         n = int(sys.argv[1])
429     l = test_build_net(peers=n)
430     time.sleep(3)
431     print "finding nodes..."
432     for i in range(10):
433         test_find_nodes(l)
434     print "inserting and fetching values..."
435     for i in range(10):
436         test_find_value(l)