Coverage for enderchest/sync/file.py: 95%
121 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-05-04 01:41 +0000
« prev ^ index » next coverage.py v7.5.0, created at 2024-05-04 01:41 +0000
1"""shutil-based sync implementation"""
3import fnmatch
4import logging
5import os
6import shutil
7import stat
8from pathlib import Path
9from typing import Callable, Collection
10from urllib.parse import ParseResult
12from . import (
13 SYNC_LOGGER,
14 Op,
15 abspath_from_uri,
16 diff,
17 filter_contents,
18 generate_sync_report,
19 is_identical,
20)
def get_contents(path: Path) -> list[tuple[Path, os.stat_result]]:
    """Recursively list the contents of a local directory

    Parameters
    ----------
    path : Path
        The path to scan

    Returns
    -------
    list of filenames and attributes
        The attributes of all files, folders and symlinks found under the
        specified path

    Notes
    -----
    - This list will be sorted from shortest path to longest (so that parent
      directories come before their children)
    - The paths returned are all relative to the provided path
    - Attributes are collected with `lstat`, so a symlink is described by its
      own attributes rather than those of its target
    """
    SYNC_LOGGER.debug(f"Getting contents of {path}")
    # rglob is already recursive, so a bare "*" reaches every entry below
    # `path`; the root itself is never part of the listing
    return sorted(
        ((entry.relative_to(path), entry.lstat()) for entry in path.rglob("*")),
        key=lambda item: len(str(item[0])),
    )
50def copy(
51 source_path: Path,
52 destination_folder: Path,
53 exclude: Collection[str],
54 delete: bool,
55 dry_run: bool,
56) -> None:
57 """Copy the specified source file or folder to the provided destination,
58 overwriting any existing files and deleting any that weren't in the source
60 Parameters
61 ----------
62 source_path : ParseResult
63 The file or folder to copy
64 destination_folder : Path
65 The destination to put the source file(s)
66 exclude : list of str
67 Any patterns that should be excluded from the sync (and sync)
68 delete : bool
69 Whether part of the syncing should include deleting files at the destination
70 that aren't at the source.
71 dry_run : bool
72 Whether to only simulate this sync (report the operations to be performed
73 but not actually perform them)
75 Notes
76 -----
77 If the source file does not exist, the destination file will simply be deleted
78 (if it exists)
79 """
81 ignore = ignore_patterns(*exclude)
82 SYNC_LOGGER.debug(f"Ignoring patterns: {exclude}")
84 destination_path = destination_folder / source_path.name
85 if destination_path.is_symlink() and not destination_path.is_dir():
86 SYNC_LOGGER.warning("Removing symlink %s", destination_path)
87 if not dry_run:
88 destination_path.unlink()
89 else:
90 SYNC_LOGGER.debug("And replacing it entirely with %s", source_path)
91 return
92 elif destination_path.exists() and not destination_path.is_dir():
93 SYNC_LOGGER.warning("Deleting file %s", destination_path)
94 if not dry_run:
95 destination_path.unlink()
96 else:
97 SYNC_LOGGER.debug("And replacing it entirely with %s", source_path)
98 return
99 else:
100 if not dry_run:
101 destination_folder.mkdir(parents=True, exist_ok=True)
103 SYNC_LOGGER.debug(f"Copying {source_path} into {destination_folder}")
105 if source_path.exists() and not source_path.is_dir():
106 if destination_path.exists() and is_identical(
107 source_path.stat(), destination_path.stat()
108 ):
109 SYNC_LOGGER.warning(
110 "%s and %s are identical. No copy needed.",
111 source_path,
112 destination_path,
113 )
114 return
115 SYNC_LOGGER.debug(
116 "Copying file %s to %s",
117 source_path,
118 destination_path,
119 )
120 if not dry_run:
121 shutil.copy2(source_path, destination_path, follow_symlinks=False)
122 return
124 source_contents = filter_contents(
125 get_contents(source_path), exclude, prefix=source_path
126 )
127 destination_contents = filter_contents(
128 get_contents(destination_path), exclude, prefix=destination_path
129 )
131 sync_diff = diff(source_contents, destination_contents)
133 if dry_run:
134 generate_sync_report(sync_diff)
135 return
137 for path, path_stat, operation in sync_diff:
138 match (operation, stat.S_ISDIR(path_stat.st_mode or 0)):
139 case (Op.CREATE, True):
140 SYNC_LOGGER.debug("Creating directory %s", destination_path / path)
141 (destination_path / path).mkdir(parents=True, exist_ok=True)
142 case (Op.CREATE, False) | (Op.REPLACE, False):
143 SYNC_LOGGER.debug(
144 "Copying file %s to %s",
145 source_path / path,
146 destination_path / path,
147 )
148 (destination_path / path).unlink(missing_ok=True)
149 if (source_path / path).is_symlink():
150 (destination_path / path).symlink_to(
151 (source_path / path).readlink()
152 )
153 else:
154 shutil.copy2(
155 source_path / path,
156 destination_path / path,
157 follow_symlinks=False,
158 )
159 case (Op.REPLACE, True):
160 # this would be replacing a file with a directory
161 SYNC_LOGGER.debug("Deleting file %s", destination_path / path)
162 (destination_path / path).unlink()
163 SYNC_LOGGER.debug(
164 "Copying directory %s to %s",
165 source_path / path,
166 destination_path / path,
167 )
168 shutil.copytree(
169 source_path / path,
170 destination_path / path,
171 symlinks=True,
172 ignore=ignore,
173 dirs_exist_ok=True,
174 )
175 case (Op.DELETE, True):
176 # recall that for deletions, it's the *destination's* stats
177 if delete:
178 clean(destination_path / path, ignore, dry_run)
179 case (Op.DELETE, False):
180 if delete:
181 SYNC_LOGGER.debug("Deleting file %s", destination_path / path)
182 (destination_path / path).unlink()
183 case op, is_dir: # pragma: no cover
184 raise NotImplementedError(
185 f"Don't know how to handle {op} of {'directory' if is_dir else 'file'}"
186 )
def clean(
    root: Path,
    ignore: Callable[[str, Collection[str]], set[str]],
    dry_run: bool,
) -> None:
    """Empty out a directory tree, leaving ignored entries untouched

    Every file and symlink below the root is removed unless its name is
    reported by the ignore filter. Sub-directories are processed recursively,
    and any directory that ends up empty is removed as well.

    Parameters
    ----------
    root : Path
        The root directory. And this should absolutely be a directory.
    ignore : Callable
        The ignore filter (as created by `ignore_patterns`) that specifies
        which files to leave alone.
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    """
    # a dry run reports at INFO level, a real run at DEBUG
    level = logging.DEBUG
    if dry_run:
        level = logging.INFO

    entries = list(root.iterdir())
    skipped = ignore(os.fspath(root), [entry.name for entry in entries])

    for entry in entries:
        if entry.name in skipped:
            SYNC_LOGGER.debug(f"Skipping {entry}")
            continue

        is_link = entry.is_symlink()
        if not is_link and entry.is_dir():
            # real directories are emptied recursively; links to directories
            # are removed like any other link
            clean(entry, ignore, dry_run)
            continue

        SYNC_LOGGER.log(
            level, f"Removing symlink {entry}" if is_link else f"Deleting {entry}"
        )
        if not dry_run:
            entry.unlink()

    # only a folder that is now empty gets removed
    leftovers = list(root.iterdir())
    if leftovers:
        return
    SYNC_LOGGER.log(level, f"Removing empty {root}")
    if not dry_run:
        root.rmdir()
def ignore_patterns(*patterns: str) -> Callable[[str, Collection[str]], set[str]]:
    """shutil.ignore_patterns doesn't support checking absolute paths,
    so we gotta roll our own.

    This implementation is adapted from
    https://github.com/python/cpython/blob/3.11/Lib/shutil.py#L440-L450 and
    https://stackoverflow.com/a/7842224

    Parameters
    ----------
    *patterns : str
        The patterns to match

    Returns
    -------
    Callable
        An "ignore" filter suitable for use in `shutil.copytree`

    Notes
    -----
    A pattern without any path separator is matched against the bare names.
    A pattern with N separators is matched against each name prefixed by the
    last N components of the directory being visited.
    """
    # The depth of a pattern (number of separators once normalized) depends
    # only on the pattern, so work it out once instead of on every call
    pattern_depths: list[tuple[str, int]] = [
        (pattern, len(os.path.normpath(pattern).split(os.sep)) - 1)
        for pattern in patterns
    ]

    def _ignore_patterns(path: str, names: Collection[str]) -> set[str]:
        # the directory's components are the same for every pattern
        path_parts: list[str] = os.path.normpath(path).split(os.sep)
        ignored_names: set[str] = set()
        for pattern, depth in pattern_depths:
            if depth == 0:
                match_paths: Collection[str] = names
            else:
                trailing_parts = path_parts[-depth:]
                match_paths = [
                    os.path.join(*trailing_parts, name) for name in names
                ]
            # report the bare names, which is what shutil.copytree expects
            ignored_names.update(
                os.path.basename(match)
                for match in fnmatch.filter(match_paths, pattern)
            )
        return ignored_names

    return _ignore_patterns
def pull(
    remote_uri: ParseResult,
    local_path: Path,
    exclude: Collection[str],
    dry_run: bool,
    delete: bool = True,
    **unsupported_kwargs,
) -> None:
    """Bring an upstream file or folder into a local folder, for the case
    where the "remote" is simply another location on this machine. Anything
    already present at the destination gets overwritten.

    Parameters
    ----------
    remote_uri : ParseResult
        The URI for the remote resource to copy from. See notes.
    local_path : Path
        The destination folder
    exclude : list of str
        Any patterns that should be excluded from the sync
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    delete : bool, optional
        Whether part of the syncing should include deleting files at the destination
        that aren't at the source. Default is True.
    **unsupported_kwargs
        Any other provided options will be ignored (they are only logged)

    Raises
    ------
    FileNotFoundError
        If the destination folder does not exist, or if the resource the
        URI resolves to does not exist

    Notes
    -----
    - This method is only meant to be used for local files specified using
      the file:// protocol, but it does not itself check that the schema is
      correct or that the hostname corresponds to this machine. This method
      does not support user authentication (running the copy as a different
      user).
    - If the destination folder does not already exist, this method will not
      create it or its parent directories.
    """
    source_path = abspath_from_uri(remote_uri).expanduser()

    # both ends have to be present before anything gets copied
    if not local_path.exists():
        raise FileNotFoundError(f"{local_path} does not exist")
    if not source_path.exists():
        raise FileNotFoundError(f"{remote_uri.geturl()} does not exist")

    if unsupported_kwargs:
        ignored_options = "\n".join(
            f" {option}: {value}" for option, value in unsupported_kwargs.items()
        )
        SYNC_LOGGER.debug(
            "The following command-line options are ignored for this protocol:\n%s",
            ignored_options,
        )

    copy(source_path, local_path, exclude, delete=delete, dry_run=dry_run)
def push(
    local_path: Path,
    remote_uri: ParseResult,
    exclude: Collection[str],
    dry_run: bool,
    delete: bool = True,
    **unsupported_kwargs,
) -> None:
    """Send a local file or folder to a destination folder, for the case
    where the "remote" is simply another location on this machine. Anything
    already present at the destination gets overwritten.

    Parameters
    ----------
    local_path : Path
        The file or folder to copy
    remote_uri : ParseResult
        The URI for the remote location to copy into. See notes.
    exclude : list of str
        Any patterns that should be excluded from the sync
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    delete : bool, optional
        Whether part of the syncing should include deleting files at the destination
        that aren't at the source. Default is True.
    **unsupported_kwargs
        Any other provided options will be ignored (they are only logged)

    Raises
    ------
    FileNotFoundError
        If the destination folder the URI resolves to does not exist, or if
        the local file or folder does not exist

    Notes
    -----
    - This method is only meant to be used for local files specified using
      the file:// protocol, but it does not itself check that the schema is
      correct or that the hostname corresponds to this machine. This method
      does not support user authentication (running the copy as a different
      user).
    - If the destination folder does not already exist, this method will not
      create it or its parent directories.
    """
    destination_folder = abspath_from_uri(remote_uri).expanduser()

    # both ends have to be present before anything gets copied
    if not destination_folder.exists():
        raise FileNotFoundError(f"{remote_uri.geturl()} does not exist")
    if not local_path.exists():
        raise FileNotFoundError(f"{local_path} does not exist")

    if unsupported_kwargs:
        ignored_options = "\n".join(
            f" {option}: {value}" for option, value in unsupported_kwargs.items()
        )
        SYNC_LOGGER.debug(
            "The following command-line options are ignored for this protocol:\n%s",
            ignored_options,
        )

    copy(local_path, destination_folder, exclude, delete=delete, dry_run=dry_run)