
Commit a786d21

Add support for saving query plans (#212)
* Add support for saving plans
* support nds-h
* update help message for skip_execution
* use os.makedirs

Signed-off-by: Jihoon Son <[email protected]>
1 parent: e028068

2 files changed: +108, -18 lines

nds-h/nds_h_power.py

Lines changed: 54 additions & 9 deletions
@@ -38,6 +38,7 @@
 import os
 import sys
 import re
+import subprocess
 
 parent_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '..'))
 
@@ -158,19 +159,41 @@ def deduplicate(column_names):
     return df.toDF(*dedup_col_names)
 
 
+def parse_explain_str(explain_str):
+    plan_strs = explain_str.split('\n\n')
+    plan_dict = {}
+    for plan_str in plan_strs:
+        if plan_str.startswith('== Optimized Logical Plan =='):
+            plan_dict['logical'] = plan_str
+        elif plan_str.startswith('== Physical Plan =='):
+            plan_dict['physical'] = plan_str
+    return plan_dict
+
+
 def run_one_query(spark_session,
                   query,
                   query_name,
                   output_path,
-                  output_format):
+                  output_format,
+                  save_plan_path,
+                  plan_types,
+                  skip_execution):
     df = spark_session.sql(query)
     # query15_part1 and query15_part3 are create/drop view queries
     empty_output_queries = ['query15_part1', 'query15_part3']
-    if query_name in empty_output_queries or not output_path:
-        df.collect()
-    else:
-        ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
-            output_path + '/' + query_name)
+    if not skip_execution:
+        if query_name in empty_output_queries or not output_path:
+            df.collect()
+        else:
+            ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
+                output_path + '/' + query_name)
+    if save_plan_path:
+        os.makedirs(save_plan_path, exist_ok=True)
+        explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
+        plans = parse_explain_str(explain_str)
+        for plan_type in plan_types:
+            with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
+                f.write(plans[plan_type])
 
 
 def get_query_subset(query_dict, subset):
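
For context, the new parse_explain_str helper splits Spark's extended explain text on blank lines and keeps only the two sections it recognizes. A minimal sketch of that behavior, using a made-up, abbreviated explain string rather than real Spark output, and assuming the parse_explain_str defined in the hunk above is in scope:

sample = (
    "== Parsed Logical Plan ==\n'Project [*]\n\n"
    "== Analyzed Logical Plan ==\nProject [id#0L]\n\n"
    "== Optimized Logical Plan ==\nRange (0, 10, step=1)\n\n"
    "== Physical Plan ==\n*(1) Range (0, 10, step=1)"
)

plans = parse_explain_str(sample)
print(sorted(plans))      # ['logical', 'physical']
print(plans['physical'])  # the chunk starting with '== Physical Plan =='

The parsed and analyzed sections are dropped, so only the keys 'logical' and 'physical' can be requested through the --plan_types option introduced later in this diff.
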
@@ -188,11 +211,14 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      warmup_iterations,
                      iterations,
+                     plan_types,
                      input_format,
                      output_path=None,
                      keep_sc=False,
                      output_format="parquet",
-                     json_summary_folder=None):
+                     json_summary_folder=None,
+                     save_plan_path=None,
+                     skip_execution=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accessibility. TempView Creation time is also recorded.
 
@@ -246,7 +272,10 @@ def run_query_stream(input_prefix,
                                      q_content,
                                      query_name,
                                      output_path,
-                                     output_format)
+                                     output_format,
+                                     save_plan_path,
+                                     plan_types,
+                                     skip_execution)
             print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
             query_times = summary['queryTimes']
             execution_time_list.append((spark_app_id, query_name, query_times[0]))
@@ -359,6 +388,19 @@ def load_properties(filename):
                         type=int,
                         help='Number of iterations for each query.',
                         default=1)
+    parser.add_argument('--save_plan_path',
+                        help='Save the execution plan of each query to the specified file. If --skip_execution is ' +
+                             'specified, the execution plan will be saved without executing the query.')
+    parser.add_argument('--plan_types',
+                        type=lambda s: [x.strip() for x in s.split(',')],
+                        help='Comma separated list of plan types to save. ' +
+                             'e.g. "physical, logical". Default is "logical".',
+                        default='logical')
+    parser.add_argument('--skip_execution',
+                        action='store_true',
+                        help='Skip the execution of the queries. This can be used in conjunction with ' +
+                             '--save_plan_path to only save the execution plans without running the queries.' +
+                             'Note that "spark.sql.adaptive.enabled" should be set to false to get GPU physical plans.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
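
A quick sketch of how the --plan_types value above is turned into a list. The type callable is the same lambda as in the hunk; note that argparse also runs string defaults through the type callable, so the default 'logical' arrives as a one-element list rather than a bare string:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--plan_types',
                    type=lambda s: [x.strip() for x in s.split(',')],
                    default='logical')

print(parser.parse_args(['--plan_types', 'physical, logical']).plan_types)  # ['physical', 'logical']
print(parser.parse_args([]).plan_types)                                     # ['logical']
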
@@ -368,8 +410,11 @@ def load_properties(filename):
                      args.sub_queries,
                      args.warmup_iterations,
                      args.iterations,
+                     args.plan_types,
                      args.input_format,
                      args.output_prefix,
                      args.keep_sc,
                      args.output_format,
-                     args.json_summary_folder)
+                     args.json_summary_folder,
+                     args.save_plan_path,
+                     args.skip_execution)
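
Putting the nds-h changes together, a hypothetical plan-only call could look like the following. The query text and the /tmp/plans directory are illustrative placeholders, not values from the commit:

run_one_query(spark_session,
              query="SELECT 1",             # illustrative query text
              query_name="query1",
              output_path=None,             # no query output wanted
              output_format="parquet",
              save_plan_path="/tmp/plans",  # hypothetical directory
              plan_types=["physical"],
              skip_execution=True)
# With skip_execution=True nothing is collected or written as query output;
# only /tmp/plans/query1.physical is produced.
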

nds/nds_power.py

Lines changed: 54 additions & 9 deletions
@@ -163,20 +163,43 @@ def register_delta_tables(spark_session, input_prefix, execution_time_list):
     return execution_time_list
 
 
+def parse_explain_str(explain_str):
+    plan_strs = explain_str.split('\n\n')
+    plan_dict = {}
+    for plan_str in plan_strs:
+        if plan_str.startswith('== Optimized Logical Plan =='):
+            plan_dict['logical'] = plan_str
+        elif plan_str.startswith('== Physical Plan =='):
+            plan_dict['physical'] = plan_str
+    return plan_dict
+
+
 def run_one_query(spark_session,
                   profiler,
                   query,
                   query_name,
                   output_path,
-                  output_format):
+                  output_format,
+                  save_plan_path,
+                  plan_types,
+                  skip_execution):
     with profiler(query_name=query_name):
         print(f"Running query {query_name}")
         df = spark_session.sql(query)
-        if not output_path:
-            df.collect()
-        else:
-            ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
-                output_path + '/' + query_name)
+        if not skip_execution:
+            if not output_path:
+                df.collect()
+            else:
+                ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
+                    output_path + '/' + query_name)
+        if save_plan_path:
+            os.makedirs(save_plan_path, exist_ok=True)
+            explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
+            plans = parse_explain_str(explain_str)
+            for plan_type in plan_types:
+                with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
+                    f.write(plans[plan_type])
+
 
 def ensure_valid_column_names(df: DataFrame):
     def is_column_start(char):
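
One note on the explainString call above: PySpark's public DataFrame.explain prints the plan to stdout and returns None, so the patch reaches for the JVM helper that explain uses internally in order to get the text back as a Python string. A rough sketch of the difference (Spark 3.x, assuming a live classic SparkSession named spark):

df = spark.sql("SELECT 1 AS x")

df.explain(mode="extended")  # prints all plan sections to stdout, returns None

# What the patch does instead: call the same JVM utility directly so the
# extended explain output comes back as a string that can be written to a file.
explain_str = spark._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), "extended")
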
@@ -233,6 +256,7 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      warmup_iterations,
                      iterations,
+                     plan_types,
                      input_format="parquet",
                      use_decimal=True,
                      output_path=None,
@@ -242,7 +266,9 @@ def run_query_stream(input_prefix,
                      keep_sc=False,
                      hive_external=False,
                      allow_failure=False,
-                     profiling_hook=None):
+                     profiling_hook=None,
+                     save_plan_path=None,
+                     skip_execution=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accesibility. TempView Creation time is also recorded.
 
@@ -315,7 +341,10 @@ def run_query_stream(input_prefix,
                                          q_content,
                                          query_name,
                                          output_path,
-                                         output_format)
+                                         output_format,
+                                         save_plan_path,
+                                         plan_types,
+                                         skip_execution)
             print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
             query_times = summary['queryTimes']
             for query_time in query_times:
@@ -458,6 +487,19 @@ def load_properties(filename):
                         type=int,
                         help='Number of iterations for each query.',
                         default=1)
+    parser.add_argument('--save_plan_path',
+                        help='Save the execution plan of each query to the specified file. If --skip_execution is ' +
+                             'specified, the execution plan will be saved without executing the query.')
+    parser.add_argument('--plan_types',
+                        type=lambda s: [x.strip() for x in s.split(',')],
+                        help='Comma separated list of plan types to save. ' +
+                             'e.g. "physical, logical". Default is "logical".',
+                        default='logical')
+    parser.add_argument('--skip_execution',
+                        action='store_true',
+                        help='Skip the execution of the queries. This can be used in conjunction with ' +
+                             '--save_plan_path to only save the execution plans without running the queries.' +
+                             'Note that "spark.sql.adaptive.enabled" should be set to false to get GPU physical plans.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -468,6 +510,7 @@ def load_properties(filename):
                      args.sub_queries,
                      args.warmup_iterations,
                      args.iterations,
+                     args.plan_types,
                      args.input_format,
                      not args.floats,
                      args.output_prefix,
@@ -477,4 +520,6 @@ def load_properties(filename):
                      args.keep_sc,
                      args.hive,
                      args.allow_failure,
-                     args.profiling_hook)
+                     args.profiling_hook,
+                     args.save_plan_path,
+                     args.skip_execution)
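
On the caveat in the --skip_execution help text: with adaptive query execution enabled, the plan captured before the query runs is the initial AdaptiveSparkPlan, so the final (GPU) physical operators may not show up in it. A minimal sketch of disabling AQE when building the session, per that note; the app name is illustrative, and in practice the setting would normally go into the benchmark's Spark submit configuration:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("plan-capture")                        # illustrative name
         .config("spark.sql.adaptive.enabled", "false")  # per the help text above
         .getOrCreate())
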
