f585e16ad07fc1b717496f543cdd4e4b955a9f94
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from const import reactor
5 import const
6
7 import time
8
9 from sha import sha
10
11 from ktable import KTable, K
12 from knode import *
13
14 from khash import newID, newIDInRange
15
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
17 import krpc
18
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.application import service, internet
22 from twisted.web import server
23 import sys
24
25 from random import randrange
26
27 import sqlite  ## find this at http://pysqlite.sourceforge.net/
28
29 class KhashmirDBExcept(Exception):
30     pass
31
32 # this is the base class, has base functionality and find node, no key-value mappings
33 class KhashmirBase(protocol.Factory):
34     _Node = KNodeBase
35     def __init__(self, host, port, db='khashmir.db'):
36         self.setup(host, port, db)
37         
38     def setup(self, host, port, db='khashmir.db'):
39         self._findDB(db)
40         self.port = port
41         self.node = self._loadSelfNode(host, port)
42         self.table = KTable(self.node)
43         #self.app = service.Application("krpc")
44         self.udp = krpc.hostbroker(self)
45         self.udp.protocol = krpc.KRPC
46         self.listenport = reactor.listenUDP(port, self.udp)
47         self.last = time.time()
48         self._loadRoutingTable()
49         KeyExpirer(store=self.store)
50         self.refreshTable(force=1)
51         reactor.callLater(60, self.checkpoint, (1,))
52
53     def Node(self):
54         n = self._Node()
55         n.table = self.table
56         return n
57     
58     def __del__(self):
59         self.listenport.stopListening()
60         
61     def _loadSelfNode(self, host, port):
62         c = self.store.cursor()
63         c.execute('select id from self where num = 0;')
64         if c.rowcount > 0:
65             id = c.fetchone()[0]
66         else:
67             id = newID()
68         return self._Node().init(id, host, port)
69         
70     def _saveSelfNode(self):
71         c = self.store.cursor()
72         c.execute('delete from self where num = 0;')
73         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
74         self.store.commit()
75         
76     def checkpoint(self, auto=0):
77         self._saveSelfNode()
78         self._dumpRoutingTable()
79         self.refreshTable()
80         if auto:
81             reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
82         
83     def _findDB(self, db):
84         import os
85         try:
86             os.stat(db)
87         except OSError:
88             self._createNewDB(db)
89         else:
90             self._loadDB(db)
91         
92     def _loadDB(self, db):
93         try:
94             self.store = sqlite.connect(db=db)
95             #self.store.autocommit = 0
96         except:
97             import traceback
98             raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
99         
100     def _createNewDB(self, db):
101         self.store = sqlite.connect(db=db)
102         s = """
103             create table kv (key binary, value binary, time timestamp, primary key (key, value));
104             create index kv_key on kv(key);
105             create index kv_timestamp on kv(time);
106             
107             create table nodes (id binary primary key, host text, port number);
108             
109             create table self (num number primary key, id binary);
110             """
111         c = self.store.cursor()
112         c.execute(s)
113         self.store.commit()
114
115     def _dumpRoutingTable(self):
116         """
117             save routing table nodes to the database
118         """
119         c = self.store.cursor()
120         c.execute("delete from nodes where id not NULL;")
121         for bucket in self.table.buckets:
122             for node in bucket.l:
123                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
124         self.store.commit()
125         
126     def _loadRoutingTable(self):
127         """
128             load routing table nodes from database
129             it's usually a good idea to call refreshTable(force=1) after loading the table
130         """
131         c = self.store.cursor()
132         c.execute("select * from nodes;")
133         for rec in c.fetchall():
134             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
135             n.conn = self.udp.connectionForAddr((n.host, n.port))
136             self.table.insertNode(n, contacted=0)
137             
138
139     #######
140     #######  LOCAL INTERFACE    - use these methods!
141     def addContact(self, host, port, callback=None):
142         """
143             ping this node and add the contact info to the table on pong!
144         """
145         n =self.Node().init(const.NULL_ID, host, port) 
146         n.conn = self.udp.connectionForAddr((n.host, n.port))
147         self.sendPing(n, callback=callback)
148
149     ## this call is async!
150     def findNode(self, id, callback, errback=None):
151         """ returns the contact info for node, or the k closest nodes, from the global table """
152         # get K nodes out of local table/cache, or the node we want
153         nodes = self.table.findNodes(id)
154         d = Deferred()
155         if errback:
156             d.addCallbacks(callback, errback)
157         else:
158             d.addCallback(callback)
159         if len(nodes) == 1 and nodes[0].id == id :
160             d.callback(nodes)
161         else:
162             # create our search state
163             state = FindNode(self, id, d.callback)
164             reactor.callLater(0, state.goWithNodes, nodes)
165     
166     def insertNode(self, n, contacted=1):
167         """
168         insert a node in our local table, pinging oldest contact in bucket, if necessary
169         
170         If all you have is a host/port, then use addContact, which calls this method after
171         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
172         a node into the table without it's peer-ID.  That means of course the node passed into this
173         method needs to be a properly formed Node object with a valid ID.
174         """
175         old = self.table.insertNode(n, contacted=contacted)
176         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
177             # the bucket is full, check to see if old node is still around and if so, replace it
178             
179             ## these are the callbacks used when we ping the oldest node in a bucket
180             def _staleNodeHandler(oldnode=old, newnode = n):
181                 """ called if the pinged node never responds """
182                 self.table.replaceStaleNode(old, newnode)
183             
184             def _notStaleNodeHandler(dict, old=old):
185                 """ called when we get a pong from the old node """
186                 dict = dict['rsp']
187                 if dict['id'] == old.id:
188                     self.table.justSeenNode(old.id)
189             
190             df = old.ping(self.node.id)
191             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
192
193     def sendPing(self, node, callback=None):
194         """
195             ping a node
196         """
197         df = node.ping(self.node.id)
198         ## these are the callbacks we use when we issue a PING
199         def _pongHandler(dict, node=node, table=self.table, callback=callback):
200             _krpc_sender = dict['_krpc_sender']
201             dict = dict['rsp']
202             sender = {'id' : dict['id']}
203             sender['host'] = _krpc_sender[0]
204             sender['port'] = _krpc_sender[1]
205             n = self.Node().initWithDict(sender)
206             n.conn = self.udp.connectionForAddr((n.host, n.port))
207             table.insertNode(n)
208             if callback:
209                 callback()
210         def _defaultPong(err, node=node, table=self.table, callback=callback):
211             table.nodeFailed(node)
212             if callback:
213                 callback()
214         
215         df.addCallbacks(_pongHandler,_defaultPong)
216
217     def findCloseNodes(self, callback=lambda a: None):
218         """
219             This does a findNode on the ID one away from our own.  
220             This will allow us to populate our table with nodes on our network closest to our own.
221             This is called as soon as we start up with an empty table
222         """
223         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
224         self.findNode(id, callback)
225
226     def refreshTable(self, force=0):
227         """
228             force=1 will refresh table regardless of last bucket access time
229         """
230         def callback(nodes):
231             pass
232     
233         for bucket in self.table.buckets:
234             if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
235                 id = newIDInRange(bucket.min, bucket.max)
236                 self.findNode(id, callback)
237
238     def stats(self):
239         """
240         Returns (num_contacts, num_nodes)
241         num_contacts: number contacts in our routing table
242         num_nodes: number of nodes estimated in the entire dht
243         """
244         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
245         num_nodes = const.K * (2**(len(self.table.buckets) - 1))
246         return (num_contacts, num_nodes)
247
248     def krpc_ping(self, id, _krpc_sender):
249         sender = {'id' : id}
250         sender['host'] = _krpc_sender[0]
251         sender['port'] = _krpc_sender[1]        
252         n = self.Node().initWithDict(sender)
253         n.conn = self.udp.connectionForAddr((n.host, n.port))
254         self.insertNode(n, contacted=0)
255         return {"id" : self.node.id}
256         
257     def krpc_find_node(self, target, id, _krpc_sender):
258         nodes = self.table.findNodes(target)
259         nodes = map(lambda node: node.senderDict(), nodes)
260         sender = {'id' : id}
261         sender['host'] = _krpc_sender[0]
262         sender['port'] = _krpc_sender[1]        
263         n = self.Node().initWithDict(sender)
264         n.conn = self.udp.connectionForAddr((n.host, n.port))
265         self.insertNode(n, contacted=0)
266         return {"nodes" : nodes, "id" : self.node.id}
267
268
269 ## This class provides read-only access to the DHT, valueForKey
270 ## you probably want to use this mixin and provide your own write methods
271 class KhashmirRead(KhashmirBase):
272     _Node = KNodeRead
273     def retrieveValues(self, key):
274         c = self.store.cursor()
275         c.execute("select value from kv where key = %s;", sqlite.encode(key))
276         t = c.fetchone()
277         l = []
278         while t:
279             l.append(t['value'])
280             t = c.fetchone()
281         return l
282     ## also async
283     def valueForKey(self, key, callback, searchlocal = 1):
284         """ returns the values found for key in global table
285             callback will be called with a list of values for each peer that returns unique values
286             final callback will be an empty list - probably should change to 'more coming' arg
287         """
288         nodes = self.table.findNodes(key)
289         
290         # get locals
291         if searchlocal:
292             l = self.retrieveValues(key)
293             if len(l) > 0:
294                 reactor.callLater(0, callback, (l))
295         else:
296             l = []
297         
298         # create our search state
299         state = GetValue(self, key, callback)
300         reactor.callLater(0, state.goWithNodes, nodes, l)
301
302     def krpc_find_value(self, key, id, _krpc_sender):
303         sender = {'id' : id}
304         sender['host'] = _krpc_sender[0]
305         sender['port'] = _krpc_sender[1]        
306         n = self.Node().initWithDict(sender)
307         n.conn = self.udp.connectionForAddr((n.host, n.port))
308         self.insertNode(n, contacted=0)
309     
310         l = self.retrieveValues(key)
311         if len(l) > 0:
312             return {'values' : l, "id": self.node.id}
313         else:
314             nodes = self.table.findNodes(key)
315             nodes = map(lambda node: node.senderDict(), nodes)
316             return {'nodes' : nodes, "id": self.node.id}
317
318 ###  provides a generic write method, you probably don't want to deploy something that allows
319 ###  arbitrary value storage
320 class KhashmirWrite(KhashmirRead):
321     _Node = KNodeWrite
322     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
323     def storeValueForKey(self, key, value, callback=None):
324         """ stores the value for key in the global table, returns immediately, no status 
325             in this implementation, peers respond but don't indicate status to storing values
326             a key can have many values
327         """
328         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
329             if not response:
330                 # default callback
331                 def _storedValueHandler(sender):
332                     pass
333                 response=_storedValueHandler
334             action = StoreValue(self.table, key, value, response)
335             reactor.callLater(action.goWithNodes, nodes)
336             
337         # this call is asynch
338         self.findNode(key, _storeValueForKey)
339                     
340     def krpc_store_value(self, key, value, id, _krpc_sender):
341         t = "%0.6f" % time.time()
342         c = self.store.cursor()
343         try:
344             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
345         except sqlite.IntegrityError, reason:
346             # update last insert time
347             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
348         self.store.commit()
349         sender = {'id' : id}
350         sender['host'] = _krpc_sender[0]
351         sender['port'] = _krpc_sender[1]        
352         n = self.Node().initWithDict(sender)
353         n.conn = self.udp.connectionForAddr((n.host, n.port))
354         self.insertNode(n, contacted=0)
355         return {"id" : self.node.id}
356
357 # the whole shebang, for testing
358 class Khashmir(KhashmirWrite):
359     _Node = KNodeWrite