# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2015 Steve Tynor <steve.tynor@gmail.com>
# Copyright 2016 Thomas Harning Jr <harningt@gmail.com>
#                  - mirror/stripe modes
#                  - write error modes
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

#

import json
import os
import os.path
import urllib.error
import urllib.parse
import urllib.request

import duplicity.backend
from duplicity import config
from duplicity import log
from duplicity import util
from duplicity.errors import BackendException


class MultiBackend(duplicity.backend.Backend):
    """Store files across multiple remote stores. URL is a path to a local file
    containing URLs/other config defining the remote store"""

    # the stores we are managing
    __stores = []
    __affinities = {}

    # Set of known query parameters
    __knownQueryParameters = frozenset(
        [
            "mode",
            "onfail",
            "subpath",
        ]
    )

    # the mode of operation to follow
    # can be one of 'stripe' or 'mirror' currently
    __mode = "stripe"
    __mode_allowedSet = frozenset(
        [
            "mirror",
            "stripe",
        ]
    )

    # the write error handling logic
    # can be one of the following:
    # * continue - default, on failure continues to next source
    # * abort - stop all further operations
    __onfail_mode = "continue"
    __onfail_mode_allowedSet = frozenset(
        [
            "abort",
            "continue",
        ]
    )

    # sub path to dynamically add sub directories to backends
    # will be appended to the url value
    __subpath = ""

    # when we write in stripe mode, we "stripe" via a simple round-robin across
    # remote stores.  It's hard to get too much more sophisticated
    # since we can't rely on the backend to give us any useful meta
    # data (e.g. sizes of files, capacity of the store (quotas)) to do
    # a better job of balancing load across stores.
    __write_cursor = 0
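    # Illustrative examples (not part of the backend): these settings are
    # chosen via the query string of the multi:// URL; the paths below are
    # hypothetical:
    #
    #   multi:///etc/duplicity/multi.json?mode=mirror
    #   multi:///etc/duplicity/multi.json?mode=stripe&onfail=abort
    #   multi:///etc/duplicity/multi.json?subpath=/laptop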
    @staticmethod
    def get_query_params(parsed_url):
        # Reparse so the query string is available
        reparsed_url = urllib.parse.urlparse(parsed_url.geturl())
        if len(reparsed_url.query) == 0:
            return dict()
        try:
            queryMultiDict = urllib.parse.parse_qs(reparsed_url.query, strict_parsing=True)
        except ValueError as e:
            log.Log(
                _("MultiBackend: Could not parse query string %s: %s ") % (reparsed_url.query, e),
                log.ERROR,
            )
            raise BackendException("Could not parse query string")
        queryDict = dict()
        # Convert the multi-dict to a single dictionary
        # while checking to make sure that no unrecognized values are found
        for name, valueList in list(queryMultiDict.items()):
            if len(valueList) != 1:
                log.Log(
                    _("MultiBackend: Invalid query string %s: more than one value for %s")
                    % (reparsed_url.query, name),
                    log.ERROR,
                )
                raise BackendException("Invalid query string")
            if name not in MultiBackend.__knownQueryParameters:
                log.Log(
                    _("MultiBackend: Invalid query string %s: unknown parameter %s") % (reparsed_url.query, name),
                    log.ERROR,
                )
                raise BackendException("Invalid query string")
            queryDict[name] = valueList[0]
        return queryDict
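    # A minimal sketch of the parsing above, assuming a plain ParseResult
    # (duplicity's parsed_url wrapper is reparsed via geturl(), so a standard
    # urlparse result behaves the same here):
    #
    #   >>> u = urllib.parse.urlparse("multi:///cfg.json?mode=mirror&onfail=abort")
    #   >>> MultiBackend.get_query_params(u)
    #   {'mode': 'mirror', 'onfail': 'abort'}
    #
    # An unknown parameter (e.g. "?modes=mirror") or a repeated one raises
    # BackendException("Invalid query string").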
    def __init__(self, parsed_url):
        duplicity.backend.Backend.__init__(self, parsed_url)

        # Init each of the wrapped stores
        #
        # config file is a json formatted collection of values, one for
        # each backend.  We will 'stripe' data across all the given stores:
        #
        #  'url' - the URL used for the backend store
        #  'env' - an optional list of environment variable values to set
        #      during the initialization of the backend
        #
        # Example:
        #
        # [
        #  {
        #   "url": "abackend://myuser@domain.com/backup",
        #   "env": [
        #     {
        #      "name" : "MYENV",
        #      "value" : "xyz"
        #     },
        #     {
        #      "name" : "FOO",
        #      "value" : "bar"
        #     }
        #    ]
        #  },
        #  {
        #   "url": "file:///path/to/dir"
        #  }
        # ]

        queryParams = MultiBackend.get_query_params(parsed_url)

        if "mode" in queryParams:
            self.__mode = queryParams["mode"]
        if "onfail" in queryParams:
            self.__onfail_mode = queryParams["onfail"]
        if self.__mode not in MultiBackend.__mode_allowedSet:
            log.Log(
                _("MultiBackend: illegal value for %s: %s") % ("mode", self.__mode),
                log.ERROR,
            )
            raise BackendException("MultiBackend: invalid mode value")
        if self.__onfail_mode not in MultiBackend.__onfail_mode_allowedSet:
            log.Log(
                _("MultiBackend: illegal value for %s: %s") % ("onfail", self.__onfail_mode),
                log.ERROR,
            )
            raise BackendException("MultiBackend: invalid onfail value")
        if "subpath" in queryParams:
            self.__subpath = queryParams["subpath"]

        try:
            with open(parsed_url.path) as f:
                configs = json.load(f)
        except IOError as e:
            log.Log(_("MultiBackend: Url %s") % (parsed_url.strip_auth()), log.ERROR)
            log.Log(
                _("MultiBackend: Could not load config file %s: %s ") % (parsed_url.path, e),
                log.ERROR,
            )
            raise BackendException("Could not load config file")

        # each entry describes one wrapped backend (named backend_config here
        # so it does not shadow the imported duplicity.config module)
        for backend_config in configs:
            url = backend_config["url"] + self.__subpath
            log.Log(_("MultiBackend: use store %s") % url, log.INFO)
            if "env" in backend_config:
                for env in backend_config["env"]:
                    log.Log(
                        _("MultiBackend: set env %s = %s") % (env["name"], env["value"]),
                        log.INFO,
                    )
                    os.environ[env["name"]] = env["value"]

            store = duplicity.backend.get_backend(url)
            self.__stores.append(store)

            # Prefix affinity
            if "prefixes" in backend_config:
                if self.__mode == "stripe":
                    raise BackendException("Multibackend: stripe mode not supported with prefix affinity.")
                for prefix in backend_config["prefixes"]:
                    log.Log(
                        _("Multibackend: register affinity for prefix %s") % prefix,
                        log.INFO,
                    )
                    if prefix in self.__affinities:
                        self.__affinities[prefix].append(store)
                    else:
                        self.__affinities[prefix] = [store]
            # store_list = store.list()
            # log.Log(_("MultiBackend: at init, store %s has %s files")
            #         % (url, len(store_list)),
            #         log.INFO)
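    # A hypothetical end-to-end sketch (not part of the backend): given a
    # config file /etc/duplicity/multi.json containing
    #
    #   [
    #     {"url": "file:///mnt/diskA/backup"},
    #     {"url": "file:///mnt/diskB/backup"}
    #   ]
    #
    # a backup striped across both stores could be invoked as:
    #
    #   duplicity /home/me "multi:///etc/duplicity/multi.json?mode=stripe"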
    def _eligible_stores(self, filename):
        if self.__affinities:
            matching_prefixes = [k for k in list(self.__affinities.keys()) if os.fsdecode(filename).startswith(k)]
            matching_stores = {store for prefix in matching_prefixes for store in self.__affinities[prefix]}
            if matching_stores:
                # Distinct stores with matching prefix
                return list(matching_stores)
        # No affinity rule or no matching store for that prefix
        return self.__stores
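    # Affinity sketch (hypothetical prefixes): a store configured with
    # "prefixes": ["duplicity-full", "duplicity-full-signatures"] is eligible
    # for a file named "duplicity-full.20240101T000000Z.vol1.difftar.gpg",
    # while a "duplicity-inc..." file matches no prefix and falls through to
    # the full store list.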
    def _put(self, source_path, remote_filename):
        # Store an indication of whether any of these passed
        passed = False
        # Eligible stores for this action
        stores = self._eligible_stores(remote_filename)
        # Mirror mode always starts at zero
        if self.__mode == "mirror":
            self.__write_cursor = 0

        first = self.__write_cursor
        while True:
            store = stores[self.__write_cursor]
            try:
                next = self.__write_cursor + 1  # pylint: disable=redefined-builtin
                if next > len(stores) - 1:
                    next = 0
                log.Log(
                    _("MultiBackend: _put: write to store #%s (%s)")
                    % (self.__write_cursor, store.backend.parsed_url.strip_auth()),
                    log.DEBUG,
                )
                store.put(source_path, remote_filename)
                passed = True
                self.__write_cursor = next
                # No matter what, if we loop around, break this loop
                if next == 0:
                    break
                # If in stripe mode, don't continue to the next
                if self.__mode == "stripe":
                    break
            except Exception as e:
                log.Log(
                    _("MultiBackend: failed to write to store #%s (%s), try #%s, Exception: %s")
                    % (
                        self.__write_cursor,
                        store.backend.parsed_url.strip_auth(),
                        next,
                        e,
                    ),
                    log.INFO,
                )
                self.__write_cursor = next

                # If we consider write failure as abort, abort
                if self.__onfail_mode == "abort":
                    log.Log(
                        _("MultiBackend: failed to write %s. Aborting process.") % source_path,
                        log.ERROR,
                    )
                    raise BackendException("failed to write")

                # If we've looped around, and none of them passed, fail
                if (self.__write_cursor == first) and not passed:
                    log.Log(
                        _("MultiBackend: failed to write %s. Tried all backing stores and none succeeded")
                        % source_path,
                        log.ERROR,
                    )
                    raise BackendException("failed to write")
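    # Round-robin illustration (hypothetical): with three stores in stripe
    # mode, successive volumes land on store #0, #1, #2, #0, ... because
    # __write_cursor persists across _put calls. In mirror mode the cursor is
    # reset to 0 each call and only the wrap-around check breaks the loop, so
    # every store receives a copy of each volume.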
    def _get(self, remote_filename, local_path):
        # since the backend operations will be retried, we can't
        # simply try to get from the store, if not found, move to the
        # next store (since each failure will be retried n times
        # before finally giving up).  So we need to get the list first
        # before we try to fetch
        # ENHANCEME: maintain a cached list for each store
        stores = self._eligible_stores(remote_filename)

        for s in stores:
            flist = s.list()
            if remote_filename in flist:
                s.get(remote_filename, local_path)
                return
            log.Log(
                _("MultiBackend: failed to get %s to %s from %s")
                % (remote_filename, local_path, s.backend.parsed_url.strip_auth()),
                log.INFO,
            )
        log.Log(
            _("MultiBackend: failed to get %s. Tried all backing stores and none succeeded") % remote_filename,
            log.ERROR,
        )
        raise BackendException("failed to get")
    def _list(self):
        lists = []
        for s in self.__stores:
            config.are_errors_fatal["list"] = (False, [])
            l = s.list()
            log.Notice(_("MultiBackend: %s: %d files") % (s.backend.parsed_url.strip_auth(), len(l)))
            if len(l) == 0 and duplicity.backend._last_exception:
                log.Warn(
                    _(
                        f"Exception during list of {s.backend.parsed_url.strip_auth()}: "
                        f"{util.uexc(duplicity.backend._last_exception)}"
                    )
                )
                duplicity.backend._last_exception = None
            lists.append(l)
        # combine the lists into a single flat list w/o duplicates via set:
        result = list({item for sublist in lists for item in sublist})
        log.Log(_("MultiBackend: combined list: %s") % result, log.DEBUG)
        return result
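    # For example (hypothetical listings): if store A reports ["x", "y"] and
    # store B reports ["y", "z"], the combined result is the deduplicated
    # ["x", "y", "z"]; ordering is not guaranteed because the merge passes
    # through a set.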
    def _delete(self, filename):
        # Store an indication of whether any passed
        passed = False
        stores = self._eligible_stores(filename)
        # since the backend operations will be retried, we can't
        # simply try to get from the store, if not found, move to the
        # next store (since each failure will be retried n times
        # before finally giving up).  So we need to get the list first
        # before we try to delete
        # ENHANCEME: maintain a cached list for each store
        for s in stores:
            flist = s.list()
            if filename in flist:
                if hasattr(s.backend, "_delete_list"):
                    s._do_delete_list(
                        [
                            filename,
                        ]
                    )
                elif hasattr(s.backend, "_delete"):
                    s._do_delete(filename)
                passed = True
        if not passed:
            log.Log(
                _("MultiBackend: failed to delete %s. Tried all backing stores and none succeeded") % filename,
                log.ERROR,
            )
    def _delete_list(self, filenames):
        # Store an indication of whether any passed
        passed = False
        stores = self.__stores
        # since the backend operations will be retried, we can't
        # simply try to get from the store, if not found, move to the
        # next store (since each failure will be retried n times
        # before finally giving up).  So we need to get the list first
        # before we try to delete
        # ENHANCEME: maintain a cached list for each store
        for s in stores:
            flist = s.list()
            cleaned = [f for f in filenames if f in flist]
            if hasattr(s.backend, "_delete_list"):
                s._do_delete_list(cleaned)
            elif hasattr(s.backend, "_delete"):
                for filename in cleaned:
                    s._do_delete(filename)
            passed = True
        if not passed:
            log.Log(
                _("MultiBackend: failed to delete %s. Tried all backing stores and none succeeded") % filenames,
                log.ERROR,
            )
    def pre_process_download(self, filename):
        for store in self.__stores:
            if hasattr(store.backend, "pre_process_download"):
                store.backend.pre_process_download(filename)
    def pre_process_download_batch(self, filenames):
        set_files = set(filenames)
        for store in self.__stores:
            if hasattr(store.backend, "pre_process_download_batch"):
                store_files_to_download = set_files.intersection(store.list())
                if len(store_files_to_download) > 0:
                    store.backend.pre_process_download_batch(store_files_to_download)
duplicity.backend.register_backend("multi", MultiBackend)
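
# Usage sketch (hypothetical paths), relying on the registration above:
#
#   # stripe volumes across the configured stores, aborting on the first
#   # failed write:
#   duplicity /src "multi:///etc/duplicity/multi.json?mode=stripe&onfail=abort"
#
#   # keep a full copy of every volume on every store:
#   duplicity /src "multi:///etc/duplicity/multi.json?mode=mirror"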