From 36cfba1529bc8a82f09ae278a7e72c65e2d6c687 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 17 Apr 2025 17:08:13 -0700
Subject: [PATCH 1/4] Add support for saving plans

---
 nds/nds_power.py | 62 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 53 insertions(+), 9 deletions(-)

diff --git a/nds/nds_power.py b/nds/nds_power.py
index eef10b0..7038483 100644
--- a/nds/nds_power.py
+++ b/nds/nds_power.py
@@ -163,20 +163,43 @@ def register_delta_tables(spark_session, input_prefix, execution_time_list):
     return execution_time_list
 
 
+def parse_explain_str(explain_str):
+    plan_strs = explain_str.split('\n\n')
+    plan_dict = {}
+    for plan_str in plan_strs:
+        if plan_str.startswith('== Optimized Logical Plan =='):
+            plan_dict['logical'] = plan_str
+        elif plan_str.startswith('== Physical Plan =='):
+            plan_dict['physical'] = plan_str
+    return plan_dict
+
+
 def run_one_query(spark_session,
                   profiler,
                   query,
                   query_name,
                   output_path,
-                  output_format):
+                  output_format,
+                  save_plan_path,
+                  plan_types,
+                  skip_execution):
     with profiler(query_name=query_name):
         print(f"Running query {query_name}")
         df = spark_session.sql(query)
-        if not output_path:
-            df.collect()
-        else:
-            ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
-                output_path + '/' + query_name)
+        if not skip_execution:
+            if not output_path:
+                df.collect()
+            else:
+                ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
+                    output_path + '/' + query_name)
+        if save_plan_path:
+            subprocess.run(f"mkdir -p {save_plan_path}", shell=True)
+            explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
+            plans = parse_explain_str(explain_str)
+            for plan_type in plan_types:
+                with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
+                    f.write(plans[plan_type])
+
 
 def ensure_valid_column_names(df: DataFrame):
     def is_column_start(char):
@@ -233,6 +256,7 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      warmup_iterations,
                      iterations,
+                     plan_types,
                      input_format="parquet",
                      use_decimal=True,
                      output_path=None,
@@ -242,7 +266,9 @@ def run_query_stream(input_prefix,
                      keep_sc=False,
                      hive_external=False,
                      allow_failure=False,
-                     profiling_hook=None):
+                     profiling_hook=None,
+                     save_plan_path=None,
+                     skip_execution=False):
     """run SQL in Spark and record execution time log. The execution time log is saved
     as a CSV file for easy accesibility. TempView Creation time is also recorded.
 
@@ -315,7 +341,10 @@ def run_query_stream(input_prefix,
                                     q_content,
                                     query_name,
                                     output_path,
-                                    output_format)
+                                    output_format,
+                                    save_plan_path,
+                                    plan_types,
+                                    skip_execution)
             print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
             query_times = summary['queryTimes']
             for query_time in query_times:
@@ -458,6 +487,18 @@ def load_properties(filename):
                         type=int,
                         help='Number of iterations for each query.',
                         default=1)
+    parser.add_argument('--save_plan_path',
+                        help='Save the execution plan of each query to the specified file. If --skip_execution is ' +
+                             'specified, the execution plan will be saved without executing the query.')
+    parser.add_argument('--plan_types',
+                        type=lambda s: [x.strip() for x in s.split(',')],
+                        help='Comma separated list of plan types to save. ' +
+                             'e.g. "physical, logical". Default is "logical".',
+                        default='logical')
+    parser.add_argument('--skip_execution',
+                        action='store_true',
+                        help='Skip the execution of the queries. This can be used in conjunction with ' +
+                             '--save_plan_path to only save the execution plans without running the queries.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -468,6 +509,7 @@ def load_properties(filename):
                      args.sub_queries,
                      args.warmup_iterations,
                      args.iterations,
+                     args.plan_types,
                      args.input_format,
                      not args.floats,
                      args.output_prefix,
@@ -477,4 +519,6 @@ def load_properties(filename):
                      args.keep_sc,
                      args.hive,
                      args.allow_failure,
-                     args.profiling_hook)
+                     args.profiling_hook,
+                     args.save_plan_path,
+                     args.skip_execution)

From 7b5953f085b388d1c88828af0de6e511db5f7088 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Thu, 24 Apr 2025 13:46:57 -0700
Subject: [PATCH 2/4] support nds-h

Signed-off-by: Jihoon Son
---
 nds-h/nds_h_power.py | 62 +++++++++++++++++++++++++++++++++++++-------
 1 file changed, 53 insertions(+), 9 deletions(-)

diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py
index fd3f075..4c6cb34 100644
--- a/nds-h/nds_h_power.py
+++ b/nds-h/nds_h_power.py
@@ -38,6 +38,7 @@
 import os
 import sys
 import re
+import subprocess
 
 parent_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '..'))
 
@@ -158,19 +159,41 @@ def deduplicate(column_names):
     return df.toDF(*dedup_col_names)
 
 
+def parse_explain_str(explain_str):
+    plan_strs = explain_str.split('\n\n')
+    plan_dict = {}
+    for plan_str in plan_strs:
+        if plan_str.startswith('== Optimized Logical Plan =='):
+            plan_dict['logical'] = plan_str
+        elif plan_str.startswith('== Physical Plan =='):
+            plan_dict['physical'] = plan_str
+    return plan_dict
+
+
 def run_one_query(spark_session,
                   query,
                   query_name,
                   output_path,
-                  output_format):
+                  output_format,
+                  save_plan_path,
+                  plan_types,
+                  skip_execution):
     df = spark_session.sql(query)
     # query15_part1 and query15_part3 are create/drop view queries
     empty_output_queries = ['query15_part1', 'query15_part3']
-    if query_name in empty_output_queries or not output_path:
-        df.collect()
-    else:
-        ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
-            output_path + '/' + query_name)
+    if not skip_execution:
+        if query_name in empty_output_queries or not output_path:
+            df.collect()
+        else:
+            ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
+                output_path + '/' + query_name)
+    if save_plan_path:
+        subprocess.run(f"mkdir -p {save_plan_path}", shell=True)
+        explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
+        plans = parse_explain_str(explain_str)
+        for plan_type in plan_types:
+            with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
+                f.write(plans[plan_type])
 
 
 def get_query_subset(query_dict, subset):
@@ -188,11 +211,14 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      warmup_iterations,
                      iterations,
+                     plan_types,
                      input_format,
                      output_path=None,
                      keep_sc=False,
                      output_format="parquet",
-                     json_summary_folder=None):
+                     json_summary_folder=None,
+                     save_plan_path=None,
+                     skip_execution=False):
     """run SQL in Spark and record execution time log. The execution time log is saved
     as a CSV file for easy accessibility. TempView Creation time is also recorded.
 
@@ -246,7 +272,10 @@ def run_query_stream(input_prefix,
                                     q_content,
                                     query_name,
                                     output_path,
-                                    output_format)
+                                    output_format,
+                                    save_plan_path,
+                                    plan_types,
+                                    skip_execution)
             print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
             query_times = summary['queryTimes']
             execution_time_list.append((spark_app_id, query_name, query_times[0]))
@@ -359,6 +388,18 @@ def load_properties(filename):
                         type=int,
                         help='Number of iterations for each query.',
                         default=1)
+    parser.add_argument('--save_plan_path',
+                        help='Save the execution plan of each query to the specified file. If --skip_execution is ' +
+                             'specified, the execution plan will be saved without executing the query.')
+    parser.add_argument('--plan_types',
+                        type=lambda s: [x.strip() for x in s.split(',')],
+                        help='Comma separated list of plan types to save. ' +
+                             'e.g. "physical, logical". Default is "logical".',
+                        default='logical')
+    parser.add_argument('--skip_execution',
+                        action='store_true',
+                        help='Skip the execution of the queries. This can be used in conjunction with ' +
+                             '--save_plan_path to only save the execution plans without running the queries.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -368,8 +409,11 @@ def load_properties(filename):
                      args.sub_queries,
                      args.warmup_iterations,
                      args.iterations,
+                     args.plan_types,
                      args.input_format,
                      args.output_prefix,
                      args.keep_sc,
                      args.output_format,
-                     args.json_summary_folder)
+                     args.json_summary_folder,
+                     args.save_plan_path,
+                     args.skip_execution)

From f9d9d9f4fab1cfa66f0873e56946410f33277386 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Mon, 28 Apr 2025 17:17:52 -0700
Subject: [PATCH 3/4] update help message for skip_execution

---
 nds-h/nds_h_power.py | 3 ++-
 nds/nds_power.py     | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py
index 4c6cb34..2df1568 100644
--- a/nds-h/nds_h_power.py
+++ b/nds-h/nds_h_power.py
@@ -399,7 +399,8 @@ def load_properties(filename):
     parser.add_argument('--skip_execution',
                         action='store_true',
                         help='Skip the execution of the queries. This can be used in conjunction with ' +
-                             '--save_plan_path to only save the execution plans without running the queries.')
+                             '--save_plan_path to only save the execution plans without running the queries.' +
+                             'Note that "spark.sql.adaptive.enabled" should be set to false to get GPU physical plans.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
diff --git a/nds/nds_power.py b/nds/nds_power.py
index 7038483..a2e294a 100644
--- a/nds/nds_power.py
+++ b/nds/nds_power.py
@@ -498,7 +498,8 @@ def load_properties(filename):
     parser.add_argument('--skip_execution',
                         action='store_true',
                         help='Skip the execution of the queries. This can be used in conjunction with ' +
-                             '--save_plan_path to only save the execution plans without running the queries.')
+                             '--save_plan_path to only save the execution plans without running the queries.'
+                             + 'Note that "spark.sql.adaptive.enabled" should be set to false to get GPU physical plans.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,

From 183d6046da79a2124054b9bdcedb07511fe0c94d Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Tue, 29 Apr 2025 10:49:20 -0700
Subject: [PATCH 4/4] use os.makedirs

---
 nds-h/nds_h_power.py | 2 +-
 nds/nds_power.py     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py
index 2df1568..29ca536 100644
--- a/nds-h/nds_h_power.py
+++ b/nds-h/nds_h_power.py
@@ -188,7 +188,7 @@ def run_one_query(spark_session,
             ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
                 output_path + '/' + query_name)
     if save_plan_path:
-        subprocess.run(f"mkdir -p {save_plan_path}", shell=True)
+        os.makedirs(save_plan_path, exist_ok=True)
         explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
         plans = parse_explain_str(explain_str)
         for plan_type in plan_types:
diff --git a/nds/nds_power.py b/nds/nds_power.py
index a2e294a..7106870 100644
--- a/nds/nds_power.py
+++ b/nds/nds_power.py
@@ -193,7 +193,7 @@ def run_one_query(spark_session,
                 ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
                     output_path + '/' + query_name)
         if save_plan_path:
-            subprocess.run(f"mkdir -p {save_plan_path}", shell=True)
+            os.makedirs(save_plan_path, exist_ok=True)
             explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
             plans = parse_explain_str(explain_str)
             for plan_type in plan_types:
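Note (illustrative, not part of the patches above): a minimal, self-contained sketch of how
the parse_explain_str() helper introduced in PATCH 1/4 slices Spark's 'extended' explain
output. The sample string below is invented for illustration; real output comes from
spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended') and
contains full operator trees.

    def parse_explain_str(explain_str):
        # Sections of an 'extended' explain string are separated by blank lines.
        plan_strs = explain_str.split('\n\n')
        plan_dict = {}
        for plan_str in plan_strs:
            if plan_str.startswith('== Optimized Logical Plan =='):
                plan_dict['logical'] = plan_str
            elif plan_str.startswith('== Physical Plan =='):
                plan_dict['physical'] = plan_str
        return plan_dict

    # Hypothetical sample; only the optimized logical and physical sections are kept.
    sample = (
        "== Parsed Logical Plan ==\n'Project [*]\n\n"
        "== Analyzed Logical Plan ==\nid: bigint\nProject [id#0L]\n\n"
        "== Optimized Logical Plan ==\nRange (0, 10, step=1)\n\n"
        "== Physical Plan ==\n*(1) Range (0, 10, step=1)"
    )

    plans = parse_explain_str(sample)
    print(sorted(plans))      # ['logical', 'physical']
    print(plans['physical'])  # only the '== Physical Plan ==' section

With --save_plan_path, each selected plan type is written to
save_plan_path + '/' + query_name + '.' + plan_type, and PATCH 4/4 swaps the shell
"mkdir -p" for os.makedirs(..., exist_ok=True), which creates the directory without
spawning a shell.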