ReplicatedMergeTree之数据同步流程
在创建了 ReplicatedMergeTree 表后,会有几个 taskHolder 在后台运行:监听 ZooKeeper 中的 log 并将其添加到 queue 中,以及监听 mutations 的变化并触发 mutation 相关操作。这里先不分析 mutation 相关操作,主要说明一下正常的数据插入和数据复制流程。
首先了解一个 taskHolder——`queue_task_handle`:它负责从 queue 中选取节点(entry)并执行相应的操作。
这个 task 是如何启动的呢?它是在建表时或 server 重启时,通过调用表的 `startup()` 方法启动的。具体不多说,可以看以下两个方法:
第一个:
// Excerpt (most of the body is elided): during CREATE TABLE the interpreter
// calls startup() on the created storage `res`, which for a replicated table
// is StorageReplicatedMergeTree::startup().
BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
...
{
...
// This is where creating a table calls ReplicatedMergeTree::startup().
res->startup();
}
...
}
第二个:
// Excerpt (partly elided): when the server restarts, each database calls
// startup() on every one of its tables through the startupOneTable lambda.
// NOTE(review): thread_pool is presumably used in the elided code to run
// startupOneTable for several tables concurrently — confirm.
void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
{
LOG_INFO(log, "Starting up tables.");
...
auto startupOneTable = [&](const StoragePtr & table)
{
// On server restart this restarts the table's background threads and tasks
// by calling the table's own startup() method.
table->startup();
...
};
...
}
上面两处最终调用的都是 `StorageReplicatedMergeTree::startup()` 方法,如下:
void StorageReplicatedMergeTree::startup()
{
if (is_readonly)
return;
if (set_table_structure_at_startup)
set_table_structure_at_startup();
(
zookeeper_path, replica_path,
database_name + "." + table_name + " (ReplicatedMergeTreeQueue)",
getDataParts());
StoragePtr ptr = shared_from_this();
InterserverIOEndpointPtr data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this, ptr);
data_parts_exchange_endpoint_holder = std::make_shared<InterserverIOEndpointHolder>(
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler());
// 这里会将queueTask()添加到queue_task_handle这个taskHolder中
queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
// movePartsTask()添加到move_parts_task_handle中
move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); });
// 激活副本
restarting_thread.start();
startup_event.wait();
}
`queueTask()` 就是处理之前添加到 queue 中的 entry 的方法,如下:
/// One run of the background queue task: selects an entry from the
/// replication queue and executes it through executeLogEntry().
/// Returns NOTHING_TO_DO when no entry is ready; ERROR when processing failed
/// and the previous attempt on the same entry was recent; SUCCESS otherwise.
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
    // Queue actions are currently blocked: pause briefly and report SUCCESS.
    if (queue.actions_blocker.isCancelled())
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
        return BackgroundProcessingPoolTaskResult::SUCCESS;
    }

    // Holds the entry chosen for processing, if any.
    ReplicatedMergeTreeQueue::SelectedEntry selected;

    try
    {
        // Ask the queue for the next entry that can be executed now.
        selected = queue.selectEntryToProcess(merger_mutator, *this);
    }
    catch (...)
    {
        tryLogCurrentException(log, __PRETTY_FUNCTION__);
    }

    LogEntryPtr & entry = selected.first;

    // Nothing to process right now.
    if (!entry)
        return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;

    time_t prev_attempt_time = entry->last_attempt_time;

    // The queue drives processing of the entry; the lambda does the actual work.
    bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
    {
        try
        {
            // The real executor for a single log entry.
            return executeLogEntry(*entry_to_process);
        }
        catch (const Exception & e)
        {
            ...
            throw;
        }
        catch (...)
        {
            ...
            throw;
        }
    });

    // Sleep (report ERROR) only if processing failed AND last_attempt_time
    // (presumably refreshed by processEntry) moved by less than 10 time_t units
    // since the previous attempt, i.e. this entry keeps failing in quick succession.
    bool need_sleep = !res && (entry->last_attempt_time - prev_attempt_time < 10);

    return need_sleep ? BackgroundProcessingPoolTaskResult::ERROR : BackgroundProcessingPoolTaskResult::SUCCESS;
}
`queueTask()` 中真正执行 entry 的是 `executeLogEntry()`,它主要根据 entry 的不同 type 执行不同的操作。先了解下 type 都有哪些:
- EMPTY:未使用
- GET_PART:从另一个副本拉取 part
- MERGE_PARTS:执行 part 的 merge
- DROP_RANGE:删除指定分区中指定范围内的 part
- CLEAR_COLUMN:在指定分区的 part 中清除指定的列
- CLEAR_INDEX:在指定分区的 part 中清除指定的索引
- REPLACE_RANGE:用新的 part 替换(覆盖)原分区中指定范围内的 part
- MUTATE_PART:对 part 执行一个或多个 mutation(变更)
/// Executes one replication log entry, dispatching on entry.type.
/// Returns true when the entry is done or can be skipped; for GET_PART, and for
/// MERGE_PARTS / MUTATE_PART that could not be done locally, returns executeFetch().
/// Throws LOGICAL_ERROR on an unknown entry type.
bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
    // Entry types handled entirely by a dedicated method.
    if (entry.type == LogEntry::DROP_RANGE)
    {
        executeDropRange(entry);
        return true;
    }

    if (entry.type == LogEntry::CLEAR_COLUMN || entry.type == LogEntry::CLEAR_INDEX)
    {
        executeClearColumnOrIndexInPartition(entry);
        return true;
    }

    if (entry.type == LogEntry::REPLACE_RANGE)
    {
        executeReplaceRange(entry);
        return true;
    }

    // Entries that produce a new part: nothing to do if this replica already
    // has the part (or a part containing it).
    if (entry.type == LogEntry::GET_PART ||
        entry.type == LogEntry::MERGE_PARTS ||
        entry.type == LogEntry::MUTATE_PART)
    {
        // The part may still be in the PreCommitted state, so look there first,
        // then fall back to the active part that contains new_part_name.
        DataPartPtr existing_part = getPartIfExists(entry.new_part_name, {MergeTreeDataPartState::PreCommitted});
        if (!existing_part)
            existing_part = getActiveContainingPart(entry.new_part_name);

        // Skip only if the local part is also registered under this replica's
        // "parts" node in ZooKeeper.
        if (existing_part && getZooKeeper()->exists(replica_path + "/parts/" + existing_part->name))
        {
            // A GET_PART written by this very replica is expected to exist
            // already, so it is skipped without the debug message.
            if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
            {
                LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because part " + existing_part->name + " already exists.");
            }
            return true;
        }
    }

    if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
        LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");

    // For a quorum write whose part is listed under quorum/failed_parts, skip the entry.
    if (entry.quorum && getZooKeeper()->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name))
    {
        LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed.");
        return true;
    }

    // GET_PART always fetches; MERGE_PARTS / MUTATE_PART fetch from another
    // replica only when the local merge / mutation attempt does not succeed.
    bool do_fetch = false;
    if (entry.type == LogEntry::GET_PART)
    {
        do_fetch = true;
    }
    else if (entry.type == LogEntry::MERGE_PARTS)
    {
        do_fetch = !tryExecuteMerge(entry);
    }
    else if (entry.type == LogEntry::MUTATE_PART)
    {
        do_fetch = !tryExecutePartMutation(entry);
    }
    else
    {
        throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)), ErrorCodes::LOGICAL_ERROR);
    }

    if (do_fetch)
        return executeFetch(entry);

    return true;
}
至此,已经了解到 ReplicatedMergeTree 是如何触发数据同步的,以及如何根据不同的类型用不同的方法进行处理。`executeFetch(entry)` 方法中也有很多逻辑,下面来看下 `executeFetch(entry)` 方法:
/// Fetches the part described by `entry` from another replica.
/// Throws TOO_MANY_FETCHES when a parallel-fetch limit is reached. Returns false
/// if fetchPart() returns false, true otherwise. Exceptions raised inside the main
/// try block (e.g. NO_REPLICA_HAS_PART) go to the outer catch, elided here.
bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
{
    // Find an active replica that has the part or a part covering it; when a
    // covering part is chosen, entry.actual_new_part_name may be filled in.
    String replica = findReplicaHavingCoveringPart(entry, true);
    const auto storage_settings_ptr = getSettings();

    // replicated_max_parallel_fetches: limit on a function-local static counter,
    // shared by every table. replicated_max_parallel_fetches_for_table: limit for
    // this table only. A setting of 0 disables the corresponding check.
    static std::atomic_uint total_fetches {0};
    if (storage_settings_ptr->replicated_max_parallel_fetches && total_fetches >= storage_settings_ptr->replicated_max_parallel_fetches)
    {
        throw Exception("Too many total fetches from replicas, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches.toString(),
            ErrorCodes::TOO_MANY_FETCHES);
    }

    ++total_fetches;
    SCOPE_EXIT({--total_fetches;});

    if (storage_settings_ptr->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
    {
        throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches_for_table.toString(),
            ErrorCodes::TOO_MANY_FETCHES);
    }

    ++current_table_fetches;
    SCOPE_EXIT({--current_table_fetches;});

    try
    {
        if (replica.empty())
        {
            // Quorum-write handling (elided; fairly involved). It may still
            // leave `replica` empty, which is checked right below.
            if (entry.quorum)
            {
                ...
            }

            if (replica.empty())
            {
                ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
                throw Exception("No active replica has part " + entry.new_part_name + " or covering part", ErrorCodes::NO_REPLICA_HAS_PART);
            }
        }

        try
        {
            // Fetch the covering part if one was selected, otherwise the requested part.
            String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
            if (!fetchPart(part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
                return false;
        }
        catch (Exception & e)
        {
            /// No stacktrace, just log message
            if (e.code() == ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS)
                e.addMessage("Too busy replica. Will try later.");
            throw;
        }

        if (entry.type == LogEntry::MERGE_PARTS)
            ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
    }
    catch (...)
    {
        ...
    }

    return true;
}
这里穿插介绍一下 `findReplicaHavingCoveringPart` 方法,可以大致了解它的逻辑:以随机顺序遍历其他(活跃的)副本,返回第一个拥有目标 part 或覆盖它的 part 的副本。
/// Returns the name of another replica that has entry.new_part_name or a part
/// containing it, or an empty string if none is found. Replicas are visited in
/// random order; with `active` set, replicas lacking an "is_active" node are
/// skipped. If the part found on a replica is a larger covering part that the
/// queue refuses, returns an empty string without trying further replicas.
String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entry, bool active)
{
    auto zookeeper = getZooKeeper();
    Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");

    // Visit replicas in a uniformly random order.
    std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);

    for (const String & replica : replicas)
    {
        // Skip this replica itself.
        if (replica == replica_name)
            continue;

        // Skip inactive replicas when only active ones are wanted.
        if (active && !zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
            continue;

        // Among the replica's parts, keep the outermost one that equals or
        // contains the requested part.
        String largest_part_found;
        Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
        for (const String & part_on_replica : parts)
        {
            if (part_on_replica == entry.new_part_name
                || MergeTreePartInfo::contains(part_on_replica, entry.new_part_name, format_version))
            {
                if (largest_part_found.empty()
                    || MergeTreePartInfo::contains(part_on_replica, largest_part_found, format_version))
                {
                    largest_part_found = part_on_replica;
                }
            }
        }

        if (!largest_part_found.empty())
        {
            bool the_same_part = largest_part_found == entry.new_part_name;

            // The replica only has a bigger covering part: fetch it only if the
            // queue accepts it as a future part (presumably: it is not already
            // covered by parts in progress; the call may also record it in
            // entry.actual_new_part_name — confirm).
            if (!the_same_part)
            {
                String reject_reason;
                if (!queue.addFuturePartIfNotCoveredByThem(largest_part_found, entry, reject_reason))
                {
                    LOG_INFO(log, "Will not fetch part " << largest_part_found << " covering " << entry.new_part_name << ". " << reject_reason);
                    return {};
                }
            }

            return replica;
        }
    }

    return {};
}
回归正题,流程执行到 `fetchPart` 方法时,就真正开始拉取单个 part 了,代码如下:
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & source_replica_path, bool to_detached, size_t quorum)
{
...
std::function<MutableDataPartPtr()> get_part;
if (part_to_clone)
{
get_part = [&, part_to_clone]()
{
return cloneAndLoadDataPart(part_to_clone, "tmp_clone_", part_info);
};
}
else
{
// 获取需要clone数据的副本地址
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
get_part = [&, address, timeouts, user_password, interserver_scheme]()
{
if (interserver_scheme != )
throw Exception("Interserver schemes are different: '" + interserver_scheme
+ "' != '" + + "', can't fetch part from " + ,
ErrorCodes::LOGICAL_ERROR);
// 这里的fetchPart主要就是构造HTTP参数及连接真正拉取数据
return (
part_name, source_replica_path,
, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, to_detached);
};
}
...
return true;
}
至此,整个 ReplicatedMergeTree 数据异步复制流程的主要逻辑就梳理完了。还有许多细节这里都忽略了,比如 quorum 等,后续有空再补充。