Commit fc11bba

Reverting NDS related changes
Signed-off-by: Sayed Bilal Bari <[email protected]>
Parent: 4ab62ba

File tree: 7 files changed, +298 −9 lines changed


nds/PysparkBenchReport.py

Lines changed: 0 additions & 1 deletion
This file was deleted.

nds/PysparkBenchReport.py

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -----
#
# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
#
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file are derived from the TPC-DS Benchmark and as such any results
# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
# obtained from using this file do not comply with the TPC-DS Benchmark.
#

import json
import os
import time
import traceback
from typing import Callable
from pyspark.sql import SparkSession

import python_listener


class PysparkBenchReport:
    """Class to generate a JSON summary report for a benchmark."""

    def __init__(self, spark_session: SparkSession, query_name) -> None:
        self.spark_session = spark_session
        self.summary = {
            'env': {
                'envVars': {},
                'sparkConf': {},
                'sparkVersion': None
            },
            'queryStatus': [],
            'exceptions': [],
            'startTime': None,
            'queryTimes': [],
            'query': query_name,
        }

    def report_on(self, fn: Callable, warmup_iterations=0, iterations=1, *args):
        """Record a function's running environment and running status, excluding sensitive
        information like tokens, secrets and passwords, and generate a summary for it in
        dict format.

        Args:
            fn (Callable): a function to be recorded

        Returns:
            dict: summary of the fn
        """
        spark_conf = dict(self.spark_session.sparkContext._conf.getAll())
        env_vars = dict(os.environ)
        redacted = ["TOKEN", "SECRET", "PASSWORD"]
        filtered_env_vars = {k: v for k, v in env_vars.items() if k not in redacted}
        self.summary['env']['envVars'] = filtered_env_vars
        self.summary['env']['sparkConf'] = spark_conf
        self.summary['env']['sparkVersion'] = self.spark_session.version
        listener = None
        try:
            listener = python_listener.PythonListener()
            listener.register()
        except TypeError as e:
            print("Not found com.nvidia.spark.rapids.listener.Manager", str(e))
            listener = None
        if listener is not None:
            print("TaskFailureListener is registered.")
        try:
            # warmup runs are neither timed nor recorded in queryStatus
            for i in range(0, warmup_iterations):
                fn(*args)
        except Exception as e:
            print('ERROR WHILE WARMUP BEGIN')
            print(e)
            traceback.print_tb(e.__traceback__)
            print('ERROR WHILE WARMUP END')

        start_time = int(time.time() * 1000)
        self.summary['startTime'] = start_time
        # run the query
        for i in range(0, iterations):
            try:
                start_time = int(time.time() * 1000)
                fn(*args)
                end_time = int(time.time() * 1000)
                if listener and len(listener.failures) != 0:
                    self.summary['queryStatus'].append("CompletedWithTaskFailures")
                else:
                    self.summary['queryStatus'].append("Completed")
            except Exception as e:
                # print the exception to ease debugging
                print('ERROR BEGIN')
                print(e)
                traceback.print_tb(e.__traceback__)
                print('ERROR END')
                end_time = int(time.time() * 1000)
                self.summary['queryStatus'].append("Failed")
                self.summary['exceptions'].append(str(e))
            finally:
                self.summary['queryTimes'].append(end_time - start_time)
        if listener is not None:
            listener.unregister()
        return self.summary

    def write_summary(self, prefix=""):
        """Write the summary to a JSON file whose name encodes the prefix, query name and
        start time.

        Args:
            prefix (str, optional): prefix for the output json summary file. Defaults to "".
        """
        # Power BI side is retrieving some information from the summary file name, so keep this file
        # name format for pipeline compatibility
        filename = prefix + '-' + self.summary['query'] + '-' + str(self.summary['startTime']) + '.json'
        self.summary['filename'] = filename
        with open(filename, "w") as f:
            json.dump(self.summary, f, indent=2)

    def is_success(self):
        """Check if the query succeeded, i.e. queryStatus == Completed."""
        return self.summary['queryStatus'][0] == 'Completed'

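For orientation, here is a minimal usage sketch of the restored class. It assumes the `nds/` directory is on `PYTHONPATH` (so `PysparkBenchReport` and its `python_listener` dependency import cleanly) and a Spark installation is available; the query function and names are illustrative, not taken from the repository.

```python
from pyspark.sql import SparkSession

from PysparkBenchReport import PysparkBenchReport

spark = SparkSession.builder.appName("nds-report-demo").getOrCreate()

def run_query():
    # stand-in for an NDS query; any callable that triggers a Spark action works
    spark.sql("SELECT 1 AS answer").collect()

report = PysparkBenchReport(spark, "query_demo")
# 1 untimed warmup iteration, then 3 timed iterations
summary = report.report_on(run_query, 1, 3)
report.write_summary(prefix="demo")       # writes demo-query_demo-<startTime>.json
print("succeeded:", report.is_success())  # True only if the first run Completed
```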
nds/README.md

Lines changed: 3 additions & 3 deletions
@@ -301,7 +301,7 @@ finished. This is often used for test or query monitoring purpose.
 To build:
 
 ```bash
-cd utils/jvm_listener
+cd jvm_listener
 mvn package
 ```
 
@@ -353,7 +353,7 @@ nds_power.py \
 parquet_sf3k \
 ./nds_query_streams/query_0.sql \
 time.csv \
---property_file ../utils/properties/aqe-on.properties
+--property_file properties/aqe-on.properties
 ```
 
 User can also use `spark-submit` to submit `nds_power.py` directly.
@@ -364,7 +364,7 @@ Note the template file must follow the `spark-submit-template` utility as the _f
 All Spark configuration words (such as `--conf` and corresponding `k=v` values) are quoted by
 double quotes in the template file. Please follow the format in [power_run_gpu.template](./power_run_gpu.template).
 
-User can define the `properties` file like [aqe-on.properties](../utils/properties/aqe-on.properties). The properties will be passed to the submitted Spark job along with the configurations defined in the template file. User can define some common properties in the template file and put some other properties that usually varies in the property file.
+User can define the `properties` file like [aqe-on.properties](./properties/aqe-on.properties). The properties will be passed to the submitted Spark job along with the configurations defined in the template file. User can define some common properties in the template file and put some other properties that usually varies in the property file.
 
 The command above will use `collect()` action to trigger Spark job for each query. It is also supported to save query output to some place for further verification. User can also specify output format e.g. csv, parquet or orc:

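As a hedged illustration of the properties mechanism described in the README hunk above: a properties file holds plain `key=value` Spark settings that get layered on top of the configuration from the template file. The sketch below is not the repository's actual loader, and the property shown is an assumption about what `aqe-on.properties` contains.

```python
from pyspark.sql import SparkSession

def load_properties(path):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('#') and '=' in line:
                key, value = line.split('=', 1)
                props[key.strip()] = value.strip()
    return props

builder = SparkSession.builder
# e.g. spark.sql.adaptive.enabled=true (assumed content of aqe-on.properties)
for key, value in load_properties('properties/aqe-on.properties').items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()
```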
nds/base.template

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ export NUM_EXECUTORS=${NUM_EXECUTORS:-8}
 export EXECUTOR_MEMORY=${EXECUTOR_MEMORY:-16G}
 
 # The NDS listener jar which is built in jvm_listener directory.
-export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-../utils/jvm_listener/target/benchmark-listener-1.0-SNAPSHOT.jar}
+export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-./jvm_listener/target/nds-benchmark-listener-1.0-SNAPSHOT.jar}
 # The spark-rapids jar which is required when running on GPU
 export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-4-spark_2.12-22.06.0.jar}
 export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip`

nds/check.py

Lines changed: 0 additions & 1 deletion
This file was deleted.

nds/check.py

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@

#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -----
#
# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
#
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file are derived from the TPC-DS Benchmark and as such any results
# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
# obtained from using this file do not comply with the TPC-DS Benchmark.
#

import argparse
import os
import sys
from pathlib import Path


def check_version():
    req_ver = (3, 6)
    cur_ver = sys.version_info
    if cur_ver < req_ver:
        raise Exception('Minimum required Python version is 3.6, but current Python version is {}.'
                        .format(str(cur_ver.major) + '.' + str(cur_ver.minor)) +
                        ' Please use a proper Python version.')


def check_build():
    """check the jar and the tpcds executable

    Raises:
        Exception: the build is not done or broken

    Returns:
        PosixPath, PosixPath: paths of the jar and the dsdgen executable
    """
    # Check if the necessary executable or jars are built.
    # We assume the user won't move this script.
    src_dir = Path(__file__).parent.absolute()
    jar_path = list(
        Path(src_dir / 'tpcds-gen/target').rglob("tpcds-gen-*.jar"))
    tool_path = list(Path(src_dir / 'tpcds-gen/target/tools').rglob("dsdgen"))
    if jar_path == [] or tool_path == []:
        raise Exception('Target jar file is not found in `target` folder or dsdgen executable is ' +
                        'not found in `target/tools` folder. ' +
                        'Please refer to the README document and build this project first.')
    return jar_path[0], tool_path[0]


def get_abs_path(input_path):
    """receive a user input path and return its absolute path.

    Args:
        input_path (str): user's input path

    Returns:
        str: if the input is an absolute path, return it as-is; if it's a relative path,
        return its absolute path.
    """
    if Path(input_path).is_absolute():
        # it's an absolute path
        output_path = input_path
    else:
        # it's a path relative to where this script is executed
        output_path = os.getcwd() + '/' + input_path
    return output_path


def valid_range(range, parallel):
    """validate the range input

    Args:
        range (str): a range specified for range data generation, e.g. "1,10"
        parallel (str): string type number for parallelism in TPC-DS data generation, e.g. "20"

    Raises:
        Exception: error message for invalid range input.

    Returns:
        int, int: the parsed range start and range end
    """
    if len(range.split(',')) != 2:
        msg = 'Invalid range: please specify a range with a comma between start and end. e.g., "1,10".'
        raise Exception(msg)
    range_start = int(range.split(',')[0])
    range_end = int(range.split(',')[1])
    if range_start < 1 or range_start > range_end or range_end > int(parallel):
        msg = 'Please provide correct child range: 1 <= range_start <= range_end <= parallel'
        raise Exception(msg)
    return range_start, range_end


def parallel_value_type(p):
    """helper function to check the parallel value

    Args:
        p (str): parallel value

    Raises:
        argparse.ArgumentTypeError: ArgumentTypeError exception

    Returns:
        str: parallel in string
    """
    if int(p) < 2:
        raise argparse.ArgumentTypeError("PARALLEL must be >= 2")
    return p


def get_dir_size(start_path):
    """return the total size in bytes of all files under start_path, skipping symlinks"""
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            # skip if it is a symbolic link
            if not os.path.islink(fp):
                total_size += os.path.getsize(fp)
    return total_size


def check_json_summary_folder(json_summary_folder):
    if json_summary_folder:
        # prepare a folder to save json summaries of query results
        if not os.path.exists(json_summary_folder):
            os.makedirs(json_summary_folder)
        else:
            if os.listdir(json_summary_folder):
                raise Exception(f"json_summary_folder {json_summary_folder} is not empty. " +
                                "There may already be some json files there. Please clean the folder " +
                                "or specify another one.")


def check_query_subset_exists(query_dict, subset_list):
    """check if the query subset exists in the query dictionary"""
    for q in subset_list:
        if q not in query_dict.keys():
            raise Exception(f"Query {q} is not in the query dictionary. Please check the query subset.")
    return True

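A short sketch of how these helpers compose in an argparse-based driver, mirroring the `from check import ...` lines in the files below; the flag names and defaults are illustrative, not from the repository.

```python
import argparse

from check import check_version, parallel_value_type, valid_range, check_json_summary_folder

check_version()  # refuses to run on Python < 3.6

parser = argparse.ArgumentParser()
parser.add_argument('--parallel', type=parallel_value_type, default='20')
parser.add_argument('--range', default='1,20')
parser.add_argument('--json_summary_folder', default=None)
args = parser.parse_args()

# raises with a clear message on a malformed or out-of-bounds range
start, end = valid_range(args.range, args.parallel)
# creates the folder if missing, or fails fast if it already holds json files
check_json_summary_folder(args.json_summary_folder)
```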
nds/nds_gen_data.py

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@
 import shutil
 import subprocess
 
-from check import check_build_nds, check_version, get_abs_path, get_dir_size, parallel_value_type, valid_range
+from check import check_build, check_version, get_abs_path, get_dir_size, parallel_value_type, valid_range
 
 check_version()

nds/nds_gen_query_stream.py

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@
 import subprocess
 import sys
 
-from check import check_build_nds, check_version, get_abs_path
+from check import check_build, check_version, get_abs_path
 
 check_version()

nds/nds_maintenance.py

Lines changed: 0 additions & 1 deletion
@@ -36,7 +36,6 @@
 import os
 
 from pyspark.sql import SparkSession
-
 from PysparkBenchReport import PysparkBenchReport
 
 from check import check_json_summary_folder, get_abs_path
