summaryrefslogtreecommitdiff
path: root/src/torrent.py
blob: df641610d3bc728cae640e0463905b256e38dd2f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import re
import shutil
import tempfile
import datetime
import zipfile

from . import abstract, parser_factory



class TorrentParser(abstract.AbstractParser):
    """Parser for bencoded torrent files.

    The file named by ``self.filename`` is decoded into Python objects
    (``int``, ``bytes``, ``list``, ``dict``).  Top-level keys listed in
    ``whitelist`` are kept by ``remove_all()``; every other top-level key
    is reported as metadata by ``get_meta()``.
    """
    mimetypes = {'application/x-bittorrent', }
    # Top-level dictionary keys that are preserved in the cleaned output.
    whitelist = {b'announce', b'announce-list', b'info'}

    def __init__(self, filename):
        super().__init__(filename)
        # Decoder dispatch table, keyed by the first byte of a bencoded value.
        self.__decode_func = {
            ord('l'): self.__decode_list,
            ord('d'): self.__decode_dict,
            ord('i'): self.__decode_int,
        }
        # A value starting with a digit is a length-prefixed string.
        for digit in b'0123456789':
            self.__decode_func[digit] = self.__decode_string

        # Encoder dispatch table, keyed by the exact Python type.
        self.__encode_func = {
            int: self.__encode_int,
            bytes: self.__encode_string,
            list: self.__encode_list,
            dict: self.__encode_dict,
        }

    def __read_torrent(self):
        """Decode ``self.filename`` and return its top-level dictionary.

        Returns ``None`` (after printing a diagnostic) when the file is not
        valid bencoded data or when its top-level value is not a dictionary.
        """
        with open(self.filename, 'rb') as f:
            decoded = self.__bdecode(f.read())
        if decoded is None:
            return None  # __bdecode already reported the problem
        if not isinstance(decoded, dict):
            print("invalid torrent file (top-level value is not a dictionary)")
            return None
        return decoded

    def get_meta(self):
        """Return the non-whitelisted top-level entries of the torrent.

        Keys are decoded from UTF-8 to ``str``; values are returned as
        decoded.  An empty dict is returned when the file cannot be parsed.
        """
        d = self.__read_torrent()
        if d is None:
            return {}
        return {k.decode('utf-8'): v
                for k, v in d.items() if k not in self.whitelist}

    def remove_all(self):
        """Write a copy holding only whitelisted keys to ``self.output_filename``.

        Returns ``True`` on success, ``False`` (without writing anything)
        when the input file cannot be parsed.
        """
        d = self.__read_torrent()
        if d is None:
            return False
        cleaned = {k: v for k, v in d.items() if k in self.whitelist}
        with open(self.output_filename, 'wb') as f:
            f.write(self.__bencode(cleaned))
        return True

    def __decode_int(self, s):
        """Decode ``i<digits>e`` at the start of `s`; return (int, rest)."""
        s = s[1:]  # skip leading `i`
        next_idx = s.index(b'e')
        digits = s[:next_idx]
        if digits.startswith(b'-0'):
            raise ValueError("negative zero or leading zero in integer")
        if digits.startswith(b'0') and next_idx != 1:
            raise ValueError("leading zero in integer")
        # int() alone would also accept `+5`, whitespace or underscores.
        unsigned = digits[1:] if digits.startswith(b'-') else digits
        if not unsigned.isdigit():
            raise ValueError("malformed integer %r" % digits)
        return int(digits), s[next_idx+1:]

    def __decode_string(self, s):
        """Decode ``<length>:<bytes>`` at the start of `s`; return (bytes, rest)."""
        colon = s.index(b':')
        length_field = s[:colon]
        # Also rejects negative lengths, which can reach this method through
        # dictionary keys (they bypass the first-byte dispatch table).
        if not length_field.isdigit():
            raise ValueError("malformed string length %r" % length_field)
        if length_field.startswith(b'0') and colon != 1:
            raise ValueError("leading zero in string length")
        start = colon + 1  # first byte after the `:` separator
        stop = start + int(length_field)
        if stop > len(s):
            raise ValueError("truncated string")
        return s[start:stop], s[stop:]

    def __decode_list(self, s):
        """Decode ``l<items>e`` at the start of `s`; return (list, rest)."""
        r = list()
        s = s[1:]  # skip leading `l`
        while s[0] != ord('e'):
            v, s = self.__decode_func[s[0]](s)
            r.append(v)
        return r, s[1:]  # skip terminal `e`

    def __decode_dict(self, s):
        """Decode ``d<key><value>...e`` at the start of `s`; return (dict, rest)."""
        r = dict()
        s = s[1:]  # skip leading `d`
        while s[0] != ord(b'e'):
            k, s = self.__decode_string(s)  # keys are always strings
            r[k], s = self.__decode_func[s[0]](s)
        return r, s[1:]  # skip terminal `e`

    def __bdecode(self, s):
        """Decode the bencoded bytes `s`.

        Returns the decoded value, or ``None`` (after printing a diagnostic)
        when `s` is malformed, too deeply nested, or followed by extra data.
        """
        try:
            r, rest = self.__decode_func[s[0]](s)
        except (IndexError, KeyError, ValueError, RecursionError) as e:
            print("not a valid bencoded string: %s" % e)
            return None
        if rest != b'':
            print("invalid bencoded value (data after valid prefix)")
            return None
        return r

    @staticmethod
    def __encode_int(x):
        """Encode the int `x` as ``i<digits>e``."""
        return b'i' + bytes(str(x), 'utf-8') + b'e'

    @staticmethod
    def __encode_string(x: bytes):
        """Encode the bytes `x` as ``<length>:<bytes>``."""
        return bytes((str(len(x))), 'utf-8') + b':' + x

    def __encode_list(self, x):
        """Encode the list `x` as ``l<items>e``."""
        body = b''.join(self.__encode_func[type(i)](i) for i in x)
        return b'l' + body + b'e'

    def __encode_dict(self, x):
        """Encode the dict `x` as ``d<key><value>...e``, keys in sorted order."""
        parts = []
        for k in sorted(x):
            v = x[k]
            parts.append(self.__encode_func[type(k)](k))
            parts.append(self.__encode_func[type(v)](v))
        return b'd' + b''.join(parts) + b'e'

    def __bencode(self, x):
        """Encode `x` (int, bytes, list or dict); raises KeyError for other types."""
        return self.__encode_func[type(x)](x)