Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
77dd218
Merge remote-tracking branch 'upstream/branch-0.19' into feature/wind…
Apr 9, 2021
1c65336
Merge branch 'feature/df_window_overlap2' into feature/window_overlap3
Apr 9, 2021
4b19a0d
WIP, barely starting to figure stuff out
Apr 12, 2021
bcfda81
Merge branch 'feature/window_overlap2' into feature/window_overlap3
Apr 12, 2021
ef264ae
WIP on getting partial aggregations. Also refactored a bit
Apr 13, 2021
2c6d80b
some refactoring
Apr 13, 2021
33c111f
fixed get_bounds_from_window_expression and added more unit tests
Apr 14, 2021
93e1fc9
fixed issues with different types of windows. Added more e2e tests
Apr 14, 2021
a5d1b73
updated CHANGELOG
Apr 14, 2021
e5d01a3
Merge branch 'feature/unbounded_partitioned_windows' into feature/win…
Apr 15, 2021
d1bed0a
did some refactoring. Also more window WIP
Apr 15, 2021
b6ef177
did some more refactoring and implementation
Apr 15, 2021
fc0d834
WIP
Apr 15, 2021
e83e256
Merge branch 'branch-0.20' into feature/unbounded_partitioned_windows
Apr 16, 2021
a778b81
various refactors
Apr 16, 2021
b5773dc
Merge branch 'branch-0.20' into feature/window_overlap3
Apr 19, 2021
85c0831
Merge branch 'branch-0.20' into feature/unbounded_partitioned_windows
Apr 19, 2021
58325ac
Merge branch 'feature/unbounded_partitioned_windows' into feature/win…
Apr 19, 2021
ffc161c
fixed build bugs added a little more
Apr 19, 2021
1418be6
working on WindowAggMergerKernel
Apr 20, 2021
e87e481
fixed some build bugs
Apr 21, 2021
454a79c
WIP
Apr 23, 2021
3703366
more WIP
Apr 23, 2021
af7aac6
refactored test
Apr 27, 2021
c8c4d8c
fixed a bug and added a new test
Apr 28, 2021
f7921f9
WIP
Apr 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


## New Features

- #1471 Unbounded partitioned windows


## Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ void SortAndSampleKernel::make_partition_plan_task(){
if(!(nodes[i] == ral::communication::CommunicationData::getInstance().getSelfNode())) {
std::string message_id = std::to_string(this->context->getContextToken()) + "_" + std::to_string(this->get_id()) + "_" + nodes[i].id();
auto samples_cache_data = this->query_graph->get_input_message_cache()->pullCacheData(message_id);
ral::cache::CPUCacheData * cache_ptr = static_cast<ral::cache::CPUCacheData *> (samples_cache_data.get());
total_num_rows_for_sampling += std::stoll(cache_ptr->getMetadata().get_values()[ral::cache::TOTAL_TABLE_ROWS_METADATA_LABEL]);
total_bytes_for_sampling += std::stoll(cache_ptr->getMetadata().get_values()[ral::cache::TOTAL_TABLE_ROWS_METADATA_LABEL]) * std::stoll(cache_ptr->getMetadata().get_values()[ral::cache::AVG_BYTES_PER_ROW_METADATA_LABEL]);
auto metadata = samples_cache_data->getMetadata();
total_num_rows_for_sampling += std::stoll(metadata.get_values()[ral::cache::TOTAL_TABLE_ROWS_METADATA_LABEL]);
total_bytes_for_sampling += std::stoll(metadata.get_values()[ral::cache::TOTAL_TABLE_ROWS_METADATA_LABEL]) * std::stoll(metadata.get_values()[ral::cache::AVG_BYTES_PER_ROW_METADATA_LABEL]);
sampleCacheDatas.push_back(std::move(samples_cache_data));
}
}
Expand Down Expand Up @@ -186,12 +186,7 @@ void SortAndSampleKernel::compute_partition_plan(
} else {
context->incrementQuerySubstep();

// just to concat all the samples
std::vector<ral::frame::BlazingTableView> sampledTableViews;
for (std::size_t i = 0; i < inputSamples.size(); i++){
sampledTableViews.push_back(inputSamples[i]->toBlazingTableView());
}
auto concatSamples = ral::utilities::concatTables(sampledTableViews);
auto concatSamples = ral::utilities::concatTables(std::move(inputSamples));
concatSamples->ensureOwnership();

ral::cache::MetadataDictionary extra_metadata;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ComputeWindowKernel : public kernel {
std::unique_ptr<CudfColumn> compute_column_from_window_function(
cudf::table_view input_cudf_view,
cudf::column_view input_col_view,
std::size_t pos, int & agg_param_count);
std::size_t pos);

std::string kernel_name() { return "ComputeWindow";}

Expand All @@ -43,7 +43,7 @@ class ComputeWindowKernel : public kernel {
std::vector<int> column_indices_partitioned; // column indices to be partitioned: [1]
std::vector<int> column_indices_ordered; // column indices to be ordered: [3]
std::vector<int> column_indices_to_agg; // column indices to be agg: [0, 0]
std::vector<int> agg_param_values; // due to LAG or LEAD: [5]
std::vector<int> agg_param_values; // due to LAG or LEAD: [0, 5]
int preceding_value; // X PRECEDING
int following_value; // Y FOLLOWING
std::string frame_type; // ROWS or RANGE
Expand All @@ -68,6 +68,7 @@ const std::string TASK_ARG_OVERLAP_SIZE="overlap_size";
const std::string TASK_ARG_SOURCE_BATCH_INDEX="source_batch_index";
const std::string TASK_ARG_TARGET_BATCH_INDEX="target_batch_index";
const std::string TASK_ARG_TARGET_NODE_INDEX="target_node_index";
const std::string TASK_ARG_BATCH_INDEX="batch_index";

const std::string PRECEDING_OVERLAP_TYPE="preceding";
const std::string FOLLOWING_OVERLAP_TYPE="following";
Expand All @@ -76,10 +77,12 @@ const std::string PRECEDING_REQUEST="preceding_request";
const std::string FOLLOWING_REQUEST="following_request";
const std::string PRECEDING_RESPONSE="preceding_response";
const std::string FOLLOWING_RESPONSE="following_response";
const std::string MERGE_EXPRESSIONS="merge_expressions";
const std::string MERGE_AGGREGATION="merge_aggregation";


/**
* The OverlapGeneratorKernel is only used for window functions that have no partition by clause and that also have bounded window frames.
* @brief The OverlapGeneratorKernel is only used for window functions that have no partition by clause and that also have bounded window frames.
* The OverlapGeneratorKernel assumes that it will be following by OverlapAccumulatorKernel and has three output caches:
* - "batches"
* - "preceding_overlaps"
Expand Down Expand Up @@ -118,7 +121,7 @@ class OverlapGeneratorKernel : public kernel {


/**
* The OverlapAccumulatorKernel assumes three input caches:
* @brief The OverlapAccumulatorKernel assumes three input caches:
* - "batches"
* - "preceding_overlaps"
* - "following_overlaps"
Expand Down Expand Up @@ -190,6 +193,101 @@ class OverlapAccumulatorKernel : public distributing_kernel {
};



/**
* @brief The ComputeWindowKernelUnbounded is only used for window functions that have no partition by clause and that also have unbounded window frames.
* The ComputeWindowKernelUnbounded assumes that it will be followed by WindowAggMergerKernel and has two output caches:
* - "batches"
* - "cumulative_aggregations"
*
* Its purpose is to take is to compute the main Window Function (ROW_NUMBER, LAG, LEAD, MIN, ...) to each batch
* which has already pattitioned and sorted, much like ComputeWindowKernel. Additionally it will calculate partial aggregation values and send them all to the
* mater node. The master node will accumulate the partial aggregation values and calculate cumulative aggregation values for the whole cluster.
* Then the master node will send back out to all other nodes the cumulative aggregation values that correspond to each node and add them to the cumulative_aggregations CacheMachine.
*/

class ComputeWindowKernelUnbounded : public distributing_kernel {
public:
ComputeWindowKernelUnbounded(std::size_t kernel_id, const std::string & queryString,
std::shared_ptr<Context> context,
std::shared_ptr<ral::cache::graph> query_graph);

std::unique_ptr<CudfColumn> compute_column_from_window_function(
cudf::table_view input_cudf_view,
cudf::column_view input_col_view,
std::size_t pos);

std::string kernel_name() { return "ComputeWindowUnbounded";}

ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
std::shared_ptr<ral::cache::CacheMachine> output,
cudaStream_t stream, const std::map<std::string, std::string>& args) override;

kstatus run() override;

private:
// ComputeWindowKernelUnbounded(min_keys=[MIN($0) OVER (ORDER BY $3 DESC)], lag_col=[MAX($0) OVER (PARTITION BY $1)], n_name=[$2])
std::vector<int> column_indices_partitioned; // column indices to be partitioned: [1]
std::vector<int> column_indices_ordered; // column indices to be ordered: [3]
std::vector<int> column_indices_to_agg; // column indices to be agg: [0, 0]
std::vector<int> agg_param_values; // due to LAG or LEAD: [0, 0]
int preceding_value; // X PRECEDING
int following_value; // Y FOLLOWING
std::string frame_type; // ROWS or RANGE
std::vector<std::string> type_aggs_as_str; // ["MIN", "MAX"]
std::vector<AggregateKind> aggs_wind_func; // [AggregateKind::MIN, AggregateKind::MAX]
bool remove_overlap; // If we need to remove the overlaps after computing the windows
std::vector<std::unique_ptr<ral::frame::BlazingTable>> partial_aggregations; // container to hold single row tables of the partial_aggregation values for each batch
};



/**
* @brief The WindowAggMergerKernel is only used for window functions that have no partition by clause and that also have unbounded window frames.
* The WindowAggMergerKernel assumes that it was preceded by ComputeWindowKernelUnbounded and has two input caches:
* - "batches"
* - "cumulative_aggregations"
*
* Its purpose is to take the cumulative_aggregations and merge them with each corresponding batch.
*/
class WindowAggMergerKernel : public kernel {
public:
WindowAggMergerKernel(std::size_t kernel_id, const std::string & queryString,
std::shared_ptr<Context> context,
std::shared_ptr<ral::cache::graph> query_graph);

std::string kernel_name() { return "WindowAggMerger";}

ral::execution::task_result do_process(std::vector< std::unique_ptr<ral::frame::BlazingTable> > inputs,
std::shared_ptr<ral::cache::CacheMachine> output,
cudaStream_t stream, const std::map<std::string, std::string>& args) override;

kstatus run() override;

private:

std::vector<std::vector<std::string>> get_merge_expressions_per_batch(
const std::vector<std::vector<std::string>> & cumulative_aggregations_str);

std::vector<std::vector<std::string>> get_cumulative_aggregations_as_strings(cudf::table_view cumulative_aggregations_view);

// WindowAggMergerKernel(min_keys=[MIN($0) OVER (ORDER BY $3 DESC)], lag_col=[MAX($0) OVER (PARTITION BY $1)], n_name=[$2])
std::vector<int> column_indices_partitioned; // column indices to be partitioned: [1]
std::vector<int> column_indices_ordered; // column indices to be ordered: [3]
std::vector<int> column_indices_to_agg; // column indices to be agg: [0, 0]
std::vector<int> agg_param_values; // due to LAG or LEAD: [0, 0]
int preceding_value; // X PRECEDING
int following_value; // Y FOLLOWING
std::string frame_type; // ROWS or RANGE
std::vector<std::string> type_aggs_as_str; // ["MIN", "MAX"]
std::vector<AggregateKind> aggs_wind_func; // [AggregateKind::MIN, AggregateKind::MAX]

std::vector<std::vector<std::string>> merge_expressions_per_batch;

};



} // namespace batch
} // namespace ral

12 changes: 5 additions & 7 deletions engine/src/execution_graph/logic_controllers/CacheData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,13 @@ std::unique_ptr<ral::frame::BlazingTable> ConcatCacheData::decache() {
return _cache_datas[0]->decache();
}

std::vector<std::unique_ptr<ral::frame::BlazingTable>> tables_holder;
std::vector<ral::frame::BlazingTableView> table_views;
std::vector<std::unique_ptr<ral::frame::BlazingTable>> tables;
for (auto && cache_data : _cache_datas){
tables_holder.push_back(cache_data->decache());
table_views.push_back(tables_holder.back()->toBlazingTableView());

RAL_EXPECTS(!ral::utilities::checkIfConcatenatingStringsWillOverflow(table_views), "Concatenating tables will overflow");
tables.push_back(cache_data->decache());

RAL_EXPECTS(!ral::utilities::checkIfConcatenatingStringsWillOverflow(tables), "Concatenating tables will overflow");
}
return ral::utilities::concatTables(table_views);
return ral::utilities::concatTables(std::move(tables));
}

size_t ConcatCacheData::sizeInBytes() const {
Expand Down
18 changes: 7 additions & 11 deletions engine/src/execution_graph/logic_controllers/CacheMachine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,21 +782,17 @@ std::unique_ptr<ral::frame::BlazingTable> ConcatenatingCacheMachine::pullFromCac
output = data->decache();
num_rows = output->num_rows();
} else {
std::vector<std::unique_ptr<ral::frame::BlazingTable>> tables_holder;
std::vector<ral::frame::BlazingTableView> table_views;
std::vector<std::unique_ptr<ral::frame::BlazingTable>> tables;
for (size_t i = 0; i < collected_messages.size(); i++){
CodeTimer cacheEventTimer;
cacheEventTimer.start();

auto data = collected_messages[i]->release_data();
tables_holder.push_back(data->decache());
table_views.push_back(tables_holder[i]->toBlazingTableView());

tables.push_back(data->decache());
// if we dont have to concatenate all, lets make sure we are not overflowing, and if we are, lets put one back
if (!concat_all && ral::utilities::checkIfConcatenatingStringsWillOverflow(table_views)){
auto cache_data = std::make_unique<GPUCacheData>(std::move(tables_holder.back()));
tables_holder.pop_back();
table_views.pop_back();
if (!concat_all && ral::utilities::checkIfConcatenatingStringsWillOverflow(tables)){
auto cache_data = std::make_unique<GPUCacheData>(std::move(tables.back()));
tables.pop_back();
collected_messages[i] = std::make_unique<message>(std::move(cache_data), collected_messages[i]->get_message_id());
for (; i < collected_messages.size(); i++){
this->waitingCache->put(std::move(collected_messages[i]));
Expand All @@ -821,7 +817,7 @@ std::unique_ptr<ral::frame::BlazingTable> ConcatenatingCacheMachine::pullFromCac
}
}

if( concat_all && ral::utilities::checkIfConcatenatingStringsWillOverflow(table_views) ) { // if we have to concatenate all, then lets throw a warning if it will overflow strings
if( concat_all && ral::utilities::checkIfConcatenatingStringsWillOverflow(tables) ) { // if we have to concatenate all, then lets throw a warning if it will overflow strings
CodeTimer cacheEventTimer;
cacheEventTimer.start();
cacheEventTimer.stop();
Expand All @@ -839,7 +835,7 @@ std::unique_ptr<ral::frame::BlazingTable> ConcatenatingCacheMachine::pullFromCac
"description"_a="In ConcatenatingCacheMachine::pullFromCache Concatenating will overflow strings length");
}
}
output = ral::utilities::concatTables(table_views);
output = ral::utilities::concatTables(std::move(tables));
num_rows = output->num_rows();
}

Expand Down
20 changes: 10 additions & 10 deletions engine/src/execution_graph/logic_controllers/LogicalProject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -787,11 +787,7 @@ std::vector<std::unique_ptr<ral::frame::BlazingColumn>> evaluate_expressions(
const std::vector<std::string> & expressions) {
using interops::column_index_type;

// Let's clean all the expressions that contains Window functions (if exists)
// as they should be updated with new indices
std::vector<std::string> new_expressions = clean_window_function_expressions(expressions, table.num_columns());

std::vector<std::unique_ptr<ral::frame::BlazingColumn>> out_columns(new_expressions.size());
std::vector<std::unique_ptr<ral::frame::BlazingColumn>> out_columns(expressions.size());

std::vector<bool> column_used(table.num_columns(), false);
std::vector<std::pair<int, int>> out_idx_computed_idx_pair;
Expand All @@ -800,8 +796,8 @@ std::vector<std::unique_ptr<ral::frame::BlazingColumn>> evaluate_expressions(
std::vector<cudf::mutable_column_view> interpreter_out_column_views;

function_evaluator_transformer evaluator{table};
for(size_t i = 0; i < new_expressions.size(); i++){
std::string expression = replace_calcite_regex(new_expressions[i]);
for(size_t i = 0; i < expressions.size(); i++){
std::string expression = replace_calcite_regex(expressions[i]);
expression = expand_if_logical_op(expression);

parser::parse_tree tree;
Expand Down Expand Up @@ -904,9 +900,9 @@ std::vector<std::unique_ptr<ral::frame::BlazingColumn>> evaluate_expressions(
out_columns.clear();
computed_columns.clear();

size_t const half_size = new_expressions.size() / 2;
std::vector<std::string> split_lo(new_expressions.begin(), new_expressions.begin() + half_size);
std::vector<std::string> split_hi(new_expressions.begin() + half_size, new_expressions.end());
size_t const half_size = expressions.size() / 2;
std::vector<std::string> split_lo(expressions.begin(), expressions.begin() + half_size);
std::vector<std::string> split_hi(expressions.begin() + half_size, expressions.end());
auto out_cols_lo = evaluate_expressions(table, split_lo);
auto out_cols_hi = evaluate_expressions(table, split_hi);

Expand Down Expand Up @@ -955,6 +951,10 @@ std::unique_ptr<ral::frame::BlazingTable> process_project(
out_column_names[i] = name;
}

// Let's clean all the expressions that contains Window functions (if exists)
// as they should be updated with new indices
expressions = clean_window_function_expressions(expressions, blazing_table_in->num_columns());

return std::make_unique<ral::frame::BlazingTable>(evaluate_expressions(blazing_table_in->view(), expressions), out_column_names);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,16 @@ struct tree_processor {
}
}
else if (is_project(expr) && is_window_function(expr) && first_windowed_call) {

// Calcite for some reason makes double UNBOUNDED windows always be set as RANGE. If we treat them as ROWS its easier and equivalent
StringUtil::findAndReplaceAll(expr, "RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING");

if (window_expression_contains_multiple_diff_over_clauses(expr)) {
throw std::runtime_error("In Window Function: multiple WINDOW FUNCTIONs with different OVER clauses are not supported currently");
throw std::runtime_error("In Window Function: multiple WINDOW FUNCTIONs with different OVER clauses are not supported currently. Expression found is: " + expr);
}

if (window_expression_contains_bounds_by_range(expr)) {
throw std::runtime_error("In Window Function: RANGE is not currently supported");
throw std::runtime_error("In Window Function: RANGE is not currently supported. Expression found is: " + expr);
}

if (window_expression_contains_partition_by(expr)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ std::shared_ptr<ral::cache::CacheMachine> kernel::output_cache(std::string cache
return this->output_.get_cache(cache_id);
}

std::shared_ptr<ral::cache::CacheMachine> kernel::input_cache() {
auto kernel_id = std::to_string(this->get_id());
return this->input_.get_cache(kernel_id);
std::shared_ptr<ral::cache::CacheMachine> kernel::input_cache(std::string cache_id) {
cache_id = cache_id.empty() ? std::to_string(this->get_id()) : cache_id;
return this->input_.get_cache(cache_id);
}

bool kernel::add_to_output_cache(std::unique_ptr<ral::frame::BlazingTable> table, std::string cache_id, bool always_add) {
Expand Down
17 changes: 10 additions & 7 deletions engine/src/execution_graph/logic_controllers/taskflow/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace ral {
namespace cache {
class kernel;
class graph;
using kernel_pair = std::pair<kernel *, std::string>;

/**
* @brief This interface represents a computation unit in the execution graph.
Expand Down Expand Up @@ -72,8 +71,6 @@ class kernel {
virtual kstatus run() = 0;


kernel_pair operator[](const std::string & portname) { return std::make_pair(this, portname); }

/**
* @brief Returns the kernel identifier.
*
Expand All @@ -96,14 +93,20 @@ class kernel {
void set_type_id(kernel_type kernel_type_id_) { kernel_type_id = kernel_type_id_; }

/**
* @brief Returns the input cache.
* @brief Returns the input cache associated to an identifier. If none is provided it returns the default
*
* @param cache_id The cache identifier.
*
* @return A shared pointer to the desired CacheMachine
*/
std::shared_ptr<ral::cache::CacheMachine> input_cache();
std::shared_ptr<ral::cache::CacheMachine> input_cache(std::string cache_id = "");

/**
* @brief Returns the output cache associated to an identifier.
* @brief Returns the output cache associated to an identifier. If none is provided it returns the default
*
* @param cache_id The cache identifier.
*
* @return cache_id The identifier of the output cache.
* @return A shared pointer to the desired CacheMachine
*/
std::shared_ptr<ral::cache::CacheMachine> output_cache(std::string cache_id = "");

Expand Down
2 changes: 0 additions & 2 deletions engine/src/execution_graph/logic_controllers/taskflow/kpair.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ struct cache_settings {
bool is_array_access = false; // is it a cache designated for array access
};

using kernel_pair = std::pair<kernel *, std::string>;

/**
@brief A temporary object to represent a pair of two kernels linked into the execution graph.
*/
Expand Down
Loading