Coverage for enderchest/sync/utils.py: 100%
74 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-06 16:00 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-06 16:00 +0000
1"""Non-implementation-specific syncing utilities"""
2import fnmatch
3import getpass
4import os
5import socket
6import stat
7from collections import defaultdict
8from enum import Enum, auto
9from pathlib import Path
10from typing import Any, Collection, Generator, Iterable, Protocol, TypeVar
11from urllib.parse import ParseResult, unquote
12from urllib.request import url2pathname
14from ..loggers import SYNC_LOGGER
def get_default_netloc() -> str:
    """Compile a netloc from environment variables, etc.

    Returns
    -------
    str
        The default netloc, which is {user}@{hostname}
    """
    username = getpass.getuser()
    hostname = socket.gethostname()
    # normalize to lowercase so netloc comparisons are case-insensitive
    return "@".join((username, hostname)).lower()
def abspath_from_uri(uri: ParseResult) -> Path:
    """Extract and unquote the path component of a URI to turn it into an
    unambiguous absolute `pathlib.Path`

    h/t https://stackoverflow.com/a/61922504

    Parameters
    ----------
    uri : ParseResult
        The parsed URI to extract the path from

    Returns
    -------
    Path
        The path part of the URI as a Path
    """
    sep = os.path.sep
    # render the netloc as a UNC-style host prefix (just "{sep}{sep}{sep}"
    # when the netloc is empty, which os.path.abspath collapses away)
    host_prefix = sep + sep + uri.netloc + sep
    local_path = url2pathname(unquote(uri.path))
    return Path(os.path.abspath(os.path.join(host_prefix, local_path)))
def uri_to_ssh(uri: ParseResult) -> str:
    """Convert a URI to an SSH address

    Parameters
    ----------
    uri: ParseResult
        The URI to convert

    Returns
    -------
    str
        The SSH-format address
    """
    user_part = "" if not uri.username else f"{uri.username}@"
    host_part = uri.hostname or "localhost"
    if uri.port:
        host_part += f":{uri.port}"
    return f"{user_part}{host_part}:{uri.path}"
def render_remote(alias: str, uri: ParseResult) -> str:
    """Render a remote to a descriptive string

    Parameters
    ----------
    alias : str
        The name of the remote
    uri : ParseResult
        The parsed URI for the remote

    Returns
    -------
    str
        `{uri_string} [({alias})]}`
        (alias is omitted if it's the same as the URI's hostname)
    """
    rendered = uri.geturl()
    if alias != uri.hostname:
        rendered = f"{rendered} ({alias})"
    return rendered
class _StatLike(Protocol):  # pragma: no cover
    """Structural type for the subset of `os.stat_result` (or a remote/SFTP
    equivalent) that the sync utilities read.

    Every attribute may be None, since remote listings don't always supply
    each field (callers fall back to 0 in that case).
    """

    @property
    def st_mode(self) -> int | None:
        """File type/mode bits (tested via the `stat` module helpers)."""
        ...

    @property
    def st_size(self) -> float | None:
        """File size in bytes."""
        ...

    @property
    def st_mtime(self) -> float | None:
        """Modification timestamp."""
        ...
def is_identical(object_one: _StatLike, object_two: _StatLike) -> bool:
    """Determine if two objects are identical (meaning: skip when syncing)

    Parameters
    ----------
    object_one : os.stat_result or similar
        The first object to compare
    object_two : os.stat_result or similar
        The second object to compare

    Returns
    -------
    bool
        False if the objects are conclusively different, True otherwise.

    Notes
    -----
    As most implementations of the SFTP protocol do not include the check-file
    extension, this method is limited in what it can compare. Use with caution.
    """
    mode_one = object_one.st_mode or 0
    mode_two = object_two.st_mode or 0

    # a directory can never match a non-directory, nor a link a non-link
    if stat.S_ISDIR(mode_one) != stat.S_ISDIR(mode_two):
        return False
    if stat.S_ISLNK(mode_one) != stat.S_ISLNK(mode_two):
        return False

    if stat.S_ISLNK(mode_one):
        # there's no way from the stat to tell if two links have the same target
        # so err on the side of "nope"
        return False

    if stat.S_ISREG(mode_one):
        # size/mtime comparisons only make sense for regular files
        same_size = int(object_one.st_size or 0) == int(object_two.st_size or 0)
        same_mtime = int(object_one.st_mtime or 0) == int(object_two.st_mtime or 0)
        if not (same_size and same_mtime):
            return False

    return True
class Operation(Enum):
    """The recognized sync operations

    Notes
    -----
    There's no `UPDATE` operation because so far this class isn't used by
    anything that _can_ perform a delta update on a file
    """

    # the path exists at the source but not at the destination
    CREATE = auto()
    # the path exists at both ends, but the entries are not identical
    REPLACE = auto()
    # the path exists only at the destination
    DELETE = auto()
# Constrained TypeVar for the per-path tuples flowing through `filter_contents`:
# element 0 is the path (Path or str); the remaining element(s) carry arbitrary
# associated metadata (e.g. a stat result)
PathInfo = TypeVar(
    "PathInfo",
    tuple[Path, Any],
    tuple[str, Any],
    # TODO: the proper type hint is tuple[Path, *tuple[Any, ...]]
    # but that's not supported until Python 3.11
)
def filter_contents(
    contents: Iterable[PathInfo],
    exclude: Collection[str],
    prefix: Path | str | None = None,
) -> Generator[PathInfo, None, None]:
    """Apply an exclusion filter to a list of files

    Parameters
    ----------
    contents : list of (Path, ...) tuples
        The contents to filter
    exclude : list of str
        The patterns to exclude
    prefix : Path, optional
        If the contents are iterating over a subdirectory, providing the directory
        as the `prefix` will allow filtering to be performed on the full path.

    Yields
    ------
    (Path, ...) tuples
        The elements of the provided list, omitting the ones
        to be excluded
    """
    # anchor each pattern with a leading wildcard so it can match at any depth
    anchored_patterns = [os.path.join("*", pattern) for pattern in exclude]
    for path_info in contents:
        candidate = os.path.normpath(os.path.join(prefix or "", path_info[0]))
        if all(
            not fnmatch.fnmatch(candidate, pattern) for pattern in anchored_patterns
        ):
            yield path_info
def diff(
    source_files: Iterable[tuple[Path, _StatLike]],
    destination_files: Iterable[tuple[Path, _StatLike]],
) -> Generator[tuple[Path, _StatLike, Operation], None, None]:
    """Compute the "diff" between the source and destination, enumerating
    all the operations that should be performed so that the destination
    matches the source

    Parameters
    ----------
    source_files : list of (Path, stat_result) tuples
        The files and file attributes at the source
    destination_files : list of (Path, stat_result) tuples
        The files and file attributes at the destination

    Returns
    -------
    Generator of (Path, stat_result, Operation) tuples
        The files, their attributes and the operations that should be performed on each file

    Notes
    -----
    - The order of paths returned will match the order provided by the `source_files`
      except for the deletions, which will all come at the end and will be sorted
      from longest to shortest path (so that individual files are marked for deletion
      before their parent folders).
    - The attributes of each path will correspond to the *source* attributes for
      creations and replacements and to the *destination* attributes for the deletions
    """
    unmatched: dict[Path, _StatLike] = dict(destination_files)

    for path, source_stat in source_files:
        try:
            destination_stat = unmatched.pop(path)
        except KeyError:
            # not at the destination at all
            yield path, source_stat, Operation.CREATE
            continue
        if not is_identical(source_stat, destination_stat):
            yield path, source_stat, Operation.REPLACE
        # identical entries require no operation

    # anything left over exists only at the destination; emit deepest-first
    # (sorted() is stable, so ties keep their original relative order)
    leftovers = sorted(
        unmatched.items(), key=lambda entry: len(str(entry[0])), reverse=True
    )
    for path, destination_stat in leftovers:
        yield path, destination_stat, Operation.DELETE
def generate_sync_report(
    content_diff: Iterable[tuple[Path, _StatLike, Operation]], depth: int = 2
) -> None:
    """Compile a high-level summary of the outcome of the `diff` method
    and report it to the logging.INFO level

    Parameters
    ----------
    content_diff : list of (Path, Operation) tuples
        The files and the operations that are to be performed on each file, as
        generated by the `diff` method
    depth : int, optional
        How many directories to go down from the root to generate the summary.
        Default is 2 (just report on top-level files and folders within the
        source folder).

    Returns
    -------
    None
    """
    # each key is either a top-level path mapped straight to its Operation,
    # or a directory mapped to per-Operation counts of the files within it
    summary: dict[Path, dict[Operation, int] | Operation] = defaultdict(
        lambda: {Operation.CREATE: 0, Operation.REPLACE: 0, Operation.DELETE: 0}
    )

    for full_path, path_stat, operation in content_diff:
        try:
            path_key = full_path.parents[-depth]
        except IndexError:
            # this path sits at the top level, so record its operation directly
            summary[full_path] = operation
            continue

        tally = summary[path_key]
        if isinstance(tally, Operation):
            # already described by the operation on the top-level entry itself
            continue
        if stat.S_ISDIR(path_stat.st_mode or 0) and operation == Operation.CREATE:
            # don't count folder creations
            continue
        tally[operation] += 1

    for path_key, report in sorted(summary.items()):
        if isinstance(report, Operation):
            # nice that these verbs follow the same pattern, e.g. CREATE -> Creating
            SYNC_LOGGER.info(f"{report.name[:-1].title()}ing {path_key}")
        else:
            SYNC_LOGGER.info(
                f"Within {path_key}...\n%s",
                "\n".join(
                    f" - {op.name[:-1].title()}ing {count} file{'' if count == 1 else 's'}"
                    for op, count in report.items()
                ),
            )