最近研究了一點ch的代碼。
發現一個很有意思的詞,mutation。
google這個詞有突變的意思,但更多的相關文章翻譯這個為"訂正"。
上一篇文章分析了background_pool_size參數。
這個參數和後臺異步工作線程池merge工作有關。
ClickHouse內核中異步merge、mutation工作由統一的工作線程池來完成,這個線程池的大小用戶可以通過參數background_pool_size進行設置。線程池中的線程Task總體邏輯如下,可以看出這個異步Task主要做三塊工作:清理殘留文件,merge Data Parts 和 mutate Data Part。
其實在20.12版本,clickhouse把後臺的merge\ttl\mutation都抽象成了job。
ClickHouse內核中的MergeTree存儲一旦生成一個Data Part,這個Data Part就不可再更改了。所以從MergeTree存儲內核層面,ClickHouse就不擅長做數據更新刪除操作。但是絕大部分用戶場景中,難免會出現需要手動訂正、修複數據的場景。所以ClickHouse為用戶設計了一套離線異步機制來支持低頻的Mutation(改、刪)操作。
Mutation命令執行ALTER TABLE [db.]table DELETE WHERE filter_expr;
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;
ClickHouse的方言把Delete和Update操作也加入到了Alter Table的範疇中,它並不支持裸的Delete或者Update操作。當用戶執行一個如上的Mutation操作獲得返回時,ClickHouse內核其實只做了兩件事情:
檢查Mutation操作是否合法;
保存Mutation命令到存儲文件中,喚醒一個異步處理merge和mutation的工作線程;
兩者的主體邏輯分別在MutationsInterpreter::validate函數和StorageMergeTree::mutate函數中。
總結一下:什麼操作會觸發mutation呢?
答案:alter (alter update 或 alter delete)
我們看看這個後臺異步的線程任務調度是怎麼玩兒的:
BlockIO InterpreterAlterQuery::execute()
{
BlockIO res;
const auto & alter = query_ptr->as<ASTAlterQuery &>();
...
if (!mutation_commands.empty())
{
//看這裡!!
MutationsInterpreter(table, metadata_snapshot, mutation_commands, context, false).validate();
table->mutate(mutation_commands, context);
}
startMutation
Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String & mutation_file_name)
{
/// Choose any disk, because when we load mutations we search them at each disk
/// where storage can be placed. See loadMutations().
auto disk = getStoragePolicy()->getAnyDisk();
Int64 version;
{
std::lock_guard lock(currently_processing_in_background_mutex);
MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get());
version = increment.get();
entry.commit(version);
mutation_file_name = entry.file_name;
auto insertion = current_mutations_by_id.emplace(mutation_file_name, std::move(entry));
current_mutations_by_version.emplace(version, insertion.first->second);
LOG_INFO(log, "Added mutation: {}", mutation_file_name);
}
//觸發異步任務
background_executor.triggerTask();
return version;
}
異步任務執行
void IBackgroundJobExecutor::jobExecutingTask()
try
{
auto job_and_pool = getBackgroundJob();
if (job_and_pool) /// If we have job, then try to assign into background pool
{
auto & pool_config = pools_configs[job_and_pool->pool_type];
/// If corresponding pool is not full increment metric and assign new job
if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], pool_config.max_pool_size))
{
try /// this try required because we have to manually decrement metric
{
pools[job_and_pool->pool_type].scheduleOrThrowOnError([this, pool_config, job{std::move(job_and_pool->job)}] ()
{
try /// We don't want exceptions in background pool
{
job();
/// Job done, decrement metric and reset no_work counter
CurrentMetrics::values[pool_config.tasks_metric]--;
/// Job done, new empty space in pool, schedule background task
runTaskWithoutDelay();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
CurrentMetrics::values[pool_config.tasks_metric]--;
scheduleTask(/* with_backoff = */ true);
}
});
/// We've scheduled task in the background pool and when it will finish we will be triggered again. But this task can be
/// extremely long and we may have a lot of other small tasks to do, so we schedule ourselves here.
runTaskWithoutDelay();
}
catch (...)
{
/// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here
tryLogCurrentException(__PRETTY_FUNCTION__);
CurrentMetrics::values[pool_config.tasks_metric]--;
scheduleTask(/* with_backoff = */ true);
}
}
else /// Pool is full and we have some work to do
{
scheduleTask(/* with_backoff = */ false);
}
}
else /// Nothing to do, no jobs
{
scheduleTask(/* with_backoff = */ true);
}
}
可以看到異步任務線程池中的任務執行已經抽象成了job,從後臺中load出job進而調度執行。
那麼,這些job都是什麼呢?接著看:
std::optional<JobAndPool> StorageMergeTree::getDataProcessingJob()
{
if (shutdown_called)
return {};
if (merger_mutator.merges_blocker.isCancelled())
return {};
auto metadata_snapshot = getInMemoryMetadataPtr();
std::shared_ptr<MergeMutateSelectedEntry> merge_entry, mutate_entry;
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock);
if (!merge_entry)
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock);
if (merge_entry || mutate_entry)
{
return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
{
if (merge_entry)
mergeSelectedParts(metadata_snapshot, false, *merge_entry, share_lock);
else if (mutate_entry)
mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
}, PoolType::MERGE_MUTATE};
}
else if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
return JobAndPool{[this, share_lock] ()
{
/// All use relative_data_path which changes during rename
/// so execute under share lock.
clearOldPartsFromFilesystem();
clearOldTemporaryDirectories();
clearOldWriteAheadLogs();
clearOldMutations();
clearEmptyParts();
}, PoolType::MERGE_MUTATE};
}
return {};
}
可以看到job有三種類型,一個是常規merge,一個是mutation,一個是清理。
需要清理的殘留文件分為三部分:過期的Data Part,臨時文件夾,過期的Mutation命令文件。如下方代碼所示,MergeTree Data Part的生命周期包含多個階段,創建一個Data Part的時候分兩階段執行Temporary->Precommitted->Commited,淘汰一個Data Part的時候也可能會先經過一個Outdated狀態,再到Deleting狀態。在Outdated狀態下的Data Part仍然是可查的。異步Task在收集Outdated Data Part的時候會根據它的shared_ptr計數來判斷當前是否有查詢Context引用它,沒有的話才進行刪除。清理臨時文件的邏輯較為簡單,在數據文件夾中遍歷搜索"tmp_"開頭的文件夾,並判斷創建時長是否超過temporary_directories_lifetime。臨時文件夾主要在ClickHouse的兩階段提交過程可能造成殘留。最後是清理數據已經全部訂正完成的過期Mutation命令文件。
enum class State
{
Temporary, /// the part is generating now, it is not in data_parts list
PreCommitted, /// the part is in data_parts, but not used for SELECTs
Committed, /// active data part, used by current and upcoming SELECTs
Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner
DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
};
接著說mutation, 既然是異步任務執行,靠的是current_mutations_by_version這個變量,參考如下代碼,特別需要注意的是:
current_mutations_by_version是一個map。當這個map不為空的時候,後臺mutaion任務被調度到後,就會執行。
std::multimap<Int64, MergeTreeMutationEntry &> current_mutations_by_version;std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */, TableLockHolder & /* table_lock_holder */)
Merge邏輯
{
std::lock_guard lock(currently_processing_in_background_mutex);
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
FutureMergedMutatedPart future_part;
if (storage_settings.get()->assign_part_uuids)
future_part.uuid = UUIDHelpers::generateV4();
MutationCommands commands;
CurrentlyMergingPartsTaggerPtr tagger;
if (current_mutations_by_version.empty())
return {};
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVector())
{
if (currently_merging_mutating_parts.count(part))
continue;
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (mutations_begin_it == mutations_end_it)
continue;
size_t max_source_part_size = merger_mutator.getMaxSourcePartSizeForMutation();
if (max_source_part_size < part->getBytesOnDisk())
{
LOG_DEBUG(log, "Current max source part size for mutation is {} but part size {}. Will not mutate part {}. "
"Max size depends not only on available space, but also on settings "
"'number_of_free_entries_in_pool_to_execute_mutation' and 'background_pool_size'",
max_source_part_size, part->getBytesOnDisk(), part->name);
continue;
}
size_t current_ast_elements = 0;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
size_t commands_size = 0;
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN
&& command.type != MutationCommand::Type::DROP_INDEX
&& command.type != MutationCommand::Type::RENAME_COLUMN)
{
commands_for_size_validation.push_back(command);
}
else
{
commands_size += command.ast->size();
}
}
if (!commands_for_size_validation.empty())
{
MutationsInterpreter interpreter(
shared_from_this(), metadata_snapshot, commands_for_size_validation, global_context, false);
commands_size += interpreter.evaluateCommandsSize();
}
if (current_ast_elements + commands_size >= max_ast_elements)
break;
current_ast_elements += commands_size;
commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
}
auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first;
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands);
}
return {};
}
StorageMergeTree::merge函數是MergeTree異步Merge的核心邏輯,Data Part Merge的工作除了通過後臺工作線程自動完成,用戶還可以通過Optimize命令來手動觸發。自動觸發的場景中,系統會根據後臺空閒線程的數據來啟發式地決定本次Merge最大可以處理的數據量大小,max_bytes_to_merge_at_min_space_in_pool和max_bytes_to_merge_at_max_space_in_pool參數分別決定當空閒線程數最大時可處理的數據量上限以及只剩下一個空閒線程時可處理的數據量上限。當用戶的寫入量非常大的時候,應該適當調整工作線程池的大小和這兩個參數。當用戶手動觸發merge時,系統則是根據disk剩餘容量來決定可處理的最大數據量。
Mutation邏輯系統每次都只會訂正一個Data Part,但是會聚合多個mutation任務批量完成,這點實現非常的棒。因為在用戶真實業務場景中一次數據訂正邏輯中可能會包含多個Mutation命令,把這多個mutation操作聚合到一起訂正效率上就非常高。系統每次選擇一個排序鍵最小的並且需要訂正Data Part進行操作,本意上就是把數據從前往後進行依次訂正。
Mutation功能是MergeTree表引擎最新推出一大功能,實現完備度上還有一下兩點需要去優化:
1.mutation沒有實時可見能力。這裡的實時可見並不是指在存儲上立即原地更新,而是給用戶提供一種途徑可以立即看到數據訂正後的最終視圖確保訂正無誤。類比在使用CollapsingMergeTree、SummingMergeTree等高級MergeTree引擎時,數據還沒有完全merge到一個Data Part之前,存儲層並沒有一個數據的最終視圖。但是用戶可以通過Final查詢模式,在計算引擎層實時聚合出數據的最終視圖。這個原理對mutation實時可見也同樣適用,在實時查詢中通過FilterBlockInputStream和ExpressionBlockInputStream完成用戶的mutation操作,給用戶提供一個最終視圖。
2.mutation和merge相互獨立執行。看完本文前面的分析,大家應該也注意到了目前Data Part的merge和mutation是相互獨立執行的,Data Part在同一時刻只能是在merge或者mutation操作中。對於MergeTree這種存儲徹底Immutable的設計,數據頻繁merge、mutation會引入巨大的IO負載。實時上merge和mutation操作是可以合併到一起去考慮的,這樣可以省去數據一次讀寫盤的開銷。對數據寫入壓力很大又有頻繁mutation的場景,會有很大幫助。
對於第2點,這裡我們不禁又回想起clickhouse官方文檔對於參數background_pool_size的說明:
這裡提到了額外的兩個參數:
number_of_free_entries_in_pool_to_execute_mutationnumber_of_free_entries_in_pool_to_lower_max_size_of_mergeM(UInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8, "When there is less than specified number of free entries in pool (or replicated queue), start to lower maximum size of merge to process (or to put in queue). This is to allow small merges to process - not filling the pool with long running merges.", 0) \
M(UInt64, number_of_free_entries_in_pool_to_execute_mutation, 10, "When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid \"Too many parts\"", 0) \這兩個參數怎麼講?和background_pool_size有什麼關聯,其實很簡單,剛才提到因為後臺的merge和mutation是一個線程池來調度的,所以參數number_of_free_entries_in_pool_to_execute_mutation的大概意思,是預留出足夠的線程數量去做mutation,如果線程buffer不夠,則不執行,這個會儘可能規避too many parts的現象。(側面說明目前merge工作不繁重,這個值調到合適的水準,會讓系統後臺儘量優先做merge工作)
std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */, TableLockHolder & /* table_lock_holder */)
{
...
for (const auto & part : getDataPartsVector())
{
if (currently_merging_mutating_parts.count(part))
continue;
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (mutations_begin_it == mutations_end_it)
continue;
//這個函數做了判斷
size_t max_source_part_size = merger_mutator.getMaxSourcePartSizeForMutation();
if (max_source_part_size < part->getBytesOnDisk())
{
LOG_DEBUG(log, "Current max source part size for mutation is {} but part size {}. Will not mutate part {}. "
"Max size depends not only on available space, but also on settings "
"'number_of_free_entries_in_pool_to_execute_mutation' and 'background_pool_size'",
max_source_part_size, part->getBytesOnDisk(), part->name);
continue;
}
...
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands);
}
return {};
}UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() const
彩蛋
{
const auto data_settings = data.getSettings();
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
/// DataPart can be store only at one disk. Get maximum reservable free space at all disks.
UInt64 disk_space = data.getStoragePolicy()->getMaxUnreservedFreeSpace();
/// Allow mutations only if there are enough threads, leave free threads for merges else
if (busy_threads_in_pool <= 1
|| background_pool_size - busy_threads_in_pool >= data_settings->number_of_free_entries_in_pool_to_execute_mutation)
return static_cast<UInt64>(disk_space / DISK_USAGE_COEFFICIENT_TO_RESERVE);
return 0;
}在本文的開頭提到:
保存Mutation命令到存儲文件中,喚醒一個異步處理merge和mutation的工作線程;
我們實操看看效果:xiejinke.local :) ALTER TABLE SignReplacingMergeTreeTest update name='王碼子' where id = 15;
ALTER TABLE SignReplacingMergeTreeTest
UPDATE name = '王碼子' WHERE id = 15
Query id: 292c6b52-e03d-40e7-8c74-a5750e9b0b54
Ok.
0 rows in set. Elapsed: 20.909 sec.
xiejinke.local :) ALTER TABLE SignReplacingMergeTreeTest update name='王碼子333' where id = 15;
ALTER TABLE ReplacingMergeTreeTest
UPDATE name = '王碼子333' WHERE id = 15
Query id: c16987b5-8273-44a5-9fd2-5ac68c60a20b
Ok.
0 rows in set. Elapsed: 49.775 sec.
來看看文件:
參考文章:阿里云:ClickHouse內核分析-MergeTree的Merge和Mutation機制https://developer.aliyun.com/article/762090?spm=a2c6h.12873581.0.0.29cc802f1GeMHc&groupCode=clickhousebackground_pool_size官方解釋:https://clickhouse.tech/docs/en/operations/settings/settings/#background_pool_size掃描關注「大數據貓」公眾號
回覆:"clickhouse",即可獲得ClickHouse資料合集。