Skip to content

Commit 67dcc86

Browse files
committed
Allow flow mutators to add top level options
1 parent a9edc4e commit 67dcc86

File tree

7 files changed

+107
-3
lines changed

7 files changed

+107
-3
lines changed

metaflow/cli.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,16 @@ def start(
471471
raise ctx.obj.delayed_config_exception
472472

473473
# Init all values in the flow mutators and then process them
474-
for decorator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
475-
decorator.external_init()
474+
for mutator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
475+
mutator.external_init()
476+
477+
# Initialize mutators with top-level options
478+
for mutator in ctx.obj.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
479+
mutator_options = {
480+
option: deco_options.get(option.replace("-", "_"), option_info["default"])
481+
for option, option_info in mutator.options.items()
482+
}
483+
mutator.flow_init_options(mutator_options)
476484

477485
new_cls = ctx.obj.flow._process_config_decorators(config_options)
478486
if new_cls:

metaflow/decorators.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ def add_decorator_options(cmd):
270270

271271
seen = {}
272272
existing_params = set(p.name.lower() for p in cmd.params)
273+
273274
# Add decorator options
274275
for deco in flow_decorators(flow_cls):
275276
for option, kwargs in deco.options.items():
@@ -290,13 +291,43 @@ def add_decorator_options(cmd):
290291
kwargs["envvar"] = "METAFLOW_FLOW_%s" % option.upper()
291292
seen[option] = deco.name
292293
cmd.params.insert(0, click.Option(("--" + option,), **kwargs))
294+
295+
# Add flow mutator options
296+
for mutator in flow_mutators(flow_cls):
297+
for option, kwargs in mutator.options.items():
298+
mutator_name = mutator.__class__.__name__
299+
if option in seen:
300+
msg = (
301+
"Flow mutator '%s' uses an option '%s' which is also "
302+
"used by '%s'. This is a bug in Metaflow. "
303+
"Please file a ticket on GitHub."
304+
% (mutator_name, option, seen[option])
305+
)
306+
raise MetaflowInternalError(msg)
307+
elif mutator_name.lower() in existing_params:
308+
raise MetaflowInternalError(
309+
"Flow mutator '%s' uses an option '%s' which is a reserved "
310+
"keyword. Please use a different option name."
311+
% (mutator_name, option)
312+
)
313+
else:
314+
kwargs["envvar"] = "METAFLOW_FLOW_%s" % option.upper()
315+
seen[option] = mutator_name
316+
cmd.params.insert(0, click.Option(("--" + option,), **kwargs))
317+
293318
return cmd
294319

295320

296321
def flow_decorators(flow_cls):
297322
return [d for deco_list in flow_cls._flow_decorators.values() for d in deco_list]
298323

299324

325+
def flow_mutators(flow_cls):
326+
from metaflow.flowspec import _FlowState
327+
328+
return flow_cls._flow_state.get(_FlowState.FLOW_MUTATORS, [])
329+
330+
300331
class StepDecorator(Decorator):
301332
"""
302333
Base class for all step decorators.
@@ -797,6 +828,7 @@ def _init_step_decorators(
797828
pre_mutate=False,
798829
statically_defined=deco.statically_defined,
799830
inserted_by=inserted_by_value,
831+
mutator=deco,
800832
)
801833
# Sanity check to make sure we are applying the decorator to the right
802834
# class

metaflow/flowspec.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ def _process_config_decorators(cls, config_options, process_configs=True):
297297
pre_mutate=True,
298298
statically_defined=deco.statically_defined,
299299
inserted_by=inserted_by_value,
300+
mutator=deco,
300301
)
301302
# Sanity check to make sure we are applying the decorator to the right
302303
# class

metaflow/runtime.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2074,6 +2074,12 @@ def __init__(
20742074
for deco in flow_decorators(self.task.flow):
20752075
self.top_level_options.update(deco.get_top_level_options())
20762076

2077+
# FlowMutators can also define their own top-level options similar to decorators
2078+
from metaflow.flowspec import _FlowState
2079+
2080+
for mutator in self.task.flow._flow_state.get(_FlowState.FLOW_MUTATORS, []):
2081+
self.top_level_options.update(mutator.get_top_level_options())
2082+
20772083
# We also pass configuration options using the kv.<name> syntax which will cause
20782084
# the configuration options to be loaded from the CONFIG file (or local-config-file
20792085
# in the case of the local runtime)

metaflow/user_decorators/mutable_flow.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ def __init__(
2222
pre_mutate: bool = False,
2323
statically_defined: bool = False,
2424
inserted_by: Optional[str] = None,
25+
mutator: Optional[
26+
"metaflow.user_decorators.user_flow_decorator.FlowMutator"
27+
] = None,
2528
):
2629
self._flow_cls = flow_spec
2730
self._pre_mutate = pre_mutate
2831
self._statically_defined = statically_defined
2932
self._inserted_by = inserted_by
33+
self._mutator = mutator
3034
if self._inserted_by is None:
3135
# This is an error because MutableSteps should only be created by
3236
# StepMutators or FlowMutators. We need to catch it now because otherwise
@@ -138,6 +142,35 @@ def parameters(
138142
)
139143
yield var, param
140144

145+
@property
146+
def tl_options(self) -> Dict[str, Any]:
147+
"""
148+
Get the top-level CLI options for this mutator.
149+
150+
Returns a dictionary of option names to values that were passed via the CLI.
151+
This allows mutators to access their own top-level options similar to how
152+
they can access configs and parameters.
153+
154+
Example:
155+
```
156+
class MyMutator(FlowMutator):
157+
options = {
158+
'my-option': {'default': 'value', 'help': 'My option'}
159+
}
160+
161+
def pre_mutate(self, mutable_flow):
162+
# Access the option value
163+
val = mutable_flow.tl_options.get('my-option')
164+
print(f'Option value: {val}')
165+
```
166+
167+
Returns
168+
-------
169+
Dict[str, Any]
170+
Dictionary of option names to values
171+
"""
172+
return self._mutator._option_values if self._mutator else {}
173+
141174
@property
142175
def steps(
143176
self,

metaflow/user_decorators/mutable_step.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def __init__(
4444
pre_mutate=pre_mutate,
4545
statically_defined=statically_defined,
4646
inserted_by=inserted_by,
47+
mutator=None, # Step mutators don't have top-level options yet
4748
)
4849
self._flow_cls = flow_spec.__class__
4950
self._my_step = step

metaflow/user_decorators/user_flow_decorator.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Dict, Optional, Union, TYPE_CHECKING
1+
from typing import Dict, Optional, Union, TYPE_CHECKING, Any
22

33
from metaflow.exception import MetaflowException
44
from metaflow.user_configs.config_parameters import (
@@ -124,6 +124,11 @@ class FlowMutator(metaclass=FlowMutatorMeta):
124124
modify the steps.
125125
"""
126126

127+
# Top-level options that can be specified on the command line
128+
# Format: {'option-name': {'default': value, 'help': 'help text', ...}}
129+
# These options will be registered as CLI arguments and passed to the mutator
130+
options = {}
131+
127132
def __init__(self, *args, **kwargs):
128133
from ..flowspec import FlowSpecMeta
129134

@@ -228,6 +233,24 @@ def external_init(self):
228233
if "init" in self.__class__.__dict__:
229234
self.init(*self._args, **self._kwargs)
230235

236+
def flow_init_options(self, options: Dict[str, Any]):
237+
"""
238+
Called to initialize the mutator with top-level CLI options.
239+
240+
Parameters
241+
----------
242+
options : Dict[str, Any]
243+
Dictionary of option names to values from the CLI
244+
"""
245+
self._option_values = options
246+
247+
def get_top_level_options(self):
248+
"""
249+
Return a list of option-value pairs that correspond to top-level
250+
options that should be passed to subprocesses (tasks).
251+
"""
252+
return list(self._option_values.items())
253+
231254
def pre_mutate(
232255
self, mutable_flow: "metaflow.user_decorators.mutable_flow.MutableFlow"
233256
) -> None:

0 commit comments

Comments
 (0)