Changed bencode's test functions to trial unittests.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / bencode.py
1
2 """Functions for bencoding and bdecoding data.
3
4 @type decode_func: C{dictionary} of C{function}
5 @var decode_func: a dictionary of function calls to be made, based on data,
6     the keys are the first character of the data and the value is the
7     function to use to decode that data
8 @type bencached_marker: C{list}
9 @var bencached_marker: mutable type to ensure class origination
10 @type encode_func: C{dictionary} of C{function}
11 @var encode_func: a dictionary of function calls to be made, based on data,
12     the keys are the type of the data and the value is the
13     function to use to encode that data
14 @type BencachedType: C{type}
15 @var BencachedType: the L{Bencached} type
16 """
17
18 from types import IntType, LongType, StringType, ListType, TupleType, DictType, BooleanType
19 try:
20     from types import UnicodeType
21 except ImportError:
22     UnicodeType = None
23
24 from twisted.python import log
25 from twisted.trial import unittest
26
27 def decode_int(x, f):
28     """Bdecode an integer.
29     
30     @type x: C{string}
31     @param x: the data to decode
32     @type f: C{int}
33     @param f: the offset in the data to start at
34     @rtype: C{int}, C{int}
35     @return: the bdecoded integer, and the offset to read next
36     @raise ValueError: if the data is improperly encoded
37     
38     """
39     
40     f += 1
41     newf = x.index('e', f)
42     try:
43         n = int(x[f:newf])
44     except:
45         n = long(x[f:newf])
46     if x[f] == '-':
47         if x[f + 1] == '0':
48             raise ValueError
49     elif x[f] == '0' and newf != f+1:
50         raise ValueError
51     return (n, newf+1)
52   
53 def decode_string(x, f):
54     """Bdecode a string.
55     
56     @type x: C{string}
57     @param x: the data to decode
58     @type f: C{int}
59     @param f: the offset in the data to start at
60     @rtype: C{string}, C{int}
61     @return: the bdecoded string, and the offset to read next
62     @raise ValueError: if the data is improperly encoded
63     
64     """
65     
66     colon = x.index(':', f)
67     try:
68         n = int(x[f:colon])
69     except (OverflowError, ValueError):
70         n = long(x[f:colon])
71     if x[f] == '0' and colon != f+1:
72         raise ValueError
73     colon += 1
74     return (x[colon:colon+n], colon+n)
75
76 def decode_unicode(x, f):
77     """Bdecode a unicode string.
78     
79     @type x: C{string}
80     @param x: the data to decode
81     @type f: C{int}
82     @param f: the offset in the data to start at
83     @rtype: C{int}, C{int}
84     @return: the bdecoded unicode string, and the offset to read next
85     
86     """
87     
88     s, f = decode_string(x, f+1)
89     return (s.decode('UTF-8'),f)
90
91 def decode_list(x, f):
92     """Bdecode a list.
93     
94     @type x: C{string}
95     @param x: the data to decode
96     @type f: C{int}
97     @param f: the offset in the data to start at
98     @rtype: C{list}, C{int}
99     @return: the bdecoded list, and the offset to read next
100     
101     """
102     
103     r, f = [], f+1
104     while x[f] != 'e':
105         v, f = decode_func[x[f]](x, f)
106         r.append(v)
107     return (r, f + 1)
108
109 def decode_dict(x, f):
110     """Bdecode a dictionary.
111     
112     @type x: C{string}
113     @param x: the data to decode
114     @type f: C{int}
115     @param f: the offset in the data to start at
116     @rtype: C{dictionary}, C{int}
117     @return: the bdecoded dictionary, and the offset to read next
118     @raise ValueError: if the data is improperly encoded
119     
120     """
121     
122     r, f = {}, f+1
123     lastkey = None
124     while x[f] != 'e':
125         k, f = decode_string(x, f)
126         if lastkey >= k:
127             raise ValueError
128         lastkey = k
129         r[k], f = decode_func[x[f]](x, f)
130     return (r, f + 1)
131
132 decode_func = {}
133 decode_func['l'] = decode_list
134 decode_func['d'] = decode_dict
135 decode_func['i'] = decode_int
136 decode_func['0'] = decode_string
137 decode_func['1'] = decode_string
138 decode_func['2'] = decode_string
139 decode_func['3'] = decode_string
140 decode_func['4'] = decode_string
141 decode_func['5'] = decode_string
142 decode_func['6'] = decode_string
143 decode_func['7'] = decode_string
144 decode_func['8'] = decode_string
145 decode_func['9'] = decode_string
146 decode_func['u'] = decode_unicode
147   
148 def bdecode(x, sloppy = 0):
149     """Bdecode a string of data.
150     
151     @type x: C{string}
152     @param x: the data to decode
153     @type sloppy: C{boolean}
154     @param sloppy: whether to allow errors in the decoding
155     @rtype: unknown
156     @return: the bdecoded data
157     @raise ValueError: if the data is improperly encoded
158     
159     """
160     
161     try:
162         r, l = decode_func[x[0]](x, 0)
163 #    except (IndexError, KeyError):
164     except (IndexError, KeyError, ValueError):
165         raise ValueError, "bad bencoded data"
166     if not sloppy and l != len(x):
167         raise ValueError, "bad bencoded data"
168     return r
169
170 bencached_marker = []
171
172 class Bencached:
173     """Dummy data structure for storing bencoded data in memory.
174     
175     @type marker: C{list}
176     @ivar marker: mutable type to make sure the data was encoded by this class
177     @type bencoded: C{string}
178     @ivar bencoded: the bencoded data stored in a string
179     
180     """
181     
182     def __init__(self, s):
183         """
184         
185         @type s: C{string}
186         @param s: the new bencoded data to store
187         
188         """
189         
190         self.marker = bencached_marker
191         self.bencoded = s
192
193 BencachedType = type(Bencached('')) # insufficient, but good as a filter
194
195 def encode_bencached(x,r):
196     """Bencode L{Bencached} data.
197     
198     @type x: L{Bencached}
199     @param x: the data to encode
200     @type r: C{list}
201     @param r: the currently bencoded data, to which the bencoding of x
202         will be appended
203     
204     """
205     
206     assert x.marker == bencached_marker
207     r.append(x.bencoded)
208
209 def encode_int(x,r):
210     """Bencode an integer.
211     
212     @type x: C{int}
213     @param x: the data to encode
214     @type r: C{list}
215     @param r: the currently bencoded data, to which the bencoding of x
216         will be appended
217     
218     """
219     
220     r.extend(('i',str(x),'e'))
221
222 def encode_bool(x,r):
223     """Bencode a boolean.
224     
225     @type x: C{boolean}
226     @param x: the data to encode
227     @type r: C{list}
228     @param r: the currently bencoded data, to which the bencoding of x
229         will be appended
230     
231     """
232     
233     encode_int(int(x),r)
234
235 def encode_string(x,r):    
236     """Bencode a string.
237     
238     @type x: C{string}
239     @param x: the data to encode
240     @type r: C{list}
241     @param r: the currently bencoded data, to which the bencoding of x
242         will be appended
243     
244     """
245     
246     r.extend((str(len(x)),':',x))
247
248 def encode_unicode(x,r):
249     """Bencode a unicode string.
250     
251     @type x: C{unicode}
252     @param x: the data to encode
253     @type r: C{list}
254     @param r: the currently bencoded data, to which the bencoding of x
255         will be appended
256     
257     """
258     
259     #r.append('u')
260     encode_string(x.encode('UTF-8'),r)
261
262 def encode_list(x,r):
263     """Bencode a list.
264     
265     @type x: C{list}
266     @param x: the data to encode
267     @type r: C{list}
268     @param r: the currently bencoded data, to which the bencoding of x
269         will be appended
270     
271     """
272     
273     r.append('l')
274     for e in x:
275         encode_func[type(e)](e, r)
276     r.append('e')
277
278 def encode_dict(x,r):
279     """Bencode a dictionary.
280     
281     @type x: C{dictionary}
282     @param x: the data to encode
283     @type r: C{list}
284     @param r: the currently bencoded data, to which the bencoding of x
285         will be appended
286     
287     """
288     
289     r.append('d')
290     ilist = x.items()
291     ilist.sort()
292     for k,v in ilist:
293         r.extend((str(len(k)),':',k))
294         encode_func[type(v)](v, r)
295     r.append('e')
296
297 encode_func = {}
298 encode_func[BencachedType] = encode_bencached
299 encode_func[IntType] = encode_int
300 encode_func[LongType] = encode_int
301 encode_func[StringType] = encode_string
302 encode_func[ListType] = encode_list
303 encode_func[TupleType] = encode_list
304 encode_func[DictType] = encode_dict
305 encode_func[BooleanType] = encode_bool
306 if UnicodeType:
307     encode_func[UnicodeType] = encode_unicode
308     
309 def bencode(x):
310     """Bencode some data.
311     
312     @type x: unknown
313     @param x: the data to encode
314     @rtype: string
315     @return: the bencoded data
316     @raise ValueError: if the data contains a type that cannot be encoded
317     
318     """
319     r = []
320     try:
321         encode_func[type(x)](x, r)
322     except:
323         raise ValueError, "failed to bencode the data"
324     return ''.join(r)
325
326 class TestBencode(unittest.TestCase):
327     """Test the bencoding and bdecoding of data."""
328
329     timeout = 2
330
331     def test_bdecode_string(self):
332         self.failUnlessRaises(ValueError, bdecode, '0:0:')
333         self.failUnlessRaises(ValueError, bdecode, '')
334         self.failUnlessRaises(ValueError, bdecode, '35208734823ljdahflajhdf')
335         self.failUnlessRaises(ValueError, bdecode, '2:abfdjslhfld')
336         self.failUnlessEqual(bdecode('0:'), '')
337         self.failUnlessEqual(bdecode('3:abc'), 'abc')
338         self.failUnlessEqual(bdecode('10:1234567890'), '1234567890')
339         self.failUnlessRaises(ValueError, bdecode, '02:xy')
340         self.failUnlessRaises(ValueError, bdecode, '9999:x')
341
342     def test_bdecode_int(self):
343         self.failUnlessRaises(ValueError, bdecode, 'ie')
344         self.failUnlessRaises(ValueError, bdecode, 'i341foo382e')
345         self.failUnlessEqual(bdecode('i4e'), 4L)
346         self.failUnlessEqual(bdecode('i0e'), 0L)
347         self.failUnlessEqual(bdecode('i123456789e'), 123456789L)
348         self.failUnlessEqual(bdecode('i-10e'), -10L)
349         self.failUnlessRaises(ValueError, bdecode, 'i-0e')
350         self.failUnlessRaises(ValueError, bdecode, 'i123')
351         self.failUnlessRaises(ValueError, bdecode, 'i6easd')
352         self.failUnlessRaises(ValueError, bdecode, 'i03e')
353
354     def test_bdecode_list(self):
355         self.failUnlessRaises(ValueError, bdecode, 'l')
356         self.failUnlessEqual(bdecode('le'), [])
357         self.failUnlessRaises(ValueError, bdecode, 'leanfdldjfh')
358         self.failUnlessEqual(bdecode('l0:0:0:e'), ['', '', ''])
359         self.failUnlessRaises(ValueError, bdecode, 'relwjhrlewjh')
360         self.failUnlessEqual(bdecode('li1ei2ei3ee'), [1, 2, 3])
361         self.failUnlessEqual(bdecode('l3:asd2:xye'), ['asd', 'xy'])
362         self.failUnlessEqual(bdecode('ll5:Alice3:Bobeli2ei3eee'), [['Alice', 'Bob'], [2, 3]])
363         self.failUnlessRaises(ValueError, bdecode, 'l01:ae')
364         self.failUnlessRaises(ValueError, bdecode, 'l0:')
365
366     def test_bdecode_dict(self):
367         self.failUnlessRaises(ValueError, bdecode, 'd')
368         self.failUnlessRaises(ValueError, bdecode, 'defoobar')
369         self.failUnlessEqual(bdecode('de'), {})
370         self.failUnlessEqual(bdecode('d3:agei25e4:eyes4:bluee'), {'age': 25, 'eyes': 'blue'})
371         self.failUnlessEqual(bdecode('d8:spam.mp3d6:author5:Alice6:lengthi100000eee'),
372                              {'spam.mp3': {'author': 'Alice', 'length': 100000}})
373         self.failUnlessRaises(ValueError, bdecode, 'd3:fooe')
374         self.failUnlessRaises(ValueError, bdecode, 'di1e0:e')
375         self.failUnlessRaises(ValueError, bdecode, 'd1:b0:1:a0:e')
376         self.failUnlessRaises(ValueError, bdecode, 'd1:a0:1:a0:e')
377         self.failUnlessRaises(ValueError, bdecode, 'd0:0:')
378         self.failUnlessRaises(ValueError, bdecode, 'd0:')
379
380     def test_bencode_int(self):
381         self.failUnlessEqual(bencode(4), 'i4e')
382         self.failUnlessEqual(bencode(0), 'i0e')
383         self.failUnlessEqual(bencode(-10), 'i-10e')
384         self.failUnlessEqual(bencode(12345678901234567890L), 'i12345678901234567890e')
385
386     def test_bencode_string(self):
387         self.failUnlessEqual(bencode(''), '0:')
388         self.failUnlessEqual(bencode('abc'), '3:abc')
389         self.failUnlessEqual(bencode('1234567890'), '10:1234567890')
390
391     def test_bencode_list(self):
392         self.failUnlessEqual(bencode([]), 'le')
393         self.failUnlessEqual(bencode([1, 2, 3]), 'li1ei2ei3ee')
394         self.failUnlessEqual(bencode([['Alice', 'Bob'], [2, 3]]), 'll5:Alice3:Bobeli2ei3eee')
395
396     def test_bencode_dict(self):
397         self.failUnlessEqual(bencode({}), 'de')
398         self.failUnlessEqual(bencode({'age': 25, 'eyes': 'blue'}), 'd3:agei25e4:eyes4:bluee')
399         self.failUnlessEqual(bencode({'spam.mp3': {'author': 'Alice', 'length': 100000}}), 
400                              'd8:spam.mp3d6:author5:Alice6:lengthi100000eee')
401         self.failUnlessRaises(ValueError, bencode, {1: 'foo'})