summaryrefslogtreecommitdiff
path: root/libmat2/archive.py
diff options
context:
space:
mode:
Diffstat (limited to 'libmat2/archive.py')
-rw-r--r--libmat2/archive.py127
1 files changed, 127 insertions, 0 deletions
diff --git a/libmat2/archive.py b/libmat2/archive.py
new file mode 100644
index 0000000..d8f9007
--- /dev/null
+++ b/libmat2/archive.py
@@ -0,0 +1,127 @@
1import zipfile
2import datetime
3import tempfile
4import os
5import logging
6import shutil
7from typing import Dict, Set, Pattern
8
9from . import abstract, UnknownMemberPolicy, parser_factory
10
11# Make pyflakes happy
12assert Set
13assert Pattern
14
15
class ArchiveBasedAbstractParser(abstract.AbstractParser):
    """ Office files (.docx, .odt, …) are zipped files.

    This parser rewrites a zip archive member by member: every member is
    extracted to a temporary folder, optionally post-processed by
    `_specific_cleanup`, cleaned by the parser that `parser_factory`
    returns for it (if any), and written to the output archive with
    neutral zip metadata.
    """
    # Those are the files that have a format that _isn't_
    # supported by MAT2, but that we want to keep anyway.
    files_to_keep = set()  # type: Set[str]

    # Those are the files that we _do not_ want to keep,
    # no matter if they are supported or not.
    files_to_omit = set()  # type: Set[Pattern]

    # what should the parser do if it encounters an unknown file in
    # the archive?
    unknown_member_policy = UnknownMemberPolicy.ABORT  # type: UnknownMemberPolicy

    def __init__(self, filename):
        """ Check early that `filename` is a readable zip archive.

        Raises ValueError if `zipfile` reports the file as a bad zip file.
        """
        super().__init__(filename)
        try:  # better fail here than later
            # Only validity is checked; the context manager makes sure the
            # file handle is released right away instead of being leaked.
            with zipfile.ZipFile(self.filename):
                pass
        except zipfile.BadZipFile:
            raise ValueError

    def _specific_cleanup(self, full_path: str) -> bool:
        """ This method can be used to apply specific treatment
        to files present in the archive.

        `full_path` is the location of the extracted member on disk.
        Returning False makes `remove_all` skip the member and fail.
        """
        # pylint: disable=unused-argument,no-self-use
        return True  # pragma: no cover

    @staticmethod
    def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo:
        """ Overwrite the identifying fields of `zipinfo` with neutral
        values, in place, and return the same object. """
        zipinfo.create_system = 3  # Linux
        zipinfo.comment = b''
        zipinfo.date_time = (1980, 1, 1, 0, 0, 0)  # this is as early as a zipfile can be
        return zipinfo

    @staticmethod
    def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]:
        """ Return the metadata of `zipinfo` that differ from the neutral
        values written by `_clean_zipinfo`.

        Possible keys are 'create_system', 'comment' and 'date_time'.
        """
        metadata = {}
        if zipinfo.create_system == 3:  # this is Linux
            pass
        elif zipinfo.create_system in (0, 10):
            # In the zip "version made by" field, 0 is MS-DOS/FAT (what
            # Python's zipfile writes on win32) and 10 is Windows NTFS;
            # 2 is OpenVMS, not Windows.
            metadata['create_system'] = 'Windows'
        else:
            metadata['create_system'] = 'Weird'

        if zipinfo.comment:
            metadata['comment'] = zipinfo.comment  # type: ignore

        if zipinfo.date_time != (1980, 1, 1, 0, 0, 0):
            try:
                metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time))
            except ValueError:
                # A zeroed or corrupted DOS date (e.g. month 0 or day 0)
                # isn't a valid datetime: report the raw tuple instead of
                # crashing.
                metadata['date_time'] = str(zipinfo.date_time)

        return metadata

    def remove_all(self) -> bool:
        """ Write a cleaned copy of the archive to `self.output_filename`.

        Returns True on success. Returns False, after deleting the output
        file, if `_specific_cleanup` failed for a member or if an unknown
        member was found while the policy is neither OMIT nor KEEP.
        """
        # pylint: disable=too-many-branches

        abort = False
        temp_folder = tempfile.mkdtemp()
        try:
            with zipfile.ZipFile(self.filename) as zin,\
                 zipfile.ZipFile(self.output_filename, 'w') as zout:

                for item in zin.infolist():
                    # `is_dir` is added in Python3.6
                    if item.filename.endswith('/'):
                        continue  # don't keep empty folders

                    # `extract` sanitises absolute and `..` member names
                    # and returns the path it really wrote. Rebuilding the
                    # path from `item.filename` could point outside of
                    # `temp_folder` for such names.
                    full_path = zin.extract(member=item, path=temp_folder)

                    if self._specific_cleanup(full_path) is False:
                        logging.warning("Something went wrong during deep cleaning of %s",
                                        item.filename)
                        abort = True
                        continue

                    if item.filename in self.files_to_keep:
                        # those files aren't supported, but we want to add them anyway
                        pass
                    elif any(r.search(item.filename) for r in self.files_to_omit):
                        continue
                    else:
                        # supported files that we want to clean then add
                        tmp_parser, mtype = parser_factory.get_parser(full_path)  # type: ignore
                        if not tmp_parser:
                            if self.unknown_member_policy == UnknownMemberPolicy.OMIT:
                                logging.warning("In file %s, omitting unknown element %s (format: %s)",
                                                self.filename, item.filename, mtype)
                                continue
                            elif self.unknown_member_policy == UnknownMemberPolicy.KEEP:
                                logging.warning("In file %s, keeping unknown element %s (format: %s)",
                                                self.filename, item.filename, mtype)
                            else:
                                logging.error("In file %s, element %s's format (%s) "
                                              "isn't supported",
                                              self.filename, item.filename, mtype)
                                abort = True
                                continue
                        if tmp_parser:
                            tmp_parser.remove_all()
                            # replace the extracted member by its cleaned version
                            os.rename(tmp_parser.output_filename, full_path)

                    # A fresh ZipInfo carries none of the original member's
                    # metadata, only its name.
                    zinfo = zipfile.ZipInfo(item.filename)  # type: ignore
                    clean_zinfo = self._clean_zipinfo(zinfo)
                    with open(full_path, 'rb') as f:
                        zout.writestr(clean_zinfo, f.read())
        finally:
            # Never leave extracted (uncleaned) members behind, even if
            # something raised in the middle of the processing.
            shutil.rmtree(temp_folder)

        if abort:
            # Both archives are closed at this point, so the incomplete
            # output can safely be deleted.
            os.remove(self.output_filename)
            return False
        return True
127