[Data] Add ray.dataset.expect() for data quality checks #58434
base: master
Conversation
- Add helper functions for common expectation patterns:
  - expect_column_values_to_be_between()
  - expect_column_values_to_not_be_null()
  - expect_column_values_to_be_unique()
  - expect_column_values_to_be_in_set()
  - expect_table_row_count_to_be_between()
- Add ExpectationSuite class for organizing multiple expectations
- Improve error messages with context and suggestions
- Enhance documentation with more examples
- Fix linting issues (whitespace, boolean comparisons)

Signed-off-by: soffer-anyscale <[email protected]>
Hide implementation details from users - always use map_batches for validator functions since they expect batch input. This simplifies the API and removes unnecessary complexity.

Signed-off-by: soffer-anyscale <[email protected]>
Remove implementation details from public API:
- Remove name and description parameters (auto-generated for expressions)
- Remove batch_format parameter (use default internally)
- Make expectation optional to allow ds.expect(expr=...)
- Improve error messages and documentation
- Align API signature with filter() and other Dataset methods

Signed-off-by: soffer-anyscale <[email protected]>
- Accept expectations in constructor: ExpectationSuite(name, [exp1, exp2])
- Rename add_expectation() to append() for list-like API
- Rename add_expectations() to extend() for list-like API
- Add __getitem__ and __iter__ for list-like iteration
- Support single expectation or list/tuple in constructor
- Support extending from another ExpectationSuite
- Update documentation examples to show simpler usage patterns

Signed-off-by: soffer-anyscale <[email protected]>
- Make all classes @DeveloperAPI: Expectation, DataQualityExpectation, SLAExpectation, ExpectationResult, ExpectationSuite
- Make all helper functions @DeveloperAPI (not public)
- Only export expect() from ray.data.expectations
- Update Dataset.expect() to accept lists directly instead of ExpectationSuite
- Remove ExpectationSuite from public API, use simple lists instead
- Update documentation to show only expect() usage patterns

Signed-off-by: soffer-anyscale <[email protected]>
- Accept name, description, validator_fn, max_execution_time_seconds, etc. directly in Dataset.expect()
- Support ds.expect(col('age') >= 0, name='check') pattern
- Remove need to create expectation objects separately
- Still supports passing expectation objects for advanced use cases
- Update examples to show simpler usage patterns
Signed-off-by: soffer-anyscale <[email protected]>
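A sketch of the two calling styles this commit describes (`ds` is a placeholder Dataset; the API is as proposed in this PR, which is under review):

```python
from ray.data.expressions import col

# Direct keyword form, per this commit: no separate expectation object needed.
passed_ds, failed_ds, result = ds.expect(col("age") >= 0, name="check")

# Advanced form: pass a pre-built expectation object.
from ray.data.expectations import expect

exp = expect(expr=col("age") >= 0, name="check")
passed_ds, failed_ds, result = ds.expect(exp)
```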
- Rename optimization_strategy parameter to optimize_for for clarity
- optimize_for='performance' (default) is more intuitive than optimization_strategy
- Still supports optimization_strategy for backward compatibility
- Update examples to show simpler usage: ds.expect(max_execution_time_seconds=60)
- Default to 'performance' when a time constraint is set (most common use case)

Signed-off-by: soffer-anyscale <[email protected]>
- Add expect() factory function for creating expectation objects
- Add Dataset.expect() method for validating data quality and SLA requirements
- Support expression-based validation using Ray Data expressions API
- Support custom validator functions for complex validation logic
- Support SLA expectations with max_execution_time_seconds
- Return (passed_ds, failed_ds, result) tuples for quarantine workflows
- Extract helper functions for batch row counting and dataset creation
- Add comprehensive test coverage in test_expectations.py
- Follow Ray Data coding standards and architecture patterns

Signed-off-by: soffer-anyscale <[email protected]>
- Rename SLAExpectation to ExecutionTimeExpectation
- Rename ExpectationType.SLA to ExpectationType.EXECUTION_TIME
- Rename _expect_sla to _expect_execution_time
- Rename _sla_expectations to _execution_time_expectations
- Update all method names and variable names
- Update docstrings to use generic developer terminology
- Fix lint errors (lambda expressions, missing imports)
- Keep SLA terminology only in PR documentation

Signed-off-by: soffer-anyscale <[email protected]>
Pull Request Overview
This PR introduces a new ray.data.expectations module that enables data quality validation and execution time constraints (SLA requirements) for Ray Data pipelines. The implementation provides a declarative API for validating datasets and monitoring execution times.
Key changes:
- Expression-based validation using Ray Data's expression API
- Dataset splitting into passed/failed datasets for quarantine workflows
- Execution time monitoring and timeout handling
- Pythonic API with a `Dataset.expect()` method
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `python/ray/data/expectations.py` | Core expectations module with Expectation, DataQualityExpectation, and ExecutionTimeExpectation classes |
| `python/ray/data/dataset.py` | Added Dataset.expect() method and helper functions for validation |
| `python/ray/data/_internal/plan.py` | Integrated execution time expectations tracking into ExecutionPlan |
| `python/ray/data/tests/test_expectations.py` | Comprehensive test suite for expectations functionality |
| `python/ray/data/__init__.py` | Exported expect function |
| `python/ray/data/_internal/execution/resource_manager.py` | Minor refactoring of resource reservation logic |
| `python/ray/data/_internal/execution/interfaces/execution_options.py` | Added copy() method to ExecutionOptions |
python/ray/data/expectations.py (outdated)
```python
else:
    # Import for runtime type checking
    try:
        from ray.data.expressions import Expr as _ExprType  # noqa: F401
    except ImportError:
        _ExprType = None  # type: ignore
```
Copilot AI (Nov 6, 2025):
The runtime type checking import pattern is unnecessarily complex. The _ExprType imported in the else block is never used. Consider simplifying this by either removing the runtime import or using it consistently throughout the file.
Suggested change:

```diff
-else:
-    # Import for runtime type checking
-    try:
-        from ray.data.expressions import Expr as _ExprType  # noqa: F401
-    except ImportError:
-        _ExprType = None  # type: ignore
```
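If `Expr` is only needed for type annotations, the standard `typing.TYPE_CHECKING` guard avoids the runtime import entirely; a minimal sketch:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Resolved only by static type checkers; nothing is imported at runtime.
    from ray.data.expressions import Expr
```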
python/ray/data/expectations.py (outdated)
```python
if self.max_execution_time_seconds is None and self.max_execution_time is None:
    if self.target_completion_time is None:
        raise ValueError(
            "Must specify either max_execution_time_seconds, "
```
Copilot AI (Nov 6, 2025):
Corrected error message to use 'at least one time constraint' for clarity and consistency with the method's purpose
| "Must specify either max_execution_time_seconds, " | |
| "Must specify at least one time constraint: max_execution_time_seconds, " |
python/ray/data/dataset.py (outdated)
```python
total_rows_validated=total_rows,
failed_rows=failed_rows,
```
Copilot AI (Nov 6, 2025):
ExpectationResult uses field names total_count and failure_count (as defined in expectations.py line 238-239), but this code passes total_rows_validated and failed_rows. This will cause a TypeError at runtime. Use the correct field names: total_count and failure_count.
Suggested change:

```diff
-total_rows_validated=total_rows,
-failed_rows=failed_rows,
+total_count=total_rows,
+failure_count=failed_rows,
```
```python
result = ExpectationResult(
    expectation=expectation,
    passed=passed,
    message=message,
    failure_count=failure_count,
    total_count=total_count,
)
```
Copilot AI (Nov 6, 2025):
Same issue as Comment 4 - inconsistent field naming between the dataclass definition and usage. This instance correctly uses failure_count and total_count, but other usages in the same file use failed_rows and total_rows_validated. All usages should be consistent with the dataclass definition.
python/ray/data/dataset.py (outdated)
```python
result = ExpectationResult(
    expectation=expectation,
    passed=passed,
    message=message,
    execution_time_seconds=elapsed_time,
    total_count=processed_rows,
    failure_count=0 if passed else 1,  # 1 = timeout failure
```
Copilot AI (Nov 6, 2025):
The comment # 1 = timeout failure on line 2341 suggests that failure_count=1 represents a timeout failure, but this is semantically incorrect. The failure_count field should represent the number of failed rows/batches, not a boolean flag for timeout. Consider using a separate field or status code to indicate timeout failures rather than overloading failure_count.
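For instance, a dedicated field would keep the semantics clean. A hypothetical sketch (`timed_out` is not part of this PR, and the `expectation` field is omitted for brevity):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ExpectationResult:
    passed: bool
    message: str = ""
    total_count: int = 0
    failure_count: int = 0
    execution_time_seconds: Optional[float] = None
    # Hypothetical addition: a dedicated flag keeps failure_count meaning
    # "number of failed rows" instead of doubling as a timeout marker.
    timed_out: bool = False
```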
Code Review
This pull request introduces a powerful and well-designed `expect()` API for data quality and SLA enforcement in Ray Data. The implementation is comprehensive, covering expression-based validation, custom validator functions, and execution time constraints, and it integrates nicely with the existing expression system. My review focuses on improving scalability, correctness in edge cases, and maintainability. I've identified a critical bug in how validation results are created, a couple of high-severity issues related to potential memory overload and schema preservation, and some medium-severity issues for code cleanup and maintainability. The tests are extensive, though I found one that needs to be corrected to align with the API. Overall, this is an excellent and valuable addition to the Ray Data library.
```python
result = ExpectationResult(
    expectation=expectation,
    passed=passed,
    message=message,
    total_rows_validated=total_rows,
    failed_rows=failed_rows,
)
```
There's a bug here where ExpectationResult is instantiated with incorrect keyword arguments. The ExpectationResult dataclass expects total_count and failure_count, but total_rows_validated and failed_rows are used instead. This will cause a TypeError at runtime.
Suggested change:

```diff
 result = ExpectationResult(
     expectation=expectation,
     passed=passed,
     message=message,
-    total_rows_validated=total_rows,
-    failed_rows=failed_rows,
+    total_count=total_rows,
+    failure_count=failed_rows,
 )
```
python/ray/data/dataset.py (outdated)
```python
# Failed dataset = remaining unprocessed data
# For V1, we return empty dataset as we can't easily track unprocessed data
# This could be enhanced in future versions
failed_ds = ray.data.from_items([])
```
When a timeout occurs, the failed_ds is created from an empty list using ray.data.from_items([]). This creates a new dataset without a schema. If the original dataset had a schema, this could lead to errors in downstream operations that expect the schema to be preserved.
To ensure schema preservation, it's better to create an empty dataset from the original dataset's context. A simple way to do this is self.limit(0).
Suggested change:

```diff
-failed_ds = ray.data.from_items([])
+failed_ds = self.limit(0)
```
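The difference is easy to see with stable Ray Data APIs; a small sketch:

```python
import ray

ds = ray.data.range(10)              # non-empty dataset with a known schema
no_schema = ray.data.from_items([])  # empty dataset; nothing to infer a schema from
with_schema = ds.limit(0)            # also empty, but inherits ds's schema

print(with_schema.schema())  # same columns and types as ds
```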
python/ray/data/dataset.py (outdated)
```python
if failed_ds.count() > 0:
    failed_rows = failed_ds.take_all()
    all_failed_rows.extend(failed_rows)
```
The current implementation of _expect_list collects all failed rows from each expectation into a list on the driver node by calling failed_ds.take_all(). For large datasets with even a moderate failure rate, this can lead to an out-of-memory error on the driver, which undermines the benefits of distributed computation.
A more scalable approach would be to collect the failed_ds Dataset objects and then union them at the end. This would keep the data distributed in the object store.
Example of a more scalable approach:
```python
all_failed_datasets = []
# ... in the loop ...
if failed_ds.count() > 0:
    all_failed_datasets.append(failed_ds)
# ... after the loop ...
if all_failed_datasets:
    failed_ds = all_failed_datasets[0].union(*all_failed_datasets[1:])
else:
    # create empty dataset with correct schema
    failed_ds = current_ds.limit(0)
```

python/ray/data/tests/test_expectations.py

```python
def test_expectation_result_with_all_fields():
    """Test ExpectationResult with all fields."""
    result = ExpectationResult(
        expectation_name="test_expectation",
        passed=True,
        total_rows_validated=1000,
        failed_rows=0,
        execution_time_seconds=5.2,
        message="All checks passed",
    )

    assert result.passed is True
    assert result.total_rows_validated == 1000
    assert result.failed_rows == 0
    assert result.execution_time_seconds == 5.2
    assert "test_expectation" in result.message
```
This test case for ExpectationResult is incorrect and will fail. It attempts to initialize ExpectationResult with fields that do not exist (expectation_name, total_rows_validated, failed_rows) and misses the required expectation field.
The assertions also reference these non-existent fields. The test should be updated to correctly instantiate ExpectationResult and assert on its actual fields (expectation, total_count, failure_count).
Suggested change:

```diff
 def test_expectation_result_with_all_fields():
     """Test ExpectationResult with all fields."""
+    exp = expect(expr=col("value") > 0, name="test_expectation")
     result = ExpectationResult(
-        expectation_name="test_expectation",
+        expectation=exp,
         passed=True,
-        total_rows_validated=1000,
-        failed_rows=0,
+        total_count=1000,
+        failure_count=0,
         execution_time_seconds=5.2,
-        message="All checks passed",
+        message="All checks passed for test_expectation",
     )
     assert result.passed is True
-    assert result.total_rows_validated == 1000
-    assert result.failed_rows == 0
+    assert result.total_count == 1000
+    assert result.failure_count == 0
     assert result.execution_time_seconds == 5.2
     assert "test_expectation" in result.message
```
python/ray/data/dataset.py (outdated)
```python
try:
    import pandas as pd

    if isinstance(batch, pd.DataFrame):
        batch["_validation_passed"] = True
        return batch
except Exception:
    pass
try:
    import pyarrow as pa

    if isinstance(batch, pa.Table):
        import pyarrow.compute as pc

        passed_col = pc.fill_null(pc.scalar(True), True)
        return batch.append_column("_validation_passed", passed_col)
except Exception:
    pass
# Fallback: add validation flag to dict batch
if isinstance(batch, dict):
    import numpy as np

    batch["_validation_passed"] = np.array(
        [True] * len(batch.get(list(batch.keys())[0], []))
    )
return batch
```
There is significant code duplication within the validation_fn wrapper for handling different batch types (pandas, pyarrow, dict). This logic for adding the _validation_passed column is repeated three times: for empty batches, for passed batches, and for failed batches.
To improve maintainability and reduce redundancy, consider refactoring this logic into a single helper function, for example _add_validation_flag(batch, flag_value), that can be called from all three locations.
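A sketch of what that helper could look like (illustrative only; the PR's follow-up commit adds its own `_add_validation_flag`, which may differ):

```python
def _add_validation_flag(batch, flag_value: bool):
    """Attach a boolean `_validation_passed` column to a pandas, Arrow, or dict batch."""
    try:
        import pandas as pd

        if isinstance(batch, pd.DataFrame):
            batch["_validation_passed"] = flag_value
            return batch
    except ImportError:
        pass
    try:
        import pyarrow as pa

        if isinstance(batch, pa.Table):
            column = pa.array([flag_value] * batch.num_rows)
            return batch.append_column("_validation_passed", column)
    except ImportError:
        pass
    if isinstance(batch, dict):
        import numpy as np

        # All columns in a dict batch share the same length; use any of them.
        num_rows = len(next(iter(batch.values()), []))
        batch["_validation_passed"] = np.full(num_rows, flag_value)
    return batch
```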
python/ray/data/expectations.py (outdated)
```python
def _is_expr(obj: Any) -> bool:
    """Check if an object is a Ray Data expression.

    Uses isinstance if Expr is available, otherwise falls back to duck typing.
    This ensures we can detect expressions even if the import fails.
    """
    # Import here to avoid circular dependencies and handle lazy import
    try:
        from ray.data.expressions import Expr as ExprType

        return isinstance(obj, ExprType)
    except (ImportError, TypeError):
        pass

    # Duck typing fallback for Expr detection
    # Expectation is defined later in the file, but we can check by class name
    obj_type = type(obj).__name__
    if obj_type == "Expectation" or obj_type.endswith("Expectation"):
        return False
    if callable(obj):
        return False
    return hasattr(obj, "to_pyarrow") and hasattr(obj, "data_type")
```
- Fix ExpectationResult field names (total_count/failure_count)
- Fix schema preservation for failed_ds using limit(0)
- Fix memory issue in _expect_list by using union instead of take_all
- Fix test_expectation_result_with_all_fields with correct fields
- Refactor validation flag logic into _add_validation_flag helper
- Remove unused _is_expr function and _ExprType import
- Improve error message clarity for time constraints
- Fix failure_count semantics for timeout (use passed=False instead)

Signed-off-by: soffer-anyscale <[email protected]>
- Extract _add_validation_flag helper function
- Remove duplicate code for pandas/pyarrow/dict batch handling
- Reduces code duplication from 3 locations to 1 helper function

Signed-off-by: soffer-anyscale <[email protected]>
Description
This PR introduces a new `ray.data.expectations` module that enables users to define and enforce both data quality rules and execution time constraints (enterprise SLA requirements) for Ray Data pipelines. The expectations API integrates seamlessly with Ray Data's expression system to provide declarative validation and execution time monitoring.

Key Features:
- Data Quality Validation: expression-based rules (e.g., `col("value") > 0`)
- Execution Time Constraints (SLA Requirements): time limits via `max_execution_time_seconds`
- Pythonic API Design: a `Dataset.expect()` method that matches Ray Data patterns

Why `ds.expect()` vs `filter()` or the Expressions API?

While `filter()` and the expressions API are powerful for data transformations, `ds.expect()` serves a different purpose focused on data quality validation and enterprise SLA requirements.

Key Differences:
- Quarantine Workflows: `filter()` only returns passing rows, while `expect()` returns both `passed_ds` and `failed_ds`, enabling automated quarantine workflows where invalid data is preserved for investigation rather than discarded.
- Validation Results: `expect()` provides rich validation metadata (`ExpectationResult`) including pass/fail status, failure counts, execution time, and descriptive messages, all essential for monitoring, alerting, and compliance reporting.
- Enterprise SLAs: `expect()` uniquely supports execution time constraints, enabling Ray Data to monitor execution time and halt processing if SLA requirements are exceeded.
- Batch-Level Validation: Unlike `filter()`, which operates row by row, `expect()` validates entire batches and provides aggregate statistics, making it more efficient for data quality checks across large datasets.
- Declarative Validation: `expect()` provides a clean, declarative API for expressing data quality rules that integrates seamlessly with Ray Data's expression system.

Use Cases:
In essence, `filter()` transforms data, while `expect()` validates and monitors it: two complementary operations that serve different needs in production data pipelines.

Usage Examples:
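A representative sketch of the API described above (`expect`, `col`, and the `(passed_ds, failed_ds, result)` return shape are per this PR, which is under review; the sample data and output path are illustrative):

```python
import ray
from ray.data.expressions import col

ds = ray.data.from_items([{"age": 34}, {"age": -2}, {"age": 51}])

# Data quality check: failing rows are quarantined, not discarded.
passed_ds, failed_ds, result = ds.expect(col("age") >= 0, name="age_non_negative")

print(result.passed)         # False: one row violates the rule
print(result.failure_count)  # 1
print(result.total_count)    # 3

# Preserve invalid rows for later investigation.
failed_ds.write_parquet("/tmp/quarantine/age_non_negative/")

# Execution time constraint (SLA): fail if the pipeline exceeds 60 seconds.
passed_ds, failed_ds, result = ds.expect(max_execution_time_seconds=60)
```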
Related issues
This is a new feature with no related issues.
Additional information
Implementation Details
New Files:
- `python/ray/data/expectations.py` - Core expectations module (~540 lines)
- `python/ray/data/tests/test_expectations.py` - Comprehensive test suite

Modified Files:
- `python/ray/data/dataset.py` - Added the `Dataset.expect()` method and helper functions (`_count_rows_in_batch`, `_create_dataset_from_batches`)
- `python/ray/data/__init__.py` - Exported the `expect` function
- `python/ray/data/_internal/plan.py` - Integrated execution time expectations into the execution plan for time tracking

API Surface:
Public Functions:
- `expect()` - Factory function for creating expectations (`@PublicAPI(stability="alpha")`)

Internal Classes (marked as `@DeveloperAPI`):
- `Expectation` - Base class for all expectations
- `DataQualityExpectation` - For data quality validation
- `ExecutionTimeExpectation` - For execution time constraints (SLA requirements)
- `ExpectationResult` - Result of expectation validation

Dataset Method:
- `Dataset.expect()` - Applies expectations and returns `(passed_ds, failed_ds, result)`, or `(passed_ds, failed_ds, List[ExpectationResult])` for lists

Architecture Integration
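For instance, a list of expectations would return one result per expectation (a sketch under the API surface above; `ds` is any Dataset):

```python
from ray.data.expectations import expect
from ray.data.expressions import col

checks = [
    expect(expr=col("age") >= 0, name="age_non_negative"),
    expect(expr=col("score") <= 100, name="score_in_range"),
]
passed_ds, failed_ds, results = ds.expect(checks)
for r in results:
    print(r.passed, r.failure_count, r.message)
```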
Expression API Integration:
- Reuses the existing `eval_expr()` infrastructure
- Uses the `with_column()` + `filter()` pattern for efficient validation (avoids double evaluation)
- Uses `BlockAccessor` for consistent batch format handling (Arrow, pandas, dict)

Execution Plan Integration:
- Execution time expectations are tracked in `ExecutionPlan._execution_time_expectations`
- Processing halts when `max_execution_time_seconds` is exceeded
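For example (a sketch; the alerting hook is hypothetical and not part of this PR):

```python
passed_ds, failed_ds, result = ds.expect(max_execution_time_seconds=60)

if not result.passed:
    # Hypothetical alerting hook; substitute your own monitoring integration.
    notify_oncall(f"Pipeline SLA exceeded: {result.message}")
```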
Design Principles:
- API consistent with existing Dataset methods (e.g., `filter()`)
- Reuses existing infrastructure: `filter()`, `map_batches()`, `BlockAccessor`, `eval_expr()`, etc.
- Internal classes are `@DeveloperAPI`; only `expect()` is `@PublicAPI`

Testing
The implementation includes comprehensive tests covering:
Performance Considerations
- Validation runs in a single pass (same as `filter()`)
- Uses `map_batches()` for efficient batch processing
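To illustrate the batch-level idea with stable Ray Data APIs only, the sketch below counts failures per batch with `map_batches()` and aggregates the counts; it mirrors the approach rather than the PR's internal helpers:

```python
import pandas as pd
import ray

ds = ray.data.from_items([{"age": 34}, {"age": -2}, {"age": 51}])

def count_failures(batch: pd.DataFrame) -> pd.DataFrame:
    # Emit one aggregate row per batch instead of materializing failed rows.
    failed = int((batch["age"] < 0).sum())
    return pd.DataFrame({"failed": [failed], "total": [len(batch)]})

counts = ds.map_batches(count_failures, batch_format="pandas")
totals = counts.sum(["failed", "total"])
print(totals)  # e.g., {'sum(failed)': 1, 'sum(total)': 3}
```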