Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 15 additions & 34 deletions src/cpp/src/parsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,10 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
* @brief Handle the case where both open and close tags are found in the same chunk.
*/
void handle_complete_reasoning(JsonContainer& message, std::string_view txt_chunk,
size_t open_idx, size_t close_idx, std::string& delta_text) {
size_t open_idx, size_t close_idx) {
// Extract reasoning content between tags
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(),
close_idx - (open_idx + m_open_tag.size())));

if (!m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
}
message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size())));
message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));

m_think_tag_opened = false;
m_deactivated = true;
Expand All @@ -77,31 +73,22 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
* @brief Handle the case where only the open tag is found.
*/
void handle_open_tag(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, size_t open_idx, std::string& delta_text) {
std::string_view txt_chunk, size_t open_idx) {
// Start accumulating reasoning content
reason_str.append(txt_chunk.substr(open_idx + m_open_tag.size()));
message["reasoning_content"] = std::move(reason_str);

if (!m_keep_original_content) {
delta_text.clear();
}


m_think_tag_opened = true;
m_text_cache.clear();
}

/**
* @brief Handle the case where the close tag is found.
*/
void handle_close_tag(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, size_t close_idx, std::string& delta_text) {
void handle_close_tag(JsonContainer& message, std::string_view txt_chunk, size_t close_idx) {
// Append text before close tag to reasoning content
reason_str.append(txt_chunk.substr(0, close_idx));
message["reasoning_content"] = std::move(reason_str);

if (!m_keep_original_content) {
delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));
}
message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx)));
message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size()));;

m_text_cache.clear();
m_think_tag_opened = false;
Expand All @@ -111,8 +98,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
/**
* @brief Handle accumulating text while inside reasoning tags.
*/
void handle_inside_reasoning(JsonContainer& message, std::string& reason_str,
std::string_view txt_chunk, std::string& delta_text) {
void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, std::string_view txt_chunk) {
// Find if the end of txt_chunk might be the start of a close tag
const size_t num_chars_to_keep = find_close_tag_prefix_length(txt_chunk);

Expand All @@ -126,9 +112,6 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
m_text_cache.clear();
}

if (!m_keep_original_content) {
delta_text.clear();
}
message["reasoning_content"] = std::move(reason_str);
}

Expand All @@ -150,6 +133,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
const std::optional<std::vector<int64_t>>& delta_tokens
) {
if (m_deactivated) {
message["content"] = delta_text;
return delta_text;
}
if (!m_expect_open_tag && m_first_run) {
Expand All @@ -160,10 +144,7 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
ensure_message_fields(message);

const std::string txt_chunk = m_text_cache + delta_text;
std::string reason_str;
if (message.contains("reasoning_content")) {
reason_str = std::move(message["reasoning_content"].get_string());
}
std::string reason_str = std::move(message["reasoning_content"].get_string());

// Cache find() results to avoid redundant searches
const auto open_idx = txt_chunk.find(m_open_tag);
Expand All @@ -175,14 +156,14 @@ class ReasoningIncrementalParser::ReasoningParserImpl {
? close_idx : std::string::npos;

if (close_idx_after_open != std::string::npos) {
handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open, delta_text);
handle_complete_reasoning(message, txt_chunk, open_idx, close_idx_after_open);
} else {
handle_open_tag(message, reason_str, txt_chunk, open_idx, delta_text);
handle_open_tag(message, reason_str, txt_chunk, open_idx);
}
} else if (m_think_tag_opened && close_idx != std::string::npos) {
handle_close_tag(message, reason_str, txt_chunk, close_idx, delta_text);
handle_close_tag(message, txt_chunk, close_idx);
} else if (m_think_tag_opened) {
handle_inside_reasoning(message, reason_str, txt_chunk, delta_text);
handle_inside_reasoning(message, reason_str, txt_chunk);
} else {
// Think tag was not opened yet and not found in the current delta_text.
// Accumulate text in the cache to detect if <think> is split between several delta_text pieces.
Expand Down
26 changes: 23 additions & 3 deletions src/cpp/src/text_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,23 @@ std::vector<std::shared_ptr<IncrementalParser>> m_parsers;
JsonContainer m_parsed_message;

TextParserStreamerImpl(std::vector<std::shared_ptr<IncrementalParser>> parsers) : m_parsers{parsers} {}

};

void concatenate_json_containers(JsonContainer& from, const JsonContainer& to, std::vector<std::string> keys_to_concatenate) {
for (const auto& key : keys_to_concatenate) {
if (to.contains(key) && from.contains(key)) {
// If both are strings, concatenate
if (to[key].is_string() && from[key].is_string()) {
to[key] = to[key].get_string() + from[key].get_string();
}
} else if (from.contains(key)) {
auto r = from[key];
to[key] = from[key];
}
}
}

TextParserStreamer::TextParserStreamer(const Tokenizer& tokenizer, std::vector<std::shared_ptr<IncrementalParser>> parsers)
: TextStreamer(tokenizer, [this](std::string s) -> CallbackTypeVariant {
return this->write(s);
Expand Down Expand Up @@ -177,13 +192,18 @@ CallbackTypeVariant TextParserStreamer::write(std::string message) {
}
}

JsonContainer msg;
// Iterate over all parsers and apply them to the message
for (auto& parser: m_pimpl->m_parsers) {
message = parser->parse(m_pimpl->m_parsed_message, message, flushed_tokens);
message = parser->parse(msg, message, flushed_tokens);
// Message can be modified inside parser, if parser for example extracted tool calling from message content
m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + message;
}
return write(m_pimpl->m_parsed_message);

// concatenate msg with m_parsed_message
concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"content", "reasoning_content"});

// return write(m_pimpl->m_parsed_message);
return write(msg);
}

JsonContainer TextParserStreamer::get_parsed_message() const {
Expand Down
14 changes: 7 additions & 7 deletions tests/python_tests/test_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from utils.hugging_face import convert_and_save_tokenizer, download_and_convert_model
from utils.ov_genai_pipelines import create_ov_pipeline
import pytest
from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser
from openvino_genai import Tokenizer, IncrementalParser, Parser, TextParserStreamer, StreamingStatus, Llama3JsonToolParser, Phi4ReasoningParser, ReasoningParser, Phi4ReasoningIncrementalParser, DeepSeekR1ReasoningIncrementalParser, GenerationConfig, ReasoningIncrementalParser
from transformers import AutoTokenizer
import re

Expand Down Expand Up @@ -51,13 +51,13 @@ def write(self, message):
return StreamingStatus.RUNNING
streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()])

msg = {}
for subword in stream_string:
streamer._write(subword)

think_content = answer.split("</think>")[0].replace("<think>", "")
content = answer


msg = streamer.get_parsed_message()
assert msg['reasoning_content'] == think_content
assert msg['content'] == content

Expand Down Expand Up @@ -161,17 +161,17 @@ def test_incremental_phi4_reason_parser_2(hf_ov_genai_models, split_answer):

class CustomStreamer(TextParserStreamer):
def write(self, message):
msg.update(message)
# will be accumulated automatically inside streamer
return StreamingStatus.RUNNING
streamer = CustomStreamer(genai_tokenizer, parsers=[Phi4ReasoningIncrementalParser()])

msg = {}
for subword in split_answer:
streamer._write(subword)

think_content = (''.join(split_answer)).split("</think>")[0].replace("<think>", "")
content = ''.join(split_answer)
content = (''.join(split_answer).split("</think>")[1])

msg = streamer.get_parsed_message()
assert msg['reasoning_content'] == think_content
assert msg['content'] == content

Expand Down Expand Up @@ -378,7 +378,7 @@ def write(self, message):
streamer = CustomStreamer(tok, parsers=[Phi4ReasoningIncrementalParser()])

prompt = "Please say \"hello\""
res = pipe.generate([prompt], max_new_tokens=600, parsers=[Phi4ReasoningParser()])
res = pipe.generate([prompt], max_new_tokens=600, parsers=[ReasoningParser(keep_original_content=False)])
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parser class 'ReasoningParser' is not imported. The removed import at line 7 shows 'Phi4ReasoningParser' was available, but 'ReasoningParser' is not in the import list. This will cause a NameError at runtime.

Copilot uses AI. Check for mistakes.

# extract manually reasoning content from the parsed result
content = res.texts[0]
Expand Down
Loading