Skip to content

Commit 7ef00fd

Browse files
committed
Add warmup_iterations and iterations back
1 parent 5a648cc commit 7ef00fd

File tree

4 files changed

+59
-29
lines changed

4 files changed

+59
-29
lines changed

nds-h/nds_h_power.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
33
#
4-
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
4+
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
55
# SPDX-License-Identifier: Apache-2.0
66
#
77
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -237,7 +237,8 @@ def run_query_stream(input_prefix,
237237
spark_session.sparkContext.setJobGroup(query_name, query_name)
238238
print("====== Run {} ======".format(query_name))
239239
q_report = PysparkBenchReport(spark_session, query_name)
240-
summary = q_report.report_on(run_one_query, spark_session,
240+
summary = q_report.report_on(run_one_query,
241+
spark_session,
241242
q_content,
242243
query_name,
243244
output_path,

nds/PysparkBenchReport.py

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
33
#
4-
# SPDX-FileCopyrightText: Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
4+
# SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
55
# SPDX-License-Identifier: Apache-2.0
66
#
77
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -57,7 +57,7 @@ def __init__(self, spark_session: SparkSession, query_name) -> None:
5757
'query': query_name,
5858
}
5959

60-
def report_on(self, fn: Callable, *args):
60+
def report_on(self, fn: Callable, warmup_iterations = 0, iterations = 1, *args):
6161
"""Record a function for its running environment, running status etc. and exclude sentive
6262
information like tokens, secrets and passwords. Generate summary in dict format for it.
6363
@@ -84,28 +84,41 @@ def report_on(self, fn: Callable, *args):
8484
if listener is not None:
8585
print("TaskFailureListener is registered.")
8686
try:
87-
start_time = int(time.time() * 1000)
88-
fn(*args)
89-
end_time = int(time.time() * 1000)
90-
if listener and len(listener.failures) != 0:
91-
self.summary['queryStatus'].append("CompletedWithTaskFailures")
92-
else:
93-
self.summary['queryStatus'].append("Completed")
87+
# warmup
88+
for i in range(0, warmup_iterations):
89+
fn(*args)
9490
except Exception as e:
95-
# print the exception to ease debugging
96-
print('ERROR BEGIN')
91+
print('ERROR WHILE WARMUP BEGIN')
9792
print(e)
9893
traceback.print_tb(e.__traceback__)
99-
print('ERROR END')
100-
end_time = int(time.time() * 1000)
101-
self.summary['queryStatus'].append("Failed")
102-
self.summary['exceptions'].append(str(e))
103-
finally:
104-
self.summary['startTime'] = start_time
105-
self.summary['queryTimes'].append(end_time - start_time)
106-
if listener is not None:
107-
listener.unregister()
108-
return self.summary
94+
print('ERROR WHILE WARMUP END')
95+
96+
start_time = int(time.time() * 1000)
97+
self.summary['startTime'] = start_time
98+
# run the query
99+
for i in range(0, iterations):
100+
try:
101+
start_time = int(time.time() * 1000)
102+
fn(*args)
103+
end_time = int(time.time() * 1000)
104+
if listener and len(listener.failures) != 0:
105+
self.summary['queryStatus'].append("CompletedWithTaskFailures")
106+
else:
107+
self.summary['queryStatus'].append("Completed")
108+
except Exception as e:
109+
# print the exception to ease debugging
110+
print('ERROR BEGIN')
111+
print(e)
112+
traceback.print_tb(e.__traceback__)
113+
print('ERROR END')
114+
end_time = int(time.time() * 1000)
115+
self.summary['queryStatus'].append("Failed")
116+
self.summary['exceptions'].append(str(e))
117+
finally:
118+
self.summary['queryTimes'].append(end_time - start_time)
119+
if listener is not None:
120+
listener.unregister()
121+
return self.summary
109122

110123
def write_summary(self, prefix=""):
111124
"""_summary_

nds/nds_maintenance.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,11 @@ def run_query(spark_session,
227227
spark_session.sparkContext.setJobGroup(query_name, query_name)
228228
print(f"====== Run {query_name} ======")
229229
q_report = PysparkBenchReport(spark_session, query_name)
230-
summary = q_report.report_on(run_dm_query, spark_session,
231-
q_content,
232-
query_name,
233-
warehouse_type)
230+
summary = q_report.report_on(run_dm_query, 0, 1,
231+
spark_session,
232+
q_content,
233+
query_name,
234+
warehouse_type)
234235
print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
235236
execution_time_list.append((spark_app_id, query_name, summary['queryTimes']))
236237
if json_summary_folder:

nds/nds_power.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,8 @@ def run_query_stream(input_prefix,
231231
time_log_output_path,
232232
extra_time_log_output_path,
233233
sub_queries,
234+
warmup_iterations,
235+
iterations,
234236
input_format="parquet",
235237
use_decimal=True,
236238
output_path=None,
@@ -306,15 +308,18 @@ def run_query_stream(input_prefix,
306308
spark_session.sparkContext.setJobGroup(query_name, query_name)
307309
print("====== Run {} ======".format(query_name))
308310
q_report = PysparkBenchReport(spark_session, query_name)
309-
summary = q_report.report_on(run_one_query,spark_session,
311+
summary = q_report.report_on(run_one_query,warmup_iterations,
312+
iterations,
313+
spark_session,
310314
profiler,
311315
q_content,
312316
query_name,
313317
output_path,
314318
output_format)
315319
print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
316320
query_times = summary['queryTimes']
317-
execution_time_list.append((spark_app_id, query_name, query_times[0]))
321+
for query_time in query_times:
322+
execution_time_list.append((spark_app_id, query_name, query_time))
318323
queries_reports.append(q_report)
319324
if json_summary_folder:
320325
# property_file e.g.: "property/aqe-on.properties" or just "aqe-off.properties"
@@ -445,6 +450,14 @@ def load_properties(filename):
445450
help='Executable that is called just before/after a query executes.' +
446451
'The executable is called like this ' +
447452
'./hook {start|stop} output_root query_name.')
453+
parser.add_argument('--warmup_iterations',
454+
type=int,
455+
help='Number of warmup iterations for each query.',
456+
default=0)
457+
parser.add_argument('--iterations',
458+
type=int,
459+
help='Number of iterations for each query.',
460+
default=1)
448461
args = parser.parse_args()
449462
query_dict = gen_sql_from_stream(args.query_stream_file)
450463
run_query_stream(args.input_prefix,
@@ -453,6 +466,8 @@ def load_properties(filename):
453466
args.time_log,
454467
args.extra_time_log,
455468
args.sub_queries,
469+
args.warmup_iterations,
470+
args.iterations,
456471
args.input_format,
457472
not args.floats,
458473
args.output_prefix,

0 commit comments

Comments
 (0)