Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2491,6 +2491,32 @@ def remove_tokens_for_client(self):
self.token_cache.remove_at(at)
# acquire_token_for_client() obtains no RTs, so we have no RT to remove

def acquire_token_for_client_with_fmi_path(self, scopes, fmi_path, claims_challenge=None, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have FMI path as one of the kwargs instead? Creating an entire new API seems pretty heavy.

"""Acquires token for the current confidential client with a Federated Managed Identity (FMI) path.

This is a convenience wrapper around :func:`~acquire_token_for_client`
that attaches the ``fmi_path`` parameter to the token request body.

:param list[str] scopes: (Required)
Scopes requested to access a protected API (a resource).
:param str fmi_path: (Required)
The Federated Managed Identity path to attach to the request.
:param claims_challenge:
The claims_challenge parameter requests specific claims requested by the resource provider
in the form of a claims_challenge directive in the www-authenticate header to be
returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token.
It is a string of a JSON object which contains lists of claims being requested from these locations.

:return: A dict representing the json response from Microsoft Entra:

- A successful response would contain "access_token" key,
- an error response would contain "error" and usually "error_description".
"""
data = kwargs.pop("data", {})
data["fmi_path"] = fmi_path
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the tokens cached like in MSAL .NET and MSAL GO? I don't think they are.

I see a PR here that look at proper caching #759 but I am not sure it is compliant with the rest.

return self.acquire_token_for_client(
scopes, claims_challenge=claims_challenge, data=data, **kwargs)

def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=None, **kwargs):
"""Acquires token using on-behalf-of (OBO) flow.

Expand Down
99 changes: 99 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,105 @@ def test_organizations_authority_should_emit_warning(self):
authority="https://login.microsoftonline.com/organizations")


@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK)
class TestAcquireTokenForClientWithFmiPath(unittest.TestCase):
"""Test that acquire_token_for_client_with_fmi_path attaches fmi_path to HTTP body."""

def test_fmi_path_is_included_in_request_body(self):
app = ConfidentialClientApplication(
"client_id", client_credential="secret",
authority="https://login.microsoftonline.com/my_tenant")
fmi_path = "SomeFmiPath/FmiCredentialPath"
captured_data = {}

def mock_post(url, headers=None, data=None, *args, **kwargs):
captured_data.update(data or {})
return MinimalResponse(
status_code=200, text=json.dumps({
"access_token": "an AT",
"expires_in": 3600,
}))

result = app.acquire_token_for_client_with_fmi_path(
["scope"], fmi_path, post=mock_post)
self.assertIn("access_token", result)
self.assertIn("fmi_path", captured_data,
"fmi_path should be present in the HTTP request body")
self.assertEqual(fmi_path, captured_data["fmi_path"],
"fmi_path value should match the input")

def test_fmi_path_coexists_with_other_data(self):
app = ConfidentialClientApplication(
"client_id", client_credential="secret",
authority="https://login.microsoftonline.com/my_tenant")
fmi_path = "another/fmi/path"
captured_data = {}

def mock_post(url, headers=None, data=None, *args, **kwargs):
captured_data.update(data or {})
return MinimalResponse(
status_code=200, text=json.dumps({
"access_token": "an AT",
"expires_in": 3600,
}))

result = app.acquire_token_for_client_with_fmi_path(
["scope"], fmi_path, post=mock_post)
self.assertIn("access_token", result)
self.assertEqual(fmi_path, captured_data["fmi_path"])
self.assertEqual("client_credentials", captured_data.get("grant_type"))

def test_fmi_path_preserves_existing_data_params(self):
app = ConfidentialClientApplication(
"client_id", client_credential="secret",
authority="https://login.microsoftonline.com/my_tenant")
fmi_path = "my/fmi/path"
captured_data = {}

def mock_post(url, headers=None, data=None, *args, **kwargs):
captured_data.update(data or {})
return MinimalResponse(
status_code=200, text=json.dumps({
"access_token": "an AT",
"expires_in": 3600,
}))

result = app.acquire_token_for_client_with_fmi_path(
["scope"], fmi_path,
data={"extra_key": "extra_value"},
post=mock_post)
self.assertIn("access_token", result)
self.assertEqual(fmi_path, captured_data["fmi_path"])
self.assertEqual("extra_value", captured_data.get("extra_key"),
"Pre-existing data params should be preserved")

def test_cached_token_is_returned_on_second_call(self):
app = ConfidentialClientApplication(
"client_id", client_credential="secret",
authority="https://login.microsoftonline.com/my_tenant")
fmi_path = "SomeFmiPath/FmiCredentialPath"
call_count = [0]

def mock_post(url, headers=None, data=None, *args, **kwargs):
call_count[0] += 1
return MinimalResponse(
status_code=200, text=json.dumps({
"access_token": "an AT",
"expires_in": 3600,
}))

result1 = app.acquire_token_for_client_with_fmi_path(
["scope"], fmi_path, post=mock_post)
self.assertIn("access_token", result1)
self.assertEqual(result1[app._TOKEN_SOURCE], app._TOKEN_SOURCE_IDP)

result2 = app.acquire_token_for_client_with_fmi_path(
["scope"], fmi_path, post=mock_post)
self.assertIn("access_token", result2)
self.assertEqual(result2[app._TOKEN_SOURCE], app._TOKEN_SOURCE_CACHE,
"Second call should return token from cache")


@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK)
class TestRemoveTokensForClient(unittest.TestCase):
def test_remove_tokens_for_client_should_remove_client_tokens_only(self):
Expand Down
153 changes: 153 additions & 0 deletions tests/test_fmi_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""End-to-end tests for Federated Managed Identity (FMI) functionality.

These tests verify:
1. Tokens can be acquired using certificate authentication with FMI path
2. Tokens are properly cached and returned from cache on subsequent calls
3. Tokens can be acquired using an assertion callback (RMA pattern) with FMI path

"""

import logging
import os
import sys
import unittest

import msal
from tests.http_client import MinimalHttpClient
from tests.lab_config import get_client_certificate
from tests.test_e2e import LabBasedTestCase

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG if "-v" in sys.argv else logging.INFO)

# Test configuration
_FMI_TENANT_ID = "f645ad92-e38d-4d1a-b510-d1b09a74a8ca"
_FMI_CLIENT_ID = "4df2cbbb-8612-49c1-87c8-f334d6d065ad"
_FMI_SCOPE = "3091264c-7afb-45d4-b527-39737ee86187/.default"
_FMI_PATH = "SomeFmiPath/FmiCredentialPath"
_FMI_CLIENT_ID_URN = "urn:microsoft:identity:fmi"
_FMI_SCOPE_FOR_RMA = "api://AzureFMITokenExchange/.default"
_AUTHORITY_URL = "https://login.microsoftonline.com/" + _FMI_TENANT_ID


def _get_fmi_credential_from_rma():
"""Acquire an FMI token from RMA service using certificate credentials.

This mirrors the Go function GetFmiCredentialFromRma:
1. Create a confidential client with certificate credential
2. Acquire a token for the FMI scope with the FMI path
3. Return the access token as an assertion string
"""

app = msal.ConfidentialClientApplication(
_FMI_CLIENT_ID,
client_credential=get_client_certificate(),
authority=_AUTHORITY_URL,
http_client=MinimalHttpClient(),
)
result = app.acquire_token_for_client_with_fmi_path(
[_FMI_SCOPE_FOR_RMA], _FMI_PATH)
if "access_token" not in result:
raise RuntimeError(
"Failed to acquire FMI token from RMA: {}: {}".format(
result.get("error"), result.get("error_description")))
return result["access_token"]


class TestFMIBasicFunctionality(LabBasedTestCase):
"""Test basic FMI token acquisition with certificate credential.

Mirrors TestFMIBasicFunctionality from Go:
1. Acquire token by credential with FMI path
2. Verify silent (cached) token acquisition works
3. Validate tokens match (proving cache was used)
"""

def test_acquire_and_cache_with_fmi_path(self):
app = msal.ConfidentialClientApplication(
_FMI_CLIENT_ID,
client_credential=get_client_certificate(),
authority=_AUTHORITY_URL,
http_client=MinimalHttpClient(),
)
scopes = [_FMI_SCOPE]

# 1. Acquire token by credential with FMI path
result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH)
self.assertIn("access_token", result,
"acquire_token_for_client_with_fmi_path() failed: {}: {}".format(
result.get("error"), result.get("error_description")))
self.assertNotEqual("", result["access_token"],
"acquire_token_for_client_with_fmi_path() returned empty access token")

first_token = result["access_token"]

# 2. Verify silent token acquisition works (should retrieve from cache)
cache_result = app.acquire_token_for_client_with_fmi_path(scopes, _FMI_PATH)
self.assertIn("access_token", cache_result,
"Second call failed: {}: {}".format(
cache_result.get("error"), cache_result.get("error_description")))
self.assertNotEqual("", cache_result["access_token"],
"Second call returned empty access token")
self.assertEqual(
cache_result.get("token_source"), "cache",
"Second call should return token from cache")

# 3. Validate tokens match (proving cache was used)
self.assertEqual(first_token, cache_result["access_token"],
"Token comparison failed - tokens don't match, "
"cache might not be working correctly")

class TestFMIIntegration(LabBasedTestCase):
"""Test FMI with assertion callback (RMA pattern).

Mirrors TestFMIIntegration from Go:
1. Get credentials from RMA via assertion callback
2. Acquire token by credential with FMI path
3. Verify cached token acquisition works
4. Compare tokens to verify cache was used
"""

def test_acquire_with_assertion_callback_and_fmi_path(self):
# Create credential from assertion callback (mirrors Go's NewCredFromAssertionCallback)
client_credential = {
"client_assertion": lambda: _get_fmi_credential_from_rma(),
}

app = msal.ConfidentialClientApplication(
_FMI_CLIENT_ID_URN,
client_credential=client_credential,
authority=_AUTHORITY_URL,
http_client=MinimalHttpClient(),
)
scopes = [_FMI_SCOPE]
fmi_path = "SomeFmiPath/Path"

# 1. Acquire token by credential with FMI path
result = app.acquire_token_for_client_with_fmi_path(scopes, fmi_path)
self.assertIn("access_token", result,
"acquire_token_for_client_with_fmi_path() failed: {}: {}".format(
result.get("error"), result.get("error_description")))
self.assertNotEqual("", result["access_token"],
"acquire_token_for_client_with_fmi_path() returned empty access token")
first_token = result["access_token"]

# 2. Verify cached token acquisition works
cache_result = app.acquire_token_for_client_with_fmi_path(scopes, fmi_path)
self.assertIn("access_token", cache_result,
"Second call failed: {}: {}".format(
cache_result.get("error"), cache_result.get("error_description")))
self.assertNotEqual("", cache_result["access_token"],
"Second call returned empty access token")
self.assertEqual(
cache_result.get("token_source"), "cache",
"Second call should return token from cache")

# 3. Compare tokens to verify cache was used
self.assertEqual(first_token, cache_result["access_token"],
"Token comparison failed - tokens don't match, "
"cache might not be working correctly")


if __name__ == "__main__":
unittest.main()
Loading