diff --git a/packages/google-auth/google/auth/_agent_identity_utils.py b/packages/google-auth/google/auth/_agent_identity_utils.py index 8a1eddbe1cd3..b57c7bc82b52 100644 --- a/packages/google-auth/google/auth/_agent_identity_utils.py +++ b/packages/google-auth/google/auth/_agent_identity_utils.py @@ -89,6 +89,14 @@ def get_agent_identity_certificate_path(): if not cert_config_path and not has_well_known_dir: return None + # If ECP config path is specified but does not exist, and we are on a workstation, fail-fast immediately. + if ( + cert_config_path + and not has_well_known_dir + and not os.path.exists(cert_config_path) + ): + return None + has_logged_config_warning = False has_logged_cert_warning = False diff --git a/packages/google-auth/tests/test_agent_identity_utils.py b/packages/google-auth/tests/test_agent_identity_utils.py index f74bdad9e475..50a47367b9d7 100644 --- a/packages/google-auth/tests/test_agent_identity_utils.py +++ b/packages/google-auth/tests/test_agent_identity_utils.py @@ -165,14 +165,23 @@ def test_get_agent_identity_certificate_path_success(self, tmpdir, monkeypatch): assert result == str(cert_path) @mock.patch("time.sleep") + @mock.patch("google.auth._agent_identity_utils.os.path.exists") def test_get_agent_identity_certificate_path_retry( - self, mock_sleep, tmpdir, monkeypatch + self, mock_exists, mock_sleep, tmpdir, monkeypatch ): config_path = tmpdir.join("config.json") monkeypatch.setenv( environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path) ) + # Simulate workload env (well_known_dir exists) to avoid fail-fast + def exists_side_effect(path): + if path == "/var/run/secrets/workload-spiffe-credentials": + return True + return False + + mock_exists.side_effect = exists_side_effect + # File doesn't exist initially with pytest.raises(exceptions.RefreshError): _agent_identity_utils.get_agent_identity_certificate_path() @@ -180,14 +189,23 @@ def test_get_agent_identity_certificate_path_retry( assert mock_sleep.call_count == len(_agent_identity_utils._POLLING_INTERVALS) @mock.patch("time.sleep") + @mock.patch("google.auth._agent_identity_utils.os.path.exists") def test_get_agent_identity_certificate_path_failure( - self, mock_sleep, tmpdir, monkeypatch + self, mock_exists, mock_sleep, tmpdir, monkeypatch ): config_path = tmpdir.join("non_existent_config.json") monkeypatch.setenv( environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path) ) + # Simulate workload env (well_known_dir exists) to avoid fail-fast + def exists_side_effect(path): + if path == "/var/run/secrets/workload-spiffe-credentials": + return True + return False + + mock_exists.side_effect = exists_side_effect + with pytest.raises(exceptions.RefreshError) as excinfo: _agent_identity_utils.get_agent_identity_certificate_path() @@ -198,6 +216,19 @@ def test_get_agent_identity_certificate_path_failure( ) assert mock_sleep.call_count == len(_agent_identity_utils._POLLING_INTERVALS) + def test_get_agent_identity_certificate_path_workstation_fail_fast( + self, tmpdir, monkeypatch + ): + config_path = tmpdir.join("non_existent_config.json") + monkeypatch.setenv( + environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path) + ) + + # On a workstation, well_known_dir does not exist, and config file is missing. + # It should fail-fast and return None immediately. + result = _agent_identity_utils.get_agent_identity_certificate_path() + assert result is None + @mock.patch("time.sleep") @mock.patch("os.path.exists") def test_get_agent_identity_certificate_path_cert_not_found( diff --git a/packages/google-auth/tests/transport/test_mtls.py b/packages/google-auth/tests/transport/test_mtls.py index fc0e69bd377c..405cb496cad2 100644 --- a/packages/google-auth/tests/transport/test_mtls.py +++ b/packages/google-auth/tests/transport/test_mtls.py @@ -154,8 +154,10 @@ def test_default_client_encrypted_cert_source( # Test good callback. get_client_ssl_credentials.return_value = (True, b"cert", b"key", b"passphrase") callback = mtls.default_client_encrypted_cert_source("cert_path", "key_path") - with mock.patch("{}.open".format(__name__), return_value=mock.MagicMock()): + with mock.patch("google.auth.transport.mtls.open", mock.mock_open()) as mock_file: assert callback() == ("cert_path", "key_path", b"passphrase") + mock_file.assert_any_call("cert_path", "wb") + mock_file.assert_any_call("key_path", "wb") # Test bad callback which throws exception. get_client_ssl_credentials.side_effect = exceptions.ClientCertError()