-## Copyright 2002 Andrew Loewenstern, All Rights Reserved
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
from const import reactor
import const
from ktable import KTable, K
from knode import KNode as Node
-from hash import newID, newIDInRange
+from khash import newID, newIDInRange
from actions import FindNode, GetValue, KeyExpirer, StoreValue
import krpc
-import airhook
from twisted.internet.defer import Deferred
from twisted.internet import protocol
from twisted.python import threadable
-from twisted.internet.app import Application
+from twisted.application import service, internet
from twisted.web import server
threadable.init()
import sys
import sqlite ## find this at http://pysqlite.sourceforge.net/
-import pysqlite_exceptions
class KhashmirDBExcept(Exception):
pass
# this is the main class!
class Khashmir(protocol.Factory):
__slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
- protocol = krpc.KRPC
def __init__(self, host, port, db='khashmir.db'):
self.setup(host, port, db)
self.port = port
self.node = self._loadSelfNode(host, port)
self.table = KTable(self.node)
- self.app = Application("krpc")
- self.airhook = airhook.listenAirhookStream(port, self)
+ self.app = service.Application("krpc")
+ self.udp = krpc.hostbroker(self)
+ self.udp.protocol = krpc.KRPC
+ self.listenport = reactor.listenUDP(port, self.udp)
self.last = time.time()
self._loadRoutingTable()
KeyExpirer(store=self.store)
#self.refreshTable(force=1)
reactor.callLater(60, self.checkpoint, (1,))
+ def __del__(self):
+ self.listenport.stopListening()
+
def _loadSelfNode(self, host, port):
c = self.store.cursor()
c.execute('select id from self where num = 0;')
if c.rowcount > 0:
- id = c.fetchone()[0].decode('hex')
+ id = c.fetchone()[0]
else:
id = newID()
return Node().init(id, host, port)
self.store.autocommit = 0
c = self.store.cursor()
c.execute('delete from self where num = 0;')
- c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
+ c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
self.store.commit()
self.store.autocommit = 1
self.store = sqlite.connect(db=db)
self.store.autocommit = 1
s = """
- create table kv (key text, value text, time timestamp, primary key (key, value));
+ create table kv (key binary, value binary, time timestamp, primary key (key, value));
create index kv_key on kv(key);
create index kv_timestamp on kv(time);
- create table nodes (id text primary key, host text, port number);
+ create table nodes (id binary primary key, host text, port number);
- create table self (num number primary key, id text);
+ create table self (num number primary key, id binary);
"""
c = self.store.cursor()
c.execute(s)
c.execute("delete from nodes where id not NULL;")
for bucket in self.table.buckets:
for node in bucket.l:
- d = node.senderDict()
- c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
+ c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
self.store.commit()
self.store.autocommit = 1;
c = self.store.cursor()
c.execute("select * from nodes;")
for rec in c.fetchall():
- n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.table.insertNode(n, contacted=0)
ping this node and add the contact info to the table on pong!
"""
n =Node().init(const.NULL_ID, host, port)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.sendPing(n, callback=callback)
## this call is async!
def _notStaleNodeHandler(dict, old=old):
""" called when we get a pong from the old node """
- _krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
- sender = dict['sender']
- if sender['id'] == old.id:
+ if dict['id'] == old.id:
self.table.justSeenNode(old.id)
- df = old.ping(self.node.senderDict())
+ df = old.ping(self.node.id)
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
def sendPing(self, node, callback=None):
"""
ping a node
"""
- df = node.ping(self.node.senderDict())
+ df = node.ping(self.node.id)
## these are the callbacks we use when we issue a PING
def _pongHandler(dict, node=node, table=self.table, callback=callback):
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
- sender = dict['sender']
+ sender = {'id' : dict['id']}
if node.id != const.NULL_ID and node.id != sender['id']:
# whoah, got response from different peer than we were expecting
self.table.invalidateNode(node)
else:
- sender['host'] = node.host
- sender['port'] = node.port
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
table.insertNode(n)
if callback:
callback()
def retrieveValues(self, key):
- s = "select value from kv where key = '%s';" % key.encode('hex')
c = self.store.cursor()
- c.execute(s)
+ c.execute("select value from kv where key = %s;", sqlite.encode(key))
t = c.fetchone()
l = []
while t:
- l.append(t['value'].decode('base64'))
+ l.append(t['value'])
t = c.fetchone()
return l
#####
##### INCOMING MESSAGE HANDLERS
- def krpc_ping(self, sender, _krpc_sender):
- """
- takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
- returns sender dict
- """
+ def krpc_ping(self, id, _krpc_sender):
+ sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
- return {"sender" : self.node.senderDict()}
+ return {"id" : self.node.id}
- def krpc_find_node(self, target, sender, _krpc_sender):
+ def krpc_find_node(self, target, id, _krpc_sender):
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.senderDict(), nodes)
+ sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
- return {"nodes" : nodes, "sender" : self.node.senderDict()}
+ return {"nodes" : nodes, "id" : self.node.id}
- def krpc_store_value(self, key, value, sender, _krpc_sender):
+ def krpc_store_value(self, key, value, id, _krpc_sender):
t = "%0.6f" % time.time()
- s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
c = self.store.cursor()
try:
- c.execute(s)
- except pysqlite_exceptions.IntegrityError, reason:
+ c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+ except sqlite.IntegrityError, reason:
# update last insert time
- s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
- c.execute(s)
+ c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
+ sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
- return {"sender" : self.node.senderDict()}
+ return {"id" : self.node.id}
- def krpc_find_value(self, key, sender, _krpc_sender):
+ def krpc_find_value(self, key, id, _krpc_sender):
+ sender = {'id' : id}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
l = self.retrieveValues(key)
if len(l) > 0:
- return {'values' : l, "sender": self.node.senderDict()}
+ return {'values' : l, "id": self.node.id}
else:
nodes = self.table.findNodes(key)
nodes = map(lambda node: node.senderDict(), nodes)
- return {'nodes' : nodes, "sender": self.node.senderDict()}
+ return {'nodes' : nodes, "id": self.node.id}
-### TESTING ###
-from random import randrange
-import threading, thread, sys, time
-from sha import sha
-from hash import newID
-
-
-def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
- import thread
- l = []
- for i in xrange(peers):
- a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`)
- l.append(a)
- thread.start_new_thread(l[0].app.run, ())
- for peer in l[1:]:
- peer.app.run()
- return l
-
-def test_build_net(quiet=0, peers=24, host='127.0.0.1', pause=0, startport=2001, dbprefix='/tmp/test'):
- from whrandom import randrange
- import threading
- import thread
- import sys
- port = startport
- l = []
- if not quiet:
- print "Building %s peer table." % peers
-
- for i in xrange(peers):
- a = Khashmir(host, port + i, db = dbprefix +`i`)
- l.append(a)
-
-
- thread.start_new_thread(l[0].app.run, ())
- time.sleep(1)
- for peer in l[1:]:
- peer.app.run()
- #time.sleep(3)
-
- def spewer(frame, s, ignored):
- from twisted.python import reflect
- if frame.f_locals.has_key('self'):
- se = frame.f_locals['self']
- print 'method %s of %s at %s' % (
- frame.f_code.co_name, reflect.qual(se.__class__), id(se)
- )
- #sys.settrace(spewer)
-
- print "adding contacts...."
- def makecb(flag):
- def cb(f=flag):
- f.set()
- return cb
-
- for peer in l:
- p = l[randrange(0, len(l))]
- if p != peer:
- n = p.node
- flag = threading.Event()
- peer.addContact(host, n.port, makecb(flag))
- flag.wait()
- p = l[randrange(0, len(l))]
- if p != peer:
- n = p.node
- flag = threading.Event()
- peer.addContact(host, n.port, makecb(flag))
- flag.wait()
- p = l[randrange(0, len(l))]
- if p != peer:
- n = p.node
- flag = threading.Event()
- peer.addContact(host, n.port, makecb(flag))
- flag.wait()
-
- print "finding close nodes...."
-
- for peer in l:
- flag = threading.Event()
- def cb(nodes, f=flag):
- f.set()
- peer.findCloseNodes(cb)
- flag.wait()
- # for peer in l:
- # peer.refreshTable()
- return l
-
-def test_find_nodes(l, quiet=0):
- flag = threading.Event()
-
- n = len(l)
-
- a = l[randrange(0,n)]
- b = l[randrange(0,n)]
-
- def callback(nodes, flag=flag, id = b.node.id):
- if (len(nodes) >0) and (nodes[0].id == id):
- print "test_find_nodes PASSED"
- else:
- print "test_find_nodes FAILED"
- flag.set()
- a.findNode(b.node.id, callback)
- flag.wait()
-
-def test_find_value(l, quiet=0):
- ff = threading.Event()
- fa = threading.Event()
- fb = threading.Event()
- fc = threading.Event()
-
- n = len(l)
- a = l[randrange(0,n)]
- b = l[randrange(0,n)]
- c = l[randrange(0,n)]
- d = l[randrange(0,n)]
-
- key = newID()
- value = newID()
- if not quiet: print "inserting value..."
- def acb(p, f=ff):
- f.set()
- a.storeValueForKey(key, value, acb)
- ff.wait()
-
- if not quiet:
- print "finding..."
-
- class cb:
- def __init__(self, flag, value=value, port=None):
- self.flag = flag
- self.val = value
- self.found = 0
- self.port = port
- def callback(self, values):
- try:
- if(len(values) == 0):
- if not self.found:
- print "find %s NOT FOUND" % self.port
- else:
- print "find %s FOUND" % self.port
- else:
- if self.val in values:
- self.found = 1
- finally:
- self.flag.set()
-
- b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
- fa.wait()
- c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
- fb.wait()
- d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)
- fc.wait()
-
-def test_one(host, port, db='/tmp/test'):
- import thread
- k = Khashmir(host, port, db)
- thread.start_new_thread(reactor.run, ())
- return k
-
-if __name__ == "__main__":
- import sys
- n = 8
- if len(sys.argv) > 1: n = int(sys.argv[1])
- l = test_build_net(peers=n)
- time.sleep(3)
- print "finding nodes..."
- for i in range(n):
- test_find_nodes(l)
- print "inserting and fetching values..."
- for i in range(10):
- test_find_value(l)