]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - khashmir.py
e9e53a76ebd60a2f574f38c743e5dd95ced378f0
[quix0rs-apt-p2p.git] / khashmir.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from const import reactor
5 import const
6
7 import time
8
9 from sha import sha
10
11 from ktable import KTable, K
12 from knode import KNode as Node
13
14 from khash import newID, newIDInRange
15
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
17 import krpc
18
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.python import threadable
22 from twisted.application import service, internet
23 from twisted.web import server
24 threadable.init()
25 import sys
26
27 import sqlite  ## find this at http://pysqlite.sourceforge.net/
28
29 class KhashmirDBExcept(Exception):
30     pass
31
32 # this is the main class!
33 class Khashmir(protocol.Factory):
34     __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
35     def __init__(self, host, port, db='khashmir.db'):
36         self.setup(host, port, db)
37         
38     def setup(self, host, port, db='khashmir.db'):
39         self._findDB(db)
40         self.port = port
41         self.node = self._loadSelfNode(host, port)
42         self.table = KTable(self.node)
43         self.app = service.Application("krpc")
44         self.udp = krpc.hostbroker(self)
45         self.udp.protocol = krpc.KRPC
46         self.listenport = reactor.listenUDP(port, self.udp)
47         self.last = time.time()
48         self._loadRoutingTable()
49         KeyExpirer(store=self.store)
50         #self.refreshTable(force=1)
51         reactor.callLater(60, self.checkpoint, (1,))
52         
53     def __del__(self):
54         self.listenport.stopListening()
55         
56     def _loadSelfNode(self, host, port):
57         c = self.store.cursor()
58         c.execute('select id from self where num = 0;')
59         if c.rowcount > 0:
60             id = c.fetchone()[0]
61         else:
62             id = newID()
63         return Node().init(id, host, port)
64         
65     def _saveSelfNode(self):
66         self.store.autocommit = 0
67         c = self.store.cursor()
68         c.execute('delete from self where num = 0;')
69         c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
70         self.store.commit()
71         self.store.autocommit = 1
72         
73     def checkpoint(self, auto=0):
74         self._saveSelfNode()
75         self._dumpRoutingTable()
76         if auto:
77             reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
78         
79     def _findDB(self, db):
80         import os
81         try:
82             os.stat(db)
83         except OSError:
84             self._createNewDB(db)
85         else:
86             self._loadDB(db)
87         
88     def _loadDB(self, db):
89         try:
90             self.store = sqlite.connect(db=db)
91             self.store.autocommit = 1
92         except:
93             import traceback
94             raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
95         
96     def _createNewDB(self, db):
97         self.store = sqlite.connect(db=db)
98         self.store.autocommit = 1
99         s = """
100             create table kv (key binary, value binary, time timestamp, primary key (key, value));
101             create index kv_key on kv(key);
102             create index kv_timestamp on kv(time);
103             
104             create table nodes (id binary primary key, host text, port number);
105             
106             create table self (num number primary key, id binary);
107             """
108         c = self.store.cursor()
109         c.execute(s)
110
111     def _dumpRoutingTable(self):
112         """
113             save routing table nodes to the database
114         """
115         self.store.autocommit = 0;
116         c = self.store.cursor()
117         c.execute("delete from nodes where id not NULL;")
118         for bucket in self.table.buckets:
119             for node in bucket.l:
120                 d = node.senderDict()
121                 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port']))
122         self.store.commit()
123         self.store.autocommit = 1;
124         
125     def _loadRoutingTable(self):
126         """
127             load routing table nodes from database
128             it's usually a good idea to call refreshTable(force=1) after loading the table
129         """
130         c = self.store.cursor()
131         c.execute("select * from nodes;")
132         for rec in c.fetchall():
133             n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
134             n.conn = self.udp.connectionForAddr((n.host, n.port))
135             self.table.insertNode(n, contacted=0)
136             
137
138     #######
139     #######  LOCAL INTERFACE    - use these methods!
140     def addContact(self, host, port, callback=None):
141         """
142             ping this node and add the contact info to the table on pong!
143         """
144         n =Node().init(const.NULL_ID, host, port) 
145         n.conn = self.udp.connectionForAddr((n.host, n.port))
146         self.sendPing(n, callback=callback)
147
148     ## this call is async!
149     def findNode(self, id, callback, errback=None):
150         """ returns the contact info for node, or the k closest nodes, from the global table """
151         # get K nodes out of local table/cache, or the node we want
152         nodes = self.table.findNodes(id)
153         d = Deferred()
154         if errback:
155             d.addCallbacks(callback, errback)
156         else:
157             d.addCallback(callback)
158         if len(nodes) == 1 and nodes[0].id == id :
159             d.callback(nodes)
160         else:
161             # create our search state
162             state = FindNode(self, id, d.callback)
163             reactor.callFromThread(state.goWithNodes, nodes)
164     
165     
166     ## also async
167     def valueForKey(self, key, callback, searchlocal = 1):
168         """ returns the values found for key in global table
169             callback will be called with a list of values for each peer that returns unique values
170             final callback will be an empty list - probably should change to 'more coming' arg
171         """
172         nodes = self.table.findNodes(key)
173         
174         # get locals
175         if searchlocal:
176             l = self.retrieveValues(key)
177             if len(l) > 0:
178                 reactor.callLater(0, callback, (l))
179         else:
180             l = []
181         
182         # create our search state
183         state = GetValue(self, key, callback)
184         reactor.callFromThread(state.goWithNodes, nodes, l)
185
186     ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
187     def storeValueForKey(self, key, value, callback=None):
188         """ stores the value for key in the global table, returns immediately, no status 
189             in this implementation, peers respond but don't indicate status to storing values
190             a key can have many values
191         """
192         def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
193             if not response:
194                 # default callback
195                 def _storedValueHandler(sender):
196                     pass
197                 response=_storedValueHandler
198             action = StoreValue(self.table, key, value, response)
199             reactor.callFromThread(action.goWithNodes, nodes)
200             
201         # this call is asynch
202         self.findNode(key, _storeValueForKey)
203         
204     
205     def insertNode(self, n, contacted=1):
206         """
207         insert a node in our local table, pinging oldest contact in bucket, if necessary
208         
209         If all you have is a host/port, then use addContact, which calls this method after
210         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
211         a node into the table without it's peer-ID.  That means of course the node passed into this
212         method needs to be a properly formed Node object with a valid ID.
213         """
214         old = self.table.insertNode(n, contacted=contacted)
215         if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
216             # the bucket is full, check to see if old node is still around and if so, replace it
217             
218             ## these are the callbacks used when we ping the oldest node in a bucket
219             def _staleNodeHandler(oldnode=old, newnode = n):
220                 """ called if the pinged node never responds """
221                 self.table.replaceStaleNode(old, newnode)
222             
223             def _notStaleNodeHandler(dict, old=old):
224                 """ called when we get a pong from the old node """
225                 _krpc_sender = dict['_krpc_sender']
226                 dict = dict['rsp']
227                 sender = dict['sender']
228                 if sender['id'] == old.id:
229                     self.table.justSeenNode(old.id)
230             
231             df = old.ping(self.node.senderDict())
232             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
233
234     def sendPing(self, node, callback=None):
235         """
236             ping a node
237         """
238         df = node.ping(self.node.senderDict())
239         ## these are the callbacks we use when we issue a PING
240         def _pongHandler(dict, node=node, table=self.table, callback=callback):
241             _krpc_sender = dict['_krpc_sender']
242             dict = dict['rsp']
243             sender = dict['sender']
244             if node.id != const.NULL_ID and node.id != sender['id']:
245                 # whoah, got response from different peer than we were expecting
246                 self.table.invalidateNode(node)
247             else:
248                 sender['host'] = node.host
249                 sender['port'] = node.port
250                 n = Node().initWithDict(sender)
251                 n.conn = self.udp.connectionForAddr((n.host, n.port))
252                 table.insertNode(n)
253                 if callback:
254                     callback()
255         def _defaultPong(err, node=node, table=self.table, callback=callback):
256             table.nodeFailed(node)
257             if callback:
258                 callback()
259         
260         df.addCallbacks(_pongHandler,_defaultPong)
261
262     def findCloseNodes(self, callback=lambda a: None):
263         """
264             This does a findNode on the ID one away from our own.  
265             This will allow us to populate our table with nodes on our network closest to our own.
266             This is called as soon as we start up with an empty table
267         """
268         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
269         self.findNode(id, callback)
270
271     def refreshTable(self, force=0):
272         """
273             force=1 will refresh table regardless of last bucket access time
274         """
275         def callback(nodes):
276             pass
277     
278         for bucket in self.table.buckets:
279             if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
280                 id = newIDInRange(bucket.min, bucket.max)
281                 self.findNode(id, callback)
282
283
284     def retrieveValues(self, key):
285         c = self.store.cursor()
286         c.execute("select value from kv where key = %s;", sqlite.encode(key))
287         t = c.fetchone()
288         l = []
289         while t:
290             l.append(t['value'])
291             t = c.fetchone()
292         return l
293     
294     #####
295     ##### INCOMING MESSAGE HANDLERS
296     
297     def krpc_ping(self, sender, _krpc_sender):
298         """
299             takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
300             returns sender dict
301         """
302         sender['host'] = _krpc_sender[0]
303         sender['port'] = _krpc_sender[1]        
304         n = Node().initWithDict(sender)
305         n.conn = self.udp.connectionForAddr((n.host, n.port))
306         self.insertNode(n, contacted=0)
307         return {"sender" : self.node.senderDict()}
308         
309     def krpc_find_node(self, target, sender, _krpc_sender):
310         nodes = self.table.findNodes(target)
311         nodes = map(lambda node: node.senderDict(), nodes)
312         sender['host'] = _krpc_sender[0]
313         sender['port'] = _krpc_sender[1]        
314         n = Node().initWithDict(sender)
315         n.conn = self.udp.connectionForAddr((n.host, n.port))
316         self.insertNode(n, contacted=0)
317         return {"nodes" : nodes, "sender" : self.node.senderDict()}
318             
319     def krpc_store_value(self, key, value, sender, _krpc_sender):
320         t = "%0.6f" % time.time()
321         c = self.store.cursor()
322         try:
323             c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
324         except sqlite.IntegrityError, reason:
325             # update last insert time
326             c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
327         sender['host'] = _krpc_sender[0]
328         sender['port'] = _krpc_sender[1]        
329         n = Node().initWithDict(sender)
330         n.conn = self.udp.connectionForAddr((n.host, n.port))
331         self.insertNode(n, contacted=0)
332         return {"sender" : self.node.senderDict()}
333     
334     def krpc_find_value(self, key, sender, _krpc_sender):
335         sender['host'] = _krpc_sender[0]
336         sender['port'] = _krpc_sender[1]        
337         n = Node().initWithDict(sender)
338         n.conn = self.udp.connectionForAddr((n.host, n.port))
339         self.insertNode(n, contacted=0)
340     
341         l = self.retrieveValues(key)
342         if len(l) > 0:
343             return {'values' : l, "sender": self.node.senderDict()}
344         else:
345             nodes = self.table.findNodes(key)
346             nodes = map(lambda node: node.senderDict(), nodes)
347             return {'nodes' : nodes, "sender": self.node.senderDict()}
348