diff --git a/metaflow/cli.py b/metaflow/cli.py index a4a1558304d..d022257e4c8 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -455,8 +455,8 @@ def start( ): param_ds = d - # We can now set the the CONFIGS value in the flow properly. This will overwrite - # anything that may have been passed in by default and we will use exactly what + # We can now set the CONFIGS value in the flow properly. This will overwrite + # anything that may have been passed in by default, and we will use exactly what # the original flow had. Note that these are accessed through the parameter name ctx.obj.flow._flow_state[_FlowState.CONFIGS].clear() d = ctx.obj.flow._flow_state[_FlowState.CONFIGS] @@ -471,8 +471,16 @@ def start( raise ctx.obj.delayed_config_exception # Init all values in the flow mutators and then process them - for decorator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []): - decorator.external_init() + for mutator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []): + mutator.external_init() + + # Initialize mutators with top-level options + for mutator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []): + mutator_options = { + option: deco_options.get(option.replace("-", "_"), option_info["default"]) + for option, option_info in mutator.options.items() + } + mutator.flow_init_options(mutator_options) new_cls = ctx.obj.flow._process_config_decorators(config_options) if new_cls: diff --git a/metaflow/decorators.py b/metaflow/decorators.py index 760508497f0..dfbe39dc10e 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -270,6 +270,7 @@ def add_decorator_options(cmd): seen = {} existing_params = set(p.name.lower() for p in cmd.params) + # Add decorator options for deco in flow_decorators(flow_cls): for option, kwargs in deco.options.items(): @@ -290,6 +291,30 @@ def add_decorator_options(cmd): kwargs["envvar"] = "METAFLOW_FLOW_%s" % option.upper() seen[option] = deco.name cmd.params.insert(0, click.Option(("--" + option,), **kwargs)) + + # Add flow mutator options + for mutator in flow_mutators(flow_cls): + for option, kwargs in mutator.options.items(): + mutator_name = mutator.__class__.__name__ + if option in seen: + msg = ( + "Flow mutator '%s' uses an option '%s' which is also " + "used by '%s'. This is a bug in Metaflow. " + "Please file a ticket on GitHub." + % (mutator_name, option, seen[option]) + ) + raise MetaflowInternalError(msg) + elif mutator_name.lower() in existing_params: + raise MetaflowInternalError( + "Flow mutator '%s' uses an option '%s' which is a reserved " + "keyword. Please use a different option name." + % (mutator_name, option) + ) + else: + kwargs["envvar"] = "METAFLOW_FLOW_%s" % option.upper() + seen[option] = mutator_name + cmd.params.insert(0, click.Option(("--" + option,), **kwargs)) + return cmd @@ -297,6 +322,12 @@ def flow_decorators(flow_cls): return [d for deco_list in flow_cls._flow_decorators.values() for d in deco_list] +def flow_mutators(flow_cls): + from metaflow.flowspec import _FlowState + + return flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, []) + + class StepDecorator(Decorator): """ Base class for all step decorators. @@ -797,6 +828,7 @@ def _init_step_decorators( pre_mutate=False, statically_defined=deco.statically_defined, inserted_by=inserted_by_value, + mutator=deco, ) # Sanity check to make sure we are applying the decorator to the right # class diff --git a/metaflow/flowspec.py b/metaflow/flowspec.py index a8df867e644..b5cfa880b39 100644 --- a/metaflow/flowspec.py +++ b/metaflow/flowspec.py @@ -297,6 +297,7 @@ def _process_config_decorators(cls, config_options, process_configs=True): pre_mutate=True, statically_defined=deco.statically_defined, inserted_by=inserted_by_value, + mutator=deco, ) # Sanity check to make sure we are applying the decorator to the right # class diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 7cac32eb598..90c35954963 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -1,7 +1,10 @@ import os import sys import types +import uuid +import datetime +from typing import Dict, List, Union, Tuple as TTuple from metaflow.exception import MetaflowException from metaflow.metaflow_config_funcs import from_conf, get_validate_choice_fn @@ -615,6 +618,56 @@ def get_pinned_conda_libs(python_version, datastore_type): return pins +### +# Runner API type mappings +# Extensions can add custom Click parameter types via get_click_to_python_types +### +def get_click_to_python_types(): + """ + Returns the mapping from Click parameter types to Python types for Runner API. + Extensions can override this function to add custom type mappings. + """ + from metaflow._vendor.click.types import ( + BoolParamType, + Choice, + DateTime, + File, + FloatParamType, + IntParamType, + Path, + StringParamType, + Tuple, + UUIDParameterType, + ) + from metaflow.parameters import JSONTypeClass + from metaflow.includefile import FilePathClass + from metaflow.user_configs.config_options import ( + LocalFileInput, + MultipleTuple, + ConfigValue, + ) + + # Define JSON type for type hints + JSON = Union[Dict[str, "JSON"], List["JSON"], str, int, float, bool, None] + + return { + StringParamType: str, + IntParamType: int, + FloatParamType: float, + BoolParamType: bool, + UUIDParameterType: uuid.UUID, + Path: str, + DateTime: datetime.datetime, + Tuple: tuple, + Choice: str, + File: str, + JSONTypeClass: JSON, + FilePathClass: str, + LocalFileInput: str, + MultipleTuple: TTuple[str, Union[JSON, ConfigValue]], + } + + # Check if there are extensions to Metaflow to load and override everything try: from metaflow.extension_support import get_modules @@ -650,6 +703,16 @@ def _new_get_pinned_conda_libs( if any(" " in x for x in o): raise ValueError("Decospecs cannot contain spaces") _TOGGLE_DECOSPECS.extend(o) + elif n == "get_click_to_python_types": + # Extension provides additional Click type mappings for Runner API + # Merge extension's types with base types + def _new_get_click_to_python_types(f1=globals()[n], f2=o): + d1 = f1() + d2 = f2() + d1.update(d2) + return d1 + + globals()[n] = _new_get_click_to_python_types elif not n.startswith("__") and not isinstance(o, types.ModuleType): globals()[n] = o # If DEFAULT_DECOSPECS is set, use that, else extrapolate from extensions diff --git a/metaflow/runner/click_api.py b/metaflow/runner/click_api.py index bda7ddb0157..97daf2db53a 100644 --- a/metaflow/runner/click_api.py +++ b/metaflow/runner/click_api.py @@ -57,26 +57,14 @@ ) from metaflow.user_decorators.user_flow_decorator import FlowMutator +# Import Click type mappings from config (allows extensions to add custom types) +from metaflow.metaflow_config import get_click_to_python_types + +click_to_python_types = get_click_to_python_types() + # Define a recursive type alias for JSON JSON = Union[Dict[str, "JSON"], List["JSON"], str, int, float, bool, None] -click_to_python_types = { - StringParamType: str, - IntParamType: int, - FloatParamType: float, - BoolParamType: bool, - UUIDParameterType: uuid.UUID, - Path: str, - DateTime: datetime.datetime, - Tuple: tuple, - Choice: str, - File: str, - JSONTypeClass: JSON, - FilePathClass: str, - LocalFileInput: str, - MultipleTuple: TTuple[str, Union[JSON, ConfigValue]], -} - def _method_sanity_check( possible_arg_params: TOrderedDict[str, click.Argument], @@ -532,8 +520,16 @@ def _compute_flow_parameters(self): # We ignore any errors if we don't check the configs in the click API. # Init all values in the flow mutators and then process them - for decorator in self._flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, []): - decorator.external_init() + for mutator in self._flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, []): + mutator.external_init() + + # Initialize mutators with top-level options (using defaults for Deployer/Runner) + for mutator in self._flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, []): + mutator_options = { + option: option_info["default"] + for option, option_info in mutator.options.items() + } + mutator.flow_init_options(mutator_options) new_cls = self._flow_cls._process_config_decorators( config_options, process_configs=CLICK_API_PROCESS_CONFIG diff --git a/metaflow/runtime.py b/metaflow/runtime.py index 3dfb01f529d..e6dd14e0271 100644 --- a/metaflow/runtime.py +++ b/metaflow/runtime.py @@ -2074,6 +2074,12 @@ def __init__( for deco in flow_decorators(self.task.flow): self.top_level_options.update(deco.get_top_level_options()) + # FlowMutators can also define their own top-level options similar to decorators + from metaflow.flowspec import _FlowState + + for mutator in self.task.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []): + self.top_level_options.update(mutator.get_top_level_options()) + # We also pass configuration options using the kv. syntax which will cause # the configuration options to be loaded from the CONFIG file (or local-config-file # in the case of the local runtime) diff --git a/metaflow/user_decorators/mutable_flow.py b/metaflow/user_decorators/mutable_flow.py index 2cd092fcb78..6a47478d0a3 100644 --- a/metaflow/user_decorators/mutable_flow.py +++ b/metaflow/user_decorators/mutable_flow.py @@ -22,11 +22,15 @@ def __init__( pre_mutate: bool = False, statically_defined: bool = False, inserted_by: Optional[str] = None, + mutator: Optional[ + "metaflow.user_decorators.user_flow_decorator.FlowMutator" + ] = None, ): self._flow_cls = flow_spec self._pre_mutate = pre_mutate self._statically_defined = statically_defined self._inserted_by = inserted_by + self._mutator = mutator if self._inserted_by is None: # This is an error because MutableSteps should only be created by # StepMutators or FlowMutators. We need to catch it now because otherwise @@ -138,6 +142,35 @@ def parameters( ) yield var, param + @property + def tl_options(self) -> Dict[str, Any]: + """ + Get the top-level CLI options for this mutator. + + Returns a dictionary of option names to values that were passed via the CLI. + This allows mutators to access their own top-level options similar to how + they can access configs and parameters. + + Example: + ``` + class MyMutator(FlowMutator): + options = { + 'my-option': {'default': 'value', 'help': 'My option'} + } + + def pre_mutate(self, mutable_flow): + # Access the option value + val = mutable_flow.tl_options.get('my-option') + print(f'Option value: {val}') + ``` + + Returns + ------- + Dict[str, Any] + Dictionary of option names to values + """ + return self._mutator._option_values if self._mutator else {} + @property def steps( self, @@ -189,6 +222,7 @@ def add_parameter( "method and not the `mutate` method" % (name, self._inserted_by) ) from metaflow.parameters import Parameter + from metaflow.flowspec import _FlowState if hasattr(self._flow_cls, name) and not overwrite: raise MetaflowException( @@ -203,6 +237,7 @@ def add_parameter( ) debug.userconf_exec("Mutable flow adding parameter %s to flow" % name) setattr(self._flow_cls, name, value) + self._flow_cls._flow_state.pop(_FlowState.CACHED_PARAMETERS, None) def remove_parameter(self, parameter_name: str) -> bool: """ diff --git a/metaflow/user_decorators/mutable_step.py b/metaflow/user_decorators/mutable_step.py index 7841ea92270..e9443f6262a 100644 --- a/metaflow/user_decorators/mutable_step.py +++ b/metaflow/user_decorators/mutable_step.py @@ -44,6 +44,7 @@ def __init__( pre_mutate=pre_mutate, statically_defined=statically_defined, inserted_by=inserted_by, + mutator=None, # Step mutators don't have top-level options yet ) self._flow_cls = flow_spec.__class__ self._my_step = step diff --git a/metaflow/user_decorators/user_flow_decorator.py b/metaflow/user_decorators/user_flow_decorator.py index 36f3ed53c6b..4369544ac9a 100644 --- a/metaflow/user_decorators/user_flow_decorator.py +++ b/metaflow/user_decorators/user_flow_decorator.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional, Union, TYPE_CHECKING +from typing import Dict, Optional, Union, TYPE_CHECKING, Any from metaflow.exception import MetaflowException from metaflow.user_configs.config_parameters import ( @@ -124,6 +124,11 @@ class FlowMutator(metaclass=FlowMutatorMeta): modify the steps. """ + # Top-level options that can be specified on the command line + # Format: {'option-name': {'default': value, 'help': 'help text', ...}} + # These options will be registered as CLI arguments and passed to the mutator + options = {} + def __init__(self, *args, **kwargs): from ..flowspec import FlowSpecMeta @@ -228,6 +233,24 @@ def external_init(self): if "init" in self.__class__.__dict__: self.init(*self._args, **self._kwargs) + def flow_init_options(self, options: Dict[str, Any]): + """ + Called to initialize the mutator with top-level CLI options. + + Parameters + ---------- + options : Dict[str, Any] + Dictionary of option names to values from the CLI + """ + self._option_values = options + + def get_top_level_options(self): + """ + Return a list of option-value pairs that correspond to top-level + options that should be passed to subprocesses (tasks). + """ + return list(self._option_values.items()) + def pre_mutate( self, mutable_flow: "metaflow.user_decorators.mutable_flow.MutableFlow" ) -> None: