@@ -163,20 +163,42 @@ def register_delta_tables(spark_session, input_prefix, execution_time_list):
     return execution_time_list
 
 
+def parse_explain_str(explain_str):
+    plan_strs = explain_str.split('\n\n')
+    plan_dict = {}
+    for plan_str in plan_strs:
+        if plan_str.startswith('== Optimized Logical Plan =='):
+            plan_dict['logical'] = plan_str
+        elif plan_str.startswith('== Physical Plan =='):
+            plan_dict['physical'] = plan_str
+    return plan_dict
+
+
 def run_one_query(spark_session,
                   profiler,
                   query,
                   query_name,
                   output_path,
-                  output_format):
+                  output_format,
+                  save_plan_path,
+                  plan_types,
+                  skip_execution):
     with profiler(query_name=query_name):
         print(f"Running query {query_name}")
         df = spark_session.sql(query)
-        if not output_path:
-            df.collect()
-        else:
-            ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
-                output_path + '/' + query_name)
+        if not skip_execution:
+            if not output_path:
+                df.collect()
+            else:
+                ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
+                    output_path + '/' + query_name)
+        if save_plan_path:
+            explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
+            plans = parse_explain_str(explain_str)
+            for plan_type in plan_types:
+                with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
+                    f.write(plans[plan_type])
+
 
 def ensure_valid_column_names(df: DataFrame):
     def is_column_start(char):
@@ -233,6 +255,7 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      warmup_iterations,
                      iterations,
+                     plan_types,
                      input_format="parquet",
                      use_decimal=True,
                      output_path=None,
@@ -242,7 +265,9 @@ def run_query_stream(input_prefix,
                      keep_sc=False,
                      hive_external=False,
                      allow_failure=False,
-                     profiling_hook=None):
+                     profiling_hook=None,
+                     save_plan_path=None,
+                     skip_execution=False):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accessibility. TempView Creation time is also recorded.
 
@@ -315,7 +340,10 @@ def run_query_stream(input_prefix,
                                     q_content,
                                     query_name,
                                     output_path,
-                                    output_format)
+                                    output_format,
+                                    save_plan_path,
+                                    plan_types,
+                                    skip_execution)
             print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
             query_times = summary['queryTimes']
             for query_time in query_times:
@@ -458,6 +486,18 @@ def load_properties(filename):
                         type=int,
                         help='Number of iterations for each query.',
                         default=1)
+    parser.add_argument('--save_plan_path',
+                        help='Save the execution plan of each query to a file under the specified directory. If --skip_execution is ' +
+                        'specified, the execution plan will be saved without executing the query.')
+    parser.add_argument('--plan_types',
+                        type=lambda s: [x.strip() for x in s.split(',')],
+                        help='Comma separated list of plan types to save. ' +
+                        'e.g. "physical, logical". Default is "logical".',
+                        default='logical')
+    parser.add_argument('--skip_execution',
+                        action='store_true',
+                        help='Skip the execution of the queries. This can be used in conjunction with ' +
+                        '--save_plan_path to only save the execution plans without running the queries.')
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -468,6 +508,7 @@ def load_properties(filename):
                      args.sub_queries,
                      args.warmup_iterations,
                      args.iterations,
+                     args.plan_types,
                      args.input_format,
                      not args.floats,
                      args.output_prefix,
@@ -477,4 +518,6 @@ def load_properties(filename):
                      args.keep_sc,
                      args.hive,
                      args.allow_failure,
-                     args.profiling_hook)
+                     args.profiling_hook,
+                     args.save_plan_path,
+                     args.skip_execution)
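
Reviewer note: the new parse_explain_str() helper relies on Spark separating the sections of an 'extended' explain string with blank lines, and it keeps only the two headers it matches. A minimal, self-contained sketch of that behavior (the plan text is a hand-written stand-in, not output captured from a real run):

# The explain text below is a fake stand-in for Spark's 'extended' output;
# only the "== ... ==" section headers matter to the parser.
sample_explain = '\n\n'.join([
    "== Parsed Logical Plan ==\n'Project [*]",
    "== Analyzed Logical Plan ==\nProject [ss_item_sk]",
    "== Optimized Logical Plan ==\nProject [ss_item_sk]",
    "== Physical Plan ==\n*(1) Project [ss_item_sk]",
])

def parse_explain_str(explain_str):
    # Same splitting logic as the helper added in this diff.
    plan_dict = {}
    for plan_str in explain_str.split('\n\n'):
        if plan_str.startswith('== Optimized Logical Plan =='):
            plan_dict['logical'] = plan_str
        elif plan_str.startswith('== Physical Plan =='):
            plan_dict['physical'] = plan_str
    return plan_dict

plans = parse_explain_str(sample_explain)
print(sorted(plans))                     # ['logical', 'physical']
print(plans['logical'].splitlines()[0])  # == Optimized Logical Plan ==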
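
A small sketch of how the new flags parse, using only the argparse definitions added above (the flag spellings come from this diff; the path and values are illustrative). Note that argparse runs a string default through `type`, so the 'logical' default for --plan_types comes back as the list ['logical'], which is what run_one_query iterates over:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--save_plan_path')
parser.add_argument('--plan_types',
                    type=lambda s: [x.strip() for x in s.split(',')],
                    default='logical')
parser.add_argument('--skip_execution', action='store_true')

# Plan-only run: save both plan types and skip query execution entirely.
args = parser.parse_args(['--save_plan_path', '/tmp/query_plans',
                          '--plan_types', 'physical, logical',
                          '--skip_execution'])
print(args.plan_types)      # ['physical', 'logical']
print(args.skip_execution)  # True

# Defaults: the string default is passed through `type`, yielding ['logical'].
print(parser.parse_args([]).plan_types)  # ['logical']

One thing worth noting for callers: run_one_query opens save_plan_path + '/' + query_name + '.' + plan_type for writing, so the directory given via --save_plan_path has to exist before the run.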