Source code for duplicity.patchdir

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
# Copyright 2002 Ben Escoto <>
# Copyright 2007 Kenneth Loafman <>
# This file is part of duplicity.
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import tempfile

from duplicity import diffdir
from duplicity import dup_tarfile
from duplicity import errors
from duplicity import selection
from duplicity import tempdir
from duplicity.path import *  # pylint: disable=unused-wildcard-import,redefined-builtin

"""Functions for patching of directories"""

class PatchDirException(Exception):
    """Raised when a directory tree cannot be patched."""

    pass
def Patch(base_path, difftar_fileobj):
    """Apply the delta contained in difftar_fileobj to base_path."""
    delta_tarfile = dup_tarfile.TarFile("arbitrary", "r", difftar_fileobj)
    patch_diff_tarfile(base_path, delta_tarfile)
    # close() is expected to return a false value on success
    assert not difftar_fileobj.close()
def Patch_from_iter(base_path, fileobj_iter, restrict_index=()):
    """Patch base_path from an iterator of delta file objects."""
    patch_diff_tarfile(base_path, TarFile_FromFileobjs(fileobj_iter), restrict_index)
def patch_diff_tarfile(base_path, diff_tarfile, restrict_index=()):
    """Patch given Path object using delta dup_tarfile (as in dup_tarfile.TarFile)

    If restrict_index is set, ignore any deltas in diff_tarfile that
    don't start with restrict_index.
    """
    if base_path.exists():
        path_iter = selection.Select(base_path).set_iter()
    else:
        path_iter = empty_iter()  # probably untarring full backup

    diff_path_iter = difftar2path_iter(diff_tarfile)
    if restrict_index:
        diff_path_iter = filter_path_iter(diff_path_iter, restrict_index)

    ITR = IterTreeReducer(PathPatcher, [base_path])
    for basis_path, diff_ropath in diffdir.collate2iters(path_iter, diff_path_iter):
        # prefer the existing basis path for logging and indexing; fall
        # back to the delta's path when there is no basis entry
        reported = basis_path if basis_path else diff_ropath
        log.Info(
            _("Patching %s") % (os.fsdecode(reported.get_relative_path())),
            log.InfoCode.patch_file_patching,
            util.escape(reported.get_relative_path()),
        )
        ITR(reported.index, basis_path, diff_ropath)
    ITR.Finish()
    base_path.setdata()
def empty_iter():
    """Return a generator that yields nothing.

    The unreachable ``yield`` makes this function a generator, so
    callers can iterate it like any other path iterator.
    """
    return
    yield  # pragma: no cover - never reached; marks this as a generator
def filter_path_iter(path_iter, index):
    """Rewrite path elements of path_iter so they start with index

    Discard any that doesn't start with index, and remove the index
    prefix from the rest.
    """
    assert isinstance(index, tuple) and index, index
    prefix_len = len(index)
    for path in path_iter:
        if path.index[:prefix_len] == index:
            path.index = path.index[prefix_len:]
            yield path
def difftar2path_iter(diff_tarfile):
    """Turn file-like difftarobj into iterator of ROPaths"""
    tar_iter = iter(diff_tarfile)
    multivol_fileobj = None

    # The next tarinfo is stored in this one-element list so
    # Multivol_Filelike below can update it.  Any StopIterations will
    # be passed upwards.
    try:
        tarinfo_list = [next(tar_iter)]
    except StopIteration:
        return

    while True:
        # This section relevant when a multivol diff is last in tar
        if not tarinfo_list[0]:
            return
        if multivol_fileobj and not multivol_fileobj.at_end:
            multivol_fileobj.close()  # aborting in middle of multivol
            continue

        index, difftype, multivol = get_index_from_tarinfo(tarinfo_list[0])
        ropath = ROPath(index)
        ropath.init_from_tarinfo(tarinfo_list[0])
        ropath.difftype = difftype
        if difftype == "deleted":
            ropath.type = None
        elif ropath.isreg():
            if multivol:
                multivol_fileobj = Multivol_Filelike(diff_tarfile, tar_iter, tarinfo_list, index)
                ropath.setfileobj(multivol_fileobj)
                yield ropath
                continue  # Multivol_Filelike will reset tarinfo_list
            else:
                ropath.setfileobj(diff_tarfile.extractfile(tarinfo_list[0]))
        yield ropath
        try:
            tarinfo_list[0] = next(tar_iter)
        except StopIteration:
            return
def get_index_from_tarinfo(tarinfo):
    """Return (index, difftype, multivol) pair from tarinfo object

    The member name carries the diff type as a path prefix
    (e.g. ``snapshot/``, ``multivol_diff/``); multivol members
    additionally end in a volume number, which is stripped from
    the returned index.
    """
    for prefix in [
        "snapshot/",
        "diff/",
        "deleted/",
        "multivol_diff/",
        "multivol_snapshot/",
    ]:
        tiname = util.get_tarinfo_name(tarinfo)
        if tiname.startswith(prefix):
            if prefix.startswith("multivol"):
                if prefix == "multivol_diff/":
                    difftype = "diff"
                else:
                    difftype = "snapshot"
                multivol = 1
                # strip both the multivol prefix and the trailing /<volnum>
                name, num_subs = re.subn("(?s)^multivol_(diff|snapshot)/?(.*)/[0-9]+$", "\\2", tiname)
                if num_subs != 1:
                    raise PatchDirException(f"Unrecognized diff entry {tiname}")
            else:
                difftype = prefix[:-1]  # strip trailing /
                name = tiname[len(prefix) :]
                if name.endswith(r"/"):
                    name = name[:-1]  # strip trailing /'s
                multivol = 0
            break
    else:
        raise PatchDirException(f"Unrecognized diff entry {tiname}")
    if name == r"." or name == r"":
        index = ()
    else:
        index = tuple(os.fsencode(name).split(b"/"))
        if b".." in index:
            # reject path traversal attempts inside the archive
            raise PatchDirException(f"Tar entry {os.fsdecode(tiname)} contains '..'. Security violation")
    return index, difftype, multivol
[docs]class Multivol_Filelike(object): """Emulate a file like object from multivols Maintains a buffer about the size of a volume. When it is read() to the end, pull in more volumes as desired. """
[docs] def __init__(self, tf, tar_iter, tarinfo_list, index): """Initializer. tf is TarFile obj, tarinfo is first tarinfo""", self.tar_iter = tf, tar_iter self.tarinfo_list = tarinfo_list # must store as list for write access self.index = index self.buffer = b"" self.at_end = False
[docs] def read(self, length=-1): """Read length bytes from file""" if length < 0: while self.addtobuffer(): pass real_len = len(self.buffer) else: while len(self.buffer) < length: if not self.addtobuffer(): break real_len = min(len(self.buffer), length) result = self.buffer[:real_len] self.buffer = self.buffer[real_len:] return result
[docs] def addtobuffer(self): """Add next chunk to buffer""" if self.at_end: return False index, difftype, multivol = get_index_from_tarinfo(self.tarinfo_list[0]) if not multivol or index != self.index: # we've moved on # the following communicates next tarinfo to difftar2path_iter self.at_end = True return False fp =[0]) self.buffer += fp.close() try: self.tarinfo_list[0] = next(self.tar_iter) except StopIteration: self.tarinfo_list[0] = None self.at_end = True return False return True
[docs] def close(self): """If not at end, read remaining data""" if not self.at_end: while True: self.buffer = b"" if not self.addtobuffer(): break self.at_end = True
class PathPatcher(ITRBranch):
    """Used by DirPatch, process the given basis and diff"""

    def __init__(self, base_path):
        """Set base_path, Path of root of tree"""
        self.base_path = base_path
        self.dir_basis_path = None
        self.dir_diff_ropath = None

    def start_process(self, index, basis_path, diff_ropath):
        """Start processing when diff_ropath is a directory"""
        if not (diff_ropath and diff_ropath.isdir()):
            # should only happen for the first element
            assert index == (), util.uindex(index)
            self.fast_process(index, basis_path, diff_ropath)
            return

        if not basis_path:
            basis_path = self.base_path.new_index(index)
            assert not basis_path.exists()
            basis_path.mkdir()  # need a place for later files to go into
        elif not basis_path.isdir():
            basis_path.delete()
            basis_path.mkdir()
        self.dir_basis_path = basis_path
        self.dir_diff_ropath = diff_ropath

    def end_process(self):
        """Copy directory permissions when leaving tree"""
        if self.dir_diff_ropath:
            self.dir_diff_ropath.copy_attribs(self.dir_basis_path)

    def can_fast_process(self, index, basis_path, diff_ropath):  # pylint: disable=unused-argument
        """No need to recurse if diff_ropath isn't a directory"""
        return not (diff_ropath and diff_ropath.isdir())

    def fast_process(self, index, basis_path, diff_ropath):
        """For use when neither is a directory"""
        if not diff_ropath:
            return  # no change
        if not basis_path:
            if diff_ropath.difftype != "deleted":
                # just copy snapshot over
                diff_ropath.copy(self.base_path.new_index(index))
        elif diff_ropath.difftype == "deleted":
            if basis_path.isdir():
                basis_path.deltree()
            else:
                basis_path.delete()
        elif not basis_path.isreg() or diff_ropath.difftype == "snapshot":
            # replace whatever is there with the snapshot's contents
            if basis_path.isdir():
                basis_path.deltree()
            else:
                basis_path.delete()
            diff_ropath.copy(basis_path)
        else:
            assert diff_ropath.difftype == "diff", diff_ropath.difftype
            basis_path.patch_with_attribs(diff_ropath)
class TarFile_FromFileobjs(object):
    """Like a dup_tarfile.TarFile iterator, but read from multiple fileobjs"""

    def __init__(self, fileobj_iter):
        """Make new tarinfo iterator

        fileobj_iter should be an iterator of file objects opened for
        reading.  They will be closed at end of reading.
        """
        self.fileobj_iter = fileobj_iter
        self.dup_tarfile = None
        self.tar_iter = None
        self.current_fp = None

    def __iter__(self):  # pylint: disable=non-iterator-returned
        return self

    def set_tarfile(self):
        """Set dup_tarfile from next file object, or raise StopIteration"""
        if self.current_fp:
            assert not self.current_fp.close()
        # skip entries flagged as bad volumes and take the next real one
        while True:
            candidate = next(self.fileobj_iter)
            if not isinstance(candidate, errors.BadVolumeException):
                self.current_fp = candidate
                break
        self.dup_tarfile = util.make_tarfile("r", self.current_fp)
        self.tar_iter = iter(self.dup_tarfile)

    def __next__(self):
        if not self.dup_tarfile:
            try:
                self.set_tarfile()
            except StopIteration:
                return
        try:
            return next(self.tar_iter)
        except StopIteration:
            # current volume exhausted; close it and move to the next
            assert not self.dup_tarfile.close()
            self.set_tarfile()
            return next(self.tar_iter)

    def extractfile(self, tarinfo):
        """Return data associated with given tarinfo"""
        return self.dup_tarfile.extractfile(tarinfo)
def collate_iters(iter_list):
    """Collate iterators by index

    Input is a list of n iterators each of which must iterate elements
    with an index attribute.  The elements must come out in increasing
    order, and the index should be a tuple itself.

    The output is an iterator which yields tuples where all elements
    in the tuple have the same index, and the tuple has n elements in
    it.  If any iterator lacks an element with that index, the tuple
    will have None in that spot.
    """
    iter_num = len(iter_list)
    if iter_num == 2:
        return diffdir.collate2iters(iter_list[0], iter_list[1])

    # exhausted[i] set when iter_list[i] has run out;
    # pending[i] is None when slot i needs to be replenished
    exhausted = [None] * iter_num
    pending = [None] * iter_num

    def refill():
        """Replenish every empty pending slot, flagging dead iterators"""
        for i in range(iter_num):
            if not exhausted[i] and pending[i] is None:
                try:
                    pending[i] = next(iter_list[i])
                except StopIteration:
                    exhausted[i] = 1
                    pending[i] = None

    def least_index():
        """Smallest index among pending elements (assumes some exist)"""
        return min(elem.index for elem in pending if elem)

    def yield_tuples():
        while True:
            refill()
            if None not in exhausted:
                break
            index = least_index()
            row = []
            for i in range(iter_num):
                if pending[i] and pending[i].index == index:
                    row.append(pending[i])
                    pending[i] = None
                else:
                    row.append(None)
            yield tuple(row)

    return yield_tuples()
[docs]class IndexedTuple(object): """Like a tuple, but has .index (used previously by collate_iters)"""
[docs] def __init__(self, index, sequence): self.index = index = tuple(sequence)
def __len__(self): return len( def __getitem__(self, key): """This only works for numerical keys (easier this way)""" return[key] def __lt__(self, other): return self.__cmp__(other) == -1 def __le__(self, other): return self.__cmp__(other) != 1 def __ne__(self, other): return not self.__eq__(other) def __gt__(self, other): return self.__cmp__(other) == 1 def __ge__(self, other): return self.__cmp__(other) != -1 def __cmp__(self, other): assert isinstance(other, IndexedTuple) if self.index < other.index: return -1 elif self.index == other.index: return 0 else: return 1 def __eq__(self, other): if isinstance(other, IndexedTuple): return self.index == other.index and == elif isinstance(other, tuple): return == other else: return False def __str__(self): return f"({', '.join(map(str,}).{self.index}"
def normalize_ps(patch_sequence):
    """Given an sequence of ROPath deltas, remove blank and unnecessary

    The sequence is assumed to be in patch order (later patches apply
    to earlier ones).  A patch is unnecessary if a later one doesn't
    require it (for instance, any patches before a "delete" are
    unnecessary).
    """
    result_list = []
    # walk backwards: the first non-"diff" delta found supersedes
    # everything before it
    for delta in reversed(patch_sequence):
        if delta is None:
            continue  # skip blank entries
        result_list.insert(0, delta)
        if delta.difftype != "diff":
            break
    return result_list
[docs]def patch_seq2ropath(patch_seq): """Apply the patches in patch_seq, return single ropath""" first = patch_seq[0] assert first.difftype != "diff", f"First patch in sequence {patch_seq} was a diff" if not first.isreg(): # No need to bother with data if not regular file assert len(patch_seq) == 1, f"Patch sequence isn't regular, but has {len(patch_seq)} entries" return first.get_ropath() current_file ="rb") for delta_ropath in patch_seq[1:]: assert delta_ropath.difftype == "diff", delta_ropath.difftype try: cur_file.fileno() except Exception as e: """ librsync insists on a real file object, which we create manually by using the duplicity.tempdir to tell us where. See for discussion of os.tmpfile() vs tempfile.TemporaryFile() w.r.t. Windows / Posix, which is worked around in librsync.PatchedFile() now. """ tempfp = tempfile.TemporaryFile(dir=tempdir.default().dir()) util.copyfileobj(current_file, tempfp) assert not current_file.close() current_file = tempfp current_file = librsync.PatchedFile(current_file,"rb")) result = patch_seq[-1].get_ropath() result.setfileobj(current_file) return result
def integrate_patch_iters(iter_list):
    """Combine a list of iterators of ropath patches

    The iter_list should be sorted in patch order, and the elements in
    each iter_list need to be orderd by index.  The output will be an
    iterator of the final ROPaths in index order.
    """
    for patch_seq in collate_iters(iter_list):
        normalized = normalize_ps(patch_seq)
        try:
            final_ropath = patch_seq2ropath(normalized)
            if final_ropath.exists():
                # otherwise final patch was delete
                yield final_ropath
        except Exception as e:
            # a bad sequence shouldn't abort the whole restore; warn
            # about the affected file and keep going
            filename = normalized[-1].get_ropath().get_relative_path()
            log.Warn(
                _("Error '%s' patching %s") % (util.uexc(e), os.fsdecode(filename)),
                log.WarningCode.cannot_process,
                util.escape(filename),
            )
def tarfiles2rop_iter(tarfile_list, restrict_index=()):
    """Integrate tarfiles of diffs into single ROPath iter

    Then filter out all the diffs in that index which don't start with
    the restrict_index.
    """
    diff_iters = [difftar2path_iter(tf) for tf in tarfile_list]
    if restrict_index:
        # Apply filter before integration
        diff_iters = [filter_path_iter(it, restrict_index) for it in diff_iters]
    return integrate_patch_iters(diff_iters)
def Write_ROPaths(base_path, rop_iter):
    """Write out ropaths in rop_iter starting at base_path

    Returns 1 if something was actually written, 0 otherwise.
    """
    ITR = IterTreeReducer(ROPath_IterWriter, [base_path])
    wrote_something = 0
    for ropath in rop_iter:
        wrote_something = 1
        ITR(ropath.index, ropath)
    ITR.Finish()
    base_path.setdata()
    return wrote_something
class ROPath_IterWriter(ITRBranch):
    """Used in Write_ROPaths above

    We need to use an ITR because we have to update the
    permissions/times of directories after we write the files in them.
    """

    def __init__(self, base_path):
        """Set base_path, Path of root of tree"""
        self.base_path = base_path
        self.dir_diff_ropath = None
        self.dir_new_path = None

    def start_process(self, index, ropath):
        """Write ropath.  Only handles the directory case"""
        if not ropath.isdir():
            # Base may not be a directory, but rest should
            assert ropath.index == (), ropath.index
            destination = self.base_path.new_index(index)
            if ropath.exists():
                if destination.exists():
                    destination.deltree()
                ropath.copy(destination)

        self.dir_new_path = self.base_path.new_index(index)
        if self.dir_new_path.exists() and not config.force:
            # base may exist, but nothing else
            assert index == (), index
        else:
            self.dir_new_path.mkdir()
        self.dir_diff_ropath = ropath

    def end_process(self):
        """Update information of a directory when leaving it"""
        if self.dir_diff_ropath:
            self.dir_diff_ropath.copy_attribs(self.dir_new_path)

    def can_fast_process(self, index, ropath):  # pylint: disable=unused-argument
        """Can fast process (no recursion) if ropath isn't a directory"""
        log.Info(
            _("Writing %s of type %s") % (os.fsdecode(ropath.get_relative_path()), ropath.type),
            log.InfoCode.patch_file_writing,
            f"{util.escape(ropath.get_relative_path())} {ropath.type}",
        )
        return not ropath.isdir()

    def fast_process(self, index, ropath):
        """Write non-directory ropath to destination"""
        if ropath.exists():
            ropath.copy(self.base_path.new_index(index))