@@ -790,6 +790,118 @@ void flb_test_forward_zstd()
790790 test_ctx_destroy (ctx );
791791}
792792
793+ static int cb_count_only (void * record , size_t size , void * data )
794+ {
795+ int n = get_output_num ();
796+ set_output_num (n + 1 );
797+ flb_free (record );
798+ return 0 ;
799+ }
800+
801+ void flb_test_threaded_forward_issue_10946 ()
802+ {
803+ struct flb_lib_out_cb cb = {0 };
804+ flb_ctx_t * ctx ;
805+ int in_ffd , out_ffd , ret ;
806+ int out_count ;
807+ flb_sockfd_t fd ;
808+ char * buf ;
809+ size_t size ;
810+ int root_type ;
811+ struct flb_processor * proc ;
812+ struct flb_processor_unit * pu ;
813+ struct cfl_variant v_key = {
814+ .type = CFL_VARIANT_STRING ,
815+ .data .as_string = "log"
816+ };
817+ struct cfl_variant v_mode = {
818+ .type = CFL_VARIANT_STRING ,
819+ .data .as_string = "partial_message"
820+ };
821+ char * json = "[\"logs\",1234567890,{\"log\":\"hello\"}]" ;
822+
823+ clear_output_num ();
824+
825+ cb .cb = cb_count_only ;
826+ cb .data = & out_count ;
827+
828+ /* Service */
829+ ctx = flb_create ();
830+ TEST_CHECK (ctx != NULL );
831+ flb_service_set (ctx ,
832+ "Flush" , "0.200000000" ,
833+ "Grace" , "1" ,
834+ "Log_Level" , "error" ,
835+ NULL );
836+
837+ in_ffd = flb_input (ctx , (char * ) "forward" , NULL );
838+ TEST_CHECK (in_ffd >= 0 );
839+ ret = flb_input_set (ctx , in_ffd ,
840+ "tag" , "logs" ,
841+ "threaded" , "true" ,
842+ NULL );
843+ TEST_CHECK (ret == 0 );
844+
845+ /* Attach a logs-processor: multiline (minimal settings).
846+ * This mirrors the YAML:
847+ * processors.logs:
848+ * - name: multiline
849+ * multiline.key_content: log
850+ * mode: partial_message
851+ */
852+ proc = flb_processor_create (ctx -> config , "ut" , NULL , 0 );
853+ TEST_CHECK (proc != NULL );
854+
855+ pu = flb_processor_unit_create (proc , FLB_PROCESSOR_LOGS , "multiline" );
856+ TEST_CHECK (pu != NULL );
857+
858+ ret = flb_processor_unit_set_property (pu , "multiline.key_content" , & v_key );
859+ TEST_CHECK (ret == 0 );
860+
861+ ret = flb_processor_unit_set_property (pu , "mode" , & v_mode );
862+ TEST_CHECK (ret == 0 );
863+
864+ ret = flb_input_set_processor (ctx , in_ffd , proc );
865+ TEST_CHECK (ret == 0 );
866+
867+ /* Output: lib -> count arrivals of tag 'logs' (after processors) */
868+ out_ffd = flb_output (ctx , (char * ) "lib" , (void * ) & cb );
869+ TEST_CHECK (out_ffd >= 0 );
870+ ret = flb_output_set (ctx , out_ffd ,
871+ "match" , "logs" ,
872+ "format" , "json" ,
873+ NULL );
874+ TEST_CHECK (ret == 0 );
875+
876+ /* Start engine */
877+ ret = flb_start (ctx );
878+ TEST_CHECK (ret == 0 );
879+
880+ /* Send a single Forward frame to 'logs' */
881+ fd = connect_tcp (NULL , -1 );
882+ TEST_CHECK (fd >= 0 );
883+
884+ /* ["logs", 1234567890, {"log":"hello"}] */
885+ ret = flb_pack_json (json , strlen (json ), & buf , & size , & root_type , NULL );
886+ TEST_CHECK (ret == 0 );
887+ TEST_CHECK (send (fd , buf , size , 0 ) == (ssize_t ) size );
888+ flb_free (buf );
889+
890+ /* Give it a moment to flush */
891+ flb_time_msleep (1500 );
892+
893+ /* With the fix, at least one record must arrive */
894+ out_count = get_output_num ();
895+ TEST_CHECK (out_count > 0 );
896+ if (!TEST_CHECK (out_count > 0 )) {
897+ TEST_MSG ("no outputs with threaded+multiline; emitter RB/collector likely missing" );
898+ }
899+
900+ /* Cleanup */
901+ flb_socket_close (fd );
902+ flb_stop (ctx );
903+ flb_destroy (ctx );
904+ }
793905
794906TEST_LIST = {
795907 {"forward" , flb_test_forward },
@@ -801,5 +913,6 @@ TEST_LIST = {
801913#endif
802914 {"forward_gzip" , flb_test_forward_gzip },
803915 {"forward_zstd" , flb_test_forward_zstd },
916+ {"issue_10946" , flb_test_threaded_forward_issue_10946 },
804917 {NULL , NULL }
805918};
0 commit comments