diff options
| author | jvoisin | 2018-04-22 23:48:01 +0200 |
|---|---|---|
| committer | jvoisin | 2018-04-22 23:48:01 +0200 |
| commit | 9e485c0dad6cf67d73992efe871f68fe91b6a507 (patch) | |
| tree | 3356fada383d52f27b9a28f4360348215baf9854 | |
| parent | 57bf89e035609c39506372cc9deed92dfbd42716 (diff) | |
Clean up the torrent implementation
| -rw-r--r-- | src/torrent.py | 123 |
1 file changed, 62 insertions, 61 deletions
diff --git a/src/torrent.py b/src/torrent.py index df64161..bdf83ce 100644 --- a/src/torrent.py +++ b/src/torrent.py | |||
| @@ -1,75 +1,78 @@ | |||
| 1 | import os | 1 | from . import abstract |
| 2 | import re | ||
| 3 | import shutil | ||
| 4 | import tempfile | ||
| 5 | import datetime | ||
| 6 | import zipfile | ||
| 7 | |||
| 8 | from . import abstract, parser_factory | ||
| 9 | |||
| 10 | 2 | ||
| 11 | 3 | ||
| 12 | class TorrentParser(abstract.AbstractParser): | 4 | class TorrentParser(abstract.AbstractParser): |
| 13 | mimetypes = {'application/x-bittorrent', } | 5 | mimetypes = {'application/x-bittorrent', } |
| 14 | whitelist = {b'announce', b'announce-list', b'info'} | 6 | whitelist = {b'announce', b'announce-list', b'info'} |
| 15 | 7 | ||
| 16 | def __init__(self, filename): | 8 | def get_meta(self) -> dict: |
| 17 | super().__init__(filename) | ||
| 18 | self.__decode_func = { | ||
| 19 | ord('l'): self.__decode_list, | ||
| 20 | ord('d'): self.__decode_dict, | ||
| 21 | ord('i'): self.__decode_int | ||
| 22 | } | ||
| 23 | for i in range(0, 10): | ||
| 24 | self.__decode_func[ord(str(i))] = self.__decode_string | ||
| 25 | |||
| 26 | self.__encode_func = { | ||
| 27 | int: self.__encode_int, | ||
| 28 | bytes: self.__encode_string, | ||
| 29 | list: self.__encode_list, | ||
| 30 | dict: self.__encode_dict, | ||
| 31 | } | ||
| 32 | |||
| 33 | |||
| 34 | def get_meta(self): | ||
| 35 | metadata = {} | 9 | metadata = {} |
| 36 | with open(self.filename, 'rb') as f: | 10 | with open(self.filename, 'rb') as f: |
| 37 | d = self.__bdecode(f.read()) | 11 | d = _BencodeHandler().bdecode(f.read()) |
| 12 | if d is None: | ||
| 13 | return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename} | ||
| 38 | for k,v in d.items(): | 14 | for k,v in d.items(): |
| 39 | if k not in self.whitelist: | 15 | if k not in self.whitelist: |
| 40 | metadata[k.decode('utf-8')] = v | 16 | metadata[k.decode('utf-8')] = v |
| 41 | return metadata | 17 | return metadata |
| 42 | 18 | ||
| 43 | 19 | ||
| 44 | def remove_all(self): | 20 | def remove_all(self) -> bool: |
| 45 | cleaned = dict() | 21 | cleaned = dict() |
| 46 | with open(self.filename, 'rb') as f: | 22 | with open(self.filename, 'rb') as f: |
| 47 | d = self.__bdecode(f.read()) | 23 | d = _BencodeHandler().bdecode(f.read()) |
| 24 | if d is None: | ||
| 25 | return False | ||
| 48 | for k,v in d.items(): | 26 | for k,v in d.items(): |
| 49 | if k in self.whitelist: | 27 | if k in self.whitelist: |
| 50 | cleaned[k] = v | 28 | cleaned[k] = v |
| 51 | with open(self.output_filename, 'wb') as f: | 29 | with open(self.output_filename, 'wb') as f: |
| 52 | f.write(self.__bencode(cleaned)) | 30 | f.write(_BencodeHandler().bencode(cleaned)) |
| 53 | return True | 31 | return True |
| 54 | 32 | ||
| 55 | def __decode_int(self, s): | 33 | |
| 34 | class _BencodeHandler(object): | ||
| 35 | """ | ||
| 36 | Since bencode isn't that hard to parse, | ||
| 37 | MAT2 comes with its own parser, based on the spec | ||
| 38 | https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding | ||
| 39 | """ | ||
| 40 | def __init__(self): | ||
| 41 | self.__decode_func = { | ||
| 42 | ord('d'): self.__decode_dict, | ||
| 43 | ord('i'): self.__decode_int, | ||
| 44 | ord('l'): self.__decode_list, | ||
| 45 | } | ||
| 46 | for i in range(0, 10): | ||
| 47 | self.__decode_func[ord(str(i))] = self.__decode_string | ||
| 48 | |||
| 49 | self.__encode_func = { | ||
| 50 | bytes: self.__encode_string, | ||
| 51 | dict: self.__encode_dict, | ||
| 52 | int: self.__encode_int, | ||
| 53 | list: self.__encode_list, | ||
| 54 | } | ||
| 55 | |||
| 56 | def __decode_int(self, s:str) -> (int, str): | ||
| 56 | s = s[1:] | 57 | s = s[1:] |
| 57 | next_idx = s.index(b'e') | 58 | next_idx = s.index(b'e') |
| 58 | if s.startswith(b'-0'): | 59 | if s.startswith(b'-0'): |
| 59 | raise ValueError # negative zero doesn't exist | 60 | raise ValueError # negative zero doesn't exist |
| 60 | if s.startswith(b'0') and next_idx != 1: | 61 | elif s.startswith(b'0') and next_idx != 1: |
| 61 | raise ValueError # no leading zero except for zero itself | 62 | raise ValueError # no leading zero except for zero itself |
| 62 | return int(s[:next_idx]), s[next_idx+1:] | 63 | return int(s[:next_idx]), s[next_idx+1:] |
| 63 | 64 | ||
| 64 | def __decode_string(self, s): | 65 | def __decode_string(self, s:str) -> (str, str): |
| 65 | end = s.index(b':') | 66 | sep = s.index(b':') |
| 66 | str_len = int(s[:end]) | 67 | str_len = int(s[:sep]) |
| 67 | if s[0] == b'0' and end != 1: | 68 | if str_len < 0: |
| 68 | raise ValueError | 69 | raise ValueError |
| 69 | s = s[1:] # skip terminal `:` | 70 | elif s[0] == b'0' and sep != 1: |
| 70 | return s[end:end+str_len], s[end+str_len:] | 71 | raise ValueError |
| 72 | s = s[1:] | ||
| 73 | return s[sep:sep+str_len], s[sep+str_len:] | ||
| 71 | 74 | ||
| 72 | def __decode_list(self, s): | 75 | def __decode_list(self, s:str) -> (list, str): |
| 73 | r = list() | 76 | r = list() |
| 74 | s = s[1:] # skip leading `l` | 77 | s = s[1:] # skip leading `l` |
| 75 | while s[0] != ord('e'): | 78 | while s[0] != ord('e'): |
| @@ -77,47 +80,45 @@ class TorrentParser(abstract.AbstractParser): | |||
| 77 | r.append(v) | 80 | r.append(v) |
| 78 | return r, s[1:] | 81 | return r, s[1:] |
| 79 | 82 | ||
| 80 | def __decode_dict(self, s): | 83 | def __decode_dict(self, s:str) -> (dict, str): |
| 81 | r = dict() | 84 | r = dict() |
| 82 | s = s[1:] | 85 | s = s[1:] # skip leading `d` |
| 83 | while s[0] != ord(b'e'): | 86 | while s[0] != ord(b'e'): |
| 84 | k, s = self.__decode_string(s) | 87 | k, s = self.__decode_string(s) |
| 85 | r[k], s = self.__decode_func[s[0]](s) | 88 | r[k], s = self.__decode_func[s[0]](s) |
| 86 | return r, s[1:] | 89 | return r, s[1:] |
| 87 | 90 | ||
| 88 | def __bdecode(self, s): | ||
| 89 | try: | ||
| 90 | r, l = self.__decode_func[s[0]](s) | ||
| 91 | except (IndexError, KeyError, ValueError) as e: | ||
| 92 | print("not a valid bencoded string: %s" % e) | ||
| 93 | return None | ||
| 94 | if l != b'': | ||
| 95 | print("invalid bencoded value (data after valid prefix)") | ||
| 96 | return None | ||
| 97 | return r | ||
| 98 | |||
| 99 | @staticmethod | 91 | @staticmethod |
| 100 | def __encode_int(x): | 92 | def __encode_int(x:str) -> bytes: |
| 101 | return b'i' + bytes(str(x), 'utf-8') + b'e' | 93 | return b'i' + bytes(str(x), 'utf-8') + b'e' |
| 102 | 94 | ||
| 103 | @staticmethod | 95 | @staticmethod |
| 104 | def __encode_string(x:str): | 96 | def __encode_string(x:str) -> bytes: |
| 105 | return bytes((str(len(x))), 'utf-8') + b':' + x | 97 | return bytes((str(len(x))), 'utf-8') + b':' + x |
| 106 | 98 | ||
| 107 | def __encode_list(self, x): | 99 | def __encode_list(self, x:str) -> bytes: |
| 108 | ret = b'' | 100 | ret = b'' |
| 109 | for i in x: | 101 | for i in x: |
| 110 | ret += self.__encode_func[type(i)](i) | 102 | ret += self.__encode_func[type(i)](i) |
| 111 | return b'l' + ret + b'e' | 103 | return b'l' + ret + b'e' |
| 112 | 104 | ||
| 113 | def __encode_dict(self, x): | 105 | def __encode_dict(self, x:str) -> bytes: |
| 114 | ret = b'' | 106 | ret = b'' |
| 115 | for k, v in sorted(x.items()): | 107 | for k, v in sorted(x.items()): |
| 116 | ret += self.__encode_func[type(k)](k) | 108 | ret += self.__encode_func[type(k)](k) |
| 117 | ret += self.__encode_func[type(v)](v) | 109 | ret += self.__encode_func[type(v)](v) |
| 118 | return b'd' + ret + b'e' | 110 | return b'd' + ret + b'e' |
| 119 | 111 | ||
| 120 | def __bencode(self, x): | 112 | def bencode(self, s:str) -> bytes: |
| 121 | return self.__encode_func[type(x)](x) | 113 | return self.__encode_func[type(s)](s) |
| 122 | |||
| 123 | 114 | ||
| 115 | def bdecode(self, s:str): | ||
| 116 | try: | ||
| 117 | r, l = self.__decode_func[s[0]](s) | ||
| 118 | except (IndexError, KeyError, ValueError) as e: | ||
| 119 | print("not a valid bencoded string: %s" % e) | ||
| 120 | return None | ||
| 121 | if l != b'': | ||
| 122 | print("invalid bencoded value (data after valid prefix)") | ||
| 123 | return None | ||
| 124 | return r | ||
