Skip to content

Block insecure options and protocols by default #1521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 29, 2022
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions git/cmd.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
from __future__ import annotations
import re
from contextlib import contextmanager
import io
import logging
@@ -24,7 +25,7 @@
from git.exc import CommandError
from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present

from .exc import GitCommandError, GitCommandNotFound
from .exc import GitCommandError, GitCommandNotFound, UnsafeOptionError, UnsafeProtocolError
from .util import (
LazyMixin,
stream_copy,
@@ -262,6 +263,8 @@ class Git(LazyMixin):

_excluded_ = ("cat_file_all", "cat_file_header", "_version_info")

re_unsafe_protocol = re.compile("(.+)::.+")

def __getstate__(self) -> Dict[str, Any]:
return slots_to_dict(self, exclude=self._excluded_)

@@ -454,6 +457,48 @@ def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike:
url = url.replace("\\\\", "\\").replace("\\", "/")
return url

@classmethod
def check_unsafe_protocols(cls, url: str) -> None:
"""
Check for unsafe protocols.
Apart from the usual protocols (http, git, ssh),
Git allows "remote helpers" that have the form `<transport>::<address>`,
one of these helpers (`ext::`) can be used to invoke any arbitrary command.
See:
- https://git-scm.com/docs/gitremote-helpers
- https://git-scm.com/docs/git-remote-ext
"""
match = cls.re_unsafe_protocol.match(url)
if match:
protocol = match.group(1)
raise UnsafeProtocolError(
f"The `{protocol}::` protocol looks suspicious, use `allow_unsafe_protocols=True` to allow it."
)

@classmethod
def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> None:
"""
Check for unsafe options.
Some options that are passed to `git <command>` can be used to execute
arbitrary commands, this are blocked by default.
"""
# Options can be of the form `foo` or `--foo bar` `--foo=bar`,
# so we need to check if they start with "--foo" or if they are equal to "foo".
bare_unsafe_options = [
option.lstrip("-")
for option in unsafe_options
]
for option in options:
for unsafe_option, bare_option in zip(unsafe_options, bare_unsafe_options):
if option.startswith(unsafe_option) or option == bare_option:
raise UnsafeOptionError(
f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it."
)

class AutoInterrupt(object):
"""Kill/Interrupt the stored process instance once this instance goes out of scope. It is
used to prevent processes piling up in case iterators stop reading.
@@ -1148,12 +1193,12 @@ def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any
return args

@classmethod
def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]:
def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]:

outlist = []
if isinstance(arg_list, (list, tuple)):
for arg in arg_list:
outlist.extend(cls.__unpack_args(arg))
outlist.extend(cls._unpack_args(arg))
else:
outlist.append(str(arg_list))

@@ -1238,7 +1283,7 @@ def _call_process(
# Prepare the argument list

opt_args = self.transform_kwargs(**opts_kwargs)
ext_args = self.__unpack_args([a for a in args if a is not None])
ext_args = self._unpack_args([a for a in args if a is not None])

if insert_after_this_arg is None:
args_list = opt_args + ext_args
8 changes: 8 additions & 0 deletions git/exc.py
Original file line number Diff line number Diff line change
@@ -37,6 +37,14 @@ class NoSuchPathError(GitError, OSError):
"""Thrown if a path could not be access by the system."""


class UnsafeProtocolError(GitError):
"""Thrown if unsafe protocols are passed without being explicitly allowed."""


class UnsafeOptionError(GitError):
"""Thrown if unsafe options are passed without being explicitly allowed."""


class CommandError(GitError):
"""Base class for exceptions thrown at every stage of `Popen()` execution.
36 changes: 33 additions & 3 deletions git/objects/submodule/base.py
Original file line number Diff line number Diff line change
@@ -272,7 +272,16 @@ def _module_abspath(cls, parent_repo: "Repo", path: PathLike, name: str) -> Path
# end

@classmethod
def _clone_repo(cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any) -> "Repo":
def _clone_repo(
cls,
repo: "Repo",
url: str,
path: PathLike,
name: str,
allow_unsafe_options: bool = False,
allow_unsafe_protocols: bool = False,
**kwargs: Any,
) -> "Repo":
""":return: Repo instance of newly cloned repository
:param repo: our parent repository
:param url: url to clone from
@@ -289,7 +298,13 @@ def _clone_repo(cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs
module_checkout_path = osp.join(str(repo.working_tree_dir), path)
# end

clone = git.Repo.clone_from(url, module_checkout_path, **kwargs)
clone = git.Repo.clone_from(
url,
module_checkout_path,
allow_unsafe_options=allow_unsafe_options,
allow_unsafe_protocols=allow_unsafe_protocols,
**kwargs,
)
if cls._need_gitfile_submodules(repo.git):
cls._write_git_file_and_module_config(module_checkout_path, module_abspath)
# end
@@ -359,6 +374,8 @@ def add(
depth: Union[int, None] = None,
env: Union[Mapping[str, str], None] = None,
clone_multi_options: Union[Sequence[TBD], None] = None,
allow_unsafe_options: bool = False,
allow_unsafe_protocols: bool = False,
) -> "Submodule":
"""Add a new submodule to the given repository. This will alter the index
as well as the .gitmodules file, but will not create a new commit.
@@ -475,7 +492,16 @@ def add(
kwargs["multi_options"] = clone_multi_options

# _clone_repo(cls, repo, url, path, name, **kwargs):
mrepo = cls._clone_repo(repo, url, path, name, env=env, **kwargs)
mrepo = cls._clone_repo(
repo,
url,
path,
name,
env=env,
allow_unsafe_options=allow_unsafe_options,
allow_unsafe_protocols=allow_unsafe_protocols,
**kwargs,
)
# END verify url

## See #525 for ensuring git urls in config-files valid under Windows.
@@ -520,6 +546,8 @@ def update(
keep_going: bool = False,
env: Union[Mapping[str, str], None] = None,
clone_multi_options: Union[Sequence[TBD], None] = None,
allow_unsafe_options: bool = False,
allow_unsafe_protocols: bool = False,
) -> "Submodule":
"""Update the repository of this submodule to point to the checkout
we point at with the binsha of this instance.
@@ -643,6 +671,8 @@ def update(
n=True,
env=env,
multi_options=clone_multi_options,
allow_unsafe_options=allow_unsafe_options,
allow_unsafe_protocols=allow_unsafe_protocols,
)
# END handle dry-run
progress.update(
70 changes: 63 additions & 7 deletions git/remote.py
Original file line number Diff line number Diff line change
@@ -535,6 +535,23 @@ class Remote(LazyMixin, IterableObj):
__slots__ = ("repo", "name", "_config_reader")
_id_attribute_ = "name"

unsafe_git_fetch_options = [
# This option allows users to execute arbitrary commands.
# https://git-scm.com/docs/git-fetch#Documentation/git-fetch.txt---upload-packltupload-packgt
"--upload-pack",
]
unsafe_git_pull_options = [
# This option allows users to execute arbitrary commands.
# https://git-scm.com/docs/git-pull#Documentation/git-pull.txt---upload-packltupload-packgt
"--upload-pack"
]
unsafe_git_push_options = [
# This option allows users to execute arbitrary commands.
# https://git-scm.com/docs/git-push#Documentation/git-push.txt---execltgit-receive-packgt
"--receive-pack",
"--exec",
]

def __init__(self, repo: "Repo", name: str) -> None:
"""Initialize a remote instance
@@ -611,7 +628,9 @@ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator["Remote
yield Remote(repo, section[lbound + 1 : rbound])
# END for each configuration section

def set_url(self, new_url: str, old_url: Optional[str] = None, **kwargs: Any) -> "Remote":
def set_url(
self, new_url: str, old_url: Optional[str] = None, allow_unsafe_protocols: bool = False, **kwargs: Any
) -> "Remote":
"""Configure URLs on current remote (cf command git remote set_url)
This command manages URLs on the remote.
@@ -620,15 +639,17 @@ def set_url(self, new_url: str, old_url: Optional[str] = None, **kwargs: Any) ->
:param old_url: when set, replaces this URL with new_url for the remote
:return: self
"""
if not allow_unsafe_protocols:
Git.check_unsafe_protocols(new_url)
scmd = "set-url"
kwargs["insert_kwargs_after"] = scmd
if old_url:
self.repo.git.remote(scmd, self.name, new_url, old_url, **kwargs)
self.repo.git.remote(scmd, "--", self.name, new_url, old_url, **kwargs)
else:
self.repo.git.remote(scmd, self.name, new_url, **kwargs)
self.repo.git.remote(scmd, "--", self.name, new_url, **kwargs)
return self

def add_url(self, url: str, **kwargs: Any) -> "Remote":
def add_url(self, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote":
"""Adds a new url on current remote (special case of git remote set_url)
This command adds new URLs to a given remote, making it possible to have
@@ -637,7 +658,7 @@ def add_url(self, url: str, **kwargs: Any) -> "Remote":
:param url: string being the URL to add as an extra remote URL
:return: self
"""
return self.set_url(url, add=True)
return self.set_url(url, add=True, allow_unsafe_protocols=allow_unsafe_protocols)

def delete_url(self, url: str, **kwargs: Any) -> "Remote":
"""Deletes a new url on current remote (special case of git remote set_url)
@@ -729,7 +750,7 @@ def stale_refs(self) -> IterableList[Reference]:
return out_refs

@classmethod
def create(cls, repo: "Repo", name: str, url: str, **kwargs: Any) -> "Remote":
def create(cls, repo: "Repo", name: str, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote":
"""Create a new remote to the given repository
:param repo: Repository instance that is to receive the new remote
:param name: Desired name of the remote
@@ -739,7 +760,10 @@ def create(cls, repo: "Repo", name: str, url: str, **kwargs: Any) -> "Remote":
:raise GitCommandError: in case an origin with that name already exists"""
scmd = "add"
kwargs["insert_kwargs_after"] = scmd
repo.git.remote(scmd, name, Git.polish_url(url), **kwargs)
url = Git.polish_url(url)
if not allow_unsafe_protocols:
Git.check_unsafe_protocols(url)
repo.git.remote(scmd, "--", name, url, **kwargs)
return cls(repo, name)

# add is an alias
@@ -921,6 +945,8 @@ def fetch(
progress: Union[RemoteProgress, None, "UpdateProgress"] = None,
verbose: bool = True,
kill_after_timeout: Union[None, float] = None,
allow_unsafe_protocols: bool = False,
allow_unsafe_options: bool = False,
**kwargs: Any,
) -> IterableList[FetchInfo]:
"""Fetch the latest changes for this remote
@@ -963,6 +989,14 @@ def fetch(
else:
args = [refspec]

if not allow_unsafe_protocols:
for ref in args:
if ref:
Git.check_unsafe_protocols(ref)

if not allow_unsafe_options:
Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_fetch_options)

proc = self.repo.git.fetch(
"--", self, *args, as_process=True, with_stdout=False, universal_newlines=True, v=verbose, **kwargs
)
@@ -976,6 +1010,8 @@ def pull(
refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, "UpdateProgress", None] = None,
kill_after_timeout: Union[None, float] = None,
allow_unsafe_protocols: bool = False,
allow_unsafe_options: bool = False,
**kwargs: Any,
) -> IterableList[FetchInfo]:
"""Pull changes from the given branch, being the same as a fetch followed
@@ -990,6 +1026,15 @@ def pull(
# No argument refspec, then ensure the repo's config has a fetch refspec.
self._assert_refspec()
kwargs = add_progress(kwargs, self.repo.git, progress)

refspec = Git._unpack_args(refspec or [])
if not allow_unsafe_protocols:
for ref in refspec:
Git.check_unsafe_protocols(ref)

if not allow_unsafe_options:
Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_pull_options)

proc = self.repo.git.pull(
"--", self, refspec, with_stdout=False, as_process=True, universal_newlines=True, v=True, **kwargs
)
@@ -1003,6 +1048,8 @@ def push(
refspec: Union[str, List[str], None] = None,
progress: Union[RemoteProgress, "UpdateProgress", Callable[..., RemoteProgress], None] = None,
kill_after_timeout: Union[None, float] = None,
allow_unsafe_protocols: bool = False,
allow_unsafe_options: bool = False,
**kwargs: Any,
) -> IterableList[PushInfo]:
"""Push changes from source branch in refspec to target branch in refspec.
@@ -1033,6 +1080,15 @@ def push(
If the operation fails completely, the length of the returned IterableList will
be 0."""
kwargs = add_progress(kwargs, self.repo.git, progress)

refspec = Git._unpack_args(refspec or [])
if not allow_unsafe_protocols:
for ref in refspec:
Git.check_unsafe_protocols(ref)

if not allow_unsafe_options:
Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_push_options)

proc = self.repo.git.push(
"--",
self,
48 changes: 45 additions & 3 deletions git/repo/base.py
Original file line number Diff line number Diff line change
@@ -21,7 +21,11 @@
)
from git.config import GitConfigParser
from git.db import GitCmdObjectDB
from git.exc import InvalidGitRepositoryError, NoSuchPathError, GitCommandError
from git.exc import (
GitCommandError,
InvalidGitRepositoryError,
NoSuchPathError,
)
from git.index import IndexFile
from git.objects import Submodule, RootModule, Commit
from git.refs import HEAD, Head, Reference, TagReference
@@ -129,6 +133,18 @@ class Repo(object):
re_author_committer_start = re.compile(r"^(author|committer)")
re_tab_full_line = re.compile(r"^\t(.*)$")

unsafe_git_clone_options = [
# This option allows users to execute arbitrary commands.
# https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---upload-packltupload-packgt
"--upload-pack",
"-u",
# Users can override configuration variables
# like `protocol.allow` or `core.gitProxy` to execute arbitrary commands.
# https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---configltkeygtltvaluegt
"--config",
"-c",
]

# invariants
# represents the configuration level of a configuration file
config_level: ConfigLevels_Tup = ("system", "user", "global", "repository")
@@ -955,7 +971,7 @@ def blame(
file: str,
incremental: bool = False,
rev_opts: Optional[List[str]] = None,
**kwargs: Any
**kwargs: Any,
) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None:
"""The blame information for the given file at the given revision.
@@ -1146,6 +1162,8 @@ def _clone(
odb_default_type: Type[GitCmdObjectDB],
progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None,
multi_options: Optional[List[str]] = None,
allow_unsafe_protocols: bool = False,
allow_unsafe_options: bool = False,
**kwargs: Any,
) -> "Repo":
odbt = kwargs.pop("odbt", odb_default_type)
@@ -1167,6 +1185,12 @@ def _clone(
multi = None
if multi_options:
multi = shlex.split(" ".join(multi_options))

if not allow_unsafe_protocols:
Git.check_unsafe_protocols(str(url))
if not allow_unsafe_options and multi_options:
Git.check_unsafe_options(options=multi_options, unsafe_options=cls.unsafe_git_clone_options)

proc = git.clone(
multi,
"--",
@@ -1220,6 +1244,8 @@ def clone(
path: PathLike,
progress: Optional[Callable] = None,
multi_options: Optional[List[str]] = None,
allow_unsafe_protocols: bool = False,
allow_unsafe_options: bool = False,
**kwargs: Any,
) -> "Repo":
"""Create a clone from this repository.
@@ -1230,6 +1256,7 @@ def clone(
option per list item which is passed exactly as specified to clone.
For example ['--config core.filemode=false', '--config core.ignorecase',
'--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path']
:param unsafe_protocols: Allow unsafe protocols to be used, like ext
:param kwargs:
* odbt = ObjectDatabase Type, allowing to determine the object database
implementation used by the returned Repo instance
@@ -1243,6 +1270,8 @@ def clone(
type(self.odb),
progress,
multi_options,
allow_unsafe_protocols=allow_unsafe_protocols,
allow_unsafe_options=allow_unsafe_options,
**kwargs,
)

@@ -1254,6 +1283,8 @@ def clone_from(
progress: Optional[Callable] = None,
env: Optional[Mapping[str, str]] = None,
multi_options: Optional[List[str]] = None,
allow_unsafe_protocols: bool = False,
allow_unsafe_options: bool = False,
**kwargs: Any,
) -> "Repo":
"""Create a clone from the given URL
@@ -1268,12 +1299,23 @@ def clone_from(
If you want to unset some variable, consider providing empty string
as its value.
:param multi_options: See ``clone`` method
:param unsafe_protocols: Allow unsafe protocols to be used, like ext
:param kwargs: see the ``clone`` method
:return: Repo instance pointing to the cloned directory"""
git = cls.GitCommandWrapperType(os.getcwd())
if env is not None:
git.update_environment(**env)
return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs)
return cls._clone(
git,
url,
to_path,
GitCmdObjectDB,
progress,
multi_options,
allow_unsafe_protocols=allow_unsafe_protocols,
allow_unsafe_options=allow_unsafe_options,
**kwargs,
)

def archive(
self,
4 changes: 2 additions & 2 deletions test/test_git.py
Original file line number Diff line number Diff line change
@@ -39,12 +39,12 @@ def test_call_process_calls_execute(self, git):
self.assertEqual(git.call_args, ((["git", "version"],), {}))

def test_call_unpack_args_unicode(self):
args = Git._Git__unpack_args("Unicode€™")
args = Git._unpack_args("Unicode€™")
mangled_value = "Unicode\u20ac\u2122"
self.assertEqual(args, [mangled_value])

def test_call_unpack_args(self):
args = Git._Git__unpack_args(["git", "log", "--", "Unicode€™"])
args = Git._unpack_args(["git", "log", "--", "Unicode€™"])
mangled_value = "Unicode\u20ac\u2122"
self.assertEqual(args, ["git", "log", "--", mangled_value])

211 changes: 211 additions & 0 deletions test/test_remote.py
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@
GitCommandError,
)
from git.cmd import Git
from pathlib import Path
from git.exc import UnsafeOptionError, UnsafeProtocolError
from test.lib import (
TestBase,
with_rw_repo,
@@ -690,6 +692,215 @@ def test_push_error(self, repo):
with self.assertRaisesRegex(GitCommandError, "src refspec __BAD_REF__ does not match any"):
rem.push("__BAD_REF__")

@with_rw_repo("HEAD")
def test_set_unsafe_url(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
with self.assertRaises(UnsafeProtocolError):
remote.set_url(url)

@with_rw_repo("HEAD")
def test_set_unsafe_url_allowed(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
remote.set_url(url, allow_unsafe_protocols=True)
assert list(remote.urls)[-1] == url

@with_rw_repo("HEAD")
def test_add_unsafe_url(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
with self.assertRaises(UnsafeProtocolError):
remote.add_url(url)

@with_rw_repo("HEAD")
def test_add_unsafe_url_allowed(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
remote.add_url(url, allow_unsafe_protocols=True)
assert list(remote.urls)[-1] == url

@with_rw_repo("HEAD")
def test_create_remote_unsafe_url(self, rw_repo):
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
with self.assertRaises(UnsafeProtocolError):
Remote.create(rw_repo, "origin", url)

@with_rw_repo("HEAD")
def test_create_remote_unsafe_url_allowed(self, rw_repo):
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for i, url in enumerate(urls):
remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True)
assert remote.url == url

@with_rw_repo("HEAD")
def test_fetch_unsafe_url(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
with self.assertRaises(UnsafeProtocolError):
remote.fetch(url)

@with_rw_repo("HEAD")
def test_fetch_unsafe_url_allowed(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
# The URL will be allowed into the command, but the command will
# fail since we don't have that protocol enabled in the Git config file.
with self.assertRaises(GitCommandError):
remote.fetch(url, allow_unsafe_protocols=True)

@with_rw_repo("HEAD")
def test_fetch_unsafe_options(self, rw_repo):
remote = rw_repo.remote("origin")
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [{"upload-pack": f"touch {tmp_file}"}]
for unsafe_option in unsafe_options:
with self.assertRaises(UnsafeOptionError):
remote.fetch(**unsafe_option)

@with_rw_repo("HEAD")
def test_fetch_unsafe_options_allowed(self, rw_repo):
remote = rw_repo.remote("origin")
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [{"upload-pack": f"touch {tmp_file}"}]
for unsafe_option in unsafe_options:
# The options will be allowed, but the command will fail.
with self.assertRaises(GitCommandError):
remote.fetch(**unsafe_option, allow_unsafe_options=True)

@with_rw_repo("HEAD")
def test_pull_unsafe_url(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
with self.assertRaises(UnsafeProtocolError):
remote.pull(url)

@with_rw_repo("HEAD")
def test_pull_unsafe_url_allowed(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
# The URL will be allowed into the command, but the command will
# fail since we don't have that protocol enabled in the Git config file.
with self.assertRaises(GitCommandError):
remote.pull(url, allow_unsafe_protocols=True)

@with_rw_repo("HEAD")
def test_pull_unsafe_options(self, rw_repo):
remote = rw_repo.remote("origin")
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [{"upload-pack": f"touch {tmp_file}"}]
for unsafe_option in unsafe_options:
with self.assertRaises(UnsafeOptionError):
remote.pull(**unsafe_option)

@with_rw_repo("HEAD")
def test_pull_unsafe_options_allowed(self, rw_repo):
remote = rw_repo.remote("origin")
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [{"upload-pack": f"touch {tmp_file}"}]
for unsafe_option in unsafe_options:
# The options will be allowed, but the command will fail.
with self.assertRaises(GitCommandError):
remote.pull(**unsafe_option, allow_unsafe_options=True)

@with_rw_repo("HEAD")
def test_push_unsafe_url(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
with self.assertRaises(UnsafeProtocolError):
remote.push(url)

@with_rw_repo("HEAD")
def test_push_unsafe_url_allowed(self, rw_repo):
remote = rw_repo.remote("origin")
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
# The URL will be allowed into the command, but the command will
# fail since we don't have that protocol enabled in the Git config file.
with self.assertRaises(GitCommandError):
remote.push(url, allow_unsafe_protocols=True)

@with_rw_repo("HEAD")
def test_push_unsafe_options(self, rw_repo):
remote = rw_repo.remote("origin")
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
{
"receive-pack": f"touch {tmp_file}",
"exec": f"touch {tmp_file}",
}
]
for unsafe_option in unsafe_options:
with self.assertRaises(UnsafeOptionError):
remote.push(**unsafe_option)

@with_rw_repo("HEAD")
def test_push_unsafe_options_allowed(self, rw_repo):
remote = rw_repo.remote("origin")
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
{
"receive-pack": f"touch {tmp_file}",
"exec": f"touch {tmp_file}",
}
]
for unsafe_option in unsafe_options:
# The options will be allowed, but the command will fail.
with self.assertRaises(GitCommandError):
remote.push(**unsafe_option, allow_unsafe_options=True)


class TestTimeouts(TestBase):
@with_rw_repo("HEAD", bare=False)
131 changes: 131 additions & 0 deletions test/test_repo.py
Original file line number Diff line number Diff line change
@@ -37,6 +37,8 @@
)
from git.exc import (
BadObject,
UnsafeOptionError,
UnsafeProtocolError,
)
from git.repo.fun import touch
from test.lib import TestBase, with_rw_repo, fixture
@@ -223,6 +225,7 @@ def test_clone_from_pathlib_withConfig(self, rw_dir):
"--config submodule.repo.update=checkout",
"--config filter.lfs.clean='git-lfs clean -- %f'",
],
allow_unsafe_options=True,
)

self.assertEqual(cloned.config_reader().get_value("submodule", "active"), "repo")
@@ -263,6 +266,134 @@ def test_leaking_password_in_clone_logs(self, rw_dir):
to_path=rw_dir,
)

@with_rw_repo("HEAD")
def test_clone_unsafe_options(self, rw_repo):
tmp_dir = pathlib.Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
f"--upload-pack='touch {tmp_file}'",
f"-u 'touch {tmp_file}'",
"--config=protocol.ext.allow=always",
"-c protocol.ext.allow=always",
]
for unsafe_option in unsafe_options:
with self.assertRaises(UnsafeOptionError):
rw_repo.clone(tmp_dir, multi_options=[unsafe_option])

@with_rw_repo("HEAD")
def test_clone_unsafe_options_allowed(self, rw_repo):
tmp_dir = pathlib.Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
f"--upload-pack='touch {tmp_file}'",
f"-u 'touch {tmp_file}'",
]
for i, unsafe_option in enumerate(unsafe_options):
destination = tmp_dir / str(i)
# The options will be allowed, but the command will fail.
with self.assertRaises(GitCommandError):
rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True)

unsafe_options = [
"--config=protocol.ext.allow=always",
"-c protocol.ext.allow=always",
]
for i, unsafe_option in enumerate(unsafe_options):
destination = tmp_dir / str(i)
assert not destination.exists()
rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True)
assert destination.exists()

@with_rw_repo("HEAD")
def test_clone_safe_options(self, rw_repo):
tmp_dir = pathlib.Path(tempfile.mkdtemp())
options = [
"--depth=1",
"--single-branch",
"-q",
]
for option in options:
destination = tmp_dir / option
assert not destination.exists()
rw_repo.clone(destination, multi_options=[option])
assert destination.exists()

@with_rw_repo("HEAD")
def test_clone_from_unsafe_options(self, rw_repo):
tmp_dir = pathlib.Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
f"--upload-pack='touch {tmp_file}'",
f"-u 'touch {tmp_file}'",
"--config=protocol.ext.allow=always",
"-c protocol.ext.allow=always",
]
for unsafe_option in unsafe_options:
with self.assertRaises(UnsafeOptionError):
Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option])

@with_rw_repo("HEAD")
def test_clone_from_unsafe_options_allowed(self, rw_repo):
tmp_dir = pathlib.Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
f"--upload-pack='touch {tmp_file}'",
f"-u 'touch {tmp_file}'",
]
for i, unsafe_option in enumerate(unsafe_options):
destination = tmp_dir / str(i)
# The options will be allowed, but the command will fail.
with self.assertRaises(GitCommandError):
Repo.clone_from(
rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True
)

unsafe_options = [
"--config=protocol.ext.allow=always",
"-c protocol.ext.allow=always",
]
for i, unsafe_option in enumerate(unsafe_options):
destination = tmp_dir / str(i)
assert not destination.exists()
Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True)
assert destination.exists()

@with_rw_repo("HEAD")
def test_clone_from_safe_options(self, rw_repo):
tmp_dir = pathlib.Path(tempfile.mkdtemp())
options = [
"--depth=1",
"--single-branch",
"-q",
]
for option in options:
destination = tmp_dir / option
assert not destination.exists()
Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option])
assert destination.exists()

def test_clone_from_unsafe_procol(self):
tmp_dir = pathlib.Path(tempfile.mkdtemp())
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::17/foo",
]
for url in urls:
with self.assertRaises(UnsafeProtocolError):
Repo.clone_from(url, tmp_dir)

def test_clone_from_unsafe_procol_allowed(self):
tmp_dir = pathlib.Path(tempfile.mkdtemp())
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::/foo",
]
for url in urls:
# The URL will be allowed into the command, but the command will
# fail since we don't have that protocol enabled in the Git config file.
with self.assertRaises(GitCommandError):
Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True)

@with_rw_repo("HEAD")
def test_max_chunk_size(self, repo):
class TestOutputStream(TestBase):
119 changes: 117 additions & 2 deletions test/test_submodule.py
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
import shutil
import tempfile
from pathlib import Path
import sys
from unittest import skipIf

@@ -12,7 +14,13 @@
from git.cmd import Git
from git.compat import is_win
from git.config import GitConfigParser, cp
from git.exc import InvalidGitRepositoryError, RepositoryDirtyError
from git.exc import (
GitCommandError,
InvalidGitRepositoryError,
RepositoryDirtyError,
UnsafeOptionError,
UnsafeProtocolError,
)
from git.objects.submodule.base import Submodule
from git.objects.submodule.root import RootModule, RootUpdateProgress
from git.repo.fun import find_submodule_git_dir, touch
@@ -1026,7 +1034,7 @@ def test_update_clone_multi_options_argument(self, rwdir):
)

# Act
sm.update(init=True, clone_multi_options=["--config core.eol=true"])
sm.update(init=True, clone_multi_options=["--config core.eol=true"], allow_unsafe_options=True)

# Assert
sm_config = GitConfigParser(file_or_files=osp.join(parent.git_dir, "modules", sm_name, "config"))
@@ -1070,6 +1078,7 @@ def test_add_clone_multi_options_argument(self, rwdir):
sm_name,
url=self._small_repo_url(),
clone_multi_options=["--config core.eol=true"],
allow_unsafe_options=True,
)

# Assert
@@ -1089,3 +1098,109 @@ def test_add_no_clone_multi_options_argument(self, rwdir):
sm_config = GitConfigParser(file_or_files=osp.join(parent.git_dir, "modules", sm_name, "config"))
with self.assertRaises(cp.NoOptionError):
sm_config.get_value("core", "eol")

@with_rw_repo("HEAD")
def test_submodule_add_unsafe_url(self, rw_repo):
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::/foo",
]
for url in urls:
with self.assertRaises(UnsafeProtocolError):
Submodule.add(rw_repo, "new", "new", url)

@with_rw_repo("HEAD")
def test_submodule_add_unsafe_url_allowed(self, rw_repo):
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::/foo",
]
for url in urls:
# The URL will be allowed into the command, but the command will
# fail since we don't have that protocol enabled in the Git config file.
with self.assertRaises(GitCommandError):
Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True)

@with_rw_repo("HEAD")
def test_submodule_add_unsafe_options(self, rw_repo):
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
f"--upload-pack='touch {tmp_file}'",
f"-u 'touch {tmp_file}'",
"--config=protocol.ext.allow=always",
"-c protocol.ext.allow=always",
]
for unsafe_option in unsafe_options:
with self.assertRaises(UnsafeOptionError):
Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option])

@with_rw_repo("HEAD")
def test_submodule_add_unsafe_options_allowed(self, rw_repo):
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
f"--upload-pack='touch {tmp_file}'",
f"-u 'touch {tmp_file}'",
"--config=protocol.ext.allow=always",
"-c protocol.ext.allow=always",
]
for unsafe_option in unsafe_options:
with self.assertRaises(GitCommandError):
Submodule.add(
rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True
)

@with_rw_repo("HEAD")
def test_submodule_update_unsafe_url(self, rw_repo):
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::/foo",
]
for url in urls:
submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url)
with self.assertRaises(UnsafeProtocolError):
submodule.update()

@with_rw_repo("HEAD")
def test_submodule_update_unsafe_url_allowed(self, rw_repo):
urls = [
"ext::sh -c touch% /tmp/pwn",
"fd::/foo",
]
for url in urls:
submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url)
# The URL will be allowed into the command, but the command will
# fail since we don't have that protocol enabled in the Git config file.
with self.assertRaises(GitCommandError):
submodule.update(allow_unsafe_protocols=True)

@with_rw_repo("HEAD")
def test_submodule_update_unsafe_options(self, rw_repo):
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
f"--upload-pack='touch {tmp_file}'",
f"-u 'touch {tmp_file}'",
"--config=protocol.ext.allow=always",
"-c protocol.ext.allow=always",
]
submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir))
for unsafe_option in unsafe_options:
with self.assertRaises(UnsafeOptionError):
submodule.update(clone_multi_options=[unsafe_option])

@with_rw_repo("HEAD")
def test_submodule_update_unsafe_options_allowed(self, rw_repo):
tmp_dir = Path(tempfile.mkdtemp())
tmp_file = tmp_dir / "pwn"
unsafe_options = [
f"--upload-pack='touch {tmp_file}'",
f"-u 'touch {tmp_file}'",
"--config=protocol.ext.allow=always",
"-c protocol.ext.allow=always",
]
submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir))
for unsafe_option in unsafe_options:
with self.assertRaises(GitCommandError):
submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True)