Coverage for enderchest/sync/file.py: 95%
121 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-06 16:00 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-06 16:00 +0000
1"""shutil-based sync implementation"""
2import fnmatch
3import logging
4import os
5import shutil
6import stat
7from pathlib import Path
8from typing import Callable, Collection
9from urllib.parse import ParseResult
11from . import (
12 SYNC_LOGGER,
13 Op,
14 abspath_from_uri,
15 diff,
16 filter_contents,
17 generate_sync_report,
18 is_identical,
19)
def get_contents(path: Path) -> list[tuple[Path, os.stat_result]]:
    """Recursively list everything found under a local directory

    Parameters
    ----------
    path : Path
        The directory to scan

    Returns
    -------
    list of (Path, os.stat_result) tuples
        One entry per file, folder or symlink found under the specified
        path, pairing its location with its ``lstat`` attributes (so a
        symlink is described itself rather than being followed)

    Notes
    -----
    - Entries are ordered by the length of their path string, shortest
      first, so that parent directories come before their children
    - The paths returned are all relative to the provided path
    """
    SYNC_LOGGER.debug(f"Getting contents of {path}")
    entries = [
        (child.relative_to(path), child.lstat()) for child in path.rglob("**/*")
    ]
    # list.sort is stable, just like sorted(), so ties keep their scan order
    entries.sort(key=lambda entry: len(str(entry[0])))
    return entries
49def copy(
50 source_path: Path,
51 destination_folder: Path,
52 exclude: Collection[str],
53 delete: bool,
54 dry_run: bool,
55) -> None:
56 """Copy the specified source file or folder to the provided destination,
57 overwriting any existing files and deleting any that weren't in the source
59 Parameters
60 ----------
61 source_path : ParseResult
62 The file or folder to copy
63 destination_folder : Path
64 The destination to put the source file(s)
65 exclude : list of str
66 Any patterns that should be excluded from the sync (and sync)
67 delete : bool
68 Whether part of the syncing should include deleting files at the destination
69 that aren't at the source.
70 dry_run : bool
71 Whether to only simulate this sync (report the operations to be performed
72 but not actually perform them)
74 Notes
75 -----
76 If the source file does not exist, the destination file will simply be deleted
77 (if it exists)
78 """
80 ignore = ignore_patterns(*exclude)
81 SYNC_LOGGER.debug(f"Ignoring patterns: {exclude}")
83 destination_path = destination_folder / source_path.name
84 if destination_path.is_symlink() and not destination_path.is_dir():
85 SYNC_LOGGER.warning("Removing symlink %s", destination_path)
86 if not dry_run:
87 destination_path.unlink()
88 else:
89 SYNC_LOGGER.debug("And replacing it entirely with %s", source_path)
90 return
91 elif destination_path.exists() and not destination_path.is_dir():
92 SYNC_LOGGER.warning("Deleting file %s", destination_path)
93 if not dry_run:
94 destination_path.unlink()
95 else:
96 SYNC_LOGGER.debug("And replacing it entirely with %s", source_path)
97 return
98 else:
99 if not dry_run:
100 destination_folder.mkdir(parents=True, exist_ok=True)
102 SYNC_LOGGER.debug(f"Copying {source_path} into {destination_folder}")
104 if source_path.exists() and not source_path.is_dir():
105 if destination_path.exists() and is_identical(
106 source_path.stat(), destination_path.stat()
107 ):
108 SYNC_LOGGER.warning(
109 "%s and %s are identical. No copy needed.",
110 source_path,
111 destination_path,
112 )
113 return
114 SYNC_LOGGER.debug(
115 "Copying file %s to %s",
116 source_path,
117 destination_path,
118 )
119 if not dry_run:
120 shutil.copy2(source_path, destination_path, follow_symlinks=False)
121 return
123 source_contents = filter_contents(
124 get_contents(source_path), exclude, prefix=source_path
125 )
126 destination_contents = filter_contents(
127 get_contents(destination_path), exclude, prefix=destination_path
128 )
130 sync_diff = diff(source_contents, destination_contents)
132 if dry_run:
133 generate_sync_report(sync_diff)
134 return
136 for path, path_stat, operation in sync_diff:
137 match (operation, stat.S_ISDIR(path_stat.st_mode or 0)):
138 case (Op.CREATE, True):
139 SYNC_LOGGER.debug("Creating directory %s", destination_path / path)
140 (destination_path / path).mkdir(parents=True, exist_ok=True)
141 case (Op.CREATE, False) | (Op.REPLACE, False):
142 SYNC_LOGGER.debug(
143 "Copying file %s to %s",
144 source_path / path,
145 destination_path / path,
146 )
147 (destination_path / path).unlink(missing_ok=True)
148 if (source_path / path).is_symlink():
149 (destination_path / path).symlink_to(
150 (source_path / path).readlink()
151 )
152 else:
153 shutil.copy2(
154 source_path / path,
155 destination_path / path,
156 follow_symlinks=False,
157 )
158 case (Op.REPLACE, True):
159 # this would be replacing a file with a directory
160 SYNC_LOGGER.debug("Deleting file %s", destination_path / path)
161 (destination_path / path).unlink()
162 SYNC_LOGGER.debug(
163 "Copying directory %s to %s",
164 source_path / path,
165 destination_path / path,
166 )
167 shutil.copytree(
168 source_path / path,
169 destination_path / path,
170 symlinks=True,
171 ignore=ignore,
172 dirs_exist_ok=True,
173 )
174 case (Op.DELETE, True):
175 # recall that for deletions, it's the *destination's* stats
176 if delete:
177 clean(destination_path / path, ignore, dry_run)
178 case (Op.DELETE, False):
179 if delete:
180 SYNC_LOGGER.debug("Deleting file %s", destination_path / path)
181 (destination_path / path).unlink()
182 case op, is_dir: # pragma: no cover
183 raise NotImplementedError(
184 f"Don't know how to handle {op} of {'directory' if is_dir else 'file'}"
185 )
def clean(
    root: Path,
    ignore: Callable[[str, Collection[str]], set[str]],
    dry_run: bool,
) -> None:
    """Recursively delete every file and symlink under the root path, except
    for anything matched by the provided ignore filter, and remove the
    directories that end up empty as a result

    Parameters
    ----------
    root : Path
        The directory to clear out. This must be a directory, as its
        contents are listed straight away.
    ignore : Callable
        An ignore filter (such as the one built by `ignore_patterns`). It is
        called with the directory's path and the names of its entries and
        returns the names that must be left alone.
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    """
    # a dry run reports what *would* be removed at a more visible level
    log_level = logging.DEBUG if not dry_run else logging.INFO

    children = list(root.iterdir())
    skipped = ignore(os.fspath(root), [child.name for child in children])

    for child in children:
        if child.name in skipped:
            SYNC_LOGGER.debug(f"Skipping {child}")
            continue

        # the symlink test comes first so that a link pointing at a directory
        # is unlinked instead of being descended into
        if child.is_symlink():
            message = f"Removing symlink {child}"
        elif child.is_dir():
            clean(child, ignore, dry_run)
            continue
        else:
            message = f"Deleting {child}"

        SYNC_LOGGER.log(log_level, message)
        if not dry_run:
            child.unlink()

    # only remove the folder itself if nothing (e.g. ignored files) is left
    if next(root.iterdir(), None) is None:
        SYNC_LOGGER.log(log_level, f"Removing empty {root}")
        if not dry_run:
            root.rmdir()
def ignore_patterns(*patterns: str) -> Callable[[str, Collection[str]], set[str]]:
    """shutil.ignore_patterns doesn't support checking absolute paths,
    so we gotta roll our own.

    This implementation is adapted from
    https://github.com/python/cpython/blob/3.11/Lib/shutil.py#L440-L450 and
    https://stackoverflow.com/a/7842224

    Parameters
    ----------
    *patterns : str
        The patterns to match. A pattern without any path separator is
        matched against bare file names. A pattern containing separators is
        matched against each name prefixed with as many trailing components
        of the containing directory as the pattern has directory components.

    Returns
    -------
    Callable
        An "ignore" filter suitable for use in `shutil.copytree`: called with
        a directory path and the names inside it, it returns the set of names
        that match at least one pattern.
    """

    def _ignore_patterns(path: str, names: Collection[str]) -> set[str]:
        # the directory is the same for every pattern, so split it only once
        path_parts: list[str] = os.path.normpath(path).split(os.sep)

        ignored_names: set[str] = set()
        for pattern in patterns:
            pattern_depth = len(os.path.normpath(pattern).split(os.sep)) - 1
            if pattern_depth == 0:
                match_paths: Collection[str] = names
            else:
                parent_parts = path_parts[-pattern_depth:]
                # Join with os.sep rather than os.path.join: the latter drops
                # the empty leading component of an absolute path, which made
                # it impossible for a rooted pattern to ever match.
                match_paths = [os.sep.join([*parent_parts, name]) for name in names]
            ignored_names.update(
                os.path.split(match)[-1]
                for match in fnmatch.filter(match_paths, pattern)
            )
        return ignored_names

    return _ignore_patterns
def pull(
    remote_uri: ParseResult,
    local_path: Path,
    exclude: Collection[str],
    dry_run: bool,
    delete: bool = True,
    **unsupported_kwargs,
) -> None:
    """Bring an upstream file or folder into the specified local folder, for
    the case where the "remote" is just another location on this machine.
    Files and folders already at the destination get overwritten.

    Parameters
    ----------
    remote_uri : ParseResult
        The URI for the remote resource to copy from. See notes.
    local_path : Path
        The destination folder
    exclude : list of str
        Any patterns that should be excluded from the sync
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    delete : bool, optional
        Whether part of the syncing should include deleting files at the destination
        that aren't at the source. Default is True.
    **unsupported_kwargs
        Any other provided options will be ignored (they are only listed in
        a debug-level log message)

    Raises
    ------
    FileNotFoundError
        If the destination folder does not exist, or if the remote resource
        does not exist

    Notes
    -----
    - This method is only meant to be used for local files specified using
      the file:// protocol, but it does not perform any validation on the URI to
      ensure that the schema is correct or that the hostname corresponds to this
      machine. This method does not support user authentication
      (running the copy as a different user).
    - If the destination folder does not already exist, this method will not
      create it or its parent directories.
    """
    source_path = abspath_from_uri(remote_uri).expanduser()

    # the destination is validated before the source
    if not local_path.exists():
        raise FileNotFoundError(f"{local_path} does not exist")
    if not source_path.exists():
        raise FileNotFoundError(f"{remote_uri.geturl()} does not exist")

    if unsupported_kwargs:
        ignored_options = "\n".join(
            " {}: {}".format(name, value)
            for name, value in unsupported_kwargs.items()
        )
        SYNC_LOGGER.debug(
            "The following command-line options are ignored for this protocol:\n%s",
            ignored_options,
        )

    copy(source_path, local_path, exclude, delete=delete, dry_run=dry_run)
def push(
    local_path: Path,
    remote_uri: ParseResult,
    exclude: Collection[str],
    dry_run: bool,
    delete: bool = True,
    **unsupported_kwargs,
) -> None:
    """Send a local file or folder into the specified location, for the case
    where the "remote" is just another folder on this machine. Files and
    folders already at the destination get overwritten.

    Parameters
    ----------
    local_path : Path
        The file or folder to copy
    remote_uri : ParseResult
        The URI for the remote location to copy into. See notes.
    exclude : list of str
        Any patterns that should be excluded from the sync
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    delete : bool, optional
        Whether part of the syncing should include deleting files at the destination
        that aren't at the source. Default is True.
    **unsupported_kwargs
        Any other provided options will be ignored (they are only listed in
        a debug-level log message)

    Raises
    ------
    FileNotFoundError
        If the destination folder does not exist, or if the local file or
        folder to copy does not exist

    Notes
    -----
    - This method is only meant to be used for local files specified using
      the file:// protocol, but it does not perform any validation on the URI to
      ensure that the schema is correct or that the hostname corresponds to this
      machine. This method does not support user authentication
      (running the copy as a different user).
    - If the destination folder does not already exist, this method will not
      create it or its parent directories.
    """
    destination_folder = abspath_from_uri(remote_uri).expanduser()

    # the destination is validated before the source
    if not destination_folder.exists():
        raise FileNotFoundError(f"{remote_uri.geturl()} does not exist")
    if not local_path.exists():
        raise FileNotFoundError(f"{local_path} does not exist")

    if unsupported_kwargs:
        ignored_options = "\n".join(
            " {}: {}".format(name, value)
            for name, value in unsupported_kwargs.items()
        )
        SYNC_LOGGER.debug(
            "The following command-line options are ignored for this protocol:\n%s",
            ignored_options,
        )

    copy(local_path, destination_folder, exclude, delete=delete, dry_run=dry_run)