Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 26 additions & 35 deletions src/cpp/src/parsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,6 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
bool m_think_tag_opened = false;
std::string m_text_cache = "";
bool m_deactivated = false;

/**
* @brief Ensure required fields exist in the message container.
*/
void ensure_message_fields(JsonContainer& message) {
if (!message.contains("reasoning_content")) {
message["reasoning_content"] = "";
}
if (!message.contains("content")) {
message["content"] = "";
}
}

/**
* @brief Find the longest suffix of text that is a prefix of the close tag.
Expand Down Expand Up @@ -61,8 +49,8 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
void handle_complete_reasoning(JsonContainer& message, std::string_view txt_chunk,
size_t open_idx, size_t close_idx, std::string& delta_text) {
// Extract reasoning content between tags
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(),
close_idx - (open_idx + m_open_tag.size())));
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size())));
message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));

if (!m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
Expand All @@ -79,28 +67,35 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
/**
 * @brief Handle the case where the open tag is found in the current chunk.
 *
 * Everything after the open tag is stored as the (initial) reasoning content,
 * and the parser switches into "inside reasoning" mode.
 *
 * @param message     Parsed-message container; "reasoning_content" is set here.
 * @param reason_str  Unused in the current flow; kept to preserve the call-site
 *                    signature (the caller still passes it). NOTE(review): can
 *                    be dropped once all call sites are updated.
 * @param txt_chunk   Cached text plus the newly decoded delta.
 * @param open_idx    Index of the open tag within txt_chunk.
 * @param delta_text  In/out: cleared unless m_keep_original_content is set,
 *                    in which case the whole chunk is passed through verbatim.
 */
void handle_open_tag(JsonContainer& message, std::string& reason_str,
                     std::string_view txt_chunk, size_t open_idx, std::string& delta_text) {
    // Start accumulating reasoning content: take everything after the open tag.
    message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size()));

    if (!m_keep_original_content) {
        // Reasoning text is not surfaced as plain content.
        delta_text.clear();
    } else {
        delta_text = txt_chunk;
    }

    m_think_tag_opened = true;
    m_text_cache.clear();
}

/**
* @brief Handle the case where the close tag is found.
*/
void handle_close_tag(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, size_t close_idx, std::string& delta_text) {
void handle_close_tag(JsonContainer& message, std::string_view txt_chunk, size_t close_idx, std::string& delta_text) {
// Append text before close tag to reasoning content
reason_str.append(txt_chunk.substr(0, close_idx));
message["reasoning_content"] = std::move(reason_str);
message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx)));
auto content = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
message["content"] = content;

if (!m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
// Despite the fact that we put txt_chunk to delta_text it's correct.
Copy link

Copilot AI Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'txt_chung' to 'txt_chunk'.

Suggested change
// Despite the fact that we put txt_chung to delta_text it's correct.
// Despite the fact that we put txt_chunk to delta_text it's correct.

Copilot uses AI. Check for mistakes.
// Since txt_chunk contains some cached parts from the previous calls that were not yet processed,
// we kept them in the cache until we decided what to do with them. Here we know for certain that the cached parts
// belonged to reasoning_content, so we can discard them.
delta_text = content;
} else {
delta_text = txt_chunk;
}

m_text_cache.clear();
Expand All @@ -111,25 +106,24 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
/**
 * @brief Handle accumulating text while inside reasoning tags.
 *
 * While the think tag is open and no close tag has been seen yet, the chunk is
 * treated as reasoning content. If the chunk's tail could be the beginning of a
 * split close tag, that tail is held back in m_text_cache so it can be
 * re-examined together with the next delta.
 *
 * @param message     Parsed-message container; "reasoning_content" is set here.
 * @param reason_str  Scratch string holding this call's reasoning text; moved
 *                    into the message at the end.
 * @param txt_chunk   Cached text plus the newly decoded delta.
 * @param delta_text  In/out: the forwarded portion of the chunk, or cleared
 *                    when m_keep_original_content is false.
 */
void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, std::string_view txt_chunk, std::string& delta_text) {
    // Find if the end of txt_chunk might be the start of a close tag.
    const size_t num_chars_to_keep = find_close_tag_prefix_length(txt_chunk);

    if (num_chars_to_keep > 0) {
        // Keep the potential partial close tag in the cache; everything before it is reasoning text.
        m_text_cache = std::string(txt_chunk.substr(txt_chunk.size() - num_chars_to_keep));
        reason_str = txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep);
        delta_text = std::string(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep));
    } else {
        // No partial close tag at the end: the whole chunk is reasoning text.
        reason_str = txt_chunk;
        m_text_cache.clear();
    }

    if (!m_keep_original_content) {
        // Reasoning text is not surfaced as plain content.
        delta_text.clear();
    }
    message["reasoning_content"] = std::move(reason_str);
}

public:
Expand Down Expand Up @@ -157,13 +151,9 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
}
m_first_run = false;

ensure_message_fields(message);

const std::string txt_chunk = m_text_cache + delta_text;
std::string reason_str;
if (message.contains("reasoning_content")) {
reason_str = std::move(message["reasoning_content"].get_string());
}
std::string txt_chunk = m_text_cache + delta_text;
std::string reason_str = message.contains("reasoning_content") ? std::move(message["reasoning_content"].get_string()) : "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::string reason_str = message.contains("reasoning_content") ? std::move(message["reasoning_content"].get_string()) : "";


// Cache find() results to avoid redundant searches
const auto open_idx = txt_chunk.find(m_open_tag);
Expand All @@ -180,13 +170,14 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
handle_open_tag(message, reason_str, txt_chunk, open_idx, delta_text);
}
} else if (m_think_tag_opened && close_idx != std::string::npos) {
handle_close_tag(message, reason_str, txt_chunk, close_idx, delta_text);
handle_close_tag(message, txt_chunk, close_idx, delta_text);
} else if (m_think_tag_opened) {
handle_inside_reasoning(message, reason_str, txt_chunk, delta_text);
} else {
// Think tag was not opened yet and not found in the current delta_text.
// Accumulate text in the cache to detect if <think> is split between several delta_text pieces.
m_text_cache += delta_text;
delta_text.clear();
}
Comment on lines +180 to 181
Copy link

Copilot AI Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] When think tag is not yet opened and accumulating in cache, delta_text is cleared. This means no delta content is returned to the user during this phase. Consider adding a comment explaining this behavior is intentional to avoid confusion.

Suggested change
delta_text.clear();
}
// Intentionally clear delta_text: no delta content is returned to the user during this phase
// (we are waiting for the <think> tag to be fully detected in the cache).
delta_text.clear();

Copilot uses AI. Check for mistakes.

return delta_text;
Expand Down
49 changes: 43 additions & 6 deletions src/cpp/src/text_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,34 @@ std::vector<std::shared_ptr<IncrementalParser>> m_parsers;
JsonContainer m_parsed_message;

TextParserStreamerImpl(std::vector<std::shared_ptr<IncrementalParser>> parsers) : m_parsers{parsers} {}

};

void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector<std::string> keys_to_concatenate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to have it as the json container method.

for (const auto& key : keys_to_concatenate) {
if (to.contains(key) && from.contains(key)) {
// If both are strings, concatenate
if (to[key].is_string() && from[key].is_string()) {
to[key] = to[key].get_string() + from[key].get_string();
}
} else if (from.contains(key)) {
to[key] = from[key];
}
}
}

/// Construct a streamer that decodes tokens to text and feeds each decoded
/// chunk through the configured incremental parsers via write(std::string).
TextParserStreamer::TextParserStreamer(const Tokenizer& tokenizer, std::vector<std::shared_ptr<IncrementalParser>> parsers)
    : TextStreamer(tokenizer, [this](std::string s) -> CallbackTypeVariant {
        return this->write(s);
    }), m_pimpl{std::make_unique<TextParserStreamerImpl>(std::move(parsers))} {
    // Pre-initialize "content" so later concatenation can assume the field exists.
    m_pimpl->m_parsed_message["content"] = "";
}

CallbackTypeVariant TextParserStreamer::write(std::string message) {
CallbackTypeVariant TextParserStreamer::write(std::string delta_text) {
// When 'write' is called with string, it means new chunk of tokens is decoded into text

auto flushed_tokens = std::vector<int64_t>();
if (message.back() == '\n') {
if (delta_text.back() == '\n') {
// Flush all tokens
flushed_tokens.assign(m_tokens_cache.begin(), m_tokens_cache.end());
} else if (m_decoded_lengths.size() >= delay_n_tokens) {
Expand All @@ -177,13 +193,33 @@ CallbackTypeVariant TextParserStreamer::write(std::string message) {
}
}

JsonContainer msg;
// Iterate over all parsers and apply them to the message
for (auto& parser: m_pimpl->m_parsers) {
message = parser->parse(m_pimpl->m_parsed_message, message, flushed_tokens);
delta_text = parser->parse(msg, delta_text, flushed_tokens);
// Message can be modified inside parser, if parser for example extracted tool calling from message content
m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + message;
}
return write(m_pimpl->m_parsed_message);
msg["content"] = delta_text;

// concatenate msg with m_parsed_message
concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"reasoning_content", "content", "tool_calls"});

// We have to put into DeltaMessage's "content" fields only chunks that belong to content.
// But into m_parsed_message["content"] we need to accumulate full content if m_keep_original_content == True
// and if m_keep_original_content == False only part that is outside reasoning tags and outside tool calls.

// fill in msg["content"] with delta_text
// The line below should be removed and its function should be performed inside concatenate_json_containers
// m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + delta_text;


// TODO: on each for-cycle iteration the parsers receive an empty delta_message
// and the pipeline iterates through every parser. Parsers should neither delete fields nor rewrite them;
// they should only append or add new fields.
// The only field that is updated automatically is "content":
// the remaining delta_text is put there.
// It's the parsers' responsibility to ensure the fields are proper.
return write(msg);
}

JsonContainer TextParserStreamer::get_parsed_message() const {
Expand All @@ -192,6 +228,7 @@ JsonContainer TextParserStreamer::get_parsed_message() const {

void TextParserStreamer::reset() {
m_pimpl->m_parsed_message = JsonContainer();
m_pimpl->m_parsed_message["content"] = "";
for (auto& parser : m_pimpl->m_parsers) {
parser->reset();
}
Expand Down
9 changes: 7 additions & 2 deletions tests/cpp/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

using namespace ov::genai;

namespace ov::genai {
void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector<std::string> keys_to_concatenate);
}

TEST(ParserTest, test_llama3_parser_1) {
std::string prompt = R"(What's the weather in New York today?<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n[get_weather(location="New York, NY", unit="celsius")]<|eom_id|>)";
// By default content should keep original values.
Expand Down Expand Up @@ -92,12 +96,13 @@ TEST_F(DeepSeekR1ReasoningParserTest, ReasoningContentAccumulatesAcrossCalls) {
std::string ref_res = "First, I recognize that the question is asking for the sum of 2 and 1.\n\nI know that addition involves combining two numbers to find their total.\n\nStarting with 2, I add 1 to it.\n\n2 plus 1 equals 3.\n";

JsonContainer msg;

JsonContainer accumulated_msg;
for (int i = 1; i < input_stream.size(); i++) {
std::string delta_text = input_stream[i];
delta_text = parser.parse(msg, delta_text);
concatenate_json_containers(msg, accumulated_msg, {"reasoning_content", "content"});
}
ASSERT_EQ(msg["reasoning_content"], ref_res);
ASSERT_EQ(accumulated_msg["reasoning_content"], ref_res);
}

TEST(ParserTest, test_custom_parser) {
Expand Down
Loading
Loading