1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
import logging
from typing import Union, Tuple, Dict
from . import abstract
# Module-level side effect: ask the root logger to only emit ERROR and above.
# Per the stdlib docs, basicConfig() does nothing when the root logger already
# has handlers, so this only takes effect if logging was not configured
# earlier. With this threshold, the logging.debug() diagnostics emitted by the
# bencode decoder in this module are not shown.
logging.basicConfig(level=logging.ERROR)
class TorrentParser(abstract.AbstractParser):
    """Parser for BitTorrent metainfo (``.torrent``) files.

    The file is decoded once, at construction time, into ``self.dict_repr``.
    Every top-level key that is not in ``whitelist`` is treated as metadata:
    it is reported by :meth:`get_meta` and dropped by :meth:`remove_all`.
    """

    mimetypes = {'application/x-bittorrent', }
    # Top-level keys that are kept when cleaning; everything else is removed.
    whitelist = {b'announce', b'announce-list', b'info'}

    def __init__(self, filename):
        """Read and decode ``filename``.

        Raises:
            ValueError: if the content is not valid bencode, or if the decoded
                top-level value is not a dictionary.
        """
        # NOTE(review): self.filename / self.output_filename are presumably
        # set by AbstractParser.__init__ -- confirm against the base class.
        super().__init__(filename)
        with open(self.filename, 'rb') as f:
            self.dict_repr = _BencodeHandler().bdecode(f.read())
        # bdecode() returns None on malformed input, but it also happily
        # returns an int, bytes or list for valid bencode that is not a
        # dictionary. Only a dict can be processed by the methods below
        # (they call .items()), so reject everything else up front instead of
        # failing later with an AttributeError.
        if not isinstance(self.dict_repr, dict):
            raise ValueError("Not a valid torrent file: expected a "
                             "bencoded dictionary")

    def get_meta(self) -> Dict[str, str]:
        """Return the top-level entries that are not whitelisted.

        Keys are decoded from bytes as UTF-8 (a key that is not valid UTF-8
        raises UnicodeDecodeError); values are returned exactly as decoded
        from the file, without any conversion.
        """
        return {
            key.decode('utf-8'): value
            for key, value in self.dict_repr.items()
            if key not in self.whitelist
        }

    def remove_all(self) -> bool:
        """Write a copy containing only whitelisted keys to the output file.

        The in-memory representation is replaced by the cleaned one
        afterwards. Always returns True; I/O errors propagate to the caller.
        """
        cleaned = {
            key: value
            for key, value in self.dict_repr.items()
            if key in self.whitelist
        }
        with open(self.output_filename, 'wb') as f:
            f.write(_BencodeHandler().bencode(cleaned))
        self.dict_repr = cleaned  # since we're stateful
        return True
class _BencodeHandler(object):
"""
Since bencode isn't that hard to parse,
MAT2 comes with its own parser, based on the spec
https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding
"""
def __init__(self):
self.__decode_func = {
ord('d'): self.__decode_dict,
ord('i'): self.__decode_int,
ord('l'): self.__decode_list,
}
for i in range(0, 10):
self.__decode_func[ord(str(i))] = self.__decode_string
self.__encode_func = {
bytes: self.__encode_string,
dict: self.__encode_dict,
int: self.__encode_int,
list: self.__encode_list,
}
@staticmethod
def __decode_int(s: bytes) -> Tuple[int, bytes]:
s = s[1:]
next_idx = s.index(b'e')
if s.startswith(b'-0'):
raise ValueError # negative zero doesn't exist
elif s.startswith(b'0') and next_idx != 1:
raise ValueError # no leading zero except for zero itself
return int(s[:next_idx]), s[next_idx+1:]
@staticmethod
def __decode_string(s: bytes) -> Tuple[bytes, bytes]:
colon = s.index(b':')
# FIXME Python3 is broken here, the call to `ord` shouldn't be needed,
# but apparently it is. This is utterly idiotic.
if (s[0] == ord('0') or s[0] == '0') and colon != 1:
raise ValueError
str_len = int(s[:colon])
s = s[1:]
return s[colon:colon+str_len], s[colon+str_len:]
def __decode_list(self, s: bytes) -> Tuple[list, bytes]:
ret = list()
s = s[1:] # skip leading `l`
while s[0] != ord('e'):
value, s = self.__decode_func[s[0]](s)
ret.append(value)
return ret, s[1:]
def __decode_dict(self, s: bytes) -> Tuple[dict, bytes]:
ret = dict()
s = s[1:] # skip leading `d`
while s[0] != ord(b'e'):
key, s = self.__decode_string(s)
ret[key], s = self.__decode_func[s[0]](s)
return ret, s[1:]
@staticmethod
def __encode_int(x: bytes) -> bytes:
return b'i' + bytes(str(x), 'utf-8') + b'e'
@staticmethod
def __encode_string(x: bytes) -> bytes:
return bytes((str(len(x))), 'utf-8') + b':' + x
def __encode_list(self, x: str) -> bytes:
ret = b''
for i in x:
ret += self.__encode_func[type(i)](i)
return b'l' + ret + b'e'
def __encode_dict(self, x: dict) -> bytes:
ret = b''
for key, value in sorted(x.items()):
ret += self.__encode_func[type(key)](key)
ret += self.__encode_func[type(value)](value)
return b'd' + ret + b'e'
def bencode(self, s: Union[dict, list, bytes, int]) -> bytes:
return self.__encode_func[type(s)](s)
def bdecode(self, s: bytes) -> Union[dict, None]:
try:
ret, trail = self.__decode_func[s[0]](s)
except (IndexError, KeyError, ValueError) as e:
logging.debug("Not a valid bencoded string: %s", e)
return None
if trail != b'':
logging.debug("Invalid bencoded value (data after valid prefix)")
return None
return ret
|