Source code for duplicity.backend_pool

from dataclasses import dataclass, field
import multiprocessing
import os
import sys
import time
from collections import deque
from datetime import datetime, timedelta
import concurrent.futures
import traceback
from typing import List, Optional, Dict, Tuple

from duplicity import (
    backend,
    log,
    path,
)

# The pool relies on the "fork" start method so that worker processes inherit module-level
# state such as the cmd_tracker queue below without having to pickle it.
multiprocessing.set_start_method("fork")
cmd_tracker = multiprocessing.Queue()


@dataclass
class TrackRecord:
    # result tracking while executed in the pool
    track_id: int
    pid: int
    trace_back: Optional[List[str]] = field(
        default_factory=list
    )  # tracebacks can't be pickled, store as strings to get them over into the main process
    result: object = None  # must be picklable
    log_prefix: str = ""
    start_time: datetime = field(default_factory=datetime.now)
    stop_time: datetime = datetime.min

    def get_runtime(self) -> timedelta:
        if self.stop_time == datetime.min:
            return datetime.now() - self.start_time
        else:
            return self.stop_time - self.start_time
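
# A minimal illustration (not part of duplicity) of how the timing fields are meant to be
# read: get_runtime() returns the time elapsed so far while stop_time is still datetime.min,
# and the final duration once track_cmd() has stamped stop_time.
#
#     rcd = TrackRecord(track_id=1, pid=os.getpid())
#     time.sleep(0.1)
#     print(rcd.get_runtime())       # still running: time since start_time
#     rcd.stop_time = datetime.now()
#     print(rcd.get_runtime())       # finished: stop_time - start_time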

def track_cmd(track_id, cmd_name: str, *args, **kwargs):
    """
    Wraps the pooled function for time tracking and exception handling.
    The traceback of an exception is recorded while still in process pool context,
    so that it points to the right place.
    (This function can't be part of BackendPool, as then the whole class gets pickled.)
    """
    global pool_backend, cmd_tracker
    p = multiprocessing.current_process()
    trk_rcd = TrackRecord(track_id, p.pid, log_prefix=log.PREFIX)  # type: ignore
    # send cmd/process assignment back to pool for tracking.
    cmd_tracker.put(trk_rcd, timeout=5)
    try:
        cmd = getattr(pool_backend, cmd_name)
        trk_rcd.result = cmd(*args, **kwargs)
    except Exception as e:
        trk_rcd.result = e
        trk_rcd.trace_back = traceback.format_tb(e.__traceback__)
    trk_rcd.stop_time = datetime.now()
    return trk_rcd
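
# Illustration only (DummyBackend is a made-up stand-in, not a duplicity backend): track_cmd()
# resolves the backend method by its string name via getattr(), which is why
# BackendPool.command() takes a method name rather than a bound method -- only picklable
# arguments can be sent to the worker processes.
#
#     class DummyBackend:
#         def put(self, source, remote_name):
#             return len(remote_name)
#
#     cmd = getattr(DummyBackend(), "put")   # same lookup track_cmd() performs on pool_backend
#     assert cmd(None, "file.dat") == 8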

class BackendPool:
    """
    Uses concurrent.futures.ProcessPoolExecutor to run backend commands in the background.
    """

    @dataclass
    class CmdStatus:
        function_name: str
        args: Tuple
        kwargs: Dict
        trk_rcd: Optional[TrackRecord] = None
        done: bool = False

    def __init__(self, url_string, processes=None) -> None:
        self.ppe = concurrent.futures.ProcessPoolExecutor(
            max_workers=processes, initializer=self._process_init, initargs=(url_string,)
        )
        self._command_queue = deque()
        self._track_id = 0
        self.all_results: List[TrackRecord] = []
        self.cmd_status: Dict[int, BackendPool.CmdStatus] = {}

    def __enter__(self):
        return self.ppe

    def __exit__(self, type, value, traceback):
        self.shutdown()

    def _process_init(self, url_string):
        pid = os.getpid()
        pool_nr = multiprocessing.current_process()._identity[0]
        log.PREFIX = f"Pool{pool_nr}: "
        log.Info(f"Starting pool process with pid: {pid}")
        global pool_backend
        pool_backend = backend.get_backend(url_string)

    def command(self, func_name, args=(), kwds={}):
        """
        Run a function in a pool of independent processes. The function is called by name.

        func_name: name of the backend method to execute
        args: positional arguments for the method
        kwds: keyword arguments for the method

        Returns a unique ID for each command (an increasing int).
        """
        self._track_id += 1
        self._command_queue.append(
            (
                self._track_id,
                self.ppe.submit(
                    track_cmd,
                    self._track_id,
                    func_name,
                    *args,
                    **kwds,
                ),
            )
        )
        self.cmd_status[self._track_id] = self.CmdStatus(function_name=func_name, args=args, kwargs=kwds)
        return self._track_id

    def collect_results(self) -> Tuple[int, Dict]:
        """
        Collect results from commands finished since the last run of this method.

        return: number of commands left in the queue, dictionary of [track_id, result]
        """
        results: Dict[int, TrackRecord] = {}
        for _ in range(len(self._command_queue)):  # iterate through deque
            try:
                # exceptions should be caught in track_cmd(), otherwise they get raised here
                self._command_queue[0][1].exception(timeout=0)
                if self._command_queue[0][1].done():
                    id, async_result = self._command_queue.popleft()
                    track_rcrd = async_result.result(timeout=0)
                    if isinstance(track_rcrd.result, Exception):
                        exception_str = f"{''.join(track_rcrd.trace_back)}\n{track_rcrd.result}"
                        log.Debug(f"Exception thrown in pool: \n{exception_str}")
                        if hasattr(track_rcrd.result, "code"):
                            log.FatalError(
                                f"Exception {track_rcrd.result.__class__.__name__} in background "
                                f"pool {track_rcrd.log_prefix}. "
                                "For trace back set loglevel to DEBUG and check output for given pool.",
                                code=track_rcrd.result.code,  # type: ignore
                            )
                        else:
                            raise track_rcrd.result
                    results[track_rcrd.track_id] = track_rcrd
                else:
                    # shift to next command result
                    self._command_queue.rotate(-1)
            except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError):
                self._command_queue.rotate(-1)
        self.all_results.extend(results.values())
        for id, tr in results.items():
            self.cmd_status[id].trk_rcd = tr
            self.cmd_status[id].done = True
        return (len(self._command_queue), results)

    def get_stats(self, last_index=None):
        vals = [x.get_runtime().total_seconds() for x in self.all_results[:last_index]]
        count = len(vals)
        if count > 0:
            avg_time = sum(vals) / count
            max_time = max(vals)
            min_time = min(vals)
        else:
            avg_time = max_time = min_time = -1
        log.Debug(f"count: {count}, avg: {avg_time}, max: {max_time}, min: {min_time}")
        return {"count": count, "avg": avg_time, "max": max_time, "min": min_time}

    def shutdown(self, *args):
        self.ppe.shutdown(*args)
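
# A condensed usage sketch (the file:// URL and file name are placeholders; a fuller test run
# follows under __main__ below): submit backend calls by method name, then poll
# collect_results() until the queue is drained.
#
#     pool = BackendPool("file:///tmp/backup-target", processes=2)
#     with pool:
#         pool.command("put_validated", args=(some_source_path, "volume1.difftar"))
#         while True:
#             left, finished = pool.collect_results()
#             if left == 0:
#                 break
#         pool.get_stats()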

# code to run/test the pool independently, not relevant for duplicity
if __name__ == "__main__":
    from duplicity import config, log

    log.setup()
    log.add_file("/tmp/tmp.log")
    log.setverbosity(log.DEBUG)
    backend.import_backends()
    config.async_concurrency = multiprocessing.cpu_count()
    config.num_retries = 2
    url = "file:///tmp/back4"
    bw = backend.get_backend(url)
    # ^^^^^^^^^^ the commands above are only there for mocking a duplicity config

    start_time = time.time()
    bpw = BackendPool(url, processes=config.async_concurrency)
    results = {}
    with bpw as pool:
        # issue tasks into the process pool
        import pathlib

        if len(sys.argv) > 1:
            src = sys.argv[1]
        else:
            src = "./"
        for file in [file for file in pathlib.Path(src).iterdir() if file.is_file()]:
            source_path = path.Path(file.as_posix())
            bpw.command("put_validated", args=(source_path, source_path.get_filename()))  # type: ignore
            commands_left, cmd_results = bpw.collect_results()
            log.Info(f"got: {len(cmd_results)}, cmd left: {commands_left}, track_id: {bpw._track_id}")
            results.update(cmd_results)
        # wait for tasks to complete
        while True:
            commands_left, cmd_results = bpw.collect_results()
            log.Info(
                f"got: {len(cmd_results)}, "
                f"processed: {len(results)}, cmd left: {commands_left}, track_id: {bpw._track_id}"
            )
            results.update(cmd_results)
            if commands_left == 0:
                break
        bpw.get_stats(last_index=-1)
        # process pool is closed automatically
    log.Notice(f"Bytes written: {sum([x.result for x in results.values()])}")
    log.Notice(f"Time elapsed: {time.time() - start_time}")