summaryrefslogtreecommitdiff
path: root/libmat2/torrent.py
blob: b598065c6ac9c4d95fdd24c626f56dbfdbf93ee8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import logging
from typing import Union, Tuple, Dict

from . import abstract


class TorrentParser(abstract.AbstractParser):
    """Parser for bencoded BitTorrent metainfo (`.torrent`) files.

    Every top-level key that is not listed in `whitelist` is treated as
    metadata: `get_meta` reports those keys, `remove_all` drops them.
    """
    mimetypes = {'application/x-bittorrent', }
    # Top-level keys that are kept when the file is cleaned.
    whitelist = {b'announce', b'announce-list', b'info'}

    def __load_dict(self) -> Union[dict, None]:
        """Decode `self.filename` and return its top-level dictionary.

        Returns `None` when the file is not valid bencode, or when the
        top-level bencoded value is valid but is not a dictionary.
        """
        with open(self.filename, 'rb') as f:
            d = _BencodeHandler().bdecode(f.read())
        # `bdecode` also hands back top-level ints, lists and strings;
        # the callers below need `.items()`, so only accept a dict.
        if not isinstance(d, dict):
            return None
        return d

    def get_meta(self) -> Dict[str, str]:
        """Return the non-whitelisted top-level entries of the torrent.

        Keys are decoded as UTF-8; values are returned as decoded from the
        file. If the file cannot be parsed as a bencoded dictionary, a
        single 'Unknown meta' entry describing the failure is returned.
        """
        d = self.__load_dict()
        if d is None:
            return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename}
        return {k.decode('utf-8'): v
                for k, v in d.items() if k not in self.whitelist}

    def remove_all(self) -> bool:
        """Write a copy holding only the whitelisted top-level keys.

        The cleaned torrent is written to `self.output_filename`.

        Returns `True` on success, `False` (without writing anything) if
        the input cannot be parsed as a bencoded dictionary.
        """
        d = self.__load_dict()
        if d is None:
            return False
        cleaned = {k: v for k, v in d.items() if k in self.whitelist}
        with open(self.output_filename, 'wb') as f:
            f.write(_BencodeHandler().bencode(cleaned))
        return True


class _BencodeHandler(object):
    """
    Since bencode isn't that hard to parse,
    MAT2 comes with its own parser, based on the spec
    https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding

    Decoding dispatches on the first byte of the remaining input, and
    encoding dispatches on the exact type of the value. Every private
    decoder takes the remaining input and returns a
    `(decoded value, unconsumed input)` tuple, raising `ValueError`,
    `KeyError` or `IndexError` on malformed data.
    """
    def __init__(self):
        self.__decode_func = {
            ord('d'): self.__decode_dict,
            ord('i'): self.__decode_int,
            ord('l'): self.__decode_list,
        }
        # A string starts with its length, i.e. with an ASCII digit.
        for digit in b'0123456789':
            self.__decode_func[digit] = self.__decode_string

        self.__encode_func = {
            bytes: self.__encode_string,
            dict: self.__encode_dict,
            int: self.__encode_int,
            list: self.__encode_list,
        }

    @staticmethod
    def __decode_int(s: bytes) -> Tuple[int, bytes]:
        """Decode `i<digits>e` found at the start of `s`."""
        s = s[1:]  # skip leading `i`
        next_idx = s.index(b'e')  # raises ValueError if the suffix is missing
        literal = s[:next_idx]
        digits = literal[1:] if literal.startswith(b'-') else literal
        # `int()` alone would also accept `+5`, ` 5` or `1_0`.
        if not digits.isdigit():
            raise ValueError  # empty or not a plain base-10 number
        if literal.startswith(b'-0'):
            raise ValueError  # negative zero doesn't exist
        elif literal.startswith(b'0') and next_idx != 1:
            raise ValueError  # no leading zero except for zero itself
        return int(literal), s[next_idx+1:]

    @staticmethod
    def __decode_string(s: bytes) -> Tuple[bytes, bytes]:
        """Decode `<length>:<payload>` found at the start of `s`."""
        sep = s.index(b':')  # raises ValueError if the separator is missing
        length = s[:sep]
        if not length.isdigit():
            raise ValueError  # empty, negative or not a plain number
        if length.startswith(b'0') and sep != 1:
            raise ValueError  # no leading zero except for zero itself
        start = sep + 1
        end = start + int(length)
        if end > len(s):
            raise ValueError  # announced length exceeds the available data
        return s[start:end], s[end:]

    def __decode_list(self, s: bytes) -> Tuple[list, bytes]:
        """Decode `l<items>e` found at the start of `s`."""
        r = list()
        s = s[1:]  # skip leading `l`
        while s[0] != ord('e'):  # IndexError if the terminator is missing
            v, s = self.__decode_func[s[0]](s)
            r.append(v)
        return r, s[1:]

    def __decode_dict(self, s: bytes) -> Tuple[dict, bytes]:
        """Decode `d<key><value>...e` found at the start of `s`."""
        r = dict()
        s = s[1:]  # skip leading `d`
        while s[0] != ord(b'e'):  # IndexError if the terminator is missing
            k, s = self.__decode_string(s)  # keys are always strings
            r[k], s = self.__decode_func[s[0]](s)
        return r, s[1:]

    @staticmethod
    def __encode_int(x: int) -> bytes:
        """Encode an integer as `i<digits>e`."""
        return b'i' + bytes(str(x), 'utf-8') + b'e'

    @staticmethod
    def __encode_string(x: bytes) -> bytes:
        """Encode a byte string as `<length>:<payload>`."""
        return bytes((str(len(x))), 'utf-8') + b':' + x

    def __encode_list(self, x: list) -> bytes:
        """Encode a list as `l<items>e`."""
        items = b''.join(self.__encode_func[type(i)](i) for i in x)
        return b'l' + items + b'e'

    def __encode_dict(self, x: dict) -> bytes:
        """Encode a dict as `d<key><value>...e`, with keys in sorted order."""
        chunks = []
        for k in sorted(x):
            v = x[k]
            chunks.append(self.__encode_func[type(k)](k))
            chunks.append(self.__encode_func[type(v)](v))
        return b'd' + b''.join(chunks) + b'e'

    def bencode(self, s: Union[dict, list, bytes, int]) -> bytes:
        """Return the bencoded form of `s`.

        Raises `KeyError` if `s`, or a value nested in it, has a type other
        than exactly `dict`, `list`, `bytes` or `int`.
        """
        return self.__encode_func[type(s)](s)

    def bdecode(self, s: bytes) -> Union[dict, None]:
        """Decode the bencoded value `s`.

        Returns the decoded value, or `None` if `s` is not exactly one
        well-formed bencoded value. Torrent files hold a dictionary, but
        any other valid top-level value (int, list, string) is returned
        as-is, so callers that need a dict must check for it.
        """
        try:
            r, l = self.__decode_func[s[0]](s)
        except (IndexError, KeyError, ValueError, RecursionError) as e:
            # RecursionError: the decoders recurse once per nesting level.
            logging.debug("Not a valid bencoded string: %s", e)
            return None
        if l != b'':
            logging.debug("Invalid bencoded value (data after valid prefix)")
            return None
        return r