diff options
| -rw-r--r-- | libmat2/archive.py | 27 | ||||
| -rw-r--r-- | tests/test_libmat2.py | 43 |
2 files changed, 62 insertions, 8 deletions
diff --git a/libmat2/archive.py b/libmat2/archive.py index 48c1594..29db417 100644 --- a/libmat2/archive.py +++ b/libmat2/archive.py | |||
| @@ -120,6 +120,18 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): | |||
| 120 | # pylint: disable=unused-argument | 120 | # pylint: disable=unused-argument |
| 121 | return member | 121 | return member |
| 122 | 122 | ||
| 123 | @staticmethod | ||
| 124 | def _get_member_compression(member: ArchiveMember): | ||
| 125 | """Get the compression of the archive member.""" | ||
| 126 | # pylint: disable=unused-argument | ||
| 127 | return None | ||
| 128 | |||
| 129 | @staticmethod | ||
| 130 | def _set_member_compression(member: ArchiveMember, compression) -> ArchiveMember: | ||
| 131 | """Set the compression of the archive member.""" | ||
| 132 | # pylint: disable=unused-argument | ||
| 133 | return member | ||
| 134 | |||
| 123 | def get_meta(self) -> Dict[str, Union[str, dict]]: | 135 | def get_meta(self) -> Dict[str, Union[str, dict]]: |
| 124 | meta = dict() # type: Dict[str, Union[str, dict]] | 136 | meta = dict() # type: Dict[str, Union[str, dict]] |
| 125 | 137 | ||
| @@ -184,6 +196,8 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): | |||
| 184 | original_permissions = os.stat(full_path).st_mode | 196 | original_permissions = os.stat(full_path).st_mode |
| 185 | os.chmod(full_path, original_permissions | stat.S_IWUSR | stat.S_IRUSR) | 197 | os.chmod(full_path, original_permissions | stat.S_IWUSR | stat.S_IRUSR) |
| 186 | 198 | ||
| 199 | original_compression = self._get_member_compression(item) | ||
| 200 | |||
| 187 | if self._specific_cleanup(full_path) is False: | 201 | if self._specific_cleanup(full_path) is False: |
| 188 | logging.warning("Something went wrong during deep cleaning of %s", | 202 | logging.warning("Something went wrong during deep cleaning of %s", |
| 189 | member_name) | 203 | member_name) |
| @@ -223,6 +237,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): | |||
| 223 | 237 | ||
| 224 | zinfo = self.member_class(member_name) # type: ignore | 238 | zinfo = self.member_class(member_name) # type: ignore |
| 225 | zinfo = self._set_member_permissions(zinfo, original_permissions) | 239 | zinfo = self._set_member_permissions(zinfo, original_permissions) |
| 240 | zinfo = self._set_member_compression(zinfo, original_compression) | ||
| 226 | clean_zinfo = self._clean_member(zinfo) | 241 | clean_zinfo = self._clean_member(zinfo) |
| 227 | self._add_file_to_archive(zout, clean_zinfo, full_path) | 242 | self._add_file_to_archive(zout, clean_zinfo, full_path) |
| 228 | 243 | ||
| @@ -368,7 +383,6 @@ class ZipParser(ArchiveBasedAbstractParser): | |||
| 368 | super().__init__(filename) | 383 | super().__init__(filename) |
| 369 | self.archive_class = zipfile.ZipFile | 384 | self.archive_class = zipfile.ZipFile |
| 370 | self.member_class = zipfile.ZipInfo | 385 | self.member_class = zipfile.ZipInfo |
| 371 | self.zip_compression_type = zipfile.ZIP_DEFLATED | ||
| 372 | 386 | ||
| 373 | def is_archive_valid(self): | 387 | def is_archive_valid(self): |
| 374 | try: | 388 | try: |
| @@ -410,7 +424,7 @@ class ZipParser(ArchiveBasedAbstractParser): | |||
| 410 | assert isinstance(member, zipfile.ZipInfo) # please mypy | 424 | assert isinstance(member, zipfile.ZipInfo) # please mypy |
| 411 | with open(full_path, 'rb') as f: | 425 | with open(full_path, 'rb') as f: |
| 412 | archive.writestr(member, f.read(), | 426 | archive.writestr(member, f.read(), |
| 413 | compress_type=self.zip_compression_type) | 427 | compress_type=member.compress_type) |
| 414 | 428 | ||
| 415 | @staticmethod | 429 | @staticmethod |
| 416 | def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]: | 430 | def _get_all_members(archive: ArchiveClass) -> List[ArchiveMember]: |
| @@ -421,3 +435,12 @@ class ZipParser(ArchiveBasedAbstractParser): | |||
| 421 | def _get_member_name(member: ArchiveMember) -> str: | 435 | def _get_member_name(member: ArchiveMember) -> str: |
| 422 | assert isinstance(member, zipfile.ZipInfo) # please mypy | 436 | assert isinstance(member, zipfile.ZipInfo) # please mypy |
| 423 | return member.filename | 437 | return member.filename |
| 438 | |||
| 439 | @staticmethod | ||
| 440 | def _get_member_compression(member: ArchiveMember): | ||
| 441 | return member.compress_type | ||
| 442 | |||
| 443 | @staticmethod | ||
| 444 | def _set_member_compression(member: ArchiveMember, compression) -> ArchiveMember: | ||
| 445 | member.compress_type = compression | ||
| 446 | return member | ||
diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py index af92db3..f8e62de 100644 --- a/tests/test_libmat2.py +++ b/tests/test_libmat2.py | |||
| @@ -175,14 +175,30 @@ class TestGetMeta(unittest.TestCase): | |||
| 175 | 175 | ||
| 176 | def test_zip(self): | 176 | def test_zip(self): |
| 177 | with zipfile.ZipFile('./tests/data/dirty.zip', 'w') as zout: | 177 | with zipfile.ZipFile('./tests/data/dirty.zip', 'w') as zout: |
| 178 | zout.write('./tests/data/dirty.flac') | 178 | zout.write('./tests/data/dirty.flac', |
| 179 | zout.write('./tests/data/dirty.docx') | 179 | compress_type = zipfile.ZIP_STORED) |
| 180 | zout.write('./tests/data/dirty.jpg') | 180 | zout.write('./tests/data/dirty.docx', |
| 181 | compress_type = zipfile.ZIP_DEFLATED) | ||
| 182 | zout.write('./tests/data/dirty.jpg', | ||
| 183 | compress_type = zipfile.ZIP_BZIP2) | ||
| 184 | zout.write('./tests/data/dirty.txt', | ||
| 185 | compress_type = zipfile.ZIP_LZMA) | ||
| 181 | p, mimetype = parser_factory.get_parser('./tests/data/dirty.zip') | 186 | p, mimetype = parser_factory.get_parser('./tests/data/dirty.zip') |
| 182 | self.assertEqual(mimetype, 'application/zip') | 187 | self.assertEqual(mimetype, 'application/zip') |
| 183 | meta = p.get_meta() | 188 | meta = p.get_meta() |
| 184 | self.assertEqual(meta['tests/data/dirty.flac']['comments'], 'Thank you for using MAT !') | 189 | self.assertEqual(meta['tests/data/dirty.flac']['comments'], 'Thank you for using MAT !') |
| 185 | self.assertEqual(meta['tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!') | 190 | self.assertEqual(meta['tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!') |
| 191 | |||
| 192 | with zipfile.ZipFile('./tests/data/dirty.zip') as zipin: | ||
| 193 | members = { | ||
| 194 | 'tests/data/dirty.flac' : zipfile.ZIP_STORED, | ||
| 195 | 'tests/data/dirty.docx': zipfile.ZIP_DEFLATED, | ||
| 196 | 'tests/data/dirty.jpg' : zipfile.ZIP_BZIP2, | ||
| 197 | 'tests/data/dirty.txt' : zipfile.ZIP_LZMA, | ||
| 198 | } | ||
| 199 | for k, v in members.items(): | ||
| 200 | self.assertEqual(zipin.getinfo(k).compress_type, v) | ||
| 201 | |||
| 186 | os.remove('./tests/data/dirty.zip') | 202 | os.remove('./tests/data/dirty.zip') |
| 187 | 203 | ||
| 188 | def test_wmv(self): | 204 | def test_wmv(self): |
| @@ -595,9 +611,14 @@ class TestCleaning(unittest.TestCase): | |||
| 595 | class TestCleaningArchives(unittest.TestCase): | 611 | class TestCleaningArchives(unittest.TestCase): |
| 596 | def test_zip(self): | 612 | def test_zip(self): |
| 597 | with zipfile.ZipFile('./tests/data/dirty.zip', 'w') as zout: | 613 | with zipfile.ZipFile('./tests/data/dirty.zip', 'w') as zout: |
| 598 | zout.write('./tests/data/dirty.flac') | 614 | zout.write('./tests/data/dirty.flac', |
| 599 | zout.write('./tests/data/dirty.docx') | 615 | compress_type = zipfile.ZIP_STORED) |
| 600 | zout.write('./tests/data/dirty.jpg') | 616 | zout.write('./tests/data/dirty.docx', |
| 617 | compress_type = zipfile.ZIP_DEFLATED) | ||
| 618 | zout.write('./tests/data/dirty.jpg', | ||
| 619 | compress_type = zipfile.ZIP_BZIP2) | ||
| 620 | zout.write('./tests/data/dirty.txt', | ||
| 621 | compress_type = zipfile.ZIP_LZMA) | ||
| 601 | p = archive.ZipParser('./tests/data/dirty.zip') | 622 | p = archive.ZipParser('./tests/data/dirty.zip') |
| 602 | meta = p.get_meta() | 623 | meta = p.get_meta() |
| 603 | self.assertEqual(meta['tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!') | 624 | self.assertEqual(meta['tests/data/dirty.docx']['word/media/image1.png']['Comment'], 'This is a comment, be careful!') |
| @@ -609,6 +630,16 @@ class TestCleaningArchives(unittest.TestCase): | |||
| 609 | self.assertEqual(p.get_meta(), {}) | 630 | self.assertEqual(p.get_meta(), {}) |
| 610 | self.assertTrue(p.remove_all()) | 631 | self.assertTrue(p.remove_all()) |
| 611 | 632 | ||
| 633 | with zipfile.ZipFile('./tests/data/dirty.zip') as zipin: | ||
| 634 | members = { | ||
| 635 | 'tests/data/dirty.flac' : zipfile.ZIP_STORED, | ||
| 636 | 'tests/data/dirty.docx': zipfile.ZIP_DEFLATED, | ||
| 637 | 'tests/data/dirty.jpg' : zipfile.ZIP_BZIP2, | ||
| 638 | 'tests/data/dirty.txt' : zipfile.ZIP_LZMA, | ||
| 639 | } | ||
| 640 | for k, v in members.items(): | ||
| 641 | self.assertEqual(zipin.getinfo(k).compress_type, v) | ||
| 642 | |||
| 612 | os.remove('./tests/data/dirty.zip') | 643 | os.remove('./tests/data/dirty.zip') |
| 613 | os.remove('./tests/data/dirty.cleaned.zip') | 644 | os.remove('./tests/data/dirty.cleaned.zip') |
| 614 | os.remove('./tests/data/dirty.cleaned.cleaned.zip') | 645 | os.remove('./tests/data/dirty.cleaned.cleaned.zip') |
