Coverage for enderchest/sync/file.py: 95%

121 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-06 16:00 +0000

1"""shutil-based sync implementation""" 

2import fnmatch 

3import logging 

4import os 

5import shutil 

6import stat 

7from pathlib import Path 

8from typing import Callable, Collection 

9from urllib.parse import ParseResult 

10 

11from . import ( 

12 SYNC_LOGGER, 

13 Op, 

14 abspath_from_uri, 

15 diff, 

16 filter_contents, 

17 generate_sync_report, 

18 is_identical, 

19) 

20 

21 

def get_contents(path: Path) -> list[tuple[Path, os.stat_result]]:
    """Walk a local directory tree and collect every entry along with its stats

    Parameters
    ----------
    path : Path
        The directory to walk

    Returns
    -------
    list of (Path, os.stat_result) tuples
        One entry for everything matched by a recursive glob beneath `path`.
        Each path is expressed relative to `path`, and the stats are
        gathered with `lstat` (so a symlink is described, not followed).

    Notes
    -----
    Entries are ordered by the length of their relative path, shortest first,
    so that a directory is always listed ahead of anything it contains.
    """
    SYNC_LOGGER.debug(f"Getting contents of {path}")
    entries = [
        (found.relative_to(path), found.lstat()) for found in path.rglob("**/*")
    ]
    # list.sort is stable, just like sorted(), so ties keep their glob order
    entries.sort(key=lambda entry: len(str(entry[0])))
    return entries

47 

48 

49def copy( 

50 source_path: Path, 

51 destination_folder: Path, 

52 exclude: Collection[str], 

53 delete: bool, 

54 dry_run: bool, 

55) -> None: 

56 """Copy the specified source file or folder to the provided destination, 

57 overwriting any existing files and deleting any that weren't in the source 

58 

59 Parameters 

60 ---------- 

61 source_path : ParseResult 

62 The file or folder to copy 

63 destination_folder : Path 

64 The destination to put the source file(s) 

65 exclude : list of str 

66 Any patterns that should be excluded from the sync (and sync) 

67 delete : bool 

68 Whether part of the syncing should include deleting files at the destination 

69 that aren't at the source. 

70 dry_run : bool 

71 Whether to only simulate this sync (report the operations to be performed 

72 but not actually perform them) 

73 

74 Notes 

75 ----- 

76 If the source file does not exist, the destination file will simply be deleted 

77 (if it exists) 

78 """ 

79 

80 ignore = ignore_patterns(*exclude) 

81 SYNC_LOGGER.debug(f"Ignoring patterns: {exclude}") 

82 

83 destination_path = destination_folder / source_path.name 

84 if destination_path.is_symlink() and not destination_path.is_dir(): 

85 SYNC_LOGGER.warning("Removing symlink %s", destination_path) 

86 if not dry_run: 

87 destination_path.unlink() 

88 else: 

89 SYNC_LOGGER.debug("And replacing it entirely with %s", source_path) 

90 return 

91 elif destination_path.exists() and not destination_path.is_dir(): 

92 SYNC_LOGGER.warning("Deleting file %s", destination_path) 

93 if not dry_run: 

94 destination_path.unlink() 

95 else: 

96 SYNC_LOGGER.debug("And replacing it entirely with %s", source_path) 

97 return 

98 else: 

99 if not dry_run: 

100 destination_folder.mkdir(parents=True, exist_ok=True) 

101 

102 SYNC_LOGGER.debug(f"Copying {source_path} into {destination_folder}") 

103 

104 if source_path.exists() and not source_path.is_dir(): 

105 if destination_path.exists() and is_identical( 

106 source_path.stat(), destination_path.stat() 

107 ): 

108 SYNC_LOGGER.warning( 

109 "%s and %s are identical. No copy needed.", 

110 source_path, 

111 destination_path, 

112 ) 

113 return 

114 SYNC_LOGGER.debug( 

115 "Copying file %s to %s", 

116 source_path, 

117 destination_path, 

118 ) 

119 if not dry_run: 

120 shutil.copy2(source_path, destination_path, follow_symlinks=False) 

121 return 

122 

123 source_contents = filter_contents( 

124 get_contents(source_path), exclude, prefix=source_path 

125 ) 

126 destination_contents = filter_contents( 

127 get_contents(destination_path), exclude, prefix=destination_path 

128 ) 

129 

130 sync_diff = diff(source_contents, destination_contents) 

131 

132 if dry_run: 

133 generate_sync_report(sync_diff) 

134 return 

135 

136 for path, path_stat, operation in sync_diff: 

137 match (operation, stat.S_ISDIR(path_stat.st_mode or 0)): 

138 case (Op.CREATE, True): 

139 SYNC_LOGGER.debug("Creating directory %s", destination_path / path) 

140 (destination_path / path).mkdir(parents=True, exist_ok=True) 

141 case (Op.CREATE, False) | (Op.REPLACE, False): 

142 SYNC_LOGGER.debug( 

143 "Copying file %s to %s", 

144 source_path / path, 

145 destination_path / path, 

146 ) 

147 (destination_path / path).unlink(missing_ok=True) 

148 if (source_path / path).is_symlink(): 

149 (destination_path / path).symlink_to( 

150 (source_path / path).readlink() 

151 ) 

152 else: 

153 shutil.copy2( 

154 source_path / path, 

155 destination_path / path, 

156 follow_symlinks=False, 

157 ) 

158 case (Op.REPLACE, True): 

159 # this would be replacing a file with a directory 

160 SYNC_LOGGER.debug("Deleting file %s", destination_path / path) 

161 (destination_path / path).unlink() 

162 SYNC_LOGGER.debug( 

163 "Copying directory %s to %s", 

164 source_path / path, 

165 destination_path / path, 

166 ) 

167 shutil.copytree( 

168 source_path / path, 

169 destination_path / path, 

170 symlinks=True, 

171 ignore=ignore, 

172 dirs_exist_ok=True, 

173 ) 

174 case (Op.DELETE, True): 

175 # recall that for deletions, it's the *destination's* stats 

176 if delete: 

177 clean(destination_path / path, ignore, dry_run) 

178 case (Op.DELETE, False): 

179 if delete: 

180 SYNC_LOGGER.debug("Deleting file %s", destination_path / path) 

181 (destination_path / path).unlink() 

182 case op, is_dir: # pragma: no cover 

183 raise NotImplementedError( 

184 f"Don't know how to handle {op} of {'directory' if is_dir else 'file'}" 

185 ) 

186 

187 

def clean(
    root: Path,
    ignore: Callable[[str, Collection[str]], set[str]],
    dry_run: bool,
) -> None:
    """Empty out a directory tree, sparing anything the ignore filter protects,
    and remove each directory that ends up empty

    Parameters
    ----------
    root : Path
        The directory to clear out. This must be a directory, as its contents
        are listed straight away.
    ignore : Callable
        A filter of the kind built by `ignore_patterns`: given a directory and
        the names inside it, it returns the names that must be left alone.
    dry_run : bool
        Whether to only simulate the clean-up. When True, the removals are
        logged (at INFO rather than DEBUG level) but nothing is touched.
    """
    log_level = logging.INFO if dry_run else logging.DEBUG
    entries = list(root.iterdir())
    protected = ignore(os.fspath(root), [entry.name for entry in entries])

    for entry in entries:
        if entry.name in protected:
            SYNC_LOGGER.debug(f"Skipping {entry}")
            continue

        is_link = entry.is_symlink()
        if not is_link and entry.is_dir():
            # real sub-directories are emptied recursively; links to
            # directories are just unlinked below
            clean(entry, ignore, dry_run)
            continue

        message = f"Removing symlink {entry}" if is_link else f"Deleting {entry}"
        SYNC_LOGGER.log(log_level, message)
        if not dry_run:
            entry.unlink()

    # the folder itself only goes away if nothing (e.g. an ignored file) is left
    if not any(root.iterdir()):
        SYNC_LOGGER.log(log_level, f"Removing empty {root}")
        if not dry_run:
            root.rmdir()

234 

235 

def ignore_patterns(*patterns: str) -> Callable[[str, Collection[str]], set[str]]:
    """shutil.ignore_patterns doesn't support checking absolute paths,
    so we gotta roll our own.

    This implementation is adapted from
    https://github.com/python/cpython/blob/3.11/Lib/shutil.py#L440-L450 and
    https://stackoverflow.com/a/7842224

    Parameters
    ----------
    *patterns : str
        The patterns to match. A pattern without a path separator is compared
        against bare file names; a pattern containing N separators is compared
        against each name prefixed by the last N components of its directory.

    Returns
    -------
    Callable
        An "ignore" filter suitable for use in `shutil.copytree`: called with
        a directory and the names inside it, it returns the set of names that
        match at least one of the patterns.
    """
    # A pattern's depth (its number of path separators) never changes, so
    # work it out once here instead of on every call of the filter.
    patterns_with_depth = [
        (pattern, len(os.path.normpath(pattern).split(os.sep)) - 1)
        for pattern in patterns
    ]

    def _ignore_patterns(path: str, names: Collection[str]) -> set[str]:
        path_parts: list[str] = os.path.normpath(path).split(os.sep)
        ignored_names: set[str] = set()
        for pattern, pattern_depth in patterns_with_depth:
            if pattern_depth == 0:
                match_paths: Collection[str] = names
            else:
                # Join with os.sep rather than os.path.join: the latter drops
                # the empty first component of an absolute path, which would
                # make a pattern anchored at the filesystem root unmatchable.
                match_paths = [
                    os.sep.join((*path_parts[-pattern_depth:], name))
                    for name in names
                ]
            ignored_names.update(
                os.path.basename(match)
                for match in fnmatch.filter(match_paths, pattern)
            )
        return ignored_names

    return _ignore_patterns

273 

274 

def pull(
    remote_uri: ParseResult,
    local_path: Path,
    exclude: Collection[str],
    dry_run: bool,
    delete: bool = True,
    **unsupported_kwargs,
) -> None:
    """Sync a file or folder from a "remote" that is really just another
    location on this machine into a local folder, overwriting whatever is
    already there.

    Parameters
    ----------
    remote_uri : ParseResult
        The URI of the resource to copy from. See notes.
    local_path : Path
        The folder to copy into
    exclude : list of str
        Any patterns that should be left out of the sync
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    delete : bool, optional
        Whether files at the destination that are absent from the source
        should be deleted as part of the sync. Default is True.
    **unsupported_kwargs
        Any other options are ignored (they are only listed in a debug message)

    Raises
    ------
    FileNotFoundError
        If the destination folder does not exist, or if the source does not
        exist

    Notes
    -----
    - This method is intended for local files addressed with the file://
      protocol. It does not itself check the scheme of the URI or that the
      hostname refers to this machine, and it does not support user
      authentication (running the copy as a different user).
    - A missing destination folder is treated as an error: neither it nor
      its parent directories get created here.
    """
    source_path = abspath_from_uri(remote_uri).expanduser()

    # the destination is checked first, so it is the one reported if both are missing
    if not local_path.exists():
        raise FileNotFoundError(f"{local_path} does not exist")
    if not source_path.exists():
        raise FileNotFoundError(f"{remote_uri.geturl()} does not exist")

    if unsupported_kwargs:
        ignored_options = "\n".join(
            " {}: {}".format(option, value)
            for option, value in unsupported_kwargs.items()
        )
        SYNC_LOGGER.debug(
            "The following command-line options are ignored for this protocol:\n%s",
            ignored_options,
        )

    copy(source_path, local_path, exclude, delete=delete, dry_run=dry_run)

333 

334 

def push(
    local_path: Path,
    remote_uri: ParseResult,
    exclude: Collection[str],
    dry_run: bool,
    delete: bool = True,
    **unsupported_kwargs,
) -> None:
    """Sync a local file or folder into a "remote" that is really just another
    folder on this machine, overwriting whatever is already there.

    Parameters
    ----------
    local_path : Path
        The file or folder to copy
    remote_uri : ParseResult
        The URI of the location to copy into. See notes.
    exclude : list of str
        Any patterns that should be left out of the sync
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    delete : bool, optional
        Whether files at the destination that are absent from the source
        should be deleted as part of the sync. Default is True.
    **unsupported_kwargs
        Any other options are ignored (they are only listed in a debug message)

    Raises
    ------
    FileNotFoundError
        If the destination folder does not exist, or if the source does not
        exist

    Notes
    -----
    - This method is intended for local files addressed with the file://
      protocol. It does not itself check the scheme of the URI or that the
      hostname refers to this machine, and it does not support user
      authentication (running the copy as a different user).
    - A missing destination folder is treated as an error: neither it nor
      its parent directories get created here.
    """
    destination_folder = abspath_from_uri(remote_uri).expanduser()

    # the destination is checked first, so it is the one reported if both are missing
    if not destination_folder.exists():
        raise FileNotFoundError(f"{remote_uri.geturl()} does not exist")
    if not local_path.exists():
        raise FileNotFoundError(f"{local_path} does not exist")

    if unsupported_kwargs:
        ignored_options = "\n".join(
            " {}: {}".format(option, value)
            for option, value in unsupported_kwargs.items()
        )
        SYNC_LOGGER.debug(
            "The following command-line options are ignored for this protocol:\n%s",
            ignored_options,
        )

    copy(local_path, destination_folder, exclude, delete=delete, dry_run=dry_run)