3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -12,7 +12,8 @@
- #1376 Fixing build due to some strings refactor in cudf, undoing the replace workaround
- #1331 Added flag to enable null e2e testing
- #1418 Adding support for docker image
- #1419 Added concat cache machine timeout
- #1419 Added concat cache machine timeout
- #1429 Cleaning up deadcode and removing some warnings

## Bug Fixes
- #1335 Fixing uninitialized var in orc metadata and handling the parseMetadata exceptions properly
10 changes: 0 additions & 10 deletions engine/src/CalciteExpressionParsing.cpp
@@ -1,22 +1,12 @@
#include <algorithm>
#include <limits.h>

#include <spdlog/spdlog.h>
#include <spdlog/async.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>

//#include <cudf.h>
#include <cudf/table/table_view.hpp>
#include <iomanip>
#include <map>
#include <regex>

#include <blazingdb/io/Util/StringUtil.h>
#include "error.hpp"

#include "CalciteExpressionParsing.h"
#include "cudf/binaryop.hpp"
#include <cudf/scalar/scalar_factories.hpp>
#include "parser/expression_tree.hpp"
#include "utilities/scalar_timestamp_parser.hpp"
2 changes: 1 addition & 1 deletion engine/src/CodeTimer.h
@@ -77,6 +77,6 @@ class CodeTimer {
bool started_;
bool paused_;
Clock::time_point start_point_;
Clock::time_point end_point_;
Clock::duration accumulated_;
Clock::time_point end_point_;
};
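Note on the member reorder above (and the similar moves in BufferProvider.h and the message_sender header further down): this is consistent with silencing GCC/Clang's -Wreorder. Non-static data members are always initialized in declaration order, so a constructor whose mem-initializer list names them in a different order draws the warning. A minimal sketch with a hypothetical Timer class, not code from this repo:

```cpp
#include <chrono>

class Timer {
    using Clock = std::chrono::steady_clock;
    Clock::time_point start_point_;
    Clock::duration accumulated_;   // declared before end_point_ ...
    Clock::time_point end_point_;

public:
    // ... so this mem-initializer list matches declaration order and
    // compiles cleanly under -Wall. Listing end_point_ before
    // accumulated_ would emit -Wreorder, because members are
    // initialized in declaration order no matter how the list is
    // written; the fix is to reorder the list or, as in the diff
    // above, the declarations themselves.
    Timer()
        : start_point_(Clock::now()),
          accumulated_(Clock::duration::zero()),
          end_point_(start_point_) {}
};

int main() {
    Timer t;
    (void)t;
    return 0;
}
```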
2 changes: 1 addition & 1 deletion engine/src/bmr/BlazingMemoryResource.cpp
@@ -250,7 +250,7 @@ size_t internal_blazing_host_memory_resource::get_from_driver_used_memory() {
sysinfo (&si);
// NOTE: sync point
total_memory_size = (size_t)si.totalram;
used_memory_size = total_memory_size - (size_t)si.freeram;;
used_memory_size = total_memory_size - (size_t)si.freeram;
return used_memory_size;
}

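The change above only drops a stray second semicolon, but the surrounding sysinfo logic invites a caveat: on Linux, totalram and freeram are counts of mem_unit-sized blocks, and the cast-only code above assumes mem_unit == 1 (true on common x86_64 configurations). A self-contained, Linux-only sketch of the fully portable form:

```cpp
#include <sys/sysinfo.h>

#include <cstddef>
#include <cstdio>

int main() {
    struct sysinfo si {};
    if (sysinfo(&si) != 0) {
        std::perror("sysinfo");
        return 1;
    }
    // totalram/freeram are expressed in units of si.mem_unit bytes.
    const std::size_t total =
        static_cast<std::size_t>(si.totalram) * si.mem_unit;
    const std::size_t used =
        total - static_cast<std::size_t>(si.freeram) * si.mem_unit;
    std::printf("used %zu of %zu bytes\n", used, total);
    return 0;
}
```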
7 changes: 2 additions & 5 deletions engine/src/bmr/BufferProvider.cpp
@@ -1,6 +1,5 @@
#include "BufferProvider.h"

#include <iostream>
#include <mutex>
#include <cstring>
#include <cuda.h>
@@ -9,8 +8,6 @@
#include <ucs/type/status.h>

#include <spdlog/spdlog.h>
#include <spdlog/async.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>

namespace ral{
@@ -143,7 +140,7 @@ void allocation_pool::grow() {
try{
allocator->allocate((void **) &allocations[last_index]->data,num_new_buffers * buffer_size);
this->allocations[last_index]->total_number_of_chunks = num_new_buffers;
for (int buffer_index = 0; buffer_index < num_new_buffers; buffer_index++) {
for (std::size_t buffer_index = 0; buffer_index < num_new_buffers; buffer_index++) {
auto buffer = std::make_unique<blazing_allocation_chunk>();
buffer->size = this->buffer_size;
buffer->data = allocations[last_index]->data + buffer_index * this->buffer_size;
@@ -213,7 +210,7 @@ std::size_t allocation_pool::size_buffers() { return this->buffer_size; }


void set_allocation_pools(std::size_t size_buffers_host, std::size_t num_buffers_host,
std::size_t size_buffers_pinned, std::size_t num_buffers_pinned, bool map_ucx,
std::size_t /*size_buffers_pinned*/, std::size_t /*num_buffers_pinned*/, bool map_ucx,
ucp_context_h context) {

if (buffer_providers::get_host_buffer_provider() == nullptr || buffer_providers::get_host_buffer_provider()->get_total_buffers() == 0) { // not initialized
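Two warning-silencing idioms appear in this file: loop counters become std::size_t so comparisons against unsigned sizes such as num_new_buffers stop raising -Wsign-compare, and unused parameters keep their type but lose their name (std::size_t /*size_buffers_pinned*/), which quiets -Wunused-parameter without touching the signature; the host_allocator(bool /*use_ucx*/) change in the header below is the same idiom. A small sketch with hypothetical names, including the C++17 alternative:

```cpp
#include <cstddef>
#include <vector>

// Unnamed parameter: the signature is unchanged, but -Wunused-parameter
// no longer fires because there is no name left to be "unused".
void configure(bool /*use_ucx*/) {}

// C++17 alternative that keeps the name readable at the call site.
void configure_alt([[maybe_unused]] bool use_ucx) {}

// A std::size_t counter compares cleanly against .size() and other
// unsigned quantities; an int counter here would raise -Wsign-compare.
std::size_t count_nonzero(const std::vector<int>& v) {
    std::size_t n = 0;
    for (std::size_t i = 0; i < v.size(); ++i) {
        if (v[i] != 0) ++n;
    }
    return n;
}

int main() {
    configure(true);
    configure_alt(false);
    return count_nonzero({0, 1, 2, 0}) == 2 ? 0 : 1;
}
```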
6 changes: 3 additions & 3 deletions engine/src/bmr/BufferProvider.h
@@ -71,7 +71,7 @@ class base_allocator{

class host_allocator : public base_allocator {
public:
host_allocator(bool use_ucx) {}
host_allocator(bool /*use_ucx*/) {}
protected:
void do_allocate(void ** ptr, std::size_t size);
void do_deallocate(void * ptr);
@@ -125,10 +125,10 @@ class allocation_pool {

bool use_ucx;

std::size_t buffer_size;

std::size_t num_buffers;

std::size_t buffer_size;

int buffer_counter;

int allocation_counter;
@@ -112,7 +112,7 @@ std::tuple<ral::cache::MetadataDictionary, std::vector<blazingdb::transport::Col
size_t num_chunked_column_info = from_byte_vector<size_t>(data.data() + ptr_offset);
ptr_offset += sizeof(size_t);
std::vector<ral::memory::blazing_chunked_column_info> chunked_column_infos(num_chunked_column_info);
for (auto i = 0; i < num_chunked_column_info; i++){
for (std::size_t i = 0; i < num_chunked_column_info; i++){
// first we deserialize chunk_index
size_t num_chunk_index = from_byte_vector<size_t>(data.data() + ptr_offset);
ptr_offset += sizeof(size_t);
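The index fix above is subtler than it looks: auto i = 0 deduces int, because the literal 0 is an int, so the original loop compared a signed index against the size_t count num_chunked_column_info and warned under -Wsign-compare. A minimal illustration with hypothetical values:

```cpp
#include <cstddef>

int main() {
    const std::size_t n = 10;
    std::size_t visited = 0;

    // auto deduces int from the literal, making "i < n" a
    // signed/unsigned comparison (-Wsign-compare under -Wall -Wextra):
    //   for (auto i = 0; i < n; i++) { ... }

    // Spelling the type out, as the change above does, keeps both
    // sides unsigned:
    for (std::size_t i = 0; i < n; i++) ++visited;

    return visited == n ? 0 : 1;
}
```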
@@ -56,7 +56,7 @@ size_t message_receiver::buffer_size(u_int16_t index){
return _buffer_sizes[index];
}

void message_receiver::allocate_buffer(uint16_t index, cudaStream_t stream){
void message_receiver::allocate_buffer(uint16_t index, cudaStream_t /*stream*/){
if (index >= _raw_buffers.size()) {
throw std::runtime_error("Invalid access to raw buffer");
}
@@ -87,7 +87,7 @@ bool message_receiver::is_finished(){
return _finished_called;
}

void message_receiver::finish(cudaStream_t stream) {
void message_receiver::finish(cudaStream_t /*stream*/) {

std::lock_guard<std::mutex> lock(_finish_mutex);
if(!_finished_called){
@@ -48,7 +48,7 @@ class message_sender {
ucp_worker_h origin_node,
int ral_id,
comm::blazing_protocol protocol,
bool require_acknowledge);
bool require_acknowledge);

std::shared_ptr<ral::cache::CacheMachine> get_output_cache(){
return output_cache;
@@ -61,6 +61,7 @@
private:
static message_sender * instance;

bool require_acknowledge;
ctpl::thread_pool<BlazingThread> pool;
std::shared_ptr<ral::cache::CacheMachine> output_cache;
std::map<std::string, node> node_address_map;
@@ -69,7 +70,6 @@
size_t request_size;
int ral_id;
bool polling_started{false};
bool require_acknowledge;
};

} // namespace comm
4 changes: 2 additions & 2 deletions engine/src/communication/CommunicationInterface/protocols.cpp
@@ -148,7 +148,7 @@ void ucp_progress_manager::add_recv_request(char * request, std::function<void()
}


void ucp_progress_manager::add_send_request(char * request, std::function<void()> callback, ucs_status_t status){
void ucp_progress_manager::add_send_request(char * request, std::function<void()> callback, ucs_status_t /*status*/){
// if(status == UCS_OK){
// delete request;
// callback();
@@ -332,7 +332,7 @@ void ucx_buffer_transport::send_impl(const char * buffer, size_t buffer_size) {
}
#define ACK_BUFFER_SIZE 40
void ucx_buffer_transport::receive_acknowledge(){
for(int i = 0; i < transmitted_acknowledgements.size(); i++){
for(std::size_t i = 0; i < transmitted_acknowledgements.size(); i++){
char * request = new char[_request_size];
std::vector<char> data_buffer(sizeof(int));
char * data = new char[ACK_BUFFER_SIZE];
6 changes: 1 addition & 5 deletions engine/src/cython/engine.cpp
@@ -14,9 +14,7 @@

#include <numeric>
#include <map>
#include "communication/CommunicationData.h"
#include <spdlog/spdlog.h>
#include "CodeTimer.h"
#include "communication/CommunicationInterface/protocols.hpp"
#include "error.hpp"

@@ -143,8 +141,6 @@ std::shared_ptr<ral::cache::graph> runGenerateGraph(uint32_t masterIndex,
using blazingdb::manager::Context;
using blazingdb::transport::Node;

auto& communicationData = ral::communication::CommunicationData::getInstance();

std::vector<Node> contextNodes;
for (const auto &worker_id : worker_ids) {
contextNodes.emplace_back(worker_id);
@@ -157,7 +153,7 @@
return graph;
}

void startExecuteGraph(std::shared_ptr<ral::cache::graph> graph, int32_t ctx_token) {
void startExecuteGraph(std::shared_ptr<ral::cache::graph> graph, int32_t /*ctx_token*/) {
start_execute_graph(graph);
}

@@ -278,13 +278,11 @@ ral::execution::task_result MergeAggregateKernel::do_process(std::vector< std::u
std::shared_ptr<ral::cache::CacheMachine> output,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
try{

std::vector< ral::frame::BlazingTableView > tableViewsToConcat;
for (std::size_t i = 0; i < inputs.size(); i++){
tableViewsToConcat.emplace_back(inputs[i]->toBlazingTableView());
}

CodeTimer eventTimer;
if( ral::utilities::checkIfConcatenatingStringsWillOverflow(tableViewsToConcat)) {
if(logger) {
logger->warn("{query_id}|{step}|{substep}|{info}",
@@ -296,9 +294,6 @@
}
auto concatenated = ral::utilities::concatTables(tableViewsToConcat);

auto log_input_num_rows = concatenated ? concatenated->num_rows() : 0;
auto log_input_num_bytes = concatenated ? concatenated->sizeInBytes() : 0;

std::vector<int> group_column_indices;
std::vector<std::string> aggregation_input_expressions, aggregation_column_assigned_aliases;
std::vector<AggregateKind> aggregation_types;
@@ -331,10 +326,6 @@ ral::execution::task_result MergeAggregateKernel::do_process(std::vector< std::u
concatenated->toBlazingTableView(), mod_aggregation_input_expressions, mod_aggregation_types,
mod_aggregation_column_assigned_aliases, mod_group_column_indices);
}
eventTimer.stop();

auto log_output_num_rows = columns->num_rows();
auto log_output_num_bytes = columns->sizeInBytes();

output->addToCache(std::move(columns));
columns = nullptr;
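On the deleted log_input_num_rows / log_output_num_bytes locals and the idle CodeTimer eventTimer above (the same pattern recurs in the join, sort/limit, and window kernels below): once nothing consumed them, presumably after an earlier logging cleanup, they were dead stores that trip -Wunused-variable and still do real work, since sizeInBytes() has to walk every column. A sketch of the pattern with a hypothetical stand-in:

```cpp
#include <cstddef>

// Stand-in for a call like table->sizeInBytes(), which walks every
// column to total up its buffers.
static std::size_t expensive_size() { return 42; }

int main() {
    // Before the cleanup: stored, never read again. -Wunused-variable
    // fires and the full pass over the data still happens at runtime.
    //   std::size_t log_output_num_bytes = expensive_size();

    // After: the dead store is gone; compute the value only where it
    // is actually consumed.
    return expensive_size() == 42 ? 0 : 1;
}
```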
@@ -407,12 +407,10 @@ std::unique_ptr<ral::frame::BlazingTable> PartwiseJoin::join_set(
ral::execution::task_result PartwiseJoin::do_process(std::vector<std::unique_ptr<ral::frame::BlazingTable>> inputs,
std::shared_ptr<ral::cache::CacheMachine> /*output*/,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& args) {
CodeTimer eventTimer;

auto & left_batch = inputs[0];
auto & right_batch = inputs[1];


try{
if (this->normalize_left){
ral::utilities::normalize_types(left_batch, this->join_column_common_types, this->left_column_indices);
@@ -421,24 +419,12 @@ ral::execution::task_result PartwiseJoin::do_process(std::vector<std::unique_ptr
ral::utilities::normalize_types(right_batch, this->join_column_common_types, this->right_column_indices);
}

auto log_input_num_rows = left_batch->num_rows() + right_batch->num_rows();
auto log_input_num_bytes = left_batch->sizeInBytes() + right_batch->sizeInBytes();

std::unique_ptr<ral::frame::BlazingTable> joined = join_set(left_batch->toBlazingTableView(), right_batch->toBlazingTableView());

auto log_output_num_rows = joined->num_rows();
auto log_output_num_bytes = joined->sizeInBytes();

if (filter_statement != "") {
auto filter_table = ral::processor::process_filter(joined->toBlazingTableView(), filter_statement, this->context.get());
eventTimer.stop();

log_output_num_rows = filter_table->num_rows();
log_output_num_bytes = filter_table->sizeInBytes();

this->add_to_output_cache(std::move(filter_table));
} else{
eventTimer.stop();
this->add_to_output_cache(std::move(joined));
}

@@ -250,11 +250,6 @@ ral::execution::task_result SortAndSampleKernel::do_process(std::vector< std::un
}
}

if(sortedTable){
auto num_rows = sortedTable->num_rows();
auto num_bytes = sortedTable->sizeInBytes();
}

output->addToCache(std::move(sortedTable), "output_a");
}
else if (operation_type == "compute_partition_plan") {
@@ -579,23 +574,14 @@ ral::execution::task_result LimitKernel::do_process(std::vector< std::unique_ptr
std::shared_ptr<ral::cache::CacheMachine> output,
cudaStream_t /*stream*/, const std::map<std::string, std::string>& /*args*/) {
try{
CodeTimer eventTimer(false);
auto & input = inputs[0];
if (rows_limit<0) {
output->addToCache(std::move(input));
} else {
auto log_input_num_rows = input->num_rows();
auto log_input_num_bytes = input->sizeInBytes();

std::unique_ptr<ral::frame::BlazingTable> limited_input;
bool output_is_just_input;

eventTimer.start();
std::tie(limited_input, output_is_just_input, rows_limit) = ral::operators::limit_table(input->toBlazingTableView(), rows_limit);
eventTimer.stop();

auto log_output_num_rows = output_is_just_input ? input->num_rows() : limited_input->num_rows();
auto log_output_num_bytes = output_is_just_input ? input->sizeInBytes() : limited_input->sizeInBytes();

if (output_is_just_input)
output->addToCache(std::move(input));
Expand All @@ -612,7 +598,6 @@ ral::execution::task_result LimitKernel::do_process(std::vector< std::unique_ptr

kstatus LimitKernel::run() {
CodeTimer timer;
CodeTimer eventTimer(false);

int64_t total_batch_rows = 0;
std::vector<std::unique_ptr<ral::cache::CacheData>> cache_vector;
@@ -1,11 +1,9 @@
#include "BatchProcessing.h"
#include "CodeTimer.h"
#include "communication/CommunicationData.h"
#include "ExceptionHandling/BlazingThread.h"
#include "io/data_parser/CSVParser.h"
#include "parser/expression_utils.hpp"
#include "taskflow/executor.h"
#include <cudf/types.hpp>
#include <src/utilities/DebuggingUtils.h>
#include <src/execution_graph/logic_controllers/LogicalFilter.h>
#include "execution_graph/logic_controllers/LogicalProject.h"
@@ -8,13 +8,10 @@
#include <blazingdb/io/Util/StringUtil.h>

#include <cudf/concatenate.hpp>
#include <cudf/stream_compaction.hpp>
#include "cudf/column/column_view.hpp"
#include <cudf/rolling.hpp>
#include <cudf/filling.hpp>
#include <cudf/partitioning.hpp>
#include <cudf/types.hpp>
#include <cudf/copying.hpp>
#include <cudf/aggregation.hpp>
#include <cudf/search.hpp>
#include <cudf/join.hpp>
@@ -38,7 +35,7 @@ ComputeWindowKernel::ComputeWindowKernel(std::size_t kernel_id, const std::strin
std::unique_ptr<CudfColumn> ComputeWindowKernel::compute_column_from_window_function(
cudf::table_view input_table_cudf_view,
cudf::column_view col_view_to_agg,
std::size_t pos, int & agg_param_count ) {
std::size_t pos, std::size_t & agg_param_count ) {

std::unique_ptr<cudf::aggregation> window_aggregation;

@@ -144,8 +141,6 @@ ral::execution::task_result ComputeWindowKernel::do_process(std::vector< std::un
return {ral::execution::task_status::SUCCESS, std::string(), std::vector< std::unique_ptr<ral::frame::BlazingTable> > ()};
}

CodeTimer eventTimer(false);

std::unique_ptr<ral::frame::BlazingTable> & input = inputs[0];

try{
@@ -164,7 +159,7 @@
}

std::vector< std::unique_ptr<CudfColumn> > new_wf_cols;
int agg_param_count = 0;
std::size_t agg_param_count = 0;
for (std::size_t col_i = 0; col_i < this->type_aggs_as_str.size(); ++col_i) {
cudf::column_view col_view_to_agg = input_table_cudf_view.column(column_indices_to_agg[col_i]);

@@ -195,11 +190,6 @@ ral::execution::task_result ComputeWindowKernel::do_process(std::vector< std::un
std::unique_ptr<cudf::table> cudf_table_window = std::make_unique<cudf::table>(std::move(output_columns));
std::unique_ptr<ral::frame::BlazingTable> windowed_table = std::make_unique<ral::frame::BlazingTable>(std::move(cudf_table_window), output_names);

if (windowed_table) {
cudf::size_type num_rows = windowed_table->num_rows();
std::size_t num_bytes = windowed_table->sizeInBytes();
}

output->addToCache(std::move(windowed_table));
}catch(const rmm::bad_alloc& e){
return {ral::execution::task_status::RETRY, std::string(e.what()), std::move(inputs)};
@@ -27,7 +27,7 @@ class ComputeWindowKernel : public kernel {
std::unique_ptr<CudfColumn> compute_column_from_window_function(
cudf::table_view input_cudf_view,
cudf::column_view input_col_view,
std::size_t pos, int & agg_param_count);
std::size_t pos, std::size_t & agg_param_count);

std::string kernel_name() { return "ComputeWindow";}

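A final note on agg_param_count (sketch with hypothetical names): its type changes from int & to std::size_t & in both the declaration in this header and the definition in the .cpp section earlier, and that pairing is mandatory; an out-of-line member function definition must repeat the parameter types exactly, or it matches no declaration and the build fails.

```cpp
#include <cstddef>

struct ComputeKernel {
    // Declaration: the reference parameter is std::size_t&.
    void accumulate(std::size_t& agg_param_count);
};

// The out-of-line definition must repeat the exact same type. Writing
// `void ComputeKernel::accumulate(int& agg_param_count)` here would be
// rejected: it matches no declaration in the class.
void ComputeKernel::accumulate(std::size_t& agg_param_count) {
    ++agg_param_count;
}

int main() {
    ComputeKernel k;
    std::size_t count = 0;
    k.accumulate(count);
    return count == 1 ? 0 : 1;
}
```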