Merged
62 changes: 53 additions & 9 deletions nds-h/nds_h_power.py
@@ -38,6 +38,7 @@
import os
import sys
import re
import subprocess

parent_dir = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '..'))

@@ -158,19 +159,41 @@ def deduplicate(column_names):
return df.toDF(*dedup_col_names)


def parse_explain_str(explain_str):
plan_strs = explain_str.split('\n\n')
plan_dict = {}
for plan_str in plan_strs:
if plan_str.startswith('== Optimized Logical Plan =='):
plan_dict['logical'] = plan_str
elif plan_str.startswith('== Physical Plan =='):
plan_dict['physical'] = plan_str
return plan_dict
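For reference, a short sketch of how the new parse_explain_str helper behaves. The plan text below is made up for illustration, but 'extended' explain output does separate its sections with blank lines and uses the headers checked above:

sample_explain = (
    "== Parsed Logical Plan ==\nProject ...\n\n"
    "== Analyzed Logical Plan ==\nProject ...\n\n"
    "== Optimized Logical Plan ==\nProject [l_orderkey]\n+- Filter ...\n\n"
    "== Physical Plan ==\n*(1) Project [l_orderkey]\n+- *(1) Filter ..."
)

plans = parse_explain_str(sample_explain)
# Only the matching sections are kept:
#   plans['logical']  -> the "== Optimized Logical Plan ==" section
#   plans['physical'] -> the "== Physical Plan ==" section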


def run_one_query(spark_session,
query,
query_name,
output_path,
output_format):
output_format,
save_plan_path,
plan_types,
skip_execution):
df = spark_session.sql(query)
# query15_part1 and query15_part3 are create/drop view queries
empty_output_queries = ['query15_part1', 'query15_part3']
if query_name in empty_output_queries or not output_path:
df.collect()
else:
ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
output_path + '/' + query_name)
if not skip_execution:
if query_name in empty_output_queries or not output_path:
df.collect()
else:
ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
output_path + '/' + query_name)
if save_plan_path:
subprocess.run(f"mkdir -p {save_plan_path}", shell=True)
explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
plans = parse_explain_str(explain_str)
for plan_type in plan_types:
with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
f.write(plans[plan_type])


def get_query_subset(query_dict, subset):
Expand All @@ -188,11 +211,14 @@ def run_query_stream(input_prefix,
sub_queries,
warmup_iterations,
iterations,
plan_types,
input_format,
output_path=None,
keep_sc=False,
output_format="parquet",
json_summary_folder=None):
json_summary_folder=None,
save_plan_path=None,
skip_execution=False):
"""run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
for easy accessibility. TempView Creation time is also recorded.

@@ -246,7 +272,10 @@ def run_query_stream(input_prefix,
q_content,
query_name,
output_path,
output_format)
output_format,
save_plan_path,
plan_types,
skip_execution)
print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
query_times = summary['queryTimes']
execution_time_list.append((spark_app_id, query_name, query_times[0]))
@@ -359,6 +388,18 @@ def load_properties(filename):
type=int,
help='Number of iterations for each query.',
default=1)
parser.add_argument('--save_plan_path',
help='Save the execution plan of each query to the specified directory. If --skip_execution is ' +
'specified, the execution plans will be saved without executing the queries.')
parser.add_argument('--plan_types',
type=lambda s: [x.strip() for x in s.split(',')],
help='Comma separated list of plan types to save. ' +
'e.g. "physical, logical". Default is "logical".',
default='logical')
parser.add_argument('--skip_execution',
action='store_true',
help='Skip the execution of the queries. This can be used in conjunction with ' +
'--save_plan_path to only save the execution plans without running the queries.')
args = parser.parse_args()
query_dict = gen_sql_from_stream(args.query_stream_file)
run_query_stream(args.input_prefix,
@@ -368,8 +409,11 @@ def load_properties(filename):
args.sub_queries,
args.warmup_iterations,
args.iterations,
args.plan_types,
args.input_format,
args.output_prefix,
args.keep_sc,
args.output_format,
args.json_summary_folder)
args.json_summary_folder,
args.save_plan_path,
args.skip_execution)
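One detail of the new --plan_types option, noted here as a reading aid: argparse also runs string defaults through the type callable, so both the default and an explicit value arrive in run_query_stream as a list. A small sketch of the conversion:

def parse_plan_types(s):
    # Same conversion the --plan_types argument applies.
    return [x.strip() for x in s.split(',')]

print(parse_plan_types('physical, logical'))  # ['physical', 'logical'] -- surrounding whitespace stripped
print(parse_plan_types('logical'))            # ['logical'] -- what the default value becomes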
62 changes: 53 additions & 9 deletions nds/nds_power.py
@@ -163,20 +163,43 @@ def register_delta_tables(spark_session, input_prefix, execution_time_list):
return execution_time_list


def parse_explain_str(explain_str):
plan_strs = explain_str.split('\n\n')
plan_dict = {}
for plan_str in plan_strs:
if plan_str.startswith('== Optimized Logical Plan =='):
plan_dict['logical'] = plan_str
elif plan_str.startswith('== Physical Plan =='):
plan_dict['physical'] = plan_str
return plan_dict


def run_one_query(spark_session,
profiler,
query,
query_name,
output_path,
output_format):
output_format,
save_plan_path,
plan_types,
skip_execution):
with profiler(query_name=query_name):
print(f"Running query {query_name}")
df = spark_session.sql(query)
if not output_path:
df.collect()
else:
ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
output_path + '/' + query_name)
if not skip_execution:
if not output_path:
df.collect()
else:
ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save(
output_path + '/' + query_name)
if save_plan_path:
subprocess.run(f"mkdir -p {save_plan_path}", shell=True)
Collaborator:
nit: consider https://docs.python.org/3.12/library/os.html#os.makedirs instead of forking shell commands
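A minimal sketch of the reviewer's suggestion, not part of this diff (save_plan_path is the directory passed via --save_plan_path):

import os

# Equivalent of `mkdir -p {save_plan_path}` without forking a shell process;
# exist_ok=True keeps the call harmless when the directory already exists.
os.makedirs(save_plan_path, exist_ok=True)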

explain_str = spark_session._jvm.PythonSQLUtils.explainString(df._jdf.queryExecution(), 'extended')
plans = parse_explain_str(explain_str)
for plan_type in plan_types:
with open(save_plan_path + '/' + query_name + "." + plan_type, 'w') as f:
f.write(plans[plan_type])


def ensure_valid_column_names(df: DataFrame):
def is_column_start(char):
@@ -233,6 +256,7 @@ def run_query_stream(input_prefix,
sub_queries,
warmup_iterations,
iterations,
plan_types,
input_format="parquet",
use_decimal=True,
output_path=None,
@@ -242,7 +266,9 @@ def run_query_stream(input_prefix,
keep_sc=False,
hive_external=False,
allow_failure=False,
profiling_hook=None):
profiling_hook=None,
save_plan_path=None,
skip_execution=False):
"""run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
for easy accessibility. TempView Creation time is also recorded.

@@ -315,7 +341,10 @@ def run_query_stream(input_prefix,
q_content,
query_name,
output_path,
output_format)
output_format,
save_plan_path,
plan_types,
skip_execution)
print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
query_times = summary['queryTimes']
for query_time in query_times:
@@ -458,6 +487,18 @@ def load_properties(filename):
type=int,
help='Number of iterations for each query.',
default=1)
parser.add_argument('--save_plan_path',
help='Save the execution plan of each query to the specified directory. If --skip_execution is ' +
'specified, the execution plans will be saved without executing the queries.')
parser.add_argument('--plan_types',
type=lambda s: [x.strip() for x in s.split(',')],
help='Comma separated list of plan types to save. ' +
'e.g. "physical, logical". Default is "logical".',
default='logical')
parser.add_argument('--skip_execution',
action='store_true',
help='Skip the execution of the queries. This can be used in conjunction with ' +
'--save_plan_path to only save the execution plans without running the queries.')
args = parser.parse_args()
query_dict = gen_sql_from_stream(args.query_stream_file)
run_query_stream(args.input_prefix,
@@ -468,6 +509,7 @@ def load_properties(filename):
args.sub_queries,
args.warmup_iterations,
args.iterations,
args.plan_types,
args.input_format,
not args.floats,
args.output_prefix,
@@ -477,4 +519,6 @@ def load_properties(filename):
args.keep_sc,
args.hive,
args.allow_failure,
args.profiling_hook)
args.profiling_hook,
args.save_plan_path,
args.skip_execution)
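As a usage note, a small sketch of the plan files these options end up producing; the paths and query names are hypothetical, and the naming mirrors the save_plan_path + '/' + query_name + "." + plan_type expression used in run_one_query:

save_plan_path = '/tmp/plans'            # hypothetical --save_plan_path value
plan_types = ['logical', 'physical']     # result of --plan_types "logical,physical"

for query_name in ['query1', 'query2']:  # hypothetical queries from the stream
    for plan_type in plan_types:
        # One plan file per query and plan type, e.g. /tmp/plans/query1.logical
        print(save_plan_path + '/' + query_name + "." + plan_type)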