Elasticsearch 快照创建完整流程详解 #
概述 #
本文档详细解释了从 RestCreateSnapshotAction#prepareRequest 接收快照请求到最终执行 Repository.snapshotShard() 的完整流程。该流程涉及多个组件的协调工作,包括REST层、Transport层、集群状态管理和分片级别的快照操作。
流程图 #
接收用户快照创建请求
RestCreateSnapshotAction.prepareRequest] --> B[解析请求参数
提取仓库名和快照名等信息
request.param repository snapshot] B --> C[构建快照请求对象
创建 CreateSnapshotRequest
new CreateSnapshotRequest repository snapshot] C --> D[客户端调用
转发到集群管理接口
client.admin.cluster.createSnapshot] D --> E[抽象客户端执行
路由到传输层处理
AbstractClient.execute] E --> F[传输层主节点操作
在主节点上执行快照逻辑
TransportCreateSnapshotAction.masterOperation] F --> G{检查等待完成标志
决定同步还是异步执行
request.waitForCompletion} G -->|同步执行| H[执行并等待完成
阻塞直到快照完成
snapshotsService.executeSnapshot] G -->|异步执行| I[创建快照任务
立即返回响应
snapshotsService.createSnapshot] I --> J[提交快照创建请求
获取仓库数据并准备任务
submitCreateSnapshotRequest] J --> K[主节点任务队列
将任务加入执行队列
masterServiceTaskQueue.submitTask] K --> L[快照任务执行器
开始执行快照创建逻辑
SnapshotTaskExecutor.execute] L --> M[创建快照条目
验证请求并生成快照元数据
createSnapshot] M --> N[生成快照进度条目
创建 SnapshotsInProgress.Entry
SnapshotsInProgress.startedEntry] N --> O[更新集群状态
将快照信息加入集群状态
snapshotsInProgress.withAddedEntry] O --> P[广播集群状态
通知所有节点快照已开始
clusterStatePublisher.publish] P --> Q[分片快照服务监听
检测到新的快照任务
SnapshotShardsService.clusterChanged] Q --> R[启动分片快照任务
为每个分片创建快照任务
startNewShardSnapshots] R --> S[分片级快照执行
在数据节点上执行分片快照
snapshot] S --> T[调用仓库快照接口
委托给具体的存储仓库
repository.snapshotShard] T --> U[仓库分片快照
将任务加入快照队列
shardSnapshotTaskRunner.enqueueShardSnapshot] U --> V[执行分片数据快照
BlobStoreRepository 核心逻辑
doSnapshotShard] V --> W[仓库状态检查
验证仓库是否可写
isReadOnly] W --> X[获取存储容器
定位分片在存储中的位置
shardContainer indexId shardId] X --> Y[构建快照元数据
分析现有快照信息
buildBlobStoreIndexShardSnapshots] Y --> Z[重复性检查
确保快照名称唯一
snapshots.snapshots.anyMatch] Z --> AA[分析索引文件
获取当前分片的所有文件
context.indexCommit.getFileNames] AA --> BB[获取文件元数据
计算文件哈希值和大小
store.getMetadata] BB --> CC[遍历文件列表
逐个分析每个索引文件
for fileName in fileNames] CC --> DD{增量检查
文件是否已存在于快照中
snapshots.findPhysicalIndexFile} DD -->|文件已存在| EE[重用现有文件
避免重复上传节省空间
indexCommitPointFiles.add existingFileInfo] DD -->|文件不存在| FF[标记需要上传
加入待上传文件列表
filesToSnapshot.add snapshotFileInfo] EE --> GG[更新快照状态
记录处理进度和统计信息
snapshotStatus.moveToStarted] FF --> GG GG --> HH[生成快照元数据
创建分片快照描述文件
snapshots.withAddedSnapshot] HH --> II[上传索引文件
将新文件异步上传到存储
snapshotFiles] II --> JJ[更新元数据文件
保存快照元数据到存储
INDEX_SHARD_SNAPSHOTS_FORMAT.write] JJ --> KK[返回快照结果
完成分片快照并返回状态
context.onResponse ShardSnapshotResult]
详细流程分析 #
1. REST 层处理 - 接收用户快照创建请求 #
步骤描述:解析HTTP请求参数,提取仓库名和快照名等信息,构建快照请求对象
// RestCreateSnapshotAction.java
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
// 解析请求参数 - 提取仓库名和快照名等信息
String repository = request.param("repository");
String snapshot = request.param("snapshot");
// 构建快照请求对象 - 创建 CreateSnapshotRequest
CreateSnapshotRequest createSnapshotRequest = new CreateSnapshotRequest(repository, snapshot);
request.applyContentParser(p -> createSnapshotRequest.source(p.mapOrdered()));
// 设置超时时间和等待完成标志
createSnapshotRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createSnapshotRequest.masterNodeTimeout()));
createSnapshotRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));
// 客户端调用 - 转发到集群管理接口
return channel -> client.admin().cluster().createSnapshot(createSnapshotRequest, new RestToXContentListener<>(channel));
}
功能说明:
- 解析请求参数:从HTTP请求中提取仓库名(repository)和快照名(snapshot)
- 构建请求对象:创建
CreateSnapshotRequest对象,封装所有快照参数 - 设置执行选项:配置超时时间和是否等待完成的标志
- 转发请求:将请求转发到集群管理接口进行后续处理
2. Transport 层处理 - 在主节点上执行快照逻辑 #
步骤描述:路由到传输层处理,在主节点上执行快照逻辑,决定同步还是异步执行
// TransportCreateSnapshotAction.java
@Override
protected void masterOperation(
Task task,
final CreateSnapshotRequest request,
ClusterState state,
final ActionListener<CreateSnapshotResponse> listener
) {
// 检查等待完成标志 - 决定同步还是异步执行
if (request.waitForCompletion()) {
// 同步执行 - 执行并等待完成,阻塞直到快照完成
snapshotsService.executeSnapshot(request, listener.map(CreateSnapshotResponse::new));
} else {
// 异步执行 - 创建快照任务,立即返回响应
snapshotsService.createSnapshot(request, listener.map(snapshot -> new CreateSnapshotResponse()));
}
}
功能说明:
- 路由到主节点:确保快照操作在主节点上执行,保证集群状态一致性
- 执行模式选择:根据
waitForCompletion参数决定执行方式 - 同步模式:阻塞等待快照完成后返回结果
- 异步模式:立即返回响应,快照在后台执行
3. SnapshotsService 核心处理 - 创建快照任务并立即返回响应 #
步骤描述:验证请求并生成快照元数据,获取仓库数据并准备任务
// SnapshotsService.java
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
final String repositoryName = request.repository();
final String snapshotName = IndexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
// 验证仓库名和快照名
validate(repositoryName, snapshotName);
// 生成新的快照UUID
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID());
Repository repository = repositoriesService.repository(request.repository());
// 检查仓库是否只读
if (repository.isReadOnly()) {
listener.onFailure(new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"));
return;
}
// 提交快照创建请求 - 获取仓库数据并准备任务
submitCreateSnapshotRequest(request, listener, repository, new Snapshot(repositoryName, snapshotId), repository.getMetadata());
}
功能说明:
- 请求验证:验证仓库名和快照名的合法性,支持日期数学表达式
- 生成快照ID:创建唯一的快照标识符,包含名称和UUID
- 仓库状态检查:确保仓库不是只读状态,可以进行写操作
- 任务提交:将快照创建任务提交到主节点任务队列进行处理
4. 快照任务执行器 - 开始执行快照创建逻辑 #
步骤描述:验证请求并生成快照元数据,创建快照进度条目,更新集群状态
// SnapshotsService.SnapshotTaskExecutor.createSnapshot
private SnapshotsInProgress createSnapshot(
CreateSnapshotTask createSnapshotTask,
TaskContext<SnapshotTask> taskContext,
ClusterState currentState,
SnapshotsInProgress snapshotsInProgress
) {
final RepositoryData repositoryData = createSnapshotTask.repositoryData;
final Snapshot snapshot = createSnapshotTask.snapshot;
final String repositoryName = snapshot.getRepository();
final String snapshotName = snapshot.getSnapshotId().getName();
// 1. 基础验证 - 验证请求并生成快照元数据
ensureRepositoryExists(repositoryName, currentState);
ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
ensureSnapshotNameNotRunning(snapshotsInProgress, repositoryName, snapshotName);
validate(repositoryName, snapshotName, currentState);
// 2. 并发限制检查
final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshotsInProgress, deletionsInProgress);
// 3. 解析要快照的索引
Map<Boolean, List<String>> requestedIndices = Arrays.stream(
indexNameExpressionResolver.concreteIndexNames(currentState, request)
).collect(Collectors.partitioningBy(systemIndices::isSystemIndex));
// 4. 创建分片映射
final Map<String, IndexId> indexIds = repositoryData.resolveNewIndices(indices, allIndices);
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(
snapshotsInProgress,
deletionsInProgress,
currentState,
indexIds.values(),
useShardGenerations(version),
repositoryData,
repositoryName
);
// 5. 生成快照进度条目 - 创建 SnapshotsInProgress.Entry
final var newEntry = SnapshotsInProgress.startedEntry(
snapshot,
request.includeGlobalState(),
request.partial(),
indexIds,
dataStreams,
featureStates,
repositoryData.getGenId(),
shards,
version,
startTime,
repositoryData.getUuid(),
request.userMetadata()
);
// 6. 更新集群状态 - 将快照信息加入集群状态
final var res = snapshotsInProgress.withAddedEntry(newEntry);
taskContext.success(() -> {
logger.info("snapshot [{}] started", snapshot);
createSnapshotTask.listener.onResponse(snapshot);
if (newEntry.state().completed()) {
endSnapshot(newEntry, currentState.metadata(), createSnapshotTask.repositoryData);
}
});
return res;
}
功能说明:
- 全面验证:检查仓库存在性、快照名称可用性、并发限制等
- 索引解析:解析要快照的索引列表,区分系统索引和用户索引
- 分片映射:创建分片到节点的映射关系,确定每个分片的快照位置
- 进度跟踪:创建
SnapshotsInProgress.Entry用于跟踪快照进度 - 状态更新:将快照信息加入集群状态,触发分布式快照流程
5. 集群状态发布 - 广播集群状态,通知所有节点快照已开始 #
步骤描述:主节点发布新的集群状态,将状态发送到所有数据节点
// MasterService.java
protected void publish(
ClusterStatePublicationEvent clusterStatePublicationEvent,
ClusterStatePublisher.AckListener ackListener,
ActionListener<Void> publicationListener
) {
// 广播集群状态 - 通知所有节点快照已开始
clusterStatePublisher.publish(
clusterStatePublicationEvent,
new ThreadedActionListener<>(
threadPoolExecutor,
new ContextPreservingActionListener<>(ackListener, threadContext)
),
publicationListener
);
}
功能说明:
- 状态广播:主节点将包含快照信息的集群状态发布到所有节点
- 协调器发布:使用
Coordinator作为发布器,确保状态一致性 - 异步处理:使用异步机制避免阻塞主节点的其他操作
- 确认机制:等待足够数量的节点确认状态更新
6. 分片快照服务 - 检测到新的快照任务,为每个分片创建快照任务 #
步骤描述:监听集群状态变化,检测到新的快照条目时启动分片快照,在数据节点上执行分片快照
// SnapshotShardsService.java
@Override
public void clusterChanged(ClusterChangedEvent event) {
try {
final var currentSnapshots = SnapshotsInProgress.get(event.state());
// 分片快照服务监听 - 检测到新的快照任务
if (SnapshotsInProgress.get(event.previousState()).equals(currentSnapshots) == false) {
final var localNodeId = event.state().nodes().getLocalNodeId();
final var removingLocalNode = event.nodesDelta().removedNodes().stream().anyMatch(n -> n.getId().equals(localNodeId));
synchronized (shardSnapshots) {
cancelRemoved(currentSnapshots);
for (SnapshotsInProgress.Entry entry : currentSnapshots.forRepo(repositoryName)) {
// 启动分片快照任务 - 为每个分片创建快照任务
handleUpdatedSnapshotsInProgressEntry(localNodeId, removingLocalNode, entry);
}
}
}
} catch (Exception e) {
logger.warn("Failed to update snapshot state ", e);
}
}
功能说明:
- 状态监听:监听集群状态变化,检测新的快照任务
- 节点过滤:识别分配给当前节点的分片,只处理本节点负责的分片
- 任务创建:为每个需要快照的分片创建独立的快照任务
- 并发执行:在专用的快照线程池中并发执行多个分片快照任务
7. 分片快照执行 - 在数据节点上执行分片快照,委托给具体的存储仓库 #
步骤描述:验证分片状态,获取索引提交点,调用仓库的snapshotShard方法
// SnapshotShardsService.java
private void snapshot(
final ShardId shardId,
final Snapshot snapshot,
final IndexId indexId,
final IndexShardSnapshotStatus snapshotStatus,
IndexVersion version,
final long entryStartTime,
ActionListener<ShardSnapshotResult> resultListener
) {
ActionListener.run(resultListener, listener -> {
snapshotStatus.ensureNotAborted();
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
// 1. 验证分片状态 - 确保分片可以进行快照
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
// 2. 获取仓库和索引提交点
final Repository repository = repositoriesService.repository(snapshot.getRepository());
SnapshotIndexCommit snapshotIndexCommit = null;
try {
snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot());
final var shardStateId = getShardStateId(indexShard, snapshotIndexCommit.indexCommit());
// 3. 调用仓库快照接口 - 委托给具体的存储仓库
repository.snapshotShard(
new SnapshotShardContext(
indexShard.store(),
indexShard.mapperService(),
snapshot.getSnapshotId(),
indexId,
snapshotIndexCommit,
shardStateId,
snapshotStatus,
version,
entryStartTime,
listener
)
);
} catch (Exception e) {
// 错误处理...
}
});
}
功能说明:
- 分片验证:确保只对主分片进行快照,且分片不在重新分配过程中
- 状态检查:验证分片已完全恢复,可以安全进行快照操作
- 提交点获取:获取分片的索引提交点,确保快照的一致性
- 仓库委托:将实际的快照操作委托给具体的存储仓库实现
8. 仓库层处理 - 将任务加入快照队列,执行分片数据快照 #
步骤描述:仓库分片快照,将任务加入快照队列,执行BlobStoreRepository核心逻辑
// BlobStoreRepository.java
@Override
public void snapshotShard(SnapshotShardContext context) {
// 仓库分片快照 - 将任务加入快照队列
shardSnapshotTaskRunner.enqueueShardSnapshot(context);
}
功能说明:
- 任务队列管理:将分片快照任务加入专门的队列进行管理
- 并发控制:通过队列机制控制同时执行的快照任务数量
- 资源管理:避免过多的并发快照任务影响集群性能
- 错误隔离:单个分片快照失败不会影响其他分片的快照操作
快照过程中的锁定机制详解 #
🔒 锁定机制总览 #
Elasticsearch快照过程采用三层锁定架构,通过不同层级的锁定机制确保数据一致性和并发安全:
- 集群状态锁定(Cluster State Lock) - 集群级协调锁
- 分片级锁定(Shard-level Lock) - IndexCommit引用计数保护
- 存储层锁定(Store-level Lock) - 文件系统访问保护
🔗 锁定调用链路图 #
MasterServiceTaskQueue.submitTask] C --> D[🔒 主节点互斥锁
MasterService.runTasks] D --> E[🔒 集群状态写锁
ClusterStateTaskExecutor.execute] E --> F[集群状态发布] F --> G[SnapshotShardsService.clusterChanged] G --> H[🔒 分片快照入口
SnapshotShardsService.snapshot] H --> I[🔒 IndexCommit获取
IndexShard.acquireIndexCommitForSnapshot] I --> J[🔒 引擎层锁定
InternalEngine.acquireLastIndexCommit] J --> K{是否需要Flush?
flushFirst参数} K -->|true| L[🔒 Flush锁定
InternalEngine.flushHoldingLock] K -->|false| M[🔒 引用计数保护
CombinedDeletionPolicy.acquireIndexCommit] L --> M M --> N[🔒 存储层锁定
Store.getMetadata] N --> O[BlobStoreRepository.doSnapshotShard]
🔒 第一层:集群状态锁定(Cluster State Lock) #
锁定范围与实现 #
- 锁定范围:整个Elasticsearch集群的元数据操作
- 锁定类型:集群状态写锁(排他锁)
- 持续时间:100-500ms(集群状态广播完成)
具体代码入口 #
// 🔒 锁定入口1:主节点任务提交
// SnapshotsService.java
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
// 提交到主节点任务队列,触发集群状态锁定
masterServiceTaskQueue.submitTask(
"create_snapshot [" + snapshot.getSnapshotId().getName() + ']',
new CreateSnapshotTask(repository, repositoryData, listener, snapshot, request, initialRepositoryMetadata),
request.masterNodeTimeout()
);
}
// 🔒 锁定入口2:主节点任务执行器
// MasterService.java
private void runTasks(TaskBatch taskBatch) {
final ClusterState previousClusterState = state();
// 🔒 关键锁定点:获取主服务互斥锁
synchronized (mutex) {
ClusterState newClusterState = previousClusterState;
// 🔒 执行集群状态更新任务(需要写锁)
for (UpdateTask updateTask : taskBatch.toExecute) {
// 这里会调用 SnapshotsService.SnapshotTaskExecutor.execute()
newClusterState = updateTask.execute(newClusterState);
}
// 🔒 发布新的集群状态
publishClusterState(taskBatch, newClusterState, previousClusterState);
}
}
🔒 第二层:分片级锁定(Shard-level IndexCommit Lock) #
锁定范围与实现 #
- 锁定范围:单个分片的IndexCommit和相关文件
- 锁定类型:引用计数保护 + 可选Flush锁
- 持续时间:整个快照过程(几分钟到几小时)
具体代码入口 #
// 🔒 锁定入口1:分片快照触发
// SnapshotShardsService.java
private void snapshot(...) {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
// 🔒 关键锁定点:获取分片的IndexCommit引用
snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot());
// 委托给仓库层处理
repository.snapshotShard(new SnapshotShardContext(...));
}
// 🔒 锁定入口2:引擎层IndexCommit锁定
// InternalEngine.java
@Override
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
// 🔒 可选的Flush锁定 - 短暂暂停写操作
if (flushFirst) {
logger.trace("start flush for snapshot");
PlainActionFuture<FlushResult> future = new PlainActionFuture<>();
flush(false, true, future); // 🔒 这里会暂停写入操作
future.actionGet(); // 等待flush完成
logger.trace("finish flush for snapshot");
}
// 🔒 获取IndexCommit引用,增加引用计数
return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(false));
}
// 🔒 锁定入口3:引用计数保护机制
// CombinedDeletionPolicy.java
public synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
// 🔒 增加引用计数,防止IndexCommit被删除
snapshotting.setUserData(snapshotting.getUserData());
return snapshotting;
}
🔒 第三层:存储层锁定(Store-level Lock) #
锁定范围与实现 #
- 锁定范围:分片存储目录的文件系统访问
- 锁定类型:读写锁 + 文件系统锁
- 持续时间:元数据读取期间(几毫秒到几秒)
具体代码入口 #
// 🔒 锁定入口1:存储层元数据访问
// Store.java
public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) throws IOException {
ensureOpen();
failIfCorrupted();
// 🔒 根据需要选择读锁或写锁
java.util.concurrent.locks.Lock lock = lockDirectory ? metadataLock.writeLock() : metadataLock.readLock();
lock.lock();
try (Closeable ignored = lockDirectory ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : () -> {}) {
// 🔒 在锁保护下读取元数据
return MetadataSnapshot.loadFromIndexCommit(commit, directory, logger);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
throw ex;
} finally {
lock.unlock(); // 🔒 确保锁被释放
}
}
📊 锁定时序与性能影响分析 #
🕐 时序阶段一:集群状态锁定期(T0 -> T0+500ms) #
// 锁定事件
Time: T0 (快照请求提交)
Lock: ClusterState Write Lock (集群状态写锁)
Scope: 整个集群
Duration: 100-500ms
Impact Level: HIGH(集群级影响)
// 具体影响分析
Blocked Operations:
├── ❌ IndicesService.createIndex() // 创建索引被阻塞
├── ❌ IndicesService.deleteIndex() // 删除索引被阻塞
├── ❌ MetadataUpdateSettingsService.updateSettings() // 更新设置被阻塞
├── ❌ MetadataMappingService.putMapping() // 更新映射被阻塞
├── ❌ SnapshotsService.createSnapshot() // 其他快照被阻塞
├── ❌ RestoreService.restoreSnapshot() // 快照恢复被阻塞
└── ✅ Regular read/write operations // 正常读写(无影响)
🕑 时序阶段二:分片IndexCommit获取期(T1 -> T1+200ms) #
// 锁定事件
Time: T1 (分片开始快照)
Lock: IndexCommit Reference Lock + Optional Flush Lock
Scope: 单个分片
Duration: 50-200ms(如果需要flush),否则几毫秒
Impact Level: MEDIUM(分片级写操作可能短暂暂停)
// Flush锁定分析(如果flushFirst=true)
if (flushFirst == true) {
Duration: 50-200ms(取决于未提交数据量)
Code: InternalEngine.flushHoldingLock()
Temporarily Blocked Operations:
├── ❌ IndexShard.index(IndexRequest) // 新文档写入暂停
├── ❌ IndexShard.delete(DeleteRequest) // 文档删除暂停
├── ❌ IndexShard.update(UpdateRequest) // 文档更新暂停
├── ❌ InternalEngine.forceMerge() // 强制合并暂停
└── ✅ IndexShard.get(GetRequest) // 文档读取(无影响)
}
// IndexCommit引用锁定分析(默认情况)
Duration: 整个快照过程(几分钟到几小时)
Impact Level: LOW(主要是保护性锁定,不阻止新操作)
Code: CombinedDeletionPolicy.acquireIndexCommit()
Protected Resources:
├── 🔒 IndexCommit.getFileNames() // 文件列表保护
├── 🔒 Directory.openInput(fileName) // 文件读取保护
├── 🔒 DeletionPolicy.onCommit() // 防止文件被删除策略清理
└── 🔒 Segment files (.si, .cfs, .cfe等) // 段文件保护
🚨 关键澄清:写操作并未被完全禁止 #
// ❌ 常见误解
"分片快照期间不能对索引数据进行写入/修改"
// ✅ 实际情况
"分片快照期间写入/修改操作可以正常进行,只是:
1. 可选的短暂flush锁定(默认不开启)
2. IndexCommit引用保护(不阻止新写入)
3. I/O资源竞争(性能轻微下降)"
// 具体验证代码
public void verifyWriteOperationsDuringSnapshot() {
// 快照进行中
SnapshotIndexCommit snapshotCommit = new SnapshotIndexCommit(
indexShard.acquireIndexCommitForSnapshot()
);
// ✅ 新的写入操作依然可以执行
IndexRequest newDoc = new IndexRequest("test").id("new_doc").source("field", "value");
IndexResult result = indexShard.index(newDoc); // 成功执行
// ✅ 新的删除操作依然可以执行
DeleteRequest deleteRequest = new DeleteRequest("test", "doc_to_delete");
DeleteResult deleteResult = indexShard.delete(deleteRequest); // 成功执行
// ✅ 新的数据可以被搜索到
SearchRequest searchRequest = new SearchRequest("test");
// 搜索能找到刚刚添加的 new_doc
// 🔒 只是旧的IndexCommit文件被保护不被删除
// 新的写入会创建新的segments和新的IndexCommit
// 快照读取的是旧的IndexCommit,新数据在新的IndexCommit中
}
关键技术细节 #
1. 增量快照机制 #
Elasticsearch通过以下方式实现增量快照:
- 文件哈希比较:比较文件的MD5哈希值,相同的文件不重复上传
- 文件元数据缓存:在分片快照元数据中记录已上传的文件信息
- 代际管理:使用ShardGeneration跟踪快照版本
2. 分布式协调机制 #
- 主节点协调:主节点负责创建快照条目和分配分片任务
- 集群状态广播:通过集群状态变化触发各节点的分片快照
- 分片级并行:每个节点并行处理分配给它的分片快照
3. 错误处理和中断机制 #
- 快照状态跟踪:IndexShardSnapshotStatus跟踪每个分片的快照状态
- 中断检查:在关键操作点检查快照是否被中断
- 错误恢复:失败的分片可以重新调度到其他节点
4. S3存储结构 #
repository/
├── index-{generation} # 仓库元数据
├── index.latest # 最新generation指针
├── indices/
│ └── {index-uuid}/
│ ├── meta-{generation}.dat # 索引元数据
│ └── {shard-id}/
│ ├── index-{generation} # 分片快照元数据
│ ├── snap-{uuid}.dat # 快照信息
│ └── __{uuid} # 实际数据文件
└── snap-{snapshot-uuid}.dat # 全局快照信息
性能优化策略 #
1. 并发控制 #
- 限制同时进行的快照数量(
MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING) - 分片级并行处理
- 异步I/O操作
2. 网络优化 #
- 速率限制(
MAX_SNAPSHOT_BYTES_PER_SEC) - 分块上传大文件
- 压缩传输数据
3. 存储优化 #
- 增量备份减少数据传输
- 文件级去重
- 虚拟文件(哈希值直接存储在元数据中)
总结 #
Elasticsearch的快照创建流程是一个复杂的分布式协调过程,整个过程可以分为以下几个关键阶段:
快照生成过程的核心阶段 #
🔄 阶段一:请求接收与验证阶段 #
目标:接收用户请求,验证参数合法性,构建快照任务
- REST层解析请求参数,构建CreateSnapshotRequest对象
- 传输层决定同步或异步执行模式
- SnapshotsService验证请求并生成快照ID
🎯 阶段二:集群状态协调阶段 #
目标:在集群级别协调快照任务,确保一致性
- 主节点任务队列管理快照创建任务
- 集群状态锁定,防止并发元数据操作
- 创建SnapshotsInProgress条目,广播到所有节点
🚀 阶段三:分片并行快照阶段 #
目标:各数据节点并行执行分片级快照
- 分片快照服务监听集群状态变化
- 分片级IndexCommit锁定,保护数据一致性
- 调用仓库层执行实际的数据快照
💾 阶段四:数据存储与元数据管理阶段 #
目标:将数据安全持久化到存储仓库
- BlobStoreRepository执行增量快照逻辑
- 文件级去重,避免重复数据传输
- 更新快照元数据,完成快照流程
通过这样的多层次锁定机制和分阶段协调,Elasticsearch能够在保证数据一致性的同时,最大程度地减少对正常读写操作的影响,实现高效的分布式快照功能。