-
Notifications
You must be signed in to change notification settings - Fork 211
Added withFmi method for cca app #876
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
4gust
wants to merge
1
commit into
dev
Choose a base branch
from
4gust/with-fmi
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+278
−0
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2491,6 +2491,32 @@ def remove_tokens_for_client(self): | |
| self.token_cache.remove_at(at) | ||
| # acquire_token_for_client() obtains no RTs, so we have no RT to remove | ||
|
|
||
| def acquire_token_for_client_with_fmi_path(self, scopes, fmi_path, claims_challenge=None, **kwargs): | ||
| """Acquires token for the current confidential client with a Federated Managed Identity (FMI) path. | ||
|
|
||
| This is a convenience wrapper around :func:`~acquire_token_for_client` | ||
| that attaches the ``fmi_path`` parameter to the token request body. | ||
|
|
||
| :param list[str] scopes: (Required) | ||
| Scopes requested to access a protected API (a resource). | ||
| :param str fmi_path: (Required) | ||
| The Federated Managed Identity path to attach to the request. | ||
| :param claims_challenge: | ||
| The claims_challenge parameter requests specific claims requested by the resource provider | ||
| in the form of a claims_challenge directive in the www-authenticate header to be | ||
| returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. | ||
| It is a string of a JSON object which contains lists of claims being requested from these locations. | ||
|
|
||
| :return: A dict representing the json response from Microsoft Entra: | ||
|
|
||
| - A successful response would contain "access_token" key, | ||
| - an error response would contain "error" and usually "error_description". | ||
| """ | ||
| data = kwargs.pop("data", {}) | ||
| data["fmi_path"] = fmi_path | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are the tokens cached like in MSAL .NET and MSAL GO? I don't think they are. I see a PR here that look at proper caching #759 but I am not sure it is compliant with the rest. |
||
| return self.acquire_token_for_client( | ||
| scopes, claims_challenge=claims_challenge, data=data, **kwargs) | ||
|
|
||
| def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs): | ||
| """Acquires token using on-behalf-of (OBO) flow. | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| """End-to-end tests for Federated Managed Identity (FMI) functionality. | ||
|
|
||
| These tests verify: | ||
| 1. Tokens can be acquired using certificate authentication with FMI path | ||
| 2. Tokens are properly cached and returned from cache on subsequent calls | ||
| 3. Tokens can be acquired using an assertion callback (RMA pattern) with FMI path | ||
|
|
||
| """ | ||
|
|
||
| import logging | ||
| import os | ||
| import sys | ||
| import unittest | ||
|
|
||
| import msal | ||
| from tests.http_client import MinimalHttpClient | ||
| from tests.lab_config import get_client_certificate | ||
| from tests.test_e2e import LabBasedTestCase | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| logging.basicConfig(level=logging.DEBUG if "-v" in sys.argv else logging.INFO) | ||
|
|
||
| # Test configuration | ||
| _FMI_TENANT_ID = "f645ad92-e38d-4d1a-b510-d1b09a74a8ca" | ||
| _FMI_CLIENT_ID = "4df2cbbb-8612-49c1-87c8-f334d6d065ad" | ||
| _FMI_SCOPE = "3091264c-7afb-45d4-b527-39737ee86187/.default" | ||
| _FMI_PATH = "SomeFmiPath/FmiCredentialPath" | ||
| _FMI_CLIENT_ID_URN = "urn:microsoft:identity:fmi" | ||
| _FMI_SCOPE_FOR_RMA = "api://AzureFMITokenExchange/.default" | ||
| _AUTHORITY_URL = "https://login.microsoftonline.com/" + _FMI_TENANT_ID | ||
|
|
||
|
|
||
| def _get_fmi_credential_from_rma(): | ||
| """Acquire an FMI token from RMA service using certificate credentials. | ||
|
|
||
| This mirrors the Go function GetFmiCredentialFromRma: | ||
| 1. Create a confidential client with certificate credential | ||
| 2. Acquire a token for the FMI scope with the FMI path | ||
| 3. Return the access token as an assertion string | ||
| """ | ||
|
|
||
| app = msal.ConfidentialClientApplication( | ||
| _FMI_CLIENT_ID, | ||
| client_credential=get_client_certificate(), | ||
| authority=_AUTHORITY_URL, | ||
| http_client=MinimalHttpClient(), | ||
| ) | ||
| result = app.acquire_token_for_client_with_fmi_path( | ||
| [_FMI_SCOPE_FOR_RMA], _FMI_PATH) | ||
| if "access_token" not in result: | ||
| raise RuntimeError( | ||
| "Failed to acquire FMI token from RMA: {}: {}".format( | ||
| result.get("error"), result.get("error_description"))) | ||
| return result["access_token"] | ||
|
|
||
|
|
||
| class TestFMIBasicFunctionality(LabBasedTestCase): | ||
| """Test basic FMI token acquisition with certificate credential. | ||
|
|
||
| Mirrors TestFMIBasicFunctionality from Go: | ||
| 1. Acquire token by credential with FMI path | ||
| 2. Verify silent (cached) token acquisition works | ||
| 3. Validate tokens match (proving cache was used) | ||
| """ | ||
|
|
||
| def test_acquire_and_cache_with_fmi_path(self): | ||
| app = msal.ConfidentialClientApplication( | ||
| _FMI_CLIENT_ID, | ||
| client_credential=get_client_certificate(), | ||
| authority=_AUTHORITY_URL, | ||
| http_client=MinimalHttpClient(), | ||
| ) | ||
| scopes = [_FMI_SCOPE] | ||
|
|
||
| # 1. Acquire token by credential with FMI path | ||
| result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH) | ||
| self.assertIn("access_token", result, | ||
| "acquire_token_for_client_with_fmi_path() failed: {}: {}".format( | ||
| result.get("error"), result.get("error_description"))) | ||
| self.assertNotEqual("", result["access_token"], | ||
| "acquire_token_for_client_with_fmi_path() returned empty access token") | ||
|
|
||
| first_token = result["access_token"] | ||
|
|
||
| # 2. Verify silent token acquisition works (should retrieve from cache) | ||
| cache_result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH) | ||
| self.assertIn("access_token", cache_result, | ||
| "Second call failed: {}: {}".format( | ||
| cache_result.get("error"), cache_result.get("error_description"))) | ||
| self.assertNotEqual("", cache_result["access_token"], | ||
| "Second call returned empty access token") | ||
| self.assertEqual( | ||
| cache_result.get("token_source"), "cache", | ||
| "Second call should return token from cache") | ||
|
|
||
| # 3. Validate tokens match (proving cache was used) | ||
| self.assertEqual(first_token, cache_result["access_token"], | ||
| "Token comparison failed - tokens don't match, " | ||
| "cache might not be working correctly") | ||
|
|
||
| class TestFMIIntegration(LabBasedTestCase): | ||
| """Test FMI with assertion callback (RMA pattern). | ||
|
|
||
| Mirrors TestFMIIntegration from Go: | ||
| 1. Get credentials from RMA via assertion callback | ||
| 2. Acquire token by credential with FMI path | ||
| 3. Verify cached token acquisition works | ||
| 4. Compare tokens to verify cache was used | ||
| """ | ||
|
|
||
| def test_acquire_with_assertion_callback_and_fmi_path(self): | ||
| # Create credential from assertion callback (mirrors Go's NewCredFromAssertionCallback) | ||
| client_credential = { | ||
| "client_assertion": lambda: _get_fmi_credential_from_rma(), | ||
| } | ||
|
|
||
| app = msal.ConfidentialClientApplication( | ||
| _FMI_CLIENT_ID_URN, | ||
| client_credential=client_credential, | ||
| authority=_AUTHORITY_URL, | ||
| http_client=MinimalHttpClient(), | ||
| ) | ||
| scopes = [_FMI_SCOPE] | ||
| fmi_path = "SomeFmiPath/Path" | ||
|
|
||
| # 1. Acquire token by credential with FMI path | ||
| result = app.acquire_token_for_client_with_fmi_path(scopes, fmi_path) | ||
| self.assertIn("access_token", result, | ||
| "acquire_token_for_client_with_fmi_path() failed: {}: {}".format( | ||
| result.get("error"), result.get("error_description"))) | ||
| self.assertNotEqual("", result["access_token"], | ||
| "acquire_token_for_client_with_fmi_path() returned empty access token") | ||
| first_token = result["access_token"] | ||
|
|
||
| # 2. Verify cached token acquisition works | ||
| cache_result = app.acquire_token_for_client_with_fmi_path(scopes, fmi_path) | ||
| self.assertIn("access_token", cache_result, | ||
| "Second call failed: {}: {}".format( | ||
| cache_result.get("error"), cache_result.get("error_description"))) | ||
| self.assertNotEqual("", cache_result["access_token"], | ||
| "Second call returned empty access token") | ||
| self.assertEqual( | ||
| cache_result.get("token_source"), "cache", | ||
| "Second call should return token from cache") | ||
|
|
||
| # 3. Compare tokens to verify cache was used | ||
| self.assertEqual(first_token, cache_result["access_token"], | ||
| "Token comparison failed - tokens don't match, " | ||
| "cache might not be working correctly") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have FMI path as one of the kwargs instead? Creating an entire new API seems pretty heavy.