diff options
Diffstat (limited to 'libmat2/subprocess.py')
| -rw-r--r-- | libmat2/subprocess.py | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/libmat2/subprocess.py b/libmat2/subprocess.py new file mode 100644 index 0000000..25646f8 --- /dev/null +++ b/libmat2/subprocess.py | |||
| @@ -0,0 +1,100 @@ | |||
| 1 | """ | ||
| 2 | Wrapper around a subset of the subprocess module, | ||
| 3 | that uses bwrap (bubblewrap) when it is available. | ||
| 4 | |||
| 5 | Instead of importing subprocess, other modules should use this as follows: | ||
| 6 | |||
| 7 | from . import subprocess | ||
| 8 | """ | ||
| 9 | |||
| 10 | import os | ||
| 11 | import shutil | ||
| 12 | import subprocess | ||
| 13 | import tempfile | ||
| 14 | from typing import List, Optional | ||
| 15 | |||
| 16 | |||
| 17 | __all__ = ['PIPE', 'run', 'CalledProcessError'] | ||
| 18 | PIPE = subprocess.PIPE | ||
| 19 | CalledProcessError = subprocess.CalledProcessError | ||
| 20 | |||
| 21 | |||
| 22 | def _get_bwrap_path() -> str: | ||
| 23 | bwrap_path = '/usr/bin/bwrap' | ||
| 24 | if os.path.isfile(bwrap_path): | ||
| 25 | if os.access(bwrap_path, os.X_OK): | ||
| 26 | return bwrap_path | ||
| 27 | |||
| 28 | raise RuntimeError("Unable to find bwrap") # pragma: no cover | ||
| 29 | |||
| 30 | |||
| 31 | # pylint: disable=bad-whitespace | ||
| 32 | def _get_bwrap_args(tempdir: str, | ||
| 33 | input_filename: str, | ||
| 34 | output_filename: Optional[str] = None) -> List[str]: | ||
| 35 | cwd = os.getcwd() | ||
| 36 | |||
| 37 | # XXX: use --ro-bind-try once all supported platforms | ||
| 38 | # have a bubblewrap recent enough to support it. | ||
| 39 | ro_bind_dirs = ['/usr', '/lib', '/lib64', '/bin', '/sbin', cwd] | ||
| 40 | ro_bind_args = [] | ||
| 41 | for bind_dir in ro_bind_dirs: | ||
| 42 | if os.path.isdir(bind_dir): # pragma: no cover | ||
| 43 | ro_bind_args.extend(['--ro-bind', bind_dir, bind_dir]) | ||
| 44 | |||
| 45 | args = ro_bind_args + \ | ||
| 46 | ['--dev', '/dev', | ||
| 47 | '--chdir', cwd, | ||
| 48 | '--unshare-all', | ||
| 49 | '--new-session', | ||
| 50 | # XXX: enable --die-with-parent once all supported platforms have | ||
| 51 | # a bubblewrap recent enough to support it. | ||
| 52 | # '--die-with-parent', | ||
| 53 | ] | ||
| 54 | |||
| 55 | if output_filename: | ||
| 56 | # Mount an empty temporary directory where the sandboxed | ||
| 57 | # process will create its output file | ||
| 58 | output_dirname = os.path.dirname(os.path.abspath(output_filename)) | ||
| 59 | args.extend(['--bind', tempdir, output_dirname]) | ||
| 60 | |||
| 61 | absolute_input_filename = os.path.abspath(input_filename) | ||
| 62 | args.extend(['--ro-bind', absolute_input_filename, absolute_input_filename]) | ||
| 63 | |||
| 64 | return args | ||
| 65 | |||
| 66 | |||
| 67 | # pylint: disable=bad-whitespace | ||
| 68 | def run(args: List[str], | ||
| 69 | input_filename: str, | ||
| 70 | output_filename: Optional[str] = None, | ||
| 71 | **kwargs) -> subprocess.CompletedProcess: | ||
| 72 | """Wrapper around `subprocess.run`, that uses bwrap (bubblewrap) if it | ||
| 73 | is available. | ||
| 74 | |||
| 75 | Extra supported keyword arguments: | ||
| 76 | |||
| 77 | - `input_filename`, made available read-only in the sandbox | ||
| 78 | - `output_filename`, where the file created by the sandboxed process | ||
| 79 | is copied upon successful completion; an empty temporary directory | ||
| 80 | is made visible as the parent directory of this file in the sandbox. | ||
| 81 | Optional: one valid use case is to invoke an external process | ||
| 82 | to inspect metadata present in a file. | ||
| 83 | """ | ||
| 84 | try: | ||
| 85 | bwrap_path = _get_bwrap_path() | ||
| 86 | except RuntimeError: # pragma: no cover | ||
| 87 | # bubblewrap is not installed ⇒ short-circuit | ||
| 88 | return subprocess.run(args, **kwargs) | ||
| 89 | |||
| 90 | with tempfile.TemporaryDirectory() as tempdir: | ||
| 91 | prefix_args = [bwrap_path] + \ | ||
| 92 | _get_bwrap_args(input_filename=input_filename, | ||
| 93 | output_filename=output_filename, | ||
| 94 | tempdir=tempdir) | ||
| 95 | completed_process = subprocess.run(prefix_args + args, **kwargs) | ||
| 96 | if output_filename and completed_process.returncode == 0: | ||
| 97 | shutil.copy(os.path.join(tempdir, os.path.basename(output_filename)), | ||
| 98 | output_filename) | ||
| 99 | |||
| 100 | return completed_process | ||
