Coverage for enderchest/sync/utils.py: 100%
74 statements
« prev ^ index » next coverage.py v7.5.0, created at 2024-05-04 01:41 +0000
1"""Non-implementation-specific syncing utilities"""
3import fnmatch
4import getpass
5import os
6import socket
7import stat
8from collections import defaultdict
9from enum import Enum, auto
10from pathlib import Path
11from typing import Any, Collection, Generator, Iterable, Protocol, TypeVar
12from urllib.parse import ParseResult, unquote
13from urllib.request import url2pathname
15from ..loggers import SYNC_LOGGER
def get_default_netloc() -> str:
    """Build the default netloc for this machine from environment values

    Returns
    -------
    str
        The default netloc, in the form ``{user}@{hostname}`` (lowercased)
    """
    user = getpass.getuser()
    host = socket.gethostname()
    return "@".join((user, host)).lower()
def abspath_from_uri(uri: ParseResult) -> Path:
    """Extract and unquote the path component of a URI to turn it into an
    unambiguous absolute `pathlib.Path`

    h/t https://stackoverflow.com/a/61922504

    Parameters
    ----------
    uri : ParseResult
        The parsed URI to extract the path from

    Returns
    -------
    Path
        The path part of the URI as a Path
    """
    sep = os.path.sep
    # a non-empty netloc becomes a UNC-style host prefix (e.g. \\host\ on
    # Windows); with an empty netloc this collapses to just separators
    host_prefix = f"{sep}{sep}{uri.netloc}{sep}"
    local_path = url2pathname(unquote(uri.path))
    return Path(os.path.abspath(os.path.join(host_prefix, local_path)))
def uri_to_ssh(uri: ParseResult) -> str:
    """Convert a URI to an SSH address

    Parameters
    ----------
    uri: ParseResult
        The URI to convert

    Returns
    -------
    str
        The SSH-format address
    """
    address = ""
    if uri.username:
        address += f"{uri.username}@"
    # fall back to localhost when the URI has no host component
    address += uri.hostname or "localhost"
    if uri.port:
        address += f":{uri.port}"
    return f"{address}:{uri.path}"
def render_remote(alias: str, uri: ParseResult) -> str:
    """Render a remote to a descriptive string

    Parameters
    ----------
    alias : str
        The name of the remote
    uri : ParseResult
        The parsed URI for the remote

    Returns
    -------
    str
        `{uri_string} [({alias})]}`
        (alias is omitted if it's the same as the URI's hostname)
    """
    rendered = uri.geturl()
    return rendered if uri.hostname == alias else f"{rendered} ({alias})"
class _StatLike(Protocol):  # pragma: no cover
    """Structural type for the subset of `os.stat_result` this module
    relies on — any local or SFTP stat-like object with these attributes
    qualifies"""

    # file mode bits (type + permissions); may be None for remote stats
    @property
    def st_mode(self) -> int | None: ...

    # file size in bytes; compared via int() in is_identical
    @property
    def st_size(self) -> float | None: ...

    # modification time (seconds since epoch); compared via int() in is_identical
    @property
    def st_mtime(self) -> float | None: ...
103def is_identical(object_one: _StatLike, object_two: _StatLike) -> bool:
104 """Determine if two objects are identical (meaning: skip when syncing)
106 Parameters
107 ----------
108 object_one : os.stat_result or similar
109 The first object to compare
110 object_two : os.stat_result or similar
111 The second object to compare
113 Returns
114 -------
115 bool
116 False if the objects are conclusively different, True otherwise.
118 Notes
119 -----
120 As most implementations of the SFTP protocol do not include the check-file
121 extension, this method is limited in what it can compare. Use with caution.
122 """
123 if stat.S_ISDIR(object_one.st_mode or 0) != stat.S_ISDIR(object_two.st_mode or 0):
124 return False
125 if stat.S_ISLNK(object_one.st_mode or 0) != stat.S_ISLNK(object_two.st_mode or 0):
126 return False
128 if stat.S_ISLNK(object_one.st_mode or 0):
129 # there's no way from the stat to tell if two links have the same target
130 # so err on the side of "nope"
131 return False
133 if stat.S_ISREG(object_one.st_mode or 0):
134 # these comparisons should only be run on files
135 if int(object_one.st_size or 0) != int(object_two.st_size or 0):
136 return False
137 if int(object_one.st_mtime or 0) != int(object_two.st_mtime or 0):
138 return False
139 return True
class Operation(Enum):
    """The recognized sync operations

    Notes
    -----
    There's no `UPDATE` operation because so far this class isn't used by
    anything that _can_ perform a delta update on a file
    """

    # explicit values match what enum.auto() previously assigned
    CREATE = 1
    REPLACE = 2
    DELETE = 3
# Constrained TypeVar representing "a path plus accompanying metadata" tuples,
# so filter_contents yields exactly the tuple type it was given
PathInfo = TypeVar(
    "PathInfo",
    tuple[Path, Any],
    tuple[str, Any],
    # TODO: the proper type hint is tuple[Path, *tuple[Any, ...]]
    # but that's not supported until Python 3.11
)
def filter_contents(
    contents: Iterable[PathInfo],
    exclude: Collection[str],
    prefix: Path | str | None = None,
) -> Generator[PathInfo, None, None]:
    """Apply an exclusion filter to a list of files

    Parameters
    ----------
    contents : list of (Path, ...) tuples
        The contents to filter
    exclude : list of str
        The patterns to exclude
    prefix : Path, optional
        If the contents are iterating over a subdirectory, providing the directory
        as the `prefix` will allow filtering to be performed on the full path.

    Yields
    ------
    (Path, ...) tuples
        The elements of the provided list, omitting the ones
        to be excluded
    """
    # wrap each pattern so it can match anywhere inside a path
    patterns = [os.path.join("*", pattern, "*") for pattern in exclude]
    for entry in contents:
        # the "placeholder" leaf lets a pattern match a directory itself,
        # not just the directory's children
        candidate = os.path.normpath(
            os.path.join(prefix or "", entry[0], "placeholder")
        )
        if not any(fnmatch.fnmatch(candidate, pattern) for pattern in patterns):
            yield entry
def diff(
    source_files: Iterable[tuple[Path, _StatLike]],
    destination_files: Iterable[tuple[Path, _StatLike]],
) -> Generator[tuple[Path, _StatLike, Operation], None, None]:
    """Compute the "diff" between the source and destination, enumerating
    all the operations that should be performed so that the destination
    matches the source

    Parameters
    ----------
    source_files : list of (Path, stat_result) tuples
        The files and file attributes at the source
    destination_files : list of (Path, stat_result) tuples
        The files and file attributes at the destination

    Returns
    -------
    Generator of (Path, stat_result, Operation) tuples
        The files, their attributes and the operations that should be performed on each file

    Notes
    -----
    - The order of paths returned will match the order provided by the `source_files`
      except for the deletions, which will all come at the end and will be sorted
      from longest to shortest path (so that individual files are marked for deletion
      before their parent folders).
    - The attributes of each path will correspond to the *source* attributes for
      creations and replacements and to the *destination* attributes for the deletions
    """
    remaining: dict[Path, _StatLike] = dict(destination_files)
    for path, source_stat in source_files:
        try:
            destination_stat = remaining.pop(path)
        except KeyError:
            # nothing at the destination: the path needs to be created
            yield path, source_stat, Operation.CREATE
            continue
        if not is_identical(source_stat, destination_stat):
            yield path, source_stat, Operation.REPLACE
        # identical entries are simply skipped

    # whatever is left at the destination has no source counterpart; sort
    # longest-path-first so files are deleted before their parent folders
    # (sorted() is stable, so equal-length paths keep their insertion order)
    for path, destination_stat in sorted(
        remaining.items(), key=lambda item: len(str(item[0])), reverse=True
    ):
        yield path, destination_stat, Operation.DELETE
def generate_sync_report(
    content_diff: Iterable[tuple[Path, _StatLike, Operation]], depth: int = 2
) -> None:
    """Compile a high-level summary of the outcome of the `diff` method
    and report it to the logging.INFO level

    Parameters
    ----------
    content_diff : list of (Path, Operation) tuples
        The files and the operations that are to be performed on each file, as
        generated by the `diff` method
    depth : int, optional
        How many directories to go down from the root to generate the summary.
        Default is 2 (just report on top-level files and folders within the
        source folder).

    Returns
    -------
    None
    """
    summary: dict[Path, dict[Operation, int] | Operation] = defaultdict(
        lambda: {Operation.CREATE: 0, Operation.REPLACE: 0, Operation.DELETE: 0}
    )

    for full_path, path_stat, operation in content_diff:
        try:
            # bucket each entry under its ancestor `depth` levels below the root
            group = full_path.parents[-depth]
        except IndexError:
            # too shallow to live in a subdirectory: report it on its own
            summary[full_path] = operation
            continue

        tallies = summary[group]
        if isinstance(tallies, Operation):
            # the whole top-level entry is already described by a single op
            continue
        if operation is Operation.CREATE and stat.S_ISDIR(path_stat.st_mode or 0):
            # don't count folder creations
            continue
        tallies[operation] += 1

    for group, report in sorted(summary.items()):
        if isinstance(report, Operation):
            # nice that these verbs follow the same pattern
            SYNC_LOGGER.info(f"{report.name[:-1].title()}ing {group}")
        else:
            SYNC_LOGGER.info(
                f"Within {group}...\n%s",
                "\n".join(
                    f" - {op.name[:-1].title()}ing {count} file{'' if count == 1 else 's'}"
                    for op, count in report.items()
                ),
            )