From 5f0b3beb46d09af26107fe5f80e63ddccb127a59 Mon Sep 17 00:00:00 2001 From: jvoisin Date: Sat, 12 Oct 2019 16:13:49 -0700 Subject: Add a way to disable the sandbox Due to bubblewrap's pickiness, mat2 can now be run without a sandbox, even if bubblewrap is installed. --- libmat2/abstract.py | 1 + libmat2/bubblewrap.py | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++ libmat2/exiftool.py | 22 ++++++---- libmat2/subprocess.py | 113 -------------------------------------------------- libmat2/video.py | 12 ++++-- 5 files changed, 137 insertions(+), 124 deletions(-) create mode 100644 libmat2/bubblewrap.py delete mode 100644 libmat2/subprocess.py (limited to 'libmat2') diff --git a/libmat2/abstract.py b/libmat2/abstract.py index 8861966..5cfd0f2 100644 --- a/libmat2/abstract.py +++ b/libmat2/abstract.py @@ -32,6 +32,7 @@ class AbstractParser(abc.ABC): self.output_filename = fname + '.cleaned' + extension self.lightweight_cleaning = False + self.sandbox = True @abc.abstractmethod def get_meta(self) -> Dict[str, Union[str, dict]]: diff --git a/libmat2/bubblewrap.py b/libmat2/bubblewrap.py new file mode 100644 index 0000000..fb6fc9d --- /dev/null +++ b/libmat2/bubblewrap.py @@ -0,0 +1,113 @@ +""" +Wrapper around a subset of the subprocess module, +that uses bwrap (bubblewrap) when it is available. + +Instead of importing subprocess, other modules should use this as follows: + + from . import subprocess +""" + +import os +import shutil +import subprocess +import tempfile +from typing import List, Optional + + +__all__ = ['PIPE', 'run', 'CalledProcessError'] +PIPE = subprocess.PIPE +CalledProcessError = subprocess.CalledProcessError + + +def _get_bwrap_path() -> str: + bwrap_path = '/usr/bin/bwrap' + if os.path.isfile(bwrap_path): + if os.access(bwrap_path, os.X_OK): + return bwrap_path + + raise RuntimeError("Unable to find bwrap") # pragma: no cover + + +# pylint: disable=bad-whitespace +def _get_bwrap_args(tempdir: str, + input_filename: str, + output_filename: Optional[str] = None) -> List[str]: + ro_bind_args = [] + cwd = os.getcwd() + + # XXX: use --ro-bind-try once all supported platforms + # have a bubblewrap recent enough to support it. + ro_bind_dirs = ['/usr', '/lib', '/lib64', '/bin', '/sbin', cwd] + for bind_dir in ro_bind_dirs: + if os.path.isdir(bind_dir): # pragma: no cover + ro_bind_args.extend(['--ro-bind', bind_dir, bind_dir]) + + ro_bind_files = ['/etc/ld.so.cache'] + for bind_file in ro_bind_files: + if os.path.isfile(bind_file): # pragma: no cover + ro_bind_args.extend(['--ro-bind', bind_file, bind_file]) + + args = ro_bind_args + \ + ['--dev', '/dev', + '--proc', '/proc', + '--chdir', cwd, + '--tmpfs', '/tmp', + '--unshare-user-try', + '--unshare-ipc', + '--unshare-pid', + '--unshare-net', + '--unshare-uts', + '--unshare-cgroup-try', + '--new-session', + '--cap-drop', 'all', + # XXX: enable --die-with-parent once all supported platforms have + # a bubblewrap recent enough to support it. + # '--die-with-parent', + ] + + if output_filename: + # Mount an empty temporary directory where the sandboxed + # process will create its output file + output_dirname = os.path.dirname(os.path.abspath(output_filename)) + args.extend(['--bind', tempdir, output_dirname]) + + absolute_input_filename = os.path.abspath(input_filename) + args.extend(['--ro-bind', absolute_input_filename, absolute_input_filename]) + + return args + + +# pylint: disable=bad-whitespace +def run(args: List[str], + input_filename: str, + output_filename: Optional[str] = None, + **kwargs) -> subprocess.CompletedProcess: + """Wrapper around `subprocess.run`, that uses bwrap (bubblewrap) if it + is available. + + Extra supported keyword arguments: + + - `input_filename`, made available read-only in the sandbox + - `output_filename`, where the file created by the sandboxed process + is copied upon successful completion; an empty temporary directory + is made visible as the parent directory of this file in the sandbox. + Optional: one valid use case is to invoke an external process + to inspect metadata present in a file. + """ + try: + bwrap_path = _get_bwrap_path() + except RuntimeError: # pragma: no cover + # bubblewrap is not installed ⇒ short-circuit + return subprocess.run(args, **kwargs) + + with tempfile.TemporaryDirectory() as tempdir: + prefix_args = [bwrap_path] + \ + _get_bwrap_args(input_filename=input_filename, + output_filename=output_filename, + tempdir=tempdir) + completed_process = subprocess.run(prefix_args + args, **kwargs) + if output_filename and completed_process.returncode == 0: + shutil.copy(os.path.join(tempdir, os.path.basename(output_filename)), + output_filename) + + return completed_process diff --git a/libmat2/exiftool.py b/libmat2/exiftool.py index 024f490..89081e2 100644 --- a/libmat2/exiftool.py +++ b/libmat2/exiftool.py @@ -2,10 +2,11 @@ import functools import json import logging import os +import subprocess from typing import Dict, Union, Set from . import abstract -from . import subprocess +from . import bubblewrap # Make pyflakes happy assert Set @@ -19,9 +20,13 @@ class ExiftoolParser(abstract.AbstractParser): meta_allowlist = set() # type: Set[str] def get_meta(self) -> Dict[str, Union[str, dict]]: - out = subprocess.run([_get_exiftool_path(), '-json', self.filename], - input_filename=self.filename, - check=True, stdout=subprocess.PIPE).stdout + if self.sandbox: + out = bubblewrap.run([_get_exiftool_path(), '-json', self.filename], + input_filename=self.filename, + check=True, stdout=subprocess.PIPE).stdout + else: + out = subprocess.run([_get_exiftool_path(), '-json', self.filename], + check=True, stdout=subprocess.PIPE).stdout meta = json.loads(out.decode('utf-8'))[0] for key in self.meta_allowlist: meta.pop(key, None) @@ -48,9 +53,12 @@ class ExiftoolParser(abstract.AbstractParser): '-o', self.output_filename, self.filename] try: - subprocess.run(cmd, check=True, - input_filename=self.filename, - output_filename=self.output_filename) + if self.sandbox: + bubblewrap.run(cmd, check=True, + input_filename=self.filename, + output_filename=self.output_filename) + else: + subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: # pragma: no cover logging.error("Something went wrong during the processing of %s: %s", self.filename, e) return False diff --git a/libmat2/subprocess.py b/libmat2/subprocess.py deleted file mode 100644 index fb6fc9d..0000000 --- a/libmat2/subprocess.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -Wrapper around a subset of the subprocess module, -that uses bwrap (bubblewrap) when it is available. - -Instead of importing subprocess, other modules should use this as follows: - - from . import subprocess -""" - -import os -import shutil -import subprocess -import tempfile -from typing import List, Optional - - -__all__ = ['PIPE', 'run', 'CalledProcessError'] -PIPE = subprocess.PIPE -CalledProcessError = subprocess.CalledProcessError - - -def _get_bwrap_path() -> str: - bwrap_path = '/usr/bin/bwrap' - if os.path.isfile(bwrap_path): - if os.access(bwrap_path, os.X_OK): - return bwrap_path - - raise RuntimeError("Unable to find bwrap") # pragma: no cover - - -# pylint: disable=bad-whitespace -def _get_bwrap_args(tempdir: str, - input_filename: str, - output_filename: Optional[str] = None) -> List[str]: - ro_bind_args = [] - cwd = os.getcwd() - - # XXX: use --ro-bind-try once all supported platforms - # have a bubblewrap recent enough to support it. - ro_bind_dirs = ['/usr', '/lib', '/lib64', '/bin', '/sbin', cwd] - for bind_dir in ro_bind_dirs: - if os.path.isdir(bind_dir): # pragma: no cover - ro_bind_args.extend(['--ro-bind', bind_dir, bind_dir]) - - ro_bind_files = ['/etc/ld.so.cache'] - for bind_file in ro_bind_files: - if os.path.isfile(bind_file): # pragma: no cover - ro_bind_args.extend(['--ro-bind', bind_file, bind_file]) - - args = ro_bind_args + \ - ['--dev', '/dev', - '--proc', '/proc', - '--chdir', cwd, - '--tmpfs', '/tmp', - '--unshare-user-try', - '--unshare-ipc', - '--unshare-pid', - '--unshare-net', - '--unshare-uts', - '--unshare-cgroup-try', - '--new-session', - '--cap-drop', 'all', - # XXX: enable --die-with-parent once all supported platforms have - # a bubblewrap recent enough to support it. - # '--die-with-parent', - ] - - if output_filename: - # Mount an empty temporary directory where the sandboxed - # process will create its output file - output_dirname = os.path.dirname(os.path.abspath(output_filename)) - args.extend(['--bind', tempdir, output_dirname]) - - absolute_input_filename = os.path.abspath(input_filename) - args.extend(['--ro-bind', absolute_input_filename, absolute_input_filename]) - - return args - - -# pylint: disable=bad-whitespace -def run(args: List[str], - input_filename: str, - output_filename: Optional[str] = None, - **kwargs) -> subprocess.CompletedProcess: - """Wrapper around `subprocess.run`, that uses bwrap (bubblewrap) if it - is available. - - Extra supported keyword arguments: - - - `input_filename`, made available read-only in the sandbox - - `output_filename`, where the file created by the sandboxed process - is copied upon successful completion; an empty temporary directory - is made visible as the parent directory of this file in the sandbox. - Optional: one valid use case is to invoke an external process - to inspect metadata present in a file. - """ - try: - bwrap_path = _get_bwrap_path() - except RuntimeError: # pragma: no cover - # bubblewrap is not installed ⇒ short-circuit - return subprocess.run(args, **kwargs) - - with tempfile.TemporaryDirectory() as tempdir: - prefix_args = [bwrap_path] + \ - _get_bwrap_args(input_filename=input_filename, - output_filename=output_filename, - tempdir=tempdir) - completed_process = subprocess.run(prefix_args + args, **kwargs) - if output_filename and completed_process.returncode == 0: - shutil.copy(os.path.join(tempdir, os.path.basename(output_filename)), - output_filename) - - return completed_process diff --git a/libmat2/video.py b/libmat2/video.py index 1492ba1..2b33bc0 100644 --- a/libmat2/video.py +++ b/libmat2/video.py @@ -1,3 +1,4 @@ +import subprocess import functools import os import logging @@ -5,7 +6,7 @@ import logging from typing import Dict, Union from . import exiftool -from . import subprocess +from . import bubblewrap class AbstractFFmpegParser(exiftool.ExiftoolParser): @@ -33,9 +34,12 @@ class AbstractFFmpegParser(exiftool.ExiftoolParser): '-flags:a', '+bitexact', # don't add any metadata self.output_filename] try: - subprocess.run(cmd, check=True, - input_filename=self.filename, - output_filename=self.output_filename) + if self.sandbox: + bubblewrap.run(cmd, check=True, + input_filename=self.filename, + output_filename=self.output_filename) + else: + subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: logging.error("Something went wrong during the processing of %s: %s", self.filename, e) return False -- cgit v1.3