Coverage for enderchest/sync/file.py: 95%

121 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-05-04 01:41 +0000

1"""shutil-based sync implementation""" 

2 

3import fnmatch 

4import logging 

5import os 

6import shutil 

7import stat 

8from pathlib import Path 

9from typing import Callable, Collection 

10from urllib.parse import ParseResult 

11 

12from . import ( 

13 SYNC_LOGGER, 

14 Op, 

15 abspath_from_uri, 

16 diff, 

17 filter_contents, 

18 generate_sync_report, 

19 is_identical, 

20) 

21 

22 

def get_contents(path: Path) -> list[tuple[Path, os.stat_result]]:
    """Walk a local directory and collect every entry found beneath it

    Parameters
    ----------
    path : Path
        The directory to walk

    Returns
    -------
    list of (Path, os.stat_result) tuples
        One item per file, folder or symlink found under `path`, pairing the
        entry's location with its `lstat` result (so symlinks are described
        themselves rather than their targets)

    Notes
    -----
    - Entries are ordered by the length of their relative path, shortest
      first, which guarantees a parent directory is listed before anything
      inside it
    - Every returned path is relative to `path`
    """
    SYNC_LOGGER.debug(f"Getting contents of {path}")
    listing = [
        (entry.relative_to(path), entry.lstat()) for entry in path.rglob("**/*")
    ]
    # list.sort is stable, just like sorted(), so ties keep their walk order
    listing.sort(key=lambda item: len(str(item[0])))
    return listing

48 

49 

50def copy( 

51 source_path: Path, 

52 destination_folder: Path, 

53 exclude: Collection[str], 

54 delete: bool, 

55 dry_run: bool, 

56) -> None: 

57 """Copy the specified source file or folder to the provided destination, 

58 overwriting any existing files and deleting any that weren't in the source 

59 

60 Parameters 

61 ---------- 

62 source_path : ParseResult 

63 The file or folder to copy 

64 destination_folder : Path 

65 The destination to put the source file(s) 

66 exclude : list of str 

67 Any patterns that should be excluded from the sync (and sync) 

68 delete : bool 

69 Whether part of the syncing should include deleting files at the destination 

70 that aren't at the source. 

71 dry_run : bool 

72 Whether to only simulate this sync (report the operations to be performed 

73 but not actually perform them) 

74 

75 Notes 

76 ----- 

77 If the source file does not exist, the destination file will simply be deleted 

78 (if it exists) 

79 """ 

80 

81 ignore = ignore_patterns(*exclude) 

82 SYNC_LOGGER.debug(f"Ignoring patterns: {exclude}") 

83 

84 destination_path = destination_folder / source_path.name 

85 if destination_path.is_symlink() and not destination_path.is_dir(): 

86 SYNC_LOGGER.warning("Removing symlink %s", destination_path) 

87 if not dry_run: 

88 destination_path.unlink() 

89 else: 

90 SYNC_LOGGER.debug("And replacing it entirely with %s", source_path) 

91 return 

92 elif destination_path.exists() and not destination_path.is_dir(): 

93 SYNC_LOGGER.warning("Deleting file %s", destination_path) 

94 if not dry_run: 

95 destination_path.unlink() 

96 else: 

97 SYNC_LOGGER.debug("And replacing it entirely with %s", source_path) 

98 return 

99 else: 

100 if not dry_run: 

101 destination_folder.mkdir(parents=True, exist_ok=True) 

102 

103 SYNC_LOGGER.debug(f"Copying {source_path} into {destination_folder}") 

104 

105 if source_path.exists() and not source_path.is_dir(): 

106 if destination_path.exists() and is_identical( 

107 source_path.stat(), destination_path.stat() 

108 ): 

109 SYNC_LOGGER.warning( 

110 "%s and %s are identical. No copy needed.", 

111 source_path, 

112 destination_path, 

113 ) 

114 return 

115 SYNC_LOGGER.debug( 

116 "Copying file %s to %s", 

117 source_path, 

118 destination_path, 

119 ) 

120 if not dry_run: 

121 shutil.copy2(source_path, destination_path, follow_symlinks=False) 

122 return 

123 

124 source_contents = filter_contents( 

125 get_contents(source_path), exclude, prefix=source_path 

126 ) 

127 destination_contents = filter_contents( 

128 get_contents(destination_path), exclude, prefix=destination_path 

129 ) 

130 

131 sync_diff = diff(source_contents, destination_contents) 

132 

133 if dry_run: 

134 generate_sync_report(sync_diff) 

135 return 

136 

137 for path, path_stat, operation in sync_diff: 

138 match (operation, stat.S_ISDIR(path_stat.st_mode or 0)): 

139 case (Op.CREATE, True): 

140 SYNC_LOGGER.debug("Creating directory %s", destination_path / path) 

141 (destination_path / path).mkdir(parents=True, exist_ok=True) 

142 case (Op.CREATE, False) | (Op.REPLACE, False): 

143 SYNC_LOGGER.debug( 

144 "Copying file %s to %s", 

145 source_path / path, 

146 destination_path / path, 

147 ) 

148 (destination_path / path).unlink(missing_ok=True) 

149 if (source_path / path).is_symlink(): 

150 (destination_path / path).symlink_to( 

151 (source_path / path).readlink() 

152 ) 

153 else: 

154 shutil.copy2( 

155 source_path / path, 

156 destination_path / path, 

157 follow_symlinks=False, 

158 ) 

159 case (Op.REPLACE, True): 

160 # this would be replacing a file with a directory 

161 SYNC_LOGGER.debug("Deleting file %s", destination_path / path) 

162 (destination_path / path).unlink() 

163 SYNC_LOGGER.debug( 

164 "Copying directory %s to %s", 

165 source_path / path, 

166 destination_path / path, 

167 ) 

168 shutil.copytree( 

169 source_path / path, 

170 destination_path / path, 

171 symlinks=True, 

172 ignore=ignore, 

173 dirs_exist_ok=True, 

174 ) 

175 case (Op.DELETE, True): 

176 # recall that for deletions, it's the *destination's* stats 

177 if delete: 

178 clean(destination_path / path, ignore, dry_run) 

179 case (Op.DELETE, False): 

180 if delete: 

181 SYNC_LOGGER.debug("Deleting file %s", destination_path / path) 

182 (destination_path / path).unlink() 

183 case op, is_dir: # pragma: no cover 

184 raise NotImplementedError( 

185 f"Don't know how to handle {op} of {'directory' if is_dir else 'file'}" 

186 ) 

187 

188 

def clean(
    root: Path,
    ignore: Callable[[str, Collection[str]], set[str]],
    dry_run: bool,
) -> None:
    """Empty out a directory tree, leaving alone anything the ignore filter
    matches, and remove each directory that ends up empty

    Parameters
    ----------
    root : Path
        The directory to clean out. This must be a directory.
    ignore : Callable
        A filter of the kind produced by `ignore_patterns`: given a directory
        and the names inside it, it returns the names that must be left alone.
    dry_run : bool
        Whether to only simulate the clean-up (log what would be removed
        without removing anything)

    Notes
    -----
    During a dry run nothing is actually deleted, so the final emptiness check
    sees the directory's current contents.
    """
    # simulated actions are reported at INFO, real ones only at DEBUG
    level = logging.DEBUG
    if dry_run:
        level = logging.INFO

    entries = list(root.iterdir())
    skipped = ignore(os.fspath(root), [entry.name for entry in entries])

    for entry in entries:
        if entry.name in skipped:
            SYNC_LOGGER.debug(f"Skipping {entry}")
            continue

        is_link = entry.is_symlink()
        if not is_link and entry.is_dir():
            # real sub-directory: descend (links to directories are unlinked,
            # never followed)
            clean(entry, ignore, dry_run)
            continue

        if is_link:
            SYNC_LOGGER.log(level, f"Removing symlink {entry}")
        else:
            SYNC_LOGGER.log(level, f"Deleting {entry}")
        if not dry_run:
            entry.unlink()

    # only a directory with nothing left inside gets removed
    if any(root.iterdir()):
        return
    SYNC_LOGGER.log(level, f"Removing empty {root}")
    if not dry_run:
        root.rmdir()

235 

236 

def ignore_patterns(*patterns: str) -> Callable[[str, Collection[str]], set[str]]:
    """Build an ignore filter whose patterns may span several path components,
    something `shutil.ignore_patterns` (which only ever looks at bare names)
    cannot do.

    This implementation is adapted from
    https://github.com/python/cpython/blob/3.11/Lib/shutil.py#L440-L450 and
    https://stackoverflow.com/a/7842224

    Parameters
    ----------
    *patterns : str
        The patterns to match

    Returns
    -------
    Callable
        An "ignore" filter suitable for use in `shutil.copytree`
    """

    def _ignore_patterns(path: str, names: Collection[str]) -> set[str]:
        parent_parts: list[str] = os.path.normpath(path).split(os.sep)
        hits: set[str] = set()
        for pattern in patterns:
            # number of directory levels the pattern spans
            depth = os.path.normpath(pattern).count(os.sep)
            candidates: Collection[str] = names
            if depth:
                # prefix each name with as many trailing directory components
                # as the pattern has, so both sides are compared at equal depth
                tail = parent_parts[-depth:]
                candidates = [os.path.join(*tail, name) for name in names]
            for hit in fnmatch.filter(candidates, pattern):
                # callers expect bare names back, not the prefixed form
                hits.add(os.path.basename(hit))
        return hits

    return _ignore_patterns

274 

275 

def pull(
    remote_uri: ParseResult,
    local_path: Path,
    exclude: Collection[str],
    dry_run: bool,
    delete: bool = True,
    **unsupported_kwargs,
) -> None:
    """Bring an upstream file or folder into a local folder, for the case where
    the "remote" is simply another location on this machine. Whatever already
    sits at the destination gets overwritten.

    Parameters
    ----------
    remote_uri : ParseResult
        The URI for the remote resource to copy from. See notes.
    local_path : Path
        The destination folder
    exclude : list of str
        Any patterns that should be excluded from the sync
    dry_run : bool
        Whether to only simulate this sync (report the operations to be performed
        but not actually perform them)
    delete : bool, optional
        Whether part of the syncing should include deleting files at the destination
        that aren't at the source. Default is True.
    **unsupported_kwargs
        Any other provided options will be ignored

    Raises
    ------
    FileNotFoundError
        If the destination folder does not exist, or if the resource the URI
        points to does not exist

    Notes
    -----
    - This method is only meant to be used for local files specified using
      the file:// protocol, but it does not perform any validation on the URI to
      ensure that the schema is correct or that the hostname corresponds to this
      machine. This method does not support user authentication
      (running the copy as a different user).
    - If the destination folder does not already exist, this method will not
      create it or its parent directories.
    """
    upstream = abspath_from_uri(remote_uri).expanduser()

    # the destination is checked first, so its error wins if both are missing
    if not local_path.exists():
        raise FileNotFoundError(f"{local_path} does not exist")
    if not upstream.exists():
        raise FileNotFoundError(f"{remote_uri.geturl()} does not exist")

    if unsupported_kwargs:
        ignored_options = "\n".join(
            f" {option}: {value}" for option, value in unsupported_kwargs.items()
        )
        SYNC_LOGGER.debug(
            "The following command-line options are ignored for this protocol:\n%s",
            ignored_options,
        )

    copy(upstream, local_path, exclude, delete=delete, dry_run=dry_run)

334 

335 

336def push( 

337 local_path: Path, 

338 remote_uri: ParseResult, 

339 exclude: Collection[str], 

340 dry_run: bool, 

341 delete: bool = True, 

342 **unsupported_kwargs, 

343) -> None: 

344 """Copy a local file or folder into the specified location, where the remote 

345 is another folder on this machine. This will overwrite any files and folders 

346 already at the destination. 

347 

348 Parameters 

349 ---------- 

350 local_path : Path 

351 The file or folder to copy 

352 remote_uri : ParseResult 

353 The URI for the remote location to copy into. See notes. 

354 exclude : list of str 

355 Any patterns that should be excluded from the sync 

356 dry_run : bool 

357 Whether to only simulate this sync (report the operations to be performed 

358 but not actually perform them) 

359 delete : bool, optional 

360 Whether part of the syncing should include deleting files at the destination 

361 that aren't at the source. Default is True. 

362 **unsupported_kwargs 

363 Any other provided options will be ignored 

364 

365 Raises 

366 ------ 

367 FileNotFoundError 

368 If the destination folder does not exist 

369 

370 Notes 

371 ----- 

372 - This method is only meant to be used for local files specified using 

373 the file:// protocol, but it does not perform any validation on the URI to 

374 ensure that the schema is correct or that the hostname corresponds to this 

375 machine. This method does not support user authentication 

376 (running the copy as a different user). 

377 - If the destination folder does not already exist, this method will not 

378 create it or its parent directories. 

379 """ 

380 source_path = local_path 

381 destination_folder = abspath_from_uri(remote_uri).expanduser() 

382 

383 if not destination_folder.exists(): 

384 raise FileNotFoundError(f"{remote_uri.geturl()} does not exist") 

385 if not source_path.exists(): 

386 raise FileNotFoundError(f"{source_path} does not exist") 

387 if unsupported_kwargs: 

388 SYNC_LOGGER.debug( 

389 "The following command-line options are ignored for this protocol:\n%s", 

390 "\n".join(" {}: {}".format(*item) for item in unsupported_kwargs.items()), 

391 ) 

392 

393 copy(source_path, destination_folder, exclude, delete=delete, dry_run=dry_run)