summary | refs | log | tree | commit | diff
path: root/libmat2
diff options
context:
space:
mode:
author jvoisin 2018-06-04 22:54:01 +0200
committer jvoisin 2018-06-04 23:20:30 +0200
commit 6a1b0b31f0fbfa59a78a8b9f4f07bf9ed3f91cdf (patch)
tree fdb8e31a7ad5bf6982cb8c11a2012205a0cfe14f /libmat2
parent 4ebf9754f84e28eb73a09df0f788b5be80c9c73e (diff)
Add more typing and use mypy in the CI
Diffstat (limited to 'libmat2')
-rw-r--r-- libmat2/harmless.py 7
-rw-r--r-- libmat2/office.py 38
-rw-r--r-- libmat2/parser_factory.py 8
-rw-r--r-- libmat2/pdf.py 3
-rw-r--r-- libmat2/torrent.py 23
5 files changed, 49 insertions, 30 deletions
diff --git a/libmat2/harmless.py b/libmat2/harmless.py
index a63407f..d25603b 100644
--- a/libmat2/harmless.py
+++ b/libmat2/harmless.py
@@ -1,17 +1,18 @@
1from typing import Dict
1from . import abstract 2from . import abstract
2 3
3 4
4class HarmlessParser(abstract.AbstractParser): 5class HarmlessParser(abstract.AbstractParser):
5 """ This is the parser for filetypes that do not contain metadata. """ 6 """ This is the parser for filetypes that do not contain metadata. """
6 mimetypes = {'application/xml', 'text/plain'} 7 mimetypes = {'application/xml', 'text/plain', 'application/rdf+xml'}
7 8
8 def __init__(self, filename: str) -> None: 9 def __init__(self, filename: str) -> None:
9 super().__init__(filename) 10 super().__init__(filename)
10 self.filename = filename 11 self.filename = filename
11 self.output_filename = filename 12 self.output_filename = filename
12 13
13 def get_meta(self): 14 def get_meta(self) -> Dict[str, str]:
14 return dict() 15 return dict()
15 16
16 def remove_all(self): 17 def remove_all(self) -> bool:
17 return True 18 return True
diff --git a/libmat2/office.py b/libmat2/office.py
index 749fc7d..90f7c7a 100644
--- a/libmat2/office.py
+++ b/libmat2/office.py
@@ -4,11 +4,15 @@ import shutil
4import tempfile 4import tempfile
5import datetime 5import datetime
6import zipfile 6import zipfile
7from typing import Dict, Set
7 8
8from . import abstract, parser_factory 9from . import abstract, parser_factory
9 10
11assert Set # make pyflakes happy
10 12
11class ArchiveBasedAbstractParser(abstract.AbstractParser): 13class ArchiveBasedAbstractParser(abstract.AbstractParser):
14 whitelist = set() # type: Set[str]
15
12 def _clean_zipinfo(self, zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo: 16 def _clean_zipinfo(self, zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
13 zipinfo.compress_type = zipfile.ZIP_DEFLATED 17 zipinfo.compress_type = zipfile.ZIP_DEFLATED
14 zipinfo.create_system = 3 # Linux 18 zipinfo.create_system = 3 # Linux
@@ -16,7 +20,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
16 zipinfo.date_time = (1980, 1, 1, 0, 0, 0) 20 zipinfo.date_time = (1980, 1, 1, 0, 0, 0)
17 return zipinfo 21 return zipinfo
18 22
19 def _get_zipinfo_meta(self, zipinfo: zipfile.ZipInfo) -> dict: 23 def _get_zipinfo_meta(self, zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
20 metadata = {} 24 metadata = {}
21 if zipinfo.create_system == 3: 25 if zipinfo.create_system == 3:
22 #metadata['create_system'] = 'Linux' 26 #metadata['create_system'] = 'Linux'
@@ -27,25 +31,31 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser):
27 metadata['create_system'] = 'Weird' 31 metadata['create_system'] = 'Weird'
28 32
29 if zipinfo.comment: 33 if zipinfo.comment:
30 metadata['comment'] = zipinfo.comment 34 metadata['comment'] = zipinfo.comment # type: ignore
31 35
32 if zipinfo.date_time != (1980, 1, 1, 0, 0, 0): 36 if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
33 metadata['date_time'] = datetime.datetime(*zipinfo.date_time) 37 metadata['date_time'] =str(datetime.datetime(*zipinfo.date_time))
34 38
35 return metadata 39 return metadata
36 40
37 41
38 def _clean_internal_file(self, item: zipfile.ZipInfo, temp_folder: str, 42 def _clean_internal_file(self, item: zipfile.ZipInfo, temp_folder: str,
39 zin: zipfile.ZipFile, zout: zipfile.ZipFile): 43 zin: zipfile.ZipFile, zout: zipfile.ZipFile):
44 output = ''
40 zin.extract(member=item, path=temp_folder) 45 zin.extract(member=item, path=temp_folder)
41 tmp_parser, mtype = parser_factory.get_parser(os.path.join(temp_folder, item.filename)) 46 if item.filename not in self.whitelist:
42 if not tmp_parser: 47 full_path = os.path.join(temp_folder, item.filename)
43 print("%s's format (%s) isn't supported" % (item.filename, mtype)) 48 tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore
44 return 49 if not tmp_parser:
45 tmp_parser.remove_all() 50 print("%s's format (%s) isn't supported" % (item.filename, mtype))
46 zinfo = zipfile.ZipInfo(item.filename) 51 return
52 tmp_parser.remove_all()
53 output = tmp_parser.output_filename
54 else:
55 output = os.path.join(temp_folder, item.filename)
56 zinfo = zipfile.ZipInfo(item.filename) # type: ignore
47 clean_zinfo = self._clean_zipinfo(zinfo) 57 clean_zinfo = self._clean_zipinfo(zinfo)
48 with open(tmp_parser.output_filename, 'rb') as f: 58 with open(output, 'rb') as f:
49 zout.writestr(clean_zinfo, f.read()) 59 zout.writestr(clean_zinfo, f.read())
50 60
51 61
@@ -72,7 +82,8 @@ class MSOfficeParser(ArchiveBasedAbstractParser):
72 if not metadata: # better safe than sorry 82 if not metadata: # better safe than sorry
73 metadata[item] = 'harmful content' 83 metadata[item] = 'harmful content'
74 84
75 metadata = {**metadata, **self._get_zipinfo_meta(item)} 85 for key, value in self._get_zipinfo_meta(item).items():
86 metadata[key] = value
76 zipin.close() 87 zipin.close()
77 return metadata 88 return metadata
78 89
@@ -112,6 +123,8 @@ class LibreOfficeParser(ArchiveBasedAbstractParser):
112 'application/vnd.oasis.opendocument.formula', 123 'application/vnd.oasis.opendocument.formula',
113 'application/vnd.oasis.opendocument.image', 124 'application/vnd.oasis.opendocument.image',
114 } 125 }
126 whitelist = {'mimetype', 'manifest.rdf'}
127
115 128
116 def get_meta(self): 129 def get_meta(self):
117 """ 130 """
@@ -127,7 +140,8 @@ class LibreOfficeParser(ArchiveBasedAbstractParser):
127 metadata[key] = value 140 metadata[key] = value
128 if not metadata: # better safe than sorry 141 if not metadata: # better safe than sorry
129 metadata[item] = 'harmful content' 142 metadata[item] = 'harmful content'
130 metadata = {**metadata, **self._get_zipinfo_meta(item)} 143 for key, value in self._get_zipinfo_meta(item).items():
144 metadata[key] = value
131 zipin.close() 145 zipin.close()
132 return metadata 146 return metadata
133 147
diff --git a/libmat2/parser_factory.py b/libmat2/parser_factory.py
index 2f6acc1..42d20de 100644
--- a/libmat2/parser_factory.py
+++ b/libmat2/parser_factory.py
@@ -2,10 +2,12 @@ import glob
2import os 2import os
3import mimetypes 3import mimetypes
4import importlib 4import importlib
5from typing import TypeVar, List 5from typing import TypeVar, List, Tuple, Optional
6 6
7from . import abstract, unsupported_extensions 7from . import abstract, unsupported_extensions
8 8
9assert Tuple # make pyflakes happy
10
9T = TypeVar('T', bound='abstract.AbstractParser') 11T = TypeVar('T', bound='abstract.AbstractParser')
10 12
11def __load_all_parsers(): 13def __load_all_parsers():
@@ -28,14 +30,14 @@ def _get_parsers() -> List[T]:
28 return __get_parsers(abstract.AbstractParser) 30 return __get_parsers(abstract.AbstractParser)
29 31
30 32
31def get_parser(filename: str) -> (T, str): 33def get_parser(filename: str) -> Tuple[Optional[T], Optional[str]]:
32 mtype, _ = mimetypes.guess_type(filename) 34 mtype, _ = mimetypes.guess_type(filename)
33 35
34 _, extension = os.path.splitext(filename) 36 _, extension = os.path.splitext(filename)
35 if extension in unsupported_extensions: 37 if extension in unsupported_extensions:
36 return None, mtype 38 return None, mtype
37 39
38 for c in _get_parsers(): 40 for c in _get_parsers(): # type: ignore
39 if mtype in c.mimetypes: 41 if mtype in c.mimetypes:
40 try: 42 try:
41 return c(filename), mtype 43 return c(filename), mtype
diff --git a/libmat2/pdf.py b/libmat2/pdf.py
index 5b99192..77710bf 100644
--- a/libmat2/pdf.py
+++ b/libmat2/pdf.py
@@ -131,5 +131,6 @@ class PDFParser(abstract.AbstractParser):
131 metadata[key] = document.get_property(key) 131 metadata[key] = document.get_property(key)
132 if 'metadata' in metadata: 132 if 'metadata' in metadata:
133 parsed_meta = self.__parse_metadata_field(metadata['metadata']) 133 parsed_meta = self.__parse_metadata_field(metadata['metadata'])
134 return {**metadata, **parsed_meta} 134 for key, value in parsed_meta.items():
135 metadata[key] = value
135 return metadata 136 return metadata
diff --git a/libmat2/torrent.py b/libmat2/torrent.py
index 3a819fb..f5935e6 100644
--- a/libmat2/torrent.py
+++ b/libmat2/torrent.py
@@ -1,11 +1,12 @@
1from typing import Union, Tuple, Dict
1from . import abstract 2from . import abstract
2 3
3 4
4class TorrentParser(abstract.AbstractParser): 5class TorrentParser(abstract.AbstractParser):
5 mimetypes = {b'application/x-bittorrent', } 6 mimetypes = {'application/x-bittorrent', }
6 whitelist = {b'announce', b'announce-list', b'info'} 7 whitelist = {b'announce', b'announce-list', b'info'}
7 8
8 def get_meta(self) -> dict: 9 def get_meta(self) -> Dict[str, str]:
9 metadata = {} 10 metadata = {}
10 with open(self.filename, 'rb') as f: 11 with open(self.filename, 'rb') as f:
11 d = _BencodeHandler().bdecode(f.read()) 12 d = _BencodeHandler().bdecode(f.read())
@@ -54,7 +55,7 @@ class _BencodeHandler(object):
54 } 55 }
55 56
56 @staticmethod 57 @staticmethod
57 def __decode_int(s: str) -> (int, str): 58 def __decode_int(s: bytes) -> Tuple[int, bytes]:
58 s = s[1:] 59 s = s[1:]
59 next_idx = s.index(b'e') 60 next_idx = s.index(b'e')
60 if s.startswith(b'-0'): 61 if s.startswith(b'-0'):
@@ -64,7 +65,7 @@ class _BencodeHandler(object):
64 return int(s[:next_idx]), s[next_idx+1:] 65 return int(s[:next_idx]), s[next_idx+1:]
65 66
66 @staticmethod 67 @staticmethod
67 def __decode_string(s: str) -> (str, str): 68 def __decode_string(s: bytes) -> Tuple[bytes, bytes]:
68 sep = s.index(b':') 69 sep = s.index(b':')
69 str_len = int(s[:sep]) 70 str_len = int(s[:sep])
70 if str_len < 0: 71 if str_len < 0:
@@ -74,7 +75,7 @@ class _BencodeHandler(object):
74 s = s[1:] 75 s = s[1:]
75 return s[sep:sep+str_len], s[sep+str_len:] 76 return s[sep:sep+str_len], s[sep+str_len:]
76 77
77 def __decode_list(self, s: str) -> (list, str): 78 def __decode_list(self, s: bytes) -> Tuple[list, bytes]:
78 r = list() 79 r = list()
79 s = s[1:] # skip leading `l` 80 s = s[1:] # skip leading `l`
80 while s[0] != ord('e'): 81 while s[0] != ord('e'):
@@ -82,7 +83,7 @@ class _BencodeHandler(object):
82 r.append(v) 83 r.append(v)
83 return r, s[1:] 84 return r, s[1:]
84 85
85 def __decode_dict(self, s: str) -> (dict, str): 86 def __decode_dict(self, s: bytes) -> Tuple[dict, bytes]:
86 r = dict() 87 r = dict()
87 s = s[1:] # skip leading `d` 88 s = s[1:] # skip leading `d`
88 while s[0] != ord(b'e'): 89 while s[0] != ord(b'e'):
@@ -91,11 +92,11 @@ class _BencodeHandler(object):
91 return r, s[1:] 92 return r, s[1:]
92 93
93 @staticmethod 94 @staticmethod
94 def __encode_int(x: str) -> bytes: 95 def __encode_int(x: bytes) -> bytes:
95 return b'i' + bytes(str(x), 'utf-8') + b'e' 96 return b'i' + bytes(str(x), 'utf-8') + b'e'
96 97
97 @staticmethod 98 @staticmethod
98 def __encode_string(x: str) -> bytes: 99 def __encode_string(x: bytes) -> bytes:
99 return bytes((str(len(x))), 'utf-8') + b':' + x 100 return bytes((str(len(x))), 'utf-8') + b':' + x
100 101
101 def __encode_list(self, x: str) -> bytes: 102 def __encode_list(self, x: str) -> bytes:
@@ -104,17 +105,17 @@ class _BencodeHandler(object):
104 ret += self.__encode_func[type(i)](i) 105 ret += self.__encode_func[type(i)](i)
105 return b'l' + ret + b'e' 106 return b'l' + ret + b'e'
106 107
107 def __encode_dict(self, x: str) -> bytes: 108 def __encode_dict(self, x: dict) -> bytes:
108 ret = b'' 109 ret = b''
109 for k, v in sorted(x.items()): 110 for k, v in sorted(x.items()):
110 ret += self.__encode_func[type(k)](k) 111 ret += self.__encode_func[type(k)](k)
111 ret += self.__encode_func[type(v)](v) 112 ret += self.__encode_func[type(v)](v)
112 return b'd' + ret + b'e' 113 return b'd' + ret + b'e'
113 114
114 def bencode(self, s: str) -> bytes: 115 def bencode(self, s: Union[dict, list, bytes, int]) -> bytes:
115 return self.__encode_func[type(s)](s) 116 return self.__encode_func[type(s)](s)
116 117
117 def bdecode(self, s: str): 118 def bdecode(self, s: bytes) -> Union[dict, None]:
118 try: 119 try:
119 r, l = self.__decode_func[s[0]](s) 120 r, l = self.__decode_func[s[0]](s)
120 except (IndexError, KeyError, ValueError) as e: 121 except (IndexError, KeyError, ValueError) as e: