From 38fae60b8beaf9c7b37c65325d2d285e62b6cb85 Mon Sep 17 00:00:00 2001 From: jvoisin Date: Fri, 18 May 2018 23:52:40 +0200 Subject: Rename some files to simplify packaging - the `src` folder is now `libmat2` - the `main.py` script is now `mat2.py` --- .gitlab-ci.yml | 6 +- README.md | 2 +- libmat2/__init__.py | 6 ++ libmat2/abstract.py | 24 ++++++++ libmat2/audio.py | 39 ++++++++++++ libmat2/harmless.py | 17 ++++++ libmat2/images.py | 101 +++++++++++++++++++++++++++++++ libmat2/office.py | 150 ++++++++++++++++++++++++++++++++++++++++++++++ libmat2/parser_factory.py | 42 +++++++++++++ libmat2/pdf.py | 135 +++++++++++++++++++++++++++++++++++++++++ libmat2/torrent.py | 126 ++++++++++++++++++++++++++++++++++++++ main.py | 121 ------------------------------------- mat2.py | 121 +++++++++++++++++++++++++++++++++++++ src/__init__.py | 6 -- src/abstract.py | 24 -------- src/audio.py | 39 ------------ src/harmless.py | 17 ------ src/images.py | 101 ------------------------------- src/office.py | 150 ---------------------------------------------- src/parser_factory.py | 42 ------------- src/pdf.py | 135 ----------------------------------------- src/torrent.py | 126 -------------------------------------- tests/test_climat2.py | 44 +++++++------- tests/test_libmat2.py | 2 +- 24 files changed, 788 insertions(+), 788 deletions(-) create mode 100644 libmat2/__init__.py create mode 100644 libmat2/abstract.py create mode 100644 libmat2/audio.py create mode 100644 libmat2/harmless.py create mode 100644 libmat2/images.py create mode 100644 libmat2/office.py create mode 100644 libmat2/parser_factory.py create mode 100644 libmat2/pdf.py create mode 100644 libmat2/torrent.py delete mode 100755 main.py create mode 100755 mat2.py delete mode 100644 src/__init__.py delete mode 100644 src/abstract.py delete mode 100644 src/audio.py delete mode 100644 src/harmless.py delete mode 100644 src/images.py delete mode 100644 src/office.py delete mode 100644 src/parser_factory.py delete mode 100644 src/pdf.py delete mode 100644 src/torrent.py diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 90596a5..37f3b01 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -9,14 +9,14 @@ bandit: script: - apt-get -qqy update - apt-get -qqy install --no-install-recommends python3-bandit - - bandit -r ./src --format txt --skip B404,B603 + - bandit -r ./libmat2 --format txt --skip B404,B603 pyflakes: stage: linting script: - apt-get -qqy update - apt-get -qqy install --no-install-recommends pyflakes3 - - pyflakes3 ./src + - pyflakes3 ./libmat2 tests: stage: test @@ -24,4 +24,4 @@ tests: - apt-get -qqy update - apt-get -qqy install --no-install-recommends python3-mutagen python3-gi-cairo gir1.2-poppler-0.18 gir1.2-gdkpixbuf-2.0 libimage-exiftool-perl python3-coverage - python3-coverage run -m unittest discover -s tests/ - - python3-coverage report -m --include 'src/*' + - python3-coverage report -m --include 'libmat2/*' diff --git a/README.md b/README.md index 51f3c63..0499372 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ $ python3 -m unittest discover -v # Supported formats ```bash -$ python3 ./main.py -l +$ python3 ./mat2.py -l ``` # Related softwares diff --git a/libmat2/__init__.py b/libmat2/__init__.py new file mode 100644 index 0000000..07d3036 --- /dev/null +++ b/libmat2/__init__.py @@ -0,0 +1,6 @@ +#!/bin/env python3 + +# A set of extension that aren't supported, despite matching a supported mimetype +unsupported_extensions = set(['bat', 'c', 'h', 'ksh', 'pl', 'txt', 'asc', + 'text', 'pot', 'brf', 'srt', 'rdf', 'wsdl', + 'xpdl', 'xsl', 'xsd']) diff --git a/libmat2/abstract.py b/libmat2/abstract.py new file mode 100644 index 0000000..e4838a9 --- /dev/null +++ b/libmat2/abstract.py @@ -0,0 +1,24 @@ +import abc +import os + + +class AbstractParser(abc.ABC): + meta_list = set() + mimetypes = set() + + def __init__(self, filename: str): + self.filename = filename + fname, extension = os.path.splitext(filename) + self.output_filename = fname + '.cleaned' + extension + + @abc.abstractmethod + def get_meta(self) -> dict: + pass + + @abc.abstractmethod + def remove_all(self) -> bool: + pass + + def remove_all_lightweight(self) -> bool: + """ Remove _SOME_ metadata. """ + return self.remove_all() diff --git a/libmat2/audio.py b/libmat2/audio.py new file mode 100644 index 0000000..3a6aa79 --- /dev/null +++ b/libmat2/audio.py @@ -0,0 +1,39 @@ +import shutil + +import mutagen + +from . import abstract + + +class MutagenParser(abstract.AbstractParser): + def get_meta(self): + f = mutagen.File(self.filename) + if f.tags: + return {k:', '.join(v) for k, v in f.tags.items()} + return {} + + def remove_all(self): + shutil.copy(self.filename, self.output_filename) + f = mutagen.File(self.output_filename) + f.delete() + f.save() + return True + + +class MP3Parser(MutagenParser): + mimetypes = {'audio/mpeg', } + + def get_meta(self): + metadata = {} + meta = mutagen.File(self.filename).tags + for key in meta: + metadata[key.rstrip(' \t\r\n\0')] = ', '.join(map(str, meta[key].text)) + return metadata + + +class OGGParser(MutagenParser): + mimetypes = {'audio/ogg', } + + +class FLACParser(MutagenParser): + mimetypes = {'audio/flac', } diff --git a/libmat2/harmless.py b/libmat2/harmless.py new file mode 100644 index 0000000..aa00582 --- /dev/null +++ b/libmat2/harmless.py @@ -0,0 +1,17 @@ +from . import abstract + + +class HarmlessParser(abstract.AbstractParser): + """ This is the parser for filetypes that do not contain metadata. """ + mimetypes = {'application/xml', 'text/plain'} + + def __init__(self, filename: str): + super().__init__(filename) + self.filename = filename + self.output_filename = filename + + def get_meta(self): + return dict() + + def remove_all(self): + return True diff --git a/libmat2/images.py b/libmat2/images.py new file mode 100644 index 0000000..c84952a --- /dev/null +++ b/libmat2/images.py @@ -0,0 +1,101 @@ +import subprocess +import json +import os + +import cairo + +import gi +gi.require_version('GdkPixbuf', '2.0') +from gi.repository import GdkPixbuf + +from . import abstract + + +class PNGParser(abstract.AbstractParser): + mimetypes = {'image/png', } + meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', + 'Directory', 'FileSize', 'FileModifyDate', + 'FileAccessDate', 'FileInodeChangeDate', + 'FilePermissions', 'FileType', 'FileTypeExtension', + 'MIMEType', 'ImageWidth', 'BitDepth', 'ColorType', + 'Compression', 'Filter', 'Interlace', 'BackgroundColor', + 'ImageSize', 'Megapixels', 'ImageHeight'} + + def __init__(self, filename): + super().__init__(filename) + try: # better fail here than later + cairo.ImageSurface.create_from_png(self.filename) + except MemoryError: + raise ValueError + + def get_meta(self): + out = subprocess.check_output(['/usr/bin/exiftool', '-json', self.filename]) + meta = json.loads(out.decode('utf-8'))[0] + for key in self.meta_whitelist: + meta.pop(key, None) + return meta + + def remove_all(self): + surface = cairo.ImageSurface.create_from_png(self.filename) + surface.write_to_png(self.output_filename) + return True + + +class GdkPixbufAbstractParser(abstract.AbstractParser): + """ GdkPixbuf can handle a lot of surfaces, so we're rending images on it, + this has the side-effect of removing metadata completely. + """ + def get_meta(self): + out = subprocess.check_output(['/usr/bin/exiftool', '-json', self.filename]) + meta = json.loads(out.decode('utf-8'))[0] + for key in self.meta_whitelist: + meta.pop(key, None) + return meta + + def remove_all(self): + _, extension = os.path.splitext(self.filename) + pixbuf = GdkPixbuf.Pixbuf.new_from_file(self.filename) + if extension == '.jpg': + extension = '.jpeg' + pixbuf.savev(self.output_filename, extension[1:], [], []) + return True + + +class JPGParser(GdkPixbufAbstractParser): + mimetypes = {'image/jpeg'} + meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', + 'Directory', 'FileSize', 'FileModifyDate', + 'FileAccessDate', "FileInodeChangeDate", + 'FilePermissions', 'FileType', 'FileTypeExtension', + 'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample', + 'ColorComponents', 'EncodingProcess', 'JFIFVersion', + 'ResolutionUnit', 'XResolution', 'YCbCrSubSampling', + 'YResolution', 'Megapixels', 'ImageHeight'} + + +class TiffParser(GdkPixbufAbstractParser): + mimetypes = {'image/tiff'} + meta_whitelist = {'Compression', 'ExifByteOrder', 'ExtraSamples', + 'FillOrder', 'PhotometricInterpretation', + 'PlanarConfiguration', 'RowsPerStrip', 'SamplesPerPixel', + 'StripByteCounts', 'StripOffsets', 'BitsPerSample', + 'Directory', 'ExifToolVersion', 'FileAccessDate', + 'FileInodeChangeDate', 'FileModifyDate', 'FileName', + 'FilePermissions', 'FileSize', 'FileType', + 'FileTypeExtension', 'ImageHeight', 'ImageSize', + 'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile'} + + +class BMPParser(GdkPixbufAbstractParser): + mimetypes = {'image/x-ms-bmp'} + meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory', + 'FileSize', 'FileModifyDate', 'FileAccessDate', + 'FileInodeChangeDate', 'FilePermissions', 'FileType', + 'FileTypeExtension', 'MIMEType', 'BMPVersion', + 'ImageWidth', 'ImageHeight', 'Planes', 'BitDepth', + 'Compression', 'ImageLength', 'PixelsPerMeterX', + 'PixelsPerMeterY', 'NumColors', 'NumImportantColors', + 'RedMask', 'GreenMask', 'BlueMask', 'AlphaMask', + 'ColorSpace', 'RedEndpoint', 'GreenEndpoint', + 'BlueEndpoint', 'GammaRed', 'GammaGreen', 'GammaBlue', + 'ImageSize', 'Megapixels'} diff --git a/libmat2/office.py b/libmat2/office.py new file mode 100644 index 0000000..749fc7d --- /dev/null +++ b/libmat2/office.py @@ -0,0 +1,150 @@ +import os +import re +import shutil +import tempfile +import datetime +import zipfile + +from . import abstract, parser_factory + + +class ArchiveBasedAbstractParser(abstract.AbstractParser): + def _clean_zipinfo(self, zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo: + zipinfo.compress_type = zipfile.ZIP_DEFLATED + zipinfo.create_system = 3 # Linux + zipinfo.comment = b'' + zipinfo.date_time = (1980, 1, 1, 0, 0, 0) + return zipinfo + + def _get_zipinfo_meta(self, zipinfo: zipfile.ZipInfo) -> dict: + metadata = {} + if zipinfo.create_system == 3: + #metadata['create_system'] = 'Linux' + pass + elif zipinfo.create_system == 2: + metadata['create_system'] = 'Windows' + else: + metadata['create_system'] = 'Weird' + + if zipinfo.comment: + metadata['comment'] = zipinfo.comment + + if zipinfo.date_time != (1980, 1, 1, 0, 0, 0): + metadata['date_time'] = datetime.datetime(*zipinfo.date_time) + + return metadata + + + def _clean_internal_file(self, item: zipfile.ZipInfo, temp_folder: str, + zin: zipfile.ZipFile, zout: zipfile.ZipFile): + zin.extract(member=item, path=temp_folder) + tmp_parser, mtype = parser_factory.get_parser(os.path.join(temp_folder, item.filename)) + if not tmp_parser: + print("%s's format (%s) isn't supported" % (item.filename, mtype)) + return + tmp_parser.remove_all() + zinfo = zipfile.ZipInfo(item.filename) + clean_zinfo = self._clean_zipinfo(zinfo) + with open(tmp_parser.output_filename, 'rb') as f: + zout.writestr(clean_zinfo, f.read()) + + +class MSOfficeParser(ArchiveBasedAbstractParser): + mimetypes = { + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + 'application/vnd.openxmlformats-officedocument.presentationml.presentation' + } + files_to_keep = {'_rels/.rels', 'word/_rels/document.xml.rels'} + + def get_meta(self): + """ + Yes, I know that parsing xml with regexp ain't pretty, + be my guest and fix it if you want. + """ + metadata = {} + zipin = zipfile.ZipFile(self.filename) + for item in zipin.infolist(): + if item.filename.startswith('docProps/') and item.filename.endswith('.xml'): + content = zipin.read(item).decode('utf-8') + for (key, value) in re.findall(r"<(.+)>(.+)", content, re.I): + metadata[key] = value + if not metadata: # better safe than sorry + metadata[item] = 'harmful content' + + metadata = {**metadata, **self._get_zipinfo_meta(item)} + zipin.close() + return metadata + + + def remove_all(self): + zin = zipfile.ZipFile(self.filename, 'r') + zout = zipfile.ZipFile(self.output_filename, 'w') + temp_folder = tempfile.mkdtemp() + + for item in zin.infolist(): + if item.filename[-1] == '/': + continue # `is_dir` is added in Python3.6 + elif item.filename.startswith('docProps/'): + if not item.filename.endswith('.rels'): + continue # don't keep metadata files + if item.filename in self.files_to_keep: + item = self._clean_zipinfo(item) + zout.writestr(item, zin.read(item)) + continue + + self._clean_internal_file(item, temp_folder, zin, zout) + + shutil.rmtree(temp_folder) + zout.close() + zin.close() + return True + + + +class LibreOfficeParser(ArchiveBasedAbstractParser): + mimetypes = { + 'application/vnd.oasis.opendocument.text', + 'application/vnd.oasis.opendocument.spreadsheet', + 'application/vnd.oasis.opendocument.presentation', + 'application/vnd.oasis.opendocument.graphics', + 'application/vnd.oasis.opendocument.chart', + 'application/vnd.oasis.opendocument.formula', + 'application/vnd.oasis.opendocument.image', + } + + def get_meta(self): + """ + Yes, I know that parsing xml with regexp ain't pretty, + be my guest and fix it if you want. + """ + metadata = {} + zipin = zipfile.ZipFile(self.filename) + for item in zipin.infolist(): + if item.filename == 'meta.xml': + content = zipin.read(item).decode('utf-8') + for (key, value) in re.findall(r"<((?:meta|dc|cp).+?)>(.+)", content, re.I): + metadata[key] = value + if not metadata: # better safe than sorry + metadata[item] = 'harmful content' + metadata = {**metadata, **self._get_zipinfo_meta(item)} + zipin.close() + return metadata + + def remove_all(self): + zin = zipfile.ZipFile(self.filename, 'r') + zout = zipfile.ZipFile(self.output_filename, 'w') + temp_folder = tempfile.mkdtemp() + + for item in zin.infolist(): + if item.filename[-1] == '/': + continue # `is_dir` is added in Python3.6 + elif item.filename == 'meta.xml': + continue # don't keep metadata files + + self._clean_internal_file(item, temp_folder, zin, zout) + + shutil.rmtree(temp_folder) + zout.close() + zin.close() + return True diff --git a/libmat2/parser_factory.py b/libmat2/parser_factory.py new file mode 100644 index 0000000..dbe68b9 --- /dev/null +++ b/libmat2/parser_factory.py @@ -0,0 +1,42 @@ +import os +import mimetypes +import importlib +import pkgutil +from typing import TypeVar + +from . import abstract, unsupported_extensions + + +T = TypeVar('T', bound='abstract.AbstractParser') + +# This loads every parser in a dynamic way +for module_loader, name, ispkg in pkgutil.walk_packages('.libmat2'): + if not name.startswith('libmat2.'): + continue + elif name == 'libmat2.abstract': + continue + importlib.import_module(name) + + +def _get_parsers() -> list: + """ Get all our parsers!""" + def __get_parsers(cls): + return cls.__subclasses__() + \ + [g for s in cls.__subclasses__() for g in __get_parsers(s)] + return __get_parsers(abstract.AbstractParser) + + +def get_parser(filename: str) -> (T, str): + mtype, _ = mimetypes.guess_type(filename) + + _, extension = os.path.splitext(filename) + if extension in unsupported_extensions: + return None, mtype + + for c in _get_parsers(): + if mtype in c.mimetypes: + try: + return c(filename), mtype + except ValueError: + return None, mtype + return None, mtype diff --git a/libmat2/pdf.py b/libmat2/pdf.py new file mode 100644 index 0000000..5b99192 --- /dev/null +++ b/libmat2/pdf.py @@ -0,0 +1,135 @@ +""" Handle PDF + +""" + +import os +import re +import logging +import tempfile +import io + +import cairo +import gi +gi.require_version('Poppler', '0.18') +from gi.repository import Poppler, GLib + +from . import abstract + +logging.basicConfig(level=logging.DEBUG) + + +class PDFParser(abstract.AbstractParser): + mimetypes = {'application/pdf', } + meta_list = {'author', 'creation-date', 'creator', 'format', 'keywords', + 'metadata', 'mod-date', 'producer', 'subject', 'title', + 'viewer-preferences'} + + def __init__(self, filename): + super().__init__(filename) + self.uri = 'file://' + os.path.abspath(self.filename) + self.__scale = 2 # how much precision do we want for the render + try: # Check now that the file is valid, to avoid surprises later + Poppler.Document.new_from_file(self.uri, None) + except GLib.GError: # Invalid PDF + raise ValueError + + def remove_all_lightweight(self): + """ + Load the document into Poppler, render pages on a new PDFSurface. + """ + document = Poppler.Document.new_from_file(self.uri, None) + pages_count = document.get_n_pages() + + tmp_path = tempfile.mkstemp()[1] + pdf_surface = cairo.PDFSurface(tmp_path, 10, 10) + pdf_context = cairo.Context(pdf_surface) # context draws on the surface + + for pagenum in range(pages_count): + logging.info("Rendering page %d/%d", pagenum + 1, pages_count) + page = document.get_page(pagenum) + page_width, page_height = page.get_size() + pdf_surface.set_size(page_width, page_height) + pdf_context.save() + page.render_for_printing(pdf_context) + pdf_context.restore() + pdf_context.show_page() # draw pdf_context on pdf_surface + pdf_surface.finish() + + self.__remove_superficial_meta(tmp_path, self.output_filename) + os.remove(tmp_path) + + return True + + def remove_all(self): + """ + Load the document into Poppler, render pages on PNG, + and shove those PNG into a new PDF. + """ + document = Poppler.Document.new_from_file(self.uri, None) + pages_count = document.get_n_pages() + + _, tmp_path = tempfile.mkstemp() + pdf_surface = cairo.PDFSurface(tmp_path, 32, 32) # resized later anyway + pdf_context = cairo.Context(pdf_surface) + + for pagenum in range(pages_count): + page = document.get_page(pagenum) + page_width, page_height = page.get_size() + logging.info("Rendering page %d/%d", pagenum + 1, pages_count) + + img_surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, int(page_width) * self.__scale, int(page_height) * self.__scale) + img_context = cairo.Context(img_surface) + + img_context.scale(self.__scale, self.__scale) + page.render_for_printing(img_context) + img_context.show_page() + + buf = io.BytesIO() + img_surface.write_to_png(buf) + img_surface.finish() + buf.seek(0) + + img = cairo.ImageSurface.create_from_png(buf) + pdf_surface.set_size(page_width*self.__scale, page_height*self.__scale) + pdf_context.set_source_surface(img, 0, 0) + pdf_context.paint() + pdf_context.show_page() + + pdf_surface.finish() + + # Removes metadata added by Poppler + self.__remove_superficial_meta(tmp_path, self.output_filename) + os.remove(tmp_path) + + return True + + @staticmethod + def __remove_superficial_meta(in_file: str, out_file: str) -> bool: + document = Poppler.Document.new_from_file('file://' + in_file) + document.set_producer('') + document.set_creator('') + document.set_creation_date(-1) + document.save('file://' + os.path.abspath(out_file)) + return True + + + @staticmethod + def __parse_metadata_field(data: str) -> dict: + metadata = {} + for (_, key, value) in re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)", data, re.I): + metadata[key] = value + return metadata + + def get_meta(self): + """ Return a dict with all the meta of the file + """ + metadata = {} + document = Poppler.Document.new_from_file(self.uri, None) + + for key in self.meta_list: + if document.get_property(key): + metadata[key] = document.get_property(key) + if 'metadata' in metadata: + parsed_meta = self.__parse_metadata_field(metadata['metadata']) + return {**metadata, **parsed_meta} + return metadata diff --git a/libmat2/torrent.py b/libmat2/torrent.py new file mode 100644 index 0000000..cb4b5e3 --- /dev/null +++ b/libmat2/torrent.py @@ -0,0 +1,126 @@ +from . import abstract + + +class TorrentParser(abstract.AbstractParser): + mimetypes = {'application/x-bittorrent', } + whitelist = {b'announce', b'announce-list', b'info'} + + def get_meta(self) -> dict: + metadata = {} + with open(self.filename, 'rb') as f: + d = _BencodeHandler().bdecode(f.read()) + if d is None: + return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename} + for k, v in d.items(): + if k not in self.whitelist: + metadata[k.decode('utf-8')] = v + return metadata + + + def remove_all(self) -> bool: + cleaned = dict() + with open(self.filename, 'rb') as f: + d = _BencodeHandler().bdecode(f.read()) + if d is None: + return False + for k, v in d.items(): + if k in self.whitelist: + cleaned[k] = v + with open(self.output_filename, 'wb') as f: + f.write(_BencodeHandler().bencode(cleaned)) + return True + + +class _BencodeHandler(object): + """ + Since bencode isn't that hard to parse, + MAT2 comes with its own parser, based on the spec + https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding + """ + def __init__(self): + self.__decode_func = { + ord('d'): self.__decode_dict, + ord('i'): self.__decode_int, + ord('l'): self.__decode_list, + } + for i in range(0, 10): + self.__decode_func[ord(str(i))] = self.__decode_string + + self.__encode_func = { + bytes: self.__encode_string, + dict: self.__encode_dict, + int: self.__encode_int, + list: self.__encode_list, + } + + @staticmethod + def __decode_int(s: str) -> (int, str): + s = s[1:] + next_idx = s.index(b'e') + if s.startswith(b'-0'): + raise ValueError # negative zero doesn't exist + elif s.startswith(b'0') and next_idx != 1: + raise ValueError # no leading zero except for zero itself + return int(s[:next_idx]), s[next_idx+1:] + + @staticmethod + def __decode_string(s: str) -> (str, str): + sep = s.index(b':') + str_len = int(s[:sep]) + if str_len < 0: + raise ValueError + elif s[0] == b'0' and sep != 1: + raise ValueError + s = s[1:] + return s[sep:sep+str_len], s[sep+str_len:] + + def __decode_list(self, s: str) -> (list, str): + r = list() + s = s[1:] # skip leading `l` + while s[0] != ord('e'): + v, s = self.__decode_func[s[0]](s) + r.append(v) + return r, s[1:] + + def __decode_dict(self, s: str) -> (dict, str): + r = dict() + s = s[1:] # skip leading `d` + while s[0] != ord(b'e'): + k, s = self.__decode_string(s) + r[k], s = self.__decode_func[s[0]](s) + return r, s[1:] + + @staticmethod + def __encode_int(x: str) -> bytes: + return b'i' + bytes(str(x), 'utf-8') + b'e' + + @staticmethod + def __encode_string(x: str) -> bytes: + return bytes((str(len(x))), 'utf-8') + b':' + x + + def __encode_list(self, x: str) -> bytes: + ret = b'' + for i in x: + ret += self.__encode_func[type(i)](i) + return b'l' + ret + b'e' + + def __encode_dict(self, x: str) -> bytes: + ret = b'' + for k, v in sorted(x.items()): + ret += self.__encode_func[type(k)](k) + ret += self.__encode_func[type(v)](v) + return b'd' + ret + b'e' + + def bencode(self, s: str) -> bytes: + return self.__encode_func[type(s)](s) + + def bdecode(self, s: str): + try: + r, l = self.__decode_func[s[0]](s) + except (IndexError, KeyError, ValueError) as e: + print("not a valid bencoded string: %s" % e) + return None + if l != b'': + print("invalid bencoded value (data after valid prefix)") + return None + return r diff --git a/main.py b/main.py deleted file mode 100755 index 55489be..0000000 --- a/main.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/python3 - -import os -from typing import Tuple -import sys -import itertools -import mimetypes -import argparse -import multiprocessing - -from src import parser_factory, unsupported_extensions - -__version__ = '0.1.1' - -def __check_file(filename: str, mode: int = os.R_OK) -> bool: - if not os.path.isfile(filename): - print("[-] %s is not a regular file." % filename) - return False - elif not os.access(filename, mode): - print("[-] %s is not readable and writeable." % filename) - return False - return True - - -def create_arg_parser(): - parser = argparse.ArgumentParser(description='Metadata anonymisation toolkit 2') - parser.add_argument('files', nargs='*') - parser.add_argument('-v', '--version', action='version', - version='MAT2 %s' % __version__) - parser.add_argument('-l', '--list', action='store_true', - help='list all supported fileformats') - - info = parser.add_mutually_exclusive_group() - info.add_argument('-c', '--check', action='store_true', - help='check if a file is free of harmful metadatas') - info.add_argument('-s', '--show', action='store_true', - help='list all the harmful metadata of a file without removing them') - info.add_argument('-L', '--lightweight', action='store_true', - help='remove SOME metadata') - return parser - - -def show_meta(filename: str): - if not __check_file(filename): - return - - p, mtype = parser_factory.get_parser(filename) - if p is None: - print("[-] %s's format (%s) is not supported" % (filename, mtype)) - return - - print("[+] Metadata for %s:" % filename) - for k, v in p.get_meta().items(): - try: # FIXME this is ugly. - print(" %s: %s" % (k, v)) - except UnicodeEncodeError: - print(" %s: harmful content" % k) - -def clean_meta(params: Tuple[str, bool]) -> bool: - filename, is_lightweigth = params - if not __check_file(filename, os.R_OK|os.W_OK): - return False - - p, mtype = parser_factory.get_parser(filename) - if p is None: - print("[-] %s's format (%s) is not supported" % (filename, mtype)) - return False - if is_lightweigth: - return p.remove_all_lightweight() - return p.remove_all() - - -def show_parsers(): - print('[+] Supported formats:') - for parser in parser_factory._get_parsers(): - for mtype in parser.mimetypes: - extensions = set() - for extension in mimetypes.guess_all_extensions(mtype): - if extension[1:] not in unsupported_extensions: # skip the dot - extensions.add(extension) - if not extensions: - # we're not supporting a single extension in the current - # mimetype, so there is not point in showing the mimetype at all - continue - print(' - %s (%s)' % (mtype, ', '.join(extensions))) - - -def __get_files_recursively(files): - for f in files: - if os.path.isfile(f): - yield f - else: - for path, _, _files in os.walk(f): - for _f in _files: - yield os.path.join(path, _f) - -def main(): - arg_parser = create_arg_parser() - args = arg_parser.parse_args() - - if not args.files: - if not args.list: - return arg_parser.print_help() - show_parsers() - return 0 - - elif args.show: - for f in __get_files_recursively(args.files): - show_meta(f) - return 0 - - else: - p = multiprocessing.Pool() - mode = (args.lightweight is True) - l = zip(__get_files_recursively(args.files), itertools.repeat(mode)) - - ret = list(p.imap_unordered(clean_meta, list(l))) - return 0 if all(ret) else -1 - -if __name__ == '__main__': - sys.exit(main()) diff --git a/mat2.py b/mat2.py new file mode 100755 index 0000000..aa213ab --- /dev/null +++ b/mat2.py @@ -0,0 +1,121 @@ +#!/usr/bin/python3 + +import os +from typing import Tuple +import sys +import itertools +import mimetypes +import argparse +import multiprocessing + +from libmat2 import parser_factory, unsupported_extensions + +__version__ = '0.1.1' + +def __check_file(filename: str, mode: int = os.R_OK) -> bool: + if not os.path.isfile(filename): + print("[-] %s is not a regular file." % filename) + return False + elif not os.access(filename, mode): + print("[-] %s is not readable and writeable." % filename) + return False + return True + + +def create_arg_parser(): + parser = argparse.ArgumentParser(description='Metadata anonymisation toolkit 2') + parser.add_argument('files', nargs='*') + parser.add_argument('-v', '--version', action='version', + version='MAT2 %s' % __version__) + parser.add_argument('-l', '--list', action='store_true', + help='list all supported fileformats') + + info = parser.add_mutually_exclusive_group() + info.add_argument('-c', '--check', action='store_true', + help='check if a file is free of harmful metadatas') + info.add_argument('-s', '--show', action='store_true', + help='list all the harmful metadata of a file without removing them') + info.add_argument('-L', '--lightweight', action='store_true', + help='remove SOME metadata') + return parser + + +def show_meta(filename: str): + if not __check_file(filename): + return + + p, mtype = parser_factory.get_parser(filename) + if p is None: + print("[-] %s's format (%s) is not supported" % (filename, mtype)) + return + + print("[+] Metadata for %s:" % filename) + for k, v in p.get_meta().items(): + try: # FIXME this is ugly. + print(" %s: %s" % (k, v)) + except UnicodeEncodeError: + print(" %s: harmful content" % k) + +def clean_meta(params: Tuple[str, bool]) -> bool: + filename, is_lightweigth = params + if not __check_file(filename, os.R_OK|os.W_OK): + return False + + p, mtype = parser_factory.get_parser(filename) + if p is None: + print("[-] %s's format (%s) is not supported" % (filename, mtype)) + return False + if is_lightweigth: + return p.remove_all_lightweight() + return p.remove_all() + + +def show_parsers(): + print('[+] Supported formats:') + for parser in parser_factory._get_parsers(): + for mtype in parser.mimetypes: + extensions = set() + for extension in mimetypes.guess_all_extensions(mtype): + if extension[1:] not in unsupported_extensions: # skip the dot + extensions.add(extension) + if not extensions: + # we're not supporting a single extension in the current + # mimetype, so there is not point in showing the mimetype at all + continue + print(' - %s (%s)' % (mtype, ', '.join(extensions))) + + +def __get_files_recursively(files): + for f in files: + if os.path.isfile(f): + yield f + else: + for path, _, _files in os.walk(f): + for _f in _files: + yield os.path.join(path, _f) + +def main(): + arg_parser = create_arg_parser() + args = arg_parser.parse_args() + + if not args.files: + if not args.list: + return arg_parser.print_help() + show_parsers() + return 0 + + elif args.show: + for f in __get_files_recursively(args.files): + show_meta(f) + return 0 + + else: + p = multiprocessing.Pool() + mode = (args.lightweight is True) + l = zip(__get_files_recursively(args.files), itertools.repeat(mode)) + + ret = list(p.imap_unordered(clean_meta, list(l))) + return 0 if all(ret) else -1 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/__init__.py b/src/__init__.py deleted file mode 100644 index 07d3036..0000000 --- a/src/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/env python3 - -# A set of extension that aren't supported, despite matching a supported mimetype -unsupported_extensions = set(['bat', 'c', 'h', 'ksh', 'pl', 'txt', 'asc', - 'text', 'pot', 'brf', 'srt', 'rdf', 'wsdl', - 'xpdl', 'xsl', 'xsd']) diff --git a/src/abstract.py b/src/abstract.py deleted file mode 100644 index e4838a9..0000000 --- a/src/abstract.py +++ /dev/null @@ -1,24 +0,0 @@ -import abc -import os - - -class AbstractParser(abc.ABC): - meta_list = set() - mimetypes = set() - - def __init__(self, filename: str): - self.filename = filename - fname, extension = os.path.splitext(filename) - self.output_filename = fname + '.cleaned' + extension - - @abc.abstractmethod - def get_meta(self) -> dict: - pass - - @abc.abstractmethod - def remove_all(self) -> bool: - pass - - def remove_all_lightweight(self) -> bool: - """ Remove _SOME_ metadata. """ - return self.remove_all() diff --git a/src/audio.py b/src/audio.py deleted file mode 100644 index 3a6aa79..0000000 --- a/src/audio.py +++ /dev/null @@ -1,39 +0,0 @@ -import shutil - -import mutagen - -from . import abstract - - -class MutagenParser(abstract.AbstractParser): - def get_meta(self): - f = mutagen.File(self.filename) - if f.tags: - return {k:', '.join(v) for k, v in f.tags.items()} - return {} - - def remove_all(self): - shutil.copy(self.filename, self.output_filename) - f = mutagen.File(self.output_filename) - f.delete() - f.save() - return True - - -class MP3Parser(MutagenParser): - mimetypes = {'audio/mpeg', } - - def get_meta(self): - metadata = {} - meta = mutagen.File(self.filename).tags - for key in meta: - metadata[key.rstrip(' \t\r\n\0')] = ', '.join(map(str, meta[key].text)) - return metadata - - -class OGGParser(MutagenParser): - mimetypes = {'audio/ogg', } - - -class FLACParser(MutagenParser): - mimetypes = {'audio/flac', } diff --git a/src/harmless.py b/src/harmless.py deleted file mode 100644 index aa00582..0000000 --- a/src/harmless.py +++ /dev/null @@ -1,17 +0,0 @@ -from . import abstract - - -class HarmlessParser(abstract.AbstractParser): - """ This is the parser for filetypes that do not contain metadata. """ - mimetypes = {'application/xml', 'text/plain'} - - def __init__(self, filename: str): - super().__init__(filename) - self.filename = filename - self.output_filename = filename - - def get_meta(self): - return dict() - - def remove_all(self): - return True diff --git a/src/images.py b/src/images.py deleted file mode 100644 index c84952a..0000000 --- a/src/images.py +++ /dev/null @@ -1,101 +0,0 @@ -import subprocess -import json -import os - -import cairo - -import gi -gi.require_version('GdkPixbuf', '2.0') -from gi.repository import GdkPixbuf - -from . import abstract - - -class PNGParser(abstract.AbstractParser): - mimetypes = {'image/png', } - meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', - 'Directory', 'FileSize', 'FileModifyDate', - 'FileAccessDate', 'FileInodeChangeDate', - 'FilePermissions', 'FileType', 'FileTypeExtension', - 'MIMEType', 'ImageWidth', 'BitDepth', 'ColorType', - 'Compression', 'Filter', 'Interlace', 'BackgroundColor', - 'ImageSize', 'Megapixels', 'ImageHeight'} - - def __init__(self, filename): - super().__init__(filename) - try: # better fail here than later - cairo.ImageSurface.create_from_png(self.filename) - except MemoryError: - raise ValueError - - def get_meta(self): - out = subprocess.check_output(['/usr/bin/exiftool', '-json', self.filename]) - meta = json.loads(out.decode('utf-8'))[0] - for key in self.meta_whitelist: - meta.pop(key, None) - return meta - - def remove_all(self): - surface = cairo.ImageSurface.create_from_png(self.filename) - surface.write_to_png(self.output_filename) - return True - - -class GdkPixbufAbstractParser(abstract.AbstractParser): - """ GdkPixbuf can handle a lot of surfaces, so we're rending images on it, - this has the side-effect of removing metadata completely. - """ - def get_meta(self): - out = subprocess.check_output(['/usr/bin/exiftool', '-json', self.filename]) - meta = json.loads(out.decode('utf-8'))[0] - for key in self.meta_whitelist: - meta.pop(key, None) - return meta - - def remove_all(self): - _, extension = os.path.splitext(self.filename) - pixbuf = GdkPixbuf.Pixbuf.new_from_file(self.filename) - if extension == '.jpg': - extension = '.jpeg' - pixbuf.savev(self.output_filename, extension[1:], [], []) - return True - - -class JPGParser(GdkPixbufAbstractParser): - mimetypes = {'image/jpeg'} - meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', - 'Directory', 'FileSize', 'FileModifyDate', - 'FileAccessDate', "FileInodeChangeDate", - 'FilePermissions', 'FileType', 'FileTypeExtension', - 'MIMEType', 'ImageWidth', 'ImageSize', 'BitsPerSample', - 'ColorComponents', 'EncodingProcess', 'JFIFVersion', - 'ResolutionUnit', 'XResolution', 'YCbCrSubSampling', - 'YResolution', 'Megapixels', 'ImageHeight'} - - -class TiffParser(GdkPixbufAbstractParser): - mimetypes = {'image/tiff'} - meta_whitelist = {'Compression', 'ExifByteOrder', 'ExtraSamples', - 'FillOrder', 'PhotometricInterpretation', - 'PlanarConfiguration', 'RowsPerStrip', 'SamplesPerPixel', - 'StripByteCounts', 'StripOffsets', 'BitsPerSample', - 'Directory', 'ExifToolVersion', 'FileAccessDate', - 'FileInodeChangeDate', 'FileModifyDate', 'FileName', - 'FilePermissions', 'FileSize', 'FileType', - 'FileTypeExtension', 'ImageHeight', 'ImageSize', - 'ImageWidth', 'MIMEType', 'Megapixels', 'SourceFile'} - - -class BMPParser(GdkPixbufAbstractParser): - mimetypes = {'image/x-ms-bmp'} - meta_whitelist = {'SourceFile', 'ExifToolVersion', 'FileName', 'Directory', - 'FileSize', 'FileModifyDate', 'FileAccessDate', - 'FileInodeChangeDate', 'FilePermissions', 'FileType', - 'FileTypeExtension', 'MIMEType', 'BMPVersion', - 'ImageWidth', 'ImageHeight', 'Planes', 'BitDepth', - 'Compression', 'ImageLength', 'PixelsPerMeterX', - 'PixelsPerMeterY', 'NumColors', 'NumImportantColors', - 'RedMask', 'GreenMask', 'BlueMask', 'AlphaMask', - 'ColorSpace', 'RedEndpoint', 'GreenEndpoint', - 'BlueEndpoint', 'GammaRed', 'GammaGreen', 'GammaBlue', - 'ImageSize', 'Megapixels'} diff --git a/src/office.py b/src/office.py deleted file mode 100644 index 749fc7d..0000000 --- a/src/office.py +++ /dev/null @@ -1,150 +0,0 @@ -import os -import re -import shutil -import tempfile -import datetime -import zipfile - -from . import abstract, parser_factory - - -class ArchiveBasedAbstractParser(abstract.AbstractParser): - def _clean_zipinfo(self, zipinfo: zipfile.ZipInfo) -> zipfile.ZipInfo: - zipinfo.compress_type = zipfile.ZIP_DEFLATED - zipinfo.create_system = 3 # Linux - zipinfo.comment = b'' - zipinfo.date_time = (1980, 1, 1, 0, 0, 0) - return zipinfo - - def _get_zipinfo_meta(self, zipinfo: zipfile.ZipInfo) -> dict: - metadata = {} - if zipinfo.create_system == 3: - #metadata['create_system'] = 'Linux' - pass - elif zipinfo.create_system == 2: - metadata['create_system'] = 'Windows' - else: - metadata['create_system'] = 'Weird' - - if zipinfo.comment: - metadata['comment'] = zipinfo.comment - - if zipinfo.date_time != (1980, 1, 1, 0, 0, 0): - metadata['date_time'] = datetime.datetime(*zipinfo.date_time) - - return metadata - - - def _clean_internal_file(self, item: zipfile.ZipInfo, temp_folder: str, - zin: zipfile.ZipFile, zout: zipfile.ZipFile): - zin.extract(member=item, path=temp_folder) - tmp_parser, mtype = parser_factory.get_parser(os.path.join(temp_folder, item.filename)) - if not tmp_parser: - print("%s's format (%s) isn't supported" % (item.filename, mtype)) - return - tmp_parser.remove_all() - zinfo = zipfile.ZipInfo(item.filename) - clean_zinfo = self._clean_zipinfo(zinfo) - with open(tmp_parser.output_filename, 'rb') as f: - zout.writestr(clean_zinfo, f.read()) - - -class MSOfficeParser(ArchiveBasedAbstractParser): - mimetypes = { - 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', - 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', - 'application/vnd.openxmlformats-officedocument.presentationml.presentation' - } - files_to_keep = {'_rels/.rels', 'word/_rels/document.xml.rels'} - - def get_meta(self): - """ - Yes, I know that parsing xml with regexp ain't pretty, - be my guest and fix it if you want. - """ - metadata = {} - zipin = zipfile.ZipFile(self.filename) - for item in zipin.infolist(): - if item.filename.startswith('docProps/') and item.filename.endswith('.xml'): - content = zipin.read(item).decode('utf-8') - for (key, value) in re.findall(r"<(.+)>(.+)", content, re.I): - metadata[key] = value - if not metadata: # better safe than sorry - metadata[item] = 'harmful content' - - metadata = {**metadata, **self._get_zipinfo_meta(item)} - zipin.close() - return metadata - - - def remove_all(self): - zin = zipfile.ZipFile(self.filename, 'r') - zout = zipfile.ZipFile(self.output_filename, 'w') - temp_folder = tempfile.mkdtemp() - - for item in zin.infolist(): - if item.filename[-1] == '/': - continue # `is_dir` is added in Python3.6 - elif item.filename.startswith('docProps/'): - if not item.filename.endswith('.rels'): - continue # don't keep metadata files - if item.filename in self.files_to_keep: - item = self._clean_zipinfo(item) - zout.writestr(item, zin.read(item)) - continue - - self._clean_internal_file(item, temp_folder, zin, zout) - - shutil.rmtree(temp_folder) - zout.close() - zin.close() - return True - - - -class LibreOfficeParser(ArchiveBasedAbstractParser): - mimetypes = { - 'application/vnd.oasis.opendocument.text', - 'application/vnd.oasis.opendocument.spreadsheet', - 'application/vnd.oasis.opendocument.presentation', - 'application/vnd.oasis.opendocument.graphics', - 'application/vnd.oasis.opendocument.chart', - 'application/vnd.oasis.opendocument.formula', - 'application/vnd.oasis.opendocument.image', - } - - def get_meta(self): - """ - Yes, I know that parsing xml with regexp ain't pretty, - be my guest and fix it if you want. - """ - metadata = {} - zipin = zipfile.ZipFile(self.filename) - for item in zipin.infolist(): - if item.filename == 'meta.xml': - content = zipin.read(item).decode('utf-8') - for (key, value) in re.findall(r"<((?:meta|dc|cp).+?)>(.+)", content, re.I): - metadata[key] = value - if not metadata: # better safe than sorry - metadata[item] = 'harmful content' - metadata = {**metadata, **self._get_zipinfo_meta(item)} - zipin.close() - return metadata - - def remove_all(self): - zin = zipfile.ZipFile(self.filename, 'r') - zout = zipfile.ZipFile(self.output_filename, 'w') - temp_folder = tempfile.mkdtemp() - - for item in zin.infolist(): - if item.filename[-1] == '/': - continue # `is_dir` is added in Python3.6 - elif item.filename == 'meta.xml': - continue # don't keep metadata files - - self._clean_internal_file(item, temp_folder, zin, zout) - - shutil.rmtree(temp_folder) - zout.close() - zin.close() - return True diff --git a/src/parser_factory.py b/src/parser_factory.py deleted file mode 100644 index 48616b0..0000000 --- a/src/parser_factory.py +++ /dev/null @@ -1,42 +0,0 @@ -import os -import mimetypes -import importlib -import pkgutil -from typing import TypeVar - -from . import abstract, unsupported_extensions - - -T = TypeVar('T', bound='abstract.AbstractParser') - -# This loads every parser in a dynamic way -for module_loader, name, ispkg in pkgutil.walk_packages('.src'): - if not name.startswith('src.'): - continue - elif name == 'src.abstract': - continue - importlib.import_module(name) - - -def _get_parsers() -> list: - """ Get all our parsers!""" - def __get_parsers(cls): - return cls.__subclasses__() + \ - [g for s in cls.__subclasses__() for g in __get_parsers(s)] - return __get_parsers(abstract.AbstractParser) - - -def get_parser(filename: str) -> (T, str): - mtype, _ = mimetypes.guess_type(filename) - - _, extension = os.path.splitext(filename) - if extension in unsupported_extensions: - return None, mtype - - for c in _get_parsers(): - if mtype in c.mimetypes: - try: - return c(filename), mtype - except ValueError: - return None, mtype - return None, mtype diff --git a/src/pdf.py b/src/pdf.py deleted file mode 100644 index 5b99192..0000000 --- a/src/pdf.py +++ /dev/null @@ -1,135 +0,0 @@ -""" Handle PDF - -""" - -import os -import re -import logging -import tempfile -import io - -import cairo -import gi -gi.require_version('Poppler', '0.18') -from gi.repository import Poppler, GLib - -from . import abstract - -logging.basicConfig(level=logging.DEBUG) - - -class PDFParser(abstract.AbstractParser): - mimetypes = {'application/pdf', } - meta_list = {'author', 'creation-date', 'creator', 'format', 'keywords', - 'metadata', 'mod-date', 'producer', 'subject', 'title', - 'viewer-preferences'} - - def __init__(self, filename): - super().__init__(filename) - self.uri = 'file://' + os.path.abspath(self.filename) - self.__scale = 2 # how much precision do we want for the render - try: # Check now that the file is valid, to avoid surprises later - Poppler.Document.new_from_file(self.uri, None) - except GLib.GError: # Invalid PDF - raise ValueError - - def remove_all_lightweight(self): - """ - Load the document into Poppler, render pages on a new PDFSurface. - """ - document = Poppler.Document.new_from_file(self.uri, None) - pages_count = document.get_n_pages() - - tmp_path = tempfile.mkstemp()[1] - pdf_surface = cairo.PDFSurface(tmp_path, 10, 10) - pdf_context = cairo.Context(pdf_surface) # context draws on the surface - - for pagenum in range(pages_count): - logging.info("Rendering page %d/%d", pagenum + 1, pages_count) - page = document.get_page(pagenum) - page_width, page_height = page.get_size() - pdf_surface.set_size(page_width, page_height) - pdf_context.save() - page.render_for_printing(pdf_context) - pdf_context.restore() - pdf_context.show_page() # draw pdf_context on pdf_surface - pdf_surface.finish() - - self.__remove_superficial_meta(tmp_path, self.output_filename) - os.remove(tmp_path) - - return True - - def remove_all(self): - """ - Load the document into Poppler, render pages on PNG, - and shove those PNG into a new PDF. - """ - document = Poppler.Document.new_from_file(self.uri, None) - pages_count = document.get_n_pages() - - _, tmp_path = tempfile.mkstemp() - pdf_surface = cairo.PDFSurface(tmp_path, 32, 32) # resized later anyway - pdf_context = cairo.Context(pdf_surface) - - for pagenum in range(pages_count): - page = document.get_page(pagenum) - page_width, page_height = page.get_size() - logging.info("Rendering page %d/%d", pagenum + 1, pages_count) - - img_surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, int(page_width) * self.__scale, int(page_height) * self.__scale) - img_context = cairo.Context(img_surface) - - img_context.scale(self.__scale, self.__scale) - page.render_for_printing(img_context) - img_context.show_page() - - buf = io.BytesIO() - img_surface.write_to_png(buf) - img_surface.finish() - buf.seek(0) - - img = cairo.ImageSurface.create_from_png(buf) - pdf_surface.set_size(page_width*self.__scale, page_height*self.__scale) - pdf_context.set_source_surface(img, 0, 0) - pdf_context.paint() - pdf_context.show_page() - - pdf_surface.finish() - - # Removes metadata added by Poppler - self.__remove_superficial_meta(tmp_path, self.output_filename) - os.remove(tmp_path) - - return True - - @staticmethod - def __remove_superficial_meta(in_file: str, out_file: str) -> bool: - document = Poppler.Document.new_from_file('file://' + in_file) - document.set_producer('') - document.set_creator('') - document.set_creation_date(-1) - document.save('file://' + os.path.abspath(out_file)) - return True - - - @staticmethod - def __parse_metadata_field(data: str) -> dict: - metadata = {} - for (_, key, value) in re.findall(r"<(xmp|pdfx|pdf|xmpMM):(.+)>(.+)", data, re.I): - metadata[key] = value - return metadata - - def get_meta(self): - """ Return a dict with all the meta of the file - """ - metadata = {} - document = Poppler.Document.new_from_file(self.uri, None) - - for key in self.meta_list: - if document.get_property(key): - metadata[key] = document.get_property(key) - if 'metadata' in metadata: - parsed_meta = self.__parse_metadata_field(metadata['metadata']) - return {**metadata, **parsed_meta} - return metadata diff --git a/src/torrent.py b/src/torrent.py deleted file mode 100644 index cb4b5e3..0000000 --- a/src/torrent.py +++ /dev/null @@ -1,126 +0,0 @@ -from . import abstract - - -class TorrentParser(abstract.AbstractParser): - mimetypes = {'application/x-bittorrent', } - whitelist = {b'announce', b'announce-list', b'info'} - - def get_meta(self) -> dict: - metadata = {} - with open(self.filename, 'rb') as f: - d = _BencodeHandler().bdecode(f.read()) - if d is None: - return {'Unknown meta': 'Unable to parse torrent file "%s".' % self.filename} - for k, v in d.items(): - if k not in self.whitelist: - metadata[k.decode('utf-8')] = v - return metadata - - - def remove_all(self) -> bool: - cleaned = dict() - with open(self.filename, 'rb') as f: - d = _BencodeHandler().bdecode(f.read()) - if d is None: - return False - for k, v in d.items(): - if k in self.whitelist: - cleaned[k] = v - with open(self.output_filename, 'wb') as f: - f.write(_BencodeHandler().bencode(cleaned)) - return True - - -class _BencodeHandler(object): - """ - Since bencode isn't that hard to parse, - MAT2 comes with its own parser, based on the spec - https://wiki.theory.org/index.php/BitTorrentSpecification#Bencoding - """ - def __init__(self): - self.__decode_func = { - ord('d'): self.__decode_dict, - ord('i'): self.__decode_int, - ord('l'): self.__decode_list, - } - for i in range(0, 10): - self.__decode_func[ord(str(i))] = self.__decode_string - - self.__encode_func = { - bytes: self.__encode_string, - dict: self.__encode_dict, - int: self.__encode_int, - list: self.__encode_list, - } - - @staticmethod - def __decode_int(s: str) -> (int, str): - s = s[1:] - next_idx = s.index(b'e') - if s.startswith(b'-0'): - raise ValueError # negative zero doesn't exist - elif s.startswith(b'0') and next_idx != 1: - raise ValueError # no leading zero except for zero itself - return int(s[:next_idx]), s[next_idx+1:] - - @staticmethod - def __decode_string(s: str) -> (str, str): - sep = s.index(b':') - str_len = int(s[:sep]) - if str_len < 0: - raise ValueError - elif s[0] == b'0' and sep != 1: - raise ValueError - s = s[1:] - return s[sep:sep+str_len], s[sep+str_len:] - - def __decode_list(self, s: str) -> (list, str): - r = list() - s = s[1:] # skip leading `l` - while s[0] != ord('e'): - v, s = self.__decode_func[s[0]](s) - r.append(v) - return r, s[1:] - - def __decode_dict(self, s: str) -> (dict, str): - r = dict() - s = s[1:] # skip leading `d` - while s[0] != ord(b'e'): - k, s = self.__decode_string(s) - r[k], s = self.__decode_func[s[0]](s) - return r, s[1:] - - @staticmethod - def __encode_int(x: str) -> bytes: - return b'i' + bytes(str(x), 'utf-8') + b'e' - - @staticmethod - def __encode_string(x: str) -> bytes: - return bytes((str(len(x))), 'utf-8') + b':' + x - - def __encode_list(self, x: str) -> bytes: - ret = b'' - for i in x: - ret += self.__encode_func[type(i)](i) - return b'l' + ret + b'e' - - def __encode_dict(self, x: str) -> bytes: - ret = b'' - for k, v in sorted(x.items()): - ret += self.__encode_func[type(k)](k) - ret += self.__encode_func[type(v)](v) - return b'd' + ret + b'e' - - def bencode(self, s: str) -> bytes: - return self.__encode_func[type(s)](s) - - def bdecode(self, s: str): - try: - r, l = self.__decode_func[s[0]](s) - except (IndexError, KeyError, ValueError) as e: - print("not a valid bencoded string: %s" % e) - return None - if l != b'': - print("invalid bencoded value (data after valid prefix)") - return None - return r diff --git a/tests/test_climat2.py b/tests/test_climat2.py index 864ee0d..36973bf 100644 --- a/tests/test_climat2.py +++ b/tests/test_climat2.py @@ -6,43 +6,43 @@ import unittest class TestHelp(unittest.TestCase): def test_help(self): - proc = subprocess.Popen(['./main.py', '--help'], stdout=subprocess.PIPE) + proc = subprocess.Popen(['./mat2.py', '--help'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() - self.assertIn(b'usage: main.py [-h] [-v] [-l] [-c | -s | -L] [files [files ...]]', stdout) + self.assertIn(b'usage: mat2.py [-h] [-v] [-l] [-c | -s | -L] [files [files ...]]', stdout) def test_no_arg(self): - proc = subprocess.Popen(['./main.py'], stdout=subprocess.PIPE) + proc = subprocess.Popen(['./mat2.py'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() - self.assertIn(b'usage: main.py [-h] [-v] [-l] [-c | -s | -L] [files [files ...]]', stdout) + self.assertIn(b'usage: mat2.py [-h] [-v] [-l] [-c | -s | -L] [files [files ...]]', stdout) class TestVersion(unittest.TestCase): def test_version(self): - proc = subprocess.Popen(['./main.py', '--version'], stdout=subprocess.PIPE) + proc = subprocess.Popen(['./mat2.py', '--version'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertTrue(stdout.startswith(b'MAT2 ')) class TestExclusiveArgs(unittest.TestCase): def test_version(self): - proc = subprocess.Popen(['./main.py', '-s', '-c'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = subprocess.Popen(['./mat2.py', '-s', '-c'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = proc.communicate() - self.assertIn(b'main.py: error: argument -c/--check: not allowed with argument -s/--show', stderr) + self.assertIn(b'mat2.py: error: argument -c/--check: not allowed with argument -s/--show', stderr) class TestReturnValue(unittest.TestCase): def test_nonzero(self): - ret = subprocess.call(['./main.py', './main.py'], stdout=subprocess.DEVNULL) + ret = subprocess.call(['./mat2.py', './mat2.py'], stdout=subprocess.DEVNULL) self.assertEqual(255, ret) - ret = subprocess.call(['./main.py', '--whololo'], stderr=subprocess.DEVNULL) + ret = subprocess.call(['./mat2.py', '--whololo'], stderr=subprocess.DEVNULL) self.assertEqual(2, ret) def test_zero(self): - ret = subprocess.call(['./main.py'], stdout=subprocess.DEVNULL) + ret = subprocess.call(['./mat2.py'], stdout=subprocess.DEVNULL) self.assertEqual(0, ret) - ret = subprocess.call(['./main.py', '--show', './main.py'], stdout=subprocess.DEVNULL) + ret = subprocess.call(['./mat2.py', '--show', './mat2.py'], stdout=subprocess.DEVNULL) self.assertEqual(0, ret) @@ -50,16 +50,16 @@ class TestCleanMeta(unittest.TestCase): def test_jpg(self): shutil.copy('./tests/data/dirty.jpg', './tests/data/clean.jpg') - proc = subprocess.Popen(['./main.py', '--show', './tests/data/clean.jpg'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/clean.jpg'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'Comment: Created with GIMP', stdout) - proc = subprocess.Popen(['./main.py', './tests/data/clean.jpg'], + proc = subprocess.Popen(['./mat2.py', './tests/data/clean.jpg'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() - proc = subprocess.Popen(['./main.py', '--show', './tests/data/clean.cleaned.jpg'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/clean.cleaned.jpg'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertNotIn(b'Comment: Created with GIMP', stdout) @@ -69,25 +69,25 @@ class TestCleanMeta(unittest.TestCase): class TestGetMeta(unittest.TestCase): def test_pdf(self): - proc = subprocess.Popen(['./main.py', '--show', './tests/data/dirty.pdf'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/dirty.pdf'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'producer: pdfTeX-1.40.14', stdout) def test_png(self): - proc = subprocess.Popen(['./main.py', '--show', './tests/data/dirty.png'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/dirty.png'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'Comment: This is a comment, be careful!', stdout) def test_jpg(self): - proc = subprocess.Popen(['./main.py', '--show', './tests/data/dirty.jpg'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/dirty.jpg'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'Comment: Created with GIMP', stdout) def test_docx(self): - proc = subprocess.Popen(['./main.py', '--show', './tests/data/dirty.docx'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/dirty.docx'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'Application: LibreOffice/5.4.5.1$Linux_X86_64', stdout) @@ -95,7 +95,7 @@ class TestGetMeta(unittest.TestCase): self.assertIn(b'revision: 1', stdout) def test_odt(self): - proc = subprocess.Popen(['./main.py', '--show', './tests/data/dirty.odt'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/dirty.odt'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'generator: LibreOffice/3.3$Unix', stdout) @@ -103,14 +103,14 @@ class TestGetMeta(unittest.TestCase): self.assertIn(b'date_time: 2011-07-26 02:40:16', stdout) def test_mp3(self): - proc = subprocess.Popen(['./main.py', '--show', './tests/data/dirty.mp3'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/dirty.mp3'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'TALB: harmfull', stdout) self.assertIn(b'COMM::: Thank you for using MAT !', stdout) def test_flac(self): - proc = subprocess.Popen(['./main.py', '--show', './tests/data/dirty.flac'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/dirty.flac'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'comments: Thank you for using MAT !', stdout) @@ -118,7 +118,7 @@ class TestGetMeta(unittest.TestCase): self.assertIn(b'title: I am so', stdout) def test_ogg(self): - proc = subprocess.Popen(['./main.py', '--show', './tests/data/dirty.ogg'], + proc = subprocess.Popen(['./mat2.py', '--show', './tests/data/dirty.ogg'], stdout=subprocess.PIPE) stdout, _ = proc.communicate() self.assertIn(b'comments: Thank you for using MAT !', stdout) diff --git a/tests/test_libmat2.py b/tests/test_libmat2.py index f3e11d9..89a5811 100644 --- a/tests/test_libmat2.py +++ b/tests/test_libmat2.py @@ -6,7 +6,7 @@ import os import zipfile import tempfile -from src import pdf, images, audio, office, parser_factory, torrent +from libmat2 import pdf, images, audio, office, parser_factory, torrent class TestParserFactory(unittest.TestCase): -- cgit v1.3