Source code for duplicity.backends._testbackend

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2002 Ben Escoto <ben@emerose.org>
# Copyright 2007 Kenneth Loafman <kenneth@loafman.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import glob
import hashlib
import inspect
import json
import logging
import os
from random import random
import re
import sys
import time

import duplicity.backend
from duplicity import (
    log,
    path,
    progress,
)
from duplicity.errors import BackendException


[docs]class BackendErrors: # FAIL_WITH_EXCEPTION: set to substring matched by the filename. FAIL_WITH_EXCEPTION = "DUP_FAIL_WITH_EXCEPTION" # SYSTEM_EXIT: set to substring matched by the filename. FAIL_SYSTEM_EXIT = "DUP_FAIL_BY_SYSTEM_EXIT" # WAIT_FOR_OTHER_VOLUME: set to json string: ["file_to_delay", "file_wait_for"] (substing match) WAIT_FOR_OTHER_VOLUME = "DUP_FAIL_WAIT_FOR_VOLUME" # LAST_BYTE_MISSING: set to substring matched by the filename. LAST_BYTE_MISSING = "DUP_FAIL_LAST_BYTE_MISSING" # SKIP_PUT_SILENT: Don't put file if name contains string SKIP_PUT_SILENT = "DUP_FAIL_SKIP_PUT_SILENT" # RANDOM_DELAY: max random delay in ms added to command DELAY_RANDOM_MS = "DUP_FAIL_DELAY_RANDOM_MS"
[docs]class _TestBackend(duplicity.backend.Backend): """Use this backend to test/create certain error situations errors get triggered via ENV Vars, see class BackendErrors """
[docs] def __init__(self, parsed_url): super().__init__(parsed_url) log._logger.addHandler(logging.FileHandler("/tmp/testbackend.log")) log.Warn("TestBackend is not made for production use!") # The URL form "file:MyFile" is not a valid duplicity target. if not parsed_url.path.startswith("//"): raise BackendException("Bad file:// path syntax.") self.remote_pathdir = path.Path(parsed_url.path[2:]) try: os.makedirs(self.remote_pathdir.base) except Exception: pass
[docs] @staticmethod def _fail_with_exception(filename): filename = os.fsdecode(filename) log.Debug(f"DUB_FAIL: Check if fail on exception for {filename}. Called by {inspect.stack()[2][3]}.") if os.getenv(BackendErrors.FAIL_WITH_EXCEPTION) and os.getenv(BackendErrors.FAIL_WITH_EXCEPTION) in filename: log.Error(f"DUB_FAIL: Force exception on file {filename}.") raise FileNotFoundError(f"TEST: raised exception on {filename} by intention")
[docs] @staticmethod def _fail_by_sys_exit(filename): filename = os.fsdecode(filename) log.Debug(f"DUB_FAIL: Check if do sys.exit(30) for {filename}. Called by {inspect.stack()[2][3]}.") if os.getenv(BackendErrors.FAIL_SYSTEM_EXIT) and os.getenv(BackendErrors.FAIL_SYSTEM_EXIT) in filename: log.Error(f"DUB_FAIL: Force sys.exit(30) on file {filename}.") time.sleep(1) # otherwise log message doesn't get printed. sys.exit(30)
[docs] def _wait_for_other_volume(self, filename): filename = os.fsdecode(filename) log.Debug(f"DUB_FAIL: Check if action on {filename} shoud be delayed. Called by {inspect.stack()[2][3]}.") env = os.getenv(BackendErrors.WAIT_FOR_OTHER_VOLUME) if not env: return # skip if env not set file_stop, file_waitfor = json.loads(env) timestamp = re.match(r".*\.(\d{8}T\d{6}[-+0-9:Z]*)\..*", filename).group(1) file_waitfor_glob = f"{os.fsdecode(self.remote_pathdir.get_canonical())}/*{timestamp}*{file_waitfor}*" if file_stop in filename: while not glob.glob(file_waitfor_glob): log.Error(f"DUB_FAIL: Waiting for file matiching {file_waitfor_glob}.") time.sleep(1) log.Warn(f"DUB_FAIL: {filename} written after {glob.glob(file_waitfor_glob)}")
[docs] def _remove_last_byte(self, filename): filename = os.fsdecode(filename) log.Debug(f"DUB_FAIL: Check if {filename} shoud be truncated. Called by {inspect.stack()[2][3]}.") if os.getenv(BackendErrors.LAST_BYTE_MISSING) and os.getenv(BackendErrors.LAST_BYTE_MISSING) in filename: log.Error(f"DUB_FAIL: removing last byte from {filename}") with open(self.remote_pathdir.append(filename).get_canonical(), "ab") as remote_file: remote_file.seek(-1, os.SEEK_END) remote_file.truncate()
[docs] @staticmethod def _skip_put_silent(filename): """ retrun true if file should be skipped silently """ filename = os.fsdecode(filename) log.Debug(f"DUB_FAIL: Check if {filename} should be skipped. Called by {inspect.stack()[2][3]}.") if os.getenv(BackendErrors.SKIP_PUT_SILENT) and os.getenv(BackendErrors.SKIP_PUT_SILENT) in filename: log.Error(f"DUB_FAIL: {filename} skipped silent.") return True return False
[docs] @staticmethod def _delay_random(): """ sleep a random amount of milliseconds if ENV set """ log.Debug("DUB_FAIL: Check if action should be delayed.") if os.getenv(BackendErrors.DELAY_RANDOM_MS): wait = random() * float(os.getenv(BackendErrors.DELAY_RANDOM_MS)) / 1000 # type: ignore log.Error(f"DUB_FAIL: wait for {wait} millisec.") time.sleep(wait)
[docs] def _move(self, source_path, remote_filename): self._fail_with_exception(remote_filename) self._fail_by_sys_exit(remote_filename) self._wait_for_other_volume(remote_filename) self._delay_random() target_path = self.remote_pathdir.append(remote_filename) try: source_path.rename(target_path) return True except OSError: return False
[docs] def _put(self, source_path, remote_filename): self._fail_with_exception(remote_filename) self._wait_for_other_volume(remote_filename) self._delay_random() if self._skip_put_silent(remote_filename): return target_path = self.remote_pathdir.append(remote_filename) source_path.setdata() source_size = source_path.getsize() progress.report_transfer(0, source_size) target_path.writefileobj(source_path.open("rb")) self._fail_by_sys_exit(remote_filename) # fail after file is transferred self._remove_last_byte(remote_filename) progress.report_transfer(source_size, source_size)
[docs] def _get(self, filename, local_path): self._fail_with_exception(filename) self._fail_by_sys_exit(filename) self._delay_random() source_path = self.remote_pathdir.append(filename) local_path.writefileobj(source_path.open("rb"))
[docs] def _list(self): return self.remote_pathdir.listdir()
[docs] def _validate(self, remote_filename, size, source_path=None, **kwargs): # poc to show that validate can do additional things. self._delay_random() self._remove_last_byte(remote_filename) results_str = [] results_bool = [] try: if source_path: target_path = self.remote_pathdir.append(remote_filename) target_hash = self.__hash_fileobj(target_path.open()) source_hash = self.__hash_fileobj(source_path.open()) if target_hash == source_hash: results_str.append(f"file hash {target_hash} matches") results_bool.append(True) else: results_str.append(f"expected hash {source_hash} doesn't match file hash {target_hash}") results_bool.append(False) if size == self._query(remote_filename)["size"]: results_str.append(f"file size {size}") results_bool.append(True) else: results_str.append( f'expected size {size} and file size {self._query(remote_filename)["size"]} don\'t match' ) results_bool.append(False) except FileNotFoundError as e: results_bool.append(False) results_str.append(f"FileNotFoundError: {e}") except Exception as e: log.FatalError( _("Unexpected exception while validate %s.") % os.fsdecode(remote_filename), log.ErrorCode.backend_validation_failed, extra=f"Exception: {e}", ) return (all(results_bool), ", ".join(results_str))
[docs] def _delete(self, filename): self._fail_with_exception(filename) self._fail_by_sys_exit(filename) self._delay_random() self.remote_pathdir.append(filename).delete()
[docs] def _delete_list(self, filenames): for filename in filenames: self._fail_with_exception(filename) self._fail_by_sys_exit(filename) self._delay_random() self.remote_pathdir.append(filename).delete()
[docs] def _query(self, filename): self._fail_with_exception(filename) self._delay_random() self._fail_by_sys_exit(filename) target_file = self.remote_pathdir.append(filename) target_file.setdata() size = target_file.getsize() if target_file.exists() else -1 return {"size": size}
[docs] def _query_list(self, filename_list): return {x: self._query(x) for x in filename_list}
[docs] def _retry_cleanup(self): pass
[docs] def _close(self): pass
[docs] def _error_code(self, operation, e): # pylint: disable=unused-argument return log.ErrorCode.backend_error
def __hash_file(self, filename): self.__hash_fileobj(open(filename, "rb")) def __hash_fileobj(self, fileobj): # TODO: Remove when py38 goes EOL if sys.version_info[:2] == (3, 8): h = hashlib.sha1() else: h = hashlib.sha1(usedforsecurity=False) # loop till the end of the file chunk = 0 while chunk != b"": # read only 1024 bytes at a time chunk = fileobj.read(1024) h.update(chunk) fileobj.close() # return the hex representation of digest return h.hexdigest()
duplicity.backend.register_backend("fortestsonly", _TestBackend)