Coverage for gsb/cli.py: 98%

188 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-08 20:16 +0000

1"""Command-line interface""" 

2 

3import datetime as dt 

4import functools 

5import logging 

6import sys 

7from pathlib import Path 

8from typing import Any, Callable 

9 

10import click 

11 

12from . import _git, _version 

13from . import backup as backup_ 

14from . import export as export_ 

15from . import fastforward 

16from . import history as history_ 

17from . import onboard 

18from . import rewind as rewind_ 

19from .logging import IMPORTANT, CLIFormatter, verbosity_to_log_level 

20from .manifest import Manifest 

21 

# Logger named after this package; the subcommand wrapper attaches the console
# handler to it and the commands in this module log through it.
LOGGER = logging.getLogger(__package__)

23 

24 

# Root click group. Subcommands in this module are registered on it through
# ``gsb.command()``. The docstring below doubles as the top-level help text,
# so treat it as user-facing output rather than as developer documentation.
@click.group()
@click.help_option("--help", "-h")
@click.version_option(_version.get_versions()["version"], "--version", "-v", "-V")
def gsb():
    """CLI for managing incremental backups of your save states using Git!"""

30 

31 

def _subcommand_init(command: Callable) -> Callable:
    """Turn ``command`` into a ``gsb`` subcommand with the shared CLI plumbing.

    The resulting click command accepts ``--path``, ``--verbose``/``-v`` and
    ``--quiet``/``-q`` on top of whatever ``command`` itself declares. Before
    ``command`` runs, a console log handler is attached to ``LOGGER`` with a
    level derived from ``verbose - quiet``. ``command`` is then called with
    the absolute save path as its first positional argument; any ``OSError``
    or ``ValueError`` it raises is logged and turned into exit status 1.
    """

    @functools.wraps(command)
    def wrapped(path: Path | None, verbose: int, quiet: int, *args, **kwargs) -> None:
        console = logging.StreamHandler()
        console.setFormatter(CLIFormatter())
        LOGGER.addHandler(console)

        level = verbosity_to_log_level(verbose - quiet)
        console.setLevel(level)

        # TODO: when we add log files, set this to minimum log level across all handlers
        LOGGER.setLevel(level)

        try:
            # No --path given means "use the current working directory".
            save_root = path if path else Path()
            command(save_root.absolute(), *args, **kwargs)
        except (OSError, ValueError) as oh_no:
            LOGGER.error(oh_no)
            sys.exit(1)

    # Applied in this order (path, verbose, quiet), one on top of the other.
    shared_options = (
        click.option(
            "--path",
            type=Path,
            metavar="SAVE_PATH",
            help=(
                "Optionally specify the root directory containing your save data."
                " If no path is given, the current working directory will be used."
            ),
        ),
        click.option(
            "--verbose",
            "-v",
            count=True,
            help="Increase the amount of information that's printed.",
        ),
        click.option(
            "--quiet",
            "-q",
            count=True,
            help="Decrease the amount of information that's printed.",
        ),
    )
    for add_option in shared_options:
        wrapped = add_option(wrapped)

    return gsb.command()(wrapped)

77 

78 

@click.option(
    "--tag",
    type=str,
    help='Specify a description for this backup and "tag" it for future reference.',
    metavar='"MESSAGE"',
)
@click.option(
    "--combine",
    "-c",
    count=True,
    help=(
        "Combine this backup and the last backup,"
        " or use -cc to combine ALL backups since the last tagged backup."
    ),
)
@click.argument(
    "path_as_arg",
    type=Path,
    required=False,
    metavar="[SAVE_PATH]",
)
@_subcommand_init
def backup(repo_root: Path, path_as_arg: Path | None, tag: str | None, combine: int):
    """Create a new backup."""
    # NOTE: the docstring above is the command's help text; keep it short.
    #
    # repo_root:   absolute save path supplied by the subcommand wrapper
    # path_as_arg: optional positional SAVE_PATH; when given it is used
    #              instead of repo_root for creating the backup
    # tag:         optional message; passed straight to create_backup
    # combine:     number of times -c was given (0 = plain backup,
    #              1 = combine with the last backup, 2+ = combine with
    #              everything since the last tagged backup)
    #
    # parent_hash stays None for a plain backup; otherwise it is the identifier
    # handed to create_backup as ``parent``.
    parent_hash = None
    if combine == 1:
        # Ask for the two most recent revisions: the one being replaced and
        # the one before it. Check the count explicitly instead of relying on
        # the wording of the interpreter's unpacking error.
        recent = list(
            history_.get_history(
                repo_root, tagged_only=False, include_non_gsb=True, limit=2
            )
        )
        if len(recent) < 2:
            LOGGER.error("Cannot combine with the very first backup.")
            sys.exit(1)
        combine_me, parent = recent

        LOGGER.log(IMPORTANT, "Combining with %s", combine_me["identifier"])
        if combine_me["tagged"]:
            # Replacing a tagged backup needs explicit confirmation; on
            # approval the existing tag is deleted first.
            LOGGER.warning("Are you sure you want to overwrite a tagged backup?")
            history_.log_revision(combine_me, None)
            confirmed: bool = click.confirm(
                "",
                default=False,
                show_default=True,
            )
            if not confirmed:
                LOGGER.error("Aborting.")
                sys.exit(1)
            _git.delete_tag(repo_root, combine_me["identifier"])
        parent_hash = parent["identifier"]
    elif combine > 1:
        try:
            last_tag = history_.get_history(repo_root, tagged_only=True, limit=1)[0]
        except IndexError:
            LOGGER.error("There are no previous tagged backups.")
            sys.exit(1)
        LOGGER.log(IMPORTANT, "Combining with the following backups:")
        combining = history_.show_history(
            repo_root,
            tagged_only=False,
            include_non_gsb=True,
            since_last_tagged_backup=True,
        )
        if not combining:
            LOGGER.log(IMPORTANT, "(no backups to combine)")
        parent_hash = last_tag["identifier"]

    backup_.create_backup(path_as_arg or repo_root, tag, parent=parent_hash)

147 

148 

@click.option(
    "--ignore",
    type=str,
    required=False,
    multiple=True,
    help=(
        "Provide a glob pattern to ignore."
        " Each ignore pattern must be prefaced"
        ' with the "--ignore" flag.'
    ),
)
@click.option(
    "--track",
    type=str,
    required=False,
    multiple=True,
    help=(
        "Provide a glob pattern to track"
        " (note: arguments without any flag will also be treated"
        " as track patterns)."
    ),
)
@click.argument(
    "track_args",
    type=str,
    required=False,
    nargs=-1,
    metavar="[TRACK_PATTERN]...",
)
@_subcommand_init
def init(
    repo_root: Path,
    track_args: tuple[str, ...],
    track: tuple[str, ...],
    ignore: tuple[str, ...],
):
    """Start tracking a save."""
    # Bare positional patterns come first, followed by the ones given via
    # --track; both are forwarded to create_repo as track patterns.
    patterns = (*track_args, *track)
    onboard.create_repo(repo_root, *patterns, ignore=ignore)

181 

182 

@click.option(
    "--include_non_gsb",
    "-g",
    is_flag=True,
    help="Include backups created directly with Git / outside of gsb.",
)
@click.option("--all", "-a", "all_", is_flag=True, help="Include non-tagged backups.")
@click.option(
    "--since",
    type=click.DateTime(),
    required=False,
    help="Only show backups created after the specified date.",
)
@click.option(
    "--limit",
    "-n",
    type=int,
    required=False,
    help="The maximum number of backups to return.",
)
@click.argument(
    "path_as_arg",
    type=Path,
    required=False,
    metavar="[SAVE_PATH]",
)
@_subcommand_init
def history(
    repo_root: Path,
    path_as_arg: Path | None,
    limit: int | None,
    since: dt.datetime | None,
    all_: bool,
    include_non_gsb: bool,
):
    """List the available backups, starting with the most recent."""
    # Reject a non-positive limit up front.
    if limit is not None and limit <= 0:
        LOGGER.error("Limit must be a positive integer")
        sys.exit(1)

    filters: dict[str, Any] = {
        "tagged_only": not all_,
        "include_non_gsb": include_non_gsb,
    }
    # Only forward the optional filters the user actually supplied.
    optional = {"limit": limit, "since": since}
    filters.update(
        {name: value for name, value in optional.items() if value is not None}
    )

    target = path_as_arg or repo_root
    history_.show_history(target, **filters, always_include_latest=True)

235 

236 

@click.option(
    "--include_gsb_settings",
    is_flag=True,
    help="Also revert the GSB configuration files (including .gitignore)",
)
@click.argument(
    "revision",
    type=str,
    required=False,
)
@_subcommand_init
def rewind(repo_root: Path, revision: str | None, include_gsb_settings: bool):
    """Restore a backup to the specified REVISION."""
    # With no REVISION on the command line, fall back to the interactive picker.
    target = (
        revision
        if revision is not None
        else _prompt_for_a_recent_revision(repo_root)
    )
    # restore_backup receives the negation of the --include_gsb_settings flag.
    exclude_gsb_settings = not include_gsb_settings
    try:
        rewind_.restore_backup(repo_root, target, exclude_gsb_settings)
    except ValueError as whats_that:
        LOGGER.error(whats_that)
        sys.exit(1)

257 

258 

def _prompt_for_a_recent_revision(repo_root) -> str:
    """Select a recent revision from a prompt

    Shows up to ten recent revisions, widening the search step by step
    (default listing, then with ``tagged_only=False``, then additionally with
    ``include_non_gsb=True``) until something is found, followed by the most
    recent backup listed as number 0. The user may answer with a list
    number, with ``0``, with an identifier typed by hand, or with ``q``.

    Exits with status 1 when no revisions exist at all or when the user
    quits. Otherwise returns the chosen identifier; hand-typed input is
    returned lower-cased and stripped of surrounding whitespace.
    """
    LOGGER.log(IMPORTANT, "Here is a list of recent backups:")
    revisions = history_.show_history(repo_root, limit=10)
    if not revisions:
        LOGGER.info("No tagged revisions found. Trying untagged.")
        revisions = history_.show_history(
            repo_root,
            limit=10,
            tagged_only=False,
        )
    if not revisions:
        LOGGER.warning("No GSB revisions found. Trying Git.")
        revisions = history_.show_history(
            repo_root,
            limit=10,
            tagged_only=False,
            include_non_gsb=True,
        )
    if not revisions:
        LOGGER.error("No revisions found!")
        sys.exit(1)

    LOGGER.log(IMPORTANT, "\nMost recent backup:")
    most_recent_backup = history_.show_history(
        repo_root, limit=1, always_include_latest=True, numbering=0
    )[0]

    LOGGER.log(
        IMPORTANT,
        "\nSelect one by number or identifier (or [q]uit and"
        " call gsb history yourself to get more revisions).",
    )

    # Normalize once so every comparison below, and the value returned for a
    # hand-typed identifier, see the same text.
    choice: str = (
        click.prompt("Select a revision", default="q", show_default=True, type=str)
        .lower()
        .strip()
    )

    if choice == "q":
        LOGGER.error("Aborting.")
        sys.exit(1)
    if choice == "0":
        return most_recent_backup["identifier"]
    # Only the exact strings "1".."N" count as list positions.
    if choice in {str(position) for position in range(1, len(revisions) + 1)}:
        return revisions[int(choice) - 1]["identifier"]
    return choice

304 

305 

@click.argument(
    "revisions",
    type=str,
    required=False,
    nargs=-1,
    metavar="[REVISION]...",
)
@_subcommand_init
def delete(repo_root: Path, revisions: tuple[str, ...]):
    """Delete one or more backups by their specified REVISION."""
    # Nothing on the command line: ask interactively which ones to delete.
    targets = revisions or _prompt_for_revisions_to_delete(repo_root)
    try:
        fastforward.delete_backups(repo_root, *targets)
        follow_up = (
            'Deleted backups are now marked as "loose."'
            " To delete them immediately, run the command:"
            "\n git gc --aggressive --prune=now"
        )
        LOGGER.log(IMPORTANT, follow_up)
    except ValueError as whats_that:
        LOGGER.error(whats_that)
        sys.exit(1)

325 

326 

def _prompt_for_revisions_to_delete(repo_root: Path) -> tuple[str, ...]:
    """Offer a history of revisions to delete. Unlike similar prompts, this
    is a multiselect and requires the user to type in each entry (in order to guard
    against accidental deletions).

    The listing is the result of ``show_history`` with
    ``since_last_tagged_backup=True`` followed by a second ``show_history``
    call limited to three entries. If both come back empty, up to ten
    revisions including non-GSB ones are listed instead.

    Exits with status 1 when there is nothing to list or when the user
    answers ``q``. Otherwise returns the comma-separated entries the user
    typed, lower-cased and individually stripped of surrounding whitespace.
    """
    LOGGER.log(IMPORTANT, "Here is a list of recent GSB-created backups:")
    revisions = history_.show_history(
        repo_root, tagged_only=False, since_last_tagged_backup=True, numbering=None
    )
    revisions.extend(history_.show_history(repo_root, limit=3, numbering=None))
    if not revisions:
        LOGGER.warning("No GSB revisions found. Trying Git.")
        revisions = history_.show_history(
            repo_root, limit=10, tagged_only=False, include_non_gsb=True, numbering=None
        )

    # Bail out before printing the selection instructions: there is no point
    # asking the user to pick from an empty list.
    if not revisions:
        LOGGER.error("No revisions found!")
        sys.exit(1)

    LOGGER.log(
        IMPORTANT,
        "\nSelect a backup to delete by identifier, or multiple separated by commas."
        "\nAlternatively, [q]uit and call gsb history yourself to get more revisions.",
    )

    choices: str = click.prompt(
        "Select a revision or revisions", default="q", show_default=True, type=str
    ).lower()

    if choices.strip() == "q":
        LOGGER.error("Aborting.")
        sys.exit(1)
    # Each entry is stripped on its own, so surrounding whitespace anywhere in
    # the answer is discarded.
    return tuple(entry.strip() for entry in choices.split(","))

358 

359 

@click.option(
    "-J",
    "xz_flag",
    is_flag=True,
    flag_value="tar.xz",
    help="Export as a .tar.xz archive.",
)
@click.option(
    "-j",
    "bz2_flag",
    is_flag=True,
    flag_value="tar.bz2",
    help="Export as a .tar.bz2 archive.",
)
@click.option(
    "-z",
    "gz_flag",
    is_flag=True,
    flag_value="tar.gz",
    help="Export as a .tar.gz archive.",
)
@click.option(
    "-t",
    "tar_flag",
    is_flag=True,
    flag_value="tar",
    help="Export as an uncompressed .tar archive.",
)
@click.option(
    "-p",
    "zip_flag",
    is_flag=True,
    flag_value="zip",
    help="Export as a .zip archive.",
)
@click.option(
    "--format",
    "archive_format",
    type=str,
    required=False,
    help=(
        "Format for the archived backup. If not specified,"
        " an appropriate one will be chosen based on your OS."
    ),
    metavar="FORMAT",
)
@click.option(
    "--output",
    "-o",
    type=Path,
    required=False,
    help=(
        "Explicitly specify a filename for the archived backup."
        " The format of the archive will be inferred from the extension"
        " unless a format flag is provided."
    ),
    metavar="FILENAME",
)
@click.argument(
    "revision",
    type=str,
    required=False,
)
@_subcommand_init
def export(
    repo_root: Path,
    revision: str | None,
    output: Path | None,
    **format_flags,
):
    """Create a stand-alone archive of the specified REVISION."""
    # format_flags collects --format plus the single-letter format flags
    # (-J, -j, -z, -t, -p); only the ones the user supplied carry a truthy
    # value. Supplying more than one is an error.
    specified_formats: list[str] = [value for value in format_flags.values() if value]

    if len(specified_formats) > 1:
        LOGGER.error("Conflicting values given for archive format")
        sys.exit(1)

    if revision is None:
        revision = _prompt_for_a_recent_revision(repo_root)

    if len(specified_formats) == 1:
        # Accept both "zip" and ".zip" spellings for --format.
        archive_format = specified_formats[0].removeprefix(".")
        if output is None:
            output = Path(
                export_.generate_archive_name(
                    Manifest.of(repo_root).name, revision, extension=archive_format
                )
            )
        else:
            # An explicit format is appended to the requested filename rather
            # than replacing whatever extension it already has.
            output = output.parent / (output.name + f".{archive_format}")

    try:
        export_.export_backup(repo_root, revision, output)
    except ValueError as whats_that:
        LOGGER.error(whats_that)
        sys.exit(1)

459 

460 

@click.argument(
    "pytest_args",
    nargs=-1,
)
@gsb.command(context_settings={"ignore_unknown_options": True})
def test(pytest_args: tuple[str, ...]):  # pragma: no cover
    """Run the GSB test suite to ensure that it is running correctly on your system.
    Requires you to have installed GSB with the test extra
    (_i.e._ `pipx install gsb[test]`)."""
    # Imported here rather than at module level so the rest of the CLI does
    # not need pytest to be installed.
    import pytest

    # Everything the user typed after the subcommand is appended to the
    # fixed arguments and handed to pytest unchanged.
    arguments = ["--pyargs", "gsb.test"]
    arguments.extend(pytest_args)
    pytest.main(arguments)