diff options
| author | jvoisin | 2018-04-22 22:02:00 +0200 |
|---|---|---|
| committer | jvoisin | 2018-04-22 22:02:00 +0200 |
| commit | 57bf89e035609c39506372cc9deed92dfbd42716 (patch) | |
| tree | 69194a04c62af54644dd3df83dfee1db9e265241 | |
| parent | ecb199b4a6a2f54b84237d4f74c145a051c4c08b (diff) | |
Add support for torrent files cleaning
| -rw-r--r-- | src/torrent.py | 123 | ||||
| -rw-r--r-- | tests/data/dirty.torrent | bin | 0 -> 93903 bytes | |||
| -rw-r--r-- | tests/test_libmat2.py | 23 |
3 files changed, 145 insertions, 1 deletions
diff --git a/src/torrent.py b/src/torrent.py new file mode 100644 index 0000000..df64161 --- /dev/null +++ b/src/torrent.py | |||
| @@ -0,0 +1,123 @@ | |||
| 1 | import os | ||
| 2 | import re | ||
| 3 | import shutil | ||
| 4 | import tempfile | ||
| 5 | import datetime | ||
| 6 | import zipfile | ||
| 7 | |||
| 8 | from . import abstract, parser_factory | ||
| 9 | |||
| 10 | |||
| 11 | |||
class TorrentParser(abstract.AbstractParser):
    """Parser for bencoded `.torrent` files.

    The file is decoded into Python objects (int, bytes, list, dict); every
    top-level key that is not in `whitelist` is treated as metadata.
    `get_meta()` reports those keys and `remove_all()` writes a copy of the
    torrent that only contains the whitelisted ones.
    """
    mimetypes = {'application/x-bittorrent', }
    # Top-level keys preserved by remove_all(); every other top-level key is
    # reported by get_meta() and dropped by remove_all().
    whitelist = {b'announce', b'announce-list', b'info'}

    def __init__(self, filename):
        super().__init__(filename)
        # Decoder dispatch table, keyed by the first byte of a bencoded value
        # (indexing a bytes object yields an int).
        self.__decode_func = {
            ord('l'): self.__decode_list,
            ord('d'): self.__decode_dict,
            ord('i'): self.__decode_int,
        }
        # Strings start with their decimal length, i.e. with a digit.
        for digit in b'0123456789':
            self.__decode_func[digit] = self.__decode_string

        # Encoder dispatch table, keyed by the exact Python type of the value.
        self.__encode_func = {
            int: self.__encode_int,
            bytes: self.__encode_string,
            list: self.__encode_list,
            dict: self.__encode_dict,
        }

    def get_meta(self):
        """Return the non-whitelisted top-level entries of the torrent.

        Keys are decoded to `str`; values are returned as decoded
        (int, bytes, list or dict).

        Raises:
            ValueError: if the file is not a valid bencoded dictionary.
        """
        with open(self.filename, 'rb') as f:
            d = self.__bdecode(f.read())
        if not isinstance(d, dict):
            # __bdecode returns None on malformed input, and a torrent whose
            # top-level value is not a dictionary has no keys to report.
            raise ValueError("%s is not a valid torrent file" % self.filename)
        # `backslashreplace` keeps keys that are not valid UTF-8 reportable
        # instead of failing on them.
        return {
            k.decode('utf-8', errors='backslashreplace'): v
            for k, v in d.items()
            if k not in self.whitelist
        }

    def remove_all(self):
        """Write a copy of the torrent holding only the whitelisted keys.

        The result is written to `self.output_filename` (not defined in this
        class; presumably set by the parent class).

        Returns:
            True on success, False if the input is not a valid bencoded
            dictionary (nothing is written in that case).
        """
        with open(self.filename, 'rb') as f:
            d = self.__bdecode(f.read())
        if not isinstance(d, dict):
            return False
        cleaned = {k: v for k, v in d.items() if k in self.whitelist}
        with open(self.output_filename, 'wb') as f:
            f.write(self.__bencode(cleaned))
        return True

    def __decode_int(self, s):
        """Decode `i<number>e` at the start of `s`; return (int, rest)."""
        s = s[1:]  # skip leading `i`
        next_idx = s.index(b'e')
        digits = s[:next_idx]
        if digits.startswith(b'-0'):
            raise ValueError("negative zero doesn't exist")
        if digits.startswith(b'0') and next_idx != 1:
            raise ValueError("no leading zero except for zero itself")
        # int() alone would also accept `+5`, `1_0` or surrounding spaces.
        unsigned = digits[1:] if digits.startswith(b'-') else digits
        if not unsigned.isdigit():
            raise ValueError("invalid integer: %r" % digits)
        return int(digits), s[next_idx+1:]

    def __decode_string(self, s):
        """Decode `<length>:<payload>` at the start of `s`; return (bytes, rest)."""
        colon = s.index(b':')
        length_field = s[:colon]
        if not length_field.isdigit():
            raise ValueError("invalid string length: %r" % length_field)
        if length_field.startswith(b'0') and colon != 1:
            raise ValueError("no leading zero in string length")
        start = colon + 1  # skip terminal `:`
        stop = start + int(length_field)
        if stop > len(s):
            # Slicing would silently return a shorter string otherwise.
            raise ValueError("truncated string")
        return s[start:stop], s[stop:]

    def __decode_list(self, s):
        """Decode `l<items>e` at the start of `s`; return (list, rest)."""
        r = list()
        s = s[1:]  # skip leading `l`
        while s[0] != ord('e'):
            v, s = self.__decode_func[s[0]](s)
            r.append(v)
        return r, s[1:]  # skip terminal `e`

    def __decode_dict(self, s):
        """Decode `d<key><value>...e` at the start of `s`; return (dict, rest)."""
        r = dict()
        s = s[1:]  # skip leading `d`
        while s[0] != ord(b'e'):
            # Keys are always decoded as strings.
            k, s = self.__decode_string(s)
            r[k], s = self.__decode_func[s[0]](s)
        return r, s[1:]  # skip terminal `e`

    def __bdecode(self, s):
        """Decode the bencoded bytes `s`.

        Returns the decoded value, or None (after printing a diagnostic) if
        `s` is malformed or has trailing data after a valid value.
        """
        try:
            r, rest = self.__decode_func[s[0]](s)
        except (IndexError, KeyError, ValueError, RecursionError) as e:
            # IndexError: truncated input; KeyError: unknown type prefix;
            # ValueError: malformed number/string; RecursionError: nesting
            # deeper than the interpreter's recursion limit.
            print("not a valid bencoded string: %s" % e)
            return None
        if rest != b'':
            print("invalid bencoded value (data after valid prefix)")
            return None
        return r

    @staticmethod
    def __encode_int(x):
        """Encode the integer `x` as `i<number>e`."""
        return b'i' + bytes(str(x), 'utf-8') + b'e'

    @staticmethod
    def __encode_string(x: bytes):
        """Encode the byte string `x` as `<length>:<payload>`."""
        return bytes((str(len(x))), 'utf-8') + b':' + x

    def __encode_list(self, x):
        """Encode the list `x` as `l<items>e`."""
        ret = b''.join(self.__encode_func[type(i)](i) for i in x)
        return b'l' + ret + b'e'

    def __encode_dict(self, x):
        """Encode the dict `x` as `d<key><value>...e`, entries sorted by key."""
        parts = []
        for k in sorted(x):
            v = x[k]
            parts.append(self.__encode_func[type(k)](k))
            parts.append(self.__encode_func[type(v)](v))
        return b'd' + b''.join(parts) + b'e'

    def __bencode(self, x):
        """Encode `x` (int, bytes, list or dict); KeyError on any other type."""
        return self.__encode_func[type(x)](x)
| 122 | |||
| 123 | |||
diff --git a/tests/data/dirty.torrent b/tests/data/dirty.torrent new file mode 100644 index 0000000..472371b --- /dev/null +++ b/tests/data/dirty.torrent | |||
| Binary files differ | |||
diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py index ae04dc2..ff5c196 100644 --- a/tests/test_libmat2.py +++ b/tests/test_libmat2.py | |||
| @@ -6,7 +6,7 @@ import os | |||
| 6 | import zipfile | 6 | import zipfile |
| 7 | import tempfile | 7 | import tempfile |
| 8 | 8 | ||
| 9 | from src import pdf, images, audio, office, parser_factory | 9 | from src import pdf, images, audio, office, parser_factory, torrent |
| 10 | 10 | ||
| 11 | 11 | ||
| 12 | class TestParserFactory(unittest.TestCase): | 12 | class TestParserFactory(unittest.TestCase): |
| @@ -28,6 +28,11 @@ class TestGetMeta(unittest.TestCase): | |||
| 28 | "3.1415926-2.5-1.40.14 (TeX Live 2013/Debian) kpathsea " \ | 28 | "3.1415926-2.5-1.40.14 (TeX Live 2013/Debian) kpathsea " \ |
| 29 | "version 6.1.1") | 29 | "version 6.1.1") |
| 30 | 30 | ||
def test_torrent(self):
    """The sample torrent exposes its `created by` field as metadata."""
    parser = torrent.TorrentParser('./tests/data/dirty.torrent')
    created_by = parser.get_meta()['created by']
    self.assertEqual(created_by, b'mktorrent 1.0')
| 35 | |||
| 31 | def test_png(self): | 36 | def test_png(self): |
| 32 | p = images.PNGParser('./tests/data/dirty.png') | 37 | p = images.PNGParser('./tests/data/dirty.png') |
| 33 | meta = p.get_meta() | 38 | meta = p.get_meta() |
| @@ -322,3 +327,19 @@ class TestCleaning(unittest.TestCase): | |||
| 322 | self.assertEqual(p.get_meta(), {}) | 327 | self.assertEqual(p.get_meta(), {}) |
| 323 | 328 | ||
| 324 | os.remove('./tests/data/clean.bmp') | 329 | os.remove('./tests/data/clean.bmp') |
| 330 | |||
| 331 | |||
def test_torrent(self):
    """Cleaning a torrent removes every metadata field from the output.

    Both the working copy and the cleaned output are removed afterwards,
    even when an assertion fails.
    """
    shutil.copy('./tests/data/dirty.torrent', './tests/data/clean.torrent')
    # Registered right after the copy so the file is removed on failure too.
    self.addCleanup(os.remove, './tests/data/clean.torrent')
    p = torrent.TorrentParser('./tests/data/clean.torrent')

    meta = p.get_meta()
    self.assertEqual(meta, {'created by': b'mktorrent 1.0', 'creation date': 1522397702})

    ret = p.remove_all()
    self.assertTrue(ret)
    # Only registered once remove_all() reported success, i.e. once the
    # cleaned file is expected to exist.
    self.addCleanup(os.remove, './tests/data/clean.torrent.cleaned')

    p = torrent.TorrentParser('./tests/data/clean.torrent.cleaned')
    self.assertEqual(p.get_meta(), {})
