Coverage for gsb/_git.py: 97%
237 statements
« prev ^ index » next coverage.py v7.2.6, created at 2024-09-08 16:21 -0400
« prev ^ index » next coverage.py v7.2.6, created at 2024-09-08 16:21 -0400
"""Abstraction around the git library interface (to allow for easier backend swaps)"""
3import datetime as dt
4import getpass
5import logging
6import re
7import socket
8import tarfile
9import zipfile
10from functools import partial
11from pathlib import Path
12from typing import Any, Generator, Iterable, NamedTuple, Self
14import pygit2
16from ._make_zip_archive import write_zip_archive
# Module-level logger named after this module. The functions below use it to
# emit DEBUG records that spell out the git command line each call mirrors.
LOGGER = logging.getLogger(__name__)
def init(repo_root: Path) -> pygit2.Repository:
    """Create a git repo at the given location (or re-initialize the one that
    is already there), equivalent to running `git init`

    Parameters
    ----------
    repo_root : Path
        The directory that serves as the root of the git repo

    Returns
    -------
    repo
        The initialized git repository

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    """
    # path validation and the actual initialization both live in _repo
    initialized_repo = _repo(repo_root, new=True)
    return initialized_repo
def _repo(
    repo_root: Path, new: bool = False, initial_branch: str = "gsb"
) -> pygit2.Repository:
    """Open (or, on request, create) the git repository at the given location

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo. The path is user-expanded and
        resolved before anything else is done with it.
    new : bool, optional
        By default, an existing repository is loaded. Pass `new=True` to
        initialize a repository instead.
    initial_branch : str, optional
        The name given to the initial branch when a repo is initialized
        (default: "gsb"). Only used when `new=True`; override it for, say,
        testing.

    Returns
    -------
    repo
        The requested git repository

    Raises
    ------
    NotADirectoryError
        If `repo_root` is not a directory
    FileNotFoundError
        If `repo_root` does not exist, or if `new=False` and no repository
        could be found there
    OSError
        If `repo_root` cannot otherwise be accessed
    """
    resolved_root = repo_root.expanduser().resolve()
    if not resolved_root.exists():
        raise FileNotFoundError(f"{resolved_root} does not exist")
    if not resolved_root.is_dir():
        raise NotADirectoryError(f"{resolved_root} is not a directory")

    if not new:
        try:
            return pygit2.Repository(resolved_root)
        except pygit2.GitError as maybe_no_git:
            # only the "repository not found" flavor of GitError gets
            # translated; anything else is passed along untouched
            is_missing_repo = "repository not found" in str(maybe_no_git).lower()
            if not is_missing_repo:
                raise  # pragma: no cover
            raise FileNotFoundError(maybe_no_git) from maybe_no_git

    LOGGER.debug(
        "git init %s --initial-branch=%s", repr(str(resolved_root)), initial_branch
    )
    return pygit2.init_repository(resolved_root, initial_head=initial_branch)
def _config() -> dict[str, str]:
    """Read the global git config and make sure the values this module
    needs are present

    Returns
    -------
    dict
        The user's global git config settings, with `user.name` and
        `user.email` filled in when absent and the `committer.*` entries
        always set to the gsb identity

    Notes
    -----
    Loading a repo-specific git config is not supported by this method
    """
    settings = _git_config()
    LOGGER.debug("git config --global --list")

    # a missing *or empty* user name falls back to the login name
    if not settings.get("user.name"):
        settings["user.name"] = getpass.getuser()
    # the email is only synthesized when the key is absent altogether
    if "user.email" not in settings:
        settings["user.email"] = f"{getpass.getuser()}@{socket.gethostname()}"

    settings.update(
        {
            "committer.name": "gsb",
            "committer.email": "gsb@openbagtwo.github.io",
        }
    )
    return settings
def _git_config() -> dict[str, str]:  # pragma: no cover
    """Read the global git config into a plain dict (kept as its own
    function so that it can be monkeypatched)

    Returns
    -------
    dict
        A name-to-value mapping of the global config entries, or an empty
        dict if reading the config raises an `OSError`
    """
    try:
        global_entries = pygit2.Config().get_global_config()
        return dict((entry.name, entry.value) for entry in global_entries)
    except OSError:
        return {}
def add(repo_root: Path, patterns: Iterable[str]) -> pygit2.Index:
    """Stage every file matching the given patterns, equivalent to running
    `git add <pattern>`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    patterns : list of str
        The glob patterns to match. The iterable is consumed exactly once.

    Returns
    -------
    index
        The updated git index

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    """
    repo = _repo(repo_root)
    # materialize first: the patterns are needed for both logging and staging
    pathspecs = list(patterns)
    LOGGER.debug("git add %s", " ".join(map(repr, pathspecs)))
    repo.index.add_all(pathspecs)
    repo.index.write()
    return repo.index
def force_add(repo_root: Path, files: Iterable[Path]) -> pygit2.Index:
    """Forcibly add specific files, overriding .gitignore, equivalent to running
    `git add <file> --force`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    files : list of paths
        The file paths to add, relative to the repo root

    Returns
    -------
    index
        The updated git index

    Raises
    ------
    FileNotFoundError
        If one of the specified paths does not exist
    IsADirectoryError
        If one of the specified paths is a directory
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    pygit2.GitError
        If adding a file fails for any reason other than the path being a
        directory

    Notes
    -----
    The index is only written after every file has been added, so a failure
    part-way through leaves the on-disk index untouched.
    """
    repo = _repo(repo_root)
    for path in files:
        LOGGER.debug("git add --force %s", repr(str(path)))
        try:
            repo.index.add(path)
        except OSError as maybe_file_not_found:  # pragma: no cover
            if "No such file or directory" in str(maybe_file_not_found):
                raise FileNotFoundError(maybe_file_not_found) from maybe_file_not_found
            raise  # pragma: no cover
        except pygit2.GitError as maybe_directory:  # pragma: no cover
            if "is a directory" in str(maybe_directory):
                raise IsADirectoryError(maybe_directory) from maybe_directory
            # any other git failure must surface rather than be silently
            # dropped (mirrors the OSError handling above)
            raise  # pragma: no cover
    repo.index.write()
    return repo.index
199class Commit(NamedTuple):
200 """Commit metadata
202 Attributes
203 ----------
204 hash : str
205 The full commit hash
206 message : str
207 The commit message
208 timestamp : dt.datetime
209 The timestamp of the commit
210 gsb : bool
211 True if and only if the tag was created by `gsb`
212 """
214 hash: str
215 message: str
216 timestamp: dt.datetime
217 gsb: bool
219 @classmethod
220 def from_pygit2(cls, commit_object: pygit2.Object) -> Self:
221 """Resolve from a pygit2 object"""
222 try:
223 gsb = commit_object.committer.name == "gsb"
224 except AttributeError: # pragma: no cover
225 gsb = False
226 return cls(
227 str(commit_object.id),
228 commit_object.message,
229 dt.datetime.fromtimestamp(commit_object.commit_time),
230 gsb,
231 )
def commit(
    repo_root: Path,
    message: str,
    timestamp: dt.datetime | None = None,
    _committer: tuple[str, str] | None = None,
) -> Commit:
    """Record the staged changes as a new commit, equivalent to running
    `git commit -m <message>`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    message : str
        The commit message. A trailing newline is appended if missing.
    timestamp : dt.datetime, optional
        By default, commits are created using the current timestamp. Use this
        argument to provide a custom one. Note that the value is applied to
        the committer signature only; the author signature is always built
        without an explicit time.
    _committer : (str, str) tuple, optional
        By default this method uses "gsb" as the committer. This should not
        be overridden outside of testing, but to do so, pass in both the
        username and email address.

    Returns
    -------
    commit
        The generated commit object

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    ValueError
        If the commit is empty ("nothing to do")
    """
    repo = _repo(repo_root)
    try:
        head = repo.head
        target_ref, parent_ids = head.name, [head.target]
    except pygit2.GitError as headless:
        if not re.search(r"reference 'refs/heads/(.*)' not found", str(headless)):
            raise  # pragma: no cover
        # HEAD points at a branch that doesn't exist (presumably because the
        # repo has no commits yet), so this becomes a parentless commit
        target_ref, parent_ids = "HEAD", []

    if not repo.status(untracked_files="no"):
        raise ValueError("Nothing to commit")

    message = message if message.endswith("\n") else message + "\n"

    config = _config()
    author = pygit2.Signature(config["user.name"], config["user.email"])

    time_override: dict[str, Any] = (
        {} if timestamp is None else {"time": int(timestamp.timestamp())}
    )
    if _committer is None:
        committer_identity = (config["committer.name"], config["committer.email"])
    else:
        committer_identity = _committer
    committer = pygit2.Signature(*committer_identity, **time_override)

    LOGGER.debug("git commit -m %s", repr(message))
    new_commit_id = repo.create_commit(
        target_ref, author, committer, message, repo.index.write_tree(), parent_ids
    )
    return Commit.from_pygit2(repo[new_commit_id])
def log(
    repo_root: Path, starting_point: str | None = None
) -> Generator[Commit, None, None]:
    """Yield metadata about commits such as you'd get by running `git log`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    starting_point : str, optional
        The default behavior is to walk backwards from the current repo HEAD.
        To specify a different starting point, provide an identifier for this
        variable.

    Returns
    -------
    iterable of commit
        The requested commits, returned lazily, in reverse-chronological order

    Raises
    ------
    ValueError
        If `starting_point` is provided and can't be resolved
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed

    Notes
    -----
    This is a generator function, so nothing (including the loading of the
    repo and the raising of the errors above) happens until the result is
    first iterated.
    """
    repo = _repo(repo_root)
    try:
        if starting_point is None:
            LOGGER.debug("git log")
            walk_from = repo[repo.head.target].id
        else:
            LOGGER.debug("git log %s", starting_point)
            walk_from = _resolve_reference(starting_point, repo).id
        for raw_commit in repo.walk(walk_from, pygit2.GIT_SORT_NONE):
            yield Commit.from_pygit2(raw_commit)
    except pygit2.GitError as maybe_empty_history:
        if not re.search("reference (.*) not found", str(maybe_empty_history)):
            raise  # pragma: no cover
        # this is what pygit2 throws when there's no commits, so the
        # generator simply ends without yielding anything further
        return
def ls_files(repo_root: Path) -> list[Path]:
    """Enumerate the files in the index, similar to the output you'd get from
    running `git ls-files`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo

    Returns
    -------
    list of Path
        The files being tracked in this repo, each one joined onto
        `repo_root` exactly as it was passed in

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    """
    repo = _repo(repo_root)
    LOGGER.debug("git ls-files")
    tracked: list[Path] = []
    for entry in repo.index:
        tracked.append(repo_root / entry.path)
    return tracked
371class Tag(NamedTuple):
372 """Tag metadata
374 Attributes
375 ----------
376 name : str
377 The name of the tag
378 annotation : str or None
379 The tag's annotation. If None, then this is a lightweight tag
380 target : Commit
381 The commit the tag is targeting
382 gsb : bool or None
383 True if the tagger was `gsb`, False if it was created by
384 someone / something else and None if it's a lightweight tag (which
385 doesn't have a tagger)
386 """
388 name: str
389 annotation: str | None
390 target: Commit
391 gsb: bool | None
393 @classmethod
394 def from_repo_reference(
395 cls, reference: pygit2.Reference | str, repo: pygit2.Repository
396 ) -> Self:
397 """Parse the reference and resolve from the pygit2 object"""
398 if isinstance(reference, str):
399 tag_object = repo.revparse_single(reference)
400 else:
401 tag_object = repo.revparse_single(reference.name)
402 reference = reference.shorthand
404 if tag_object.type == pygit2.GIT_OBJECT_TAG:
405 try:
406 gsb = tag_object.tagger.name == "gsb"
407 except AttributeError: # pragma: no cover
408 gsb = False
409 return cls(
410 tag_object.name,
411 tag_object.message,
412 Commit.from_pygit2(repo[tag_object.target]),
413 gsb,
414 )
415 if tag_object.type == pygit2.GIT_OBJECT_COMMIT:
416 return cls(reference, None, Commit.from_pygit2(tag_object), False)
417 raise TypeError( # pragma: no cover
418 f"Don't know how to parse reference of type: {tag_object.type}"
419 )
def tag(
    repo_root: Path,
    tag_name: str,
    annotation: str | None,
    target: str | None = None,
    _tagger: tuple[str, str] | None = None,
) -> Tag:
    """Create a tag (at the current HEAD unless told otherwise), equivalent
    to running `git tag [-am <annotation>]`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    tag_name : str
        The name to give the tag
    annotation : str or None
        The annotation to give the tag. If None (or an empty string) is
        provided, a lightweight tag will be created. A trailing newline is
        appended to a non-empty annotation if missing.
    target : str, optional
        The commit to assign the tag. If None is given, the current HEAD will
        be used.
    _tagger : (str, str) tuple, optional
        By default this method uses "gsb" as the tagger. This should not
        be overridden outside of testing, but to do so, pass in both the
        username and email address.

    Returns
    -------
    tag
        The generated tag object

    Raises
    ------
    ValueError
        If there is already a tag with the provided name, or if `target` is
        provided and can't be resolved
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    """
    repo = _repo(repo_root)

    config = _config()
    if _tagger is None:
        tagger_identity = (config["committer.name"], config["committer.email"])
    else:
        tagger_identity = _tagger
    tagger = pygit2.Signature(*tagger_identity)

    if target:
        reference = _resolve_reference(target, repo).id
        ref_short = str(reference)[:8]
    else:
        reference = repo.head.target
        ref_short = "HEAD"

    if not annotation:
        # no annotation means a lightweight tag, which is just a reference
        LOGGER.debug("git tag %s %s", tag_name, ref_short)
        repo.create_reference(f"refs/tags/{tag_name}", reference)
    else:
        if not annotation.endswith("\n"):
            annotation += "\n"

        LOGGER.debug("git tag %s -am %s %s", tag_name, repr(annotation), ref_short)
        repo.create_tag(
            tag_name,
            reference,
            pygit2.GIT_OBJECT_COMMIT,
            tagger,
            annotation,
        )

    # PSA: pygit2.AlreadyExistsError subclasses ValueError
    return Tag.from_repo_reference(tag_name, repo)
def delete_tag(repo_root: Path, tag_name: str) -> None:
    """Remove a tag, equivalent to running `git tag -d <tag_name>`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    tag_name : str
        The name of the tag to delete

    Raises
    ------
    ValueError
        If there is no tag with the provided name
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    """
    repo = _repo(repo_root)
    tag_ref_name = f"refs/tags/{tag_name}"
    LOGGER.debug("git tag -d %s", tag_name)
    try:
        repo.references[tag_ref_name].delete()
    except KeyError as tag_not_found:
        # the reference lookup signals a missing tag with a KeyError
        raise ValueError(f"No such tag: {tag_name}") from tag_not_found
def get_tags(repo_root: Path, annotated_only: bool) -> list[Tag]:
    """Collect the repo's tags, similar to the output you'd get from
    running `git tag`, with the additional option of filtering out
    lightweight tags

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    annotated_only : bool
        Lightweight tags will be included if and only if this is `False`.
        (The filter keys off the annotation being non-empty.)

    Returns
    -------
    list of Tag
        The requested list of tags, sorted by plain tuple comparison (so
        by name first)

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    """
    repo = _repo(repo_root)
    LOGGER.debug("git tag")
    every_tag = (
        Tag.from_repo_reference(tag_reference, repo)
        for tag_reference in repo.references.iterator(pygit2.GIT_REFERENCES_TAGS)
    )
    return sorted(
        candidate
        for candidate in every_tag
        if not (annotated_only and not candidate.annotation)
    )
def _resolve_reference(reference: str, repo: pygit2.Repository) -> pygit2.Object:
    """Look up the object that a revision identifier points to

    Parameters
    ----------
    reference : str
        The reference to resolve
    repo : Repository
        The git repository

    Returns
    -------
    pygit2.Object
        The resolved reference

    Raises
    ------
    ValueError
        If the specified revision does not exist
    """
    LOGGER.debug("git show %s", reference)
    try:
        resolved = repo.revparse_single(reference)
    except KeyError as no_rev:
        # surface unknown revisions as ValueError, keeping the cause attached
        raise ValueError(
            f"Could not find a revision named {repr(reference)}"
        ) from no_rev
    return resolved
def show(repo_root: Path, reference: str) -> Commit | Tag:
    """Describe a specified revision, similar to the output you'd
    get from running `git show <commit-hash-or-tag-name>`.

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    reference : str
        A unique descriptor of the tag or commit

    Returns
    -------
    Commit or Tag
        The requested tag or commit

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    ValueError
        If the specified revision does not exist
    TypeError
        If the revision resolves to something that is neither a commit nor
        a tag object
    """
    repo = _repo(repo_root)
    revision = _resolve_reference(reference, repo)
    object_kind = revision.type

    if object_kind == pygit2.GIT_OBJECT_COMMIT:
        return Commit.from_pygit2(revision)
    if object_kind == pygit2.GIT_OBJECT_TAG:
        # tags are re-parsed from the repo by way of the tag object's own id
        return Tag.from_repo_reference(str(revision.id), repo)

    raise TypeError(  # pragma: no cover
        f"Object of type {revision.type} is not a valid revision"
    )
def reset(repo_root: Path, reference: str, hard: bool) -> None:
    """Reset the repo to the specified revision, equivalent to running
    `git reset [--hard/--soft] <revision>`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    reference : str
        A unique descriptor of the tag or commit
    hard : bool
        If True, perform a hard reset. If False, perform a soft reset.

    Returns
    -------
    None

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    ValueError
        If the specified revision does not exist
    """
    repo = _repo(repo_root)

    # make sure revision exists (kept in its own name so that the `str`
    # parameter isn't rebound to an object id)
    target_id = _resolve_reference(reference, repo).id

    if hard:
        mode_name, mode_flag = "hard", pygit2.GIT_RESET_HARD
    else:
        mode_name, mode_flag = "soft", pygit2.GIT_RESET_SOFT

    # everything goes through logging's lazy %-formatting instead of mixing
    # an f-string into the format string
    LOGGER.debug("git reset --%s %s", mode_name, target_id)
    repo.reset(target_id, mode_flag)
def checkout_files(repo_root: Path, reference: str, paths: Iterable[Path]) -> None:
    """Restore the specified files to the versions that existed at the
    specified revision, equivalent to running
    `git reset <revision> -- <paths...> && git checkout <revision> -- <paths...>`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    reference : str
        A unique descriptor of the tag or commit
    paths : list of Paths
        The files to reset

    Returns
    -------
    None

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    ValueError
        If the specified revision does not exist
    """
    repo = _repo(repo_root)

    revision = _resolve_reference(reference, repo)
    if isinstance(revision, pygit2.Tag):
        # a tag object is swapped for its target and the whole operation is
        # re-run against that; `paths` is handed over unconsumed
        return checkout_files(repo_root, str(revision.target), paths)

    requested = list(paths)

    for file_path in requested:
        LOGGER.debug("git reset %s -- %s", reference, repr(str(file_path)))
        try:
            repo.index.remove(file_path)
        except OSError:
            pass  # possible that the file no longer exists
        try:
            historical = revision.tree[file_path]
            repo.index.add(
                pygit2.IndexEntry(file_path, historical.id, historical.filemode)
            )
        except KeyError:
            pass  # possible that the file doesn't exist at the time of the revision

    repo.index.write()
    quoted_paths = " ".join(repr(str(file_path)) for file_path in requested)
    LOGGER.debug("git checkout %s -- %s", reference, quoted_paths)
    # force the working copies of those paths to match the updated index
    repo.checkout(strategy=pygit2.GIT_CHECKOUT_FORCE, paths=requested)
    return None
def checkout_branch(repo_root: Path, branch_name: str, target: str | None) -> None:
    """Switch to a branch, either new or existing, equivalent to calling
    `git checkout [-b] <branch_name> [<target>]`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    branch_name : str
        The name for the branch
    target : str or None
        When a reference is provided, this method will attempt to create a new
        branch at that reference point. When None is provided, this method will
        attempt to check out an existing branch at that branch's head.

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    ValueError
        If the specified `target` does not exist, if the `branch_name` is taken
        (when `target` is specified) or if the `branch_name` _does not_ exist
        (when `target=None`)
    """
    repo = _repo(repo_root)
    local_branches = repo.branches.local

    if target is not None:
        LOGGER.debug("git checkout -b %s %s", branch_name, target)
        branch_point = _resolve_reference(target, repo)
        if isinstance(branch_point, pygit2.Tag):
            # a tag object gets swapped for whatever it targets before the
            # branch is created
            branch_point = _resolve_reference(str(branch_point.target), repo)
        local_branches.create(branch_name, branch_point)

    try:
        LOGGER.debug("git checkout %s", branch_name)
        repo.checkout(local_branches[branch_name])
    except KeyError as no_such_branch:
        raise ValueError(no_such_branch) from no_such_branch
def delete_branch(repo_root: Path, branch_name: str) -> None:
    """Remove a branch, equivalent to running `git branch -D <branch_name>`

    Parameters
    ----------
    repo_root : Path
        The root directory of the git repo
    branch_name : str
        The name of the branch

    Raises
    ------
    OSError
        If `repo_root` does not exist, is not a directory or cannot be accessed
    ValueError
        If the specified branch does not exist or if the specified branch is
        currently checked out

    Notes
    -----
    NOTE(review): only a `KeyError` is translated into `ValueError` here, so
    the "currently checked out" case relies on the backend raising a
    `ValueError`-compatible exception on its own -- confirm.
    """
    repo = _repo(repo_root)
    LOGGER.debug("git branch -D %s", branch_name)
    try:
        repo.branches.local.delete(branch_name)
    except KeyError as no_such_branch:
        raise ValueError(no_such_branch) from no_such_branch
769def archive(repo_root: Path, filename: Path, reference: str = "HEAD") -> None:
770 """Create a standalone archive containing the files in the repo at the
771 current HEAD, equivalent to running `git archive -o <filename>`
773 Parameters
774 ----------
775 repo_root : Path
776 The root directory of the git repo
777 filename : Path
778 The full path to the archive's location, including its extension
779 reference : str, optional
780 A unique descriptor of the tag or commit to archive. If None is given,
781 the default is to use the current HEAD.
783 Raises
784 ------
785 OSError
786 If `repo_root` does not exist, is not a directory or cannot be accessed,
787 or if the specified `filename` already exists or cannot be written to.
788 ValueError
789 If the specified `target` does not exist or if the given filename
790 does not have a valid extension
791 NotImplementedError
792 If the compression schema implied by the filename's extension is not
793 supported
794 """
795 repo = _repo(repo_root)
796 revision = _resolve_reference(reference, repo)
797 if filename.exists():
798 raise FileExistsError(f"Archive {filename} already exists.")
800 LOGGER.debug("git archive -o %s %s", filename, reference)
802 match tuple(suffix.lower() for suffix in filename.suffixes):
803 case ():
804 raise ValueError(f"Filename {filename} does not specify an extension.")
805 case *_, ".tar":
806 opener = partial(tarfile.open, mode="x:")
807 case (*_, ".tgz") | (*_, ".tar", ".gz"):
808 opener = partial(tarfile.open, mode="x:gz")
809 case (*_, ".tbz2" | ".tbz") | (*_, ".tar", (".bz2" | ".bz")):
810 opener = partial(tarfile.open, mode="x:bz2")
811 case (*_, ".txz" | ".tlzma" | ".tlz") | (*_, ".tar", (".xz" | ".lzma" | ".lz")):
812 opener = partial(tarfile.open, mode="x:xz")
813 case (*_, ".zip"):
814 with zipfile.ZipFile(
815 filename, "w", compression=zipfile.ZIP_DEFLATED
816 ) as zip_file:
817 write_zip_archive(repo, revision, zip_file)
818 return
819 case _:
820 raise NotImplementedError(f"{filename}: Archive format is not supported.")
822 with opener(filename) as archive_file:
823 repo.write_archive(revision, archive_file)