-
Notifications
You must be signed in to change notification settings - Fork 302
write chunks in TextParserStreamer #3025
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -20,18 +20,6 @@ class ReasoningIncrementalParser::ReasoningParserImpl { | |||||||||||
| bool m_think_tag_opened = false; | ||||||||||||
| std::string m_text_cache = ""; | ||||||||||||
| bool m_deactivated = false; | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * @brief Ensure required fields exist in the message container. | ||||||||||||
| */ | ||||||||||||
| void ensure_message_fields(JsonContainer& message) { | ||||||||||||
| if (!message.contains("reasoning_content")) { | ||||||||||||
| message["reasoning_content"] = ""; | ||||||||||||
| } | ||||||||||||
| if (!message.contains("content")) { | ||||||||||||
| message["content"] = ""; | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * @brief Find the longest suffix of text that is a prefix of the close tag. | ||||||||||||
|
|
@@ -61,8 +49,8 @@ class ReasoningIncrementalParser::ReasoningParserImpl { | |||||||||||
| void handle_complete_reasoning(JsonContainer& message, std::string_view txt_chunk, | ||||||||||||
| size_t open_idx, size_t close_idx, std::string& delta_text) { | ||||||||||||
| // Extract reasoning content between tags | ||||||||||||
| message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), | ||||||||||||
| close_idx - (open_idx + m_open_tag.size()))); | ||||||||||||
| message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size(), close_idx - (open_idx + m_open_tag.size()))); | ||||||||||||
| message["content"] = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); | ||||||||||||
|
|
||||||||||||
| if (!m_keep_original_content) { | ||||||||||||
| delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); | ||||||||||||
|
|
@@ -79,28 +67,35 @@ class ReasoningIncrementalParser::ReasoningParserImpl { | |||||||||||
| void handle_open_tag(JsonContainer& message, std::string& reason_str, | ||||||||||||
| std::string_view txt_chunk, size_t open_idx, std::string& delta_text) { | ||||||||||||
| // Start accumulating reasoning content | ||||||||||||
| reason_str.append(txt_chunk.substr(open_idx + m_open_tag.size())); | ||||||||||||
| message["reasoning_content"] = std::move(reason_str); | ||||||||||||
| message["reasoning_content"] = std::string(txt_chunk.substr(open_idx + m_open_tag.size())); | ||||||||||||
|
|
||||||||||||
| if (!m_keep_original_content) { | ||||||||||||
| delta_text.clear(); | ||||||||||||
| } else { | ||||||||||||
| delta_text = txt_chunk; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| m_think_tag_opened = true; | ||||||||||||
| m_text_cache.clear(); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /** | ||||||||||||
| * @brief Handle the case where the close tag is found. | ||||||||||||
| */ | ||||||||||||
| void handle_close_tag(JsonContainer& message, std::string& reason_str, | ||||||||||||
| std::string_view txt_chunk, size_t close_idx, std::string& delta_text) { | ||||||||||||
| void handle_close_tag(JsonContainer& message, std::string_view txt_chunk, size_t close_idx, std::string& delta_text) { | ||||||||||||
| // Append text before close tag to reasoning content | ||||||||||||
| reason_str.append(txt_chunk.substr(0, close_idx)); | ||||||||||||
| message["reasoning_content"] = std::move(reason_str); | ||||||||||||
| message["reasoning_content"] = std::move(std::string(txt_chunk.substr(0, close_idx))); | ||||||||||||
| auto content = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); | ||||||||||||
| message["content"] = content; | ||||||||||||
|
|
||||||||||||
| if (!m_keep_original_content) { | ||||||||||||
| delta_text = std::string(txt_chunk.substr(close_idx + m_close_tag.size())); | ||||||||||||
| // Despite the fact that we put txt_chunk to delta_text it's correct. | ||||||||||||
| // Since txt_chunk contains some cached parts from the previous calls that were not yet processed | ||||||||||||
| // and we kept them in the cache until we decided what to do with them. Here we definitely know that the cached parts | ||||||||||||
| // belonged to reasoning_content so we can discard them. | ||||||||||||
| delta_text = content; | ||||||||||||
| } else { | ||||||||||||
| delta_text = txt_chunk; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| m_text_cache.clear(); | ||||||||||||
|
|
@@ -111,25 +106,24 @@ class ReasoningIncrementalParser::ReasoningParserImpl { | |||||||||||
| /** | ||||||||||||
| * @brief Handle accumulating text while inside reasoning tags. | ||||||||||||
| */ | ||||||||||||
| void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, | ||||||||||||
| std::string_view txt_chunk, std::string& delta_text) { | ||||||||||||
| void handle_inside_reasoning(JsonContainer& message, std::string& reason_str, std::string_view txt_chunk, std::string& delta_text) { | ||||||||||||
| // Find if the end of txt_chunk might be the start of a close tag | ||||||||||||
| const size_t num_chars_to_keep = find_close_tag_prefix_length(txt_chunk); | ||||||||||||
|
|
||||||||||||
| if (num_chars_to_keep > 0) { | ||||||||||||
| // Keep potential partial close tag in cache | ||||||||||||
| m_text_cache = std::string(txt_chunk.substr(txt_chunk.size() - num_chars_to_keep)); | ||||||||||||
| reason_str.append(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep)); | ||||||||||||
| reason_str = txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep); | ||||||||||||
|
||||||||||||
| delta_text = std::string(txt_chunk.substr(0, txt_chunk.size() - num_chars_to_keep)); | ||||||||||||
| } else { | ||||||||||||
| // No partial close tag, accumulate all text | ||||||||||||
| reason_str.append(txt_chunk); | ||||||||||||
| reason_str = txt_chunk; | ||||||||||||
|
||||||||||||
| m_text_cache.clear(); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| message["reasoning_content"] = std::move(reason_str); | ||||||||||||
| if (!m_keep_original_content) { | ||||||||||||
| delta_text.clear(); | ||||||||||||
| } | ||||||||||||
| message["reasoning_content"] = std::move(reason_str); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| public: | ||||||||||||
|
|
@@ -157,13 +151,9 @@ class ReasoningIncrementalParser::ReasoningParserImpl { | |||||||||||
| } | ||||||||||||
| m_first_run = false; | ||||||||||||
|
|
||||||||||||
| ensure_message_fields(message); | ||||||||||||
|
|
||||||||||||
| const std::string txt_chunk = m_text_cache + delta_text; | ||||||||||||
| std::string reason_str; | ||||||||||||
| if (message.contains("reasoning_content")) { | ||||||||||||
| reason_str = std::move(message["reasoning_content"].get_string()); | ||||||||||||
| } | ||||||||||||
| std::string txt_chunk = m_text_cache + delta_text; | ||||||||||||
| std::string reason_str = message.contains("reasoning_content") ? std::move(message["reasoning_content"].get_string()) : ""; | ||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
|
|
||||||||||||
| // Cache find() results to avoid redundant searches | ||||||||||||
| const auto open_idx = txt_chunk.find(m_open_tag); | ||||||||||||
|
|
@@ -180,13 +170,14 @@ class ReasoningIncrementalParser::ReasoningParserImpl { | |||||||||||
| handle_open_tag(message, reason_str, txt_chunk, open_idx, delta_text); | ||||||||||||
| } | ||||||||||||
| } else if (m_think_tag_opened && close_idx != std::string::npos) { | ||||||||||||
| handle_close_tag(message, reason_str, txt_chunk, close_idx, delta_text); | ||||||||||||
| handle_close_tag(message, txt_chunk, close_idx, delta_text); | ||||||||||||
| } else if (m_think_tag_opened) { | ||||||||||||
| handle_inside_reasoning(message, reason_str, txt_chunk, delta_text); | ||||||||||||
| } else { | ||||||||||||
| // Think tag was not opened yet and not found in the current delta_text. | ||||||||||||
| // Accumulate text in the cache to detect if <think> is split between several delta_text pieces. | ||||||||||||
| m_text_cache += delta_text; | ||||||||||||
| delta_text.clear(); | ||||||||||||
| } | ||||||||||||
|
Comment on lines
+180
to
181
|
||||||||||||
| delta_text.clear(); | |
| } | |
| // Intentionally clear delta_text: no delta content is returned to the user during this phase | |
| // (we are waiting for the <think> tag to be fully detected in the cache). | |
| delta_text.clear(); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -141,18 +141,34 @@ std::vector<std::shared_ptr<IncrementalParser>> m_parsers; | |
| JsonContainer m_parsed_message; | ||
|
|
||
| TextParserStreamerImpl(std::vector<std::shared_ptr<IncrementalParser>> parsers) : m_parsers{parsers} {} | ||
|
|
||
| }; | ||
|
|
||
| void concatenate_json_containers(const JsonContainer& from, JsonContainer& to, std::vector<std::string> keys_to_concatenate) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is better to have it as the json container method. |
||
| for (const auto& key : keys_to_concatenate) { | ||
| if (to.contains(key) && from.contains(key)) { | ||
| // If both are strings, concatenate | ||
| if (to[key].is_string() && from[key].is_string()) { | ||
| to[key] = to[key].get_string() + from[key].get_string(); | ||
| } | ||
| } else if (from.contains(key)) { | ||
| to[key] = from[key]; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| TextParserStreamer::TextParserStreamer(const Tokenizer& tokenizer, std::vector<std::shared_ptr<IncrementalParser>> parsers) | ||
| : TextStreamer(tokenizer, [this](std::string s) -> CallbackTypeVariant { | ||
| return this->write(s); | ||
| }), m_pimpl{std::make_unique<TextParserStreamerImpl>(parsers)} {} | ||
| }), m_pimpl{std::make_unique<TextParserStreamerImpl>(parsers)} { | ||
| m_pimpl->m_parsed_message["content"] = ""; | ||
| } | ||
|
|
||
| CallbackTypeVariant TextParserStreamer::write(std::string message) { | ||
| CallbackTypeVariant TextParserStreamer::write(std::string delta_text) { | ||
| // When 'write' is called with string, it means new chunk of tokens is decoded into text | ||
|
|
||
| auto flushed_tokens = std::vector<int64_t>(); | ||
| if (message.back() == '\n') { | ||
| if (delta_text.back() == '\n') { | ||
| // Flush all tokens | ||
| flushed_tokens.assign(m_tokens_cache.begin(), m_tokens_cache.end()); | ||
| } else if (m_decoded_lengths.size() >= delay_n_tokens) { | ||
|
|
@@ -177,13 +193,33 @@ CallbackTypeVariant TextParserStreamer::write(std::string message) { | |
| } | ||
| } | ||
|
|
||
| JsonContainer msg; | ||
| // Iterate over all parsers and apply them to the message | ||
| for (auto& parser: m_pimpl->m_parsers) { | ||
| message = parser->parse(m_pimpl->m_parsed_message, message, flushed_tokens); | ||
| delta_text = parser->parse(msg, delta_text, flushed_tokens); | ||
| // Message can be modified inside parser, if parser for example extracted tool calling from message content | ||
| m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + message; | ||
| } | ||
| return write(m_pimpl->m_parsed_message); | ||
| msg["content"] = delta_text; | ||
|
|
||
| // concatenate msg with m_parsed_message | ||
| concatenate_json_containers(msg, m_pimpl->m_parsed_message, {"reasoning_content", "content", "tool_calls"}); | ||
|
|
||
| // We have to put into DeltaMessage's "content" fields only chunks that belong to content. | ||
| // But into m_parsed_message["content"] we need to accumulate full content if m_keep_original_content == True | ||
| // and if m_keep_original_content == False only part that is outside reasoning tags and outside tool calls. | ||
|
|
||
| // fill in msg["content"] with delta_text | ||
| // The line below should be removed and it's function should be performed inside concatenate_json_containers | ||
| // m_pimpl->m_parsed_message["content"] = m_pimpl->m_parsed_message["content"].get_string() + delta_text; | ||
|
|
||
|
|
||
| // TODO: on each loop iteration the parsers receive an empty delta_message | ||
| // and the pipeline iterates through every parser. Parsers should neither delete fields nor rewrite them; | ||
| // they should only append or add new fields. | ||
| // The only field that is updated automatically is "content": | ||
| // the remaining delta_text is put there. | ||
| // It is the parsers' responsibility to ensure fields are proper. | ||
| return write(msg); | ||
| } | ||
|
|
||
| JsonContainer TextParserStreamer::get_parsed_message() const { | ||
|
|
@@ -192,6 +228,7 @@ JsonContainer TextParserStreamer::get_parsed_message() const { | |
|
|
||
| void TextParserStreamer::reset() { | ||
| m_pimpl->m_parsed_message = JsonContainer(); | ||
| m_pimpl->m_parsed_message["content"] = ""; | ||
| for (auto& parser : m_pimpl->m_parsers) { | ||
| parser->reset(); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected spelling of 'txt_chung' to 'txt_chunk'.