6e89a1892404418ee3c2a5053047e98723d90560
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / bencode.py
1
2 """Functions for bencoding and bdecoding data.
3
4 @type decode_func: C{dictionary} of C{function}
5 @var decode_func: a dictionary of function calls to be made, based on data,
6     the keys are the first character of the data and the value is the
7     function to use to decode that data
8 @type bencached_marker: C{list}
9 @var bencached_marker: mutable type to ensure class origination
10 @type encode_func: C{dictionary} of C{function}
11 @var encode_func: a dictionary of function calls to be made, based on data,
12     the keys are the type of the data and the value is the
13     function to use to encode that data
14 @type BencachedType: C{type}
15 @var BencachedType: the L{Bencached} type
16 """
17
18 from types import IntType, LongType, StringType, ListType, TupleType, DictType, BooleanType
19 try:
20     from types import UnicodeType
21 except ImportError:
22     UnicodeType = None
23
24 from twisted.python import log
25 from twisted.trial import unittest
26
27 class BencodeError(ValueError):
28     pass
29
30 def decode_int(x, f):
31     """Bdecode an integer.
32     
33     @type x: C{string}
34     @param x: the data to decode
35     @type f: C{int}
36     @param f: the offset in the data to start at
37     @rtype: C{int}, C{int}
38     @return: the bdecoded integer, and the offset to read next
39     @raise BencodeError: if the data is improperly encoded
40     
41     """
42     
43     f += 1
44     newf = x.index('e', f)
45     try:
46         n = int(x[f:newf])
47     except:
48         n = long(x[f:newf])
49     if x[f] == '-':
50         if x[f + 1] == '0':
51             raise BencodeError, "integer has a leading zero after a negative sign"
52     elif x[f] == '0' and newf != f+1:
53         raise BencodeError, "integer has a leading zero"
54     return (n, newf+1)
55   
56 def decode_string(x, f):
57     """Bdecode a string.
58     
59     @type x: C{string}
60     @param x: the data to decode
61     @type f: C{int}
62     @param f: the offset in the data to start at
63     @rtype: C{string}, C{int}
64     @return: the bdecoded string, and the offset to read next
65     @raise BencodeError: if the data is improperly encoded
66     
67     """
68     
69     colon = x.index(':', f)
70     try:
71         n = int(x[f:colon])
72     except (OverflowError, ValueError):
73         n = long(x[f:colon])
74     if x[f] == '0' and colon != f+1:
75         raise BencodeError, "string length has a leading zero"
76     colon += 1
77     return (x[colon:colon+n], colon+n)
78
79 def decode_unicode(x, f):
80     """Bdecode a unicode string.
81     
82     @type x: C{string}
83     @param x: the data to decode
84     @type f: C{int}
85     @param f: the offset in the data to start at
86     @rtype: C{int}, C{int}
87     @return: the bdecoded unicode string, and the offset to read next
88     
89     """
90     
91     s, f = decode_string(x, f+1)
92     return (s.decode('UTF-8'),f)
93
94 def decode_list(x, f):
95     """Bdecode a list.
96     
97     @type x: C{string}
98     @param x: the data to decode
99     @type f: C{int}
100     @param f: the offset in the data to start at
101     @rtype: C{list}, C{int}
102     @return: the bdecoded list, and the offset to read next
103     
104     """
105     
106     r, f = [], f+1
107     while x[f] != 'e':
108         v, f = decode_func[x[f]](x, f)
109         r.append(v)
110     return (r, f + 1)
111
112 def decode_dict(x, f):
113     """Bdecode a dictionary.
114     
115     @type x: C{string}
116     @param x: the data to decode
117     @type f: C{int}
118     @param f: the offset in the data to start at
119     @rtype: C{dictionary}, C{int}
120     @return: the bdecoded dictionary, and the offset to read next
121     @raise BencodeError: if the data is improperly encoded
122     
123     """
124     
125     r, f = {}, f+1
126     lastkey = None
127     while x[f] != 'e':
128         k, f = decode_string(x, f)
129         if lastkey >= k:
130             raise BencodeError, "dictionary keys must be in sorted order"
131         lastkey = k
132         r[k], f = decode_func[x[f]](x, f)
133     return (r, f + 1)
134
135 decode_func = {}
136 decode_func['l'] = decode_list
137 decode_func['d'] = decode_dict
138 decode_func['i'] = decode_int
139 decode_func['0'] = decode_string
140 decode_func['1'] = decode_string
141 decode_func['2'] = decode_string
142 decode_func['3'] = decode_string
143 decode_func['4'] = decode_string
144 decode_func['5'] = decode_string
145 decode_func['6'] = decode_string
146 decode_func['7'] = decode_string
147 decode_func['8'] = decode_string
148 decode_func['9'] = decode_string
149 decode_func['u'] = decode_unicode
150   
151 def bdecode(x, sloppy = 0):
152     """Bdecode a string of data.
153     
154     @type x: C{string}
155     @param x: the data to decode
156     @type sloppy: C{boolean}
157     @param sloppy: whether to allow errors in the decoding
158     @rtype: unknown
159     @return: the bdecoded data
160     @raise BencodeError: if the data is improperly encoded
161     
162     """
163     
164     try:
165         r, l = decode_func[x[0]](x, 0)
166 #    except (IndexError, KeyError):
167     except (IndexError, KeyError, ValueError):
168         raise BencodeError, "bad bencoded data"
169     if not sloppy and l != len(x):
170         raise BencodeError, "bad bencoded data, all could not be decoded"
171     return r
172
173 bencached_marker = []
174
175 class Bencached:
176     """Dummy data structure for storing bencoded data in memory.
177     
178     @type marker: C{list}
179     @ivar marker: mutable type to make sure the data was encoded by this class
180     @type bencoded: C{string}
181     @ivar bencoded: the bencoded data stored in a string
182     
183     """
184     
185     def __init__(self, s):
186         """
187         
188         @type s: C{string}
189         @param s: the new bencoded data to store
190         
191         """
192         
193         self.marker = bencached_marker
194         self.bencoded = s
195
196 BencachedType = type(Bencached('')) # insufficient, but good as a filter
197
198 def encode_bencached(x,r):
199     """Bencode L{Bencached} data.
200     
201     @type x: L{Bencached}
202     @param x: the data to encode
203     @type r: C{list}
204     @param r: the currently bencoded data, to which the bencoding of x
205         will be appended
206     
207     """
208     
209     assert x.marker == bencached_marker
210     r.append(x.bencoded)
211
212 def encode_int(x,r):
213     """Bencode an integer.
214     
215     @type x: C{int}
216     @param x: the data to encode
217     @type r: C{list}
218     @param r: the currently bencoded data, to which the bencoding of x
219         will be appended
220     
221     """
222     
223     r.extend(('i',str(x),'e'))
224
225 def encode_bool(x,r):
226     """Bencode a boolean.
227     
228     @type x: C{boolean}
229     @param x: the data to encode
230     @type r: C{list}
231     @param r: the currently bencoded data, to which the bencoding of x
232         will be appended
233     
234     """
235     
236     encode_int(int(x),r)
237
238 def encode_string(x,r):    
239     """Bencode a string.
240     
241     @type x: C{string}
242     @param x: the data to encode
243     @type r: C{list}
244     @param r: the currently bencoded data, to which the bencoding of x
245         will be appended
246     
247     """
248     
249     r.extend((str(len(x)),':',x))
250
251 def encode_unicode(x,r):
252     """Bencode a unicode string.
253     
254     @type x: C{unicode}
255     @param x: the data to encode
256     @type r: C{list}
257     @param r: the currently bencoded data, to which the bencoding of x
258         will be appended
259     
260     """
261     
262     #r.append('u')
263     encode_string(x.encode('UTF-8'),r)
264
265 def encode_list(x,r):
266     """Bencode a list.
267     
268     @type x: C{list}
269     @param x: the data to encode
270     @type r: C{list}
271     @param r: the currently bencoded data, to which the bencoding of x
272         will be appended
273     
274     """
275     
276     r.append('l')
277     for e in x:
278         encode_func[type(e)](e, r)
279     r.append('e')
280
281 def encode_dict(x,r):
282     """Bencode a dictionary.
283     
284     @type x: C{dictionary}
285     @param x: the data to encode
286     @type r: C{list}
287     @param r: the currently bencoded data, to which the bencoding of x
288         will be appended
289     
290     """
291     
292     r.append('d')
293     ilist = x.items()
294     ilist.sort()
295     for k,v in ilist:
296         r.extend((str(len(k)),':',k))
297         encode_func[type(v)](v, r)
298     r.append('e')
299
300 encode_func = {}
301 encode_func[BencachedType] = encode_bencached
302 encode_func[IntType] = encode_int
303 encode_func[LongType] = encode_int
304 encode_func[StringType] = encode_string
305 encode_func[ListType] = encode_list
306 encode_func[TupleType] = encode_list
307 encode_func[DictType] = encode_dict
308 encode_func[BooleanType] = encode_bool
309 if UnicodeType:
310     encode_func[UnicodeType] = encode_unicode
311     
312 def bencode(x):
313     """Bencode some data.
314     
315     @type x: unknown
316     @param x: the data to encode
317     @rtype: string
318     @return: the bencoded data
319     @raise BencodeError: if the data contains a type that cannot be encoded
320     
321     """
322     r = []
323     try:
324         encode_func[type(x)](x, r)
325     except:
326         raise BencodeError, "failed to bencode the data"
327     return ''.join(r)
328
329 class TestBencode(unittest.TestCase):
330     """Test the bencoding and bdecoding of data."""
331
332     timeout = 2
333
334     def test_bdecode_string(self):
335         self.failUnlessRaises(BencodeError, bdecode, '0:0:')
336         self.failUnlessRaises(BencodeError, bdecode, '')
337         self.failUnlessRaises(BencodeError, bdecode, '35208734823ljdahflajhdf')
338         self.failUnlessRaises(BencodeError, bdecode, '2:abfdjslhfld')
339         self.failUnlessEqual(bdecode('0:'), '')
340         self.failUnlessEqual(bdecode('3:abc'), 'abc')
341         self.failUnlessEqual(bdecode('10:1234567890'), '1234567890')
342         self.failUnlessRaises(BencodeError, bdecode, '02:xy')
343         self.failUnlessRaises(BencodeError, bdecode, '9999:x')
344
345     def test_bdecode_int(self):
346         self.failUnlessRaises(BencodeError, bdecode, 'ie')
347         self.failUnlessRaises(BencodeError, bdecode, 'i341foo382e')
348         self.failUnlessEqual(bdecode('i4e'), 4L)
349         self.failUnlessEqual(bdecode('i0e'), 0L)
350         self.failUnlessEqual(bdecode('i123456789e'), 123456789L)
351         self.failUnlessEqual(bdecode('i-10e'), -10L)
352         self.failUnlessRaises(BencodeError, bdecode, 'i-0e')
353         self.failUnlessRaises(BencodeError, bdecode, 'i123')
354         self.failUnlessRaises(BencodeError, bdecode, 'i6easd')
355         self.failUnlessRaises(BencodeError, bdecode, 'i03e')
356
357     def test_bdecode_list(self):
358         self.failUnlessRaises(BencodeError, bdecode, 'l')
359         self.failUnlessEqual(bdecode('le'), [])
360         self.failUnlessRaises(BencodeError, bdecode, 'leanfdldjfh')
361         self.failUnlessEqual(bdecode('l0:0:0:e'), ['', '', ''])
362         self.failUnlessRaises(BencodeError, bdecode, 'relwjhrlewjh')
363         self.failUnlessEqual(bdecode('li1ei2ei3ee'), [1, 2, 3])
364         self.failUnlessEqual(bdecode('l3:asd2:xye'), ['asd', 'xy'])
365         self.failUnlessEqual(bdecode('ll5:Alice3:Bobeli2ei3eee'), [['Alice', 'Bob'], [2, 3]])
366         self.failUnlessRaises(BencodeError, bdecode, 'l01:ae')
367         self.failUnlessRaises(BencodeError, bdecode, 'l0:')
368
369     def test_bdecode_dict(self):
370         self.failUnlessRaises(BencodeError, bdecode, 'd')
371         self.failUnlessRaises(BencodeError, bdecode, 'defoobar')
372         self.failUnlessEqual(bdecode('de'), {})
373         self.failUnlessEqual(bdecode('d3:agei25e4:eyes4:bluee'), {'age': 25, 'eyes': 'blue'})
374         self.failUnlessEqual(bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee'),
375                              {'spam.mp3': {'author': 'Alice', 'length': 100000}})
376         self.failUnlessRaises(BencodeError, bdecode, 'd3:fooe')
377         self.failUnlessRaises(BencodeError, bdecode, 'di1e0:e')
378         self.failUnlessRaises(BencodeError, bdecode, 'd1:b0:1:a0:e')
379         self.failUnlessRaises(BencodeError, bdecode, 'd1:a0:1:a0:e')
380         self.failUnlessRaises(BencodeError, bdecode, 'd0:0:')
381         self.failUnlessRaises(BencodeError, bdecode, 'd0:')
382
383     def test_bdecode_unicode(self):
384         self.failUnlessRaises(BencodeError, bdecode, 'u0:0:')
385         self.failUnlessRaises(BencodeError, bdecode, 'u')
386         self.failUnlessRaises(BencodeError, bdecode, 'u35208734823ljdahflajhdf')
387         self.failUnlessRaises(BencodeError, bdecode, 'u2:abfdjslhfld')
388         self.failUnlessEqual(bdecode('u0:'), '')
389         self.failUnlessEqual(bdecode('u3:abc'), 'abc')
390         self.failUnlessEqual(bdecode('u10:1234567890'), '1234567890')
391         self.failUnlessRaises(BencodeError, bdecode, 'u02:xy')
392         self.failUnlessRaises(BencodeError, bdecode, 'u9999:x')
393
394     def test_bencode_int(self):
395         self.failUnlessEqual(bencode(4), 'i4e')
396         self.failUnlessEqual(bencode(0), 'i0e')
397         self.failUnlessEqual(bencode(-10), 'i-10e')
398         self.failUnlessEqual(bencode(12345678901234567890L), 'i12345678901234567890e')
399
400     def test_bencode_string(self):
401         self.failUnlessEqual(bencode(''), '0:')
402         self.failUnlessEqual(bencode('abc'), '3:abc')
403         self.failUnlessEqual(bencode('1234567890'), '10:1234567890')
404
405     def test_bencode_list(self):
406         self.failUnlessEqual(bencode([]), 'le')
407         self.failUnlessEqual(bencode([1, 2, 3]), 'li1ei2ei3ee')
408         self.failUnlessEqual(bencode([['Alice', 'Bob'], [2, 3]]), 'll5:Alice3:Bobeli2ei3eee')
409
410     def test_bencode_dict(self):
411         self.failUnlessEqual(bencode({}), 'de')
412         self.failUnlessEqual(bencode({'age': 25, 'eyes': 'blue'}), 'd3:agei25e4:eyes4:bluee')
413         self.failUnlessEqual(bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}), 
414                              'd8:spam.mp3d6:author5:Alice6:lengthi100000eee')
415         self.failUnlessRaises(BencodeError, bencode, {1: 'foo'})
416
417     def test_bencode_unicode(self):
418         self.failUnlessEqual(bencode(u''), '0:')
419         self.failUnlessEqual(bencode(u'abc'), '3:abc')
420         self.failUnlessEqual(bencode(u'1234567890'), '10:1234567890')
421
422     def test_bool(self):
423         self.failUnless(bdecode(bencode(True)))
424         self.failIf(bdecode(bencode(False)))
425
426     if UnicodeType == None:
427         test_bencode_unicode.skip = "Python was not compiled with unicode support"
428         test_bdecode_unicode.skip = "Python was not compiled with unicode support"