Coverage for enderchest/sync/utils.py: 100%

74 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-05-04 01:41 +0000

1"""Non-implementation-specific syncing utilities""" 

2 

3import fnmatch 

4import getpass 

5import os 

6import socket 

7import stat 

8from collections import defaultdict 

9from enum import Enum, auto 

10from pathlib import Path 

11from typing import Any, Collection, Generator, Iterable, Protocol, TypeVar 

12from urllib.parse import ParseResult, unquote 

13from urllib.request import url2pathname 

14 

15from ..loggers import SYNC_LOGGER 

16 

17 

def get_default_netloc() -> str:
    """Compile a netloc from environment variables, etc.

    Returns
    -------
    str
        The default netloc, which is {user}@{hostname}
    """
    user = getpass.getuser()
    host = socket.gethostname()
    # normalized to lowercase so netlocs compare consistently across systems
    return f"{user}@{host}".lower()

27 

28 

def abspath_from_uri(uri: ParseResult) -> Path:
    """Extract and unquote the path component of a URI to turn it into an
    unambiguous absolute `pathlib.Path`

    h/t https://stackoverflow.com/a/61922504

    Parameters
    ----------
    uri : ParseResult
        The parsed URI to extract the path from

    Returns
    -------
    Path
        The path part of the URI as a Path
    """
    sep = os.path.sep
    # UNC-style anchor: "//{netloc}/" (collapses to "///" when netloc is empty)
    anchor = sep + sep + uri.netloc + sep
    local_path = url2pathname(unquote(uri.path))
    return Path(os.path.abspath(os.path.join(anchor, local_path)))

47 

48 

def uri_to_ssh(uri: ParseResult) -> str:
    """Convert a URI to an SSH address

    Parameters
    ----------
    uri: ParseResult
        The URI to convert

    Returns
    -------
    str
        The SSH-format address
    """
    user_part = f"{uri.username}@" if uri.username else ""
    host_part = uri.hostname or "localhost"
    if uri.port:
        host_part += f":{uri.port}"
    return f"{user_part}{host_part}:{uri.path}"

67 

68 

def render_remote(alias: str, uri: ParseResult) -> str:
    """Render a remote to a descriptive string

    Parameters
    ----------
    alias : str
        The name of the remote
    uri : ParseResult
        The parsed URI for the remote

    Returns
    -------
    str
        `{uri_string} [({alias})]}`
        (alias is omitted if it's the same as the URI's hostname)
    """
    rendered = uri.geturl()
    if alias == uri.hostname:
        # the alias adds no information when it just repeats the hostname
        return rendered
    return f"{rendered} ({alias})"

90 

91 

92class _StatLike(Protocol): # pragma: no cover 

93 @property 

94 def st_mode(self) -> int | None: ... 

95 

96 @property 

97 def st_size(self) -> float | None: ... 

98 

99 @property 

100 def st_mtime(self) -> float | None: ... 

101 

102 

103def is_identical(object_one: _StatLike, object_two: _StatLike) -> bool: 

104 """Determine if two objects are identical (meaning: skip when syncing) 

105 

106 Parameters 

107 ---------- 

108 object_one : os.stat_result or similar 

109 The first object to compare 

110 object_two : os.stat_result or similar 

111 The second object to compare 

112 

113 Returns 

114 ------- 

115 bool 

116 False if the objects are conclusively different, True otherwise. 

117 

118 Notes 

119 ----- 

120 As most implementations of the SFTP protocol do not include the check-file 

121 extension, this method is limited in what it can compare. Use with caution. 

122 """ 

123 if stat.S_ISDIR(object_one.st_mode or 0) != stat.S_ISDIR(object_two.st_mode or 0): 

124 return False 

125 if stat.S_ISLNK(object_one.st_mode or 0) != stat.S_ISLNK(object_two.st_mode or 0): 

126 return False 

127 

128 if stat.S_ISLNK(object_one.st_mode or 0): 

129 # there's no way from the stat to tell if two links have the same target 

130 # so err on the side of "nope" 

131 return False 

132 

133 if stat.S_ISREG(object_one.st_mode or 0): 

134 # these comparisons should only be run on files 

135 if int(object_one.st_size or 0) != int(object_two.st_size or 0): 

136 return False 

137 if int(object_one.st_mtime or 0) != int(object_two.st_mtime or 0): 

138 return False 

139 return True 

140 

141 

class Operation(Enum):
    """The recognized sync operations

    Notes
    -----
    There's no `UPDATE` operation because so far this class isn't used by
    anything that _can_ perform a delta update on a file
    """

    # the file exists only at the source
    CREATE = auto()
    # the file exists on both sides but differs
    REPLACE = auto()
    # the file exists only at the destination
    DELETE = auto()

154 

155 

156PathInfo = TypeVar( 

157 "PathInfo", 

158 tuple[Path, Any], 

159 tuple[str, Any], 

160 # TODO: the proper type hint is tuple[Path, *tuple[Any, ...]] 

161 # but that's not supported until Python 3.11 

162) 

163 

164 

165def filter_contents( 

166 contents: Iterable[PathInfo], 

167 exclude: Collection[str], 

168 prefix: Path | str | None = None, 

169) -> Generator[PathInfo, None, None]: 

170 """Apply an exclusion filter to a list of files 

171 

172 Parameters 

173 ---------- 

174 contents : list of (Path, ...) tuples 

175 The contents to filter 

176 exclude : list of str 

177 The patterns to exclude 

178 prefix : Path, optional 

179 If the contents are iterating over a subdirectory, providing the directory 

180 as the `prefix` will allow filtering to be performed on the full path. 

181 

182 Yields 

183 ------ 

184 (Path, ...) tuples 

185 The elements of the provided list, omitting the ones 

186 to be excluded 

187 """ 

188 for path_info in contents: 

189 if not any( 

190 ( 

191 fnmatch.fnmatch( 

192 os.path.normpath( 

193 os.path.join(prefix or "", path_info[0], "placeholder") 

194 ), 

195 os.path.join("*", pattern, "*"), 

196 ) 

197 for pattern in exclude 

198 ) 

199 ): 

200 yield path_info 

201 

202 

def diff(
    source_files: Iterable[tuple[Path, _StatLike]],
    destination_files: Iterable[tuple[Path, _StatLike]],
) -> Generator[tuple[Path, _StatLike, Operation], None, None]:
    """Compute the "diff" between the source and destination, enumerating
    all the operations that should be performed so that the destination
    matches the source

    Parameters
    ----------
    source_files : list of (Path, stat_result) tuples
        The files and file attributes at the source
    destination_files : list of (Path, stat_result) tuples
        The files and file attributes at the destination

    Returns
    -------
    Generator of (Path, stat_result, Operation) tuples
        The files, their attributes and the operations that should be performed on each file

    Notes
    -----
    - The order of paths returned will match the order provided by the `source_files`
      except for the deletions, which will all come at the end and will be sorted
      from longest to shortest path (so that individual files are marked for deletion
      before their parent folders).
    - The attributes of each path will correspond to the *source* attributes for
      creations and replacements and to the *destination* attributes for the deletions
    """
    # anything left in this lookup after the source pass must be deleted
    unmatched: dict[Path, _StatLike] = dict(destination_files)
    for path, source_stat in source_files:
        try:
            destination_stat = unmatched.pop(path)
        except KeyError:
            # not at the destination at all
            yield path, source_stat, Operation.CREATE
            continue
        if not is_identical(source_stat, destination_stat):
            yield path, source_stat, Operation.REPLACE
        # identical entries need no operation

    # longest paths first (stable) so files precede their parent folders
    for path in sorted(unmatched, key=lambda entry: len(str(entry)), reverse=True):
        yield path, unmatched[path], Operation.DELETE

246 

247 

def generate_sync_report(
    content_diff: Iterable[tuple[Path, _StatLike, Operation]], depth: int = 2
) -> None:
    """Compile a high-level summary of the outcome of the `diff` method
    and report it to the logging.INFO level

    Parameters
    ----------
    content_diff : list of (Path, Operation) tuples
        The files and the operations that are to be performed on each file, as
        generated by the `diff` method
    depth : int, optional
        How many directories to go down from the root to generate the summary.
        Default is 2 (just report on top-level files and folders within the
        source folder).

    Returns
    -------
    None
    """
    # values are either per-operation counters (for subdirectories) or a
    # single Operation (for entries that live at the top level)
    summary: dict[Path, dict[Operation, int] | Operation] = defaultdict(
        lambda: dict.fromkeys(Operation, 0)
    )

    for full_path, path_stat, operation in content_diff:
        try:
            path_key = full_path.parents[-depth]
        except IndexError:
            # then this doesn't go in a subdirectory
            summary[full_path] = operation
            continue

        entry = summary[path_key]
        if isinstance(entry, Operation):
            # then this is described by the top-level op
            continue
        if operation is Operation.CREATE and stat.S_ISDIR(path_stat.st_mode or 0):
            # don't count folder creations
            continue

        entry[operation] += 1

    for path_key, report in sorted(summary.items()):
        if isinstance(report, Operation):
            # nice that these verbs follow the same pattern
            SYNC_LOGGER.info(f"{report.name[:-1].title()}ing {path_key}")
            continue
        report_lines = [
            f" - {op.name[:-1].title()}ing {count} file{'' if count == 1 else 's'}"
            for op, count in report.items()
        ]
        SYNC_LOGGER.info(f"Within {path_key}...\n%s", "\n".join(report_lines))