Source code for duplicity.backends.idrivedbackend

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2021 Menno Smits <menno@smi-ling.nl>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import os
import urllib
import tempfile
import re
import xml.etree.ElementTree as ET
import shutil
import errno


import duplicity.backend
from duplicity import config
from duplicity import log
from duplicity import tempdir
from duplicity import progress
from duplicity.errors import BackendException

#
#   This backend works with the IDrive  "dedup implementation". V0.1
#               (for all new and recent accounts)
#
#   Credits: This code is loosely inspired by the work of <aappddeevv>
#
#
#   This backend uses an intermediate driver for IDrive: "idevsutil_dedup" that will be
#   installed automagically  when you perform the account setup on your system.
#   It can, however, also be downloaded directly from the following URL's
#
#   https://www.idrivedownloads.com/downloads/linux/download-options/IDrive_linux_64bit.zip
#   and
#   https://www.idrivedownloads.com/downloads/linux/download-options/IDrive_linux_32bit.zip
#
#   for 32 and 64 bit linux, respectively. Copy the file anywhere with exe permissions.
#   (no further setup of your IDrive account is needed for idrived to work)
#
#
#   For this backend to work, you need to create a number of environment variables:
#
#   - Put the absolute path to the driver-file (idevsutil_dedup) in IDEVSPATH
#   - Put the account-name (login name) in IDRIVEID
#
#   - Put the name of the desired bucket for this backup-session in IDBUCKET
#     If this bucket does not exist it will be created at runtime
#
#   - Create a file with the account password - put absolute path in IDPWDFILE
#
#   When using a custom encryption key:
#   - Create a file with the encryption key - put absolute path in IDKEYFILE
#
#   Note: setup proper security for these files!
#
#
#   The IDrive "root" issue ...
#
#   IDrive stores files according to 1) the selected bucket, 2) the supplied path
#   and 3)the absolute path of the directory used for uploads. So ... if we use
#       - bucket <MYBUCKET>
#       - duplicity commandline idrived://DUPLICITY
#   and
#       - system tempfile OR path set from --tempfile "\tmp"
#
#   the files end-up in the following path:
#       <MYBUCKET>/DUPLICITY/tmp/duplicity-??????-tempdir
#
#   Not only is this SO UGLY .... but - as tempdirs have unique names - this effectively
#   disables the idea of incremental backups.
#
#   To remedy this, idrived uses the concept of a "fakeroot" directory, defined via the
#   --idr-fakeroot=... switch. This can be an existing directory, or the directory is
#   created at runtime on the root of the (host) files system. (cave: you have to have
#   write access to the root!). Directories created at runtime are auto-removed on exit!
#
#   So, in the above scheme, we could do:
#       duplicity --idr-fakeroot=nicepath idrived://DUPLICITY
#
#   our files end-up at
#       <MYBUCKET>/DUPLICITY/nicepath
#
#
#   Have fun!
#


[docs]class IDriveBackend(duplicity.backend.Backend):
[docs] def __init__(self, parsed_url): duplicity.backend.Backend.__init__(self, parsed_url) # parsed_url will have leading slashes in it, 4 slashes typically. self.parsed_url = parsed_url self.url_string = duplicity.backend.strip_auth_from_url(self.parsed_url) log.Debug(u"parsed_url: {0}".format(parsed_url)) self.connected = False
[docs] def user_connected(self): return self.connected
[docs] def request(self, commandline): # request for commands returning data in XML format log.Debug(u"Request command: {0}".format(commandline)) try: _, reply, error = self.subprocess_popen(commandline) except KeyError: raise BackendException(u"Unknown protocol failure on request {0}".format(commandline)) response = reply + error try: xml = u"<root>" + u''.join(re.findall(u"<[^>]+>", response)) + u"</root>" el = ET.fromstring(xml) except: el = None log.Debug(u"Request response: {0}".format(response)) return el
[docs] def connect(self): # get the path to the command executable path = os.environ.get(u"IDEVSPATH") if path is None: log.Warn(u"-" * 72) log.Warn(u"WARNING: No path to 'idevsutil_dedup' has been set. Download module from") log.Warn(u" https://www.idrivedownloads.com/downloads/linux/download-options/IDrive_linux_64bit.zip") log.Warn(u"or") log.Warn(u" https://www.idrivedownloads.com/downloads/linux/download-options/IDrive_linux_32bit.zip") log.Warn(u"and place anywhere with exe rights. Then creat env var 'IDEVSPATH' with path to file") log.Warn(u"-" * 72) raise BackendException(u"No IDEVSPATH env var set. Should contain folder to idevsutil_dedup") self.cmd = os.path.join(path, u"idevsutil_dedup") log.Debug(u"IDrive command base: %s" % (self.cmd)) # get the account-id self.idriveid = os.environ.get(u"IDRIVEID") if self.idriveid is None: log.Warn(u"-" * 72) log.Warn(u"WARNING: IDrive logon ID missing") log.Warn(u"Create an environment variable IDriveID with your IDrive logon ID") log.Warn(u"-" * 72) raise BackendException(u"No IDRIVEID env var set. Should contain IDrive id") log.Debug(u"IDrive id: %s" % (self.idriveid)) # Get the full-path to the account password file filepath = os.environ.get(u"IDPWDFILE") if filepath is None: log.Warn(u"-" * 72) log.Warn(u"WARNING: IDrive password file missging") log.Warn(u"Please create a file with your IDrive logon password,") log.Warn(u"Then create an environment variable IDPWDFILE with path/filename of said file") log.Warn(u"-" * 72) raise BackendException(u"No IDPWDFILE env var set. Should contain file with password") log.Debug(u"IDrive pwdpath: %s" % (filepath)) self.auth_switch = u" --password-file={0}".format(filepath) # fakeroot set? Create directory and mark for cleanup if config.fakeroot is None: self.cleanup = False self.fakeroot = u'' else: # Make sure fake root is created at root level! self.fakeroot = os.path.join(u'/', config.fakeroot) try: os.mkdir(self.fakeroot) except OSError as e: self.cleanup = False if e.errno == errno.EEXIST: log.Debug(u"Using existing directory {0} as fake-root".format(self.fakeroot)) else: log.Warn(u"-" * 72) log.Warn(u"WARNING: Creation of FAKEROOT {0} failed; backup will use system temp directory" .format(self.fakeroot)) log.Warn(u"This might interfere with incremental backups") log.Warn(u"-" * 72) raise BackendException(u"Creation of the directory {0} failed".format(self.fakeroot)) else: log.Debug(u"Directory {0} created as fake-root (Will clean-up afterwards!)".format(self.fakeroot)) self.cleanup = True # get the bucket self.bucket = os.environ.get(u"IDBUCKET") if self.bucket is None: log.Warn(u"-" * 72) log.Warn(u"WARNING: IDrive backup bucket missing") log.Warn(u"Create an environment variable IDBUCKET specifying the target bucket") log.Warn(u"-" * 72) raise BackendException(u"No IDBUCKET env var set. Should contain IDrive backup bucket") log.Debug(u"IDrive bucket: %s" % (self.bucket)) # check account / get config status and config type el = self.request(self.cmd + self.auth_switch + u" --validate --user={0}".format(self.idriveid)).find(u'tree') if el.attrib[u"message"] != u"SUCCESS": raise BackendException(u"Protocol failure - " + el.attrib[u"desc"]) if el.attrib[u"desc"] != u"VALID ACCOUNT": raise BackendException(u"IDrive account invalid") if el.attrib[u"configstatus"] != u"SET": raise BackendException(u"IDrive account not set") # When private encryption enabled: get the full-path to a encription key file if el.attrib[u"configtype"] == u"PRIVATE": filepath = os.environ.get(u"IDKEYFILE") if filepath is None: log.Warn(u"-" * 72) log.Warn(u"WARNING: IDrive encryption key file missging") log.Warn(u"Please create a file with your IDrive encryption key,") log.Warn(u"Then create an environment variable IDKEYFILE with path/filename of said file") log.Warn(u"-" * 72) raise BackendException(u"No IDKEYFILE env var set. Should contain file with encription key") log.Debug(u"IDrive keypath: %s" % (filepath)) self.auth_switch += u" --pvt-key={0}".format(filepath) # get the server address el = self.request(self.cmd + self.auth_switch + u" --getServerAddress {0}".format(self.idriveid)).find(u'tree') self.idriveserver = el.attrib[u"cmdUtilityServer"] # get the device list - primarely used to get device-id string el = self.request(self.cmd + self.auth_switch + u" --list-device {0}@{1}::home". format(self.idriveid, self.idriveserver)) # scan all returned devices for requested device (== bucket) self.idrivedevid = None for item in el.findall(u'item'): if item.attrib[u'nick_name'] == self.bucket: # prefix and suffix reverse-engineered from Common.pl! self.idrivedevid = u"5c0b" + item.attrib[u"device_id"] + u"4b5z" if self.idrivedevid is None: el = self.request( self.cmd + self.auth_switch + u" --create-bucket --bucket-type=D --nick-name={0} --os=Linux --uid=987654321 {1}@{2}::home/" .format(self.bucket, self.idriveid, self.idriveserver)).find(u'item') # prefix and suffix reverse-engineered from Common.pl! self.idrivedevid = u"5c0b" + el.attrib[u"device_id"] + u"4b5z" # We're fully connected! self.connected = True log.Debug(u"User fully connected")
[docs] def list_raw(self): # get raw list; used by _list, _query and _query_list remote_path = os.path.join(urllib.parse.unquote(self.parsed_url.path.lstrip(u'/')), self.fakeroot.lstrip(u'/')).rstrip() commandline = ((self.cmd + self.auth_switch + u" --auth-list --device-id={0} {1}@{2}::home/{3}" .format(self.idrivedevid, self.idriveid, self.idriveserver, remote_path))) try: _, l, _ = self.subprocess_popen(commandline) except: # error: treat as empty response log.Debug(u"list EMPTY response ") return [] log.Debug(u"list response: {0}".format(l)) # get a list of lists from data lines returned by idevsutil_dedup --auth-list filtered = map((lambda line: re.split(r"\[|\]", line)), [x for x in l.splitlines() if x.startswith(u"[")]) # remove whitespace from elements filtered = map((lambda line: map((lambda c: c.strip()), line)), filtered) # remove empty elements filtered = list(map((lambda cols: list(filter((lambda c: c != u''), cols))), filtered)) return filtered
[docs] def _put(self, source_path, remote_filename): # Put a file. log.Debug(u"_PUT") if not self.user_connected(): self.connect() # decode from byte-stream to utf-8 string filename = remote_filename.decode(u'utf-8') intrim_file = os.path.join(self.fakeroot, filename) remote_dirpath = urllib.parse.unquote(self.parsed_url.path.lstrip(u'/')) os.rename(source_path.name, intrim_file) log.Debug(u"put_file: source_path={0}, remote_file={1}".format(source_path.name, filename)) flist = tempfile.NamedTemporaryFile(u'w') flist.write(intrim_file) flist.seek(0) putrequest = ((self.cmd + self.auth_switch + u" --device-id={0} --files-from={1} / {2}@{3}::home/{4}") .format(self.idrivedevid, flist.name, self.idriveid, self.idriveserver, remote_dirpath)) log.Debug(u"put_file put command: {0}".format(putrequest)) _, putresponse, _ = self.subprocess_popen(putrequest) log.Debug(u"put_file put response: {0}".format(putresponse)) flist.close() os.remove(intrim_file)
[docs] def _get(self, remote_filename, local_path): # Get a file. log.Debug(u"_GET") if not self.user_connected(): self.connect() # decode from byte-stream to utf-8 string filename = remote_filename.decode(u'utf-8') remote_path = os.path.join(urllib.parse.unquote(self.parsed_url.path.lstrip(u'/')), self.fakeroot.lstrip(u'/'), filename).rstrip() log.Debug(u"_get: remote_filename={0}, local_path={1}, remote_path={2}, parsed_url.path={3}" .format(filename, local_path, remote_path, self.parsed_url.path)) # Create tempdir to downlaod file into tmpdir = tempfile.mkdtemp() log.Debug(u"_get created temporary download folder: {}".format(tmpdir)) # The filelist file flist = tempfile.NamedTemporaryFile(u'w') flist.write(remote_path) flist.seek(0) commandline = ((self.cmd + self.auth_switch + u" --device-id={0} --files-from={1} {2}@{3}::home/ {4}") .format(self.idrivedevid, flist.name, self.idriveid, self.idriveserver, tmpdir)) log.Debug(u"get command: {0}".format(commandline)) _, getresponse, _ = self.subprocess_popen(commandline) log.Debug(u"_get response: {0}".format(getresponse)) flist.close() # move to the final location downloadedSrcPath = os.path.join(tmpdir, remote_path.lstrip(u'/').rstrip(u'/')) log.Debug(u"_get moving file {0} to final location: {1}".format(downloadedSrcPath, local_path.name)) os.rename(downloadedSrcPath, local_path.name) shutil.rmtree(tmpdir)
[docs] def _list(self): # List files on remote folder log.Debug(u"_LIST") if not self.user_connected(): self.connect() filtered = self.list_raw() filtered = [x[-1] for x in filtered] return filtered
[docs] def _delete(self, remote_filename): # Delete single file log.Debug(u"_DELETE") if not self.user_connected(): self.connect() # decode from byte-stream to utf-8 string filename = remote_filename.decode(u'utf-8') # create a file-list file flist = tempfile.NamedTemporaryFile(u'w') flist.write(filename.lstrip(u'/')) flist.seek(0) # target path (remote) on IDrive remote_path = os.path.join(urllib.parse.unquote(self.parsed_url.path.lstrip(u'/')), self.fakeroot.lstrip(u'/')).rstrip() log.Debug(u"delete: {0} from remote file path {1}".format(filename, remote_path)) # delete files from file-list delrequest = ((self.cmd + self.auth_switch + u" --delete-items --device-id={0} --files-from={1} {2}@{3}::home/{4}") .format(self.idrivedevid, flist.name, self.idriveid, self.idriveserver, remote_path)) log.Debug(u"delete: {0}".format(delrequest)) _, delresponse, _ = self.subprocess_popen(delrequest) log.Debug(u"delete response: {0}".format(delresponse)) # close tempfile flist.close()
[docs] def _delete_list(self, filename_list): # Delete multiple files log.Debug(u"_DELETE LIST") if not self.user_connected(): self.connect() # create a file-list file flist = tempfile.NamedTemporaryFile(u'w') # create file-list for filename in filename_list: flist.write(filename.decode(u'utf-8').lstrip(u'/') + u'\n') flist.seek(0) # target path (remote) on IDrive remote_path = os.path.join(urllib.parse.unquote(self.parsed_url.path.lstrip(u'/')), self.fakeroot.lstrip(u'/')).rstrip() log.Debug(u"delete multiple files from remote file path {0}".format(remote_path)) # delete files from file-list delrequest = ((self.cmd + self.auth_switch + u" --delete-items --device-id={0} --files-from={1} {2}@{3}::home/{4}") .format(self.idrivedevid, flist.name, self.idriveid, self.idriveserver, remote_path)) log.Debug(u"delete: {0}".format(delrequest)) _, delresponse, _ = self.subprocess_popen(delrequest) log.Debug(u"delete response: {0}".format(delresponse)) # close tempfile flist.close()
[docs] def _close(self): # Remove EVS_temp directory + contents log.Debug(u"Removing IDrive temp folder evs_temp") try: shutil.rmtree(u"evs_temp") except: pass
[docs] def _query(self, filename): log.Debug(u"_QUERY") if not self.user_connected(): self.connect() # Get raw directory list; take-out size (index 1) for requested filename (index -1) filtered = self.list_raw() if filtered: filtered = [x[1] for x in filtered if x[-1] == filename.decode(u'utf-8')] if filtered: return {u'size': int(filtered[0])} return {u'size': -1}
[docs] def _query_list(self, filename_list): log.Debug(u"_QUERY_LIST") if not self.user_connected(): self.connect() # Get raw directory list filtered = self.list_raw() # For each filename in list: take-out size (index 1) for requested filename (index -1) info = {} for filename in filename_list: if filtered: result = [x[1] for x in filtered if x[-1] == filename.decode(u'utf-8')] if result: info[filename] = {u'size': int(result[0])} else: info[filename] = {u'size': -1} return info
def __del__(self): # remove the self-created temp dir. # We do it here, AFTER the clean-up of Duplicity, so it will be empty! if self.cleanup: os.rmdir(self.fakeroot)
duplicity.backend.register_backend(u"idrived", IDriveBackend)