@@ -957,45 +957,47 @@ def send_training_request_to_edges(self, active_edge_info_dict=None):
         run_config = self.request_json.get("run_config", {})
         run_params = run_config.get("parameters", {})
         job_yaml = run_params.get("job_yaml", {})
+        job_yaml_default_none = run_params.get("job_yaml", None)
         computing = job_yaml.get("computing", {})
         request_num_gpus = computing.get("minimum_num_gpus", None)

         logging.info("Send training request to Edge ids: " + str(edge_id_list))

-        SchedulerMatcher.parse_and_print_gpu_info_for_all_edges(active_edge_info_dict, show_gpu_list=True)
-
-        # Match and assign gpus to each device
-        assigned_gpu_num_dict, assigned_gpu_ids_dict = SchedulerMatcher.match_and_assign_gpu_resources_to_devices(
-            request_num_gpus, edge_id_list, active_edge_info_dict)
-        if assigned_gpu_num_dict is None or assigned_gpu_ids_dict is None:
-            # If no resources available, send failed message to MLOps and send exception message to all edges.
-            gpu_count, gpu_available_count = SchedulerMatcher.parse_and_print_gpu_info_for_all_edges(
-                active_edge_info_dict, should_print=True)
-            logging.error(f"No resources available."
-                          f"Total available GPU count {gpu_available_count} is less than "
-                          f"request GPU count {request_num_gpus}")
-            self.mlops_metrics.report_server_id_status(
-                run_id, ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED, edge_id=self.edge_id,
-                server_id=self.edge_id, server_agent_id=self.server_agent_id)
-            self.send_exit_train_with_exception_request_to_edges(edge_id_list, json.dumps(self.request_json))
-            return
+        if job_yaml_default_none is not None and request_num_gpus is not None:
+            SchedulerMatcher.parse_and_print_gpu_info_for_all_edges(active_edge_info_dict, show_gpu_list=True)
+
+            # Match and assign gpus to each device
+            assigned_gpu_num_dict, assigned_gpu_ids_dict = SchedulerMatcher.match_and_assign_gpu_resources_to_devices(
+                request_num_gpus, edge_id_list, active_edge_info_dict)
+            if assigned_gpu_num_dict is None or assigned_gpu_ids_dict is None:
+                # If no resources available, send failed message to MLOps and send exception message to all edges.
+                gpu_count, gpu_available_count = SchedulerMatcher.parse_and_print_gpu_info_for_all_edges(
+                    active_edge_info_dict, should_print=True)
+                logging.error(f"No resources available."
+                              f"Total available GPU count {gpu_available_count} is less than "
+                              f"request GPU count {request_num_gpus}")
+                self.mlops_metrics.report_server_id_status(
+                    run_id, ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED, edge_id=self.edge_id,
+                    server_id=self.edge_id, server_agent_id=self.server_agent_id)
+                self.send_exit_train_with_exception_request_to_edges(edge_id_list, json.dumps(self.request_json))
+                return

-        # Generate master node addr and port
-        master_node_addr, master_node_port = SchedulerMatcher.get_master_node_info(edge_id_list, active_edge_info_dict)
-
-        # Generate new edge id list after matched
-        edge_id_list = SchedulerMatcher.generate_new_edge_list_for_gpu_matching(assigned_gpu_num_dict)
-        if len(edge_id_list) <= 0:
-            gpu_count, gpu_available_count = SchedulerMatcher.parse_and_print_gpu_info_for_all_edges(
-                active_edge_info_dict, should_print=True)
-            logging.error(f"Request parameter for GPU num is invalid."
-                          f"Total available GPU count {gpu_available_count}."
-                          f"Request GPU num {request_num_gpus}")
-            self.mlops_metrics.report_server_id_status(
-                run_id, ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED, edge_id=self.edge_id,
-                server_id=self.edge_id, server_agent_id=self.server_agent_id)
-            self.send_exit_train_with_exception_request_to_edges(edge_id_list, json.dumps(self.request_json))
-            return
+            # Generate master node addr and port
+            master_node_addr, master_node_port = SchedulerMatcher.get_master_node_info(edge_id_list, active_edge_info_dict)
+
+            # Generate new edge id list after matched
+            edge_id_list = SchedulerMatcher.generate_new_edge_list_for_gpu_matching(assigned_gpu_num_dict)
+            if len(edge_id_list) <= 0:
+                gpu_count, gpu_available_count = SchedulerMatcher.parse_and_print_gpu_info_for_all_edges(
+                    active_edge_info_dict, should_print=True)
+                logging.error(f"Request parameter for GPU num is invalid."
+                              f"Total available GPU count {gpu_available_count}."
+                              f"Request GPU num {request_num_gpus}")
+                self.mlops_metrics.report_server_id_status(
+                    run_id, ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED, edge_id=self.edge_id,
+                    server_id=self.edge_id, server_agent_id=self.server_agent_id)
+                self.send_exit_train_with_exception_request_to_edges(edge_id_list, json.dumps(self.request_json))
+                return

         client_rank = 1
         for edge_id in edge_id_list:
@@ -1005,9 +1007,10 @@ def send_training_request_to_edges(self, active_edge_info_dict=None):
             request_json["client_rank"] = client_rank
             client_rank += 1

-            request_json["scheduler_match_info"] = SchedulerMatcher.generate_match_info_for_scheduler(
-                edge_id, edge_id_list, master_node_addr, master_node_port, assigned_gpu_num_dict, assigned_gpu_ids_dict
-            )
+            if job_yaml_default_none is not None and request_num_gpus is not None:
+                request_json["scheduler_match_info"] = SchedulerMatcher.generate_match_info_for_scheduler(
+                    edge_id, edge_id_list, master_node_addr, master_node_port, assigned_gpu_num_dict, assigned_gpu_ids_dict
+                )

             self.client_mqtt_mgr.send_message(topic_start_train, json.dumps(request_json))

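The net effect of both hunks is easier to see in isolation: GPU scheduler matching now only runs when the incoming request actually declares a job_yaml section and a minimum_num_gpus value, and scheduler_match_info is only attached to the per-edge request in that same case. Below is a minimal, self-contained sketch of that gate; the helper name should_run_gpu_matching and the dict shapes are illustrative assumptions, not part of the FedML code. It relies on dict.get with a None default to tell a missing job_yaml apart from an empty one.

# Illustrative sketch of the gating condition introduced by this commit
# (helper name and request shapes are assumptions, not the real schema).

def should_run_gpu_matching(request_json: dict) -> bool:
    run_params = request_json.get("run_config", {}).get("parameters", {})
    # .get("job_yaml", None) distinguishes "job_yaml missing" from
    # "job_yaml present but empty", which .get("job_yaml", {}) alone cannot do.
    job_yaml_default_none = run_params.get("job_yaml", None)
    request_num_gpus = (
        run_params.get("job_yaml", {}).get("computing", {}).get("minimum_num_gpus", None)
    )
    return job_yaml_default_none is not None and request_num_gpus is not None


# A request without job_yaml (e.g. a plain federated run) skips SchedulerMatcher,
# while one that asks for GPUs goes through the matching path.
print(should_run_gpu_matching({"run_config": {"parameters": {}}}))  # False
print(should_run_gpu_matching(
    {"run_config": {"parameters": {"job_yaml": {"computing": {"minimum_num_gpus": 2}}}}}))  # True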