diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py
index fd3f075..29ca536 100644
--- a/nds-h/nds_h_power.py
+++ b/nds-h/nds_h_power.py
@@ -38,6 +38,7 @@
 import os
 import sys
 import re
+import subprocess
 
 parent_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '..'))
 
@@ -158,19 +159,41 @@ def deduplicate(column_names):
     return df.toDF(*dedup_col_names)
 
 
+def parse_explain_str(explain_str):
+    plan_strs = explain_str.split('\n\n')
+    plan_dict = {}
+    for plan_str in plan_strs:
+        if plan_str.startswith('== Optimized Logical Plan =='):
+            plan_dict['logical'] = plan_str
+        elif plan_str.startswith('== Physical Plan =='):
+            plan_dict['physical'] = plan_str
+    return plan_dict
+
+
 def run_one_query(spark_session,
                   query,
                   query_name,
                   output_path,
-                  output_format):
+                  output_format,
+                  save_plan_path,
+                  plan_types,
+                  skip_execution):
     df = spark_session.sql(query)
     # query15_part1 and query15_part3 are create/drop view queries
     empty_output_queries = ['query15_part1', 'query15_part3']
-    if query_name in empty_output_queries or not output_path:
-        df.collect()
-    else:
-        ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
-            output_path + '/' + query_name)
+    if not skip_execution:
+        if query_name in empty_output_queries or not output_path:
+            df.collect()
+        else:
+            ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
+                output_path + '/' + query_name)
+    if save_plan_path:
+        os.makedirs(save_plan_path, exist_ok=True)
+        explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
+        plans = parse_explain_str(explain_str)
+        for plan_type in plan_types:
+            with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
+                f.write(plans[plan_type])
 
 
 def get_query_subset(query_dict, subset):
@@ -188,11 +211,14 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      warmup_iterations,
                      iterations,
+                     plan_types,
                      input_format,
                      output_path=None,
                      keep_sc=False,
                      output_format="parquet",
-                     json_summary_folder=None):
+                     json_summary_folder=None,
+                     save_plan_path=None,
+                     skip_execution=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accessibility. TempView Creation time is also recorded.
 
@@ -246,7 +272,10 @@ def run_query_stream(input_prefix,
                                     q_content,
                                     query_name,
                                     output_path,
-                                    output_format)
+                                    output_format,
+                                    save_plan_path,
+                                    plan_types,
+                                    skip_execution)
             print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
             query_times = summary['queryTimes']
             execution_time_list.append((spark_app_id, query_name, query_times[0]))
@@ -359,6 +388,19 @@ def load_properties(filename):
                         type=int,
                         help='Number of iterations for each query.',
                         default=1)
+    parser.add_argument('--save_plan_path',
+                        help='Save the execution plan of each query to the specified path. If --skip_execution is ' +
+                        'specified, the execution plan will be saved without executing the query.')
+    parser.add_argument('--plan_types',
+                        type=lambda s: [x.strip() for x in s.split(',')],
+                        help='Comma separated list of plan types to save. ' +
+                        'e.g. "physical, logical". Default is "logical".',
+                        default='logical')
+    parser.add_argument('--skip_execution',
+                        action='store_true',
+                        help='Skip the execution of the queries. This can be used in conjunction with ' +
+                        '--save_plan_path to only save the execution plans without running the queries. ' +
+                        'Note that "spark.sql.adaptive.enabled" should be set to false to get GPU physical plans.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -368,8 +410,11 @@ def load_properties(filename):
                      args.sub_queries,
                      args.warmup_iterations,
                      args.iterations,
+                     args.plan_types,
                      args.input_format,
                      args.output_prefix,
                      args.keep_sc,
                      args.output_format,
-                     args.json_summary_folder)
+                     args.json_summary_folder,
+                     args.save_plan_path,
+                     args.skip_execution)
diff --git a/nds/nds_power.py b/nds/nds_power.py
index eef10b0..7106870 100644
--- a/nds/nds_power.py
+++ b/nds/nds_power.py
@@ -163,20 +163,43 @@ def register_delta_tables(spark_session, input_prefix, execution_time_list):
     return execution_time_list
 
 
+def parse_explain_str(explain_str):
+    plan_strs = explain_str.split('\n\n')
+    plan_dict = {}
+    for plan_str in plan_strs:
+        if plan_str.startswith('== Optimized Logical Plan =='):
+            plan_dict['logical'] = plan_str
+        elif plan_str.startswith('== Physical Plan =='):
+            plan_dict['physical'] = plan_str
+    return plan_dict
+
+
 def run_one_query(spark_session,
                   profiler,
                   query,
                   query_name,
                   output_path,
-                  output_format):
+                  output_format,
+                  save_plan_path,
+                  plan_types,
+                  skip_execution):
     with profiler(query_name=query_name):
         print(f"Running query {query_name}")
         df = spark_session.sql(query)
-        if not output_path:
-            df.collect()
-        else:
-            ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
-                output_path + '/' + query_name)
+        if not skip_execution:
+            if not output_path:
+                df.collect()
+            else:
+                ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
+                    output_path + '/' + query_name)
+        if save_plan_path:
+            os.makedirs(save_plan_path, exist_ok=True)
+            explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
+            plans = parse_explain_str(explain_str)
+            for plan_type in plan_types:
+                with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
+                    f.write(plans[plan_type])
+
 
 def ensure_valid_column_names(df: DataFrame):
     def is_column_start(char):
@@ -233,6 +256,7 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      warmup_iterations,
                      iterations,
+                     plan_types,
                      input_format="parquet",
                      use_decimal=True,
                      output_path=None,
@@ -242,7 +266,9 @@ def run_query_stream(input_prefix,
                      keep_sc=False,
                      hive_external=False,
                      allow_failure=False,
-                     profiling_hook=None):
+                     profiling_hook=None,
+                     save_plan_path=None,
+                     skip_execution=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accesibility. TempView Creation time is also recorded.
 
@@ -315,7 +341,10 @@ def run_query_stream(input_prefix,
                                     q_content,
                                     query_name,
                                     output_path,
-                                    output_format)
+                                    output_format,
+                                    save_plan_path,
+                                    plan_types,
+                                    skip_execution)
             print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
             query_times = summary['queryTimes']
             for query_time in query_times:
@@ -458,6 +487,19 @@ def load_properties(filename):
                         type=int,
                         help='Number of iterations for each query.',
                         default=1)
+    parser.add_argument('--save_plan_path',
+                        help='Save the execution plan of each query to the specified path. If --skip_execution is ' +
+                        'specified, the execution plan will be saved without executing the query.')
+    parser.add_argument('--plan_types',
+                        type=lambda s: [x.strip() for x in s.split(',')],
+                        help='Comma separated list of plan types to save. ' +
+                        'e.g. "physical, logical". Default is "logical".',
+                        default='logical')
+    parser.add_argument('--skip_execution',
+                        action='store_true',
+                        help='Skip the execution of the queries. This can be used in conjunction with ' +
+                        '--save_plan_path to only save the execution plans without running the queries. ' +
+                        'Note that "spark.sql.adaptive.enabled" should be set to false to get GPU physical plans.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -468,6 +510,7 @@ def load_properties(filename):
                      args.sub_queries,
                      args.warmup_iterations,
                      args.iterations,
+                     args.plan_types,
                      args.input_format,
                      not args.floats,
                      args.output_prefix,
@@ -477,4 +520,6 @@ def load_properties(filename):
                      args.keep_sc,
                      args.hive,
                      args.allow_failure,
-                     args.profiling_hook)
+                     args.profiling_hook,
+                     args.save_plan_path,
+                     args.skip_execution)