File tree Expand file tree Collapse file tree 4 files changed +37
-9
lines changed Expand file tree Collapse file tree 4 files changed +37
-9
lines changed Original file line number Diff line number Diff line change @@ -46,6 +46,11 @@ void FlowUnitGroup::InitTrace() {
4646 }
4747}
4848
49+ uint32_t FlowUnitGroup::GetBatchSize () const
50+ {
51+ return batch_size_;
52+ }
53+
4954std::shared_ptr<TraceSlice> FlowUnitGroup::StartTrace (
5055 FUExecContextList &exec_ctx_list) {
5156 std::call_once (trace_init_flag_, &FlowUnitGroup::InitTrace, this );
Original file line number Diff line number Diff line change @@ -414,6 +414,9 @@ std::shared_ptr<FlowUnit> FlowUnitManager::CreateSingleFlowUnit(
414414 return nullptr ;
415415 }
416416
417+ MBLOG_INFO << " max_executor_thread_num: " << max_executor_thread_num;
418+ device->GetDeviceExecutor ()->SetThreadCount (max_executor_thread_num);
419+
417420 flowunit->SetBindDevice (device);
418421 std::vector<FlowUnitInput> &in_list = flowunit_desc->GetFlowUnitInput ();
419422 for (auto &in_item : in_list) {
Original file line number Diff line number Diff line change @@ -762,30 +762,48 @@ void Node::CleanDataContext() {
762762}
763763
764764Status Node::Run (RunType type) {
765+
765766 std::list<std::shared_ptr<FlowUnitDataContext>> data_ctx_list;
767+ size_t process_count = 0 ;
766768 auto ret = Recv (type, data_ctx_list);
767- if (!ret) {
768- return ret;
769- }
770769
771- ret = Process (data_ctx_list);
772770 if (!ret) {
773771 return ret;
774772 }
775773
776- if (!GetOutputNames ().empty ()) {
777- ret = Send (data_ctx_list);
774+ std::list<std::shared_ptr<FlowUnitDataContext>> process_ctx_list;
775+
776+ for (auto & ctx: data_ctx_list){
777+
778+ process_count++;
779+ process_ctx_list.push_back (ctx);
780+
781+ if (process_ctx_list.size () < flowunit_group_->GetBatchSize ()){
782+ if (process_count < data_ctx_list.size ()){
783+ continue ;
784+ }
785+ }
786+
787+ ret = Process (process_ctx_list);
778788 if (!ret) {
779789 return ret;
780790 }
781- } else {
782- SetLastError (data_ctx_list);
791+
792+ if (!GetOutputNames ().empty ()) {
793+ ret = Send (process_ctx_list);
794+ if (!ret) {
795+ return ret;
796+ }
797+ } else {
798+ SetLastError (process_ctx_list);
799+ }
800+
801+ process_ctx_list.clear ();
783802 }
784803
785804 Clean (data_ctx_list);
786805 return STATUS_SUCCESS;
787806}
788-
789807void Node::SetLastError (
790808 std::list<std::shared_ptr<FlowUnitDataContext>>& data_ctx_list) {
791809 for (auto & data_ctx : data_ctx_list) {
Original file line number Diff line number Diff line change @@ -64,6 +64,8 @@ class FlowUnitGroup {
6464
6565 Status Close ();
6666
67+ uint32_t GetBatchSize () const ;
68+
6769 private:
6870 std::weak_ptr<Node> node_;
6971 uint32_t batch_size_;
You can’t perform that action at this time.
0 commit comments