Skip to content
91 changes: 89 additions & 2 deletions msal/authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,29 @@
'login-us.microsoftonline.com',
AZURE_US_GOVERNMENT,
])

# Trusted issuer hosts for OIDC issuer validation
# Includes all well-known Microsoft identity provider hosts and national clouds
TRUSTED_ISSUER_HOSTS = frozenset([
# Global/Public cloud
"login.microsoftonline.com",
"login.microsoft.com",
"login.windows.net",
"sts.windows.net",
# China cloud
"login.chinacloudapi.cn",
"login.partner.microsoftonline.cn",
# Germany cloud (legacy)
"login.microsoftonline.de",
# US Government clouds
"login.microsoftonline.us",
"login.usgovcloudapi.net",
"login-us.microsoftonline.com",
"https://login.sovcloud-identity.fr", # AzureBleu
"https://login.sovcloud-identity.de", # AzureDelos
"https://login.sovcloud-identity.sg", # AzureGovSG
])

WELL_KNOWN_B2C_HOSTS = [
"b2clogin.com",
"b2clogin.cn",
Expand Down Expand Up @@ -67,6 +90,7 @@ def __init__(
performed.
"""
self._http_client = http_client
self._oidc_authority_url = oidc_authority_url
if oidc_authority_url:
logger.debug("Initializing with OIDC authority: %s", oidc_authority_url)
tenant_discovery_endpoint = self._initialize_oidc_authority(
Expand All @@ -93,14 +117,24 @@ def __init__(
.format(authority_url)
) + " Also please double check your tenant name or GUID is correct."
raise ValueError(error_message)
openid_config.pop("issuer", None) # Not used in MSAL.py, so remove it therefore no need to validate it
logger.debug(
'openid_config("%s") = %s', tenant_discovery_endpoint, openid_config)
self._issuer = openid_config.get('issuer')
self.authorization_endpoint = openid_config['authorization_endpoint']
self.token_endpoint = openid_config['token_endpoint']
self.device_authorization_endpoint = openid_config.get('device_authorization_endpoint')
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID

# Validate the issuer if using OIDC authority
if self._oidc_authority_url and not self.has_valid_issuer():
raise ValueError((
"The issuer '{iss}' does not match the authority '{auth}' or a known pattern. "
"When using the 'oidc_authority' parameter in ClientApplication, the authority "
"will be validated against the issuer from {auth}/.well-known/openid-configuration ."
"If using a known Entra authority (e.g. login.microsoftonline.com) the "
"'authority' parameter should be used instead of 'oidc_authority'. "
""
).format(iss=self._issuer, auth=oidc_authority_url))
def _initialize_oidc_authority(self, oidc_authority_url):
authority, self.instance, tenant = canonicalize(oidc_authority_url)
self.is_adfs = tenant.lower() == 'adfs' # As a convention
Expand Down Expand Up @@ -175,6 +209,60 @@ def user_realm_discovery(self, username, correlation_id=None, response=None):
self.__class__._domains_without_user_realm_discovery.add(self.instance)
return {} # This can guide the caller to fall back normal ROPC flow

def has_valid_issuer(self):
"""
Returns True if the issuer from OIDC discovery is valid for this authority.

An issuer is valid if one of the following is true:
- It exactly matches the authority URL (with/without trailing slash)
- It has the same scheme and host as the authority (path can be different)
- The issuer host is a well-known Microsoft authority host
- The issuer host is a regional variant of a well-known host (e.g., westus2.login.microsoft.com)
- For CIAM, hosts that end with well-known B2C hosts (e.g., tenant.b2clogin.com) are accepted as valid issuers
"""
if not self._issuer or not self._oidc_authority_url:
return False

# Case 1: Exact match (most common case, normalized for trailing slashes)
if self._issuer.rstrip("/") == self._oidc_authority_url.rstrip("/"):
return True

issuer_parsed = urlparse(self._issuer)
authority_parsed = urlparse(self._oidc_authority_url)
issuer_host = issuer_parsed.hostname.lower() if issuer_parsed.hostname else None

if not issuer_host:
return False

# Case 2: Issuer is from a trusted Microsoft host - O(1) lookup
if issuer_host in TRUSTED_ISSUER_HOSTS:
return True

# Case 3: Regional variant check - O(1) lookup
# e.g., westus2.login.microsoft.com -> extract "login.microsoft.com"
dot_index = issuer_host.find(".")
if dot_index > 0:
potential_base = issuer_host[dot_index + 1:]
if "." not in issuer_host[:dot_index]:
# 3a: Base host is a trusted Microsoft host
if potential_base in TRUSTED_ISSUER_HOSTS:
return True
# 3b: Issuer has a region prefix on the authority host
# e.g. issuer=us.someweb.com, authority=someweb.com
authority_host = authority_parsed.hostname.lower() if authority_parsed.hostname else ""
if potential_base == authority_host:
return True

# Case 4: Same scheme and host (path can differ)
if (authority_parsed.scheme == issuer_parsed.scheme and
authority_parsed.netloc == issuer_parsed.netloc):
return True

# Case 5: Check if issuer host ends with any well-known B2C host (e.g., tenant.b2clogin.com)
if any(issuer_host.endswith(h) for h in WELL_KNOWN_B2C_HOSTS):
return True

return False

def canonicalize(authority_or_auth_endpoint):
# Returns (url_parsed_result, hostname_in_lowercase, tenant)
Expand Down Expand Up @@ -223,4 +311,3 @@ def tenant_discovery(tenant_discovery_endpoint, http_client, **kwargs):
resp.raise_for_status()
raise RuntimeError( # A fallback here, in case resp.raise_for_status() is no-op
"Unable to complete OIDC Discovery: %d, %s" % (resp.status_code, resp.text))

1 change: 1 addition & 0 deletions tests/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self, number_of_tenants=1, tokens_per_tenant=1, cache_hit=False):
with patch.object(msal.authority, "tenant_discovery", return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
"issuer": "https://contoso.com/placeholder",
}) as _: # Otherwise it would fail on OIDC discovery
self.apps = [ # In MSAL Python, each CCA binds to one tenant only
msal.ConfidentialClientApplication(
Expand Down
1 change: 1 addition & 0 deletions tests/test_account_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def _mock_post(url, headers=None, *args, **kwargs):
@patch.object(msal.authority, "tenant_discovery", return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
"issuer": "https://contoso.com/placeholder",
}) # Otherwise it would fail on OIDC discovery
class TestAccountSourceBehavior(unittest.TestCase):

Expand Down
5 changes: 5 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
_OIDC_DISCOVERY_MOCK = Mock(return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
"issuer": "https://contoso.com/tenant",
})


Expand Down Expand Up @@ -690,6 +691,7 @@ def mock_post(url, headers=None, *args, **kwargs):
@patch(_OIDC_DISCOVERY, new=Mock(return_value={
"authorization_endpoint": "https://contoso.com/common",
"token_endpoint": "https://contoso.com/common",
"issuer": "https://contoso.com/common",
}))
def test_common_authority_should_emit_warning(self):
self._test_certain_authority_should_emit_warning(
Expand All @@ -698,6 +700,7 @@ def test_common_authority_should_emit_warning(self):
@patch(_OIDC_DISCOVERY, new=Mock(return_value={
"authorization_endpoint": "https://contoso.com/organizations",
"token_endpoint": "https://contoso.com/organizations",
"issuer": "https://contoso.com/organizations",
}))
def test_organizations_authority_should_emit_warning(self):
self._test_certain_authority_should_emit_warning(
Expand Down Expand Up @@ -755,6 +758,7 @@ def test_client_id_should_be_a_valid_scope(self):
@patch("msal.authority.tenant_discovery", new=Mock(return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
"issuer": "https://contoso.com/placeholder",
}))
class TestMsalBehaviorWithoutPyMsalRuntimeOrBroker(unittest.TestCase):

Expand Down Expand Up @@ -796,6 +800,7 @@ def test_should_fallback_when_pymsalruntime_failed_to_initialize_broker(self):
@patch("msal.authority.tenant_discovery", new=Mock(return_value={
"authorization_endpoint": "https://contoso.com/placeholder",
"token_endpoint": "https://contoso.com/placeholder",
"issuer": "https://contoso.com/placeholder",
}))
@patch("msal.application._init_broker", new=Mock()) # Pretend pymsalruntime installed and working
class TestBrokerFallbackWithDifferentAuthorities(unittest.TestCase):
Expand Down
Loading
Loading