More work on the TODO.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / bencode.py
1
2 """Functions for bencoding and bdecoding data.
3
4 @type decode_func: C{dictionary} of C{function}
5 @var decode_func: a dictionary of function calls to be made, based on data,
6     the keys are the first character of the data and the value is the
7     function to use to decode that data
8 @type bencached_marker: C{list}
9 @var bencached_marker: mutable type to ensure class origination
10 @type encode_func: C{dictionary} of C{function}
11 @var encode_func: a dictionary of function calls to be made, based on data,
12     the keys are the type of the data and the value is the
13     function to use to encode that data
14 @type BencachedType: C{type}
15 @var BencachedType: the L{Bencached} type
16 """
17
18 from types import IntType, LongType, StringType, ListType, TupleType, DictType, BooleanType
19 try:
20     from types import UnicodeType
21 except ImportError:
22     UnicodeType = None
23 from datetime import datetime
24 import time
25
26 from twisted.python import log
27 from twisted.trial import unittest
28
29 class BencodeError(ValueError):
30     pass
31
32 def decode_int(x, f):
33     """Bdecode an integer.
34     
35     @type x: C{string}
36     @param x: the data to decode
37     @type f: C{int}
38     @param f: the offset in the data to start at
39     @rtype: C{int}, C{int}
40     @return: the bdecoded integer, and the offset to read next
41     @raise BencodeError: if the data is improperly encoded
42     
43     """
44     
45     f += 1
46     newf = x.index('e', f)
47     try:
48         n = int(x[f:newf])
49     except:
50         n = long(x[f:newf])
51     if x[f] == '-':
52         if x[f + 1] == '0':
53             raise BencodeError, "integer has a leading zero after a negative sign"
54     elif x[f] == '0' and newf != f+1:
55         raise BencodeError, "integer has a leading zero"
56     return (n, newf+1)
57   
58 def decode_string(x, f):
59     """Bdecode a string.
60     
61     @type x: C{string}
62     @param x: the data to decode
63     @type f: C{int}
64     @param f: the offset in the data to start at
65     @rtype: C{string}, C{int}
66     @return: the bdecoded string, and the offset to read next
67     @raise BencodeError: if the data is improperly encoded
68     
69     """
70     
71     colon = x.index(':', f)
72     try:
73         n = int(x[f:colon])
74     except (OverflowError, ValueError):
75         n = long(x[f:colon])
76     if x[f] == '0' and colon != f+1:
77         raise BencodeError, "string length has a leading zero"
78     colon += 1
79     return (x[colon:colon+n], colon+n)
80
81 def decode_unicode(x, f):
82     """Bdecode a unicode string.
83     
84     @type x: C{string}
85     @param x: the data to decode
86     @type f: C{int}
87     @param f: the offset in the data to start at
88     @rtype: C{int}, C{int}
89     @return: the bdecoded unicode string, and the offset to read next
90     
91     """
92     
93     s, f = decode_string(x, f+1)
94     return (s.decode('UTF-8'),f)
95
96 def decode_datetime(x, f):
97     """Bdecode a datetime value.
98     
99     @type x: C{string}
100     @param x: the data to decode
101     @type f: C{int}
102     @param f: the offset in the data to start at
103     @rtype: C{datetime.datetime}, C{int}
104     @return: the bdecoded integer, and the offset to read next
105     @raise BencodeError: if the data is improperly encoded
106     
107     """
108     
109     f += 1
110     newf = x.index('e', f)
111     try:
112         date = datetime(*(time.strptime(x[f:newf], '%Y-%m-%dT%H:%M:%S')[0:6]))
113     except:
114         raise BencodeError, "datetime value could not be decoded: %s" % x[f:newf]
115     return (date, newf+1)
116   
117 def decode_list(x, f):
118     """Bdecode a list.
119     
120     @type x: C{string}
121     @param x: the data to decode
122     @type f: C{int}
123     @param f: the offset in the data to start at
124     @rtype: C{list}, C{int}
125     @return: the bdecoded list, and the offset to read next
126     
127     """
128     
129     r, f = [], f+1
130     while x[f] != 'e':
131         v, f = decode_func[x[f]](x, f)
132         r.append(v)
133     return (r, f + 1)
134
135 def decode_dict(x, f):
136     """Bdecode a dictionary.
137     
138     @type x: C{string}
139     @param x: the data to decode
140     @type f: C{int}
141     @param f: the offset in the data to start at
142     @rtype: C{dictionary}, C{int}
143     @return: the bdecoded dictionary, and the offset to read next
144     @raise BencodeError: if the data is improperly encoded
145     
146     """
147     
148     r, f = {}, f+1
149     lastkey = None
150     while x[f] != 'e':
151         k, f = decode_string(x, f)
152         if lastkey >= k:
153             raise BencodeError, "dictionary keys must be in sorted order"
154         lastkey = k
155         r[k], f = decode_func[x[f]](x, f)
156     return (r, f + 1)
157
158 decode_func = {}
159 decode_func['l'] = decode_list
160 decode_func['d'] = decode_dict
161 decode_func['i'] = decode_int
162 decode_func['0'] = decode_string
163 decode_func['1'] = decode_string
164 decode_func['2'] = decode_string
165 decode_func['3'] = decode_string
166 decode_func['4'] = decode_string
167 decode_func['5'] = decode_string
168 decode_func['6'] = decode_string
169 decode_func['7'] = decode_string
170 decode_func['8'] = decode_string
171 decode_func['9'] = decode_string
172 decode_func['u'] = decode_unicode
173 decode_func['t'] = decode_datetime
174   
175 def bdecode(x, sloppy = 0):
176     """Bdecode a string of data.
177     
178     @type x: C{string}
179     @param x: the data to decode
180     @type sloppy: C{boolean}
181     @param sloppy: whether to allow errors in the decoding
182     @rtype: unknown
183     @return: the bdecoded data
184     @raise BencodeError: if the data is improperly encoded
185     
186     """
187     
188     try:
189         r, l = decode_func[x[0]](x, 0)
190 #    except (IndexError, KeyError):
191     except (IndexError, KeyError, ValueError):
192         raise BencodeError, "bad bencoded data"
193     if not sloppy and l != len(x):
194         raise BencodeError, "bad bencoded data, all could not be decoded"
195     return r
196
197 bencached_marker = []
198
199 class Bencached:
200     """Dummy data structure for storing bencoded data in memory.
201     
202     @type marker: C{list}
203     @ivar marker: mutable type to make sure the data was encoded by this class
204     @type bencoded: C{string}
205     @ivar bencoded: the bencoded data stored in a string
206     
207     """
208     
209     def __init__(self, s):
210         """
211         
212         @type s: C{string}
213         @param s: the new bencoded data to store
214         
215         """
216         
217         self.marker = bencached_marker
218         self.bencoded = s
219
220 BencachedType = type(Bencached('')) # insufficient, but good as a filter
221
222 def encode_bencached(x,r):
223     """Bencode L{Bencached} data.
224     
225     @type x: L{Bencached}
226     @param x: the data to encode
227     @type r: C{list}
228     @param r: the currently bencoded data, to which the bencoding of x
229         will be appended
230     
231     """
232     
233     assert x.marker == bencached_marker
234     r.append(x.bencoded)
235
236 def encode_int(x,r):
237     """Bencode an integer.
238     
239     @type x: C{int}
240     @param x: the data to encode
241     @type r: C{list}
242     @param r: the currently bencoded data, to which the bencoding of x
243         will be appended
244     
245     """
246     
247     r.extend(('i',str(x),'e'))
248
249 def encode_bool(x,r):
250     """Bencode a boolean.
251     
252     @type x: C{boolean}
253     @param x: the data to encode
254     @type r: C{list}
255     @param r: the currently bencoded data, to which the bencoding of x
256         will be appended
257     
258     """
259     
260     encode_int(int(x),r)
261
262 def encode_string(x,r):    
263     """Bencode a string.
264     
265     @type x: C{string}
266     @param x: the data to encode
267     @type r: C{list}
268     @param r: the currently bencoded data, to which the bencoding of x
269         will be appended
270     
271     """
272     
273     r.extend((str(len(x)),':',x))
274
275 def encode_unicode(x,r):
276     """Bencode a unicode string.
277     
278     @type x: C{unicode}
279     @param x: the data to encode
280     @type r: C{list}
281     @param r: the currently bencoded data, to which the bencoding of x
282         will be appended
283     
284     """
285     
286     #r.append('u')
287     encode_string(x.encode('UTF-8'),r)
288
289 def encode_datetime(x,r):
290     """Bencode a datetime value in UTC.
291     
292     If the datetime object has time zone info, it is converted to UTC time.
293     Otherwise it is assumed that the time is already in UTC time.
294     Microseconds are removed.
295     
296     @type x: C{datetime.datetime}
297     @param x: the data to encode
298     @type r: C{list}
299     @param r: the currently bencoded data, to which the bencoding of x
300         will be appended
301     
302     """
303     
304     date = x.replace(microsecond = 0)
305     offset = date.utcoffset()
306     if offset is not None:
307         utcdate = date.replace(tzinfo = None) + offset
308     else:
309         utcdate = date
310     r.extend(('t',utcdate.isoformat(),'e'))
311
312 def encode_list(x,r):
313     """Bencode a list.
314     
315     @type x: C{list}
316     @param x: the data to encode
317     @type r: C{list}
318     @param r: the currently bencoded data, to which the bencoding of x
319         will be appended
320     
321     """
322     
323     r.append('l')
324     for e in x:
325         encode_func[type(e)](e, r)
326     r.append('e')
327
328 def encode_dict(x,r):
329     """Bencode a dictionary.
330     
331     @type x: C{dictionary}
332     @param x: the data to encode
333     @type r: C{list}
334     @param r: the currently bencoded data, to which the bencoding of x
335         will be appended
336     
337     """
338     
339     r.append('d')
340     ilist = x.items()
341     ilist.sort()
342     for k,v in ilist:
343         r.extend((str(len(k)),':',k))
344         encode_func[type(v)](v, r)
345     r.append('e')
346
347 encode_func = {}
348 encode_func[BencachedType] = encode_bencached
349 encode_func[IntType] = encode_int
350 encode_func[LongType] = encode_int
351 encode_func[StringType] = encode_string
352 encode_func[ListType] = encode_list
353 encode_func[TupleType] = encode_list
354 encode_func[DictType] = encode_dict
355 encode_func[BooleanType] = encode_bool
356 encode_func[datetime] = encode_datetime
357 if UnicodeType:
358     encode_func[UnicodeType] = encode_unicode
359     
360 def bencode(x):
361     """Bencode some data.
362     
363     @type x: unknown
364     @param x: the data to encode
365     @rtype: string
366     @return: the bencoded data
367     @raise BencodeError: if the data contains a type that cannot be encoded
368     
369     """
370     r = []
371     try:
372         encode_func[type(x)](x, r)
373     except:
374         raise BencodeError, "failed to bencode the data"
375     return ''.join(r)
376
377 class TestBencode(unittest.TestCase):
378     """Test the bencoding and bdecoding of data."""
379
380     timeout = 2
381
382     def test_bdecode_string(self):
383         self.failUnlessRaises(BencodeError, bdecode, '0:0:')
384         self.failUnlessRaises(BencodeError, bdecode, '')
385         self.failUnlessRaises(BencodeError, bdecode, '35208734823ljdahflajhdf')
386         self.failUnlessRaises(BencodeError, bdecode, '2:abfdjslhfld')
387         self.failUnlessEqual(bdecode('0:'), '')
388         self.failUnlessEqual(bdecode('3:abc'), 'abc')
389         self.failUnlessEqual(bdecode('10:1234567890'), '1234567890')
390         self.failUnlessRaises(BencodeError, bdecode, '02:xy')
391         self.failUnlessRaises(BencodeError, bdecode, '9999:x')
392
393     def test_bdecode_int(self):
394         self.failUnlessRaises(BencodeError, bdecode, 'ie')
395         self.failUnlessRaises(BencodeError, bdecode, 'i341foo382e')
396         self.failUnlessEqual(bdecode('i4e'), 4L)
397         self.failUnlessEqual(bdecode('i0e'), 0L)
398         self.failUnlessEqual(bdecode('i123456789e'), 123456789L)
399         self.failUnlessEqual(bdecode('i-10e'), -10L)
400         self.failUnlessRaises(BencodeError, bdecode, 'i-0e')
401         self.failUnlessRaises(BencodeError, bdecode, 'i123')
402         self.failUnlessRaises(BencodeError, bdecode, 'i6easd')
403         self.failUnlessRaises(BencodeError, bdecode, 'i03e')
404
405     def test_bdecode_list(self):
406         self.failUnlessRaises(BencodeError, bdecode, 'l')
407         self.failUnlessEqual(bdecode('le'), [])
408         self.failUnlessRaises(BencodeError, bdecode, 'leanfdldjfh')
409         self.failUnlessEqual(bdecode('l0:0:0:e'), ['', '', ''])
410         self.failUnlessRaises(BencodeError, bdecode, 'relwjhrlewjh')
411         self.failUnlessEqual(bdecode('li1ei2ei3ee'), [1, 2, 3])
412         self.failUnlessEqual(bdecode('l3:asd2:xye'), ['asd', 'xy'])
413         self.failUnlessEqual(bdecode('ll5:Alice3:Bobeli2ei3eee'), [['Alice', 'Bob'], [2, 3]])
414         self.failUnlessRaises(BencodeError, bdecode, 'l01:ae')
415         self.failUnlessRaises(BencodeError, bdecode, 'l0:')
416
417     def test_bdecode_dict(self):
418         self.failUnlessRaises(BencodeError, bdecode, 'd')
419         self.failUnlessRaises(BencodeError, bdecode, 'defoobar')
420         self.failUnlessEqual(bdecode('de'), {})
421         self.failUnlessEqual(bdecode('d3:agei25e4:eyes4:bluee'), {'age': 25, 'eyes': 'blue'})
422         self.failUnlessEqual(bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee'),
423                              {'spam.mp3': {'author': 'Alice', 'length': 100000}})
424         self.failUnlessRaises(BencodeError, bdecode, 'd3:fooe')
425         self.failUnlessRaises(BencodeError, bdecode, 'di1e0:e')
426         self.failUnlessRaises(BencodeError, bdecode, 'd1:b0:1:a0:e')
427         self.failUnlessRaises(BencodeError, bdecode, 'd1:a0:1:a0:e')
428         self.failUnlessRaises(BencodeError, bdecode, 'd0:0:')
429         self.failUnlessRaises(BencodeError, bdecode, 'd0:')
430
431     def test_bdecode_unicode(self):
432         self.failUnlessRaises(BencodeError, bdecode, 'u0:0:')
433         self.failUnlessRaises(BencodeError, bdecode, 'u')
434         self.failUnlessRaises(BencodeError, bdecode, 'u35208734823ljdahflajhdf')
435         self.failUnlessRaises(BencodeError, bdecode, 'u2:abfdjslhfld')
436         self.failUnlessEqual(bdecode('u0:'), '')
437         self.failUnlessEqual(bdecode('u3:abc'), 'abc')
438         self.failUnlessEqual(bdecode('u10:1234567890'), '1234567890')
439         self.failUnlessRaises(BencodeError, bdecode, 'u02:xy')
440         self.failUnlessRaises(BencodeError, bdecode, 'u9999:x')
441
442     def test_bencode_int(self):
443         self.failUnlessEqual(bencode(4), 'i4e')
444         self.failUnlessEqual(bencode(0), 'i0e')
445         self.failUnlessEqual(bencode(-10), 'i-10e')
446         self.failUnlessEqual(bencode(12345678901234567890L), 'i12345678901234567890e')
447
448     def test_bencode_string(self):
449         self.failUnlessEqual(bencode(''), '0:')
450         self.failUnlessEqual(bencode('abc'), '3:abc')
451         self.failUnlessEqual(bencode('1234567890'), '10:1234567890')
452
453     def test_bencode_list(self):
454         self.failUnlessEqual(bencode([]), 'le')
455         self.failUnlessEqual(bencode([1, 2, 3]), 'li1ei2ei3ee')
456         self.failUnlessEqual(bencode([['Alice', 'Bob'], [2, 3]]), 'll5:Alice3:Bobeli2ei3eee')
457
458     def test_bencode_dict(self):
459         self.failUnlessEqual(bencode({}), 'de')
460         self.failUnlessEqual(bencode({'age': 25, 'eyes': 'blue'}), 'd3:agei25e4:eyes4:bluee')
461         self.failUnlessEqual(bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}), 
462                              'd8:spam.mp3d6:author5:Alice6:lengthi100000eee')
463         self.failUnlessRaises(BencodeError, bencode, {1: 'foo'})
464
465     def test_bencode_unicode(self):
466         self.failUnlessEqual(bencode(u''), '0:')
467         self.failUnlessEqual(bencode(u'abc'), '3:abc')
468         self.failUnlessEqual(bencode(u'1234567890'), '10:1234567890')
469
470     def test_bool(self):
471         self.failUnless(bdecode(bencode(True)))
472         self.failIf(bdecode(bencode(False)))
473
474     def test_datetime(self):
475         date = datetime.utcnow()
476         self.failUnlessEqual(bdecode(bencode(date)), date.replace(microsecond = 0))
477
478     if UnicodeType == None:
479         test_bencode_unicode.skip = "Python was not compiled with unicode support"
480         test_bdecode_unicode.skip = "Python was not compiled with unicode support"