Clean up all the imports.
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from time import time
5 from random import randrange
6 import sqlite  ## find this at http://pysqlite.sourceforge.net/
7
8 from twisted.internet.defer import Deferred
9 from twisted.internet import protocol
10 from twisted.internet import reactor
11
12 import const
13 from ktable import KTable
14 from knode import KNodeBase, KNodeRead, KNodeWrite
15 from khash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
17 import krpc
18
19 class KhashmirDBExcept(Exception):
20     pass
21
22 # this is the base class, has base functionality and find node, no key-value mappings
23 class KhashmirBase(protocol.Factory):
24     _Node = KNodeBase
25     def __init__(self, host, port, db='khashmir.db'):
26         self.setup(host, port, db)
27         
28     def setup(self, host, port, db='khashmir.db'):
29         self._findDB(db)
30         self.port = port
31         self.node = self._loadSelfNode(host, port)
32         self.table = KTable(self.node)
33         #self.app = service.Application("krpc")
34         self.udp = krpc.hostbroker(self)
35         self.udp.protocol = krpc.KRPC
36         self.listenport = reactor.listenUDP(port, self.udp)
37         self.last = time()
38         self._loadRoutingTable()
39         KeyExpirer(store=self.store)
40         self.refreshTable(force=1)
41         reactor.callLater(60, self.checkpoint, (1,))
42
43     def Node(self):
44         n = self._Node()
45         n.table = self.table
46         return n
47     
48     def __del__(self):
49         self.listenport.stopListening()
50         
51     def _loadSelfNode(self, host, port):
52         c = self.store.cursor()
53         c.execute('select id from self where num = 0;')
54         if c.rowcount > 0:
55             id = c.fetchone()[0]
56         else:
57             id = newID()
58         return self._Node().init(id, host, port)
59         
60     def _saveSelfNode(self):
61         c = self.store.cursor()
62         c.execute('delete from self where num = 0;')
63         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
64         self.store.commit()
65         
66     def checkpoint(self, auto=0):
67         self._saveSelfNode()
68         self._dumpRoutingTable()
69         self.refreshTable()
70         if auto:
71             reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,))
72         
73     def _findDB(self, db):
74         import os
75         try:
76             os.stat(db)
77         except OSError:
78             self._createNewDB(db)
79         else:
80             self._loadDB(db)
81         
82     def _loadDB(self, db):
83         try:
84             self.store = sqlite.connect(db=db)
85             #self.store.autocommit = 0
86         except:
87             import traceback
88             raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
89         
90     def _createNewDB(self, db):
91         self.store = sqlite.connect(db=db)
92         s = """
93             create table kv (key binary, value binary, time timestamp, primary key (key, value));
94             create index kv_key on kv(key);
95             create index kv_timestamp on kv(time);
96             
97             create table nodes (id binary primary key, host text, port number);
98             
99             create table self (num number primary key, id binary);
100             """
101         c = self.store.cursor()
102         c.execute(s)
103         self.store.commit()
104
105     def _dumpRoutingTable(self):
106         """
107             save routing table nodes to the database
108         """
109         c = self.store.cursor()
110         c.execute("delete from nodes where id not NULL;")
111         for bucket in self.table.buckets:
112             for node in bucket.l:
113                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
114         self.store.commit()
115         
116     def _loadRoutingTable(self):
117         """
118             load routing table nodes from database
119             it's usually a good idea to call refreshTable(force=1) after loading the table
120         """
121         c = self.store.cursor()
122         c.execute("select * from nodes;")
123         for rec in c.fetchall():
124             n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
125             n.conn = self.udp.connectionForAddr((n.host, n.port))
126             self.table.insertNode(n, contacted=0)
127             
128
129     #######
130     #######  LOCAL INTERFACE    - use these methods!
131     def addContact(self, host, port, callback=None):
132         """
133             ping this node and add the contact info to the table on pong!
134         """
135         n =self.Node().init(const.NULL_ID, host, port) 
136         n.conn = self.udp.connectionForAddr((n.host, n.port))
137         self.sendPing(n, callback=callback)
138
139     ## this call is async!
140     def findNode(self, id, callback, errback=None):
141         """ returns the contact info for node, or the k closest nodes, from the global table """
142         # get K nodes out of local table/cache, or the node we want
143         nodes = self.table.findNodes(id)
144         d = Deferred()
145         if errback:
146             d.addCallbacks(callback, errback)
147         else:
148             d.addCallback(callback)
149         if len(nodes) == 1 and nodes[0].id == id :
150             d.callback(nodes)
151         else:
152             # create our search state
153             state = FindNode(self, id, d.callback)
154             reactor.callLater(0, state.goWithNodes, nodes)
155     
156     def insertNode(self, n, contacted=1):
157         """
158         insert a node in our local table, pinging oldest contact in bucket, if necessary
159         
160         If all you have is a host/port, then use addContact, which calls this method after
161         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
162         a node into the table without it's peer-ID.  That means of course the node passed into this
163         method needs to be a properly formed Node object with a valid ID.
164         """
165         old = self.table.insertNode(n, contacted=contacted)
166         if old and (time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
167             # the bucket is full, check to see if old node is still around and if so, replace it
168             
169             ## these are the callbacks used when we ping the oldest node in a bucket
170             def _staleNodeHandler(oldnode=old, newnode = n):
171                 """ called if the pinged node never responds """
172                 self.table.replaceStaleNode(old, newnode)
173             
174             def _notStaleNodeHandler(dict, old=old):
175                 """ called when we get a pong from the old node """
176                 dict = dict['rsp']
177                 if dict['id'] == old.id:
178                     self.table.justSeenNode(old.id)
179             
180             df = old.ping(self.node.id)
181             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
182
183     def sendPing(self, node, callback=None):
184         """
185             ping a node
186         """
187         df = node.ping(self.node.id)
188         ## these are the callbacks we use when we issue a PING
189         def _pongHandler(dict, node=node, table=self.table, callback=callback):
190             _krpc_sender = dict['_krpc_sender']
191             dict = dict['rsp']
192             sender = {'id' : dict['id']}
193             sender['host'] = _krpc_sender[0]
194             sender['port'] = _krpc_sender[1]
195             n = self.Node().initWithDict(sender)
196             n.conn = self.udp.connectionForAddr((n.host, n.port))
197             table.insertNode(n)
198             if callback:
199                 callback()
200         def _defaultPong(err, node=node, table=self.table, callback=callback):
201             table.nodeFailed(node)
202             if callback:
203                 callback()
204         
205         df.addCallbacks(_pongHandler,_defaultPong)
206
207     def findCloseNodes(self, callback=lambda a: None):
208         """
209             This does a findNode on the ID one away from our own.  
210             This will allow us to populate our table with nodes on our network closest to our own.
211             This is called as soon as we start up with an empty table
212         """
213         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
214         self.findNode(id, callback)
215
216     def refreshTable(self, force=0):
217         """
218             force=1 will refresh table regardless of last bucket access time
219         """
220         def callback(nodes):
221             pass
222     
223         for bucket in self.table.buckets:
224             if force or (time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
225                 id = newIDInRange(bucket.min, bucket.max)
226                 self.findNode(id, callback)
227
228     def stats(self):
229         """
230         Returns (num_contacts, num_nodes)
231         num_contacts: number contacts in our routing table
232         num_nodes: number of nodes estimated in the entire dht
233         """
234         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
235         num_nodes = const.K * (2**(len(self.table.buckets) - 1))
236         return (num_contacts, num_nodes)
237
238     def krpc_ping(self, id, _krpc_sender):
239         sender = {'id' : id}
240         sender['host'] = _krpc_sender[0]
241         sender['port'] = _krpc_sender[1]        
242         n = self.Node().initWithDict(sender)
243         n.conn = self.udp.connectionForAddr((n.host, n.port))
244         self.insertNode(n, contacted=0)
245         return {"id" : self.node.id}
246         
247     def krpc_find_node(self, target, id, _krpc_sender):
248         nodes = self.table.findNodes(target)
249         nodes = map(lambda node: node.senderDict(), nodes)
250         sender = {'id' : id}
251         sender['host'] = _krpc_sender[0]
252         sender['port'] = _krpc_sender[1]        
253         n = self.Node().initWithDict(sender)
254         n.conn = self.udp.connectionForAddr((n.host, n.port))
255         self.insertNode(n, contacted=0)
256         return {"nodes" : nodes, "id" : self.node.id}
257
258
259 ## This class provides read-only access to the DHT, valueForKey
260 ## you probably want to use this mixin and provide your own write methods
261 class KhashmirRead(KhashmirBase):
262     _Node = KNodeRead
263     def retrieveValues(self, key):
264         c = self.store.cursor()
265         c.execute("select value from kv where key = %s;", sqlite.encode(key))
266         t = c.fetchone()
267         l = []
268         while t:
269             l.append(t['value'])
270             t = c.fetchone()
271         return l
272     ## also async
273     def valueForKey(self, key, callback, searchlocal = 1):
274         """ returns the values found for key in global table
275             callback will be called with a list of values for each peer that returns unique values
276             final callback will be an empty list - probably should change to 'more coming' arg
277         """
278         nodes = self.table.findNodes(key)
279         
280         # get locals
281         if searchlocal:
282             l = self.retrieveValues(key)
283             if len(l) > 0:
284                 reactor.callLater(0, callback, (l))
285         else:
286             l = []
287         
288         # create our search state
289         state = GetValue(self, key, callback)
290         reactor.callLater(0, state.goWithNodes, nodes, l)
291
292     def krpc_find_value(self, key, id, _krpc_sender):
293         sender = {'id' : id}
294         sender['host'] = _krpc_sender[0]
295         sender['port'] = _krpc_sender[1]        
296         n = self.Node().initWithDict(sender)
297         n.conn = self.udp.connectionForAddr((n.host, n.port))
298         self.insertNode(n, contacted=0)
299     
300         l = self.retrieveValues(key)
301         if len(l) > 0:
302             return {'values' : l, "id": self.node.id}
303         else:
304             nodes = self.table.findNodes(key)
305             nodes = map(lambda node: node.senderDict(), nodes)
306             return {'nodes' : nodes, "id": self.node.id}
307
308 ###  provides a generic write method, you probably don't want to deploy something that allows
309 ###  arbitrary value storage
310 class KhashmirWrite(KhashmirRead):
311     _Node = KNodeWrite
312     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
313     def storeValueForKey(self, key, value, callback=None):
314         """ stores the value for key in the global table, returns immediately, no status 
315             in this implementation, peers respond but don't indicate status to storing values
316             a key can have many values
317         """
318         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
319             if not response:
320                 # default callback
321                 def _storedValueHandler(sender):
322                     pass
323                 response=_storedValueHandler
324             action = StoreValue(self.table, key, value, response)
325             reactor.callLater(action.goWithNodes, nodes)
326             
327         # this call is asynch
328         self.findNode(key, _storeValueForKey)
329                     
330     def krpc_store_value(self, key, value, id, _krpc_sender):
331         t = "%0.6f" % time()
332         c = self.store.cursor()
333         try:
334             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
335         except sqlite.IntegrityError, reason:
336             # update last insert time
337             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
338         self.store.commit()
339         sender = {'id' : id}
340         sender['host'] = _krpc_sender[0]
341         sender['port'] = _krpc_sender[1]        
342         n = self.Node().initWithDict(sender)
343         n.conn = self.udp.connectionForAddr((n.host, n.port))
344         self.insertNode(n, contacted=0)
345         return {"id" : self.node.id}
346
347 # the whole shebang, for testing
348 class Khashmir(KhashmirWrite):
349     _Node = KNodeWrite