From 5f0b3beb46d09af26107fe5f80e63ddccb127a59 Mon Sep 17 00:00:00 2001 From: jvoisin Date: Sat, 12 Oct 2019 16:13:49 -0700 Subject: Add a way to disable the sandbox Due to bubblewrap's pickiness, mat2 can now be run without a sandbox, even if bubblewrap is installed. --- libmat2/abstract.py | 1 + libmat2/bubblewrap.py | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++ libmat2/exiftool.py | 22 ++++++---- libmat2/subprocess.py | 113 -------------------------------------------------- libmat2/video.py | 12 ++++-- mat2 | 14 ++++--- tests/test_climat2.py | 31 ++++++++++++-- tests/test_libmat2.py | 40 ++++++++++++++++++ 8 files changed, 213 insertions(+), 133 deletions(-) create mode 100644 libmat2/bubblewrap.py delete mode 100644 libmat2/subprocess.py diff --git a/libmat2/abstract.py b/libmat2/abstract.py index 8861966..5cfd0f2 100644 --- a/libmat2/abstract.py +++ b/libmat2/abstract.py @@ -32,6 +32,7 @@ class AbstractParser(abc.ABC): self.output_filename = fname + '.cleaned' + extension self.lightweight_cleaning = False + self.sandbox = True @abc.abstractmethod def get_meta(self) -> Dict[str, Union[str, dict]]: diff --git a/libmat2/bubblewrap.py b/libmat2/bubblewrap.py new file mode 100644 index 0000000..fb6fc9d --- /dev/null +++ b/libmat2/bubblewrap.py @@ -0,0 +1,113 @@ +""" +Wrapper around a subset of the subprocess module, +that uses bwrap (bubblewrap) when it is available. + +Instead of importing subprocess, other modules should use this as follows: + + from . import subprocess +""" + +import os +import shutil +import subprocess +import tempfile +from typing import List, Optional + + +__all__ = ['PIPE', 'run', 'CalledProcessError'] +PIPE = subprocess.PIPE +CalledProcessError = subprocess.CalledProcessError + + +def _get_bwrap_path() -> str: + bwrap_path = '/usr/bin/bwrap' + if os.path.isfile(bwrap_path): + if os.access(bwrap_path, os.X_OK): + return bwrap_path + + raise RuntimeError("Unable to find bwrap") # pragma: no cover + + +# pylint: disable=bad-whitespace +def _get_bwrap_args(tempdir: str, + input_filename: str, + output_filename: Optional[str] = None) -> List[str]: + ro_bind_args = [] + cwd = os.getcwd() + + # XXX: use --ro-bind-try once all supported platforms + # have a bubblewrap recent enough to support it. + ro_bind_dirs = ['/usr', '/lib', '/lib64', '/bin', '/sbin', cwd] + for bind_dir in ro_bind_dirs: + if os.path.isdir(bind_dir): # pragma: no cover + ro_bind_args.extend(['--ro-bind', bind_dir, bind_dir]) + + ro_bind_files = ['/etc/ld.so.cache'] + for bind_file in ro_bind_files: + if os.path.isfile(bind_file): # pragma: no cover + ro_bind_args.extend(['--ro-bind', bind_file, bind_file]) + + args = ro_bind_args + \ + ['--dev', '/dev', + '--proc', '/proc', + '--chdir', cwd, + '--tmpfs', '/tmp', + '--unshare-user-try', + '--unshare-ipc', + '--unshare-pid', + '--unshare-net', + '--unshare-uts', + '--unshare-cgroup-try', + '--new-session', + '--cap-drop', 'all', + # XXX: enable --die-with-parent once all supported platforms have + # a bubblewrap recent enough to support it. + # '--die-with-parent', + ] + + if output_filename: + # Mount an empty temporary directory where the sandboxed + # process will create its output file + output_dirname = os.path.dirname(os.path.abspath(output_filename)) + args.extend(['--bind', tempdir, output_dirname]) + + absolute_input_filename = os.path.abspath(input_filename) + args.extend(['--ro-bind', absolute_input_filename, absolute_input_filename]) + + return args + + +# pylint: disable=bad-whitespace +def run(args: List[str], + input_filename: str, + output_filename: Optional[str] = None, + **kwargs) -> subprocess.CompletedProcess: + """Wrapper around `subprocess.run`, that uses bwrap (bubblewrap) if it + is available. + + Extra supported keyword arguments: + + - `input_filename`, made available read-only in the sandbox + - `output_filename`, where the file created by the sandboxed process + is copied upon successful completion; an empty temporary directory + is made visible as the parent directory of this file in the sandbox. + Optional: one valid use case is to invoke an external process + to inspect metadata present in a file. + """ + try: + bwrap_path = _get_bwrap_path() + except RuntimeError: # pragma: no cover + # bubblewrap is not installed ⇒ short-circuit + return subprocess.run(args, **kwargs) + + with tempfile.TemporaryDirectory() as tempdir: + prefix_args = [bwrap_path] + \ + _get_bwrap_args(input_filename=input_filename, + output_filename=output_filename, + tempdir=tempdir) + completed_process = subprocess.run(prefix_args + args, **kwargs) + if output_filename and completed_process.returncode == 0: + shutil.copy(os.path.join(tempdir, os.path.basename(output_filename)), + output_filename) + + return completed_process diff --git a/libmat2/exiftool.py b/libmat2/exiftool.py index 024f490..89081e2 100644 --- a/libmat2/exiftool.py +++ b/libmat2/exiftool.py @@ -2,10 +2,11 @@ import functools import json import logging import os +import subprocess from typing import Dict, Union, Set from . import abstract -from . import subprocess +from . import bubblewrap # Make pyflakes happy assert Set @@ -19,9 +20,13 @@ class ExiftoolParser(abstract.AbstractParser): meta_allowlist = set() # type: Set[str] def get_meta(self) -> Dict[str, Union[str, dict]]: - out = subprocess.run([_get_exiftool_path(), '-json', self.filename], - input_filename=self.filename, - check=True, stdout=subprocess.PIPE).stdout + if self.sandbox: + out = bubblewrap.run([_get_exiftool_path(), '-json', self.filename], + input_filename=self.filename, + check=True, stdout=subprocess.PIPE).stdout + else: + out = subprocess.run([_get_exiftool_path(), '-json', self.filename], + check=True, stdout=subprocess.PIPE).stdout meta = json.loads(out.decode('utf-8'))[0] for key in self.meta_allowlist: meta.pop(key, None) @@ -48,9 +53,12 @@ class ExiftoolParser(abstract.AbstractParser): '-o', self.output_filename, self.filename] try: - subprocess.run(cmd, check=True, - input_filename=self.filename, - output_filename=self.output_filename) + if self.sandbox: + bubblewrap.run(cmd, check=True, + input_filename=self.filename, + output_filename=self.output_filename) + else: + subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: # pragma: no cover logging.error("Something went wrong during the processing of %s: %s", self.filename, e) return False diff --git a/libmat2/subprocess.py b/libmat2/subprocess.py deleted file mode 100644 index fb6fc9d..0000000 --- a/libmat2/subprocess.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -Wrapper around a subset of the subprocess module, -that uses bwrap (bubblewrap) when it is available. - -Instead of importing subprocess, other modules should use this as follows: - - from . import subprocess -""" - -import os -import shutil -import subprocess -import tempfile -from typing import List, Optional - - -__all__ = ['PIPE', 'run', 'CalledProcessError'] -PIPE = subprocess.PIPE -CalledProcessError = subprocess.CalledProcessError - - -def _get_bwrap_path() -> str: - bwrap_path = '/usr/bin/bwrap' - if os.path.isfile(bwrap_path): - if os.access(bwrap_path, os.X_OK): - return bwrap_path - - raise RuntimeError("Unable to find bwrap") # pragma: no cover - - -# pylint: disable=bad-whitespace -def _get_bwrap_args(tempdir: str, - input_filename: str, - output_filename: Optional[str] = None) -> List[str]: - ro_bind_args = [] - cwd = os.getcwd() - - # XXX: use --ro-bind-try once all supported platforms - # have a bubblewrap recent enough to support it. - ro_bind_dirs = ['/usr', '/lib', '/lib64', '/bin', '/sbin', cwd] - for bind_dir in ro_bind_dirs: - if os.path.isdir(bind_dir): # pragma: no cover - ro_bind_args.extend(['--ro-bind', bind_dir, bind_dir]) - - ro_bind_files = ['/etc/ld.so.cache'] - for bind_file in ro_bind_files: - if os.path.isfile(bind_file): # pragma: no cover - ro_bind_args.extend(['--ro-bind', bind_file, bind_file]) - - args = ro_bind_args + \ - ['--dev', '/dev', - '--proc', '/proc', - '--chdir', cwd, - '--tmpfs', '/tmp', - '--unshare-user-try', - '--unshare-ipc', - '--unshare-pid', - '--unshare-net', - '--unshare-uts', - '--unshare-cgroup-try', - '--new-session', - '--cap-drop', 'all', - # XXX: enable --die-with-parent once all supported platforms have - # a bubblewrap recent enough to support it. - # '--die-with-parent', - ] - - if output_filename: - # Mount an empty temporary directory where the sandboxed - # process will create its output file - output_dirname = os.path.dirname(os.path.abspath(output_filename)) - args.extend(['--bind', tempdir, output_dirname]) - - absolute_input_filename = os.path.abspath(input_filename) - args.extend(['--ro-bind', absolute_input_filename, absolute_input_filename]) - - return args - - -# pylint: disable=bad-whitespace -def run(args: List[str], - input_filename: str, - output_filename: Optional[str] = None, - **kwargs) -> subprocess.CompletedProcess: - """Wrapper around `subprocess.run`, that uses bwrap (bubblewrap) if it - is available. - - Extra supported keyword arguments: - - - `input_filename`, made available read-only in the sandbox - - `output_filename`, where the file created by the sandboxed process - is copied upon successful completion; an empty temporary directory - is made visible as the parent directory of this file in the sandbox. - Optional: one valid use case is to invoke an external process - to inspect metadata present in a file. - """ - try: - bwrap_path = _get_bwrap_path() - except RuntimeError: # pragma: no cover - # bubblewrap is not installed ⇒ short-circuit - return subprocess.run(args, **kwargs) - - with tempfile.TemporaryDirectory() as tempdir: - prefix_args = [bwrap_path] + \ - _get_bwrap_args(input_filename=input_filename, - output_filename=output_filename, - tempdir=tempdir) - completed_process = subprocess.run(prefix_args + args, **kwargs) - if output_filename and completed_process.returncode == 0: - shutil.copy(os.path.join(tempdir, os.path.basename(output_filename)), - output_filename) - - return completed_process diff --git a/libmat2/video.py b/libmat2/video.py index 1492ba1..2b33bc0 100644 --- a/libmat2/video.py +++ b/libmat2/video.py @@ -1,3 +1,4 @@ +import subprocess import functools import os import logging @@ -5,7 +6,7 @@ import logging from typing import Dict, Union from . import exiftool -from . import subprocess +from . import bubblewrap class AbstractFFmpegParser(exiftool.ExiftoolParser): @@ -33,9 +34,12 @@ class AbstractFFmpegParser(exiftool.ExiftoolParser): '-flags:a', '+bitexact', # don't add any metadata self.output_filename] try: - subprocess.run(cmd, check=True, - input_filename=self.filename, - output_filename=self.output_filename) + if self.sandbox: + bubblewrap.run(cmd, check=True, + input_filename=self.filename, + output_filename=self.output_filename) + else: + subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: logging.error("Something went wrong during the processing of %s: %s", self.filename, e) return False diff --git a/mat2 b/mat2 index b9f02f2..e67fea0 100755 --- a/mat2 +++ b/mat2 @@ -55,6 +55,8 @@ def create_arg_parser() -> argparse.ArgumentParser: ', '.join(p.value for p in UnknownMemberPolicy)) parser.add_argument('--inplace', action='store_true', help='clean in place, without backup') + parser.add_argument('--no-sandbox', dest='sandbox', action='store_true', + default=False, help='Disable bubblewrap\'s sandboxing.') excl_group = parser.add_mutually_exclusive_group() excl_group.add_argument('files', nargs='*', help='the files to process', @@ -78,7 +80,7 @@ def create_arg_parser() -> argparse.ArgumentParser: return parser -def show_meta(filename: str): +def show_meta(filename: str, sandbox: bool): if not __check_file(filename): return @@ -86,6 +88,7 @@ def show_meta(filename: str): if p is None: print("[-] %s's format (%s) is not supported" % (filename, mtype)) return + p.sandbox = sandbox __print_meta(filename, p.get_meta()) @@ -116,7 +119,7 @@ def __print_meta(filename: str, metadata: dict, depth: int = 1): print(padding + " %s: harmful content" % k) -def clean_meta(filename: str, is_lightweight: bool, inplace: bool, +def clean_meta(filename: str, is_lightweight: bool, inplace: bool, sandbox: bool, policy: UnknownMemberPolicy) -> bool: mode = (os.R_OK | os.W_OK) if inplace else os.R_OK if not __check_file(filename, mode): @@ -128,6 +131,7 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool, return False p.unknown_member_policy = policy p.lightweight_cleaning = is_lightweight + p.sandbox = sandbox try: logging.debug('Cleaning %s…', filename) @@ -140,7 +144,6 @@ def clean_meta(filename: str, is_lightweight: bool, inplace: bool, return False - def show_parsers(): print('[+] Supported formats:') formats = set() # Set[str] @@ -171,6 +174,7 @@ def __get_files_recursively(files: List[str]) -> List[str]: ret.add(f) return list(ret) + def main() -> int: arg_parser = create_arg_parser() args = arg_parser.parse_args() @@ -193,7 +197,7 @@ def main() -> int: elif args.show: for f in __get_files_recursively(args.files): - show_meta(f) + show_meta(f, args.sandbox) return 0 else: @@ -210,7 +214,7 @@ def main() -> int: futures = list() for f in files: future = executor.submit(clean_meta, f, args.lightweight, - inplace, policy) + inplace, args.sandbox, policy) futures.append(future) for future in concurrent.futures.as_completed(futures): no_failure &= future.result() diff --git a/tests/test_climat2.py b/tests/test_climat2.py index 6cf8a39..9d816b1 100644 --- a/tests/test_climat2.py +++ b/tests/test_climat2.py @@ -20,17 +20,17 @@ class TestHelp(unittest.TestCase): def test_help(self): proc = subprocess.Popen(mat2_binary + ['--help'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() - self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]', + self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [--no-sandbox]', stdout) - self.assertIn(b'[--check-dependencies] [-L | -s]', stdout) + self.assertIn(b' [-v] [-l] [--check-dependencies] [-L | -s]', stdout) self.assertIn(b'[files [files ...]]', stdout) def test_no_arg(self): proc = subprocess.Popen(mat2_binary, stdout=subprocess.PIPE) stdout, _ = proc.communicate() - self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [-v] [-l]', + self.assertIn(b'mat2 [-h] [-V] [--unknown-members policy] [--inplace] [--no-sandbox]', stdout) - self.assertIn(b'[--check-dependencies] [-L | -s]', stdout) + self.assertIn(b' [-v] [-l] [--check-dependencies] [-L | -s]', stdout) self.assertIn(b'[files [files ...]]', stdout) @@ -40,12 +40,14 @@ class TestVersion(unittest.TestCase): stdout, _ = proc.communicate() self.assertTrue(stdout.startswith(b'MAT2 ')) + class TestDependencies(unittest.TestCase): def test_dependencies(self): proc = subprocess.Popen(mat2_binary + ['--check-dependencies'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertTrue(b'MAT2' in stdout) + class TestReturnValue(unittest.TestCase): def test_nonzero(self): ret = subprocess.call(mat2_binary + ['mat2'], stdout=subprocess.DEVNULL) @@ -112,6 +114,25 @@ class TestCleanMeta(unittest.TestCase): os.remove('./tests/data/clean.jpg') + def test_jpg_nosandbox(self): + shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg') + + proc = subprocess.Popen(mat2_binary + ['--show', '--no-sandbox', './tests/data/clean.jpg'], + stdout=subprocess.PIPE) + stdout, _ = proc.communicate() + self.assertIn(b'Comment: Created with GIMP', stdout) + + proc = subprocess.Popen(mat2_binary + ['./tests/data/clean.jpg'], + stdout=subprocess.PIPE) + stdout, _ = proc.communicate() + + proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/clean.cleaned.jpg'], + stdout=subprocess.PIPE) + stdout, _ = proc.communicate() + self.assertNotIn(b'Comment: Created with GIMP', stdout) + + os.remove('./tests/data/clean.jpg') + class TestIsSupported(unittest.TestCase): def test_pdf(self): @@ -181,6 +202,7 @@ class TestGetMeta(unittest.TestCase): self.assertIn(b'i am a : various comment', stdout) self.assertIn(b'artist: jvoisin', stdout) + class TestControlCharInjection(unittest.TestCase): def test_jpg(self): proc = subprocess.Popen(mat2_binary + ['--show', './tests/data/control_chars.jpg'], @@ -242,6 +264,7 @@ class TestCommandLineParallel(unittest.TestCase): os.remove(path) os.remove('./tests/data/dirty_%d.docx' % i) + class TestInplaceCleaning(unittest.TestCase): def test_cleaning(self): shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg') diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py index 13d861d..20e6a01 100644 --- a/tests/test_libmat2.py +++ b/tests/test_libmat2.py @@ -721,3 +721,43 @@ class TestCleaningArchives(unittest.TestCase): os.remove('./tests/data/dirty.tar.xz') os.remove('./tests/data/dirty.cleaned.tar.xz') os.remove('./tests/data/dirty.cleaned.cleaned.tar.xz') + +class TestNoSandbox(unittest.TestCase): + def test_avi_nosandbox(self): + shutil.copy('./tests/data/dirty.avi', './tests/data/clean.avi') + p = video.AVIParser('./tests/data/clean.avi') + p.sandbox = False + + meta = p.get_meta() + self.assertEqual(meta['Software'], 'MEncoder SVN-r33148-4.0.1') + + ret = p.remove_all() + self.assertTrue(ret) + + p = video.AVIParser('./tests/data/clean.cleaned.avi') + self.assertEqual(p.get_meta(), {}) + self.assertTrue(p.remove_all()) + + os.remove('./tests/data/clean.avi') + os.remove('./tests/data/clean.cleaned.avi') + os.remove('./tests/data/clean.cleaned.cleaned.avi') + + def test_png_nosandbox(self): + shutil.copy('./tests/data/dirty.png', './tests/data/clean.png') + p = images.PNGParser('./tests/data/clean.png') + p.sandbox = False + p.lightweight_cleaning = True + + meta = p.get_meta() + self.assertEqual(meta['Comment'], 'This is a comment, be careful!') + + ret = p.remove_all() + self.assertTrue(ret) + + p = images.PNGParser('./tests/data/clean.cleaned.png') + self.assertEqual(p.get_meta(), {}) + self.assertTrue(p.remove_all()) + + os.remove('./tests/data/clean.png') + os.remove('./tests/data/clean.cleaned.png') + os.remove('./tests/data/clean.cleaned.cleaned.png') -- cgit v1.3