Coverage for enderchest/sync/utils.py: 100%

75 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-28 20:32 +0000

1"""Non-implementation-specific syncing utilities""" 

2 

3import fnmatch 

4import getpass 

5import os 

6import socket 

7import stat 

8from collections import defaultdict 

9from collections.abc import Collection, Generator, Iterable 

10from enum import Enum, auto 

11from pathlib import Path 

12from typing import Any, Protocol, TypeVar 

13from urllib.parse import ParseResult, unquote 

14from urllib.request import url2pathname 

15 

16from ..loggers import SYNC_LOGGER 

17 

18 

def get_default_netloc() -> str:
    """Assemble the default netloc from the running environment.

    Returns
    -------
    str
        The default netloc, which is {user}@{hostname}
    """
    user = getpass.getuser()
    hostname = socket.gethostname()
    return f"{user}@{hostname}".lower()

28 

29 

def abspath_from_uri(uri: ParseResult) -> Path:
    """Extract and unquote the path component of a URI to turn it into an
    unambiguous absolute `pathlib.Path`

    h/t https://stackoverflow.com/a/61922504

    Parameters
    ----------
    uri : ParseResult
        The parsed URI to extract the path from

    Returns
    -------
    Path
        The path part of the URI as a Path
    """
    sep = os.path.sep
    # a non-empty netloc becomes a UNC-style "//host/" mount prefix;
    # an empty netloc collapses harmlessly when joined with an absolute path
    mount = f"{sep}{sep}{uri.netloc}{sep}"
    local_path = url2pathname(unquote(uri.path))
    return Path(os.path.abspath(os.path.join(mount, local_path)))

48 

49 

def uri_to_ssh(uri: ParseResult) -> str:
    """Convert a URI to an SSH address

    Parameters
    ----------
    uri: ParseResult
        The URI to convert

    Returns
    -------
    str
        The SSH-format address
    """
    user_prefix = f"{uri.username}@" if uri.username else ""
    host = uri.hostname or "localhost"
    if uri.port:
        host = f"{host}:{uri.port}"
    return f"{user_prefix}{host}:{uri.path}"

68 

69 

def render_remote(alias: str, uri: ParseResult) -> str:
    """Render a remote to a descriptive string

    Parameters
    ----------
    alias : str
        The name of the remote
    uri : ParseResult
        The parsed URI for the remote

    Returns
    -------
    str
        `{uri_string} [({alias})]}`
        (alias is omitted if it's the same as the URI's hostname)
    """
    rendered = uri.geturl()
    if alias != uri.hostname:
        rendered = f"{rendered} ({alias})"
    return rendered

91 

92 

class _StatLike(Protocol):  # pragma: no cover
    """Structural type for the subset of `os.stat_result` that the syncing
    utilities read (see `is_identical` / `diff`), so that stat-like objects
    from other sources (e.g. remote file listings) qualify as well.

    Any attribute may be None when the backing implementation cannot
    provide it — consumers are expected to handle that case.
    """

    # file type and permission bits, as interpreted by the `stat` module
    @property
    def st_mode(self) -> int | None: ...

    # file size in bytes
    @property
    def st_size(self) -> float | None: ...

    # last-modification timestamp
    @property
    def st_mtime(self) -> float | None: ...

102 

103 

104def is_identical(object_one: _StatLike, object_two: _StatLike) -> bool: 

105 """Determine if two objects are identical (meaning: skip when syncing) 

106 

107 Parameters 

108 ---------- 

109 object_one : os.stat_result or similar 

110 The first object to compare 

111 object_two : os.stat_result or similar 

112 The second object to compare 

113 

114 Returns 

115 ------- 

116 bool 

117 False if the objects are conclusively different, True otherwise. 

118 

119 Notes 

120 ----- 

121 As most implementations of the SFTP protocol do not include the check-file 

122 extension, this method is limited in what it can compare. Use with caution. 

123 """ 

124 if stat.S_ISDIR(object_one.st_mode or 0) != stat.S_ISDIR(object_two.st_mode or 0): 

125 return False 

126 if stat.S_ISLNK(object_one.st_mode or 0) != stat.S_ISLNK(object_two.st_mode or 0): 

127 return False 

128 

129 if stat.S_ISLNK(object_one.st_mode or 0): 

130 # there's no way from the stat to tell if two links have the same target 

131 # so err on the side of "nope" 

132 return False 

133 

134 if stat.S_ISREG(object_one.st_mode or 0): 

135 # these comparisons should only be run on files 

136 if int(object_one.st_size or 0) != int(object_two.st_size or 0): 

137 return False 

138 if int(object_one.st_mtime or 0) != int(object_two.st_mtime or 0): 

139 return False 

140 return True 

141 

142 

class Operation(Enum):
    """The recognized sync operations

    Notes
    -----
    There's no `UPDATE` operation because so far this class isn't used by
    anything that _can_ perform a delta update on a file
    """

    CREATE = auto()  # the path exists only at the source
    REPLACE = auto()  # the path exists at both ends but the stats differ
    DELETE = auto()  # the path exists only at the destination

155 

156 

# TypeVar for a "path plus metadata" record: a tuple whose first element is
# the path (as a Path or str) and whose second element describes it
PathInfo = TypeVar(
    "PathInfo",
    tuple[Path, Any],
    tuple[str, Any],
    # TODO: the proper type hint is tuple[Path, *tuple[Any, ...]]
    # but that's not supported until Python 3.11
)

164 

165 

166def filter_contents( 

167 contents: Iterable[PathInfo], 

168 exclude: Collection[str], 

169 prefix: Path | str | None = None, 

170) -> Generator[PathInfo, None, None]: 

171 """Apply an exclusion filter to a list of files 

172 

173 Parameters 

174 ---------- 

175 contents : list of (Path, ...) tuples 

176 The contents to filter 

177 exclude : list of str 

178 The patterns to exclude 

179 prefix : Path, optional 

180 If the contents are iterating over a subdirectory, providing the directory 

181 as the `prefix` will allow filtering to be performed on the full path. 

182 

183 Yields 

184 ------ 

185 (Path, ...) tuples 

186 The elements of the provided list, omitting the ones 

187 to be excluded 

188 """ 

189 for path_info in contents: 

190 if not any( 

191 ( 

192 fnmatch.fnmatch( 

193 os.path.normpath( 

194 os.path.join(prefix or "", path_info[0], "placeholder") 

195 ), 

196 os.path.join("*", pattern, "*"), 

197 ) 

198 for pattern in exclude 

199 ) 

200 ): 

201 yield path_info 

202 

203 

def diff(
    source_files: Iterable[tuple[Path, _StatLike]],
    destination_files: Iterable[tuple[Path, _StatLike]],
) -> Generator[tuple[Path, _StatLike, Operation], None, None]:
    """Compute the "diff" between the source and destination, enumerating
    all the operations that should be performed so that the destination
    matches the source

    Parameters
    ----------
    source_files : list of (Path, stat_result) tuples
        The files and file attributes at the source
    destination_files : list of (Path, stat_result) tuples
        The files and file attributes at the destination

    Returns
    -------
    Generator of (Path, stat_result, Operation) tuples
        The files, their attributes and the operations that should be performed on each file

    Notes
    -----
    - The order of paths returned will match the order provided by the `source_files`
      except for the deletions, which will all come at the end and will be sorted
      from longest to shortest path (so that individual files are marked for deletion
      before their parent folders).
    - The attributes of each path will correspond to the *source* attributes for
      creations and replacements and to the *destination* attributes for the deletions
    """
    # whatever is still in this lookup after the first pass exists only at
    # the destination and therefore needs deleting
    remaining: dict[Path, _StatLike] = dict(destination_files)

    for path, source_stat in source_files:
        if path in remaining:
            if not is_identical(source_stat, remaining.pop(path)):
                yield path, source_stat, Operation.REPLACE
            # identical entries are simply skipped
        else:
            yield path, source_stat, Operation.CREATE

    # longest paths first, so children are deleted before their parents
    deletions = sorted(
        remaining.items(), key=lambda item: len(str(item[0])), reverse=True
    )
    for path, destination_stat in deletions:
        yield path, destination_stat, Operation.DELETE

247 

248 

def generate_sync_report(
    content_diff: Iterable[tuple[Path, _StatLike, Operation]], depth: int = 2
) -> None:
    """Compile a high-level summary of the outcome of the `diff` method
    and report it to the logging.INFO level

    Parameters
    ----------
    content_diff : list of (Path, Operation) tuples
        The files and the operations that are to be performed on each file, as
        generated by the `diff` method
    depth : int, optional
        How many directories to go down from the root to generate the summary.
        Default is 2 (just report on top-level files and folders within the
        source folder).

    Returns
    -------
    None
    """
    # each entry is either a per-Operation tally for a subdirectory, or a
    # single Operation for a path shallow enough to report on its own
    summary: dict[Path, dict[Operation, int] | Operation] = defaultdict(
        lambda: {Operation.CREATE: 0, Operation.REPLACE: 0, Operation.DELETE: 0}
    )

    for full_path, path_stat, operation in content_diff:
        try:
            group = full_path.parents[-depth]
        except IndexError:
            # too shallow to roll up under a subdirectory
            summary[full_path] = operation
            continue

        tallies = summary[group]
        if isinstance(tallies, Operation):
            # already described by the top-level op
            continue
        if operation is Operation.CREATE and stat.S_ISDIR(path_stat.st_mode or 0):
            # don't count folder creations
            continue

        tallies[operation] += 1

    for group, outcome in sorted(summary.items()):
        if isinstance(outcome, Operation):
            # "CREATE" -> "Creating", "DELETE" -> "Deleting", etc.
            SYNC_LOGGER.info(f"{outcome.name[:-1].title()}ing {group}")
        else:
            SYNC_LOGGER.info(
                f"Within {group}...\n%s",
                "\n".join(
                    f" - {op.name[:-1].title()}ing {count} file{'' if count == 1 else 's'}"
                    for op, count in outcome.items()
                ),
            )