
Commit e028068

NDS-H support for warmup iterations (#211)
* Changes for removing duplicate PysparkBenchReport + nds-h warmup+iterations changes
* Refactor for nds/nds-h
* License updated
* Consolidating common jvm_listener and aqe.properties
* Updating license header
* Removing old files and adding symlinks to new files
* Changes to test with symlinks
* Reverting NDS related changes
* Updated headers for files without any change

Signed-off-by: Sayed Bilal Bari <[email protected]>
1 parent 6b3c165 commit e028068

File tree

2 files changed: +49 −21 lines changed


nds-h/nds_h_power.py

Lines changed: 14 additions & 0 deletions
@@ -186,6 +186,8 @@ def run_query_stream(input_prefix,
                      query_dict,
                      time_log_output_path,
                      sub_queries,
+                     warmup_iterations,
+                     iterations,
                      input_format,
                      output_path=None,
                      keep_sc=False,
@@ -238,6 +240,8 @@ def run_query_stream(input_prefix,
         print("====== Run {} ======".format(query_name))
         q_report = PysparkBenchReport(spark_session, query_name)
         summary = q_report.report_on(run_one_query,
+                                     warmup_iterations,
+                                     iterations,
                                      spark_session,
                                      q_content,
                                      query_name,
@@ -347,13 +351,23 @@ def load_properties(filename):
                         default='parquet')
     parser.add_argument('--property_file',
                         help='property file for Spark configuration.')
+    parser.add_argument('--warmup_iterations',
+                        type=int,
+                        help='Number of warmup iterations for each query.',
+                        default=0)
+    parser.add_argument('--iterations',
+                        type=int,
+                        help='Number of iterations for each query.',
+                        default=1)
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
     run_query_stream(args.input_prefix,
                      args.property_file,
                      query_dict,
                      args.time_log,
                      args.sub_queries,
+                     args.warmup_iterations,
+                     args.iterations,
                      args.input_format,
                      args.output_prefix,
                      args.keep_sc,
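
With these flags, a power run can warm each query up before the timed runs. As a hedged example (the positional arguments below are placeholders; the full CLI is not shown in this diff), an invocation might look like:

    python nds_h_power.py <input_prefix> <query_stream_file> <time_log> --warmup_iterations 1 --iterations 3

With the defaults (--warmup_iterations 0, --iterations 1), each query gets a single timed run, matching the previous behavior.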

utils/python_benchmark_reporter/PysparkBenchReport.py

Lines changed: 35 additions & 21 deletions
@@ -56,13 +56,14 @@ def __init__(self, spark_session: SparkSession, query_name) -> None:
             'query': query_name,
         }
 
-    def report_on(self, fn: Callable, *args):
+    def report_on(self, fn: Callable, warmup_iterations=0, iterations=1, *args):
         """Record a function for its running environment, running status etc. and exclude sensitive
         information like tokens, secrets and passwords. Generate a summary in dict format for it.
 
         Args:
             fn (Callable): a function to be recorded
-
+        :param iterations: number of timed runs of fn
+        :param warmup_iterations: number of untimed warmup runs of fn
         Returns:
             dict: summary of the fn
         """
@@ -83,28 +84,41 @@ def report_on(self, fn: Callable, *args):
         if listener is not None:
             print("TaskFailureListener is registered.")
         try:
-            start_time = int(time.time() * 1000)
-            fn(*args)
-            end_time = int(time.time() * 1000)
-            if listener and len(listener.failures) != 0:
-                self.summary['queryStatus'].append("CompletedWithTaskFailures")
-            else:
-                self.summary['queryStatus'].append("Completed")
+            # warmup
+            for i in range(0, warmup_iterations):
+                fn(*args)
         except Exception as e:
-            # print the exception to ease debugging
-            print('ERROR BEGIN')
+            print('ERROR WHILE WARMUP BEGIN')
             print(e)
             traceback.print_tb(e.__traceback__)
-            print('ERROR END')
-            end_time = int(time.time() * 1000)
-            self.summary['queryStatus'].append("Failed")
-            self.summary['exceptions'].append(str(e))
-        finally:
-            self.summary['startTime'] = start_time
-            self.summary['queryTimes'].append(end_time - start_time)
-            if listener is not None:
-                listener.unregister()
-            return self.summary
+            print('ERROR WHILE WARMUP END')
+
+        start_time = int(time.time() * 1000)
+        self.summary['startTime'] = start_time
+        # run the query
+        for i in range(0, iterations):
+            try:
+                start_time = int(time.time() * 1000)
+                fn(*args)
+                end_time = int(time.time() * 1000)
+                if listener and len(listener.failures) != 0:
+                    self.summary['queryStatus'].append("CompletedWithTaskFailures")
+                else:
+                    self.summary['queryStatus'].append("Completed")
+            except Exception as e:
+                # print the exception to ease debugging
+                print('ERROR BEGIN')
+                print(e)
+                traceback.print_tb(e.__traceback__)
+                print('ERROR END')
+                end_time = int(time.time() * 1000)
+                self.summary['queryStatus'].append("Failed")
+                self.summary['exceptions'].append(str(e))
+            finally:
+                self.summary['queryTimes'].append(end_time - start_time)
+                if listener is not None:
+                    listener.unregister()
+        return self.summary
 
     def write_summary(self, prefix=""):
         """_summary_
