diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index f8483e9edb..5eba9b5130 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -511,6 +511,7 @@ server_notification_definitions! { ItemCompleted => "item/completed" (v2::ItemCompletedNotification), AgentMessageDelta => "item/agentMessage/delta" (v2::AgentMessageDeltaNotification), CommandExecutionOutputDelta => "item/commandExecution/outputDelta" (v2::CommandExecutionOutputDeltaNotification), + FileChangeOutputDelta => "item/fileChange/outputDelta" (v2::FileChangeOutputDeltaNotification), McpToolCallProgress => "item/mcpToolCall/progress" (v2::McpToolCallProgressNotification), AccountUpdated => "account/updated" (v2::AccountUpdatedNotification), AccountRateLimitsUpdated => "account/rateLimits/updated" (v2::AccountRateLimitsUpdatedNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index fd148b256f..bd38539bb2 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1353,6 +1353,16 @@ pub struct CommandExecutionOutputDeltaNotification { pub delta: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct FileChangeOutputDeltaNotification { + pub thread_id: String, + pub turn_id: String, + pub item_id: String, + pub delta: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index c34c7b281e..efdce3162d 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -18,6 +18,7 @@ use codex_app_server_protocol::ContextCompactedNotification; use codex_app_server_protocol::ErrorNotification; use codex_app_server_protocol::ExecCommandApprovalParams; use codex_app_server_protocol::ExecCommandApprovalResponse; +use codex_app_server_protocol::FileChangeOutputDeltaNotification; use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::FileUpdateChange; @@ -501,17 +502,44 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::ExecCommandOutputDelta(exec_command_output_delta_event) => { - let notification = CommandExecutionOutputDeltaNotification { - thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), - item_id: exec_command_output_delta_event.call_id.clone(), - delta: String::from_utf8_lossy(&exec_command_output_delta_event.chunk).to_string(), + let item_id = exec_command_output_delta_event.call_id.clone(); + let delta = String::from_utf8_lossy(&exec_command_output_delta_event.chunk).to_string(); + // The underlying EventMsg::ExecCommandOutputDelta is used for shell, unified_exec, + // and apply_patch tool calls. We represent apply_patch with the FileChange item, and + // everything else with the CommandExecution item. + // + // We need to detect which item type it is so we can emit the right notification. + // We already have state tracking FileChange items on item/started, so let's use that. + let is_file_change = { + let map = turn_summary_store.lock().await; + map.get(&conversation_id) + .is_some_and(|summary| summary.file_change_started.contains(&item_id)) }; - outgoing - .send_server_notification(ServerNotification::CommandExecutionOutputDelta( - notification, - )) - .await; + if is_file_change { + let notification = FileChangeOutputDeltaNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + item_id, + delta, + }; + outgoing + .send_server_notification(ServerNotification::FileChangeOutputDelta( + notification, + )) + .await; + } else { + let notification = CommandExecutionOutputDeltaNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + item_id, + delta, + }; + outgoing + .send_server_notification(ServerNotification::CommandExecutionOutputDelta( + notification, + )) + .await; + } } EventMsg::ExecCommandEnd(exec_command_end_event) => { let ExecCommandEndEvent { diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index cd35e31adc..03ee279e51 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -11,6 +11,7 @@ use app_test_support::to_response; use codex_app_server_protocol::ApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::CommandExecutionStatus; +use codex_app_server_protocol::FileChangeOutputDeltaNotification; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; @@ -725,6 +726,26 @@ async fn turn_start_file_change_approval_v2() -> Result<()> { ) .await?; + let output_delta_notif = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("item/fileChange/outputDelta"), + ) + .await??; + let output_delta: FileChangeOutputDeltaNotification = serde_json::from_value( + output_delta_notif + .params + .clone() + .expect("item/fileChange/outputDelta params"), + )?; + assert_eq!(output_delta.thread_id, thread.id); + assert_eq!(output_delta.turn_id, turn.id); + assert_eq!(output_delta.item_id, "patch-call"); + assert!( + !output_delta.delta.is_empty(), + "expected delta to be non-empty, got: {}", + output_delta.delta + ); + let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async { loop { let completed_notif = mcp