Coverage for enderchest/sync/utils.py: 100%
75 statements
coverage.py v7.7.1, created at 2025-03-28 20:32 +0000
1"""Non-implementation-specific syncing utilities"""
3import fnmatch
4import getpass
5import os
6import socket
7import stat
8from collections import defaultdict
9from collections.abc import Collection, Generator, Iterable
10from enum import Enum, auto
11from pathlib import Path
12from typing import Any, Protocol, TypeVar
13from urllib.parse import ParseResult, unquote
14from urllib.request import url2pathname
16from ..loggers import SYNC_LOGGER


def get_default_netloc() -> str:
    """Compile a netloc from environment variables, etc.

    Returns
    -------
    str
        The default netloc, which is {user}@{hostname}
    """
    return f"{getpass.getuser()}@{socket.gethostname()}".lower()


def abspath_from_uri(uri: ParseResult) -> Path:
    """Extract and unquote the path component of a URI to turn it into an
    unambiguous absolute `pathlib.Path`

    h/t https://stackoverflow.com/a/61922504

    Parameters
    ----------
    uri : ParseResult
        The parsed URI to extract the path from

    Returns
    -------
    Path
        The path part of the URI as a Path
    """
    host = "{0}{0}{mnt}{0}".format(os.path.sep, mnt=uri.netloc)
    return Path(os.path.abspath(os.path.join(host, url2pathname(unquote(uri.path)))))
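
# Illustrative usage (not part of the module; the path is hypothetical and the
# rendering assumes a POSIX filesystem):
#
#     >>> from urllib.parse import urlparse
#     >>> abspath_from_uri(urlparse("file:///home/user/My%20Worlds"))
#     PosixPath('/home/user/My Worlds')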


def uri_to_ssh(uri: ParseResult) -> str:
    """Convert a URI to an SSH address

    Parameters
    ----------
    uri: ParseResult
        The URI to convert

    Returns
    -------
    str
        The SSH-format address
    """
    return "{user}{host}:{path}".format(
        user=f"{uri.username}@" if uri.username else "",
        host=(uri.hostname or "localhost") + (f":{uri.port}" if uri.port else ""),
        path=uri.path,
    )
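
# Illustrative usage (hypothetical user, hostname and path):
#
#     >>> from urllib.parse import urlparse
#     >>> uri_to_ssh(urlparse("rsync://deck@steamdeck/home/deck/minecraft"))
#     'deck@steamdeck:/home/deck/minecraft'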


def render_remote(alias: str, uri: ParseResult) -> str:
    """Render a remote to a descriptive string

    Parameters
    ----------
    alias : str
        The name of the remote
    uri : ParseResult
        The parsed URI for the remote

    Returns
    -------
    str
        `{uri_string} [({alias})]`
        (alias is omitted if it's the same as the URI's hostname)
    """
    uri_string = uri.geturl()

    if uri.hostname != alias:
        uri_string += f" ({alias})"
    return uri_string
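
# Illustrative usage (hypothetical remote): the alias is only appended when it
# differs from the URI's hostname:
#
#     >>> from urllib.parse import urlparse
#     >>> remote = urlparse("rsync://deck@steamdeck/home/deck/minecraft")
#     >>> render_remote("steamdeck", remote)
#     'rsync://deck@steamdeck/home/deck/minecraft'
#     >>> render_remote("living-room", remote)
#     'rsync://deck@steamdeck/home/deck/minecraft (living-room)'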


class _StatLike(Protocol):  # pragma: no cover
    @property
    def st_mode(self) -> int | None: ...

    @property
    def st_size(self) -> float | None: ...

    @property
    def st_mtime(self) -> float | None: ...


def is_identical(object_one: _StatLike, object_two: _StatLike) -> bool:
    """Determine if two objects are identical (meaning: skip when syncing)

    Parameters
    ----------
    object_one : os.stat_result or similar
        The first object to compare
    object_two : os.stat_result or similar
        The second object to compare

    Returns
    -------
    bool
        False if the objects are conclusively different, True otherwise.

    Notes
    -----
    As most implementations of the SFTP protocol do not include the check-file
    extension, this method is limited in what it can compare. Use with caution.
    """
    if stat.S_ISDIR(object_one.st_mode or 0) != stat.S_ISDIR(object_two.st_mode or 0):
        return False
    if stat.S_ISLNK(object_one.st_mode or 0) != stat.S_ISLNK(object_two.st_mode or 0):
        return False

    if stat.S_ISLNK(object_one.st_mode or 0):
        # there's no way from the stat to tell if two links have the same target
        # so err on the side of "nope"
        return False

    if stat.S_ISREG(object_one.st_mode or 0):
        # these comparisons should only be run on files
        if int(object_one.st_size or 0) != int(object_two.st_size or 0):
            return False
        if int(object_one.st_mtime or 0) != int(object_two.st_mtime or 0):
            return False
    return True
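
# Illustrative usage (comparing a regular file's stat against itself and
# against a directory, which is deterministic on any system):
#
#     >>> is_identical(os.stat(__file__), os.stat(__file__))
#     True
#     >>> is_identical(os.stat(__file__), os.stat("."))
#     False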


class Operation(Enum):
    """The recognized sync operations

    Notes
    -----
    There's no `UPDATE` operation because so far this class isn't used by
    anything that _can_ perform a delta update on a file
    """

    CREATE = auto()
    REPLACE = auto()
    DELETE = auto()


PathInfo = TypeVar(
    "PathInfo",
    tuple[Path, Any],
    tuple[str, Any],
    # TODO: the proper type hint is tuple[Path, *tuple[Any, ...]]
    # but that's not supported until Python 3.11
)


def filter_contents(
    contents: Iterable[PathInfo],
    exclude: Collection[str],
    prefix: Path | str | None = None,
) -> Generator[PathInfo, None, None]:
    """Apply an exclusion filter to a list of files

    Parameters
    ----------
    contents : list of (Path, ...) tuples
        The contents to filter
    exclude : list of str
        The patterns to exclude
    prefix : Path, optional
        If the contents are iterating over a subdirectory, providing the directory
        as the `prefix` will allow filtering to be performed on the full path.

    Yields
    ------
    (Path, ...) tuples
        The elements of the provided list, omitting the ones
        to be excluded
    """
    for path_info in contents:
        if not any(
            (
                fnmatch.fnmatch(
                    os.path.normpath(
                        os.path.join(prefix or "", path_info[0], "placeholder")
                    ),
                    os.path.join("*", pattern, "*"),
                )
                for pattern in exclude
            )
        ):
            yield path_info
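
# Illustrative usage (hypothetical instance contents, shown on a POSIX system):
# each exclude pattern is matched against whole path components beneath the
# provided prefix, so "backups" drops everything inside a backups folder:
#
#     >>> contents = [
#     ...     (Path("options.txt"), None),
#     ...     (Path("backups/world.zip"), None),
#     ...     (Path("logs/latest.log"), None),
#     ... ]
#     >>> [p for p, _ in filter_contents(contents, ("backups", "*.log"), Path("instance"))]
#     [PosixPath('options.txt')]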


def diff(
    source_files: Iterable[tuple[Path, _StatLike]],
    destination_files: Iterable[tuple[Path, _StatLike]],
) -> Generator[tuple[Path, _StatLike, Operation], None, None]:
    """Compute the "diff" between the source and destination, enumerating
    all the operations that should be performed so that the destination
    matches the source

    Parameters
    ----------
    source_files : list of (Path, stat_result) tuples
        The files and file attributes at the source
    destination_files : list of (Path, stat_result) tuples
        The files and file attributes at the destination

    Returns
    -------
    Generator of (Path, stat_result, Operation) tuples
        The files, their attributes and the operations that should be
        performed on each file

    Notes
    -----
    - The order of paths returned will match the order provided by the `source_files`
      except for the deletions, which will all come at the end and will be sorted
      from longest to shortest path (so that individual files are marked for deletion
      before their parent folders).
    - The attributes of each path will correspond to the *source* attributes for
      creations and replacements and to the *destination* attributes for the deletions
    """
    destination_lookup: dict[Path, _StatLike] = dict(destination_files)
    for file, source_stat in source_files:
        if file not in destination_lookup:
            yield file, source_stat, Operation.CREATE
        else:
            destination_stat = destination_lookup.pop(file)
            if not is_identical(source_stat, destination_stat):
                yield file, source_stat, Operation.REPLACE
            # else: continue

    for file, destination_stat in sorted(
        destination_lookup.items(), key=lambda x: -len(str(x[0]))
    ):
        yield file, destination_stat, Operation.DELETE
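
# Illustrative usage (hypothetical listings, shown on a POSIX system, using
# SimpleNamespace to stand in for stat results): paths only at the source are
# CREATEs, non-identical paths are REPLACEs, and paths only at the destination
# are DELETEs, deepest first:
#
#     >>> from types import SimpleNamespace
#     >>> regular = stat.S_IFREG | 0o644
#     >>> src = [
#     ...     (Path("new.txt"), SimpleNamespace(st_mode=regular, st_size=1, st_mtime=1)),
#     ...     (Path("changed.txt"), SimpleNamespace(st_mode=regular, st_size=3, st_mtime=5)),
#     ... ]
#     >>> dst = [
#     ...     (Path("changed.txt"), SimpleNamespace(st_mode=regular, st_size=3, st_mtime=9)),
#     ...     (Path("gone/old.txt"), SimpleNamespace(st_mode=regular, st_size=4, st_mtime=4)),
#     ... ]
#     >>> [(str(path), op.name) for path, _, op in diff(src, dst)]
#     [('new.txt', 'CREATE'), ('changed.txt', 'REPLACE'), ('gone/old.txt', 'DELETE')]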


def generate_sync_report(
    content_diff: Iterable[tuple[Path, _StatLike, Operation]], depth: int = 2
) -> None:
    """Compile a high-level summary of the outcome of the `diff` method
    and report it to the logging.INFO level

    Parameters
    ----------
    content_diff : list of (Path, stat_result, Operation) tuples
        The files and the operations that are to be performed on each file, as
        generated by the `diff` method
    depth : int, optional
        How many directories to go down from the root to generate the summary.
        Default is 2 (just report on top-level files and folders within the
        source folder).

    Returns
    -------
    None
    """
    summary: dict[Path, dict[Operation, int] | Operation] = defaultdict(
        lambda: {Operation.CREATE: 0, Operation.REPLACE: 0, Operation.DELETE: 0}
    )

    for full_path, path_stat, operation in content_diff:
        try:
            path_key = full_path.parents[-depth]
        except IndexError:  # then this doesn't go in a subdirectory
            summary[full_path] = operation
            continue

        entry = summary[path_key]
        if isinstance(entry, Operation):
            # then this is described by the top-level op
            continue
        if operation == Operation.CREATE and stat.S_ISDIR(path_stat.st_mode or 0):
            # don't count folder creations
            continue

        entry[operation] += 1

    for path_key, report in sorted(summary.items()):
        if isinstance(report, Operation):
            # nice that these verbs follow the same pattern
            SYNC_LOGGER.info(f"{report.name[:-1].title()}ing {path_key}")
        else:
            SYNC_LOGGER.info(
                f"Within {path_key}...\n%s",
                "\n".join(
                    f" - {op.name[:-1].title()}ing {count} file{'' if count == 1 else 's'}"
                    for op, count in report.items()
                ),
            )
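
# Illustrative sketch of the report format (hypothetical diff contents): with
# the default depth of 2, changes are tallied per top-level folder, while
# root-level items get their own line, so a run might log something like:
#
#     Creating enderchest.cfg
#     Within global...
#      - Creating 3 files
#      - Replacing 1 file
#      - Deleting 0 files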