summary | refs | log | tree | commit | diff
path: root/libmat2/torrent.py
diff options
context:
space:
mode:
Diffstat (limited to 'libmat2/torrent.py')
-rw-r--r-- libmat2/torrent.py | 126
1 files changed, 126 insertions, 0 deletions
diff --git a/libmat2/torrent.py b/libmat2/torrent.py
new file mode 100644
index 0000000..cb4b5e3
--- /dev/null
+++ b/libmat2/torrent.py
@@ -0,0 +1,126 @@
1from . import abstract
2
3
class TorrentParser(abstract.AbstractParser):
    """
    Parser for bencoded torrent files (`application/x-bittorrent`).

    Every top-level key that is not listed in `whitelist` is treated as
    metadata: `get_meta` reports those keys and `remove_all` drops them.
    """
    mimetypes = {'application/x-bittorrent', }
    # Top-level dictionary keys that are kept when cleaning the file.
    whitelist = {b'announce', b'announce-list', b'info'}

    def get_meta(self) -> dict:
        """
        Return the non-whitelisted top-level entries of the torrent file.

        Keys are decoded to `str`; values are returned as decoded by
        `_BencodeHandler` (bytes, int, list or dict).
        If the file cannot be parsed, or its top-level value is not a
        dictionary, a single `'Unknown meta'` entry describing the failure
        is returned instead.
        """
        with open(self.filename, 'rb') as f:
            d = _BencodeHandler().bdecode(f.read())
        # A lone integer, string or list is valid bencode but not a torrent:
        # treat it like a parse failure instead of crashing on `.items()`.
        if not isinstance(d, dict):
            return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename}
        # Keys are arbitrary bytes coming from an untrusted file, so invalid
        # UTF-8 is escaped rather than allowed to raise UnicodeDecodeError.
        return {
            k.decode('utf-8', 'backslashreplace'): v
            for k, v in d.items()
            if k not in self.whitelist
        }

    def remove_all(self) -> bool:
        """
        Write a copy of the torrent holding only the whitelisted keys.

        The cleaned file is written to `self.output_filename`.

        Returns:
            True on success, False if the input could not be parsed or its
            top-level value is not a dictionary (nothing is written then).
        """
        with open(self.filename, 'rb') as f:
            d = _BencodeHandler().bdecode(f.read())
        if not isinstance(d, dict):
            return False
        cleaned = {k: v for k, v in d.items() if k in self.whitelist}
        with open(self.output_filename, 'wb') as f:
            f.write(_BencodeHandler().bencode(cleaned))
        return True
32
33
class _BencodeHandler(object):
    """
    Since bencode isn't that hard to parse,
    MAT2 comes with its own parser, based on the spec
    https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding

    The public entry points are `bdecode` (bytes -> Python object) and
    `bencode` (Python object -> bytes). Bencoded strings map to `bytes`,
    integers to `int`, lists to `list` and dictionaries to `dict` with
    `bytes` keys.
    """
    def __init__(self):
        # Decoders are selected by the first byte of the remaining input.
        # Indexing a `bytes` object yields an int, hence the `ord` keys.
        self.__decode_func = {
            ord('d'): self.__decode_dict,
            ord('i'): self.__decode_int,
            ord('l'): self.__decode_list,
        }
        # A string starts with its length, so any leading digit selects it.
        for i in range(0, 10):
            self.__decode_func[ord(str(i))] = self.__decode_string

        # Encoders are selected by the exact type of the value.
        self.__encode_func = {
            bytes: self.__encode_string,
            dict: self.__encode_dict,
            int: self.__encode_int,
            list: self.__encode_list,
        }

    @staticmethod
    def __decode_int(s: bytes) -> (int, bytes):
        """
        Decode an integer (`i<digits>e`) found at the start of `s`.

        Returns the integer and the remaining input.
        Raises ValueError if the terminator is missing or the number is
        malformed.
        """
        s = s[1:]  # skip leading `i`
        next_idx = s.index(b'e')
        digits = s[:next_idx]
        if digits.startswith(b'-0'):
            raise ValueError  # negative zero doesn't exist
        elif digits.startswith(b'0') and next_idx != 1:
            raise ValueError  # no leading zero except for zero itself
        # `int()` alone would also accept `+5`, `1_0` or surrounding
        # whitespace, none of which are valid bencode.
        unsigned = digits[1:] if digits.startswith(b'-') else digits
        if not unsigned.isdigit():
            raise ValueError
        return int(digits), s[next_idx+1:]

    @staticmethod
    def __decode_string(s: bytes) -> (bytes, bytes):
        """
        Decode a byte string (`<length>:<data>`) found at the start of `s`.

        Returns the string and the remaining input.
        Raises ValueError if the separator is missing, the length prefix is
        malformed, or fewer than `<length>` bytes are available.
        """
        sep = s.index(b':')
        length_field = s[:sep]
        # Only plain ASCII digits are allowed; this also rejects negative
        # and empty length prefixes.
        if not length_field.isdigit():
            raise ValueError
        # `s[0]` is an int, so the prefix has to be compared as bytes.
        if length_field.startswith(b'0') and sep != 1:
            raise ValueError  # no leading zero except for zero itself
        start = sep + 1
        end = start + int(length_field)
        # Slicing would silently truncate: refuse corrupted input instead.
        if end > len(s):
            raise ValueError
        return s[start:end], s[end:]

    def __decode_list(self, s: bytes) -> (list, bytes):
        """
        Decode a list (`l<items>e`) found at the start of `s`.

        Returns the list and the remaining input.
        """
        r = list()
        s = s[1:]  # skip leading `l`
        while s[0] != ord('e'):
            v, s = self.__decode_func[s[0]](s)
            r.append(v)
        return r, s[1:]

    def __decode_dict(self, s: bytes) -> (dict, bytes):
        """
        Decode a dictionary (`d<key><value>...e`) found at the start of `s`.

        Keys are always decoded as byte strings.
        Returns the dictionary and the remaining input.
        """
        r = dict()
        s = s[1:]  # skip leading `d`
        while s[0] != ord(b'e'):
            k, s = self.__decode_string(s)
            r[k], s = self.__decode_func[s[0]](s)
        return r, s[1:]

    @staticmethod
    def __encode_int(x: int) -> bytes:
        """Encode an integer as `i<digits>e`."""
        return b'i' + bytes(str(x), 'utf-8') + b'e'

    @staticmethod
    def __encode_string(x: bytes) -> bytes:
        """Encode a byte string as `<length>:<data>`."""
        return bytes((str(len(x))), 'utf-8') + b':' + x

    def __encode_list(self, x: list) -> bytes:
        """Encode a list as `l<items>e`."""
        items = b''.join(self.__encode_func[type(i)](i) for i in x)
        return b'l' + items + b'e'

    def __encode_dict(self, x: dict) -> bytes:
        """Encode a dictionary as `d<key><value>...e`, in sorted key order."""
        parts = []
        for k, v in sorted(x.items()):
            parts.append(self.__encode_func[type(k)](k))
            parts.append(self.__encode_func[type(v)](v))
        return b'd' + b''.join(parts) + b'e'

    def bencode(self, s) -> bytes:
        """
        Encode `s` (bytes, int, list or dict, possibly nested) to bencode.

        Raises KeyError if `s` or one of its elements has another type.
        """
        return self.__encode_func[type(s)](s)

    def bdecode(self, s: bytes):
        """
        Decode the bencoded value contained in `s`.

        Returns the decoded object, or None (after printing a diagnostic)
        if `s` is not exactly one well-formed bencoded value.
        """
        try:
            r, remainder = self.__decode_func[s[0]](s)
        # RecursionError: containers are decoded recursively, so a file made
        # of thousands of nested lists would otherwise crash the caller.
        except (IndexError, KeyError, ValueError, RecursionError) as e:
            print("not a valid bencoded string: %s" % e)
            return None
        if remainder != b'':
            print("invalid bencoded value (data after valid prefix)")
            return None
        return r