summaryrefslogtreecommitdiff
path: root/libmat/archive.py
diff options
context:
space:
mode:
authorjvoisin2016-08-29 22:12:40 +0200
committerjvoisin2016-08-29 22:12:40 +0200
commit64b667be5d6b36d17839482593ccf2207af14ac9 (patch)
tree8ab14777fc5d6a8d9793c2a460ae9e4ea14c2909 /libmat/archive.py
parenta3c289dea1ceebcc2e624d002ab31deb851a7e3a (diff)
Python3, now with less features
I want to release a new version ASAP, so let's ditch some features for now.
Diffstat (limited to 'libmat/archive.py')
-rw-r--r--libmat/archive.py371
1 files changed, 0 insertions, 371 deletions
diff --git a/libmat/archive.py b/libmat/archive.py
deleted file mode 100644
index cfc818d..0000000
--- a/libmat/archive.py
+++ /dev/null
@@ -1,371 +0,0 @@
1""" Take care of archives formats
2"""
3
4import logging
5import os
6import shutil
7import stat
8import tarfile
9import tempfile
10import zipfile
11
12from libmat import parser
13
# Zip files do not support dates older than 01/01/1980.
# This tuple is compared against ZipInfo.date_time to decide whether an
# entry carries a meaningful timestamp, and it is the date_time given to
# every entry written by ZipStripper.remove_all.
ZIP_EPOCH = (1980, 1, 1, 0, 0, 0)
16
17
class GenericArchiveStripper(parser.GenericParser):
    """ Represent a generic archive

    Common base of the archive strippers defined in this module. Every
    instance owns a private temporary directory (``self.tempdir``) into
    which subclasses extract the archive members; that directory is wiped
    when the instance is destroyed.
    """

    def __init__(self, filename, mime, backup, is_writable, **kwargs):
        """ Set up the parser state and create the extraction directory

        The positional arguments and ``kwargs`` are forwarded unchanged to
        ``parser.GenericParser``.

        :param add2archive: mandatory keyword argument, stored as
            ``self.add2archive`` and handed by the subclasses to the parsers
            they create for the members of the archive
        :raises KeyError: if ``add2archive`` is not given
        """
        super(GenericArchiveStripper, self).__init__(filename, mime, backup, is_writable, **kwargs)
        # Suffix appended to the tarfile open modes by the tar-based
        # subclasses (for instance ':gz'); empty means "no compression".
        self.compression = ''
        self.add2archive = kwargs['add2archive']
        self.tempdir = tempfile.mkdtemp()

    def __del__(self):
        """ Remove the files inside the temp dir,
            then remove the temp dir
        """
        # __del__ also runs on instances whose __init__ raised before
        # self.tempdir was assigned, hence the defensive lookup.
        tempdir = getattr(self, 'tempdir', None)
        if tempdir is None or not os.path.isdir(tempdir):
            return

        # Collect the paths first so that the import below only happens
        # when there is actually something to wipe.
        leftovers = []
        for root, _, files in os.walk(tempdir):
            for item in files:
                leftovers.append(os.path.join(root, item))

        if leftovers:
            # Imported at call time, as the original code did.
            from libmat.mat import secure_remove
            for path_file in leftovers:
                secure_remove(path_file)
        shutil.rmtree(tempdir)

    def get_meta(self):
        """ Virtual method returning the metadata of the archive
        """
        raise NotImplementedError

    def is_clean(self, list_unsupported=False):
        """ Virtual method to check for harmful metadata

        :param bool list_unsupported: when True, implementations return the
            list of unsupported members instead of a boolean
        """
        raise NotImplementedError

    def list_unsupported(self):
        """ Get a list of every non-supported files present in the archive
        """
        return self.is_clean(list_unsupported=True)

    def remove_all(self):
        """ Virtual method to remove all compromising fields
        """
        raise NotImplementedError
57
58
class ZipStripper(GenericArchiveStripper):
    """ Represent a zip file
    """

    @staticmethod
    def __is_zipfile_clean(fileinfo):
        """ Check if a ZipInfo object is clean of metadata added
            by zip itself, independently of the corresponding file metadata

        :param zipfile.ZipInfo fileinfo: entry to inspect
        :return: True when the entry has no comment, is dated ZIP_EPOCH and
            was created on a UNIX system
        """
        # Truthiness works whether the comment is a text or a byte string;
        # comparing bytes against '' would always report a comment.
        if fileinfo.comment:
            return False
        if fileinfo.date_time != ZIP_EPOCH:
            return False
        return fileinfo.create_system == 3  # 3 is UNIX

    @staticmethod
    def __get_zipinfo_meta(zipinfo):
        """ Return all the metadata of a ZipInfo

        :param zipfile.ZipInfo zipinfo: entry to inspect
        :return: dict with the optional keys 'comment', 'modified' and 'system'
        """
        metadata = {}
        if zipinfo.comment:
            metadata['comment'] = zipinfo.comment
        if zipinfo.date_time != ZIP_EPOCH:
            metadata['modified'] = zipinfo.date_time
        if zipinfo.create_system != 3:  # 3 is UNIX
            # Host codes from the zip specification: 0 is MS-DOS/FAT (what
            # zipfile writes on Windows) and 10 is Windows NTFS. Code 2,
            # previously reported as "windows", is OpenVMS.
            is_windows = zipinfo.create_system in (0, 10)
            metadata['system'] = "windows" if is_windows else "unknown"
        return metadata

    def is_clean(self, list_unsupported=False):
        """ Check if the given file is clean from harmful metadata
            When list_unsupported is True, the method returns a list
            of all non-supported/archives files contained in the
            archive.

        :param bool list_unsupported: Should the list of unsupported files be returned
        :return: a bool, or the list of unsupported member names when
            list_unsupported is True
        """
        # Imported at call time, as the original code did.
        from libmat.mat import create_class_file

        ret_list = []
        # The context manager closes the archive on the early returns too.
        with zipfile.ZipFile(self.filename, 'r') as zipin:
            if zipin.comment and not list_unsupported:
                logging.debug('%s has a comment', self.filename)
                return False
            for item in zipin.infolist():
                # extract() returns the sanitised path it really wrote to;
                # joining tempdir and item.filename by hand could designate
                # a file outside of tempdir for absolute or '..' names.
                path = zipin.extract(item, self.tempdir)
                if not list_unsupported and not self.__is_zipfile_clean(item):
                    logging.debug('%s from %s has compromising zipinfo', item.filename, self.filename)
                    return False
                if not os.path.isfile(path):
                    continue

                cfile = create_class_file(path, False, add2archive=self.add2archive)
                if cfile is not None:
                    if not cfile.is_clean():
                        logging.debug('%s from %s has metadata', item.filename, self.filename)
                        if not list_unsupported:
                            return False
                    continue

                logging.info('%s\'s fileformat is not supported or harmless.', item.filename)
                # These two member names are never reported as unsupported.
                if os.path.basename(item.filename) in ('mimetype', '.rels'):
                    continue
                if os.path.splitext(path)[1] in parser.NOMETA:
                    continue
                if not list_unsupported:
                    return False
                ret_list.append(item.filename)

        if list_unsupported:
            return ret_list
        return True

    def get_meta(self):
        """ Return all the metadata of a zip archive

        :return: dict mapping 'comment', "<member>'s zipinfo" and member
            names to the metadata found for them
        """
        # Imported at call time, as the original code did.
        from libmat.mat import create_class_file

        metadata = {}
        with zipfile.ZipFile(self.filename, 'r') as zipin:
            if zipin.comment:
                metadata['comment'] = zipin.comment
            for item in zipin.infolist():
                zipinfo_meta = self.__get_zipinfo_meta(item)
                if zipinfo_meta:  # zipinfo metadata
                    metadata[item.filename + "'s zipinfo"] = str(zipinfo_meta)
                # Use the path really written by extract(), see is_clean().
                path = zipin.extract(item, self.tempdir)
                if not os.path.isfile(path):
                    continue
                cfile = create_class_file(path, False, add2archive=self.add2archive)
                if cfile is None:
                    logging.info('%s\'s fileformat is not supported or harmless', item.filename)
                    continue
                cfile_meta = cfile.get_meta()
                if cfile_meta:
                    metadata[item.filename] = str(cfile_meta)
        return metadata

    def remove_all(self, whitelist=None, beginning_blacklist=None, ending_blacklist=None):
        """ Remove all metadata from a zip archive, even those
            added by Python's zipfile itself. It will not add
            files starting with "beginning_blacklist", or ending with
            "ending_blacklist". This method also adds files present in
            whitelist to the archive.

        :param list whitelist: Add those files to the produced archive, regardless if they are harmful or not
        :param list beginning_blacklist: If the file starts with $beginning_blacklist, it will _not_ be added
        :param list ending_blacklist: If the file ends with $ending_blacklist, it will _not_ be added
        :return: True
        """
        # Imported at call time, as the original code did.
        from libmat.mat import create_class_file

        whitelist = whitelist or []
        # str.startswith/endswith accept a tuple of candidates and return
        # False for an empty one.
        beginning_blacklist = tuple(beginning_blacklist or ())
        ending_blacklist = tuple(ending_blacklist or ())

        with zipfile.ZipFile(self.filename, 'r') as zipin:
            with zipfile.ZipFile(self.output, 'w', allowZip64=True) as zipout:
                for item in zipin.infolist():
                    # Path really written by extract(); never outside tempdir.
                    path = zipin.extract(item, self.tempdir)
                    if not os.path.isfile(path):
                        continue
                    if item.filename.startswith(beginning_blacklist):
                        continue
                    if item.filename.endswith(ending_blacklist):
                        continue

                    cfile = create_class_file(path, False, add2archive=self.add2archive)
                    if cfile is not None:
                        # Handle read-only files inside archive
                        old_stat = os.stat(path).st_mode
                        os.chmod(path, old_stat | stat.S_IWUSR)
                        cfile.remove_all()
                        os.chmod(path, old_stat)
                        logging.debug('Processing %s from %s', item.filename, self.filename)
                    elif item.filename not in whitelist:
                        logging.info("%s's format is not supported or harmless", item.filename)
                        _, ext = os.path.splitext(path)
                        if not (self.add2archive or ext in parser.NOMETA):
                            continue

                    # Write the member with a fresh ZipInfo so that nothing
                    # from the original entry header is carried over.
                    zinfo = zipfile.ZipInfo(item.filename, date_time=ZIP_EPOCH)
                    zinfo.compress_type = zipfile.ZIP_DEFLATED
                    zinfo.create_system = 3  # UNIX
                    zinfo.comment = b''
                    # Binary mode: members are arbitrary data, text mode
                    # would alter or fail to decode them.
                    with open(path, 'rb') as f:
                        zipout.writestr(zinfo, f.read())

        logging.info('%s processed', self.filename)
        self.do_backup()
        return True
208
209
class TarStripper(GenericArchiveStripper):
    """ Represent a tarfile archive
    """

    @staticmethod
    def _remove_tar_added(current_file):
        """ Remove the meta added by tarfile itself to the file

        Used as the ``filter`` callback of ``TarFile.add``.

        :param tarfile.TarInfo current_file: header to neutralise
        :return: the same TarInfo, with mtime/uid/gid zeroed and
            uname/gname emptied
        """
        current_file.mtime = 0
        current_file.uid = 0
        current_file.gid = 0
        current_file.uname = ''
        current_file.gname = ''
        return current_file

    @staticmethod
    def _as_text(value):
        """ Return value as text: byte strings are decoded as UTF-8,
            anything else is returned untouched
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def _member_path(self, item):
        """ Return the path at which item gets extracted inside the temp
            dir, or None if that path would end up outside of it
            (absolute name, '..' components, or a previously extracted link)
        """
        base = os.path.realpath(self.tempdir)
        target = os.path.realpath(os.path.join(base, item.name))
        if target != base and not target.startswith(base + os.sep):
            return None
        return os.path.join(self.tempdir, item.name)

    def remove_all(self, whitelist=None):
        """ Remove all harmful metadata from the tarfile.
            The method will also add every files matching
            whitelist in the produced archive.

        :param list whitelist: Files to add to the produced archive,
            regardless if they are considered harmful.
        :return: True
        """
        # Imported at call time, as the original code did.
        from libmat.mat import create_class_file

        whitelist = whitelist or []
        # The context managers close both archives even when a member
        # cannot be processed.
        with tarfile.open(self.filename, 'r' + self.compression, encoding='utf-8') as tarin:
            with tarfile.open(self.output, 'w' + self.compression, encoding='utf-8') as tarout:
                for item in tarin.getmembers():
                    path = self._member_path(item)
                    if path is None:
                        # Extracting it would write outside of the temp dir
                        logging.info('%s would be extracted outside of the temporary directory, '
                                     'it will not be added', item.name)
                        continue
                    tarin.extract(item, self.tempdir)
                    if not item.isfile():
                        continue

                    cfile = create_class_file(path, False, add2archive=self.add2archive)
                    if cfile is not None:
                        # Handle read-only files inside archive
                        old_stat = os.stat(path).st_mode
                        os.chmod(path, old_stat | stat.S_IWUSR)
                        cfile.remove_all()
                        os.chmod(path, old_stat)
                    elif self.add2archive or os.path.splitext(item.name)[1] in parser.NOMETA:
                        logging.debug("%s' format is either not supported or harmless", item.name)
                    elif item.name in whitelist:
                        logging.debug('%s is not supported, but MAT was told to add it anyway.', item.name)
                    else:  # Don't add the file to the archive
                        logging.debug('%s will not be added', item.name)
                        continue
                    # _remove_tar_added wipes the header fields filled in
                    # by tarfile from the extracted file.
                    tarout.add(self._as_text(path),
                               self._as_text(item.name),
                               filter=self._remove_tar_added)
        self.do_backup()
        return True

    @staticmethod
    def is_file_clean(current_file):
        """ Check metadatas added by tarfile

        :param tarfile.TarInfo current_file:
        :return: True when mtime, uid and gid are 0 and uname and gname
            are empty
        """
        return (current_file.mtime == 0 and
                current_file.uid == 0 and
                current_file.gid == 0 and
                current_file.uname == '' and
                current_file.gname == '')

    def is_clean(self, list_unsupported=False):
        """ Check if the file is clean from harmful metadatas
            When list_unsupported is True, the method returns a list
            of all non-supported/archives files contained in the
            archive.

        :param bool list_unsupported:
        :return: a bool, or the list of unsupported member names when
            list_unsupported is True
        """
        # Imported at call time, as the original code did.
        from libmat.mat import create_class_file

        ret_list = []
        # The context manager closes the archive on the early returns too.
        with tarfile.open(self.filename, 'r' + self.compression) as tarin:
            for item in tarin.getmembers():
                if not list_unsupported and not self.is_file_clean(item):
                    logging.debug('%s from %s has compromising tarinfo', item.name, self.filename)
                    return False

                path = self._member_path(item)
                if path is None:
                    # Never extracted: it cannot be inspected, so it is
                    # handled like an unsupported member.
                    logging.info('%s from %s would be extracted outside of the temporary directory',
                                 item.name, self.filename)
                    if not list_unsupported:
                        return False
                    ret_list.append(item.name)
                    continue

                tarin.extract(item, self.tempdir)
                if not item.isfile():
                    continue

                cfile = create_class_file(path, False, add2archive=self.add2archive)
                if cfile is not None:
                    if not cfile.is_clean():
                        logging.debug('%s from %s has metadata', item.name, self.filename)
                        if not list_unsupported:
                            return False
                    # Nested archives are treated like unsupported files
                    elif isinstance(cfile, GenericArchiveStripper):
                        ret_list.append(item.name)
                else:
                    logging.info("%s's format is not supported or harmless", item.name)
                    if os.path.splitext(path)[1] not in parser.NOMETA:
                        if not list_unsupported:
                            return False
                        ret_list.append(item.name)

        if list_unsupported:
            return ret_list
        return True

    def get_meta(self):
        """ Return a dict with all the meta of the tarfile

        :return: dict mapping member names to the string form of a dict
            holding the metadata of the embedded file ('file') and/or the
            tar header fields (mtime, uid, gid, uname, gname)
        """
        # Imported at call time, as the original code did.
        from libmat.mat import create_class_file

        metadata = {}
        with tarfile.open(self.filename, 'r' + self.compression) as tarin:
            for item in tarin.getmembers():
                current_meta = {}
                if item.isfile():
                    path = self._member_path(item)
                    if path is None:
                        logging.info('%s would be extracted outside of the temporary directory, '
                                     'its content is not inspected', item.name)
                    else:
                        tarin.extract(item, self.tempdir)
                        class_file = create_class_file(path, False, add2archive=self.add2archive)
                        if class_file is not None:
                            meta = class_file.get_meta()
                            if meta:
                                current_meta['file'] = str(meta)
                        else:
                            logging.info("%s's format is not supported or harmless", item.name)

                if not self.is_file_clean(item):  # if there is meta
                    current_meta['mtime'] = item.mtime
                    current_meta['uid'] = item.uid
                    current_meta['gid'] = item.gid
                    current_meta['uname'] = item.uname
                    current_meta['gname'] = item.gname

                # Report the member as soon as anything was found, including
                # metadata of the embedded file when the tar header is clean.
                if current_meta:
                    metadata[item.name] = str(current_meta)
        return metadata
345
346
class TerminalZipStripper(ZipStripper):
    """ Represent a terminal level archive.

    This kind of archive cannot contain nested archives. It is meant for
    formats such as docx, which are essentially zipped xml. The class
    adds nothing to ZipStripper; it only exists as a distinct type.
    """
354
355
class GzipStripper(TarStripper):
    """ Represent a tar.gz archive

    Same as TarStripper, with ':gz' appended to the tarfile open modes.
    """

    def __init__(self, filename, mime, backup, is_writable, **kwargs):
        """ Initialise the tar stripper, then select gzip compression

        All the arguments are forwarded unchanged to TarStripper.
        """
        parent = super(GzipStripper, self)
        parent.__init__(filename, mime, backup, is_writable, **kwargs)
        # Overrides the empty suffix set by the base class
        self.compression = ':gz'
363
364
class Bzip2Stripper(TarStripper):
    """ Represent a tar.bz2 archive

    Same as TarStripper, with ':bz2' appended to the tarfile open modes.
    """

    def __init__(self, filename, mime, backup, is_writable, **kwargs):
        """ Initialise the tar stripper, then select bzip2 compression

        All the arguments are forwarded unchanged to TarStripper.
        """
        parent = super(Bzip2Stripper, self)
        parent.__init__(filename, mime, backup, is_writable, **kwargs)
        # Overrides the empty suffix set by the base class
        self.compression = ':bz2'