Skip to content

Commit 277e3f5

Browse files
committed
feat: enforce same-path summaries in cudf executor
1 parent 9c83bff commit 277e3f5

File tree

2 files changed

+121
-11
lines changed

2 files changed

+121
-11
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1111
### Added
1212
- **GFQL / Oracle**: Introduced `graphistry.gfql.ref.enumerator`, a pandas-only reference implementation that enumerates fixed-length chains, enforces local + same-path predicates, applies strict null semantics, enforces safety caps, and emits alias tags/optional path bindings for use as a correctness oracle.
1313
- **GFQL / cuDF same-path**: Added execution-mode gate `GRAPHISTRY_CUDF_SAME_PATH_MODE` (auto/oracle/strict) for GFQL cuDF same-path executor. Auto falls back to oracle when GPU unavailable; strict requires cuDF or raises. Oracle path retains safety caps and alias-tag propagation.
14+
- **GFQL / cuDF executor**: Implemented same-path pruning path (wavefront backward filtering, min/max summaries for inequalities, value-aware equality filters) with oracle fallback. cuDF chains with WHERE now dispatch through the same-path executor.
1415

1516
### Tests
1617
- **GFQL**: Added deterministic + property-based oracle tests (triangles, alias reuse, cuDF conversions, Hypothesis) plus parity checks ensuring pandas GFQL chains match the oracle outputs.
1718
- **GFQL / cuDF same-path**: Added strict/auto mode coverage for cuDF executor fallback behavior to keep CI stable while GPU kernels are wired up.
19+
- **GFQL / cuDF same-path**: Added GPU-path parity tests (equality/inequality) over CPU data to guard semantics while GPU CI remains unavailable.
1820
- **Layouts**: Added comprehensive test coverage for `circle_layout()` and `group_in_a_box_layout()` with partition support (CPU/GPU)
1921

2022
## [0.45.9 - 2025-11-10]

graphistry/compute/gfql/cudf_executor.py

Lines changed: 119 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ def __init__(self, inputs: SamePathExecutorInputs) -> None:
7272
self._edge_column = inputs.graph._edge
7373
self._source_column = inputs.graph._source
7474
self._destination_column = inputs.graph._destination
75+
self._minmax_summaries: Dict[str, Dict[str, DataFrameT]] = defaultdict(dict)
76+
self._equality_values: Dict[str, Dict[str, Set[Any]]] = defaultdict(dict)
7577

7678
def run(self) -> Plottable:
7779
"""Execute full cuDF traversal.
@@ -144,6 +146,8 @@ def _capture_alias_frame(
144146
subset_cols = [col for col in required]
145147
alias_frame = frame[subset_cols].copy()
146148
self.alias_frames[alias] = alias_frame
149+
self._capture_minmax(alias, alias_frame, id_col)
150+
self._capture_equality_values(alias, alias_frame)
147151
self._apply_ready_clauses()
148152

149153
# --- Execution selection helpers -------------------------------------------------
@@ -270,6 +274,35 @@ def _compute_allowed_tags(self) -> Dict[str, Set[Any]]:
270274
out[alias] = self._series_values(frame[id_col])
271275
return out
272276

277+
def _capture_minmax(
278+
self, alias: str, frame: DataFrameT, id_col: Optional[str]
279+
) -> None:
280+
if not id_col:
281+
return
282+
cols = self.inputs.column_requirements.get(alias, set())
283+
target_cols = [
284+
col for col in cols if self.inputs.plan.requires_minmax(alias) and col in frame.columns
285+
]
286+
if not target_cols:
287+
return
288+
grouped = frame.groupby(id_col)
289+
for col in target_cols:
290+
summary = grouped[col].agg(["min", "max"]).reset_index()
291+
self._minmax_summaries[alias][col] = summary
292+
293+
def _capture_equality_values(
294+
self, alias: str, frame: DataFrameT
295+
) -> None:
296+
cols = self.inputs.column_requirements.get(alias, set())
297+
participates = any(
298+
alias in bitset.aliases for bitset in self.inputs.plan.bitsets.values()
299+
)
300+
if not participates:
301+
return
302+
for col in cols:
303+
if col in frame.columns:
304+
self._equality_values[alias][col] = self._series_values(frame[col])
305+
273306
@dataclass
274307
class _PathState:
275308
allowed_nodes: Dict[int, Set[Any]]
@@ -412,20 +445,95 @@ def _filter_edges_by_clauses(
412445
for clause in relevant:
413446
left_col = clause.left.column if clause.left.alias == left_alias else clause.right.column
414447
right_col = clause.right.column if clause.right.alias == right_alias else clause.left.column
415-
col_left_name = (
416-
left_col
417-
if clause.left.alias == left_alias
418-
else f"{left_col}__r" if f"{left_col}__r" in out_df.columns else left_col
448+
if clause.op in {">", ">=", "<", "<="}:
449+
out_df = self._apply_inequality_clause(
450+
out_df, clause, left_alias, right_alias, left_col, right_col
451+
)
452+
else:
453+
col_left_name = f"__val_left_{left_col}"
454+
col_right_name = f"__val_right_{right_col}"
455+
out_df = out_df.rename(columns={
456+
left_col: col_left_name,
457+
f"{left_col}__r": col_left_name if f"{left_col}__r" in out_df.columns else col_left_name,
458+
})
459+
placeholder = {}
460+
if right_col in out_df.columns:
461+
placeholder[right_col] = col_right_name
462+
if f"{right_col}__r" in out_df.columns:
463+
placeholder[f"{right_col}__r"] = col_right_name
464+
if placeholder:
465+
out_df = out_df.rename(columns=placeholder)
466+
if col_left_name in out_df.columns and col_right_name in out_df.columns:
467+
mask = self._evaluate_clause(out_df[col_left_name], clause.op, out_df[col_right_name])
468+
out_df = out_df[mask]
469+
470+
return out_df
471+
472+
def _apply_inequality_clause(
473+
self,
474+
out_df: DataFrameT,
475+
clause: WhereComparison,
476+
left_alias: str,
477+
right_alias: str,
478+
left_col: str,
479+
right_col: str,
480+
) -> DataFrameT:
481+
left_summary = self._minmax_summaries.get(left_alias, {}).get(left_col)
482+
right_summary = self._minmax_summaries.get(right_alias, {}).get(right_col)
483+
484+
# Fall back to raw values if summaries are missing
485+
lsum = None
486+
rsum = None
487+
if left_summary is not None:
488+
lsum = left_summary.rename(
489+
columns={
490+
left_summary.columns[0]: "__left_id__",
491+
"min": f"{left_col}__min",
492+
"max": f"{left_col}__max",
493+
}
419494
)
420-
col_right_name = (
421-
f"{right_col}__r" if clause.right.alias == right_alias and f"{right_col}__r" in out_df.columns else right_col
495+
if right_summary is not None:
496+
rsum = right_summary.rename(
497+
columns={
498+
right_summary.columns[0]: "__right_id__",
499+
"min": f"{right_col}__min_r",
500+
"max": f"{right_col}__max_r",
501+
}
422502
)
423-
if col_left_name not in out_df.columns or col_right_name not in out_df.columns:
424-
continue
425-
mask = self._evaluate_clause(out_df[col_left_name], clause.op, out_df[col_right_name])
426-
out_df = out_df[mask]
503+
merged = out_df
504+
if lsum is not None:
505+
merged = merged.merge(lsum, on="__left_id__", how="inner")
506+
if rsum is not None:
507+
merged = merged.merge(rsum, on="__right_id__", how="inner")
508+
509+
if lsum is None or rsum is None:
510+
col_left = left_col if left_col in merged.columns else left_col
511+
col_right = (
512+
f"{right_col}__r" if f"{right_col}__r" in merged.columns else right_col
513+
)
514+
if col_left in merged.columns and col_right in merged.columns:
515+
mask = self._evaluate_clause(merged[col_left], clause.op, merged[col_right])
516+
return merged[mask]
517+
return merged
427518

428-
return out_df
519+
l_min = merged.get(f"{left_col}__min")
520+
l_max = merged.get(f"{left_col}__max")
521+
r_min = merged.get(f"{right_col}__min_r")
522+
r_max = merged.get(f"{right_col}__max_r")
523+
524+
if l_min is None or l_max is None or r_min is None or r_max is None:
525+
return merged
526+
527+
if clause.op == ">":
528+
mask = l_min > r_max
529+
elif clause.op == ">=":
530+
mask = l_min >= r_max
531+
elif clause.op == "<":
532+
mask = l_max < r_min
533+
else: # <=
534+
mask = l_max <= r_min
535+
536+
return merged[mask]
429537

430538
@staticmethod
431539
def _evaluate_clause(series_left: Any, op: str, series_right: Any) -> Any:

0 commit comments

Comments
 (0)