Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/cpp/include/openvino/genai/json_container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ class OPENVINO_GENAI_EXPORTS JsonContainer {
*/
static JsonContainer from_json_string(const std::string& json_str);

/**
* @brief Concatenate two JsonContainers.
* @param dst Destination JsonContainer to append to
* @param src Source JsonContainer to append from
* @throw ov::Exception if a key is present in both containers and its values are not strings.
*/
static void concatenate(JsonContainer& dst, const JsonContainer& src);

/**
* @brief Create JsonContainer as an empty JSON object.
*/
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/include/openvino/genai/parsers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class OPENVINO_GENAI_EXPORTS IncrementalParser {
* @return std::string Filtered text that should be added to the content
*/
virtual std::string parse(
JsonContainer& message,
JsonContainer& delta_message,
std::string& delta_text,
const std::optional<std::vector<int64_t>>& delta_tokens = std::nullopt
) = 0;
Expand Down Expand Up @@ -222,7 +222,7 @@ class OPENVINO_GENAI_EXPORTS ReasoningIncrementalParser : public IncrementalPars
* @return std::string Filtered text with reasoning content processed according to configuration
Copy link
Contributor Author

@pavel-esir pavel-esir Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update doc. Replace message -> delta_message in @param message JsonContainer to store parsed results and reasoning metadata

*/
std::string parse(
JsonContainer& message,
JsonContainer& delta_message,
std::string& delta_text,
const std::optional<std::vector<int64_t>>& delta_tokens = std::nullopt
) override;
Expand Down
18 changes: 18 additions & 0 deletions src/cpp/src/json_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,5 +409,23 @@ void* JsonContainer::_get_json_value_ptr() const {
return m_impl->get_json_value_ptr(m_path, AccessMode::Read);
}

void JsonContainer::concatenate(JsonContainer& dst, const JsonContainer& src) {
    // Merge `src` into `dst`:
    //  - keys missing from `dst` are copied over unchanged;
    //  - keys present in both containers must hold string values, which are appended.
    auto dst_ = static_cast<nlohmann::ordered_json*>(dst._get_json_value_ptr());
    auto src_ = static_cast<const nlohmann::ordered_json*>(src._get_json_value_ptr());

    for (auto it = src_->begin(); it != src_->end(); ++it) {
        const auto& src_val = it.value();

        if (!dst_->contains(it.key())) {
            // Key only exists in src: copy the value as-is (any JSON type is fine here).
            (*dst_)[it.key()] = src_val;
            continue;
        }

        auto& dst_val = (*dst_)[it.key()];
        // Both sides hold this key: only string + string concatenation is supported.
        // Check BOTH values — a non-string dst_val would otherwise make get<std::string>()
        // throw a raw nlohmann type_error instead of the documented ov::Exception.
        OPENVINO_ASSERT(src_val.is_string() && dst_val.is_string(),
                        "JsonContainer concatenate supports only string concatenation for object values.");
        dst_val = dst_val.get<std::string>() + src_val.get<std::string>();
    }
}

} // namespace genai
} // namespace ov
79 changes: 35 additions & 44 deletions src/cpp/src/parsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,6 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
bool m_think_tag_opened = false;
std::string m_text_cache = "";
bool m_deactivated = false;

/**
* @brief Ensure required fields exist in the message container.
*/
void ensure_message_fields(JsonContainer& message) {
if (!message.contains("reasoning_content")) {
message["reasoning_content"] = "";
}
if (!message.contains("content")) {
message["content"] = "";
}
}

/**
* @brief Find the longest suffix of text that is a prefix of the close tag.
Expand Down Expand Up @@ -61,8 +49,8 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
void handle_complete_reasoning(JsonContainer& message, std::string_view txt_chunk,
size_t open_idx, size_t close_idx, std::string& delta_text) {
// Extract reasoning content between tags
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(),
close_idx - (open_idx + m_open_tag.size())));
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size())));
Copy link

Copilot AI Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The calculation close_idx - (open_idx + m_open_tag.size()) is duplicated from the removed line. Consider extracting this to a variable for clarity, e.g., size_t reasoning_length = close_idx - (open_idx + m_open_tag.size());

Suggested change
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size())));
size_t reasoning_length = close_idx - (open_idx + m_open_tag.size());
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), reasoning_length));

Copilot uses AI. Check for mistakes.
message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));

if (!m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
Expand All @@ -76,31 +64,37 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
/**
* @brief Handle the case where only the open tag is found.
*/
void handle_open_tag(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, size_t open_idx, std::string& delta_text) {
void handle_open_tag(JsonContainer& delta_message, std::string_view txt_chunk, size_t open_idx, std::string& delta_text) {
// Start accumulating reasoning content
reason_str.append(txt_chunk.substr(open_idx + m_open_tag.size()));
message["reasoning_content"] = std::move(reason_str);
delta_message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size()));

if (!m_keep_original_content) {
delta_text.clear();
} else {
delta_text = txt_chunk;
}

m_think_tag_opened = true;
m_text_cache.clear();
}

/**
* @brief Handle the case where the close tag is found.
*/
void handle_close_tag(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, size_t close_idx, std::string& delta_text) {
void handle_close_tag(JsonContainer& delta_message, std::string_view txt_chunk, size_t close_idx, std::string& delta_text) {
// Append text before close tag to reasoning content
reason_str.append(txt_chunk.substr(0, close_idx));
message["reasoning_content"] = std::move(reason_str);
delta_message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx)));
auto content = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
delta_message["content"] = content;

if (!m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
            // Note: assigning only `content` (the text after the close tag) to delta_text is correct,
            // even though txt_chunk also contains fragments cached from previous calls that were
            // never emitted. Those fragments were cached until we could classify them; at this point
            // we know they belonged to reasoning_content, so it is safe to discard them here.
delta_text = content;
} else {
delta_text = txt_chunk;
}

m_text_cache.clear();
Expand All @@ -111,25 +105,25 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
/**
* @brief Handle accumulating text while inside reasoning tags.
*/
void handle_inside_reasoning(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, std::string& delta_text) {
void handle_inside_reasoning(JsonContainer& delta_message, std::string_view txt_chunk, std::string& delta_text) {
// Find if the end of txt_chunk might be the start of a close tag
const size_t num_chars_to_keep = find_close_tag_prefix_length(txt_chunk);

std::string reason_str;
if (num_chars_to_keep > 0) {
// Keep potential partial close tag in cache
m_text_cache = std::string(txt_chunk.substr(txt_chunk.size() - num_chars_to_keep));
reason_str.append(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep));
reason_str = txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep);
delta_text = std::string(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep));
} else {
// No partial close tag, accumulate all text
reason_str.append(txt_chunk);
reason_str = txt_chunk;
m_text_cache.clear();
}

delta_message["reasoning_content"] = std::move(reason_str);
if (!m_keep_original_content) {
delta_text.clear();
}
message["reasoning_content"] = std::move(reason_str);
}

public:
Expand All @@ -145,7 +139,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
m_close_tag(close_tag) {}

std::string parse(
JsonContainer& message,
JsonContainer& delta_message,
std::string& delta_text,
const std::optional<std::vector<int64_t>>& delta_tokens
) {
Expand All @@ -157,13 +151,8 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
}
m_first_run = false;

ensure_message_fields(message);

const std::string txt_chunk = m_text_cache + delta_text;
std::string reason_str;
if (message.contains("reasoning_content")) {
reason_str = std::move(message["reasoning_content"].get_string());
}
std::string txt_chunk = m_text_cache + delta_text;

// Cache find() results to avoid redundant searches
const auto open_idx = txt_chunk.find(m_open_tag);
Expand All @@ -175,19 +164,21 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
? close_idx : std::string::npos;

if (close_idx_after_open != std::string::npos) {
handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open, delta_text);
handle_complete_reasoning(delta_message, txt_chunk, open_idx, close_idx_after_open, delta_text);
} else {
handle_open_tag(message, reason_str, txt_chunk, open_idx, delta_text);
handle_open_tag(delta_message, txt_chunk, open_idx, delta_text);
}
} else if (m_think_tag_opened && close_idx != std::string::npos) {
handle_close_tag(message, reason_str, txt_chunk, close_idx, delta_text);
handle_close_tag(delta_message, txt_chunk, close_idx, delta_text);
} else if (m_think_tag_opened) {
handle_inside_reasoning(message, reason_str, txt_chunk, delta_text);
handle_inside_reasoning(delta_message, txt_chunk, delta_text);
} else {
// Think tag was not opened yet and not found in the current delta_text.
// Accumulate text in the cache to detect if <think> is split between several delta_text pieces.
m_text_cache += delta_text;
}
// Intentionally clear delta_text: no delta content is returned to the user during this phase
// (we are waiting for the <think> tag to be fully detected in the cache).
Comment on lines +181 to +182
Copy link

Copilot AI Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The comment above this line describes why delta_text is cleared, but it should be updated to reflect that this intentional clearing is part of the caching strategy while waiting for the full <think> tag.

Suggested change
// Intentionally clear delta_text: no delta content is returned to the user during this phase
// (we are waiting for the <think> tag to be fully detected in the cache).
// Intentionally clear delta_text as part of the caching strategy:
// no delta content is returned to the user during this phase because we are
// accumulating partial data in m_text_cache until the full <think> tag is detected.

Copilot uses AI. Check for mistakes.
delta_text.clear();

return delta_text;
}
Expand All @@ -207,11 +198,11 @@ ReasoningIncrementalParser::ReasoningIncrementalParser(bool expect_open_tag, boo
ReasoningIncrementalParser::~ReasoningIncrementalParser() = default;

std::string ReasoningIncrementalParser::parse(
JsonContainer& message,
JsonContainer& delta_message,
std::string& delta_text,
const std::optional<std::vector<int64_t>>& delta_tokens
) {
return m_impl->parse(message, delta_text, delta_tokens);
return m_impl->parse(delta_message, delta_text, delta_tokens);
}

void ReasoningIncrementalParser::reset() {
Expand Down
22 changes: 16 additions & 6 deletions src/cpp/src/text_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,21 @@ std::vector<std::shared_ptr<IncrementalParser>> m_parsers;
JsonContainer m_parsed_message;

TextParserStreamerImpl(std::vector<std::shared_ptr<IncrementalParser>> parsers) : m_parsers{parsers} {}

};

TextParserStreamer::TextParserStreamer(const Tokenizer& tokenizer, std::vector<std::shared_ptr<IncrementalParser>> parsers)
: TextStreamer(tokenizer, [this](std::string s) -> CallbackTypeVariant {
return this->write(s);
}), m_pimpl{std::make_unique<TextParserStreamerImpl>(parsers)} {}
}), m_pimpl{std::make_unique<TextParserStreamerImpl>(parsers)} {
m_pimpl->m_parsed_message["content"] = "";
}

CallbackTypeVariant TextParserStreamer::write(std::string message) {
CallbackTypeVariant TextParserStreamer::write(std::string delta_text) {
// When 'write' is called with string, it means new chunk of tokens is decoded into text

auto flushed_tokens = std::vector<int64_t>();
if (message.back() == '\n') {
if (delta_text.back() == '\n') {
// Flush all tokens
flushed_tokens.assign(m_tokens_cache.begin(), m_tokens_cache.end());
} else if (m_decoded_lengths.size() >= delay_n_tokens) {
Expand All @@ -177,13 +180,19 @@ CallbackTypeVariant TextParserStreamer::write(std::string message) {
}
}

// Every time we start to cycle through iterative parsers we create a new delta_message.
    // Parsers should neither delete nor rewrite fields; they should only append to existing fields or add new ones.
    // The only field updated automatically is "content": delta_text is put there.
JsonContainer delta_message;
// Iterate over all parsers and apply them to the message
for (auto& parser: m_pimpl->m_parsers) {
message = parser->parse(m_pimpl->m_parsed_message, message, flushed_tokens);
delta_text = parser->parse(delta_message, delta_text, flushed_tokens);
// Message can be modified inside parser, if parser for example extracted tool calling from message content
m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + message;
}
return write(m_pimpl->m_parsed_message);
delta_message["content"] = delta_text;

JsonContainer::concatenate(m_pimpl->m_parsed_message, delta_message);
return write(delta_message);
}

JsonContainer TextParserStreamer::get_parsed_message() const {
Expand All @@ -192,6 +201,7 @@ JsonContainer TextParserStreamer::get_parsed_message() const {

void TextParserStreamer::reset() {
m_pimpl->m_parsed_message = JsonContainer();
m_pimpl->m_parsed_message["content"] = "";
for (auto& parser : m_pimpl->m_parsers) {
parser->reset();
}
Expand Down
10 changes: 8 additions & 2 deletions tests/cpp/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

using namespace ov::genai;

// namespace ov::genai {
// void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector<std::string> keys_to_concatenate);
// }

TEST(ParserTest, test_llama3_parser_1) {
std::string prompt = R"(What's the weather in New York today?<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n[get_weather(location="New York, NY", unit="celsius")]<|eom_id|>)";
// By default content should keep original values.
Expand Down Expand Up @@ -92,12 +96,14 @@ TEST_F(DeepSeekR1ReasoningParserTest, ReasoningContentAccumulatesAcrossCalls) {
std::string ref_res = "First, I recognize that the question is asking for the sum of 2 and 1.\n\nI know that addition involves combining two numbers to find their total.\n\nStarting with 2, I add 1 to it.\n\n2 plus 1 equals 3.\n";

JsonContainer msg;

JsonContainer accumulated_msg;
for (int i = 1; i < input_stream.size(); i++) {
std::string delta_text = input_stream[i];
delta_text = parser.parse(msg, delta_text);
// concatenate_json_containers(msg, accumulated_msg, {"reasoning_content", "content"});
JsonContainer::concatenate(accumulated_msg, msg);
}
ASSERT_EQ(msg["reasoning_content"], ref_res);
ASSERT_EQ(accumulated_msg["reasoning_content"], ref_res);
}

TEST(ParserTest, test_custom_parser) {
Expand Down
Loading
Loading