From 82cc822a1dc7090f7a6af977ed6d4b7b945d038a Mon Sep 17 00:00:00 2001 From: jvoisin Date: Sat, 27 Apr 2019 04:05:36 -0700 Subject: Add tar archive support --- libmat2/archive.py | 256 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 193 insertions(+), 63 deletions(-) (limited to 'libmat2/archive.py') diff --git a/libmat2/archive.py b/libmat2/archive.py index aa1b24c..2936f39 100644 --- a/libmat2/archive.py +++ b/libmat2/archive.py @@ -1,5 +1,7 @@ +import abc import zipfile import datetime +import tarfile import tempfile import os import logging @@ -11,14 +13,37 @@ from . import abstract, UnknownMemberPolicy, parser_factory # Make pyflakes happy assert Set assert Pattern -assert List -assert Union + +# pylint: disable=not-callable,assignment-from-no-return + +# An ArchiveClass is a class representing an archive, +# while an ArchiveMember is a class representing an element +# (usually a file) of an archive. +ArchiveClass = Union[zipfile.ZipFile, tarfile.TarFile] +ArchiveMember = Union[zipfile.ZipInfo, tarfile.TarInfo] class ArchiveBasedAbstractParser(abstract.AbstractParser): - """ Office files (.docx, .odt, …) are zipped files. """ + """Base class for all archive-based formats. + + Welcome to a world of frustrating complexity and tediouness: + - A lot of file formats (docx, odt, epubs, …) are archive-based, + so we need to add callbacks erverywhere to allow their respective + parsers to apply specific cleanup to the required files. + - Python has two different modules to deal with .tar and .zip files, + with similar-but-yet-o-so-different API, so we need to write + a ghetto-wrapper to avoid duplicating everything + - The combination of @staticmethod and @abstractstaticmethod is + required because for now, mypy doesn't know that + @abstractstaticmethod is, indeed, a static method. + - Mypy is too dumb (yet) to realise that a type A is valid under + the Union[A, B] constrain, hence the weird `# type: ignore` + annotations. + """ def __init__(self, filename): super().__init__(filename) + self.archive_class = None # type: Optional[ArchiveClass] + self.member_class = None # type: Optional[ArchiveMember] # Those are the files that have a format that _isn't_ # supported by MAT2, but that we want to keep anyway. @@ -32,10 +57,10 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): # the archive? self.unknown_member_policy = UnknownMemberPolicy.ABORT # type: UnknownMemberPolicy - try: # better fail here than later - zipfile.ZipFile(self.filename) - except zipfile.BadZipFile: - raise ValueError + self.is_archive_valid() + + def is_archive_valid(self): + """Raise a ValueError is the current archive isn't a valid one.""" def _specific_cleanup(self, full_path: str) -> bool: """ This method can be used to apply specific treatment @@ -50,59 +75,57 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): return {} # pragma: no cover @staticmethod - def _clean_zipinfo(zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo: - zipinfo.create_system = 3 # Linux - zipinfo.comment = b'' - zipinfo.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be - return zipinfo + @abc.abstractstaticmethod + def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]: + """Return all the members of the archive.""" @staticmethod - def _get_zipinfo_meta(zipinfo: zipfile.ZipInfo) -> Dict[str, str]: - metadata = {} - if zipinfo.create_system == 3: # this is Linux - pass - elif zipinfo.create_system == 2: - metadata['create_system'] = 'Windows' - else: - metadata['create_system'] = 'Weird' + @abc.abstractstaticmethod + def _clean_member(member: ArchiveMember) -> ArchiveMember: + """Remove all the metadata for a given member.""" - if zipinfo.comment: - metadata['comment'] = zipinfo.comment # type: ignore + @staticmethod + @abc.abstractstaticmethod + def _get_member_meta(member: ArchiveMember) -> Dict[str, str]: + """Return all the metadata of a given member.""" - if zipinfo.date_time != (1980, 1, 1, 0, 0, 0): - metadata['date_time'] = str(datetime.datetime(*zipinfo.date_time)) + @staticmethod + @abc.abstractstaticmethod + def _get_member_name(member: ArchiveMember) -> str: + """Return the name of the given member.""" - return metadata + @staticmethod + @abc.abstractstaticmethod + def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember, + full_path: str): + """Add the file at full_path to the archive, via the given member.""" def get_meta(self) -> Dict[str, Union[str, dict]]: meta = dict() # type: Dict[str, Union[str, dict]] - with zipfile.ZipFile(self.filename) as zin: + with self.archive_class(self.filename) as zin: temp_folder = tempfile.mkdtemp() - for item in zin.infolist(): - local_meta = dict() # type: Dict[str, Union[str, Dict]] - for k, v in self._get_zipinfo_meta(item).items(): - local_meta[k] = v + for item in self._get_all_members(zin): + local_meta = self._get_member_meta(item) + member_name = self._get_member_name(item) - if item.filename[-1] == '/': # pragma: no cover + if member_name[-1] == '/': # pragma: no cover # `is_dir` is added in Python3.6 continue # don't keep empty folders zin.extract(member=item, path=temp_folder) - full_path = os.path.join(temp_folder, item.filename) + full_path = os.path.join(temp_folder, member_name) - specific_meta = self._specific_get_meta(full_path, item.filename) - for (k, v) in specific_meta.items(): - local_meta[k] = v + specific_meta = self._specific_get_meta(full_path, member_name) + local_meta = {**local_meta, **specific_meta} - tmp_parser, _ = parser_factory.get_parser(full_path) # type: ignore - if tmp_parser: - for k, v in tmp_parser.get_meta().items(): - local_meta[k] = v + member_parser, _ = parser_factory.get_parser(full_path) # type: ignore + if member_parser: + local_meta = {**local_meta, **member_parser.get_meta()} if local_meta: - meta[item.filename] = local_meta + meta[member_name] = local_meta shutil.rmtree(temp_folder) return meta @@ -110,17 +133,19 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): def remove_all(self) -> bool: # pylint: disable=too-many-branches - with zipfile.ZipFile(self.filename) as zin,\ - zipfile.ZipFile(self.output_filename, 'w') as zout: + with self.archive_class(self.filename) as zin,\ + self.archive_class(self.output_filename, 'w') as zout: temp_folder = tempfile.mkdtemp() abort = False - items = list() # type: List[zipfile.ZipInfo] - for item in sorted(zin.infolist(), key=lambda z: z.filename): + # Sort the items to process, to reduce fingerprinting, + # and keep them in the `items` variable. + items = list() # type: List[ArchiveMember] + for item in sorted(self._get_all_members(zin), key=self._get_member_name): # Some fileformats do require to have the `mimetype` file # as the first file in the archive. - if item.filename == 'mimetype': + if self._get_member_name(item) == 'mimetype': items = [item] + items else: items.append(item) @@ -128,53 +153,53 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): # Since files order is a fingerprint factor, # we're iterating (and thus inserting) them in lexicographic order. for item in items: - if item.filename[-1] == '/': # `is_dir` is added in Python3.6 + member_name = self._get_member_name(item) + if member_name[-1] == '/': # `is_dir` is added in Python3.6 continue # don't keep empty folders zin.extract(member=item, path=temp_folder) - full_path = os.path.join(temp_folder, item.filename) + full_path = os.path.join(temp_folder, member_name) if self._specific_cleanup(full_path) is False: logging.warning("Something went wrong during deep cleaning of %s", - item.filename) + member_name) abort = True continue - if any(map(lambda r: r.search(item.filename), self.files_to_keep)): + if any(map(lambda r: r.search(member_name), self.files_to_keep)): # those files aren't supported, but we want to add them anyway pass - elif any(map(lambda r: r.search(item.filename), self.files_to_omit)): + elif any(map(lambda r: r.search(member_name), self.files_to_omit)): continue else: # supported files that we want to first clean, then add - tmp_parser, mtype = parser_factory.get_parser(full_path) # type: ignore - if not tmp_parser: + member_parser, mtype = parser_factory.get_parser(full_path) # type: ignore + if not member_parser: if self.unknown_member_policy == UnknownMemberPolicy.OMIT: logging.warning("In file %s, omitting unknown element %s (format: %s)", - self.filename, item.filename, mtype) + self.filename, member_name, mtype) continue elif self.unknown_member_policy == UnknownMemberPolicy.KEEP: logging.warning("In file %s, keeping unknown element %s (format: %s)", - self.filename, item.filename, mtype) + self.filename, member_name, mtype) else: logging.error("In file %s, element %s's format (%s) " \ "isn't supported", - self.filename, item.filename, mtype) + self.filename, member_name, mtype) abort = True continue - if tmp_parser: - if tmp_parser.remove_all() is False: + else: + if member_parser.remove_all() is False: logging.warning("In file %s, something went wrong \ with the cleaning of %s \ (format: %s)", - self.filename, item.filename, mtype) + self.filename, member_name, mtype) abort = True continue - os.rename(tmp_parser.output_filename, full_path) + os.rename(member_parser.output_filename, full_path) - zinfo = zipfile.ZipInfo(item.filename) # type: ignore - clean_zinfo = self._clean_zipinfo(zinfo) - with open(full_path, 'rb') as f: - zout.writestr(clean_zinfo, f.read()) + zinfo = self.member_class(member_name) # type: ignore + clean_zinfo = self._clean_member(zinfo) + self._add_file_to_archive(zout, clean_zinfo, full_path) shutil.rmtree(temp_folder) if abort: @@ -183,6 +208,111 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): return True +class TarParser(ArchiveBasedAbstractParser): + mimetypes = {'application/x-tar'} + def __init__(self, filename): + super().__init__(filename) + self.archive_class = tarfile.TarFile + self.member_class = tarfile.TarInfo + + def is_archive_valid(self): + if tarfile.is_tarfile(self.filename) is False: + raise ValueError + + @staticmethod + def _clean_member(member: ArchiveMember) -> ArchiveMember: + assert isinstance(member, tarfile.TarInfo) # please mypy + member.mtime = member.uid = member.gid = 0 + member.uname = member.gname = '' + return member + + @staticmethod + def _get_member_meta(member: ArchiveMember) -> Dict[str, str]: + assert isinstance(member, tarfile.TarInfo) # please mypy + metadata = {} + if member.mtime != 0: + metadata['mtime'] = str(member.mtime) + if member.uid != 0: + metadata['uid'] = str(member.uid) + if member.gid != 0: + metadata['gid'] = str(member.gid) + if member.uname != '': + metadata['uname'] = member.uname + if member.gname != '': + metadata['gname'] = member.gname + return metadata + + @staticmethod + def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember, + full_path: str): + assert isinstance(member, tarfile.TarInfo) # please mypy + assert isinstance(archive, tarfile.TarFile) # please mypy + archive.add(full_path, member.name, filter=TarParser._clean_member) # type: ignore + + @staticmethod + def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]: + assert isinstance(archive, tarfile.TarFile) # please mypy + return archive.getmembers() # type: ignore + + @staticmethod + def _get_member_name(member: ArchiveMember) -> str: + assert isinstance(member, tarfile.TarInfo) # please mypy + return member.name class ZipParser(ArchiveBasedAbstractParser): mimetypes = {'application/zip'} + def __init__(self, filename): + super().__init__(filename) + self.archive_class = zipfile.ZipFile + self.member_class = zipfile.ZipInfo + + def is_archive_valid(self): + try: + zipfile.ZipFile(self.filename) + except zipfile.BadZipFile: + raise ValueError + + @staticmethod + def _clean_member(member: ArchiveMember) -> ArchiveMember: + assert isinstance(member, zipfile.ZipInfo) # please mypy + member.create_system = 3 # Linux + member.comment = b'' + member.date_time = (1980, 1, 1, 0, 0, 0) # this is as early as a zipfile can be + return member + + @staticmethod + def _get_member_meta(member: ArchiveMember) -> Dict[str, str]: + assert isinstance(member, zipfile.ZipInfo) # please mypy + metadata = {} + if member.create_system == 3: # this is Linux + pass + elif member.create_system == 2: + metadata['create_system'] = 'Windows' + else: + metadata['create_system'] = 'Weird' + + if member.comment: + metadata['comment'] = member.comment # type: ignore + + if member.date_time != (1980, 1, 1, 0, 0, 0): + metadata['date_time'] = str(datetime.datetime(*member.date_time)) + + return metadata + + @staticmethod + def _add_file_to_archive(archive: ArchiveClass, member: ArchiveMember, + full_path: str): + assert isinstance(archive, zipfile.ZipFile) # please mypy + assert isinstance(member, zipfile.ZipInfo) # please mypy + with open(full_path, 'rb') as f: + archive.writestr(member, f.read()) + + @staticmethod + def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]: + assert isinstance(archive, zipfile.ZipFile) # please mypy + return archive.infolist() # type: ignore + + @staticmethod + def _get_member_name(member: ArchiveMember) -> str: + assert isinstance(member, zipfile.ZipInfo) # please mypy + return member.filename -- cgit v1.3