88from dataclasses import dataclass , field
99from typing import Any , Dict , List , Optional , Tuple , Union
1010
11+ import urllib3 .util
12+
1113import openeo
14+ from openeo .utils .http import HTTP_429_TOO_MANY_REQUESTS , retry_configuration
1215
1316_log = logging .getLogger (__name__ )
1417
@@ -90,8 +93,8 @@ class ConnectedTask(Task):
9093 root_url : str
9194 bearer_token : Optional [str ] = field (default = None , repr = False )
9295
93- def get_connection (self ) -> openeo .Connection :
94- connection = openeo .connect (self .root_url )
96+ def get_connection (self , retry : Union [ urllib3 . util . Retry , dict , bool , None ] = None ) -> openeo .Connection :
97+ connection = openeo .connect (self .root_url , retry = retry )
9598 if self .bearer_token :
9699 connection .authenticate_bearer_token (self .bearer_token )
97100 return connection
@@ -112,7 +115,12 @@ def execute(self) -> _TaskResult:
112115 """
113116 # TODO: move main try-except block to base class?
114117 try :
115- job = self .get_connection ().job (self .job_id )
118+ # Make sure to retry job start attempt (POST request) with "429 Too Many Requests" response
119+ retry = retry_configuration (
120+ allowed_methods = urllib3 .util .Retry .DEFAULT_ALLOWED_METHODS .union ({"POST" }),
121+ status_forcelist = [HTTP_429_TOO_MANY_REQUESTS ],
122+ )
123+ job = self .get_connection (retry = retry ).job (self .job_id )
116124 # TODO: only start when status is "queued"?
117125 job .start ()
118126 _log .info (f"Job { self .job_id !r} started successfully" )
0 commit comments