X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht_Khashmir%2Fbencode.py;h=b9153ee8b1f7b9a08eba9f2237266d6044a18779;hb=30b935bcd7c698e2be8ac4a20960e48212ad7249;hp=a627fe0fe1f6e3745ebcead8c28415b105e1a2ed;hpb=39e946bf2b0dee67d119301207da987da73d4503;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/bencode.py b/apt_dht_Khashmir/bencode.py index a627fe0..b9153ee 100644 --- a/apt_dht_Khashmir/bencode.py +++ b/apt_dht_Khashmir/bencode.py @@ -1,13 +1,6 @@ -# Written by Petru Paler, Uoti Urpala, Ross Cohen and John Hoffman -# Modified by Cameron Dale -# see LICENSE.txt for license information -# -# $Id: bencode.py 268 2007-08-18 23:45:45Z camrdale-guest $ """Functions for bencoding and bdecoding data. -@type logger: C{logging.Logger} -@var logger: the logger to send all log messages to for this module @type decode_func: C{dictionary} of C{function} @var decode_func: a dictionary of function calls to be made, based on data, the keys are the first character of the data and the value is the @@ -22,19 +15,19 @@ @var BencachedType: the L{Bencached} type """ -from types import IntType, LongType, StringType, ListType, TupleType, DictType -import logging -try: - from types import BooleanType -except ImportError: - BooleanType = None +from types import IntType, LongType, StringType, ListType, TupleType, DictType, BooleanType try: from types import UnicodeType except ImportError: UnicodeType = None -from cStringIO import StringIO +from datetime import datetime +import time -logger = logging.getLogger('DebTorrent.bencode') +from twisted.python import log +from twisted.trial import unittest + +class BencodeError(ValueError): + pass def decode_int(x, f): """Bdecode an integer. @@ -45,7 +38,7 @@ def decode_int(x, f): @param f: the offset in the data to start at @rtype: C{int}, C{int} @return: the bdecoded integer, and the offset to read next - @raise ValueError: if the data is improperly encoded + @raise BencodeError: if the data is improperly encoded """ @@ -57,9 +50,9 @@ def decode_int(x, f): n = long(x[f:newf]) if x[f] == '-': if x[f + 1] == '0': - raise ValueError + raise BencodeError, "integer has a leading zero after a negative sign" elif x[f] == '0' and newf != f+1: - raise ValueError + raise BencodeError, "integer has a leading zero" return (n, newf+1) def decode_string(x, f): @@ -71,7 +64,7 @@ def decode_string(x, f): @param f: the offset in the data to start at @rtype: C{string}, C{int} @return: the bdecoded string, and the offset to read next - @raise ValueError: if the data is improperly encoded + @raise BencodeError: if the data is improperly encoded """ @@ -81,7 +74,7 @@ def decode_string(x, f): except (OverflowError, ValueError): n = long(x[f:colon]) if x[f] == '0' and colon != f+1: - raise ValueError + raise BencodeError, "string length has a leading zero" colon += 1 return (x[colon:colon+n], colon+n) @@ -100,6 +93,27 @@ def decode_unicode(x, f): s, f = decode_string(x, f+1) return (s.decode('UTF-8'),f) +def decode_datetime(x, f): + """Bdecode a datetime value. + + @type x: C{string} + @param x: the data to decode + @type f: C{int} + @param f: the offset in the data to start at + @rtype: C{datetime.datetime}, C{int} + @return: the bdecoded integer, and the offset to read next + @raise BencodeError: if the data is improperly encoded + + """ + + f += 1 + newf = x.index('e', f) + try: + date = datetime(*(time.strptime(x[f:newf], '%Y-%m-%dT%H:%M:%S')[0:6])) + except: + raise BencodeError, "datetime value could not be decoded: %s" % x[f:newf] + return (date, newf+1) + def decode_list(x, f): """Bdecode a list. @@ -127,7 +141,7 @@ def decode_dict(x, f): @param f: the offset in the data to start at @rtype: C{dictionary}, C{int} @return: the bdecoded dictionary, and the offset to read next - @raise ValueError: if the data is improperly encoded + @raise BencodeError: if the data is improperly encoded """ @@ -136,7 +150,7 @@ def decode_dict(x, f): while x[f] != 'e': k, f = decode_string(x, f) if lastkey >= k: - raise ValueError + raise BencodeError, "dictionary keys must be in sorted order" lastkey = k r[k], f = decode_func[x[f]](x, f) return (r, f + 1) @@ -155,7 +169,8 @@ decode_func['6'] = decode_string decode_func['7'] = decode_string decode_func['8'] = decode_string decode_func['9'] = decode_string -#decode_func['u'] = decode_unicode +decode_func['u'] = decode_unicode +decode_func['t'] = decode_datetime def bdecode(x, sloppy = 0): """Bdecode a string of data. @@ -166,7 +181,7 @@ def bdecode(x, sloppy = 0): @param sloppy: whether to allow errors in the decoding @rtype: unknown @return: the bdecoded data - @raise ValueError: if the data is improperly encoded + @raise BencodeError: if the data is improperly encoded """ @@ -174,158 +189,14 @@ def bdecode(x, sloppy = 0): r, l = decode_func[x[0]](x, 0) # except (IndexError, KeyError): except (IndexError, KeyError, ValueError): - logger.exception('bad bencoded data') - raise ValueError, "bad bencoded data" + raise BencodeError, "bad bencoded data" if not sloppy and l != len(x): - raise ValueError, "bad bencoded data" + raise BencodeError, "bad bencoded data, all could not be decoded" return r -def test_bdecode(): - """A test routine for the bdecoding functions.""" - try: - bdecode('0:0:') - assert 0 - except ValueError: - pass - try: - bdecode('ie') - assert 0 - except ValueError: - pass - try: - bdecode('i341foo382e') - assert 0 - except ValueError: - pass - assert bdecode('i4e') == 4L - assert bdecode('i0e') == 0L - assert bdecode('i123456789e') == 123456789L - assert bdecode('i-10e') == -10L - try: - bdecode('i-0e') - assert 0 - except ValueError: - pass - try: - bdecode('i123') - assert 0 - except ValueError: - pass - try: - bdecode('') - assert 0 - except ValueError: - pass - try: - bdecode('i6easd') - assert 0 - except ValueError: - pass - try: - bdecode('35208734823ljdahflajhdf') - assert 0 - except ValueError: - pass - try: - bdecode('2:abfdjslhfld') - assert 0 - except ValueError: - pass - assert bdecode('0:') == '' - assert bdecode('3:abc') == 'abc' - assert bdecode('10:1234567890') == '1234567890' - try: - bdecode('02:xy') - assert 0 - except ValueError: - pass - try: - bdecode('l') - assert 0 - except ValueError: - pass - assert bdecode('le') == [] - try: - bdecode('leanfdldjfh') - assert 0 - except ValueError: - pass - assert bdecode('l0:0:0:e') == ['', '', ''] - try: - bdecode('relwjhrlewjh') - assert 0 - except ValueError: - pass - assert bdecode('li1ei2ei3ee') == [1, 2, 3] - assert bdecode('l3:asd2:xye') == ['asd', 'xy'] - assert bdecode('ll5:Alice3:Bobeli2ei3eee') == [['Alice', 'Bob'], [2, 3]] - try: - bdecode('d') - assert 0 - except ValueError: - pass - try: - bdecode('defoobar') - assert 0 - except ValueError: - pass - assert bdecode('de') == {} - assert bdecode('d3:agei25e4:eyes4:bluee') == {'age': 25, 'eyes': 'blue'} - assert bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee') == {'spam.mp3': {'author': 'Alice', 'length': 100000}} - try: - bdecode('d3:fooe') - assert 0 - except ValueError: - pass - try: - bdecode('di1e0:e') - assert 0 - except ValueError: - pass - try: - bdecode('d1:b0:1:a0:e') - assert 0 - except ValueError: - pass - try: - bdecode('d1:a0:1:a0:e') - assert 0 - except ValueError: - pass - try: - bdecode('i03e') - assert 0 - except ValueError: - pass - try: - bdecode('l01:ae') - assert 0 - except ValueError: - pass - try: - bdecode('9999:x') - assert 0 - except ValueError: - pass - try: - bdecode('l0:') - assert 0 - except ValueError: - pass - try: - bdecode('d0:0:') - assert 0 - except ValueError: - pass - try: - bdecode('d0:') - assert 0 - except ValueError: - pass - bencached_marker = [] -class Bencached: +class Bencached(object): """Dummy data structure for storing bencoded data in memory. @type marker: C{list} @@ -415,6 +286,29 @@ def encode_unicode(x,r): #r.append('u') encode_string(x.encode('UTF-8'),r) +def encode_datetime(x,r): + """Bencode a datetime value in UTC. + + If the datetime object has time zone info, it is converted to UTC time. + Otherwise it is assumed that the time is already in UTC time. + Microseconds are removed. + + @type x: C{datetime.datetime} + @param x: the data to encode + @type r: C{list} + @param r: the currently bencoded data, to which the bencoding of x + will be appended + + """ + + date = x.replace(microsecond = 0) + offset = date.utcoffset() + if offset is not None: + utcdate = date.replace(tzinfo = None) + offset + else: + utcdate = date + r.extend(('t',utcdate.isoformat(),'e')) + def encode_list(x,r): """Bencode a list. @@ -458,8 +352,8 @@ encode_func[StringType] = encode_string encode_func[ListType] = encode_list encode_func[TupleType] = encode_list encode_func[DictType] = encode_dict -if BooleanType: - encode_func[BooleanType] = encode_bool +encode_func[BooleanType] = encode_bool +encode_func[datetime] = encode_datetime if UnicodeType: encode_func[UnicodeType] = encode_unicode @@ -470,42 +364,117 @@ def bencode(x): @param x: the data to encode @rtype: string @return: the bencoded data - @raise ValueError: if the data contains a type that cannot be encoded + @raise BencodeError: if the data contains a type that cannot be encoded """ r = [] try: encode_func[type(x)](x, r) except: - logger.exception('could not encode type '+str(type(x))+' (value: '+str(x)+')') - assert 0 + raise BencodeError, "failed to bencode the data" return ''.join(r) -def test_bencode(): - """A test routine for the bencoding functions.""" - assert bencode(4) == 'i4e' - assert bencode(0) == 'i0e' - assert bencode(-10) == 'i-10e' - assert bencode(12345678901234567890L) == 'i12345678901234567890e' - assert bencode('') == '0:' - assert bencode('abc') == '3:abc' - assert bencode('1234567890') == '10:1234567890' - assert bencode([]) == 'le' - assert bencode([1, 2, 3]) == 'li1ei2ei3ee' - assert bencode([['Alice', 'Bob'], [2, 3]]) == 'll5:Alice3:Bobeli2ei3eee' - assert bencode({}) == 'de' - assert bencode({'age': 25, 'eyes': 'blue'}) == 'd3:agei25e4:eyes4:bluee' - assert bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}) == 'd8:spam.mp3d6:author5:Alice6:lengthi100000eee' - try: - bencode({1: 'foo'}) - assert 0 - except AssertionError: - pass +class TestBencode(unittest.TestCase): + """Test the bencoding and bdecoding of data.""" - -try: - import psyco - psyco.bind(bdecode) - psyco.bind(bencode) -except ImportError: - pass + timeout = 2 + + def test_bdecode_string(self): + self.failUnlessRaises(BencodeError, bdecode, '0:0:') + self.failUnlessRaises(BencodeError, bdecode, '') + self.failUnlessRaises(BencodeError, bdecode, '35208734823ljdahflajhdf') + self.failUnlessRaises(BencodeError, bdecode, '2:abfdjslhfld') + self.failUnlessEqual(bdecode('0:'), '') + self.failUnlessEqual(bdecode('3:abc'), 'abc') + self.failUnlessEqual(bdecode('10:1234567890'), '1234567890') + self.failUnlessRaises(BencodeError, bdecode, '02:xy') + self.failUnlessRaises(BencodeError, bdecode, '9999:x') + + def test_bdecode_int(self): + self.failUnlessRaises(BencodeError, bdecode, 'ie') + self.failUnlessRaises(BencodeError, bdecode, 'i341foo382e') + self.failUnlessEqual(bdecode('i4e'), 4L) + self.failUnlessEqual(bdecode('i0e'), 0L) + self.failUnlessEqual(bdecode('i123456789e'), 123456789L) + self.failUnlessEqual(bdecode('i-10e'), -10L) + self.failUnlessRaises(BencodeError, bdecode, 'i-0e') + self.failUnlessRaises(BencodeError, bdecode, 'i123') + self.failUnlessRaises(BencodeError, bdecode, 'i6easd') + self.failUnlessRaises(BencodeError, bdecode, 'i03e') + + def test_bdecode_list(self): + self.failUnlessRaises(BencodeError, bdecode, 'l') + self.failUnlessEqual(bdecode('le'), []) + self.failUnlessRaises(BencodeError, bdecode, 'leanfdldjfh') + self.failUnlessEqual(bdecode('l0:0:0:e'), ['', '', '']) + self.failUnlessRaises(BencodeError, bdecode, 'relwjhrlewjh') + self.failUnlessEqual(bdecode('li1ei2ei3ee'), [1, 2, 3]) + self.failUnlessEqual(bdecode('l3:asd2:xye'), ['asd', 'xy']) + self.failUnlessEqual(bdecode('ll5:Alice3:Bobeli2ei3eee'), [['Alice', 'Bob'], [2, 3]]) + self.failUnlessRaises(BencodeError, bdecode, 'l01:ae') + self.failUnlessRaises(BencodeError, bdecode, 'l0:') + + def test_bdecode_dict(self): + self.failUnlessRaises(BencodeError, bdecode, 'd') + self.failUnlessRaises(BencodeError, bdecode, 'defoobar') + self.failUnlessEqual(bdecode('de'), {}) + self.failUnlessEqual(bdecode('d3:agei25e4:eyes4:bluee'), {'age': 25, 'eyes': 'blue'}) + self.failUnlessEqual(bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee'), + {'spam.mp3': {'author': 'Alice', 'length': 100000}}) + self.failUnlessRaises(BencodeError, bdecode, 'd3:fooe') + self.failUnlessRaises(BencodeError, bdecode, 'di1e0:e') + self.failUnlessRaises(BencodeError, bdecode, 'd1:b0:1:a0:e') + self.failUnlessRaises(BencodeError, bdecode, 'd1:a0:1:a0:e') + self.failUnlessRaises(BencodeError, bdecode, 'd0:0:') + self.failUnlessRaises(BencodeError, bdecode, 'd0:') + + def test_bdecode_unicode(self): + self.failUnlessRaises(BencodeError, bdecode, 'u0:0:') + self.failUnlessRaises(BencodeError, bdecode, 'u') + self.failUnlessRaises(BencodeError, bdecode, 'u35208734823ljdahflajhdf') + self.failUnlessRaises(BencodeError, bdecode, 'u2:abfdjslhfld') + self.failUnlessEqual(bdecode('u0:'), '') + self.failUnlessEqual(bdecode('u3:abc'), 'abc') + self.failUnlessEqual(bdecode('u10:1234567890'), '1234567890') + self.failUnlessRaises(BencodeError, bdecode, 'u02:xy') + self.failUnlessRaises(BencodeError, bdecode, 'u9999:x') + + def test_bencode_int(self): + self.failUnlessEqual(bencode(4), 'i4e') + self.failUnlessEqual(bencode(0), 'i0e') + self.failUnlessEqual(bencode(-10), 'i-10e') + self.failUnlessEqual(bencode(12345678901234567890L), 'i12345678901234567890e') + + def test_bencode_string(self): + self.failUnlessEqual(bencode(''), '0:') + self.failUnlessEqual(bencode('abc'), '3:abc') + self.failUnlessEqual(bencode('1234567890'), '10:1234567890') + + def test_bencode_list(self): + self.failUnlessEqual(bencode([]), 'le') + self.failUnlessEqual(bencode([1, 2, 3]), 'li1ei2ei3ee') + self.failUnlessEqual(bencode([['Alice', 'Bob'], [2, 3]]), 'll5:Alice3:Bobeli2ei3eee') + + def test_bencode_dict(self): + self.failUnlessEqual(bencode({}), 'de') + self.failUnlessEqual(bencode({'age': 25, 'eyes': 'blue'}), 'd3:agei25e4:eyes4:bluee') + self.failUnlessEqual(bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}), + 'd8:spam.mp3d6:author5:Alice6:lengthi100000eee') + self.failUnlessRaises(BencodeError, bencode, {1: 'foo'}) + + def test_bencode_unicode(self): + self.failUnlessEqual(bencode(u''), '0:') + self.failUnlessEqual(bencode(u'abc'), '3:abc') + self.failUnlessEqual(bencode(u'1234567890'), '10:1234567890') + + def test_bool(self): + self.failUnless(bdecode(bencode(True))) + self.failIf(bdecode(bencode(False))) + + def test_datetime(self): + date = datetime.utcnow() + self.failUnlessEqual(bdecode(bencode(date)), date.replace(microsecond = 0)) + + if UnicodeType == None: + test_bencode_unicode.skip = "Python was not compiled with unicode support" + test_bdecode_unicode.skip = "Python was not compiled with unicode support"