Source code for duplicity.backends.megabackend

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2017 Tomas Vondra (Launchpad id: tomas-v)
# Copyright 2017 Kenneth Loafman <kenneth@loafman.com>
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

from __future__ import print_function
from future import standard_library
standard_library.install_aliases()

from duplicity import util
from duplicity.errors import BackendException
import duplicity.backend

import os
import subprocess


[docs]class MegaBackend(duplicity.backend.Backend): u"""Connect to remote store using Mega.co.nz API"""
[docs] def __init__(self, parsed_url): duplicity.backend.Backend.__init__(self, parsed_url) # ensure all the necessary megatools binaries exist self._check_binary_exists(u'megals') self._check_binary_exists(u'megamkdir') self._check_binary_exists(u'megaget') self._check_binary_exists(u'megaput') self._check_binary_exists(u'megarm') # store some basic info self._hostname = parsed_url.hostname if parsed_url.password is None: self._megarc = os.getenv(u'HOME') + u'/.megarc' else: self._megarc = False self._username = parsed_url.username self._password = self.get_password() # remote folder (Can we assume /Root prefix?) self._root = u'/Root' self._folder = self._root + u'/' + parsed_url.path[1:] # make sure the remote folder exists (the whole path) self._makedir_recursive(parsed_url.path[1:].split(u'/'))
[docs] def _check_binary_exists(self, cmd): u'checks that a specified command exists in the current path' try: # ignore the output, we only need the return code subprocess.check_output([u'which', cmd]) except Exception as e: raise BackendException(u"command '%s' not found, make sure megatools are installed" % (cmd,))
[docs] def _makedir(self, path): u'creates a remote directory' if self._megarc: cmd = [u'megamkdir', u'--config', self._megarc, path] else: cmd = [u'megamkdir', u'-u', self._username, u'-p', self._password, path] self.subprocess_popen(cmd)
[docs] def _makedir_recursive(self, path): u'creates a remote directory (recursively the whole path), ingores errors' print(u"mkdir: %s" % (u'/'.join(path),)) p = self._root for folder in path: p = p + u'/' + folder try: self._makedir(p) except: pass
[docs] def _put(self, source_path, remote_filename): u'uploads file to Mega (deletes it first, to ensure it does not exist)' try: self.delete(util.fsdecode(remote_filename)) except Exception: pass self.upload(local_file=util.fsdecode(source_path.get_canonical()), remote_file=util.fsdecode(remote_filename))
[docs] def _get(self, remote_filename, local_path): u'downloads file from Mega' self.download(remote_file=util.fsdecode(remote_filename), local_file=util.fsdecode(local_path.name))
[docs] def _list(self): u'list files in the backup folder' return self.folder_contents(files_only=True)
[docs] def _delete(self, filename): u'deletes remote ' self.delete(remote_file=util.fsdecode(filename))
[docs] def folder_contents(self, files_only=False): u'lists contents of a folder, optionally ignoring subdirectories' print(u"megals: %s" % (self._folder,)) if self._megarc: cmd = [u'megals', u'--config', self._megarc, self._folder] else: cmd = [u'megals', u'-u', self._username, u'-p', self._password, self._folder] files = subprocess.check_output(cmd) files = util.fsdecode(files.strip()).split(u'\n') # remove the folder name, including the path separator files = [f[len(self._folder) + 1:] for f in files] # optionally ignore entries containing path separator (i.e. not files) if files_only: files = [f for f in files if u'/' not in f] return [util.fsencode(f) for f in files]
[docs] def download(self, remote_file, local_file): print(u"megaget: %s" % (remote_file,)) if self._megarc: cmd = [u'megaget', u'--config', self._megarc, u'--no-progress', u'--path', local_file, self._folder + u'/' + remote_file] else: cmd = [u'megaget', u'-u', self._username, u'-p', self._password, u'--no-progress', u'--path', local_file, self._folder + u'/' + remote_file] self.subprocess_popen(cmd)
[docs] def upload(self, local_file, remote_file): print(u"megaput: %s" % (remote_file,)) if self._megarc: cmd = [u'megaput', u'--config', self._megarc, u'--no-progress', u'--path', self._folder + u'/' + remote_file, local_file] else: cmd = [u'megaput', u'-u', self._username, u'-p', self._password, u'--no-progress', u'--path', self._folder + u'/' + remote_file, local_file] try: self.subprocess_popen(cmd) except Exception as e: error_str = str(e) if u"EOVERQUOTA" in error_str: raise BackendException(u"MEGA account over quota, could not write file : '%s' . " u"Upgrade your storage at https://mega.nz/pro or remove some data." % (remote_file,)) else: raise BackendException(u"Failed writing file '%s' to MEGA , reason : '%s'" % (remote_file, e))
[docs] def delete(self, remote_file): print(u"megarm: %s" % (remote_file,)) if self._megarc: cmd = [u'megarm', u'--config', self._megarc, self._folder + u'/' + remote_file] else: cmd = [u'megarm', u'-u', self._username, u'-p', self._password, self._folder + u'/' + remote_file] self.subprocess_popen(cmd)
duplicity.backend.register_backend(u'mega', MegaBackend) duplicity.backend.uses_netloc.extend([u'mega'])