11import json
2- import time
3- from datetime import datetime , timedelta , timezone
4- from time import sleep
52from typing import TYPE_CHECKING , Any , Dict , List , Optional , Tuple , Union
63from urllib .parse import parse_qs , urlparse
74
85import requests
96from loguru import logger
10- from requests .auth import AuthBase
7+ from requests .adapters import HTTPAdapter
118from typing_extensions import TypedDict
9+ from urllib3 .util .retry import Retry
1210
1311from fides .api .common_exceptions import ConnectionException
1412from fides .api .service .connectors .limiter .rate_limiter import (
1816)
1917
2018if TYPE_CHECKING :
21- from requests_oauth2client import BearerToken , DPoPKey , OAuth2Client
19+ from requests_oauth2client import DPoPKey , OAuth2Client
2220
2321
2422class _JwkBase (TypedDict , total = False ):
@@ -58,15 +56,6 @@ class OktaApplication(TypedDict, total=False):
5856DEFAULT_MAX_PAGES = 100
5957DEFAULT_REQUEST_TIMEOUT = 30
6058
61- # Token caching: refresh 10 minutes before expiry (matching OAuth2AuthenticationStrategyBase)
62- TOKEN_EXPIRY_BUFFER_MINUTES = 10
63-
64- # Retry configuration (matching AuthenticatedClient defaults)
65- DEFAULT_RETRY_COUNT = 3
66- DEFAULT_BACKOFF_FACTOR = 1.0
67- RETRY_STATUS_CODES = [429 , 502 , 503 , 504 ]
68- MAX_RETRY_AFTER_SECONDS = 300
69-
7059# Rate limiting: Okta's default is 600 requests/minute for most endpoints
7160# https://developer.okta.com/docs/reference/rl-global-mgmt/
7261DEFAULT_RATE_LIMIT_PER_MINUTE = 500 # Conservative default below Okta's limit
@@ -78,50 +67,16 @@ class OktaApplication(TypedDict, total=False):
7867}
7968
8069
81- def _get_retry_after (response : requests .Response ) -> Optional [float ]:
82- """
83- Parse Retry-After header from response.
84-
85- Adapted from AuthenticatedClient.get_retry_after.
86-
87- Args:
88- response: HTTP response object
89-
90- Returns:
91- Seconds to wait, or None if no Retry-After header
92- """
93- import email
94- import re
95-
96- retry_after = response .headers .get ("Retry-After" )
97- if retry_after is None :
98- return None
99-
100- # If a number value is provided, server is telling us to sleep for X seconds
101- if re .match (r"^\s*[0-9]+\s*$" , retry_after ):
102- seconds = float (int (retry_after ))
103- else :
104- # Attempt to parse a timestamp and diff with current time
105- retry_date_tuple = email .utils .parsedate_tz (retry_after )
106- if retry_date_tuple is None :
107- return None
108- retry_date = email .utils .mktime_tz (retry_date_tuple )
109- seconds = retry_date - time .time ()
110-
111- seconds = max (seconds , 0 )
112- return min (seconds , MAX_RETRY_AFTER_SECONDS )
113-
114-
11570class OktaHttpClient :
11671 """
11772 HTTP client for Okta API with OAuth2 private_key_jwt + DPoP.
11873
11974 Uses custom implementation instead of Okta SDK because the SDK lacks DPoP support.
12075
12176 Features:
122- - Token caching with automatic refresh before expiry (10-minute buffer)
77+ - Automatic token management via OAuth2ClientCredentialsAuth (10-minute refresh buffer)
12378 - Rate limiting via Redis (gracefully degrades if Redis unavailable)
124- - Retry with exponential backoff for transient failures (429, 5xx)
79+ - Retry with exponential backoff for transient failures (429, 5xx) via urllib3
12580 """
12681
12782 def __init__ (
@@ -132,30 +87,26 @@ def __init__(
13287 scopes : Optional [List [str ]] = None ,
13388 * ,
13489 rate_limit_per_minute : Optional [int ] = DEFAULT_RATE_LIMIT_PER_MINUTE ,
135- oauth_client : "Optional[OAuth2Client]" = None , # For test injection
136- dpop_key : "Optional[DPoPKey]" = None , # For test injection
90+ session : Optional [requests .Session ] = None , # For test injection
13791 ):
13892 self .org_url = org_url .rstrip ("/" )
13993 self .scopes = tuple (scopes ) if scopes is not None else DEFAULT_OKTA_SCOPES
14094
141- # Token caching state
142- self ._cached_token : "Optional[BearerToken]" = None
143- self ._token_expires_at : Optional [float ] = None
144-
14595 # Rate limiting configuration
14696 self ._rate_limit_per_minute = rate_limit_per_minute
14797
148- if oauth_client is not None or dpop_key is not None :
149- if oauth_client is None or dpop_key is None :
150- raise ValueError (
151- "Both oauth_client and dpop_key must be provided when injecting test dependencies."
152- )
153- self ._oauth_client = oauth_client
154- self ._dpop_key = dpop_key
98+ # Allow test injection of a pre-configured session
99+ if session is not None :
100+ self ._session = session
155101 return
156102
157103 try :
158- from requests_oauth2client import DPoPKey , OAuth2Client , PrivateKeyJwt
104+ from requests_oauth2client import (
105+ DPoPKey ,
106+ OAuth2Client ,
107+ OAuth2ClientCredentialsAuth ,
108+ PrivateKeyJwt ,
109+ )
159110
160111 private_jwk = self ._parse_jwk (private_key )
161112 alg = self ._determine_alg_from_jwk (private_jwk )
@@ -165,12 +116,35 @@ def __init__(
165116 # 1. It's explicitly recommended by RFC 9449 for DPoP proofs
166117 # 2. It provides strong security with compact signatures
167118 # 3. Okta supports ES256 for DPoP across all configurations
168- self . _dpop_key = DPoPKey .generate (alg = "ES256" )
119+ dpop_key = DPoPKey .generate (alg = "ES256" )
169120
170- self . _oauth_client = OAuth2Client (
121+ oauth_client = OAuth2Client (
171122 token_endpoint = f"{ self .org_url } /oauth2/v1/token" ,
172123 auth = PrivateKeyJwt (client_id , private_jwk , alg = alg ),
124+ dpop_bound_access_tokens = True ,
173125 )
126+
127+ # Create session with automatic token management
128+ self ._session = requests .Session ()
129+ self ._session .auth = OAuth2ClientCredentialsAuth (
130+ client = oauth_client ,
131+ scope = " " .join (self .scopes ),
132+ dpop_key = dpop_key ,
133+ leeway = 600 , # 10 min buffer before expiry (matching TOKEN_EXPIRY_BUFFER_MINUTES)
134+ )
135+
136+ # Configure retry strategy via urllib3
137+ retry_strategy = Retry (
138+ total = 3 , # 3 retries
139+ backoff_factor = 1.0 , # Exponential backoff
140+ status_forcelist = [429 , 502 , 503 , 504 ], # Retryable status codes
141+ respect_retry_after_header = True , # Honor Retry-After header
142+ raise_on_status = False , # Let us handle errors
143+ )
144+ adapter = HTTPAdapter (max_retries = retry_strategy )
145+ self ._session .mount ("https://" , adapter )
146+ self ._session .mount ("http://" , adapter )
147+
174148 except ImportError as e :
175149 raise ConnectionException (
176150 "The 'requests-oauth2client' library is required for Okta connector. "
@@ -223,70 +197,6 @@ def _determine_alg_from_jwk(jwk: PrivateJwk) -> str:
223197
224198 return "RS256"
225199
226- def _is_token_close_to_expiry (self ) -> bool :
227- """
228- Check if cached token will expire within the buffer period.
229-
230- Matches OAuth2AuthenticationStrategyBase._close_to_expiration behavior.
231- """
232- if self ._token_expires_at is None :
233- return True
234- buffer = timedelta (minutes = TOKEN_EXPIRY_BUFFER_MINUTES )
235- return (
236- self ._token_expires_at < (datetime .now (timezone .utc ) + buffer ).timestamp ()
237- )
238-
239- def _get_token (self ) -> AuthBase :
240- """
241- Get a valid OAuth2 token, using cache if available and not expired.
242-
243- Tokens are cached in memory and reused until 10 minutes before expiry,
244- matching the behavior of OAuth2AuthenticationStrategyBase.
245- """
246- # Return cached token if still valid
247- if self ._cached_token is not None and not self ._is_token_close_to_expiry ():
248- logger .debug ("Using cached OAuth2 token for Okta" )
249- return self ._cached_token
250-
251- try :
252- from requests_oauth2client .exceptions import (
253- InvalidClient ,
254- OAuth2Error ,
255- UnauthorizedClient ,
256- )
257-
258- logger .info ("Acquiring new OAuth2 token for Okta" )
259- token = self ._oauth_client .client_credentials (
260- scope = " " .join (self .scopes ), dpop_key = self ._dpop_key
261- )
262-
263- # Cache the token with expiration
264- self ._cached_token = token
265- # BearerToken has expires_at property (datetime) or expires_in
266- if hasattr (token , "expires_at" ) and token .expires_at :
267- self ._token_expires_at = token .expires_at .timestamp ()
268- elif hasattr (token , "expires_in" ) and token .expires_in :
269- self ._token_expires_at = (
270- datetime .now (timezone .utc ).timestamp () + token .expires_in
271- )
272- else :
273- # Default to 1 hour if no expiration info (Okta default)
274- self ._token_expires_at = datetime .now (timezone .utc ).timestamp () + 3600
275-
276- return token
277- except ImportError as e :
278- raise ConnectionException (
279- "The 'requests-oauth2client' library is required for Okta connector. "
280- "Please install it with: pip install requests-oauth2client"
281- ) from e
282- except (InvalidClient , UnauthorizedClient ) as e :
283- # Specific OAuth2 authentication errors - preserve type for caller to detect
284- raise ConnectionException (f"OAuth2 authentication failed: { str (e )} " ) from e
285- except OAuth2Error as e :
286- raise ConnectionException (
287- f"OAuth2 token acquisition failed: { str (e )} "
288- ) from e
289-
290200 def _build_rate_limit_requests (self ) -> List [RateLimiterRequest ]:
291201 """
292202 Build rate limit request objects for Okta API calls.
@@ -310,94 +220,6 @@ def _apply_rate_limit(self) -> None:
310220 if rate_limit_requests :
311221 RateLimiter ().limit (rate_limit_requests )
312222
313- def clear_token_cache (self ) -> None :
314- """
315- Clear the cached OAuth2 token.
316-
317- Useful for testing or forcing a token refresh.
318- """
319- self ._cached_token = None
320- self ._token_expires_at = None
321-
322- def _send_request_with_retry (
323- self ,
324- method : str ,
325- url : str ,
326- params : Optional [Dict [str , Any ]] = None ,
327- retry_count : int = DEFAULT_RETRY_COUNT ,
328- backoff_factor : float = DEFAULT_BACKOFF_FACTOR ,
329- ) -> requests .Response :
330- """
331- Send an HTTP request with retry logic for transient failures.
332-
333- Adapted from AuthenticatedClient.retry_send decorator.
334-
335- Args:
336- method: HTTP method (GET, POST, etc.)
337- url: Full URL to request
338- params: Query parameters
339- retry_count: Number of retries for transient failures
340- backoff_factor: Exponential backoff factor
341-
342- Returns:
343- Response object
344-
345- Raises:
346- ConnectionException: If request fails after all retries
347- """
348- last_exception : Optional [Exception ] = None
349-
350- for attempt in range (retry_count + 1 ):
351- sleep_time = backoff_factor * (2 ** (attempt + 1 ))
352-
353- try :
354- # Apply rate limiting before each attempt
355- self ._apply_rate_limit ()
356-
357- token = self ._get_token ()
358- response = requests .request (
359- method = method ,
360- url = url ,
361- params = params ,
362- auth = token ,
363- timeout = DEFAULT_REQUEST_TIMEOUT ,
364- )
365-
366- # Success
367- if response .ok :
368- return response
369-
370- # Check if we should retry this status code
371- if response .status_code not in RETRY_STATUS_CODES :
372- response .raise_for_status ()
373-
374- # Retryable error - check for Retry-After header
375- retry_after = _get_retry_after (response )
376- if retry_after is not None :
377- sleep_time = retry_after
378-
379- last_exception = ConnectionException (
380- f"Okta API request failed with status { response .status_code } "
381- )
382-
383- except requests .RequestException as e :
384- last_exception = ConnectionException (
385- f"Okta API request failed: { str (e )} "
386- )
387- # Connection errors are not typically retryable
388- break
389-
390- if attempt < retry_count :
391- logger .warning (
392- "Retrying Okta API request in {} seconds (attempt {}/{})" ,
393- sleep_time ,
394- attempt + 1 ,
395- retry_count ,
396- )
397- sleep (sleep_time )
398-
399- raise last_exception # type: ignore[misc]
400-
401223 def list_applications (
402224 self , limit : int = DEFAULT_API_LIMIT , after : Optional [str ] = None
403225 ) -> Tuple [List [OktaApplication ], Optional [str ]]:
@@ -406,25 +228,34 @@ def list_applications(
406228
407229 Includes:
408230 - Rate limiting (via Redis if available)
409- - Retry with exponential backoff for transient failures
410- - Token caching to minimize OAuth2 token requests
231+ - Retry with exponential backoff for transient failures (via urllib3)
232+ - Automatic token management (via OAuth2ClientCredentialsAuth)
411233
412234 Args:
413235 limit: Maximum number of applications to return
414236 after: Cursor for next page (from previous response)
415237
416238 Returns:
417239 Tuple of (apps_list, next_cursor). next_cursor is None if no more pages.
240+
241+ Raises:
242+ ConnectionException: If request fails after all retries
418243 """
244+ self ._apply_rate_limit ()
245+
419246 params : Dict [str , Union [int , str ]] = {"limit" : limit }
420247 if after :
421248 params ["after" ] = after
422249
423- response = self ._send_request_with_retry (
424- method = "GET" ,
425- url = f"{ self .org_url } /api/v1/apps" ,
426- params = params ,
427- )
250+ try :
251+ response = self ._session .get (
252+ f"{ self .org_url } /api/v1/apps" ,
253+ params = params ,
254+ timeout = DEFAULT_REQUEST_TIMEOUT ,
255+ )
256+ response .raise_for_status ()
257+ except requests .RequestException as e :
258+ raise ConnectionException (f"Okta API request failed: { str (e )} " ) from e
428259
429260 next_cursor = self ._extract_next_cursor (response .headers .get ("Link" ))
430261 return response .json (), next_cursor
0 commit comments