
Commit 5a648cc (1 parent: 0db754f)

Revert "Add support for warmups and iterations for each query (#202)" (#207)

This reverts commit 0db754f.

Signed-off-by: Jihoon Son <[email protected]>

File tree: 3 files changed, +25 / -68 lines

- nds-h/nds_h_power.py
- nds/PysparkBenchReport.py
- nds/nds_power.py


nds-h/nds_h_power.py

Lines changed: 2 additions & 17 deletions
```diff
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #
-# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -186,8 +186,6 @@ def run_query_stream(input_prefix,
                      query_dict,
                      time_log_output_path,
                      sub_queries,
-                     warmup_iterations,
-                     iterations,
                      input_format,
                      output_path=None,
                      keep_sc=False,
@@ -239,10 +237,7 @@ def run_query_stream(input_prefix,
         spark_session.sparkContext.setJobGroup(query_name, query_name)
         print("====== Run {} ======".format(query_name))
         q_report = PysparkBenchReport(spark_session, query_name)
-        summary = q_report.report_on(run_one_query,
-                                     warmup_iterations,
-                                     iterations,
-                                     spark_session,
+        summary = q_report.report_on(run_one_query, spark_session,
                                      q_content,
                                      query_name,
                                      output_path,
@@ -351,23 +346,13 @@ def load_properties(filename):
                         default='parquet')
     parser.add_argument('--property_file',
                         help='property file for Spark configuration.')
-    parser.add_argument('--warmup_iterations',
-                        type=int,
-                        help='Number of warmup iterations for each query.',
-                        default=0)
-    parser.add_argument('--iterations',
-                        type=int,
-                        help='Number of iterations for each query.',
-                        default=1)
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
                      args.property_file,
                      query_dict,
                      args.time_log,
                      args.sub_queries,
-                     args.warmup_iterations,
-                     args.iterations,
                      args.input_format,
                      args.output_prefix,
                      args.keep_sc,
```
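
The reshaped report_on call above reflects a Python binding detail worth noting: in the reverted signature, report_on(self, fn, warmup_iterations=0, iterations=1, *args), positional arguments after fn fill the two defaulted parameters before anything reaches *args, so callers had to pass both counts explicitly to forward the query arguments correctly. A toy sketch of that pitfall (names invented for illustration, not taken from the repo):

```python
def report_on(fn, warmup_iterations=0, iterations=1, *args):
    # Positional binding fills the defaulted parameters first;
    # only the remainder lands in *args.
    print(warmup_iterations, iterations, args)

# Omitting the counts silently misbinds the query arguments:
report_on(print, "spark_session", "q_content", "query_name")
# -> spark_session q_content ('query_name',)

# Hence every call site spelled both counts out:
report_on(print, 0, 1, "spark_session", "q_content", "query_name")
# -> 0 1 ('spark_session', 'q_content', 'query_name')
```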

nds/PysparkBenchReport.py

Lines changed: 21 additions & 34 deletions
```diff
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #
-# SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -57,7 +57,7 @@ def __init__(self, spark_session: SparkSession, query_name) -> None:
             'query': query_name,
         }
 
-    def report_on(self, fn: Callable, warmup_iterations = 0, iterations = 1, *args):
+    def report_on(self, fn: Callable, *args):
         """Record a function for its running environment, running status etc. and exclude sentive
         information like tokens, secret and password Generate summary in dict format for it.
@@ -84,41 +84,28 @@ def report_on(self, fn: Callable, warmup_iterations = 0, iterations = 1, *args):
         if listener is not None:
             print("TaskFailureListener is registered.")
         try:
-            # warmup
-            for i in range(0, warmup_iterations):
-                fn(*args)
+            start_time = int(time.time() * 1000)
+            fn(*args)
+            end_time = int(time.time() * 1000)
+            if listener and len(listener.failures) != 0:
+                self.summary['queryStatus'].append("CompletedWithTaskFailures")
+            else:
+                self.summary['queryStatus'].append("Completed")
         except Exception as e:
-            print('ERROR WHILE WARMUP BEGIN')
+            # print the exception to ease debugging
+            print('ERROR BEGIN')
             print(e)
             traceback.print_tb(e.__traceback__)
-            print('ERROR WHILE WARMUP END')
-
-        start_time = int(time.time() * 1000)
-        self.summary['startTime'] = start_time
-        # run the query
-        for i in range(0, iterations):
-            try:
-                start_time = int(time.time() * 1000)
-                fn(*args)
-                end_time = int(time.time() * 1000)
-                if listener and len(listener.failures) != 0:
-                    self.summary['queryStatus'].append("CompletedWithTaskFailures")
-                else:
-                    self.summary['queryStatus'].append("Completed")
-            except Exception as e:
-                # print the exception to ease debugging
-                print('ERROR BEGIN')
-                print(e)
-                traceback.print_tb(e.__traceback__)
-                print('ERROR END')
-                end_time = int(time.time() * 1000)
-                self.summary['queryStatus'].append("Failed")
-                self.summary['exceptions'].append(str(e))
-            finally:
-                self.summary['queryTimes'].append(end_time - start_time)
-        if listener is not None:
-            listener.unregister()
-        return self.summary
+            print('ERROR END')
+            end_time = int(time.time() * 1000)
+            self.summary['queryStatus'].append("Failed")
+            self.summary['exceptions'].append(str(e))
+        finally:
+            self.summary['startTime'] = start_time
+            self.summary['queryTimes'].append(end_time - start_time)
+            if listener is not None:
+                listener.unregister()
+            return self.summary
 
     def write_summary(self, prefix=""):
         """_summary_
```

nds/nds_power.py

Lines changed: 2 additions & 17 deletions
```diff
@@ -231,8 +231,6 @@ def run_query_stream(input_prefix,
                      time_log_output_path,
                      extra_time_log_output_path,
                      sub_queries,
-                     warmup_iterations,
-                     iterations,
                      input_format="parquet",
                      use_decimal=True,
                      output_path=None,
@@ -308,18 +306,15 @@ def run_query_stream(input_prefix,
         spark_session.sparkContext.setJobGroup(query_name, query_name)
         print("====== Run {} ======".format(query_name))
         q_report = PysparkBenchReport(spark_session, query_name)
-        summary = q_report.report_on(run_one_query,warmup_iterations,
-                                     iterations,
-                                     spark_session,
+        summary = q_report.report_on(run_one_query,spark_session,
                                      profiler,
                                      q_content,
                                      query_name,
                                      output_path,
                                      output_format)
         print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
         query_times = summary['queryTimes']
-        for query_time in query_times:
-            execution_time_list.append((spark_app_id, query_name, query_time))
+        execution_time_list.append((spark_app_id, query_name, query_times[0]))
         queries_reports.append(q_report)
         if json_summary_folder:
             # property_file e.g.: "property/aqe-on.properties" or just "aqe-off.properties"
@@ -450,14 +445,6 @@ def load_properties(filename):
                         help='Executable that is called just before/after a query executes.' +
                              'The executable is called like this ' +
                              './hook {start|stop} output_root query_name.')
-    parser.add_argument('--warmup_iterations',
-                        type=int,
-                        help='Number of warmup iterations for each query.',
-                        default=0)
-    parser.add_argument('--iterations',
-                        type=int,
-                        help='Number of iterations for each query.',
-                        default=1)
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
@@ -466,8 +453,6 @@ def load_properties(filename):
                      args.time_log,
                      args.extra_time_log,
                      args.sub_queries,
-                     args.warmup_iterations,
-                     args.iterations,
                      args.input_format,
                      not args.floats,
                      args.output_prefix,
```
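
Because each query now executes exactly once, summary['queryTimes'] carries a single elapsed time, which is why the loop over query_times above collapses to an index. A sketch of the resulting data shape (all values invented for illustration):

```python
# Hypothetical summary as returned by report_on after the revert:
# one run per query, hence one status and one time entry.
summary = {
    'queryStatus': ['Completed'],
    'queryTimes': [1234],   # elapsed millis for the single run
    'exceptions': [],
}

spark_app_id, query_name = 'app-20240101000000-0000', 'query1'  # invented
execution_time_list = []
execution_time_list.append((spark_app_id, query_name, summary['queryTimes'][0]))
print(execution_time_list)  # [('app-20240101000000-0000', 'query1', 1234)]
```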
