Coverage for enderchest/sync/utils.py: 100%

74 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-06 16:00 +0000

1"""Non-implementation-specific syncing utilities""" 

2import fnmatch 

3import getpass 

4import os 

5import socket 

6import stat 

7from collections import defaultdict 

8from enum import Enum, auto 

9from pathlib import Path 

10from typing import Any, Collection, Generator, Iterable, Protocol, TypeVar 

11from urllib.parse import ParseResult, unquote 

12from urllib.request import url2pathname 

13 

14from ..loggers import SYNC_LOGGER 

15 

16 

def get_default_netloc() -> str:
    """Build the default netloc for this machine from the current login
    session

    Returns
    -------
    str
        The default netloc, in the form {user}@{hostname} (lowercased)
    """
    user = getpass.getuser()
    host = socket.gethostname()
    return "@".join((user, host)).lower()

26 

27 

def abspath_from_uri(uri: ParseResult) -> Path:
    """Extract and unquote the path component of a URI to turn it into an
    unambiguous absolute `pathlib.Path`

    h/t https://stackoverflow.com/a/61922504

    Parameters
    ----------
    uri : ParseResult
        The parsed URI to extract the path from

    Returns
    -------
    Path
        The path part of the URI as a Path
    """
    sep = os.path.sep
    # UNC-style "//host/" (or "\\host\" on Windows) prefix so a non-empty
    # netloc is kept when the URI path itself is relative
    mount = f"{sep}{sep}{uri.netloc}{sep}"
    local_path = url2pathname(unquote(uri.path))
    return Path(os.path.abspath(os.path.join(mount, local_path)))

46 

47 

def uri_to_ssh(uri: ParseResult) -> str:
    """Convert a URI to an SSH address

    Parameters
    ----------
    uri: ParseResult
        The URI to convert

    Returns
    -------
    str
        The SSH-format address
    """
    user_part = f"{uri.username}@" if uri.username else ""
    host_part = uri.hostname or "localhost"
    if uri.port:
        host_part = f"{host_part}:{uri.port}"
    return f"{user_part}{host_part}:{uri.path}"

66 

67 

def render_remote(alias: str, uri: ParseResult) -> str:
    """Render a remote to a descriptive string

    Parameters
    ----------
    alias : str
        The name of the remote
    uri : ParseResult
        The parsed URI for the remote

    Returns
    -------
    str
        `{uri_string} [({alias})]}`
        (alias is omitted if it's the same as the URI's hostname)
    """
    rendered = uri.geturl()
    if alias == uri.hostname:
        return rendered
    return rendered + f" ({alias})"

89 

90 

class _StatLike(Protocol):  # pragma: no cover
    """Structural type for the stat-result-like objects compared by this
    module: the subset of ``os.stat_result`` that the sync utilities read.
    Any attribute may be None (e.g. when reported by a remote/SFTP stat
    implementation that omits it).
    """

    @property
    def st_mode(self) -> int | None:
        # file type / permission bits, if known
        ...

    @property
    def st_size(self) -> float | None:
        # size in bytes, if known
        ...

    @property
    def st_mtime(self) -> float | None:
        # modification timestamp, if known
        ...

103 

104 

def is_identical(object_one: _StatLike, object_two: _StatLike) -> bool:
    """Decide whether two stat-like objects look like the same object
    (meaning: the file can be skipped when syncing)

    Parameters
    ----------
    object_one : os.stat_result or similar
        The first object to compare
    object_two : os.stat_result or similar
        The second object to compare

    Returns
    -------
    bool
        False if the objects are conclusively different, True otherwise.

    Notes
    -----
    As most implementations of the SFTP protocol do not include the check-file
    extension, this method is limited in what it can compare. Use with caution.
    """
    mode_one = object_one.st_mode or 0
    mode_two = object_two.st_mode or 0

    if stat.S_ISDIR(mode_one) != stat.S_ISDIR(mode_two):
        return False
    if stat.S_ISLNK(mode_one) != stat.S_ISLNK(mode_two):
        return False
    if stat.S_ISLNK(mode_one):
        # a stat can't reveal a symlink's target, so err on the side of "nope"
        return False
    if stat.S_ISREG(mode_one):
        # size/mtime comparisons are only meaningful for regular files;
        # mtimes are truncated to whole seconds before comparing
        same_size = int(object_one.st_size or 0) == int(object_two.st_size or 0)
        same_mtime = int(object_one.st_mtime or 0) == int(object_two.st_mtime or 0)
        return same_size and same_mtime
    return True

142 

143 

class Operation(Enum):
    """The recognized sync operations

    Notes
    -----
    There's no `UPDATE` operation because so far this class isn't used by
    anything that _can_ perform a delta update on a file
    """

    CREATE = auto()  # the path exists only at the source
    REPLACE = auto()  # the path exists at both ends but the stats differ
    DELETE = auto()  # the path exists only at the destination

156 

157 

# Generic type for "a path plus accompanying metadata" tuples (as consumed
# by `filter_contents`): constrained to tuples keyed by either a Path or str
PathInfo = TypeVar(
    "PathInfo",
    tuple[Path, Any],
    tuple[str, Any],
    # TODO: the proper type hint is tuple[Path, *tuple[Any, ...]]
    # but that's not supported until Python 3.11
)

165 

166 

def filter_contents(
    contents: Iterable[PathInfo],
    exclude: Collection[str],
    prefix: Path | str | None = None,
) -> Generator[PathInfo, None, None]:
    """Apply an exclusion filter to a list of files

    Parameters
    ----------
    contents : list of (Path, ...) tuples
        The contents to filter
    exclude : list of str
        The patterns to exclude
    prefix : Path, optional
        If the contents are iterating over a subdirectory, providing the directory
        as the `prefix` will allow filtering to be performed on the full path.

    Yields
    ------
    (Path, ...) tuples
        The elements of the provided list, omitting the ones
        to be excluded
    """
    # each pattern gets a leading "*" component so it can match anywhere
    # under the (prefixed) path; hoisted out of the loop since it's invariant
    globs = [os.path.join("*", pattern) for pattern in exclude]
    for path_info in contents:
        full_path = os.path.normpath(os.path.join(prefix or "", path_info[0]))
        if any(fnmatch.fnmatch(full_path, glob) for glob in globs):
            continue
        yield path_info

201 

202 

def diff(
    source_files: Iterable[tuple[Path, _StatLike]],
    destination_files: Iterable[tuple[Path, _StatLike]],
) -> Generator[tuple[Path, _StatLike, Operation], None, None]:
    """Compute the "diff" between the source and destination, enumerating
    all the operations that should be performed so that the destination
    matches the source

    Parameters
    ----------
    source_files : list of (Path, stat_result) tuples
        The files and file attributes at the source
    destination_files : list of (Path, stat_result) tuples
        The files and file attributes at the destination

    Returns
    -------
    Generator of (Path, stat_result, Operation) tuples
        The files, their attributes and the operations that should be performed on each file

    Notes
    -----
    - The order of paths returned will match the order provided by the `source_files`
      except for the deletions, which will all come at the end and will be sorted
      from longest to shortest path (so that individual files are marked for deletion
      before their parent folders).
    - The attributes of each path will correspond to the *source* attributes for
      creations and replacements and to the *destination* attributes for the deletions
    """
    _missing = object()  # sentinel: distinguishes "absent" from a falsy stat
    pending_deletion: dict[Path, _StatLike] = dict(destination_files)

    for path, source_stat in source_files:
        destination_stat = pending_deletion.pop(path, _missing)
        if destination_stat is _missing:
            yield path, source_stat, Operation.CREATE
        elif not is_identical(source_stat, destination_stat):
            yield path, source_stat, Operation.REPLACE
        # identical entries need no operation at all

    # whatever wasn't matched exists only at the destination; emit deletions
    # deepest (longest) paths first so files precede their parent folders
    for path, destination_stat in sorted(
        pending_deletion.items(), key=lambda entry: len(str(entry[0])), reverse=True
    ):
        yield path, destination_stat, Operation.DELETE

246 

247 

def generate_sync_report(
    content_diff: Iterable[tuple[Path, _StatLike, Operation]], depth: int = 2
) -> None:
    """Compile a high-level summary of the outcome of the `diff` method
    and report it to the logging.INFO level

    Parameters
    ----------
    content_diff : list of (Path, Operation) tuples
        The files and the operations that are to be performed on each file, as
        generated by the `diff` method
    depth : int, optional
        How many directories to go down from the root to generate the summary.
        Default is 2 (just report on top-level files and folders within the
        source folder).

    Returns
    -------
    None
    """
    # each key maps either to a per-operation tally (for subdirectories)
    # or to a single Operation (for paths shallower than `depth`)
    summary: dict[Path, dict[Operation, int] | Operation] = defaultdict(
        lambda: {op: 0 for op in (Operation.CREATE, Operation.REPLACE, Operation.DELETE)}
    )

    for full_path, path_stat, operation in content_diff:
        try:
            # the ancestor `depth` levels down from the sync root
            top_level = full_path.parents[-depth]
        except IndexError:
            # then this doesn't go in a subdirectory: report it individually
            summary[full_path] = operation
            continue

        tally = summary[top_level]
        if isinstance(tally, Operation):
            # then this entry is already described by the top-level op
            continue
        if operation == Operation.CREATE and stat.S_ISDIR(path_stat.st_mode or 0):
            # don't count folder creations
            continue

        tally[operation] += 1

    for path_key, report in sorted(summary.items()):
        if isinstance(report, Operation):
            # nice that these verbs follow the same pattern
            SYNC_LOGGER.info(f"{report.name[:-1].title()}ing {path_key}")
        else:
            breakdown = "\n".join(
                f" - {op.name[:-1].title()}ing {count} file{'' if count == 1 else 's'}"
                for op, count in report.items()
            )
            SYNC_LOGGER.info(f"Within {path_key}...\n%s", breakdown)