diff options
| -rw-r--r-- | libmat2/archive.py | 24 | ||||
| -rw-r--r-- | tests/test_corrupted_files.py | 26 |
2 files changed, 46 insertions, 4 deletions
diff --git a/libmat2/archive.py b/libmat2/archive.py index 7aa5cb9..969bbd8 100644 --- a/libmat2/archive.py +++ b/libmat2/archive.py | |||
| @@ -1,4 +1,5 @@ | |||
| 1 | import abc | 1 | import abc |
| 2 | import stat | ||
| 2 | import zipfile | 3 | import zipfile |
| 3 | import datetime | 4 | import datetime |
| 4 | import tarfile | 5 | import tarfile |
| @@ -104,6 +105,12 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): | |||
| 104 | full_path: str): | 105 | full_path: str): |
| 105 | """Add the file at full_path to the archive, via the given member.""" | 106 | """Add the file at full_path to the archive, via the given member.""" |
| 106 | 107 | ||
| 108 | @staticmethod | ||
| 109 | def _set_member_permissions(member: ArchiveMember, permissions: int) -> ArchiveMember: | ||
| 110 | """Set the permission of the archive member.""" | ||
| 111 | # pylint: disable=unused-argument | ||
| 112 | return member | ||
| 113 | |||
| 107 | def get_meta(self) -> Dict[str, Union[str, dict]]: | 114 | def get_meta(self) -> Dict[str, Union[str, dict]]: |
| 108 | meta = dict() # type: Dict[str, Union[str, dict]] | 115 | meta = dict() # type: Dict[str, Union[str, dict]] |
| 109 | 116 | ||
| @@ -120,6 +127,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): | |||
| 120 | 127 | ||
| 121 | zin.extract(member=item, path=temp_folder) | 128 | zin.extract(member=item, path=temp_folder) |
| 122 | full_path = os.path.join(temp_folder, member_name) | 129 | full_path = os.path.join(temp_folder, member_name) |
| 130 | os.chmod(full_path, stat.S_IRUSR) | ||
| 123 | 131 | ||
| 124 | specific_meta = self._specific_get_meta(full_path, member_name) | 132 | specific_meta = self._specific_get_meta(full_path, member_name) |
| 125 | local_meta = {**local_meta, **specific_meta} | 133 | local_meta = {**local_meta, **specific_meta} |
| @@ -164,6 +172,9 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): | |||
| 164 | zin.extract(member=item, path=temp_folder) | 172 | zin.extract(member=item, path=temp_folder) |
| 165 | full_path = os.path.join(temp_folder, member_name) | 173 | full_path = os.path.join(temp_folder, member_name) |
| 166 | 174 | ||
| 175 | original_permissions = os.stat(full_path).st_mode | ||
| 176 | os.chmod(full_path, original_permissions | stat.S_IWUSR | stat.S_IRUSR) | ||
| 177 | |||
| 167 | if self._specific_cleanup(full_path) is False: | 178 | if self._specific_cleanup(full_path) is False: |
| 168 | logging.warning("Something went wrong during deep cleaning of %s", | 179 | logging.warning("Something went wrong during deep cleaning of %s", |
| 169 | member_name) | 180 | member_name) |
| @@ -202,6 +213,7 @@ class ArchiveBasedAbstractParser(abstract.AbstractParser): | |||
| 202 | os.rename(member_parser.output_filename, full_path) | 213 | os.rename(member_parser.output_filename, full_path) |
| 203 | 214 | ||
| 204 | zinfo = self.member_class(member_name) # type: ignore | 215 | zinfo = self.member_class(member_name) # type: ignore |
| 216 | zinfo = self._set_member_permissions(zinfo, original_permissions) | ||
| 205 | clean_zinfo = self._clean_member(zinfo) | 217 | clean_zinfo = self._clean_member(zinfo) |
| 206 | self._add_file_to_archive(zout, clean_zinfo, full_path) | 218 | self._add_file_to_archive(zout, clean_zinfo, full_path) |
| 207 | 219 | ||
| @@ -216,11 +228,11 @@ class TarParser(ArchiveBasedAbstractParser): | |||
| 216 | mimetypes = {'application/x-tar'} | 228 | mimetypes = {'application/x-tar'} |
| 217 | def __init__(self, filename): | 229 | def __init__(self, filename): |
| 218 | super().__init__(filename) | 230 | super().__init__(filename) |
| 219 | # yes, it's tarfile.TarFile.open and not tarfile.TarFile, | 231 | # yes, it's tarfile.open and not tarfile.TarFile, |
| 220 | # as stated in the documentation: | 232 | # as stated in the documentation: |
| 221 | # https://docs.python.org/3/library/tarfile.html#tarfile.TarFile | 233 | # https://docs.python.org/3/library/tarfile.html#tarfile.TarFile |
| 222 | # This is required to support compressed archives. | 234 | # This is required to support compressed archives. |
| 223 | self.archive_class = tarfile.TarFile.open | 235 | self.archive_class = tarfile.open |
| 224 | self.member_class = tarfile.TarInfo | 236 | self.member_class = tarfile.TarInfo |
| 225 | 237 | ||
| 226 | def is_archive_valid(self): | 238 | def is_archive_valid(self): |
| @@ -239,7 +251,7 @@ class TarParser(ArchiveBasedAbstractParser): | |||
| 239 | assert isinstance(member, tarfile.TarInfo) # please mypy | 251 | assert isinstance(member, tarfile.TarInfo) # please mypy |
| 240 | metadata = {} | 252 | metadata = {} |
| 241 | if member.mtime != 0: | 253 | if member.mtime != 0: |
| 242 | metadata['mtime'] = datetime.datetime.fromtimestamp(member.mtime) | 254 | metadata['mtime'] = str(datetime.datetime.fromtimestamp(member.mtime)) |
| 243 | if member.uid != 0: | 255 | if member.uid != 0: |
| 244 | metadata['uid'] = str(member.uid) | 256 | metadata['uid'] = str(member.uid) |
| 245 | if member.gid != 0: | 257 | if member.gid != 0: |
| @@ -267,6 +279,12 @@ class TarParser(ArchiveBasedAbstractParser): | |||
| 267 | assert isinstance(member, tarfile.TarInfo) # please mypy | 279 | assert isinstance(member, tarfile.TarInfo) # please mypy |
| 268 | return member.name | 280 | return member.name |
| 269 | 281 | ||
| 282 | @staticmethod | ||
| 283 | def _set_member_permissions(member: ArchiveMember, permissions: int) -> ArchiveMember: | ||
| 284 | assert isinstance(member, tarfile.TarInfo) # please mypy | ||
| 285 | member.mode = permissions | ||
| 286 | return member | ||
| 287 | |||
| 270 | 288 | ||
| 271 | class TarGzParser(TarParser): | 289 | class TarGzParser(TarParser): |
| 272 | compression = ':gz' | 290 | compression = ':gz' |
diff --git a/tests/test_corrupted_files.py b/tests/test_corrupted_files.py index 1331f1c..b7240fe 100644 --- a/tests/test_corrupted_files.py +++ b/tests/test_corrupted_files.py | |||
| @@ -293,7 +293,7 @@ class TestCorruptedFiles(unittest.TestCase): | |||
| 293 | os.remove('./tests/data/clean.epub') | 293 | os.remove('./tests/data/clean.epub') |
| 294 | 294 | ||
| 295 | def test_tar(self): | 295 | def test_tar(self): |
| 296 | with tarfile.TarFile('./tests/data/clean.tar', 'w') as zout: | 296 | with tarfile.TarFile.open('./tests/data/clean.tar', 'w') as zout: |
| 297 | zout.add('./tests/data/dirty.flac') | 297 | zout.add('./tests/data/dirty.flac') |
| 298 | zout.add('./tests/data/dirty.docx') | 298 | zout.add('./tests/data/dirty.docx') |
| 299 | zout.add('./tests/data/dirty.jpg') | 299 | zout.add('./tests/data/dirty.jpg') |
| @@ -302,6 +302,7 @@ class TestCorruptedFiles(unittest.TestCase): | |||
| 302 | tarinfo.mtime = time.time() | 302 | tarinfo.mtime = time.time() |
| 303 | tarinfo.uid = 1337 | 303 | tarinfo.uid = 1337 |
| 304 | tarinfo.gid = 1338 | 304 | tarinfo.gid = 1338 |
| 305 | tarinfo.size = os.stat('./tests/data/dirty.png').st_size | ||
| 305 | with open('./tests/data/dirty.png', 'rb') as f: | 306 | with open('./tests/data/dirty.png', 'rb') as f: |
| 306 | zout.addfile(tarinfo, f) | 307 | zout.addfile(tarinfo, f) |
| 307 | p, mimetype = parser_factory.get_parser('./tests/data/clean.tar') | 308 | p, mimetype = parser_factory.get_parser('./tests/data/clean.tar') |
| @@ -316,3 +317,26 @@ class TestCorruptedFiles(unittest.TestCase): | |||
| 316 | with self.assertRaises(ValueError): | 317 | with self.assertRaises(ValueError): |
| 317 | archive.TarParser('./tests/data/clean.tar') | 318 | archive.TarParser('./tests/data/clean.tar') |
| 318 | os.remove('./tests/data/clean.tar') | 319 | os.remove('./tests/data/clean.tar') |
| 320 | |||
| 321 | class TestReadOnlyArchiveMembers(unittest.TestCase): | ||
| 322 | def test_onlymember_tar(self): | ||
| 323 | with tarfile.open('./tests/data/clean.tar', 'w') as zout: | ||
| 324 | zout.add('./tests/data/dirty.png') | ||
| 325 | tarinfo = tarfile.TarInfo('./tests/data/dirty.jpg') | ||
| 326 | tarinfo.mtime = time.time() | ||
| 327 | tarinfo.uid = 1337 | ||
| 328 | tarinfo.mode = 0o000 | ||
| 329 | tarinfo.size = os.stat('./tests/data/dirty.jpg').st_size | ||
| 330 | with open('./tests/data/dirty.jpg', 'rb') as f: | ||
| 331 | zout.addfile(tarinfo=tarinfo, fileobj=f) | ||
| 332 | p, mimetype = parser_factory.get_parser('./tests/data/clean.tar') | ||
| 333 | self.assertEqual(mimetype, 'application/x-tar') | ||
| 334 | meta = p.get_meta() | ||
| 335 | self.assertEqual(meta['./tests/data/dirty.jpg']['uid'], '1337') | ||
| 336 | self.assertTrue(p.remove_all()) | ||
| 337 | |||
| 338 | p = archive.TarParser('./tests/data/clean.cleaned.tar') | ||
| 339 | self.assertEqual(p.get_meta(), {}) | ||
| 340 | os.remove('./tests/data/clean.tar') | ||
| 341 | os.remove('./tests/data/clean.cleaned.tar') | ||
| 342 | |||
