autoOptimizer::OptimizeProjectionFilterWindowAsTopGroupN(const AbstractPlanNodeRef &plan) -> AbstractPlanNodeRef { std::vector<AbstractPlanNodeRef> optimized_children{}; for (constauto &child : plan->children_) { optimized_children.emplace_back(OptimizeProjectionFilterWindowAsTopGroupN(child)); } auto optimized_plan = plan->CloneWithChildren(std::move(optimized_children));
// First node is Projection. if (optimized_plan->GetType() == PlanType::Projection) { constauto &projection_plan = dynamic_cast<const ProjectionPlanNode &>(*optimized_plan); auto &first_child = projection_plan.GetChildren().at(0); // Second node is filter. if (first_child->GetType() == PlanType::Filter) { constauto &filter_child = dynamic_cast<const FilterPlanNode &>(*first_child); // auto filter_logic_expr = dynamic_cast<const LogicExpression&>(filter_child.predicate_); auto &second_child = filter_child.GetChildren().at(0); // Third node is window. if (second_child->GetType() == PlanType::Window) { constauto &window_child = dynamic_cast<const WindowFunctionPlanNode &>(*second_child); // Try to find Rank's col index. int rank_idx = -1; std::string rank_name; for (size_t col = 0; col < window_child.columns_.size(); ++col) { if (window_child.window_functions_.find(rank_idx) != window_child.window_functions_.end()) { // Is a window function if (window_child.window_functions_.at(rank_idx).type_ == WindowFunctionType::Rank) { rank_idx = col; rank_name = window_child.OutputSchema().GetColumn(rank_idx).GetName(); goto OptimizeProjectionFilterWindowAsTopGroupN_END;; } } } // If we find Rank, we can continue to check if it stays after projection. if (rank_idx != -1) { constauto &projection_schema = projection_plan.OutputSchema(); for (auto &column : projection_schema.GetColumns()) { if (column.GetName() == rank_name) { // Find the rank line goto OptimizeProjectionFilterWindowAsTopGroupN_END; } } } // Projection removed RANK, we can optimize it to TopGroupN node. // Find group_bys const std::vector<AbstractExpressionRef> &group_by = window_child.window_functions_.begin()->second.partition_by_; // Find order by const std::vector<std::pair<OrderByType, AbstractExpressionRef>> &order_by = window_child.window_functions_.begin()->second.order_by_; // Find top n; auto value_expr = dynamic_cast<const ConstantValueExpression &>(*filter_child.predicate_->GetChildAt(1)); // auto value_expr = dynamic_cast<const ConstantValueExpression &>(*maybe_value_expr); int n = value_expr.val_.CastAs(TypeId::INTEGER).GetAs<int>(); return std::make_shared<TopNPerGroupPlanNode>(projection_plan.output_schema_, window_child.children_.at(0), group_by, order_by, n); } } }
// Use a priority queue to store the top N tuples for each group. Tuple tuple; RID rid; while (child_executor_->Next(&tuple, &rid)) { // nlogn AggregateKey group_by_key = ConstructGroupByTuple(tuple, group_by); auto it = partition_top_n_.find(group_by_key); if (partition_top_n_.find(group_by_key) == partition_top_n_.end()) { it = partition_top_n_.emplace(group_by_key, TopNTuplePriorityQueue(top_n, comparator)).first; } it->second.Push(tuple); }
iter_ = partition_top_n_.begin(); }
autoTopNPerGroupExecutor::Next(Tuple *tuple, RID *rid) -> bool{ if (iter_ != partition_top_n_.end()) { if (iter_->second.Empty()) { // Move to the next pq, and erase current pq. iter_ = partition_top_n_.erase(iter_); returnNext(tuple, rid); } *tuple = iter_->second.Pop(); *rid = tuple->GetRid(); returntrue; } returnfalse; }
voidMaintainSize(){ while (pq_.size() > n_) { // If removing top elements will lead to underflow, break; if (pq_.size() - freq_map_[pq_.top()] < n_) { break; } // Remove top tuples. while (freq_map_[pq_.top()] > 1) { freq_map_[pq_.top()] -= 1; pq_.pop(); } freq_map_.erase(pq_.top()); pq_.pop(); } } };
voidMaintainSize(){ while (pq_.size() > n_) { // If removing top elements will lead to underflow, break; uint32_t top_tuple_freq = freq_map_[pq_.top()]; if (pq_.size() - top_tuple_freq < n_) { break; } // Remove top tuples. freq_map_.erase(pq_.top()); for (size_t i = 0; i < top_tuple_freq; ++i) { pq_.pop(); } } }
CREATETABLE t4(x int, y int); CREATETABLE t5(x int, y int); CREATETABLE t6(x int, y int);
SELECT*FROM t4, t5, t6 WHERE (t4.x = t5.x) AND (t5.y = t6.y) AND (t4.y >=1000000) AND (t4.y <1500000) AND (t6.x >=100000) AND (t6.x <150000);
可见他其实想吧t4、t5 join on x, t5、t6 join on y, 但是并没有人为地分步合并,而是一股脑地把条件全放在了where后面。如果没有优化,就导致该query执行的时候会把t4、t5、t6做两个笛卡尔积(Cartesian product,即无条件的join)生产一个巨大的table,然后遍历这个巨大的table进行过滤。
autoOptimizer::OptimizePredicatePushdown(const AbstractPlanNodeRef &plan) -> AbstractPlanNodeRef { std::vector<AbstractPlanNodeRef> optimized_children{}; for (constauto &child : plan->children_) { optimized_children.emplace_back(OptimizePredicatePushdown(child)); } auto optimized_plan = plan->CloneWithChildren(std::move(optimized_children));
// Plan Should be a filter node. if (optimized_plan->GetType() == PlanType::Filter) { auto filter_plan = dynamic_cast<FilterPlanNode *>(optimized_plan.get()); std::vector<std::shared_ptr<ComparisonExpression>> pushdowns; filter_plan->predicate_ = ParsePredicate(filter_plan->predicate_, filter_plan->OutputSchema(), pushdowns); // No predicate could be pushed down, simply return. if (pushdowns.empty()) { return optimized_plan; } // All predicate could be pushed down, remove the filter node. if (filter_plan->predicate_ == nullptr) { returnPushdown(filter_plan->children_[0], pushdowns); } // Part of predicate could be pushed down. return filter_plan->CloneWithChildren({Pushdown(filter_plan->children_[0], pushdowns)}); }
autoOptimizer::Pushdown(const AbstractPlanNodeRef &plan, std::vector<std::shared_ptr<ComparisonExpression>> &pushdowns) -> AbstractPlanNodeRef { if (pushdowns.empty()) { return plan; } // Check if plan is Filter. switch (plan->GetType()) { case PlanType::NestedLoopJoin: { TODO: ... ... return optimized_plan; }
// Skip these nodes. case PlanType::Filter: case PlanType::Limit: case PlanType::Sort: { return plan->CloneWithChildren(std::vector<AbstractPlanNodeRef>{Pushdown(plan->GetChildAt(0), pushdowns)}); }
// Block pushdown, generate a filter plan above it. case PlanType::SeqScan: case PlanType::MockScan: case PlanType::Aggregation: case PlanType::Window: case PlanType::Delete: case PlanType::Update: case PlanType::Insert: case PlanType::Projection: case PlanType::InitCheck: { return std::make_shared<FilterPlanNode>(plan->output_schema_, ConcatencateComparisonAnd(pushdowns), plan); }
// Advanced planNode, they should be generated latter. case PlanType::TopNPerGroup: case PlanType::TopN: case PlanType::HashJoin: case PlanType::NestedIndexJoin: { UNREACHABLE("This kind of node should not appear in pushdown. They should be made after pushdown."); }
explain SELECT v, d1, d2 FROM ( SELECT v, MAX(v1) AS d1, MIN(v1), MAX(v2), MIN(v2), MAX(v1) +MIN(v1), MAX(v2) +MIN(v2), MAX(v1) +MAX(v1) +MAX(v2) AS d2 FROM t7 LEFTJOIN (SELECT v4 FROM t8 WHERE1==2) ON v < v4 GROUPBY v );
autoOptimizer::OptimizeConstantFolding(const AbstractPlanNodeRef &plan) -> AbstractPlanNodeRef { std::vector<AbstractPlanNodeRef> optimized_children{}; for (constauto &child : plan->children_) { optimized_children.emplace_back(OptimizeConstantFolding(child)); } auto optimized_plan = plan->CloneWithChildren(std::move(optimized_children));
// For filter, folding its predicate. if (optimized_plan->GetType() == PlanType::Filter) { auto filter_plan = dynamic_cast<FilterPlanNode *>(optimized_plan.get()); auto new_predicate = FoldingPredicate(filter_plan->predicate_); // If the new_predicate is constant, // true, return its child; false, return an empty value node. if (auto new_predicate_const = dynamic_cast<const ConstantValueExpression *>(new_predicate.get())) { if (new_predicate_const->Evaluate({}, Schema({})).GetAs<bool>()) { return plan->children_[0]; } return std::make_shared<ValuesPlanNode>(plan->output_schema_, std::vector<std::vector<AbstractExpressionRef>>{}); } // new_predicate is not a constant, return a new filter. return std::make_shared<FilterPlanNode>(filter_plan->output_schema_, new_predicate, filter_plan->children_[0]); } return optimized_plan; }
if (optimized_plan->GetType() == PlanType::Projection) { auto projection_plan = dynamic_cast<const ProjectionPlanNode *>(optimized_plan.get()); auto child_plan = projection_plan->GetChildAt(0);
// Pruning when child is Projection. if (child_plan->GetType() == PlanType::Projection) { // Collect used cols. std::vector<uint32_t> used_col_idxs; used_col_idxs.reserve(optimized_plan->output_schema_->GetColumnCount()); for (constauto &expr : projection_plan->expressions_) { CollectUsedColumnIdx(expr, used_col_idxs); } std::sort(used_col_idxs.begin(), used_col_idxs.end()); // Prune child. auto child_projection_plan = dynamic_cast<const ProjectionPlanNode *>(child_plan.get()); std::vector<AbstractExpressionRef> pruned_exprs; std::vector<Column> pruned_columns; pruned_exprs.reserve(used_col_idxs.size()); pruned_columns.reserve(used_col_idxs.size()); for (uint32_t idx : used_col_idxs) { pruned_exprs.push_back(child_projection_plan->expressions_[idx]); pruned_columns.push_back(child_projection_plan->output_schema_->GetColumn(idx)); } // Replace parent by its optimized child. auto optimized_child_plan = std::make_shared<ProjectionPlanNode>(std::make_shared<Schema>(pruned_columns), pruned_exprs, child_plan->children_[0]); returnOptimizeColumnPruning(optimized_child_plan); }
// Pruning when child is Aggregation. if (child_plan->GetType() == PlanType::Aggregation) { // Collect used cols. std::vector<uint32_t> used_col_idxs; used_col_idxs.reserve(optimized_plan->output_schema_->GetColumnCount()); for (constauto &expr : projection_plan->expressions_) { CollectUsedColumnIdx(expr, used_col_idxs); } std::sort(used_col_idxs.begin(), used_col_idxs.end()); // Prune child. auto child_aggregation_plan = dynamic_cast<const AggregationPlanNode *>(child_plan.get()); size_t group_col_length = child_aggregation_plan->group_bys_.size(); std::vector<AbstractExpressionRef> pruned_aggregates; std::vector<AggregationType> pruned_agg_types; std::vector<Column> pruned_columns; pruned_aggregates.reserve(used_col_idxs.size()); pruned_agg_types.reserve(used_col_idxs.size()); pruned_columns.reserve(used_col_idxs.size()); for (size_t i = 0; i < group_col_length; ++i) { pruned_columns.push_back(child_aggregation_plan->output_schema_->GetColumn(i)); } for (uint32_t idx : used_col_idxs) { // Maybe optimized to binary, upper_bound. if (idx >= group_col_length) { pruned_aggregates.push_back(child_aggregation_plan->aggregates_[idx - group_col_length]); pruned_agg_types.push_back(child_aggregation_plan->agg_types_[idx - group_col_length]); pruned_columns.push_back(child_aggregation_plan->output_schema_->GetColumn(idx)); } } // Make new optimized node child. auto optimized_aggr = std::make_shared<AggregationPlanNode>( std::make_shared<Schema>(pruned_columns), child_aggregation_plan->children_[0], child_aggregation_plan->group_bys_, pruned_aggregates, pruned_agg_types); // Modified parent node schema and expr. std::vector<AbstractExpressionRef> pruned_exprs; pruned_exprs.reserve(used_col_idxs.size()); for (constauto &expr : projection_plan->expressions_) { pruned_exprs.push_back(PrunedProjectionExpression(expr, used_col_idxs)); } optimized_plan = std::make_shared<ProjectionPlanNode>(projection_plan->output_schema_, pruned_exprs, optimized_aggr); return optimized_plan->CloneWithChildren({OptimizeColumnPruning(optimized_plan->children_[0])}); } }
if (optimized_plan->GetType() == PlanType::Projection) { auto projection_plan = dynamic_cast<const ProjectionPlanNode *>(optimized_plan.get()); auto child_plan = projection_plan->GetChildAt(0);
// Pruning when child is Projection. if (child_plan->GetType() == PlanType::Projection) { // Collect used cols. std::vector<uint32_t> used_col_idxs; used_col_idxs.reserve(optimized_plan->output_schema_->GetColumnCount()); for (constauto &expr : projection_plan->expressions_) { CollectUsedColumnIdx(expr, used_col_idxs); } std::sort(used_col_idxs.begin(), used_col_idxs.end()); // Prune child. auto child_projection_plan = dynamic_cast<const ProjectionPlanNode *>(child_plan.get()); std::vector<AbstractExpressionRef> pruned_exprs; std::vector<Column> pruned_columns; pruned_exprs.reserve(used_col_idxs.size()); pruned_columns.reserve(used_col_idxs.size()); for (uint32_t idx : used_col_idxs) { pruned_exprs.push_back(child_projection_plan->expressions_[idx]); pruned_columns.push_back(child_projection_plan->output_schema_->GetColumn(idx)); } // Replace parent by its optimized child. auto optimized_child_plan = std::make_shared<ProjectionPlanNode>(std::make_shared<Schema>(pruned_columns), pruned_exprs, child_plan->children_[0]); returnOptimizeColumnPruning(optimized_child_plan); }
// Pruning when child is Aggregation. if (child_plan->GetType() == PlanType::Aggregation) { auto child_aggregation_plan = dynamic_cast<const AggregationPlanNode *>(child_plan.get());
// Collect used cols. std::vector<uint32_t> used_col_idxs; used_col_idxs.reserve(optimized_plan->output_schema_->GetColumnCount()); size_t group_cols = child_aggregation_plan->GetGroupBys().size(); for (size_t i = 0; i < group_cols; ++i) { // Must use group by columns. used_col_idxs.push_back(i); } for (constauto &expr : projection_plan->expressions_) { CollectUsedColumnIdx(expr, used_col_idxs); } std::sort(used_col_idxs.begin(), used_col_idxs.end());
for (size_t i = 0; i < group_col_length; ++i) { pruned_columns.push_back(child_aggregation_plan->output_schema_->GetColumn(i)); } for (uint32_t idx : used_col_idxs) { // Maybe optimized to binary, upper_bound. if (idx >= group_col_length) { // idx >= group_col_length means it's aggr. auto aggr_expr = child_aggregation_plan->aggregates_[idx - group_col_length]; auto aggr_type = child_aggregation_plan->agg_types_[idx - group_col_length]; std::string aggr_pair = aggr_expr->ToString() + std::to_string(static_cast<int>(aggr_type)); if (exist_expr.count(aggr_pair) == 0) { // Not exist yet, add to pruned lists. pruned_aggregates.push_back(aggr_expr); pruned_agg_types.push_back(aggr_type); pruned_columns.push_back(child_aggregation_plan->output_schema_->GetColumn(idx)); exist_expr[aggr_pair] = pruned_columns.size() - 1; } else { // Has existed, add to re_direct map. re_direct[idx] = exist_expr.find(aggr_pair)->second; } } }
// Make new optimized node child. auto optimized_aggr = std::make_shared<AggregationPlanNode>( std::make_shared<Schema>(pruned_columns), child_aggregation_plan->children_[0], child_aggregation_plan->group_bys_, pruned_aggregates, pruned_agg_types);
autoOptimizer::RearrangeColIdxs(std::unordered_map<uint32_t, uint32_t> &re_direct, std::vector<uint32_t> &used_col_idxs) -> std::vector<int> { // Initialize the result map and the del array int max_col_idx = static_cast<int>(used_col_idxs.back()); std::vector<int> col_idx_map(max_col_idx + 1, -1); std::vector<int> del(max_col_idx + 2, 0);
// Cache the result of std::lower_bound for each i std::vector<uint32_t> idx_map(max_col_idx + 1, used_col_idxs.size()); for (int i = 0; i <= max_col_idx; ++i) { idx_map[i] = std::find(used_col_idxs.begin(), used_col_idxs.end(), i) - used_col_idxs.begin(); }
// First pass: Fill col_idx_map and update del for (int i = 0; i <= max_col_idx; ++i) { // 11 uint32_t idx = idx_map[i]; if (idx < used_col_idxs.size()) { if (re_direct.count(i)) { col_idx_map[i] = re_direct[i]; del[idx + 1] -= 1; } else { // When it's not in re_direct. col_idx_map[i] = idx; } } }
// Compute cumulative del values for (size_t i = 1; i < del.size(); ++i) { del[i] += del[i - 1]; }
// Final adjustment for col_idx_map for (int i = 0; i <= max_col_idx; ++i) { uint32_t idx = idx_map[i]; if (col_idx_map[i] != -1 && !re_direct.count(idx)) { col_idx_map[i] += del[col_idx_map[i]]; } }