diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a6f4efd5..0ec6ab705 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,36 +1,35 @@ # BlazingSQL 21.10.00 (TBD) ## New Features - ## Improvements ## Bug Fixes - -# BlazingSQL 21.08.00 (TBD) +# BlazingSQL 21.08.00 (August 12th, 2021) ## New Features - ## Improvements - #1571 Update ucx-py versions to 0.21 - #1554 return ok for filesystems - #1572 Setting up default value for max_bytes_chunk_read to 256 MB - ## Bug Fixes -- #1570 Fix build due to changes in rmm device buffer +- #1570 Fix build due to changes in rmm device buffer - #1574 Fix reading decimal columns from orc file - #1576 Fix `CC`/`CXX` variables in CI - #1581 Fix latest cudf dependencies - #1582 Fix concat suite E2E test for nested calls - #1585 Fix for GCS credentials from filepath - +- #1589 Fix decimal support using float64 +- #1590 Fix build issue with thrust package +- #1595 Fix `spdlog` pinning +- #1597 Fix `google-cloud-cpp` pinning # BlazingSQL 21.06.00 (June 10th, 2021) ## New Features -- #1471 Unbounded partitioned windows +- #1471 Unbounded partitioned windows - #1445 Support for CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP - #1505 Support for right outer join - #1523 Support for DURATION type @@ -50,7 +49,7 @@ - #1455 Support for IS NOT FALSE condition - #1502 Fix IS NOT DISTINCT FROM with joins - #1475 Fix wrong results from timestampdiff/add -- #1528 Fixed build issues due to cudf aggregation API change +- #1528 Fixed build issues due to cudf aggregation API change - #1540 Comparing param set to true for e2e - #1543 Enables provider unit_tests - #1548 Fix orc statistic building @@ -65,7 +64,7 @@ ## New Features - #1367 OverlapAccumulator Kernel - #1364 Implement the concurrent API (bc.sql with token, bc.status, bc.fetch) -- #1426 Window Functions without partitioning +- #1426 Window Functions without partitioning - #1349 Add e2e test for Hive Partitioned Data - #1396 Create tables from other RDBMS - #1427 Support for CONCAT alias operator @@ -73,7 +72,7 @@ - #1472 Implement predicate pushdown for data providers ## Improvements -- #1325 Refactored CacheMachine.h and CacheMachine.cpp +- #1325 Refactored CacheMachine.h and CacheMachine.cpp - #1322 Updated and enabled several E2E tests - #1333 Fixing build due to cudf update - #1344 Removed GPUCacheDataMetadata class @@ -82,7 +81,7 @@ - #1331 Added flag to enable null e2e testing - #1418 Adding support for docker image - #1434 Added documentation for C++ and Python in Sphinx -- #1419 Added concat cache machine timeout +- #1419 Added concat cache machine timeout - #1444 Updating GCP to >= version - #1349 Add e2e test for Hive Partitioned Data - #1447 Improve getting estimated output num rows @@ -99,18 +98,18 @@ - #1350 Fixed bug where there are no projects in a bindable table scan - #1359 Avoid cuda issues when free pinned memory - #1365 Fixed build after sublibs changes on cudf -- #1369 Updated java path for powerpc build +- #1369 Updated java path for powerpc build - #1371 Fixed e2e settings - #1372 Recompute `columns_to_hash` in DistributeAggregationKernel - #1375 Fix empty row_group_ids for parquet -- #1380 Fixed issue with int64 literal values +- #1380 Fixed issue with int64 literal values - #1379 Remove ProjectRemoveRule - #1389 Fix issue when CAST a literal - #1387 Skip getting orc metadata for decimal type - #1392 Fix substrings with nulls - #1398 Fix performance regression - #1401 Fix support for minus unary operation -- #1415 Fixed bug where num_batches was not getting set in BindableTableScan +- #1415 Fixed 
bug where num_batches was not getting set in BindableTableScan - #1413 Fix for null tests 13 and 23 of windowFunctionTest - #1416 Fix full join when both tables contains nulls - #1423 Fix temporary directory for hive partition test @@ -123,7 +122,7 @@ - #1504 Fixing some conflicts in Dockerfile ## Deprecated Features -- #1394 Disabled support for outer joins with inequalities +- #1394 Disabled support for outer joins with inequalities # BlazingSQL 0.18.0 (February 24, 2021) @@ -136,7 +135,7 @@ - #1238 Implements MergeStramKernel executor model - #1259 Implements SortAndSamplernel executor model, also avoid setting up num of samples - #1271 Added Hive utility for partitioned data -- #1289 Multiple concurrent query support +- #1289 Multiple concurrent query support - #1285 Infer PROTOCOL when Dask client is passed - #1294 Add config options for logger - #1301 Added usage of pinned buffers for communication and fixes various UCX related bugs @@ -145,7 +144,7 @@ - #1303 Add support for INITCAP - #1313 getting and using ORC metadata - #1347 Fixing issue when reading orc metadata from DATE dtype -- #1338 Window Function support for LEAD and LAG statements +- #1338 Window Function support for LEAD and LAG statements - #1362 give useful message when file extension is not recognized - #1361 Supporting first_value and last_value for Window Function @@ -168,7 +167,7 @@ - #1308 Improve the engine loggers - #1314 Added unit tests to verify that OOM error handling works well - #1320 Revamping cache logger -- #1323 Made progress bar update continuously and stay after query is done +- #1323 Made progress bar update continuously and stay after query is done - #1336 Improvements for the cache API - #1483 Improve dependencies script @@ -182,7 +181,7 @@ - #1277 Support FileSystems (GS, S3) when extension of the files are not provided - #1300 Fixed issue when creating tables from a local dir relative path - #1312 Fix progress bar for jupyterlab -- #1318 Disabled require acknowledge +- #1318 Disabled require acknowledge # BlazingSQL 0.17.0 (December 10, 2020) @@ -203,7 +202,7 @@ - #1201 Implement string TRIM - #1216 Add unit test for DAYOFWEEK - #1205 Implement string REVERSE -- #1220 Implement string LEFT and RIGHT +- #1220 Implement string LEFT and RIGHT - #1223 Add support for UNION statement - #1250 updated README.md and CHANGELOG and others preparing for 0.17 release @@ -262,7 +261,7 @@ - #1203 Changed code back so that parquet is not read a single rowgroup at a time - #1207 Calcite uses literal as int32 if not explicit CAST was provided - #1212 Fixed issue when building the thirdpart, cmake version set to 3.18.4 -- #1225 Fixed issue due to change in gather API +- #1225 Fixed issue due to change in gather API - #1254 Fixing support of nightly and stable on localhost - #1258 Fixing gtest version issue diff --git a/README.md b/README.md index 316babd8c..536b54d2c 100644 --- a/README.md +++ b/README.md @@ -79,8 +79,9 @@ You can find our full documentation at [docs.blazingdb.com](https://docs.blazing * Pascal or Better * Compute Capability >= 6.0 * CUDA Support - * 10.1.2 - * 10.2 + * 11.0 + * 11.2 + * 11.4 * Python Support * 3.7 * 3.8 @@ -91,10 +92,10 @@ BlazingSQL can be installed with conda ([miniconda](https://conda.io/miniconda.h ```bash conda install -c blazingsql -c rapidsai -c nvidia -c conda-forge -c defaults blazingsql python=$PYTHON_VERSION cudatoolkit=$CUDA_VERSION ``` -Where $CUDA_VERSION is 10.1, 10.2 or 11.0 and $PYTHON_VERSION is 3.7 or 3.8 -*For example for CUDA 10.1 and Python 3.7:* +Where 
$CUDA_VERSION is 11.0, 11.2 or 11.4 and $PYTHON_VERSION is 3.7 or 3.8 +*For example for CUDA 11.2 and Python 3.8:* ```bash -conda install -c blazingsql -c rapidsai -c nvidia -c conda-forge -c defaults blazingsql python=3.7 cudatoolkit=10.1 +conda install -c blazingsql -c rapidsai -c nvidia -c conda-forge -c defaults blazingsql python=3.8 cudatoolkit=11.2 ``` ## Nightly Version @@ -102,7 +103,7 @@ For nightly version cuda 11+ are only supported, see https://github.com/rapidsai ```bash conda install -c blazingsql-nightly -c rapidsai-nightly -c nvidia -c conda-forge -c defaults blazingsql python=$PYTHON_VERSION cudatoolkit=$CUDA_VERSION ``` -Where $CUDA_VERSION is 11.0 or 11.2 and $PYTHON_VERSION is 3.7 or 3.8 +Where $CUDA_VERSION is 11.0, 11.2 or 11.4 and $PYTHON_VERSION is 3.7 or 3.8 *For example for CUDA 11.2 and Python 3.8:* ```bash conda install -c blazingsql-nightly -c rapidsai-nightly -c nvidia -c conda-forge -c defaults blazingsql python=3.8 cudatoolkit=11.2 @@ -117,14 +118,14 @@ This is the recommended way of building all of the BlazingSQL components and dep ```bash conda create -n bsql python=$PYTHON_VERSION conda activate bsql -./dependencies.sh 21.06 $CUDA_VERSION +./dependencies.sh 21.08 $CUDA_VERSION ``` -Where $CUDA_VERSION is is 10.1, 10.2 or 11.0 and $PYTHON_VERSION is 3.7 or 3.8 -*For example for CUDA 10.1 and Python 3.7:* +Where $CUDA_VERSION is is 11.0, 11.2 or 11.4 and $PYTHON_VERSION is 3.7 or 3.8 +*For example for CUDA 11.2 and Python 3.7:* ```bash conda create -n bsql python=3.7 conda activate bsql -./dependencies.sh 21.06 10.1 +./dependencies.sh 21.08 11.2 ``` ### Build @@ -149,14 +150,14 @@ For nightly version cuda 11+ are only supported, see https://github.com/rapidsai ```bash conda create -n bsql python=$PYTHON_VERSION conda activate bsql -./dependencies.sh 21.08 $CUDA_VERSION nightly +./dependencies.sh 21.10 $CUDA_VERSION nightly ``` -Where $CUDA_VERSION is 11.0 or 11.2 and $PYTHON_VERSION is 3.7 or 3.8 +Where $CUDA_VERSION is 11.0, 11.2 or 11.4 and $PYTHON_VERSION is 3.7 or 3.8 *For example for CUDA 11.2 and Python 3.8:* ```bash conda create -n bsql python=3.8 conda activate bsql -./dependencies.sh 21.08 11.2 nightly +./dependencies.sh 21.10 11.2 nightly ``` ### Build diff --git a/build.sh b/build.sh index 0faeeaef0..74fe5dd95 100755 --- a/build.sh +++ b/build.sh @@ -95,7 +95,7 @@ fi # Get version number export GIT_DESCRIBE_TAG=`git describe --tags` export MINOR_VERSION=`echo $GIT_DESCRIBE_TAG | grep -o -E '([0-9]+\.[0-9]+)'` -export UCX_PY_VERSION="0.21" +export UCX_PY_VERSION="0.22" # Process flags if hasArg -v; then diff --git a/ci/cpu/upload.sh b/ci/cpu/upload.sh index 98c00868e..0c1854a8e 100755 --- a/ci/cpu/upload.sh +++ b/ci/cpu/upload.sh @@ -50,8 +50,8 @@ if [ "$UPLOAD_BLAZING" == "1" ]; then test -e ${BLAZINGSQL_FILE} - echo "Upload BlazingSQL to ${CONDA_USERNAME} channel: ${BLAZINGSQL_FILE}" - anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME} ${LABEL_OPTION} --force ${BLAZINGSQL_FILE} + # echo "Upload BlazingSQL to ${CONDA_USERNAME} channel: ${BLAZINGSQL_FILE}" + # anaconda -t ${MY_UPLOAD_KEY} upload -u ${CONDA_USERNAME} ${LABEL_OPTION} --force ${BLAZINGSQL_FILE} if [ ! 
-z "$RAPIDS_CONDA_USERNAME" ]; then echo "Upload BlazingSQL to ${RAPIDS_CONDA_USERNAME} channel: ${BLAZINGSQL_FILE}" diff --git a/ci/gpu/test.sh b/ci/gpu/test.sh index 4a93a3fb0..bc6576187 100755 --- a/ci/gpu/test.sh +++ b/ci/gpu/test.sh @@ -165,7 +165,7 @@ else PR_BR=${PR_AUTHOR}":"${SOURCE_BRANCH} else echo "The fork doesn't exist" - git clone --depth 1 https://github.com/BlazingDB/blazingsql-testing-files.git --branch ${TARGET_BRANCH} --single-branch + git clone --depth 1 https://github.com/rapidsai/blazingsql-testing-files.git --branch ${TARGET_BRANCH} --single-branch fi fi set -e diff --git a/conda/recipes/blazingsql/meta.yaml b/conda/recipes/blazingsql/meta.yaml index abbe71b18..ce3b0c40c 100644 --- a/conda/recipes/blazingsql/meta.yaml +++ b/conda/recipes/blazingsql/meta.yaml @@ -27,19 +27,19 @@ requirements: - ninja - gtest 1.10 - aws-sdk-cpp - - google-cloud-cpp >=1.25.0 + - google-cloud-cpp >=1.25.0,<1.30 - mysql-connector-cpp 8.0.23 - libpq 13 - nlohmann_json 3.9.1 - cppzmq - python - setuptools - - spdlog >=1.8.5,<2.0.0a0 + - spdlog >=1.8.5,<1.9 - cython >=0.29,<0.30 - openjdk >=8.0, <9.0 - maven - cudf {{ minor_version }}.* - - ucx-py 0.21.* + - ucx-py 0.22.* - ucx-proc=*=gpu - boost-cpp 1.72.0 - dlpack @@ -54,11 +54,11 @@ requirements: - netifaces - pyhive - sqlite 3 - - spdlog >=1.8.5,<2.0.0a0 + - spdlog >=1.8.5,<1.9 - {{ pin_compatible('zeromq', max_pin='x.x.x') }} - dask-cudf {{ minor_version }}.* - dask-cuda {{ minor_version }}.* - - ucx-py 0.21.* + - ucx-py 0.22.* - ucx-proc=*=gpu - {{ pin_compatible('cudatoolkit', max_pin='x.x') }} - tqdm diff --git a/dependencies.sh b/dependencies.sh index ed1a3bd15..16fac36e2 100755 --- a/dependencies.sh +++ b/dependencies.sh @@ -11,7 +11,7 @@ ITALICRED="\e[3;${RED}" ENDCOLOR="\e[0m" RAPIDS_VERSION="21.08" -UCX_PY_VERSION="0.21" +UCX_PY_VERSION="0.22" CUDA_VERSION="11.0" CHANNEL="" @@ -28,13 +28,13 @@ if [ ! -z $3 ]; then fi echo -e "${GREEN}Installing dependencies${ENDCOLOR}" -conda install --yes -c conda-forge spdlog'>=1.8.5,<2.0.0a0' google-cloud-cpp'>=1.25' ninja mysql-connector-cpp=8.0.23 libpq=13 nlohmann_json=3.9.1 +conda install --yes -c conda-forge spdlog'>=1.8.5,<1.9' google-cloud-cpp'>=1.25,<1.30' ninja mysql-connector-cpp=8.0.23 libpq=13 nlohmann_json=3.9.1 # NOTE cython must be the same of cudf (for 0.11 and 0.12 cython is >=0.29,<0.30) conda install --yes -c conda-forge cmake=3.18 gtest==1.10.0=h0efe328_4 gmock cppzmq cython=0.29 openjdk'>=8.0,<9.0' maven jpype1 netifaces pyhive pytest tqdm ipywidgets boost-cpp=1.72.0 echo -e "${GREEN}Install RAPIDS dependencies${ENDCOLOR}" -conda install --yes -c rapidsai$CHANNEL -c nvidia -c conda-forge -c defaults dask-cuda=$RAPIDS_VERSION dask-cudf=$RAPIDS_VERSION cudf=$RAPIDS_VERSION ucx-py=$UCX_PY_VERSION ucx-proc=*=gpu cudatoolkit=$CUDA_VERSION +conda install --yes -c rapidsai$CHANNEL -c nvidia -c conda-forge -c defaults dask-cuda=$RAPIDS_VERSION dask-cudf=$RAPIDS_VERSION cudf=$RAPIDS_VERSION "rapidsai$CHANNEL::librmm=$RAPIDS_VERSION" ucx-py=$UCX_PY_VERSION ucx-proc=*=gpu cudatoolkit=$CUDA_VERSION echo -e "${GREEN}Install E2E test dependencies${ENDCOLOR}" diff --git a/docsrc/source/Doxyfile b/docsrc/source/Doxyfile index 974aaa83c..2cfa4cc66 100644 --- a/docsrc/source/Doxyfile +++ b/docsrc/source/Doxyfile @@ -38,7 +38,7 @@ PROJECT_NAME = "BlazingSQL Engine" # could be handy for archiving the generated documentation or if some version # control system is used. 
-PROJECT_NUMBER = 21.06 +PROJECT_NUMBER = 21.08 # Using the PROJECT_BRIEF tag one can provide an optional one line description # for a project that appears at the top of each page and should give viewer a diff --git a/docsrc/source/conf.py b/docsrc/source/conf.py index 94cb8c2a7..c1f68a128 100755 --- a/docsrc/source/conf.py +++ b/docsrc/source/conf.py @@ -44,7 +44,7 @@ language = "en" # The full version, including alpha/beta/rc tags -version = '21.06' +version = '21.08' release = f'v{version}' # -- General configuration --------------------------------------------------- diff --git a/engine/bsql_engine/io/io.pyx b/engine/bsql_engine/io/io.pyx index c6fc02a8d..edc6d3630 100644 --- a/engine/bsql_engine/io/io.pyx +++ b/engine/bsql_engine/io/io.pyx @@ -29,7 +29,10 @@ import pyarrow as pa import cudf from cudf._lib cimport * -from cudf._lib.types import np_to_cudf_types, cudf_to_np_types +from cudf._lib.types import ( + SUPPORTED_NUMPY_TO_LIBCUDF_TYPES as np_to_cudf_types, + LIBCUDF_TO_SUPPORTED_NUMPY_TYPES as cudf_to_np_types +) from cudf._lib.cpp.types cimport type_id from cudf._lib.types cimport underlying_type_t_type_id from cudf._lib.cpp.io.types cimport compression_type @@ -39,7 +42,7 @@ from bsql_engine.io.cio cimport * from cpython.ref cimport PyObject from cython.operator cimport dereference, postincrement -from cudf._lib.table cimport Table as CudfXxTable +from cudf._lib.utils cimport data_from_unique_ptr from libcpp.utility cimport pair import logging @@ -310,8 +313,12 @@ cdef class PyBlazingCache: decoded_names = [] for i in range(deref(table).names().size()): decoded_names.append(deref(table).names()[i].decode('utf-8')) - df = cudf.DataFrame(CudfXxTable.from_unique_ptr(blaz_move(deref(table).releaseCudfTable()), decoded_names)._data) - df._rename_columns(decoded_names) + + df = cudf.DataFrame._from_data(*data_from_unique_ptr( + blaz_move(deref(table).releaseCudfTable()), + column_names=decoded_names + )) + return df, metadata_py cpdef initializeCaller(uint16_t ralId, string worker_id, string network_iface_name, int ralCommunicationPort, vector[NodeMetaDataUCP] workers_ucp_info, @@ -433,8 +440,11 @@ cpdef parseMetadataCaller(fileList, offset, schema, file_format_hint, args): for i in range(names.size()): # Increment the iterator to the net element decoded_names.append(names[i].decode('utf-8')) - df = cudf.DataFrame(CudfXxTable.from_unique_ptr(blaz_move(dereference(resultSet).cudfTable), decoded_names)._data) - df._rename_columns(decoded_names) + df = cudf.DataFrame._from_data(*data_from_unique_ptr( + blaz_move(dereference(resultSet).cudfTable), + column_names=decoded_names + )) + return df cpdef inferFolderPartitionMetadataCaller(folder_path): @@ -611,12 +621,17 @@ cpdef getExecuteGraphResultCaller(PyBlazingGraph graph, int ctx_token, bool is_s decoded_names.append(names[i].decode('utf-8')) if is_single_node: # the engine returns a concatenated dataframe - df = cudf.DataFrame(CudfXxTable.from_unique_ptr(blaz_move(dereference(resultSet).cudfTables[0]), decoded_names)._data) - return df + return cudf.DataFrame._from_data(*data_from_unique_ptr( + blaz_move(dereference(resultSet).cudfTables[0]), + column_names=decoded_names + )) else: # the engine returns a vector of dataframes dfs = [] for i in range(dereference(resultSet).cudfTables.size()): - dfs.append(cudf.DataFrame(CudfXxTable.from_unique_ptr(blaz_move(dereference(resultSet).cudfTables[i]), decoded_names)._data)) + dfs.append(cudf.DataFrame._from_data(*data_from_unique_ptr( + blaz_move(dereference(resultSet).cudfTables[i]), + 
column_names=decoded_names + ))) return dfs cpdef runSkipDataCaller(table, queryPy): @@ -646,17 +661,19 @@ cpdef runSkipDataCaller(table, queryPy): return_object = {} return_object['skipdata_analysis_fail'] = dereference(resultSet).skipdata_analysis_fail if return_object['skipdata_analysis_fail']: - return_object['metadata'] = cudf.DataFrame() - return return_object + return_object['metadata'] = cudf.DataFrame() else: - names = dereference(resultSet).names - decoded_names = [] - for i in range(names.size()): - decoded_names.append(names[i].decode('utf-8')) - - df = cudf.DataFrame(CudfXxTable.from_unique_ptr(blaz_move(dereference(resultSet).cudfTable), decoded_names)._data) - return_object['metadata'] = df - return return_object + names = dereference(resultSet).names + decoded_names = [] + for i in range(names.size()): + decoded_names.append(names[i].decode('utf-8')) + + df = cudf.DataFrame._from_data(*data_from_unique_ptr( + blaz_move(dereference(resultSet).cudfTable), + column_names=decoded_names + )) + return_object['metadata'] = df + return return_object cpdef getTableScanInfoCaller(logicalPlan): temp = getTableScanInfoPython(str.encode(logicalPlan)) diff --git a/engine/setup.py b/engine/setup.py index 176afbb3b..469dd021c 100644 --- a/engine/setup.py +++ b/engine/setup.py @@ -82,6 +82,7 @@ def build_extensions(self): "-Wno-unknown-pragmas", "-Wno-unused-variable", "-Wno-unused-function", + "-DTHRUST_IGNORE_CUB_VERSION_CHECK", '-isystem' + conda_env_inc, '-isystem' + conda_env_inc_cudf, '-isystem' + conda_env_inc_cub, diff --git a/engine/src/bmr/BufferProvider.cpp b/engine/src/bmr/BufferProvider.cpp index 161e8a61f..0e121b6ab 100644 --- a/engine/src/bmr/BufferProvider.cpp +++ b/engine/src/bmr/BufferProvider.cpp @@ -26,15 +26,15 @@ void pinned_allocator::setUcpContext(ucp_context_h _context) use_ucx = true; } -void base_allocator::allocate(void ** ptr, std::size_t size){ - do_allocate(ptr,size); +void base_allocator::allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr){ + do_allocate(ptr,size,mem_handle_ptr); } -void base_allocator::deallocate(void * ptr){ - do_deallocate(ptr); +void base_allocator::deallocate(void * ptr, ucp_mem_h mem_handle){ + do_deallocate(ptr,mem_handle); } -void host_allocator::do_allocate(void ** ptr, std::size_t size){ +void host_allocator::do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr){ *ptr = aligned_alloc( BLAZING_ALIGNMENT, size ); if (!ptr) { @@ -42,7 +42,7 @@ void host_allocator::do_allocate(void ** ptr, std::size_t size){ } } -void pinned_allocator::do_allocate(void ** ptr, std::size_t size){ +void pinned_allocator::do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr){ // do we really want to do a host allocation instead of a device one? 
(have to try zero-copy later) cudaError_t err = cudaMallocHost(ptr, size); @@ -60,7 +60,7 @@ void pinned_allocator::do_allocate(void ** ptr, std::size_t size){ mem_map_params.length = size; mem_map_params.flags = 0; // try UCP_MEM_MAP_NONBLOCK - ucs_status_t status = ucp_mem_map(context, &mem_map_params, &mem_handle); + ucs_status_t status = ucp_mem_map(context, &mem_map_params, mem_handle_ptr); if (status != UCS_OK) { throw std::runtime_error("Error on ucp_mem_map"); @@ -68,11 +68,11 @@ void pinned_allocator::do_allocate(void ** ptr, std::size_t size){ } } -void host_allocator::do_deallocate(void * ptr){ +void host_allocator::do_deallocate(void * ptr, ucp_mem_h mem_handle){ free(ptr); } -void pinned_allocator::do_deallocate(void * ptr){ +void pinned_allocator::do_deallocate(void * ptr, ucp_mem_h mem_handle){ if (use_ucx) { ucs_status_t status = ucp_mem_unmap(context, mem_handle); @@ -91,7 +91,6 @@ void pinned_allocator::do_deallocate(void * ptr){ allocation_pool::allocation_pool(std::unique_ptr allocator, std::size_t size_buffers, std::size_t num_buffers) : num_buffers (num_buffers), buffer_size(size_buffers), allocator(std::move(allocator)) { this->buffer_counter = 0; // this will get incremented by grow() - this->allocation_counter = 0; this->grow(); } @@ -120,7 +119,6 @@ std::unique_ptr allocation_pool::get_chunk() { } for(auto & allocation : allocations){ if(!allocation->allocation_chunks.empty()){ - this->allocation_counter++; auto temp = std::move(allocation->allocation_chunks.top()); allocation->allocation_chunks.pop(); @@ -141,7 +139,7 @@ void allocation_pool::grow() { allocations.back()->index = this->allocations.size() - 1; auto last_index = allocations.size() -1; try{ - allocator->allocate((void **) &allocations[last_index]->data,num_new_buffers * buffer_size); + allocator->allocate((void **) &allocations[last_index]->data,num_new_buffers * buffer_size, &allocations[last_index]->mem_handle); this->allocations[last_index]->total_number_of_chunks = num_new_buffers; for (int buffer_index = 0; buffer_index < num_new_buffers; buffer_index++) { auto buffer = std::make_unique(); @@ -162,7 +160,7 @@ void allocation_pool::free_chunk(std::unique_ptr buffe std::unique_lock lock(in_use_mutex); const std::size_t idx = buffer->allocation->index; - if (idx+1 > this->allocations.size()) { + if (idx >= this->allocations.size()) { std::shared_ptr logger = spdlog::get("batch_logger"); if(logger){ logger->error("|||{0}|||||","free_chunk cannot delete an invalid allocation."); @@ -170,26 +168,24 @@ void allocation_pool::free_chunk(std::unique_ptr buffe assert(("free_chunk cannot delete an invalid allocation.", idx < this->allocations.size())); } - buffer->allocation->allocation_chunks.push(std::move(buffer)); + this->allocations.at(idx)->allocation_chunks.push(std::move(buffer)); if (idx > 0) { if (this->allocations.at(idx)->total_number_of_chunks == this->allocations.at(idx)->allocation_chunks.size()) { - auto it = this->allocations.begin(); - std::advance(it, idx); - if ((*it)->data != nullptr) { - this->allocator->deallocate((*it)->data); - this->allocations.erase(it); - + if (this->allocations.at(idx)->data != nullptr) { + this->buffer_counter -= this->allocations.at(idx)->total_number_of_chunks; + this->allocator->deallocate(this->allocations.at(idx)->data, this->allocations.at(idx)->mem_handle); + // for all allocations after the pos at idx // we need to update the allocation.index after we deleted one for (std::size_t i = idx; i < this->allocations.size(); ++i) { this->allocations[i]->index 
= this->allocations[i]->index - 1; } + + this->allocations.erase(this->allocations.begin() + idx); } } - } - - this->allocation_counter--; + } } @@ -202,10 +198,9 @@ void allocation_pool::free_all() { auto buffer = std::move(allocation->allocation_chunks.top()); allocation->allocation_chunks.pop(); } - allocator->deallocate(allocation->data); + allocator->deallocate(allocation->data, allocation->mem_handle); } - allocations.resize(0); - this->allocation_counter = 0; + allocations.resize(0); } } @@ -240,10 +235,6 @@ void empty_pools(){ buffer_providers::get_host_buffer_provider()->free_all(); buffer_providers::get_pinned_buffer_provider()->free_all(); } -std::size_t allocation_pool::get_allocated_buffers(){ - return allocation_counter; -} - std::size_t allocation_pool::get_total_buffers(){ return buffer_counter; diff --git a/engine/src/bmr/BufferProvider.h b/engine/src/bmr/BufferProvider.h index 74735afed..3e47f79fe 100644 --- a/engine/src/bmr/BufferProvider.h +++ b/engine/src/bmr/BufferProvider.h @@ -34,6 +34,7 @@ struct blazing_allocation{ char *data; // the pointer to the allocated memory std::stack< std::unique_ptr > allocation_chunks; // These are the available chunks that are part of the allocation. allocation_pool * pool; // this is the pool that was used to make this allocation, and therefore this is what we would use to free it + ucp_mem_h mem_handle; // this is a memhandle used by UCX }; struct blazing_allocation_chunk{ @@ -56,25 +57,20 @@ struct blazing_chunked_column_info { class base_allocator{ public: base_allocator() {} - void allocate(void ** ptr, std::size_t size); - void deallocate(void * ptr); - - virtual ucp_mem_h getUcpMemoryHandle() const - { - throw std::runtime_error("getUcpMemoryHandle not implemented in base class"); - } + void allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr); + void deallocate(void * ptr, ucp_mem_h mem_handle); protected: - virtual void do_allocate(void ** ptr, std::size_t size) = 0; - virtual void do_deallocate(void * ptr) = 0; + virtual void do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr) = 0; + virtual void do_deallocate(void * ptr, ucp_mem_h mem_handle) = 0; }; class host_allocator : public base_allocator { public: host_allocator(bool use_ucx) {} protected: - void do_allocate(void ** ptr, std::size_t size); - void do_deallocate(void * ptr); + void do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr); + void do_deallocate(void * ptr, ucp_mem_h mem_handle); }; class pinned_allocator : public base_allocator { @@ -83,17 +79,11 @@ class pinned_allocator : public base_allocator { void setUcpContext(ucp_context_h context); - virtual ucp_mem_h getUcpMemoryHandle() const - { - return mem_handle; - } - protected: - void do_allocate(void ** ptr, std::size_t size); - void do_deallocate(void * ptr); + void do_allocate(void ** ptr, std::size_t size, ucp_mem_h * mem_handle_ptr); + void do_deallocate(void * ptr, ucp_mem_h mem_handle); bool use_ucx; ucp_context_h context; - ucp_mem_h mem_handle; }; class allocation_pool { @@ -104,11 +94,6 @@ class allocation_pool { std::unique_ptr get_chunk(); - ucp_mem_h getUcpMemoryHandle() const - { - return allocator->getUcpMemoryHandle(); - } - void free_chunk(std::unique_ptr allocation); std::size_t size_buffers(); @@ -131,8 +116,6 @@ class allocation_pool { int buffer_counter; - int allocation_counter; - std::vector > allocations; std::unique_ptr allocator; diff --git a/engine/src/cache_machine/CacheDataLocalFile.cpp 
b/engine/src/cache_machine/CacheDataLocalFile.cpp index b8d412454..7316faa20 100644 --- a/engine/src/cache_machine/CacheDataLocalFile.cpp +++ b/engine/src/cache_machine/CacheDataLocalFile.cpp @@ -38,9 +38,9 @@ CacheDataLocalFile::CacheDataLocalFile(std::unique_ptr int attempts_limit = 10; while(attempts <= attempts_limit){ try { - cudf::io::table_metadata metadata; + cudf::io::table_input_metadata metadata; for(auto name : table->names()) { - metadata.column_names.emplace_back(name); + metadata.column_metadata.emplace_back(name); } cudf::io::orc_writer_options out_opts = cudf::io::orc_writer_options::builder(cudf::io::sink_info{this->filePath_}, table->view()) @@ -53,7 +53,7 @@ CacheDataLocalFile::CacheDataLocalFile(std::unique_ptr logger->error("|||{info}||||rows|{rows}", "info"_a="Failed to create CacheDataLocalFile in path: " + this->filePath_ + " attempt " + std::to_string(attempts), "rows"_a=table->num_rows()); - } + } attempts++; if (attempts == attempts_limit){ throw; @@ -88,4 +88,4 @@ std::unique_ptr CacheDataLocalFile::decache() { } } // namespace cache -} // namespace ral \ No newline at end of file +} // namespace ral diff --git a/engine/src/communication/CommunicationInterface/serializer.cpp b/engine/src/communication/CommunicationInterface/serializer.cpp index ce8982609..ebfa1eada 100644 --- a/engine/src/communication/CommunicationInterface/serializer.cpp +++ b/engine/src/communication/CommunicationInterface/serializer.cpp @@ -39,7 +39,7 @@ std::unique_ptr deserialize_from_gpu_raw_buffers( cudf::size_type null_count = columns_offsets[i].metadata.null_count; auto unique_column = cudf::make_strings_column( - num_strings, std::move(offsets_column), std::move(chars_column), null_count, std::move(null_mask),stream); + num_strings, std::move(offsets_column), std::move(chars_column), null_count, std::move(null_mask)); received_samples[i] = std::move(unique_column); } else { diff --git a/engine/src/execution_kernels/BatchWindowFunctionProcessing.cpp b/engine/src/execution_kernels/BatchWindowFunctionProcessing.cpp index d40e1872e..d4bc62bb7 100644 --- a/engine/src/execution_kernels/BatchWindowFunctionProcessing.cpp +++ b/engine/src/execution_kernels/BatchWindowFunctionProcessing.cpp @@ -38,7 +38,7 @@ ComputeWindowKernel::ComputeWindowKernel(std::size_t kernel_id, const std::strin std::tie(this->preceding_value, this->following_value) = get_bounds_from_window_expression(this->expression); this->frame_type = get_frame_type_from_over_clause(this->expression); - std::tie(this->column_indices_to_agg, this->type_aggs_as_str, this->agg_param_values) = + std::tie(this->column_indices_to_agg, this->type_aggs_as_str, this->agg_param_values) = get_cols_to_apply_window_and_cols_to_apply_agg(this->expression); std::tie(this->column_indices_partitioned, std::ignore, std::ignore) = ral::operators::get_vars_to_partition(this->expression); std::tie(this->column_indices_ordered, std::ignore, std::ignore) = ral::operators::get_vars_to_orders(this->expression); @@ -63,8 +63,13 @@ std::unique_ptr ComputeWindowKernel::compute_column_from_window_func cudf::column_view col_view_to_agg, std::size_t pos ) { - // we want firs get the type of aggregation - std::unique_ptr window_aggregation = ral::operators::makeCudfAggregation(this->aggs_wind_func[pos], this->agg_param_values[pos]); + // factories for creating either a groupby or rolling aggregation + auto make_groupby_agg = [&]() { + return ral::operators::makeCudfGroupbyAggregation(this->aggs_wind_func[pos], this->agg_param_values[pos]); + }; + auto 
make_rolling_agg = [&]() { + return ral::operators::makeCudfRollingAggregation(this->aggs_wind_func[pos], this->agg_param_values[pos]); + }; // want all columns to be partitioned std::vector columns_to_partition; @@ -91,7 +96,7 @@ std::unique_ptr ComputeWindowKernel::compute_column_from_window_func std::vector requests; requests.emplace_back(cudf::groupby::aggregation_request()); requests[0].values = col_view_to_agg; - requests[0].aggregations.push_back(std::move(window_aggregation)); + requests[0].aggregations.push_back(make_groupby_agg()); cudf::groupby::groupby gb_obj(cudf::table_view({partitioned_table_view}), cudf::null_policy::INCLUDE, cudf::sorted::YES, {}, {}); std::pair, std::vector> result = gb_obj.aggregate(requests); @@ -135,24 +140,24 @@ std::unique_ptr ComputeWindowKernel::compute_column_from_window_func else if (window_expression_contains_order_by(this->expression)) { if (window_expression_contains_bounds(this->expression)) { // TODO: for now just ROWS bounds works (not RANGE) - windowed_col = cudf::grouped_rolling_window(partitioned_table_view, col_view_to_agg, - this->preceding_value >= 0 ? this->preceding_value + 1: partitioned_table_view.num_rows(), - this->following_value >= 0 ? this->following_value : partitioned_table_view.num_rows(), - 1, *window_aggregation); + windowed_col = cudf::grouped_rolling_window(partitioned_table_view, col_view_to_agg, + this->preceding_value >= 0 ? this->preceding_value + 1: partitioned_table_view.num_rows(), + this->following_value >= 0 ? this->following_value : partitioned_table_view.num_rows(), + 1, *make_rolling_agg()); } else { if (this->type_aggs_as_str[pos] == "LEAD") { - windowed_col = cudf::grouped_rolling_window(partitioned_table_view, col_view_to_agg, 0, col_view_to_agg.size(), 1, *window_aggregation); + windowed_col = cudf::grouped_rolling_window(partitioned_table_view, col_view_to_agg, 0, col_view_to_agg.size(), 1, *make_rolling_agg()); } else { - windowed_col = cudf::grouped_rolling_window(partitioned_table_view, col_view_to_agg, col_view_to_agg.size(), 0, 1, *window_aggregation); + windowed_col = cudf::grouped_rolling_window(partitioned_table_view, col_view_to_agg, col_view_to_agg.size(), 0, 1, *make_rolling_agg()); } } } else { - windowed_col = cudf::grouped_rolling_window(partitioned_table_view, col_view_to_agg, col_view_to_agg.size(), col_view_to_agg.size(), 1, *window_aggregation); + windowed_col = cudf::grouped_rolling_window(partitioned_table_view, col_view_to_agg, col_view_to_agg.size(), col_view_to_agg.size(), 1, *make_rolling_agg()); } } else { if (window_expression_contains_bounds(this->expression)) { // TODO: for now just ROWS bounds works (not RANGE) - windowed_col = cudf::rolling_window(col_view_to_agg, this->preceding_value + 1, this->following_value, 1, *window_aggregation); + windowed_col = cudf::rolling_window(col_view_to_agg, this->preceding_value + 1, this->following_value, 1, *make_rolling_agg()); } else { throw std::runtime_error("Window functions without partitions and without bounded windows are currently not supported"); } @@ -165,7 +170,7 @@ ral::execution::task_result ComputeWindowKernel::do_process(std::vector< std::un std::shared_ptr output, cudaStream_t /*stream*/, const std::map& args) { - + if (inputs.size() == 0) { return {ral::execution::task_status::SUCCESS, std::string(), std::vector< std::unique_ptr > ()}; } @@ -176,7 +181,7 @@ ral::execution::task_result ComputeWindowKernel::do_process(std::vector< std::un cudf::table_view input_table_cudf_view = input->view(); std::vector input_names = 
input->names(); - + std::vector< std::unique_ptr > new_wf_cols; for (std::size_t col_i = 0; col_i < this->type_aggs_as_str.size(); ++col_i) { cudf::column_view col_view_to_agg = input_table_cudf_view.column(column_indices_to_agg[col_i]); @@ -228,8 +233,8 @@ ral::execution::task_result ComputeWindowKernel::do_process(std::vector< std::un } cudf_table_window = std::move(temp_table_window); } - } - + } + std::unique_ptr windowed_table = std::make_unique(std::move(cudf_table_window), output_names); if (windowed_table) { @@ -277,7 +282,7 @@ kstatus ComputeWindowKernel::run() { ral::execution::executor::get_instance()->add_task( std::move(inputs), this->output_cache(), - this, + this, task_args); is_first_batch = false; @@ -296,7 +301,7 @@ kstatus ComputeWindowKernel::run() { } } - + std::unique_lock lock(kernel_mutex); kernel_cv.wait(lock,[this]{ @@ -330,7 +335,7 @@ OverlapGeneratorKernel::OverlapGeneratorKernel(std::size_t kernel_id, const std: auto& self_node = ral::communication::CommunicationData::getInstance().getSelfNode(); self_node_index = context->getNodeIndex(self_node); - total_nodes = context->getTotalNodes(); + total_nodes = context->getTotalNodes(); } @@ -343,7 +348,7 @@ ral::execution::task_result OverlapGeneratorKernel::do_process(std::vector< std: std::unique_ptr & input = inputs[0]; std::string overlap_type = args.at(TASK_ARG_OVERLAP_TYPE); - + // first lets do the preceding overlap if (overlap_type == PRECEDING_OVERLAP_TYPE || overlap_type == BOTH_OVERLAP_TYPE){ if (input->num_rows() > this->preceding_value){ @@ -355,7 +360,7 @@ ral::execution::task_result OverlapGeneratorKernel::do_process(std::vector< std: auto clone = input->toBlazingTableView().clone(); ral::cache::MetadataDictionary extra_metadata; extra_metadata.add_value(ral::cache::OVERLAP_STATUS, INCOMPLETE_OVERLAP_STATUS); - this->output_preceding_overlap_cache->addToCache(std::move(clone), "", true, extra_metadata); + this->output_preceding_overlap_cache->addToCache(std::move(clone), "", true, extra_metadata); } } @@ -370,11 +375,11 @@ ral::execution::task_result OverlapGeneratorKernel::do_process(std::vector< std: auto clone = input->toBlazingTableView().clone(); ral::cache::MetadataDictionary extra_metadata; extra_metadata.add_value(ral::cache::OVERLAP_STATUS, INCOMPLETE_OVERLAP_STATUS); - this->output_following_overlap_cache->addToCache(std::move(clone), "", true, extra_metadata); + this->output_following_overlap_cache->addToCache(std::move(clone), "", true, extra_metadata); } } this->output_batches_cache->addToCache(std::move(input)); - + }catch(rmm::bad_alloc e){ return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)}; }catch(std::exception e){ @@ -411,7 +416,7 @@ kstatus OverlapGeneratorKernel::run() { } } else { if (batch_index == 0){ // that was the first batch, no need to do the following overlap - task_args[TASK_ARG_OVERLAP_TYPE] = PRECEDING_OVERLAP_TYPE; + task_args[TASK_ARG_OVERLAP_TYPE] = PRECEDING_OVERLAP_TYPE; } else { task_args[TASK_ARG_OVERLAP_TYPE] = BOTH_OVERLAP_TYPE; } @@ -426,11 +431,11 @@ kstatus OverlapGeneratorKernel::run() { this, task_args); } - - + + batch_index++; } - + // lets wait to make sure that all tasks are done std::unique_lock lock(kernel_mutex); kernel_cv.wait(lock,[this]{ @@ -465,7 +470,7 @@ OverlapAccumulatorKernel::OverlapAccumulatorKernel(std::size_t kernel_id, const this->input_.add_port("batches", "preceding_overlaps", "following_overlaps"); this->num_batches = 0; - + std::tie(this->preceding_value, this->following_value) = 
get_bounds_from_window_expression(this->expression); ral::cache::cache_settings cache_machine_config; @@ -482,7 +487,7 @@ OverlapAccumulatorKernel::OverlapAccumulatorKernel(std::size_t kernel_id, const auto& self_node = ral::communication::CommunicationData::getInstance().getSelfNode(); self_node_index = context->getNodeIndex(self_node); - + } void OverlapAccumulatorKernel::set_overlap_status(bool preceding, int index, std::string status){ @@ -504,30 +509,30 @@ std::string OverlapAccumulatorKernel::get_overlap_status(bool preceding, int ind } void OverlapAccumulatorKernel::combine_overlaps(bool preceding, int target_batch_index, std::unique_ptr new_overlap, std::string overlap_status) { - + // WSM TODO should make a function that can create a cache data and automatically cache it if the resouce consumption demands it std::unique_ptr new_overlap_cache_data = std::make_unique(std::move(new_overlap)); return combine_overlaps(preceding, target_batch_index, std::move(new_overlap_cache_data), overlap_status); } void OverlapAccumulatorKernel::combine_overlaps(bool preceding, int target_batch_index, std::unique_ptr new_overlap_cache_data, std::string overlap_status) { - + std::vector> overlap_parts; std::unique_ptr existing_overlap = nullptr; if (preceding){ if (preceding_overlap_cache->has_data_in_index_now(target_batch_index)){ existing_overlap = preceding_overlap_cache->get_or_wait_CacheData(target_batch_index); } - } else { + } else { if (following_overlap_cache->has_data_in_index_now(target_batch_index)){ existing_overlap = following_overlap_cache->get_or_wait_CacheData(target_batch_index); } } - + if (existing_overlap) { if (existing_overlap->get_type() == ral::cache::CacheDataType::CONCATENATING){ ral::cache::ConcatCacheData * concat_cache_ptr = static_cast (existing_overlap.get()); - overlap_parts = concat_cache_ptr->releaseCacheDatas(); + overlap_parts = concat_cache_ptr->releaseCacheDatas(); } else { overlap_parts.push_back(std::move(existing_overlap)); } @@ -543,14 +548,14 @@ void OverlapAccumulatorKernel::combine_overlaps(bool preceding, int target_batch } else { overlap_parts.push_back(std::move(new_overlap_cache_data)); } - + std::unique_ptr new_cache_data = std::make_unique(std::move(overlap_parts), this->col_names, this->schema); if (preceding){ - preceding_overlap_cache->put(target_batch_index, std::move(new_cache_data)); - } else { - following_overlap_cache->put(target_batch_index, std::move(new_cache_data)); + preceding_overlap_cache->put(target_batch_index, std::move(new_cache_data)); + } else { + following_overlap_cache->put(target_batch_index, std::move(new_cache_data)); } - set_overlap_status(preceding, target_batch_index, overlap_status); + set_overlap_status(preceding, target_batch_index, overlap_status); } @@ -573,7 +578,7 @@ ral::execution::task_result OverlapAccumulatorKernel::do_process(std::vector< st size_t rows_remaining = overlap_size; if (preceding) { - + for (int i = inputs.size() -1; i >= 0; i--){ size_t cur_table_size = inputs[i]->num_rows(); if (cur_table_size > rows_remaining){ @@ -606,11 +611,11 @@ ral::execution::task_result OverlapAccumulatorKernel::do_process(std::vector< st } } } - - + + std::unique_ptr output_table; if (tables_to_concat.size() == 1 && scope_holder.size() == 1) { - output_table = std::move(scope_holder[0]); + output_table = std::move(scope_holder[0]); } else { output_table = ral::utilities::concatTables(tables_to_concat); } @@ -665,7 +670,7 @@ void OverlapAccumulatorKernel::response_receiver(){ int messages_expected; int total_nodes = 
context->getTotalNodes(); if (self_node_index == 0){ - messages_expected = 1; + messages_expected = 1; std::string sender_node_id = context->getNode(self_node_index + 1).id(); expected_message_ids.push_back(FOLLOWING_RESPONSE + std::to_string(this->context->getContextToken()) + "_" + std::to_string(this->get_id()) + "_" + sender_node_id); } else if (self_node_index == total_nodes - 1) { @@ -690,7 +695,7 @@ void OverlapAccumulatorKernel::following_request_receiver(){ std::string sender_node_id = context->getNode(self_node_index - 1).id(); expected_message_ids.push_back(FOLLOWING_REQUEST + std::to_string(this->context->getContextToken()) + "_" + std::to_string(this->get_id()) + "_" + sender_node_id); message_receiver(expected_message_ids, messages_expected); - } + } } void OverlapAccumulatorKernel::preceding_request_receiver(){ @@ -701,7 +706,7 @@ void OverlapAccumulatorKernel::preceding_request_receiver(){ std::string sender_node_id = context->getNode(self_node_index + 1).id(); expected_message_ids.push_back(PRECEDING_REQUEST + std::to_string(this->context->getContextToken()) + "_" + std::to_string(this->get_id()) + "_" + sender_node_id); message_receiver(expected_message_ids, messages_expected); - } + } } @@ -720,9 +725,9 @@ void OverlapAccumulatorKernel::message_receiver(std::vector expecte int target_batch_index = std::stoi(metadata.get_value(ral::cache::OVERLAP_TARGET_BATCH_INDEX)); int source_batch_index = metadata.get_value(ral::cache::OVERLAP_MESSAGE_TYPE) == PRECEDING_REQUEST ? num_batches - 1 : 0; - prepare_overlap_task(metadata.get_value(ral::cache::OVERLAP_MESSAGE_TYPE) == PRECEDING_REQUEST, + prepare_overlap_task(metadata.get_value(ral::cache::OVERLAP_MESSAGE_TYPE) == PRECEDING_REQUEST, source_batch_index, target_node_index, target_batch_index, overlap_size); - + } else if (metadata.get_value(ral::cache::OVERLAP_MESSAGE_TYPE) == PRECEDING_RESPONSE || metadata.get_value(ral::cache::OVERLAP_MESSAGE_TYPE) == FOLLOWING_RESPONSE){ @@ -734,7 +739,7 @@ void OverlapAccumulatorKernel::message_receiver(std::vector expecte RAL_EXPECTS(target_node_index == self_node_index, "RESPONSE message arrived at the wrong destination"); combine_overlaps(preceding, target_batch_index, std::move(message_cache_data), overlap_status); - + } else { if(logger) { logger->error("{query_id}|||{info}||kernel_id|{kernel_id}||", @@ -743,7 +748,7 @@ void OverlapAccumulatorKernel::message_receiver(std::vector expecte "kernel_id"_a=this->get_id()); } } - } + } } void OverlapAccumulatorKernel::prepare_overlap_task(bool preceding, int source_batch_index, int target_node_index, int target_batch_index, size_t overlap_size){ @@ -754,7 +759,7 @@ void OverlapAccumulatorKernel::prepare_overlap_task(bool preceding, int source_b while(overlap_rows_needed > 0){ // Lets first try to fulfill the overlap needed from this node if (source_batch_index >= 0 && source_batch_index < this->num_batches){ // num_batches should be finalized for when its used here - + std::unique_ptr batch = batches_cache->get_or_wait_CacheData(source_batch_index); overlap_rows_needed = batch->num_rows() > overlap_rows_needed ? 
0 : overlap_rows_needed - batch->num_rows(); if (preceding){ @@ -773,14 +778,14 @@ void OverlapAccumulatorKernel::prepare_overlap_task(bool preceding, int source_b std::unique_ptr batch = preceding_overlap_cache->get_or_wait_CacheData(0); overlap_rows_needed = 0; cache_datas_for_task.push_front(std::move(batch)); - starting_index_of_datas_for_task = -1; + starting_index_of_datas_for_task = -1; } else { // the last index of the following node will come from the neighbor. Its assumed that its complete. // and if its not complete its because there is not enough data to fill the window std::unique_ptr batch = following_overlap_cache->get_or_wait_CacheData(this->num_batches - 1); overlap_rows_needed = 0; - cache_datas_for_task.push_back(std::move(batch)); - } + cache_datas_for_task.push_back(std::move(batch)); + } } } std::vector> cache_datas_for_task_vect(std::make_move_iterator(cache_datas_for_task.begin()), std::make_move_iterator(cache_datas_for_task.end())); @@ -799,7 +804,7 @@ void OverlapAccumulatorKernel::prepare_overlap_task(bool preceding, int source_b preceding ? preceding_overlap_cache : following_overlap_cache, this, task_args); - } + } } void OverlapAccumulatorKernel::send_request(bool preceding, int source_node_index, int target_node_index, int target_batch_index, size_t overlap_size){ @@ -833,7 +838,7 @@ kstatus OverlapAccumulatorKernel::run() { input_batches_cache = this->input_.get_cache("batches"); input_preceding_overlap_cache = this->input_.get_cache("preceding_overlaps"); input_following_overlap_cache = this->input_.get_cache("following_overlaps"); - + int cur_batch_ind = 0; bool have_all_batches = false; while (!have_all_batches){ @@ -847,14 +852,14 @@ kstatus OverlapAccumulatorKernel::run() { } batches_cache->put(cur_batch_ind, std::move(batch)); num_batches = cur_batch_ind + 1; - cur_batch_ind++; + cur_batch_ind++; } else { have_all_batches = true; } } preceding_overlap_statuses.resize(num_batches, UNKNOWN_OVERLAP_STATUS); following_overlap_status.resize(num_batches, UNKNOWN_OVERLAP_STATUS); - + // lets send the requests for the first preceding overlap and last following overlap of this node if (total_nodes > 1 && self_node_index > 0){ send_request(true, self_node_index - 1, self_node_index, 0, this->preceding_value); @@ -871,7 +876,7 @@ kstatus OverlapAccumulatorKernel::run() { if (self_node_index == total_nodes - 1){ // last overlap of last node, so make it empty std::unique_ptr empty_table = ral::utilities::create_empty_table(this->col_names, this->schema); following_overlap_cache->put(num_batches - 1, std::move(empty_table)); - } + } BlazingThread response_receiver_thread, following_request_receiver_thread; if (total_nodes > 1) { @@ -880,7 +885,7 @@ kstatus OverlapAccumulatorKernel::run() { response_receiver_thread = BlazingThread(&OverlapAccumulatorKernel::response_receiver, this); following_request_receiver_thread = BlazingThread(&OverlapAccumulatorKernel::following_request_receiver, this); } - for (int cur_batch_ind = 0; cur_batch_ind < num_batches; cur_batch_ind++){ + for (int cur_batch_ind = 0; cur_batch_ind < num_batches; cur_batch_ind++){ if (cur_batch_ind > 0){ auto overlap_cache_data = input_preceding_overlap_cache->pullCacheData(); if (overlap_cache_data != nullptr){ @@ -889,11 +894,11 @@ kstatus OverlapAccumulatorKernel::run() { RAL_EXPECTS(metadata.has_value(ral::cache::OVERLAP_STATUS), "Overlap Data did not have OVERLAP_STATUS"); set_overlap_status(true, cur_batch_ind, metadata.get_value(ral::cache::OVERLAP_STATUS)); 
preceding_overlap_cache->put(cur_batch_ind, std::move(overlap_cache_data)); - + if (metadata.get_value(ral::cache::OVERLAP_STATUS) == INCOMPLETE_OVERLAP_STATUS){ size_t overlap_needed = this->preceding_value - cur_overlap_rows > 0 ? this->preceding_value - cur_overlap_rows : 0; // we want the source index to be cur_batch_ind - 2 because cur_batch_ind - 1 is where the original overlap came from, which is incomplete - prepare_overlap_task(true, cur_batch_ind - 2, this->self_node_index, cur_batch_ind, overlap_needed); + prepare_overlap_task(true, cur_batch_ind - 2, this->self_node_index, cur_batch_ind, overlap_needed); } } else { if(logger) { @@ -913,11 +918,11 @@ kstatus OverlapAccumulatorKernel::run() { RAL_EXPECTS(metadata.has_value(ral::cache::OVERLAP_STATUS), "Overlap Data did not have OVERLAP_STATUS"); set_overlap_status(false, cur_batch_ind, metadata.get_value(ral::cache::OVERLAP_STATUS)); following_overlap_cache->put(cur_batch_ind, std::move(overlap_cache_data)); - + if (metadata.get_value(ral::cache::OVERLAP_STATUS) == INCOMPLETE_OVERLAP_STATUS){ size_t overlap_needed = this->following_value - cur_overlap_rows > 0 ? this->following_value - cur_overlap_rows : 0; // we want the source index to be cur_batch_ind + 2 because cur_batch_ind + 1 is where the original overlap came from, which is incomplete - prepare_overlap_task(false, cur_batch_ind + 2, this->self_node_index, cur_batch_ind, overlap_needed); + prepare_overlap_task(false, cur_batch_ind + 2, this->self_node_index, cur_batch_ind, overlap_needed); } } else { if(logger) { @@ -932,8 +937,8 @@ kstatus OverlapAccumulatorKernel::run() { // the preceding request will be responded to by the last batch, so we want to do all the batches before we try to respond to it preceding_request_receiver(); - - // lets wait until the receiver threads are done. + + // lets wait until the receiver threads are done. // When its done, it means we have received overlap requests and have made tasks for them, and // it also means we have received the reponses to the overlap requests we sent out if (total_nodes > 1) { @@ -977,13 +982,13 @@ kstatus OverlapAccumulatorKernel::run() { this->following_overlap_cache->clear(); return kstatus::proceed; - - + + } /* Ideas for when we want to implement RANGE window frame instead of ROWS window frame: The previous kernel if there is RANGE needs to add metadata to every batch and overlap about the value of the first and last element -Then when preparing the overlapping tasks we can see how many batches we need to fulfill the window, just by looking at the metadata about the +Then when preparing the overlapping tasks we can see how many batches we need to fulfill the window, just by looking at the metadata about the first and last elements. */ @@ -991,7 +996,7 @@ first and last elements. This logic that has been implemented has the downside of waiting until all batches are available so that we know the number of batches. We also cant push results to the next phase until we know we have responded to the requests from the neighboring nodes. -This was done to dramatically simplify the logic. Additionally its not as bad of a performance penalty because the previous kernel which does an +This was done to dramatically simplify the logic. Additionally its not as bad of a performance penalty because the previous kernel which does an order by, also needs to wait until all batches are available before it can do its merge. 
In the future, when we can have CacheData's shared between nodes, then we can revisit this logic to make it more efficient. */ diff --git a/engine/src/io/data_parser/ArgsUtil.cpp b/engine/src/io/data_parser/ArgsUtil.cpp index eb250b085..c2c13d065 100644 --- a/engine/src/io/data_parser/ArgsUtil.cpp +++ b/engine/src/io/data_parser/ArgsUtil.cpp @@ -8,27 +8,17 @@ namespace ral { namespace io { -DataType inferDataType(std::string file_format_hint) { - if(file_format_hint == "parquet") - return DataType::PARQUET; - if(file_format_hint == "json") - return DataType::JSON; - if(file_format_hint == "orc") - return DataType::ORC; - if(file_format_hint == "csv") - return DataType::CSV; - if(file_format_hint == "psv") - return DataType::CSV; - if(file_format_hint == "tbl") - return DataType::CSV; - if(file_format_hint == "txt") - return DataType::CSV; - if(file_format_hint == "mysql") - return DataType::MYSQL; - if(file_format_hint == "postgresql") - return DataType::POSTGRESQL; - if(file_format_hint == "sqlite") - return DataType::SQLITE; +DataType inferDataType(std::string const& file_format_hint) { + if(file_format_hint == "parquet") { return DataType::PARQUET; } + if(file_format_hint == "json") { return DataType::JSON; } + if(file_format_hint == "orc") { return DataType::ORC; } + if(file_format_hint == "csv") { return DataType::CSV; } + if(file_format_hint == "psv") { return DataType::CSV; } + if(file_format_hint == "tbl") { return DataType::CSV; } + if(file_format_hint == "txt") { return DataType::CSV; } + if(file_format_hint == "mysql") { return DataType::MYSQL; } + if(file_format_hint == "postgresql") { return DataType::POSTGRESQL; } + if(file_format_hint == "sqlite") { return DataType::SQLITE; } // NOTE if you need more options the user can pass file_format in the create table return DataType::UNDEFINED; @@ -43,7 +33,7 @@ DataType inferFileType(std::vector files, DataType data_type_hint, std::vector uris; std::transform( - files.begin(), files.end(), std::back_inserter(uris), [](std::string uri) -> Uri { return Uri(uri); }); + files.begin(), files.end(), std::back_inserter(uris), [](std::string const& uri) -> Uri { return {uri}; }); ral::io::uri_data_provider udp(uris, ignore_missing_paths); bool open_file = false; const ral::io::data_handle dh = udp.get_next(open_file); @@ -63,9 +53,9 @@ bool to_bool(std::string value) { return false; } -char ord(std::string value) { return (char) value[0]; } +char ord(std::string value) { return static_cast(value[0]); } -int to_int(std::string value) { return std::atoi(value.c_str()); } +int to_int(std::string const& value) { return std::atoi(value.c_str()); } std::vector to_vector_string(std::string value) { std::string vec = StringUtil::replace(value, "'", ""); @@ -79,33 +69,102 @@ std::vector to_vector_string(std::string value) { std::vector to_vector_int(std::string value) { std::vector input = to_vector_string(value); std::vector ret; - std::transform(input.begin(), input.end(), std::back_inserter(ret), [](std::string v) -> int { return to_int(v); }); + std::transform(input.begin(), input.end(), std::back_inserter(ret), [](std::string const& v) -> int { return to_int(v); }); return ret; } +cudf::data_type convert_string_to_dtype(const std::string& dtype_in) +{ + // TODO: This function should be cleanup to take only libcudf type instances. 
+ std::string dtype = dtype_in; + // first, convert to all lower-case + std::transform(dtype_in.begin(), dtype_in.end(), dtype.begin(), [](unsigned char ch) { + return static_cast(std::tolower(ch)); + }); + if (dtype == "str") { return cudf::data_type(cudf::type_id::STRING); } + if (dtype == "timestamp[s]" || dtype == "datetime64[s]") { return cudf::data_type(cudf::type_id::TIMESTAMP_SECONDS); } + // backwards compat: "timestamp" defaults to milliseconds + if (dtype == "timestamp[ms]" || dtype == "timestamp" || dtype == "datetime64[ms]") { return cudf::data_type(cudf::type_id::TIMESTAMP_MILLISECONDS); } + if (dtype == "timestamp[us]" || dtype == "datetime64[us]") { return cudf::data_type(cudf::type_id::TIMESTAMP_MICROSECONDS); } + if (dtype == "timestamp[ns]" || dtype == "datetime64[ns]") { return cudf::data_type(cudf::type_id::TIMESTAMP_NANOSECONDS); } + if (dtype == "date32") { return cudf::data_type(cudf::type_id::TIMESTAMP_DAYS); } + if (dtype == "bool" || dtype == "boolean") { return cudf::data_type(cudf::type_id::BOOL8); } + if (dtype == "date" || dtype == "date64") { return cudf::data_type(cudf::type_id::TIMESTAMP_MILLISECONDS); } + if (dtype == "timedelta[d]") { return cudf::data_type(cudf::type_id::DURATION_DAYS); } + if (dtype == "timedelta64[s]") { return cudf::data_type(cudf::type_id::DURATION_SECONDS); } + if (dtype == "timedelta64[ms]") { return cudf::data_type(cudf::type_id::DURATION_MILLISECONDS); } + if (dtype == "timedelta64[us]") { return cudf::data_type(cudf::type_id::DURATION_MICROSECONDS); } + if (dtype == "timedelta" || dtype == "timedelta64[ns]") { return cudf::data_type(cudf::type_id::DURATION_NANOSECONDS); } + if (dtype == "float" || dtype == "float32") { return cudf::data_type(cudf::type_id::FLOAT32); } + if (dtype == "double" || dtype == "float64") { return cudf::data_type(cudf::type_id::FLOAT64); } + if (dtype == "byte" || dtype == "int8") { return cudf::data_type(cudf::type_id::INT8); } + if (dtype == "short" || dtype == "int16") { return cudf::data_type(cudf::type_id::INT16); } + if (dtype == "int" || dtype == "int32") { return cudf::data_type(cudf::type_id::INT32); } + if (dtype == "long" || dtype == "int64") { return cudf::data_type(cudf::type_id::INT64); } + if (dtype == "uint8") { return cudf::data_type(cudf::type_id::UINT8); } + if (dtype == "uint16") { return cudf::data_type(cudf::type_id::UINT16); } + if (dtype == "uint32") { return cudf::data_type(cudf::type_id::UINT32); } + if (dtype == "uint64") { return cudf::data_type(cudf::type_id::UINT64); } + + return cudf::data_type(cudf::type_id::EMPTY); +} + +std::vector parse_data_types( + std::vector const& types_as_strings) +{ + std::vector dtypes; + // Assume that the dtype is in dictionary format only if all elements contain a colon + const bool is_dict = std::all_of( + std::cbegin(types_as_strings), std::cend(types_as_strings), [](const std::string& s) { + return std::find(std::cbegin(s), std::cend(s), ':') != std::cend(s); + }); + + auto split_on_colon = [](std::string_view s) { + auto const i = s.find(":"); + return std::pair{s.substr(0, i), s.substr(i + 1)}; + }; + + if (is_dict) { + std::map col_type_map; + std::transform( + std::cbegin(types_as_strings), + std::cend(types_as_strings), + std::back_inserter(dtypes), + [&](auto const& ts) { + auto const [col_name, type_str] = split_on_colon(ts); + return convert_string_to_dtype(std::string{type_str}); + }); + } else { + std::transform(std::cbegin(types_as_strings), + std::cend(types_as_strings), + std::back_inserter(dtypes), + [](auto const& 
col_dtype) { return convert_string_to_dtype(col_dtype); }); + } + return dtypes; +} cudf::io::json_reader_options getJsonReaderOptions(const std::map & args, cudf::io::arrow_io_source & arrow_source) { - cudf::io::json_reader_options reader_opts = cudf::io::json_reader_options::builder(cudf::io::source_info{&arrow_source}); - reader_opts.enable_lines(true); + auto reader_opts = cudf::io::json_reader_options::builder(cudf::io::source_info{&arrow_source}); + reader_opts.lines(true); if(map_contains("dtype", args)) { - reader_opts.dtypes(to_vector_string(args.at("dtype"))); + reader_opts.dtypes(parse_data_types(to_vector_string(args.at("dtype")))); } if(map_contains("compression", args)) { reader_opts.compression(static_cast(to_int(args.at("compression")))); } if(map_contains("lines", args)) { - reader_opts.enable_lines(to_bool(args.at("lines"))); + reader_opts.lines(to_bool(args.at("lines"))); } if(map_contains("dayfirst", args)) { - reader_opts.enable_dayfirst(to_bool(args.at("dayfirst"))); + reader_opts.dayfirst(to_bool(args.at("dayfirst"))); } if(map_contains("byte_range_offset", args)) { - reader_opts.set_byte_range_offset( (size_t) to_int(args.at("byte_range_offset")) ); + reader_opts.byte_range_offset(static_cast(to_int(args.at("byte_range_offset")))); } if(map_contains("byte_range_size", args)) { - reader_opts.set_byte_range_size( (size_t) to_int(args.at("byte_range_size")) ); + reader_opts.byte_range_size(static_cast(to_int(args.at("byte_range_size")))); } - return reader_opts; + return std::move(reader_opts.build()); } cudf::io::orc_reader_options getOrcReaderOptions(const std::map & args, cudf::io::arrow_io_source & arrow_source) { @@ -132,7 +191,7 @@ cudf::io::csv_reader_options getCsvReaderOptions(const std::map(to_int(args.at("compression")))); } if(map_contains("lineterminator", args)) { reader_opts.set_lineterminator(ord(args.at("lineterminator"))); @@ -153,13 +212,13 @@ cudf::io::csv_reader_options getCsvReaderOptions(const std::map(to_int(args.at("nrows")))); } if(map_contains("skiprows", args)) { - reader_opts.set_skiprows((cudf::size_type) to_int(args.at("skiprows"))); + reader_opts.set_skiprows(static_cast(to_int(args.at("skiprows")))); } if(map_contains("skipfooter", args)) { - reader_opts.set_skipfooter((cudf::size_type) to_int(args.at("skipfooter"))); + reader_opts.set_skipfooter(static_cast(to_int(args.at("skipfooter")))); } if(map_contains("names", args)) { reader_opts.set_names(to_vector_string(args.at("names"))); @@ -168,10 +227,10 @@ cudf::io::csv_reader_options getCsvReaderOptions(const std::map(to_int(args.at("header")))); } if(map_contains("dtype", args)) { - reader_opts.set_dtypes(to_vector_string(args.at("dtype"))); + reader_opts.set_dtypes(parse_data_types(to_vector_string(args.at("dtype")))); } if(map_contains("use_cols_indexes", args)) { reader_opts.set_use_cols_indexes(to_vector_int(args.at("use_cols_indexes"))); @@ -221,10 +280,10 @@ cudf::io::csv_reader_options getCsvReaderOptions(const std::map(to_int(args.at("byte_range_offset")))); } if(map_contains("byte_range_size", args)) { - reader_opts.set_byte_range_size((size_t) to_int(args.at("byte_range_size"))); + reader_opts.set_byte_range_size(static_cast(to_int(args.at("byte_range_size")))); } if(map_contains("out_time_unit", args)) { // TODO @@ -289,7 +348,7 @@ sql_info getSqlInfo(std::map &args_map) { if (args_map.at("table_batch_size").empty()) { sql.table_batch_size = DETAULT_TABLE_BATCH_SIZE; } else { - sql.table_batch_size = static_cast(std::atoll(args_map.at("table_batch_size").data())); + 
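
Both the JSON and CSV paths now run the user-supplied "dtype" argument through `parse_data_types`, which accepts either bare type names ("int64") or "column:type" pairs and lower-cases the name before matching it in `convert_string_to_dtype`. The STL-only sketch below walks the same normalization steps but returns the cleaned strings instead of `cudf::data_type` values, so it runs without libcudf; the function name is illustrative, not BlazingSQL API.

```cpp
// Simplified analogue of parse_data_types()/convert_string_to_dtype(): if every
// entry looks like "column:type", strip the column prefix, then lower-case the
// type name. Returns the normalized type strings rather than cudf::data_type.
#include <algorithm>
#include <cctype>
#include <string>
#include <string_view>
#include <vector>

std::vector<std::string> normalize_dtype_strings(const std::vector<std::string>& types_as_strings) {
    const bool is_dict = std::all_of(types_as_strings.begin(), types_as_strings.end(),
        [](const std::string& s) { return s.find(':') != std::string::npos; });

    std::vector<std::string> out;
    out.reserve(types_as_strings.size());
    for (const std::string& entry : types_as_strings) {
        std::string_view type_str = entry;
        if (is_dict) { type_str = type_str.substr(type_str.find(':') + 1); }  // drop "column:"
        std::string lowered(type_str);
        std::transform(lowered.begin(), lowered.end(), lowered.begin(),
                       [](unsigned char ch) { return static_cast<char>(std::tolower(ch)); });
        out.push_back(std::move(lowered));  // e.g. "int64", "timestamp[ms]"
    }
    return out;
}
```

For example, {"a:int64", "b:timestamp[ms]"} normalizes to {"int64", "timestamp[ms]"}, while a plain {"Float32"} becomes {"float32"} before the type lookup runs.
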
sql.table_batch_size = static_cast(std::atoll(args_map.at("table_batch_size").data())); } } else { sql.table_batch_size = DETAULT_TABLE_BATCH_SIZE; diff --git a/engine/src/io/data_parser/ArgsUtil.h b/engine/src/io/data_parser/ArgsUtil.h index 9614a95b6..7817267b3 100644 --- a/engine/src/io/data_parser/ArgsUtil.h +++ b/engine/src/io/data_parser/ArgsUtil.h @@ -11,7 +11,7 @@ namespace ral { namespace io { -DataType inferDataType(std::string file_format_hint); +DataType inferDataType(std::string const& file_format_hint); DataType inferFileType(std::vector files, DataType data_type_hint, bool ignore_missing_paths = false); @@ -21,7 +21,7 @@ bool to_bool(std::string value); char ord(std::string value); -int to_int(std::string value); +int to_int(std::string const& value); std::vector to_vector_string(std::string value); diff --git a/engine/src/io/data_parser/OrcParser.cpp b/engine/src/io/data_parser/OrcParser.cpp index 4875b165a..72de0a47a 100644 --- a/engine/src/io/data_parser/OrcParser.cpp +++ b/engine/src/io/data_parser/OrcParser.cpp @@ -36,12 +36,19 @@ std::unique_ptr orc_parser::parse_batch( std::vector col_names; col_names.resize(column_indices.size()); + std::vector decimal_cols; + for(size_t column_i = 0; column_i < column_indices.size(); column_i++) { col_names[column_i] = schema.get_name(column_indices[column_i]); + cudf::type_id type_id = schema.get_dtype(column_indices[column_i]); + if (type_id == cudf::type_id::FLOAT64) { + decimal_cols.emplace_back(col_names[column_i]); + } } orc_opts.set_columns(col_names); orc_opts.set_stripes({row_groups}); + orc_opts.set_decimal_cols_as_float(decimal_cols); auto result = cudf::io::read_orc(orc_opts); @@ -64,6 +71,13 @@ void orc_parser::parse_schema( for(cudf::size_type i = 0; i < table_out.tbl->num_columns() ; i++) { std::string name = table_out.metadata.column_names[i]; cudf::type_id type = table_out.tbl->get_column(i).type().id(); + + // For now, we deal with decimals casting the column to float64 + // see parse_batch method to check the casting + if (type == cudf::type_id::DECIMAL64) { + type = cudf::type_id::FLOAT64; + } + size_t file_index = i; bool is_in_file = true; schema.add_column(name, type, file_index, is_in_file); diff --git a/engine/src/io/data_parser/ParquetParser.cpp b/engine/src/io/data_parser/ParquetParser.cpp index 9273cdec9..a2a53f9f7 100644 --- a/engine/src/io/data_parser/ParquetParser.cpp +++ b/engine/src/io/data_parser/ParquetParser.cpp @@ -13,6 +13,7 @@ #include #include +#include namespace ral { namespace io { @@ -44,7 +45,7 @@ std::unique_ptr parquet_parser::parse_batch( pq_args.enable_convert_strings_to_categories(false); pq_args.enable_use_pandas_metadata(false); - + std::vector col_names(column_indices.size()); for(size_t column_i = 0; column_i < column_indices.size(); column_i++) { @@ -64,6 +65,18 @@ std::unique_ptr parquet_parser::parse_batch( auto result = cudf::io::read_parquet(pq_args); auto result_table = std::move(result.tbl); + auto columns = result_table->release(); + + for(std::size_t i = 0; i < columns.size(); i++) { + auto &column = columns[i]; + cudf::type_id column_type_id = column->type().id(); + if (column_type_id == cudf::type_id::DECIMAL64) { + auto casted = cudf::cast(column->view(), cudf::data_type(cudf::type_id::FLOAT64)); + columns[i] = std::move(casted); + } + } + + result_table = std::make_unique(std::move(columns)); if (result.metadata.column_names.size() > column_indices.size()) { auto columns = result_table->release(); // Assuming columns are in the same order as column_indices and any 
extra columns (i.e. index column) are put last @@ -97,6 +110,13 @@ void parquet_parser::parse_schema( for(int i = 0; i < table_out.tbl->num_columns(); i++) { cudf::type_id type = table_out.tbl->get_column(i).type().id(); + + // For now, we deal with decimals casting the column to float64 + // see parse_batch method to check the casting + if (type == cudf::type_id::DECIMAL64) { + type = cudf::type_id::FLOAT64; + } + size_t file_index = i; bool is_in_file = true; std::string name = table_out.metadata.column_names.at(i); diff --git a/engine/src/io/data_parser/metadata/parquet_metadata.cpp b/engine/src/io/data_parser/metadata/parquet_metadata.cpp index 6071ab913..ba17ddd1a 100644 --- a/engine/src/io/data_parser/metadata/parquet_metadata.cpp +++ b/engine/src/io/data_parser/metadata/parquet_metadata.cpp @@ -145,6 +145,8 @@ cudf::type_id to_dtype(parquet::Type::type physical, parquet::ConvertedType::typ return cudf::type_id::TIMESTAMP_MILLISECONDS; case parquet::ConvertedType::type::TIMESTAMP_MICROS: return cudf::type_id::TIMESTAMP_MICROSECONDS; + case parquet::ConvertedType::type::DECIMAL: + return cudf::type_id::EMPTY; default: break; } @@ -224,7 +226,7 @@ std::unique_ptr get_minmax_metadata( auto logical_type = column->converted_type(); cudf::data_type dtype = cudf::data_type (to_dtype(physical_type, logical_type)) ; - if (columnMetaData->is_stats_set() && dtype.id() != cudf::type_id::STRING) { + if (columnMetaData->is_stats_set() && dtype.id() != cudf::type_id::STRING && dtype.id() != cudf::type_id::EMPTY) { auto statistics = columnMetaData->statistics(); auto col_name_min = "min_" + std::to_string(colIndex) + "_" + column->name(); metadata_dtypes.push_back(dtype); diff --git a/engine/src/operators/GroupBy.cpp b/engine/src/operators/GroupBy.cpp index ef4f6f7ec..df966f06c 100644 --- a/engine/src/operators/GroupBy.cpp +++ b/engine/src/operators/GroupBy.cpp @@ -254,9 +254,9 @@ std::unique_ptr compute_aggregations_without_groupby( numeric_s->set_value((int64_t)(aggregation_input.size() - aggregation_input.null_count())); reductions.emplace_back(std::move(scalar)); } else { - std::unique_ptr agg = makeCudfAggregation(aggregation_types[i]); cudf::type_id output_type = get_aggregation_output_type(aggregation_input.type().id(), aggregation_types[i], false); - std::unique_ptr reduction_out = cudf::reduce(aggregation_input, agg, cudf::data_type(output_type)); + std::unique_ptr reduction_out = cudf::reduce( + aggregation_input, makeCudfAggregation(aggregation_types[i]), cudf::data_type(output_type)); // if this aggregation was a SUM0, and it was not valid, we want it to be a valid 0 instead if (aggregation_types[i] == AggregateKind::SUM0 && !reduction_out->is_valid()){ @@ -326,11 +326,11 @@ std::unique_ptr compute_aggregations_with_groupby( bool got_aggregation_input = false; std::vector all_indices; bool is_multi_var = false; - std::vector> agg_ops_for_request; + std::vector> agg_ops_for_request; for (size_t i = 0; i < aggregation_input_expressions.size(); i++){ std::string agg_input_expr = aggregation_input_expressions[i]; if (expression == agg_input_expr){ - + // For operations like `COUNT($0, $1)` size_t num_columns_to_count = StringUtil::findAndCountAllMatches(agg_input_expr, "$"); if (num_columns_to_count > 1 && aggregation_types[i] == AggregateKind::COUNT_VALID) { @@ -354,7 +354,7 @@ std::unique_ptr compute_aggregations_with_groupby( } got_aggregation_input = true; } - agg_ops_for_request.push_back(makeCudfAggregation(aggregation_types[i])); + 
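
Both parsers above apply the same interim policy for decimals: ORC asks the reader to materialize them as doubles via `set_decimal_cols_as_float`, while Parquet has no such read option, so `parse_batch` releases the columns, casts any DECIMAL64 column to FLOAT64 with `cudf::cast`, and rebuilds the table; `parse_schema` advertises FLOAT64 so downstream kernels never see a decimal type. A minimal sketch of the post-read cast, assuming the cudf cast/table APIs the patch itself calls:

```cpp
// Release the columns, cast any DECIMAL64 column to FLOAT64, rebuild the table.
#include <cudf/column/column.hpp>
#include <cudf/table/table.hpp>
#include <cudf/unary.hpp>
#include <memory>
#include <utility>
#include <vector>

std::unique_ptr<cudf::table> decimals_to_float64(std::unique_ptr<cudf::table> table) {
    std::vector<std::unique_ptr<cudf::column>> columns = table->release();
    for (auto& column : columns) {
        if (column->type().id() == cudf::type_id::DECIMAL64) {
            column = cudf::cast(column->view(), cudf::data_type{cudf::type_id::FLOAT64});
        }
    }
    return std::make_unique<cudf::table>(std::move(columns));
}
```

The parquet_metadata change complements this: DECIMAL converted types map to EMPTY, so min/max statistics are simply skipped for decimal columns instead of being read with the wrong type.
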
agg_ops_for_request.push_back(makeCudfGroupbyAggregation(aggregation_types[i])); agg_out_indices.push_back(i); // this is to know what is the desired order of aggregations output // if the aggregation was given an alias lets use it, otherwise we'll name it based on the aggregation and input diff --git a/engine/src/operators/GroupBy.h b/engine/src/operators/GroupBy.h index 4580a44e9..d622753fb 100644 --- a/engine/src/operators/GroupBy.h +++ b/engine/src/operators/GroupBy.h @@ -30,57 +30,139 @@ enum AggregateKind{ namespace ral { namespace operators { - // offset param is needed for `LAG` and `LEAD` aggs - template - std::unique_ptr makeCudfAggregation(AggregateKind input, int offset = 0){ - if(input == AggregateKind::SUM){ - return cudf::make_sum_aggregation(); - }else if(input == AggregateKind::MEAN){ - return cudf::make_mean_aggregation(); - }else if(input == AggregateKind::MIN){ - return cudf::make_min_aggregation(); - }else if(input == AggregateKind::MAX){ - return cudf::make_max_aggregation(); - }else if(input == AggregateKind::ROW_NUMBER) { - return cudf::make_row_number_aggregation(); - }else if(input == AggregateKind::COUNT_VALID){ - return cudf::make_count_aggregation(cudf::null_policy::EXCLUDE); - }else if(input == AggregateKind::COUNT_ALL){ - return cudf::make_count_aggregation(cudf::null_policy::INCLUDE); - }else if(input == AggregateKind::SUM0){ - return cudf::make_sum_aggregation(); - }else if(input == AggregateKind::LAG){ - return cudf::make_lag_aggregation(offset); - }else if(input == AggregateKind::LEAD){ - return cudf::make_lead_aggregation(offset); - }else if(input == AggregateKind::NTH_ELEMENT){ + inline std::unique_ptr makeCudfAggregation(AggregateKind input, int offset = 0){ + switch(input) { + case AggregateKind::SUM: { + return cudf::make_sum_aggregation(); + } + case AggregateKind::MEAN: { + return cudf::make_mean_aggregation(); + } + case AggregateKind::MIN: { + return cudf::make_min_aggregation(); + } + case AggregateKind::MAX: { + return cudf::make_max_aggregation(); + } + case AggregateKind::COUNT_VALID: { + return cudf::make_count_aggregation(cudf::null_policy::EXCLUDE); + } + case AggregateKind::COUNT_ALL: { + return cudf::make_count_aggregation(cudf::null_policy::INCLUDE); + } + case AggregateKind::SUM0: { + return cudf::make_sum_aggregation(); + } + case AggregateKind::NTH_ELEMENT: { + // TODO: https://github.com/BlazingDB/blazingsql/issues/1531 + return cudf::make_nth_element_aggregation(offset, cudf::null_policy::INCLUDE); + } + case AggregateKind::COUNT_DISTINCT: { + /* Currently this conditional is unreachable. + Calcite transforms count distincts through the + AggregateExpandDistinctAggregates rule, so in fact, + each count distinct is replaced by some group by clauses. 
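
The call-site switch to `makeCudfGroupbyAggregation` exists because the cudf release this patch targets splits `cudf::aggregation` into per-context families: reductions, groupby requests and rolling windows each take their own aggregation type, and the factory functions are templated on that family. The sketch below illustrates the idea for SUM and ROW_NUMBER; the explicit template arguments and the `cudf::rolling_window` overload are assumptions about that cudf API rather than code taken from this patch, and the window bounds are placeholders.

```cpp
#include <cudf/aggregation.hpp>
#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/rolling.hpp>
#include <memory>

// One factory instantiation per aggregation family; the returned types are not
// interchangeable, which is why GroupBy.h grows three separate make* helpers.
std::unique_ptr<cudf::aggregation> reduce_sum() {
    return cudf::make_sum_aggregation<cudf::aggregation>();
}

std::unique_ptr<cudf::groupby_aggregation> groupby_sum() {
    return cudf::make_sum_aggregation<cudf::groupby_aggregation>();
}

// ROW_NUMBER lives only in the rolling family: it is consumed by
// cudf::rolling_window over an "unbounded preceding, current row" frame.
std::unique_ptr<cudf::column> row_number_over(cudf::column_view const& input) {
    auto agg = cudf::make_row_number_aggregation<cudf::rolling_aggregation>();
    return cudf::rolling_window(input, input.size(), 0, 1, *agg);
}
```
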
*/ + return cudf::make_nunique_aggregation(); + } + default: + throw std::runtime_error( + "In makeCudfGroupbyAggregation function: AggregateKind type not supported"); + } + } + + inline std::unique_ptr makeCudfGroupbyAggregation(AggregateKind input, int offset = 0){ + switch(input) { + case AggregateKind::SUM: { + return cudf::make_sum_aggregation(); + } + case AggregateKind::MEAN: { + return cudf::make_mean_aggregation(); + } + case AggregateKind::MIN: { + return cudf::make_min_aggregation(); + } + case AggregateKind::MAX: { + return cudf::make_max_aggregation(); + } + case AggregateKind::COUNT_VALID: { + return cudf::make_count_aggregation(cudf::null_policy::EXCLUDE); + } + case AggregateKind::COUNT_ALL: { + return cudf::make_count_aggregation(cudf::null_policy::INCLUDE); + } + case AggregateKind::SUM0: { + return cudf::make_sum_aggregation(); + } + case AggregateKind::NTH_ELEMENT: { // TODO: https://github.com/BlazingDB/blazingsql/issues/1531 - // return cudf::make_nth_element_aggregation(offset, cudf::null_policy::INCLUDE); - }else if(input == AggregateKind::COUNT_DISTINCT){ + return cudf::make_nth_element_aggregation(offset, cudf::null_policy::INCLUDE); + } + case AggregateKind::COUNT_DISTINCT: { /* Currently this conditional is unreachable. Calcite transforms count distincts through the AggregateExpandDistinctAggregates rule, so in fact, each count distinct is replaced by some group by clauses. */ - // return cudf::make_nunique_aggregation(); + return cudf::make_nunique_aggregation(); } - throw std::runtime_error( - "In makeCudfAggregation function: AggregateKind type not supported"); + default: + throw std::runtime_error( + "In makeCudfGroupbyAggregation function: AggregateKind type not supported"); + } + } + + // offset param is needed for `LAG` and `LEAD` aggs + inline std::unique_ptr makeCudfRollingAggregation(AggregateKind input, int offset = 0){ + switch(input) { + case AggregateKind::SUM: { + return cudf::make_sum_aggregation(); + } + case AggregateKind::MEAN: { + return cudf::make_mean_aggregation(); + } + case AggregateKind::MIN: { + return cudf::make_min_aggregation(); + } + case AggregateKind::MAX: { + return cudf::make_max_aggregation(); + } + case AggregateKind::COUNT_VALID: { + return cudf::make_count_aggregation(cudf::null_policy::EXCLUDE); + } + case AggregateKind::COUNT_ALL: { + return cudf::make_count_aggregation(cudf::null_policy::INCLUDE); + } + case AggregateKind::SUM0: { + return cudf::make_sum_aggregation(); + } + case AggregateKind::ROW_NUMBER: { + return cudf::make_row_number_aggregation(); + } + case AggregateKind::LAG: { + return cudf::make_lag_aggregation(offset); + } + case AggregateKind::LEAD: { + return cudf::make_lead_aggregation(offset); + } + default: + throw std::runtime_error( + "In makeCudfRollingAggregation function: AggregateKind type not supported"); + } } AggregateKind get_aggregation_operation(std::string expression_in, bool is_window_operation = false); - std::tuple, std::vector, std::vector, std::vector> + std::tuple, std::vector, std::vector, std::vector> parseGroupByExpression(const std::string & queryString, std::size_t num_cols); - std::tuple, std::vector, std::vector, std::vector> - modGroupByParametersPostComputeAggregations(const std::vector & group_column_indices, + std::tuple, std::vector, std::vector, std::vector> + modGroupByParametersPostComputeAggregations(const std::vector & group_column_indices, const std::vector & aggregation_types, const std::vector & merging_column_names); std::unique_ptr 
compute_groupby_without_aggregations( const ral::frame::BlazingTableView & table, const std::vector & group_column_indices); std::unique_ptr compute_aggregations_without_groupby( - const ral::frame::BlazingTableView & table, const std::vector & aggregation_input_expressions, + const ral::frame::BlazingTableView & table, const std::vector & aggregation_input_expressions, const std::vector & aggregation_types, const std::vector & aggregation_column_assigned_aliases); std::unique_ptr compute_aggregations_with_groupby( diff --git a/engine/tests/allocation_pool/allocation_pool.cpp b/engine/tests/allocation_pool/allocation_pool.cpp index f96883e09..4841f4637 100644 --- a/engine/tests/allocation_pool/allocation_pool.cpp +++ b/engine/tests/allocation_pool/allocation_pool.cpp @@ -97,7 +97,8 @@ TEST_F(AllocationPoolTest, mem_map_test) { ral::memory::set_allocation_pools(size_buffers_host, num_buffers_host, size_buffers_pinned, num_buffers_pinned, map_ucx, context); - ucp_mem_h handle = ral::memory::buffer_providers::get_pinned_buffer_provider()->getUcpMemoryHandle(); + std::unique_ptr allocation_chunk = ral::memory::buffer_providers::get_pinned_buffer_provider()->get_chunk(); + ucp_mem_h handle = allocation_chunk->allocation->mem_handle; ucp_mem_attr_t attr; std::memset(&attr, 0, sizeof(ucp_mem_attr_t)); // check that it is mapped @@ -111,3 +112,47 @@ TEST_F(AllocationPoolTest, mem_map_test) { ASSERT_TRUE(attr.length != 0); ral::memory::empty_pools(); } + + +TEST_F(AllocationPoolTest, get_chuck_free_chunk) { + std::size_t size_buffers_host = 1000000; + std::size_t num_buffers_host = 100; + std::size_t size_buffers_pinned = 1000000; + std::size_t num_buffers_pinned = 100; + bool map_ucx = true; + + auto context = CreateUcpContext(); + ral::memory::set_allocation_pools(size_buffers_host, num_buffers_host, + size_buffers_pinned, num_buffers_pinned, map_ucx, context); + + // lets make some buffers + std::vector > raw_buffers0, raw_buffers1, raw_buffers2; + for (int i = 0; i < num_buffers_pinned; i++){ + raw_buffers0.push_back(std::move(ral::memory::buffer_providers::get_pinned_buffer_provider()->get_chunk())); + } + for (int i = 0; i < num_buffers_pinned; i++){ + raw_buffers1.push_back(std::move(ral::memory::buffer_providers::get_pinned_buffer_provider()->get_chunk())); + } + for (int i = 0; i < num_buffers_pinned; i++){ + raw_buffers2.push_back(std::move(ral::memory::buffer_providers::get_pinned_buffer_provider()->get_chunk())); + } + + // lets free them in a different order and make sure we handle that correctly + for(auto i = 0; i < raw_buffers2.size(); i++){ + auto pool = raw_buffers2[i]->allocation->pool; + pool->free_chunk(std::move(raw_buffers2[i])); + } + for(auto i = 0; i < raw_buffers1.size(); i++){ + auto pool = raw_buffers1[i]->allocation->pool; + pool->free_chunk(std::move(raw_buffers1[i])); + } + for(auto i = 0; i < raw_buffers0.size(); i++){ + auto pool = raw_buffers0[i]->allocation->pool; + pool->free_chunk(std::move(raw_buffers0[i])); + } + ASSERT_TRUE(true); + + +} + + diff --git a/new b/new deleted file mode 100644 index e69de29bb..000000000 diff --git a/pyblazing/pyblazing/apiv2/context.py b/pyblazing/pyblazing/apiv2/context.py index 505842f36..5c7d2d21c 100755 --- a/pyblazing/pyblazing/apiv2/context.py +++ b/pyblazing/pyblazing/apiv2/context.py @@ -2,8 +2,7 @@ import cudf -from cudf.core.column.column import build_column -from cudf.utils.dtypes import is_decimal_dtype +from cudf.api.types import is_decimal_dtype from dask.distributed import get_worker from datetime import datetime 
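
The new `get_chuck_free_chunk` test drains the pinned pool in three batches and then returns those batches in a different order than they were acquired, checking that `free_chunk` tolerates arbitrary interleaving. The class below is a deliberately simplified, STL-only stand-in for such a pool, not the `ral::memory` implementation: because the free list is just a stack of chunks, the order in which callers return them does not matter.

```cpp
// Minimal fixed-size chunk pool: get_chunk() pops from a free list,
// free_chunk() pushes back in whatever order callers return chunks.
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

class SimplePool {
public:
    SimplePool(std::size_t chunk_size, std::size_t num_chunks) {
        for (std::size_t i = 0; i < num_chunks; ++i) {
            free_list_.push_back(std::make_unique<std::vector<char>>(chunk_size));
        }
    }

    std::unique_ptr<std::vector<char>> get_chunk() {
        if (free_list_.empty()) { return nullptr; }  // exhausted; a real pool may grow or block
        auto chunk = std::move(free_list_.back());
        free_list_.pop_back();
        return chunk;
    }

    void free_chunk(std::unique_ptr<std::vector<char>> chunk) {
        free_list_.push_back(std::move(chunk));  // return order is irrelevant
    }

private:
    std::vector<std::unique_ptr<std::vector<char>>> free_list_;
};
```
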
@@ -12,7 +11,6 @@ from urllib.parse import urlparse from threading import Lock -from weakref import ref from distributed.comm import parse_address from pyblazing.apiv2.filesystem import FileSystem from pyblazing.apiv2 import DataType @@ -23,8 +21,6 @@ ) from pyblazing.apiv2.algebra import get_json_plan, format_json_plan -import json -import collections from pyhive import hive from .hive import ( convertTypeNameStrToCudfType, @@ -34,8 +30,6 @@ get_hive_table, ) import time -import socket -import errno import os import pandas import numpy as np @@ -1070,8 +1064,8 @@ def __init__( for x in self.input._data.values(): # for now `decimal` type is not considered from `np_to_cudf_types_int` call if is_decimal_dtype(x.dtype): - print( - "WARNING: BlazingSQL currently does not support operations on DECIMAL datatype columns" + raise Exception( + "ERROR: BlazingSQL currently does not support tables based on cudf DataFrames with DECIMAL datatype columns" ) type_int = 26 else: @@ -1082,8 +1076,8 @@ def __init__( for x in input.dtypes: # for now `decimal` type is not considered from `np_to_cudf_types_int` call if is_decimal_dtype(x): - print( - "WARNING: BlazingSQL currently does not support operations on DECIMAL datatype columns" + raise Exception( + "ERROR: BlazingSQL currently does not support tables based on dask_cudf DataFrames with DECIMAL datatype columns" ) type_int = 26 else: diff --git a/tests/BlazingSQLTest/jupyterTests.ipynb b/tests/BlazingSQLTest/jupyterTests.ipynb index 7e1fdf6f4..a5ca5cd68 100644 --- a/tests/BlazingSQLTest/jupyterTests.ipynb +++ b/tests/BlazingSQLTest/jupyterTests.ipynb @@ -61,7 +61,7 @@ "\n", "def git_clone():\n", " import git\n", - " git.Git(\"$CONDA_PREFIX\").clone(\"https://github.com/BlazingDB/blazingsql-testing-files.git\")\n", + " git.Git(\"$CONDA_PREFIX\").clone(\"https://github.com/rapidsai/blazingsql-testing-files.git\")\n", " \n", "def git_pull():\n", " import git\n", @@ -601,4 +601,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} \ No newline at end of file +} diff --git a/tests/BlazingSQLTest/manualTesting.py b/tests/BlazingSQLTest/manualTesting.py index 8f74ad730..77e9a8298 100644 --- a/tests/BlazingSQLTest/manualTesting.py +++ b/tests/BlazingSQLTest/manualTesting.py @@ -20,7 +20,7 @@ gs_project_id=os.getenv("BLAZINGSQL_E2E_GS_PROJECT_ID") gs_bucket_name=os.getenv("BLAZINGSQL_E2E_GS_BUCKET_NAME") -file_results_dir = conda_prefix + "/blazingsql-testing-files/results/" +file_results_dir = conda_prefix + "/blazingsql-testing-files/results/" data_dir = conda_prefix + "/blazingsql-testing-files/data/" def install(package): @@ -28,8 +28,8 @@ def install(package): def git_clone(): import git - git.Git("$CONDA_PREFIX").clone("https://github.com/BlazingDB/blazingsql-testing-files.git") - + git.Git("$CONDA_PREFIX").clone("https://github.com/rapidsai/blazingsql-testing-files.git") + def git_pull(): import git os.chdir(conda_prefix + "/blazingsql-testing-files") @@ -37,57 +37,57 @@ def git_pull(): def unzip(): import tarfile - + os.chdir(conda_prefix + "/blazingsql-testing-files/") tar = tarfile.open('data.tar.gz', "r:gz") tar.extractall() tar.close() def run_query(bc, engine, query, queryId, queryType, worder, orderBy, acceptable_difference, use_percentage, input_type, **kwargs): - - result_gdf = bc.sql(query) + + result_gdf = bc.sql(query) filename = str("TPCH").upper() + "-" + str(queryId) + ".parquet" result_file = file_results_dir + "/" + str(engine) + "/" + filename - + pdf2 = pd.read_parquet(result_file) - stringResult = "" - + stringResult = "" + if result_gdf is 
not None: if result_gdf.columns is not None: import dask_cudf - + if type(result_gdf) is dask_cudf.core.DataFrame: result_gdf = result_gdf.compute() - + expected_dtypes = result_gdf.dtypes.to_list() - + pdf = upcast_to_float(result_gdf).fillna(get_null_constants(result_gdf)).to_pandas() - + if worder == 1 and pdf.size != 0: pdf.sort_values([orderBy] if orderBy else pdf.columns.to_list(), inplace = True) stringResult = print_query_results(pdf, pdf2, acceptable_difference, use_percentage, engine) return stringResult - + def print_query_results(pdf1, pdf2, acceptable_difference, use_percentage, engine): columnNamesComparison = compare_column_names(pdf1, pdf2) if columnNamesComparison != True: error_message = "Column names are not the same" - + resultComparisson = compare_results(pdf1, pdf2, acceptable_difference, use_percentage, engine) if resultComparisson != "Success": error_message = resultComparisson[6:] stringResult = resultComparisson if resultComparisson != "Success" or columnNamesComparison == False: - stringResult = "Fail" - + stringResult = "Fail" + return stringResult def compare_column_names(pdf1, pdf2): @@ -104,7 +104,7 @@ def compare_column_names(pdf1, pdf2): return True def begins_with(col1, col2, exp): - return col1.startswith(exp) or col2.startswith(exp) + return col1.startswith(exp) or col2.startswith(exp) def compare_results(vdf1, vdf2, acceptable_difference, use_percentage, engine): if vdf1.size == 0 and vdf2.size == 0: @@ -112,9 +112,9 @@ def compare_results(vdf1, vdf2, acceptable_difference, use_percentage, engine): elif pre_compare_results(vdf1.values, vdf2.values): return 'Success' else: - res = assert_equal(vdf1, vdf2, acceptable_difference, use_percentage, engine) + res = assert_equal(vdf1, vdf2, acceptable_difference, use_percentage, engine) return res - + def upcast_to_float(df): for name in df.columns: if np.issubdtype(df[name].dtype, np.bool_): @@ -138,7 +138,7 @@ def pre_compare_results(vdf1, vdf2): return True except (AssertionError, ValueError, TypeError) as e: return False - + def assert_equal(pdf1, pdf2, acceptable_difference, use_percentage, engine): np.warnings.filterwarnings('ignore') if pdf1.shape[0] == pdf2.shape[0]: @@ -151,7 +151,7 @@ def assert_equal(pdf1, pdf2, acceptable_difference, use_percentage, engine): tmp_pdf1 = pdf1.select_dtypes(include=np.inexact) tmp_pdf2 = pdf2.select_dtypes(include=np.inexact) - + res = np.all(exac_comp) and np.allclose(tmp_pdf1.values, tmp_pdf2.values, acceptable_difference, equal_nan=True) if res: return 'Success' @@ -163,15 +163,15 @@ def assert_equal(pdf1, pdf2, acceptable_difference, use_percentage, engine): return 'Fail: Different number of rows blzSQLresult: ' + str(pdf1.shape[0]) + ' ' + (engine) + ' result: '+ str(pdf2.shape[0]) def create_tables(bc, dir_data_lc, fileSchemaType, **kwargs): - - ext = "parquet" - + + ext = "parquet" + tpchTables = ['customer','orders','supplier','lineitem','part','partsupp','nation','region'] tables = kwargs.get('tables', tpchTables) dir_data_lc = dir_data_lc + "tpch/" - + for i, table in enumerate(tables): table_files = ("%s/%s_[0-9]*.%s") % (dir_data_lc, table, ext) t = None @@ -188,21 +188,21 @@ class bcolors: UNDERLINE = '\033[4m' def main(): - + install("gitpython") - + if not os.path.exists(conda_prefix + "/blazingsql-testing-files/"): git_clone() else: git_pull() - + unzip() - - queryType = ' Local Tests ' + + queryType = ' Local Tests ' if len(sys.argv) == 2: n_nodos = sys.argv[1] - else: + else: n_nodos = "1" if n_nodos == "1": @@ -213,18 +213,18 @@ def main(): from 
dask.distributed import Client client = Client('127.0.0.1:8786') print("Dask client ready!") - bc = BlazingContext(dask_client = client, network_interface='lo') + bc = BlazingContext(dask_client = client, network_interface='lo') log_dict = {} - - def executionLocalTest(queryType): - + + def executionLocalTest(queryType): + #Read Data TPCH------------------------------------------------------------------------------------------------------------ - + tables = ['nation', 'region', 'supplier','customer','lineitem','orders', 'part', 'partsupp'] - + data_types = [DataType.PARQUET] # TODO json - + for fileSchemaType in data_types: create_tables(bc, data_dir, fileSchemaType, tables=tables) @@ -232,66 +232,66 @@ def executionLocalTest(queryType): worder = 1 # Parameter to indicate if its necessary to order the resulsets before compare them use_percentage = False acceptable_difference = 0.01 - + print('==============================') print(queryType) print('==============================') - + queryId = 'TEST_01' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result - + queryId = 'TEST_02' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_03' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, spark, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_04' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + queryId = 'TEST_05' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_06' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, spark, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_07' print("Executing " + queryId + " ... 
") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result executionLocalTest(queryType) - - queryType = ' S3 Tests ' - - def executionS3Test(queryType): - + + queryType = ' S3 Tests ' + + def executionS3Test(queryType): + #Read Data TPCH------------------------------------------------------------------------------------------------------------ - + authority = "tpch_s3" print(authority) @@ -300,15 +300,15 @@ def executionS3Test(queryType): print(hola) print(access_key_id) print(secret_key) - + bc.s3(authority, bucket_name=bucket_name, encryption_type=S3EncryptionType.NONE, access_key_id=access_key_id, secret_key=secret_key) - dir_data_lc = "s3://" + authority + "/" + "DataSet100Mb2part/" - + dir_data_lc = "s3://" + authority + "/" + "DataSet100Mb2part/" + tables = ['nation', 'region', 'supplier','customer','lineitem','orders', 'part', 'partsupp'] data_types = [DataType.PARQUET] # TODO json - + for fileSchemaType in data_types: create_tables(bc, data_dir, fileSchemaType, tables=tables) @@ -316,75 +316,75 @@ def executionS3Test(queryType): worder = 1 # Parameter to indicate if its necessary to order the resulsets before compare them use_percentage = False acceptable_difference = 0.01 - + print('==============================') print(queryType) print('==============================') - + queryId = 'TEST_08' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_09' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + queryId = 'TEST_10' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_11' #print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) #result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + queryId = 'TEST_12' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_13' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_14' print("Executing " + queryId + " ... 
") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) log_dict[queryId] = result executionS3Test(queryType) - - queryType = ' GS Tests ' - - def executionGSTest(queryType): - + + queryType = ' GS Tests ' + + def executionGSTest(queryType): + authority = "tpch_gs" - + bc.gs(authority, project_id=gs_project_id, bucket_name=gs_bucket_name, use_default_adc_json_file=True, adc_json_file='') - + dir_data_lc = 'gcs://'+ authority +'/100MB2Part/' - + tables = ['nation', 'region', 'supplier','customer','lineitem','orders', 'part', 'partsupp'] - data_types = [DataType.PARQUET] - + data_types = [DataType.PARQUET] + for fileSchemaType in data_types: create_tables(bc, data_dir, fileSchemaType, tables=tables) @@ -392,63 +392,63 @@ def executionGSTest(queryType): worder = 1 # Parameter to indicate if its necessary to order the resulsets before compare them use_percentage = False acceptable_difference = 0.01 - + print('==============================') print(queryType) print('==============================') - + queryId = 'TEST_15' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, spark, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_16' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + queryId = 'TEST_17' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, spark, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_18' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_19' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_20' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, spark, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_21' print("Executing " + queryId + " ... ") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) result = run_query(bc, spark, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + log_dict[queryId] = result queryId = 'TEST_22' #print("Executing " + queryId + " ... 
") - query = tpch.get_tpch_query(queryId) + query = tpch.get_tpch_query(queryId) #result = run_query(bc, drill, query, queryId, queryType, worder, '', acceptable_difference, use_percentage, fileSchemaType) - + executionGSTest(queryType) green = bcolors.OKGREEN @@ -463,4 +463,4 @@ def executionGSTest(queryType): print (green + "=======================================" + endc) -main() \ No newline at end of file +main() diff --git a/tests/README.md b/tests/README.md index 391ace6d9..12ede1f1b 100644 --- a/tests/README.md +++ b/tests/README.md @@ -34,7 +34,7 @@ By default the end to end tests: - Run in single node only (nrals: 1) - Compare against parquet result files instead of Drill or pySpark (execution mode: gpuci) - The log directory is the CONDA_PREFIX folder. -- Download automatically and use the testing files (the data folder and the and parquet result files) in your CONDA_PREFIX folder (see https://github.com/BlazingDB/blazingsql-testing-files) +- Download automatically and use the testing files (the data folder and the and parquet result files) in your CONDA_PREFIX folder (see https://github.com/rapidsai/blazingsql-testing-files) ```shell-script cd blazingsql @@ -51,7 +51,7 @@ cd blazingsql ### Adding new Tests - Make a fork from https://github.com/BlazingDB/blazingsql and create a new branch (example feat/my-new-test) -- After that make another one from https://github.com/BlazingDB/blazingsql-testing-files and create a new branch with the same name as above (example feat/my-new-test) +- After that make another one from https://github.com/rapidsai/blazingsql-testing-files and create a new branch with the same name as above (example feat/my-new-test) - Write a new test file in blazingsql/tests/BlazingSQLTest/ - Add new files in blazingsql-testing-files/data/ - Push your changes of both repositories (example: git push origin feat/my-new-test) @@ -322,7 +322,7 @@ $ docker-compose down "dataDirectory": "/path_to_dataset/100MB2Part/" You should have two folders inside dataDirectory: tpch folder and tpcx folder - See https://github.com/BlazingDB/blazingsql-testing-files/blob/master/data.tar.gz + See https://github.com/rapidsai/blazingsql-testing-files/blob/master/data.tar.gz ### Additional modules - CreationDatabases