3838import os
3939import sys
4040import re
41+ import subprocess
4142
4243parent_dir = os .path .abspath (os .path .join (os .path .dirname (sys .argv [0 ]), '..' ))
4344
@@ -158,19 +159,41 @@ def deduplicate(column_names):
158159 return df .toDF (* dedup_col_names )
159160
160161
162+ def parse_explain_str (explain_str ):
163+ plan_strs = explain_str .split ('\n \n ' )
164+ plan_dict = {}
165+ for plan_str in plan_strs :
166+ if plan_str .startswith ('== Optimized Logical Plan ==' ):
167+ plan_dict ['logical' ] = plan_str
168+ elif plan_str .startswith ('== Physical Plan ==' ):
169+ plan_dict ['physical' ] = plan_str
170+ return plan_dict
171+
172+
161173def run_one_query (spark_session ,
162174 query ,
163175 query_name ,
164176 output_path ,
165- output_format ):
177+ output_format ,
178+ save_plan_path ,
179+ plan_types ,
180+ skip_execution ):
166181 df = spark_session .sql (query )
167182 # query15_part1 and query15_part3 are create/drop view queries
168183 empty_output_queries = ['query15_part1' , 'query15_part3' ]
169- if query_name in empty_output_queries or not output_path :
170- df .collect ()
171- else :
172- ensure_valid_column_names (df ).write .format (output_format ).mode ('overwrite' ).save (
173- output_path + '/' + query_name )
184+ if not skip_execution :
185+ if query_name in empty_output_queries or not output_path :
186+ df .collect ()
187+ else :
188+ ensure_valid_column_names (df ).write .format (output_format ).mode ('overwrite' ).save (
189+ output_path + '/' + query_name )
190+ if save_plan_path :
191+ subprocess .run (f"mkdir -p { save_plan_path } " , shell = True )
192+ explain_str = spark_session ._jvm .PythonSQLUtils .explainString (df ._jdf .queryExecution (), 'extended' )
193+ plans = parse_explain_str (explain_str )
194+ for plan_type in plan_types :
195+ with open (save_plan_path + '/' + query_name + "." + plan_type , 'w' ) as f :
196+ f .write (plans [plan_type ])
174197
175198
176199def get_query_subset (query_dict , subset ):
@@ -188,11 +211,14 @@ def run_query_stream(input_prefix,
188211 sub_queries ,
189212 warmup_iterations ,
190213 iterations ,
214+ plan_types ,
191215 input_format ,
192216 output_path = None ,
193217 keep_sc = False ,
194218 output_format = "parquet" ,
195- json_summary_folder = None ):
219+ json_summary_folder = None ,
220+ save_plan_path = None ,
221+ skip_execution = False ):
196222 """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
197223 for easy accessibility. TempView Creation time is also recorded.
198224
@@ -246,7 +272,10 @@ def run_query_stream(input_prefix,
246272 q_content ,
247273 query_name ,
248274 output_path ,
249- output_format )
275+ output_format ,
276+ save_plan_path ,
277+ plan_types ,
278+ skip_execution )
250279 print (f"Time taken: { summary ['queryTimes' ]} millis for { query_name } " )
251280 query_times = summary ['queryTimes' ]
252281 execution_time_list .append ((spark_app_id , query_name , query_times [0 ]))
@@ -359,6 +388,18 @@ def load_properties(filename):
359388 type = int ,
360389 help = 'Number of iterations for each query.' ,
361390 default = 1 )
391+ parser .add_argument ('--save_plan_path' ,
392+ help = 'Save the execution plan of each query to the specified file. If --skip_execution is ' +
393+ 'specified, the execution plan will be saved without executing the query.' )
394+ parser .add_argument ('--plan_types' ,
395+ type = lambda s : [x .strip () for x in s .split (',' )],
396+ help = 'Comma separated list of plan types to save. ' +
397+ 'e.g. "physical, logical". Default is "logical".' ,
398+ default = 'logical' )
399+ parser .add_argument ('--skip_execution' ,
400+ action = 'store_true' ,
401+ help = 'Skip the execution of the queries. This can be used in conjunction with ' +
402+ '--save_plan_path to only save the execution plans without running the queries.' )
362403 args = parser .parse_args ()
363404 query_dict = gen_sql_from_stream (args .query_stream_file )
364405 run_query_stream (args .input_prefix ,
@@ -368,8 +409,11 @@ def load_properties(filename):
368409 args .sub_queries ,
369410 args .warmup_iterations ,
370411 args .iterations ,
412+ args .plan_types ,
371413 args .input_format ,
372414 args .output_prefix ,
373415 args .keep_sc ,
374416 args .output_format ,
375- args .json_summary_folder )
417+ args .json_summary_folder ,
418+ args .save_plan_path ,
419+ args .skip_execution )
0 commit comments