--- /dev/null
+*.pyc
+.DS_Store
from const import reactor
import const
-from hash import intify
+from khash import intify
from knode import KNode as Node
from ktable import KTable, K
sender = dict["sender"]
sender['port'] = _krpc_sender[1]
sender = Node().initWithDict(sender)
- sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+ sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
self.answered[sender.id] = 1
for node in l:
n = Node().initWithDict(node)
- n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.table.udp.connectionForAddr((n.host, n.port))
if not self.found.has_key(n.id):
self.found[n.id] = n
self.schedule()
sender = dict["sender"]
sender['port'] = _krpc_sender[1]
sender = Node().initWithDict(sender)
- sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+ sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
if dict.has_key('nodes'):
for node in dict['nodes']:
n = Node().initWithDict(node)
- n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.table.udp.connectionForAddr((n.host, n.port))
if not self.found.has_key(n.id):
self.found[n.id] = n
elif dict.has_key('values'):
## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
+"""
from twisted.internet.default import SelectReactor ## twistedmatrix.com
reactor = SelectReactor()
reactor.installResolver(twisted.names.client.theResolver)
except IOError:
print "no resolv.conf!"
+"""
+from twisted.internet import reactor
# magic id to use before we know a peer's id
NULL_ID = 20 * '\0'
+++ /dev/null
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-from sha import sha
-import whrandom
-
-
-def intify(hstr):
- """20 bit hash, big-endian -> long python integer"""
- assert len(hstr) == 20
- return long(hstr.encode('hex'), 16)
-
-def stringify(num):
- """long int -> 20-character string"""
- str = hex(num)[2:]
- if str[-1] == 'L':
- str = str[:-1]
- if len(str) % 2 != 0:
- str = '0' + str
- str = str.decode('hex')
- return (20 - len(str)) *'\x00' + str
-
-def distance(a, b):
- """distance between two 160-bit hashes expressed as 20-character strings"""
- return intify(a) ^ intify(b)
-
-
-def newID():
- """returns a new pseudorandom globally unique ID string"""
- h = sha()
- for i in range(20):
- h.update(chr(whrandom.randint(0,255)))
- return h.digest()
-
-def newIDInRange(min, max):
- return stringify(randRange(min,max))
-
-def randRange(min, max):
- return min + intify(newID()) % (max - min)
-
-
-### Test Cases ###
-import unittest
-
-class NewID(unittest.TestCase):
- def testLength(self):
- self.assertEqual(len(newID()), 20)
- def testHundreds(self):
- for x in xrange(100):
- self.testLength
-
-class Intify(unittest.TestCase):
- known = [('\0' * 20, 0),
- ('\xff' * 20, 2L**160 - 1),
- ]
- def testKnown(self):
- for str, value in self.known:
- self.assertEqual(intify(str), value)
- def testEndianessOnce(self):
- h = newID()
- while h[-1] == '\xff':
- h = newID()
- k = h[:-1] + chr(ord(h[-1]) + 1)
- self.assertEqual(intify(k) - intify(h), 1)
- def testEndianessLots(self):
- for x in xrange(100):
- self.testEndianessOnce()
-
-class Disantance(unittest.TestCase):
- known = [
- (("\0" * 20, "\xff" * 20), 2**160L -1),
- ((sha("foo").digest(), sha("foo").digest()), 0),
- ((sha("bar").digest(), sha("bar").digest()), 0)
- ]
- def testKnown(self):
- for pair, dist in self.known:
- self.assertEqual(distance(pair[0], pair[1]), dist)
- def testCommutitive(self):
- for i in xrange(100):
- x, y, z = newID(), newID(), newID()
- self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z))
-
-class RandRange(unittest.TestCase):
- def testOnce(self):
- a = intify(newID())
- b = intify(newID())
- if a < b:
- c = randRange(a, b)
- self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a))
- else:
- c = randRange(b, a)
- assert b <= c < a, "output out of range %d %d %d" % (b, c, a)
-
- def testOneHundredTimes(self):
- for i in xrange(100):
- self.testOnce()
-
-
-
-if __name__ == '__main__':
- unittest.main()
-
-
--- /dev/null
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
+from sha import sha
+import whrandom
+
+#this is ugly, hopefully os.entropy will be in 2.4
+try:
+ from entropy import entropy
+except ImportError:
+ def entropy(n):
+ s = ''
+ for i in range(n):
+ s += chr(whrandom.randint(0,255))
+ return s
+
+def intify(hstr):
+ """20 bit hash, big-endian -> long python integer"""
+ assert len(hstr) == 20
+ return long(hstr.encode('hex'), 16)
+
+def stringify(num):
+ """long int -> 20-character string"""
+ str = hex(num)[2:]
+ if str[-1] == 'L':
+ str = str[:-1]
+ if len(str) % 2 != 0:
+ str = '0' + str
+ str = str.decode('hex')
+ return (20 - len(str)) *'\x00' + str
+
+def distance(a, b):
+ """distance between two 160-bit hashes expressed as 20-character strings"""
+ return intify(a) ^ intify(b)
+
+
+def newID():
+ """returns a new pseudorandom globally unique ID string"""
+ h = sha()
+ h.update(entropy(20))
+ return h.digest()
+
+def newIDInRange(min, max):
+ return stringify(randRange(min,max))
+
+def randRange(min, max):
+ return min + intify(newID()) % (max - min)
+
+def newTID():
+ return randRange(-2**30, 2**30)
+
+### Test Cases ###
+import unittest
+
+class NewID(unittest.TestCase):
+ def testLength(self):
+ self.assertEqual(len(newID()), 20)
+ def testHundreds(self):
+ for x in xrange(100):
+ self.testLength
+
+class Intify(unittest.TestCase):
+ known = [('\0' * 20, 0),
+ ('\xff' * 20, 2L**160 - 1),
+ ]
+ def testKnown(self):
+ for str, value in self.known:
+ self.assertEqual(intify(str), value)
+ def testEndianessOnce(self):
+ h = newID()
+ while h[-1] == '\xff':
+ h = newID()
+ k = h[:-1] + chr(ord(h[-1]) + 1)
+ self.assertEqual(intify(k) - intify(h), 1)
+ def testEndianessLots(self):
+ for x in xrange(100):
+ self.testEndianessOnce()
+
+class Disantance(unittest.TestCase):
+ known = [
+ (("\0" * 20, "\xff" * 20), 2**160L -1),
+ ((sha("foo").digest(), sha("foo").digest()), 0),
+ ((sha("bar").digest(), sha("bar").digest()), 0)
+ ]
+ def testKnown(self):
+ for pair, dist in self.known:
+ self.assertEqual(distance(pair[0], pair[1]), dist)
+ def testCommutitive(self):
+ for i in xrange(100):
+ x, y, z = newID(), newID(), newID()
+ self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z))
+
+class RandRange(unittest.TestCase):
+ def testOnce(self):
+ a = intify(newID())
+ b = intify(newID())
+ if a < b:
+ c = randRange(a, b)
+ self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a))
+ else:
+ c = randRange(b, a)
+ assert b <= c < a, "output out of range %d %d %d" % (b, c, a)
+
+ def testOneHundredTimes(self):
+ for i in xrange(100):
+ self.testOnce()
+
+
+
+if __name__ == '__main__':
+ unittest.main()
+
+
from ktable import KTable, K
from knode import KNode as Node
-from hash import newID, newIDInRange
+from khash import newID, newIDInRange
from actions import FindNode, GetValue, KeyExpirer, StoreValue
import krpc
-import airhook
from twisted.internet.defer import Deferred
from twisted.internet import protocol
from twisted.python import threadable
-from twisted.internet.app import Application
+from twisted.application import service, internet
from twisted.web import server
threadable.init()
import sys
import sqlite ## find this at http://pysqlite.sourceforge.net/
-import pysqlite_exceptions
class KhashmirDBExcept(Exception):
pass
# this is the main class!
class Khashmir(protocol.Factory):
__slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
- protocol = krpc.KRPC
def __init__(self, host, port, db='khashmir.db'):
self.setup(host, port, db)
self.port = port
self.node = self._loadSelfNode(host, port)
self.table = KTable(self.node)
- self.app = Application("krpc")
- self.airhook = airhook.listenAirhookStream(port, self)
+ self.app = service.Application("krpc")
+ self.udp = krpc.hostbroker(self)
+ self.udp.protocol = krpc.KRPC
+ self.listenport = reactor.listenUDP(port, self.udp)
self.last = time.time()
self._loadRoutingTable()
KeyExpirer(store=self.store)
#self.refreshTable(force=1)
reactor.callLater(60, self.checkpoint, (1,))
+ def __del__(self):
+ self.listenport.stopListening()
+
def _loadSelfNode(self, host, port):
c = self.store.cursor()
c.execute('select id from self where num = 0;')
if c.rowcount > 0:
- id = c.fetchone()[0].decode('hex')
+ id = c.fetchone()[0]
else:
id = newID()
return Node().init(id, host, port)
self.store.autocommit = 0
c = self.store.cursor()
c.execute('delete from self where num = 0;')
- c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
+ c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
self.store.commit()
self.store.autocommit = 1
self.store = sqlite.connect(db=db)
self.store.autocommit = 1
s = """
- create table kv (key text, value text, time timestamp, primary key (key, value));
+ create table kv (key binary, value binary, time timestamp, primary key (key, value));
create index kv_key on kv(key);
create index kv_timestamp on kv(time);
- create table nodes (id text primary key, host text, port number);
+ create table nodes (id binary primary key, host text, port number);
- create table self (num number primary key, id text);
+ create table self (num number primary key, id binary);
"""
c = self.store.cursor()
c.execute(s)
for bucket in self.table.buckets:
for node in bucket.l:
d = node.senderDict()
- c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
+ c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port']))
self.store.commit()
self.store.autocommit = 1;
c = self.store.cursor()
c.execute("select * from nodes;")
for rec in c.fetchall():
- n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.table.insertNode(n, contacted=0)
ping this node and add the contact info to the table on pong!
"""
n =Node().init(const.NULL_ID, host, port)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.sendPing(n, callback=callback)
## this call is async!
sender['host'] = node.host
sender['port'] = node.port
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
table.insertNode(n)
if callback:
callback()
def retrieveValues(self, key):
- s = "select value from kv where key = '%s';" % key.encode('hex')
c = self.store.cursor()
- c.execute(s)
+ c.execute("select value from kv where key = %s;", sqlite.encode(key))
t = c.fetchone()
l = []
while t:
- l.append(t['value'].decode('base64'))
+ l.append(t['value'])
t = c.fetchone()
return l
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
return {"sender" : self.node.senderDict()}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
return {"nodes" : nodes, "sender" : self.node.senderDict()}
def krpc_store_value(self, key, value, sender, _krpc_sender):
t = "%0.6f" % time.time()
- s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
c = self.store.cursor()
try:
- c.execute(s)
- except pysqlite_exceptions.IntegrityError, reason:
+ c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
+ except sqlite.IntegrityError, reason:
# update last insert time
- s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
- c.execute(s)
+ c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
return {"sender" : self.node.senderDict()}
sender['host'] = _krpc_sender[0]
sender['port'] = _krpc_sender[1]
n = Node().initWithDict(sender)
- n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
l = self.retrieveValues(key)
nodes = map(lambda node: node.senderDict(), nodes)
return {'nodes' : nodes, "sender": self.node.senderDict()}
-### TESTING ###
-from random import randrange
-import threading, thread, sys, time
-from sha import sha
-from hash import newID
-
-
-def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'):
- import thread
- l = []
- for i in xrange(peers):
- a = Khashmir(host, startport + i, db = dbprefix+`i`)
- l.append(a)
- thread.start_new_thread(l[0].app.run, ())
- for peer in l[1:]:
- peer.app.run(installSignalHandlers=0)
- return l
-
-def test_build_net(quiet=0, peers=24, host='127.0.0.1', pause=0, startport=2001, dbprefix='/tmp/test'):
- from whrandom import randrange
- import threading
- import thread
- import sys
- port = startport
- l = []
- if not quiet:
- print "Building %s peer table." % peers
-
- for i in xrange(peers):
- a = Khashmir(host, port + i, db = dbprefix +`i`)
- l.append(a)
-
-
- thread.start_new_thread(l[0].app.run, ())
- time.sleep(1)
- for peer in l[1:]:
- peer.app.run(installSignalHandlers=0)
- #time.sleep(3)
-
- def spewer(frame, s, ignored):
- from twisted.python import reflect
- if frame.f_locals.has_key('self'):
- se = frame.f_locals['self']
- print 'method %s of %s at %s' % (
- frame.f_code.co_name, reflect.qual(se.__class__), id(se)
- )
- #sys.settrace(spewer)
-
- print "adding contacts...."
- def makecb(flag):
- def cb(f=flag):
- f.set()
- return cb
-
- for peer in l:
- p = l[randrange(0, len(l))]
- if p != peer:
- n = p.node
- flag = threading.Event()
- peer.addContact(host, n.port, makecb(flag))
- flag.wait()
- p = l[randrange(0, len(l))]
- if p != peer:
- n = p.node
- flag = threading.Event()
- peer.addContact(host, n.port, makecb(flag))
- flag.wait()
- p = l[randrange(0, len(l))]
- if p != peer:
- n = p.node
- flag = threading.Event()
- peer.addContact(host, n.port, makecb(flag))
- flag.wait()
-
- print "finding close nodes...."
-
- for peer in l:
- flag = threading.Event()
- def cb(nodes, f=flag):
- f.set()
- peer.findCloseNodes(cb)
- flag.wait()
- # for peer in l:
- # peer.refreshTable()
- return l
-
-def test_find_nodes(l, quiet=0):
- flag = threading.Event()
-
- n = len(l)
-
- a = l[randrange(0,n)]
- b = l[randrange(0,n)]
-
- def callback(nodes, flag=flag, id = b.node.id):
- if (len(nodes) >0) and (nodes[0].id == id):
- print "test_find_nodes PASSED"
- else:
- print "test_find_nodes FAILED"
- flag.set()
- a.findNode(b.node.id, callback)
- flag.wait()
-
-def test_find_value(l, quiet=0):
- ff = threading.Event()
- fa = threading.Event()
- fb = threading.Event()
- fc = threading.Event()
-
- n = len(l)
- a = l[randrange(0,n)]
- b = l[randrange(0,n)]
- c = l[randrange(0,n)]
- d = l[randrange(0,n)]
-
- key = newID()
- value = newID()
- if not quiet: print "inserting value..."
- def acb(p, f=ff):
- f.set()
- a.storeValueForKey(key, value, acb)
- ff.wait()
-
- if not quiet:
- print "finding..."
-
- class cb:
- def __init__(self, flag, value=value, port=None):
- self.flag = flag
- self.val = value
- self.found = 0
- self.port = port
- def callback(self, values):
- if(len(values) == 0):
- if not self.found:
- print "find %s NOT FOUND" % self.port
- else:
- print "find %s FOUND" % self.port
- self.flag.set()
- else:
- if self.val in values:
- self.found = 1
-
- b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
- fa.wait()
- c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
- fb.wait()
- d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)
- fc.wait()
-
-def test_one(host, port, db='/tmp/test'):
- import thread
- k = Khashmir(host, port, db)
- thread.start_new_thread(reactor.run, ())
- return k
-
-if __name__ == "__main__":
- import sys
- n = 8
- if len(sys.argv) > 1: n = int(sys.argv[1])
- l = test_build_net(peers=n)
- time.sleep(3)
- print "finding nodes..."
- for i in range(n):
- test_find_nodes(l)
- print "inserting and fetching values..."
- for i in range(10):
- test_find_value(l)
return dict
def ping(self, sender):
- df = self.conn.protocol.sendRequest('ping', {"sender":sender})
+ df = self.conn.sendRequest('ping', {"sender":sender})
df.addCallback(self.checkSender)
return df
def findNode(self, target, sender):
- df = self.conn.protocol.sendRequest('find_node', {"target" : target, "sender": sender})
+ df = self.conn.sendRequest('find_node', {"target" : target, "sender": sender})
df.addCallback(self.checkSender)
return df
def storeValue(self, key, value, sender):
- df = self.conn.protocol.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
+ df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
df.addCallback(self.checkSender)
return df
def findValue(self, key, sender):
- df = self.conn.protocol.sendRequest('find_value', {"key" : key, "sender" : sender})
+ df = self.conn.sendRequest('find_value', {"key" : key, "sender" : sender})
df.addCallback(self.checkSender)
return df
from twisted.internet.defer import Deferred
from twisted.protocols import basic
from bencode import bencode, bdecode
+from twisted.internet import protocol
+
from twisted.internet import reactor
import time
-import hash
+import khash as hash
KRPC_TIMEOUT = 60
KRPC_ERROR_RECEIVED_UNKNOWN = 3
KRPC_ERROR_TIMEOUT = 4
-class KRPC(basic.NetstringReceiver):
- noisy = 1
- def __init__(self):
- self.tids = {}
+# commands
+TID = 'tid'
+REQ = 'req'
+RSP = 'rsp'
+TYP = 'typ'
+ARG = 'arg'
+ERR = 'err'
+
+class hostbroker(protocol.DatagramProtocol):
+ def __init__(self, server):
+ self.noisy = 0
+ self.server = server
+ # this should be changed to storage that drops old entries
+ self.connections = {}
+
+ def datagramReceived(self, datagram, addr):
+ #print `addr`, `datagram`
+ #if addr != self.addr:
+ c = self.connectionForAddr(addr)
+ c.datagramReceived(datagram)
+ #if c.idle():
+ # del self.connections[addr]
+ def connectionForAddr(self, addr):
+ if addr == self.addr:
+ raise Exception
+ if not self.connections.has_key(addr):
+ conn = self.protocol(addr, self.server, self.transport)
+ self.connections[addr] = conn
+ else:
+ conn = self.connections[addr]
+ return conn
+
+ def makeConnection(self, transport):
+ protocol.DatagramProtocol.makeConnection(self, transport)
+ tup = transport.getHost()
+ self.addr = (tup.host, tup.port)
- def dataRecieved(self, data):
- basic.NetstringReceiver(self, data)
- if self.brokenPeer:
- self.resetConnection()
-
- def resetConnection(self):
- self.brokenPeer = 0
- self._readerState = basic.LENGTH
- self._readerLength = 0
+## connection
+class KRPC:
+ noisy = 1
+ def __init__(self, addr, server, transport):
+ self.transport = transport
+ self.factory = server
+ self.addr = addr
+ self.tids = {}
- def stringReceived(self, str):
+ def datagramReceived(self, str):
# bdecode
try:
msg = bdecode(str)
except Exception, e:
- if self.naisy:
+ if self.noisy:
print "response decode error: " + `e`
self.d.errback()
else:
+ #if self.noisy:
+ # print msg
# look at msg type
- if msg['typ'] == 'req':
+ if msg[TYP] == REQ:
ilen = len(str)
# if request
# tell factory to handle
- f = getattr(self.factory ,"krpc_" + msg['req'], None)
+ f = getattr(self.factory ,"krpc_" + msg[REQ], None)
if f and callable(f):
- msg['arg']['_krpc_sender'] = self.transport.addr
+ msg[ARG]['_krpc_sender'] = self.addr
try:
- ret = apply(f, (), msg['arg'])
+ ret = apply(f, (), msg[ARG])
except Exception, e:
## send error
- out = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
+ out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`})
olen = len(out)
- self.sendString(out)
+ self.transport.write(out, self.addr)
else:
if ret:
# make response
- out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+ out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
else:
- out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
+ out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
# send response
olen = len(out)
- self.sendString(out)
+ self.transport.write(out, self.addr)
else:
if self.noisy:
- print "don't know about method %s" % msg['req']
+ print "don't know about method %s" % msg[REQ]
# unknown method
- out = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
+ out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
olen = len(out)
- self.sendString(out)
+ self.transport.write(out, self.addr)
if self.noisy:
- print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port,
- ilen, msg['req'], olen)
- elif msg['typ'] == 'rsp':
+ print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.addr, self.factory.node.port,
+ ilen, msg[REQ], olen)
+ elif msg[TYP] == RSP:
# if response
# lookup tid
- if self.tids.has_key(msg['tid']):
- df = self.tids[msg['tid']]
+ if self.tids.has_key(msg[TID]):
+ df = self.tids[msg[TID]]
# callback
- del(self.tids[msg['tid']])
- df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
+ del(self.tids[msg[TID]])
+ df.callback({RSP : msg[RSP], '_krpc_sender': self.addr})
else:
- print 'timeout ' + `msg['rsp']['sender']`
+ print 'timeout ' + `msg[RSP]['sender']`
# no tid, this transaction timed out already...
- elif msg['typ'] == 'err':
+ elif msg[TYP] == ERR:
# if error
# lookup tid
- if self.tids.has_key(msg['tid']):
- df = self.tids[msg['tid']]
+ if self.tids.has_key(msg[TID]):
+ df = self.tids[msg[TID]]
# callback
- df.errback(msg['err'])
- del(self.tids[msg['tid']])
+ df.errback(msg[ERR])
+ del(self.tids[msg[TID]])
else:
# day late and dollar short
pass
else:
print "unknown message type " + `msg`
# unknown message type
- df = self.tids[msg['tid']]
+ df = self.tids[msg[TID]]
# callback
df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
- del(self.tids[msg['tid']])
+ del(self.tids[msg[TID]])
def sendRequest(self, method, args):
# make message
# send it
- msg = {'tid' : hash.newID(), 'typ' : 'req', 'req' : method, 'arg' : args}
+ msg = {TID : hash.newTID(), TYP : REQ, REQ : method, ARG : args}
str = bencode(msg)
d = Deferred()
- self.tids[msg['tid']] = d
- def timeOut(tids = self.tids, id = msg['tid']):
+ self.tids[msg[TID]] = d
+ def timeOut(tids = self.tids, id = msg[TID]):
if tids.has_key(id):
df = tids[id]
del(tids[id])
print ">>>>>> KRPC_ERROR_TIMEOUT"
df.errback(KRPC_ERROR_TIMEOUT)
reactor.callLater(KRPC_TIMEOUT, timeOut)
- self.sendString(str)
+ self.transport.write(str, self.addr)
return d
from bisect import *
from types import *
-import hash
+import khash as hash
import const
from const import K, HASH_LENGTH, NULL_ID
from node import Node
## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
-import hash
+import khash
import time
from types import *
def init(self, id, host, port):
self.id = id
- self.num = hash.intify(id)
+ self.num = khash.intify(id)
self.host = host
self.port = port
self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host}
def initWithDict(self, dict):
self._senderDict = dict
self.id = dict['id']
- self.num = hash.intify(self.id)
+ self.num = khash.intify(self.id)
self.port = dict['port']
self.host = dict['host']
return self
class TestNode(unittest.TestCase):
def setUp(self):
- self.node = Node().init(hash.newID(), 'localhost', 2002)
+ self.node = Node().init(khash.newID(), 'localhost', 2002)
def testUpdateLastSeen(self):
t = self.node.lastSeen
self.node.updateLastSeen()
--- /dev/null
+from unittest import *
+from khashmir import *
+import khash
+
+from whrandom import randrange
+
+import os
+
+if __name__ =="__main__":
+ tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
+ result = TextTestRunner().run(tests)
+
+class SimpleTests(TestCase):
+ def setUp(self):
+ self.a = Khashmir('127.0.0.1', 4044, '/tmp/a.test')
+ self.b = Khashmir('127.0.0.1', 4045, '/tmp/b.test')
+
+ def tearDown(self):
+ self.a.listenport.stopListening()
+ self.b.listenport.stopListening()
+ os.unlink('/tmp/a.test')
+ os.unlink('/tmp/b.test')
+ reactor.iterate()
+ reactor.iterate()
+
+ def addContacts(self):
+ self.a.addContact('127.0.0.1', 4045)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+
+ def testAddContact(self):
+ self.assertEqual(len(self.a.table.buckets), 1)
+ self.assertEqual(len(self.a.table.buckets[0].l), 0)
+
+ self.assertEqual(len(self.b.table.buckets), 1)
+ self.assertEqual(len(self.b.table.buckets[0].l), 0)
+
+ self.addContacts()
+
+ self.assertEqual(len(self.a.table.buckets), 1)
+ self.assertEqual(len(self.a.table.buckets[0].l), 1)
+ self.assertEqual(len(self.b.table.buckets), 1)
+ self.assertEqual(len(self.b.table.buckets[0].l), 1)
+
+ def testStoreRetrieve(self):
+ self.addContacts()
+ self.got = 0
+ self.a.storeValueForKey(sha('foo').digest(), 'foobar')
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.a.valueForKey(sha('foo').digest(), self._cb)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+
+ def _cb(self, val):
+ if not val:
+ self.assertEqual(self.got, 1)
+ elif 'foobar' in val:
+ self.got = 1
+
+
+class MultiTest(TestCase):
+ num = 30
+ def _done(self, val):
+ self.done = 1
+
+ def setUp(self):
+ self.l = []
+ self.startport = 4044
+ for i in range(self.num):
+ self.l.append(Khashmir('127.0.0.1', self.startport + i, '/tmp/%s.test' % (self.startport + i)))
+ reactor.iterate()
+ reactor.iterate()
+
+ for i in self.l:
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ reactor.iterate()
+ for i in self.l:
+ self.done = 0
+ i.findCloseNodes(self._done)
+ while not self.done:
+ reactor.iterate()
+
+ def tearDown(self):
+ for i in self.l:
+ i.listenport.stopListening()
+ for i in range(self.startport, self.startport+self.num):
+ os.unlink('/tmp/%s.test' % i)
+
+ reactor.iterate()
+
+ def testStoreRetrieve(self):
+ for i in range(10):
+ K = khash.newID()
+ V = khash.newID()
+
+ self.done = 0
+ def _cb(val):
+ self.done = 1
+ self.l[randrange(0, self.num)].storeValueForKey(K, V, _cb)
+ while not self.done:
+ reactor.iterate()
+
+ self.got = 0
+ self.done = 0
+
+ def _cb(val):
+ if not val:
+ self.done = 1
+ self.assertEqual(self.got, 1)
+ elif V in val:
+ self.got = 1
+
+ self.l[randrange(0, self.num)].valueForKey(K, _cb)
+ while not self.done:
+ reactor.iterate()
from unittest import *
from krpc import *
-from airhook import *
+#from airhook import *
KRPC.noisy = 0
import sys
if __name__ =="__main__":
- tests = unittest.defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
- result = unittest.TextTestRunner().run(tests)
+ tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
+ result = TextTestRunner().run(tests)
def connectionForAddr(host, port):
return host
+
class Receiver(protocol.Factory):
protocol = KRPC
def __init__(self):
def krpc_echo(self, msg, _krpc_sender):
return msg
-class SimpleTest(TestCase):
+def make(port):
+ af = Receiver()
+ a = hostbroker(af)
+ a.protocol = KRPC
+ p = reactor.listenUDP(port, a)
+ return af, a, p
+
+class KRPCTests(TestCase):
def setUp(self):
self.noisy = 0
-
- self.af = Receiver()
- self.bf = Receiver()
- self.a = listenAirhookStream(4040, self.af)
- self.b = listenAirhookStream(4041, self.bf)
-
- def testSimpleMessage(self):
- self.noisy = 0
- self.a.connectionForAddr(('127.0.0.1', 4041)).protocol.sendRequest('store', {'msg' : "This is a test."})
- reactor.iterate()
+ self.af, self.a, self.ap = make(1180)
+ self.bf, self.b, self.bp = make(1181)
+
+ def tearDown(self):
+ self.ap.stopListening()
+ self.bp.stopListening()
reactor.iterate()
reactor.iterate()
- self.assertEqual(self.bf.buf, ["This is a test."])
-class SimpleTest(TestCase):
- def setUp(self):
- self.noisy = 0
-
- self.af = Receiver()
- self.bf = Receiver()
- self.a = listenAirhookStream(4050, self.af)
- self.b = listenAirhookStream(4051, self.bf)
-
def testSimpleMessage(self):
self.noisy = 0
- self.a.connectionForAddr(('127.0.0.1', 4051)).protocol.sendRequest('store', {'msg' : "This is a test."})
+ self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.assertEqual(self.bf.buf, ["This is a test."])
-class BlastTest(TestCase):
- def setUp(self):
- self.noisy = 0
-
- self.af = Receiver()
- self.bf = Receiver()
- self.a = listenAirhookStream(4060, self.af)
- self.b = listenAirhookStream(4061, self.bf)
-
def testMessageBlast(self):
- self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."})
+ self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.bf.buf = []
for i in range(100):
- self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."})
+ self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
reactor.iterate()
#self.bf.buf = []
self.assertEqual(self.bf.buf, ["This is a test."] * 100)
-class EchoTest(TestCase):
- def setUp(self):
- self.noisy = 0
- self.msg = None
-
- self.af = Receiver()
- self.bf = Receiver()
- self.a = listenAirhookStream(4042, self.af)
- self.b = listenAirhookStream(4043, self.bf)
-
def testEcho(self):
- df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
def gotMsg(self, dict):
_krpc_sender = dict['_krpc_sender']
- msg = dict['rsp']
+ msg = dict[RSP]
self.msg = msg
-class ManyEchoTest(TestCase):
- def setUp(self):
- self.noisy = 0
- self.msg = None
-
- self.af = Receiver()
- self.bf = Receiver()
- self.a = listenAirhookStream(4588, self.af)
- self.b = listenAirhookStream(4589, self.bf)
-
def testManyEcho(self):
- df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
self.assertEqual(self.msg, "This is a test.")
for i in xrange(100):
self.msg = None
- df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.assertEqual(self.msg, "This is a test.")
-
- def gotMsg(self, dict):
- _krpc_sender = dict['_krpc_sender']
- msg = dict['rsp']
- self.msg = msg
-class MultiEchoTest(TestCase):
- def setUp(self):
- self.noisy = 0
- self.msg = None
-
- self.af = Receiver()
- self.bf = Receiver()
- self.a = listenAirhookStream(4048, self.af)
- self.b = listenAirhookStream(4049, self.bf)
-
def testMultiEcho(self):
self.noisy = 1
- df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.assertEqual(self.msg, "This is a test.")
- df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is another test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.assertEqual(self.msg, "This is another test.")
- df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.assertEqual(self.msg, "This is yet another test.")
- def gotMsg(self, dict):
- _krpc_sender = dict['_krpc_sender']
- msg = dict['rsp']
- self.msg = msg
-
-class EchoResetTest(TestCase):
- def setUp(self):
- self.noisy = 0
- self.msg = None
-
- self.af = Receiver()
- self.bf = Receiver()
- self.a = listenAirhookStream(4078, self.af)
- self.b = listenAirhookStream(4079, self.bf)
-
def testEchoReset(self):
self.noisy = 1
- df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.assertEqual(self.msg, "This is a test.")
- df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is another test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.assertEqual(self.msg, "This is another test.")
- del(self.a.connections[('127.0.0.1', 4079)])
- df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
+ del(self.a.connections[('127.0.0.1', 1181)])
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
def testLotsofEchoReset(self):
for i in range(100):
self.testEchoReset()
- def gotMsg(self, dict):
- _krpc_sender = dict['_krpc_sender']
- msg = dict['rsp']
- self.msg = msg
-
-class UnknownMethErrTest(TestCase):
- def setUp(self):
- self.noisy = 0
- self.err = None
- self.af = Receiver()
- self.bf = Receiver()
- self.a = listenAirhookStream(4044, self.af)
- self.b = listenAirhookStream(4045, self.bf)
-
+
def testUnknownMeth(self):
self.noisy = 1
- df = self.a.connectionForAddr(('127.0.0.1', 4045)).protocol.sendRequest('blahblah', {'msg' : "This is a test."})
+ df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
df.addErrback(self.gotErr)
reactor.iterate()
reactor.iterate()
def gotErr(self, err):
self.err = err.value
-
+
\ No newline at end of file