跳过正文

ElasticSearch 创建快照源码解读

·1531 字·8 分钟·
ElasticSearch
Guimu
作者
Guimu
Everything is 0 and 1.
目录

Elasticsearch 快照创建完整流程详解
#

概述
#

本文档详细解释了从 RestCreateSnapshotAction#prepareRequest 接收快照请求到最终执行 Repository.snapshotShard() 的完整流程。该流程涉及多个组件的协调工作,包括REST层、Transport层、集群状态管理和分片级别的快照操作。

流程图
#

graph TD A[REST API 请求
接收用户快照创建请求
RestCreateSnapshotAction.prepareRequest] --> B[解析请求参数
提取仓库名和快照名等信息
request.param repository snapshot] B --> C[构建快照请求对象
创建 CreateSnapshotRequest
new CreateSnapshotRequest repository snapshot] C --> D[客户端调用
转发到集群管理接口
client.admin.cluster.createSnapshot] D --> E[抽象客户端执行
路由到传输层处理
AbstractClient.execute] E --> F[传输层主节点操作
在主节点上执行快照逻辑
TransportCreateSnapshotAction.masterOperation] F --> G{检查等待完成标志
决定同步还是异步执行
request.waitForCompletion} G -->|同步执行| H[执行并等待完成
阻塞直到快照完成
snapshotsService.executeSnapshot] G -->|异步执行| I[创建快照任务
立即返回响应
snapshotsService.createSnapshot] I --> J[提交快照创建请求
获取仓库数据并准备任务
submitCreateSnapshotRequest] J --> K[主节点任务队列
将任务加入执行队列
masterServiceTaskQueue.submitTask] K --> L[快照任务执行器
开始执行快照创建逻辑
SnapshotTaskExecutor.execute] L --> M[创建快照条目
验证请求并生成快照元数据
createSnapshot] M --> N[生成快照进度条目
创建 SnapshotsInProgress.Entry
SnapshotsInProgress.startedEntry] N --> O[更新集群状态
将快照信息加入集群状态
snapshotsInProgress.withAddedEntry] O --> P[广播集群状态
通知所有节点快照已开始
clusterStatePublisher.publish] P --> Q[分片快照服务监听
检测到新的快照任务
SnapshotShardsService.clusterChanged] Q --> R[启动分片快照任务
为每个分片创建快照任务
startNewShardSnapshots] R --> S[分片级快照执行
在数据节点上执行分片快照
snapshot] S --> T[调用仓库快照接口
委托给具体的存储仓库
repository.snapshotShard] T --> U[仓库分片快照
将任务加入快照队列
shardSnapshotTaskRunner.enqueueShardSnapshot] U --> V[执行分片数据快照
BlobStoreRepository 核心逻辑
doSnapshotShard] V --> W[仓库状态检查
验证仓库是否可写
isReadOnly] W --> X[获取存储容器
定位分片在存储中的位置
shardContainer indexId shardId] X --> Y[构建快照元数据
分析现有快照信息
buildBlobStoreIndexShardSnapshots] Y --> Z[重复性检查
确保快照名称唯一
snapshots.snapshots.anyMatch] Z --> AA[分析索引文件
获取当前分片的所有文件
context.indexCommit.getFileNames] AA --> BB[获取文件元数据
计算文件哈希值和大小
store.getMetadata] BB --> CC[遍历文件列表
逐个分析每个索引文件
for fileName in fileNames] CC --> DD{增量检查
文件是否已存在于快照中
snapshots.findPhysicalIndexFile} DD -->|文件已存在| EE[重用现有文件
避免重复上传节省空间
indexCommitPointFiles.add existingFileInfo] DD -->|文件不存在| FF[标记需要上传
加入待上传文件列表
filesToSnapshot.add snapshotFileInfo] EE --> GG[更新快照状态
记录处理进度和统计信息
snapshotStatus.moveToStarted] FF --> GG GG --> HH[生成快照元数据
创建分片快照描述文件
snapshots.withAddedSnapshot] HH --> II[上传索引文件
将新文件异步上传到存储
snapshotFiles] II --> JJ[更新元数据文件
保存快照元数据到存储
INDEX_SHARD_SNAPSHOTS_FORMAT.write] JJ --> KK[返回快照结果
完成分片快照并返回状态
context.onResponse ShardSnapshotResult]

详细流程分析
#

1. REST 层处理 - 接收用户快照创建请求
#

步骤描述:解析HTTP请求参数,提取仓库名和快照名等信息,构建快照请求对象

// RestCreateSnapshotAction.java
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    // 解析请求参数 - 提取仓库名和快照名等信息
    String repository = request.param("repository");
    String snapshot = request.param("snapshot");
    
    // 构建快照请求对象 - 创建 CreateSnapshotRequest
    CreateSnapshotRequest createSnapshotRequest = new CreateSnapshotRequest(repository, snapshot);
    request.applyContentParser(p -> createSnapshotRequest.source(p.mapOrdered()));
    
    // 设置超时时间和等待完成标志
    createSnapshotRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createSnapshotRequest.masterNodeTimeout()));
    createSnapshotRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));
    
    // 客户端调用 - 转发到集群管理接口
    return channel -> client.admin().cluster().createSnapshot(createSnapshotRequest, new RestToXContentListener<>(channel));
}

功能说明:

  • 解析请求参数:从HTTP请求中提取仓库名(repository)和快照名(snapshot)
  • 构建请求对象:创建 CreateSnapshotRequest 对象,封装所有快照参数
  • 设置执行选项:配置超时时间和是否等待完成的标志
  • 转发请求:将请求转发到集群管理接口进行后续处理

2. Transport 层处理 - 在主节点上执行快照逻辑
#

步骤描述:路由到传输层处理,在主节点上执行快照逻辑,决定同步还是异步执行

// TransportCreateSnapshotAction.java
@Override
protected void masterOperation(
    Task task,
    final CreateSnapshotRequest request,
    ClusterState state,
    final ActionListener<CreateSnapshotResponse> listener
) {
    // 检查等待完成标志 - 决定同步还是异步执行
    if (request.waitForCompletion()) {
        // 同步执行 - 执行并等待完成,阻塞直到快照完成
        snapshotsService.executeSnapshot(request, listener.map(CreateSnapshotResponse::new));
    } else {
        // 异步执行 - 创建快照任务,立即返回响应
        snapshotsService.createSnapshot(request, listener.map(snapshot -> new CreateSnapshotResponse()));
    }
}

功能说明:

  • 路由到主节点:确保快照操作在主节点上执行,保证集群状态一致性
  • 执行模式选择:根据 waitForCompletion 参数决定执行方式
  • 同步模式:阻塞等待快照完成后返回结果
  • 异步模式:立即返回响应,快照在后台执行

3. SnapshotsService 核心处理 - 创建快照任务并立即返回响应
#

步骤描述:验证请求并生成快照元数据,获取仓库数据并准备任务

// SnapshotsService.java
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
    final String repositoryName = request.repository();
    final String snapshotName = IndexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
    
    // 验证仓库名和快照名
    validate(repositoryName, snapshotName);
    
    // 生成新的快照UUID
    final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID());
    Repository repository = repositoriesService.repository(request.repository());
    
    // 检查仓库是否只读
    if (repository.isReadOnly()) {
        listener.onFailure(new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"));
        return;
    }
    
    // 提交快照创建请求 - 获取仓库数据并准备任务
    submitCreateSnapshotRequest(request, listener, repository, new Snapshot(repositoryName, snapshotId), repository.getMetadata());
}

功能说明:

  • 请求验证:验证仓库名和快照名的合法性,支持日期数学表达式
  • 生成快照ID:创建唯一的快照标识符,包含名称和UUID
  • 仓库状态检查:确保仓库不是只读状态,可以进行写操作
  • 任务提交:将快照创建任务提交到主节点任务队列进行处理

4. 快照任务执行器 - 开始执行快照创建逻辑
#

步骤描述:验证请求并生成快照元数据,创建快照进度条目,更新集群状态

// SnapshotsService.SnapshotTaskExecutor.createSnapshot
private SnapshotsInProgress createSnapshot(
    CreateSnapshotTask createSnapshotTask,
    TaskContext<SnapshotTask> taskContext,
    ClusterState currentState,
    SnapshotsInProgress snapshotsInProgress
) {
    final RepositoryData repositoryData = createSnapshotTask.repositoryData;
    final Snapshot snapshot = createSnapshotTask.snapshot;
    final String repositoryName = snapshot.getRepository();
    final String snapshotName = snapshot.getSnapshotId().getName();
    
    // 1. 基础验证 - 验证请求并生成快照元数据
    ensureRepositoryExists(repositoryName, currentState);
    ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository);
    ensureSnapshotNameNotRunning(snapshotsInProgress, repositoryName, snapshotName);
    validate(repositoryName, snapshotName, currentState);
    
    // 2. 并发限制检查
    final SnapshotDeletionsInProgress deletionsInProgress = SnapshotDeletionsInProgress.get(currentState);
    ensureNoCleanupInProgress(currentState, repositoryName, snapshotName, "create snapshot");
    ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshotsInProgress, deletionsInProgress);
    
    // 3. 解析要快照的索引
    Map<Boolean, List<String>> requestedIndices = Arrays.stream(
        indexNameExpressionResolver.concreteIndexNames(currentState, request)
    ).collect(Collectors.partitioningBy(systemIndices::isSystemIndex));
    
    // 4. 创建分片映射
    final Map<String, IndexId> indexIds = repositoryData.resolveNewIndices(indices, allIndices);
    ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(
        snapshotsInProgress,
        deletionsInProgress,
        currentState,
        indexIds.values(),
        useShardGenerations(version),
        repositoryData,
        repositoryName
    );
    
    // 5. 生成快照进度条目 - 创建 SnapshotsInProgress.Entry
    final var newEntry = SnapshotsInProgress.startedEntry(
        snapshot,
        request.includeGlobalState(),
        request.partial(),
        indexIds,
        dataStreams,
        featureStates,
        repositoryData.getGenId(),
        shards,
        version,
        startTime,
        repositoryData.getUuid(),
        request.userMetadata()
    );
    
    // 6. 更新集群状态 - 将快照信息加入集群状态
    final var res = snapshotsInProgress.withAddedEntry(newEntry);
    taskContext.success(() -> {
        logger.info("snapshot [{}] started", snapshot);
        createSnapshotTask.listener.onResponse(snapshot);
        if (newEntry.state().completed()) {
            endSnapshot(newEntry, currentState.metadata(), createSnapshotTask.repositoryData);
        }
    });
    return res;
}

功能说明:

  • 全面验证:检查仓库存在性、快照名称可用性、并发限制等
  • 索引解析:解析要快照的索引列表,区分系统索引和用户索引
  • 分片映射:创建分片到节点的映射关系,确定每个分片的快照位置
  • 进度跟踪:创建 SnapshotsInProgress.Entry 用于跟踪快照进度
  • 状态更新:将快照信息加入集群状态,触发分布式快照流程

5. 集群状态发布 - 广播集群状态,通知所有节点快照已开始
#

步骤描述:主节点发布新的集群状态,将状态发送到所有数据节点

// MasterService.java
protected void publish(
    ClusterStatePublicationEvent clusterStatePublicationEvent,
    ClusterStatePublisher.AckListener ackListener,
    ActionListener<Void> publicationListener
) {
    // 广播集群状态 - 通知所有节点快照已开始
    clusterStatePublisher.publish(
        clusterStatePublicationEvent,
        new ThreadedActionListener<>(
            threadPoolExecutor,
            new ContextPreservingActionListener<>(ackListener, threadContext)
        ),
        publicationListener
    );
}

功能说明:

  • 状态广播:主节点将包含快照信息的集群状态发布到所有节点
  • 协调器发布:使用 Coordinator 作为发布器,确保状态一致性
  • 异步处理:使用异步机制避免阻塞主节点的其他操作
  • 确认机制:等待足够数量的节点确认状态更新

6. 分片快照服务 - 检测到新的快照任务,为每个分片创建快照任务
#

步骤描述:监听集群状态变化,检测到新的快照条目时启动分片快照,在数据节点上执行分片快照

// SnapshotShardsService.java
@Override
public void clusterChanged(ClusterChangedEvent event) {
    try {
        final var currentSnapshots = SnapshotsInProgress.get(event.state());
        // 分片快照服务监听 - 检测到新的快照任务
        if (SnapshotsInProgress.get(event.previousState()).equals(currentSnapshots) == false) {
            final var localNodeId = event.state().nodes().getLocalNodeId();
            final var removingLocalNode = event.nodesDelta().removedNodes().stream().anyMatch(n -> n.getId().equals(localNodeId));
            synchronized (shardSnapshots) {
                cancelRemoved(currentSnapshots);
                for (SnapshotsInProgress.Entry entry : currentSnapshots.forRepo(repositoryName)) {
                    // 启动分片快照任务 - 为每个分片创建快照任务
                    handleUpdatedSnapshotsInProgressEntry(localNodeId, removingLocalNode, entry);
                }
            }
        }
    } catch (Exception e) {
        logger.warn("Failed to update snapshot state ", e);
    }
}

功能说明:

  • 状态监听:监听集群状态变化,检测新的快照任务
  • 节点过滤:识别分配给当前节点的分片,只处理本节点负责的分片
  • 任务创建:为每个需要快照的分片创建独立的快照任务
  • 并发执行:在专用的快照线程池中并发执行多个分片快照任务

7. 分片快照执行 - 在数据节点上执行分片快照,委托给具体的存储仓库
#

步骤描述:验证分片状态,获取索引提交点,调用仓库的snapshotShard方法

// SnapshotShardsService.java
private void snapshot(
    final ShardId shardId,
    final Snapshot snapshot,
    final IndexId indexId,
    final IndexShardSnapshotStatus snapshotStatus,
    IndexVersion version,
    final long entryStartTime,
    ActionListener<ShardSnapshotResult> resultListener
) {
    ActionListener.run(resultListener, listener -> {
        snapshotStatus.ensureNotAborted();
        final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
        
        // 1. 验证分片状态 - 确保分片可以进行快照
        if (indexShard.routingEntry().primary() == false) {
            throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
        }
        
        // 2. 获取仓库和索引提交点
        final Repository repository = repositoriesService.repository(snapshot.getRepository());
        SnapshotIndexCommit snapshotIndexCommit = null;
        try {
            snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot());
            final var shardStateId = getShardStateId(indexShard, snapshotIndexCommit.indexCommit());
            
            // 3. 调用仓库快照接口 - 委托给具体的存储仓库
            repository.snapshotShard(
                new SnapshotShardContext(
                    indexShard.store(),
                    indexShard.mapperService(),
                    snapshot.getSnapshotId(),
                    indexId,
                    snapshotIndexCommit,
                    shardStateId,
                    snapshotStatus,
                    version,
                    entryStartTime,
                    listener
                )
            );
        } catch (Exception e) {
            // 错误处理...
        }
    });
}

功能说明:

  • 分片验证:确保只对主分片进行快照,且分片不在重新分配过程中
  • 状态检查:验证分片已完全恢复,可以安全进行快照操作
  • 提交点获取:获取分片的索引提交点,确保快照的一致性
  • 仓库委托:将实际的快照操作委托给具体的存储仓库实现

8. 仓库层处理 - 将任务加入快照队列,执行分片数据快照
#

步骤描述:仓库分片快照,将任务加入快照队列,执行BlobStoreRepository核心逻辑

// BlobStoreRepository.java
@Override
public void snapshotShard(SnapshotShardContext context) {
    // 仓库分片快照 - 将任务加入快照队列
    shardSnapshotTaskRunner.enqueueShardSnapshot(context);
}

功能说明:

  • 任务队列管理:将分片快照任务加入专门的队列进行管理
  • 并发控制:通过队列机制控制同时执行的快照任务数量
  • 资源管理:避免过多的并发快照任务影响集群性能
  • 错误隔离:单个分片快照失败不会影响其他分片的快照操作

快照过程中的锁定机制详解
#

🔒 锁定机制总览
#

Elasticsearch快照过程采用三层锁定架构,通过不同层级的锁定机制确保数据一致性和并发安全:

  1. 集群状态锁定(Cluster State Lock) - 集群级协调锁
  2. 分片级锁定(Shard-level Lock) - IndexCommit引用计数保护
  3. 存储层锁定(Store-level Lock) - 文件系统访问保护

🔗 锁定调用链路图
#

graph TD A[REST请求] --> B[TransportCreateSnapshotAction] B --> C[🔒 集群状态锁定入口
MasterServiceTaskQueue.submitTask] C --> D[🔒 主节点互斥锁
MasterService.runTasks] D --> E[🔒 集群状态写锁
ClusterStateTaskExecutor.execute] E --> F[集群状态发布] F --> G[SnapshotShardsService.clusterChanged] G --> H[🔒 分片快照入口
SnapshotShardsService.snapshot] H --> I[🔒 IndexCommit获取
IndexShard.acquireIndexCommitForSnapshot] I --> J[🔒 引擎层锁定
InternalEngine.acquireLastIndexCommit] J --> K{是否需要Flush?
flushFirst参数} K -->|true| L[🔒 Flush锁定
InternalEngine.flushHoldingLock] K -->|false| M[🔒 引用计数保护
CombinedDeletionPolicy.acquireIndexCommit] L --> M M --> N[🔒 存储层锁定
Store.getMetadata] N --> O[BlobStoreRepository.doSnapshotShard]

🔒 第一层:集群状态锁定(Cluster State Lock)
#

锁定范围与实现
#

  • 锁定范围:整个Elasticsearch集群的元数据操作
  • 锁定类型:集群状态写锁(排他锁)
  • 持续时间:100-500ms(集群状态广播完成)

具体代码入口
#

// 🔒 锁定入口1:主节点任务提交
// SnapshotsService.java
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
    // 提交到主节点任务队列,触发集群状态锁定
    masterServiceTaskQueue.submitTask(
        "create_snapshot [" + snapshot.getSnapshotId().getName() + ']',
        new CreateSnapshotTask(repository, repositoryData, listener, snapshot, request, initialRepositoryMetadata),
        request.masterNodeTimeout()
    );
}

// 🔒 锁定入口2:主节点任务执行器
// MasterService.java
private void runTasks(TaskBatch taskBatch) {
    final ClusterState previousClusterState = state();
    
    // 🔒 关键锁定点:获取主服务互斥锁
    synchronized (mutex) {
        ClusterState newClusterState = previousClusterState;
        
        // 🔒 执行集群状态更新任务(需要写锁)
        for (UpdateTask updateTask : taskBatch.toExecute) {
            // 这里会调用 SnapshotsService.SnapshotTaskExecutor.execute()
            newClusterState = updateTask.execute(newClusterState);
        }
        
        // 🔒 发布新的集群状态
        publishClusterState(taskBatch, newClusterState, previousClusterState);
    }
}

🔒 第二层:分片级锁定(Shard-level IndexCommit Lock)
#

锁定范围与实现
#

  • 锁定范围:单个分片的IndexCommit和相关文件
  • 锁定类型:引用计数保护 + 可选Flush锁
  • 持续时间:整个快照过程(几分钟到几小时)

具体代码入口
#

// 🔒 锁定入口1:分片快照触发
// SnapshotShardsService.java
private void snapshot(...) {
    final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
    
    // 🔒 关键锁定点:获取分片的IndexCommit引用
    snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot());
    
    // 委托给仓库层处理
    repository.snapshotShard(new SnapshotShardContext(...));
}

// 🔒 锁定入口2:引擎层IndexCommit锁定
// InternalEngine.java
@Override
public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws EngineException {
    // 🔒 可选的Flush锁定 - 短暂暂停写操作
    if (flushFirst) {
        logger.trace("start flush for snapshot");
        PlainActionFuture<FlushResult> future = new PlainActionFuture<>();
        flush(false, true, future);  // 🔒 这里会暂停写入操作
        future.actionGet();  // 等待flush完成
        logger.trace("finish flush for snapshot");
    }
    
    // 🔒 获取IndexCommit引用,增加引用计数
    return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(false));
}

// 🔒 锁定入口3:引用计数保护机制
// CombinedDeletionPolicy.java
public synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
    final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
    // 🔒 增加引用计数,防止IndexCommit被删除
    snapshotting.setUserData(snapshotting.getUserData());
    return snapshotting;
}

🔒 第三层:存储层锁定(Store-level Lock)
#

锁定范围与实现
#

  • 锁定范围:分片存储目录的文件系统访问
  • 锁定类型:读写锁 + 文件系统锁
  • 持续时间:元数据读取期间(几毫秒到几秒)

具体代码入口
#

// 🔒 锁定入口1:存储层元数据访问
// Store.java
public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) throws IOException {
    ensureOpen();
    failIfCorrupted();
    
    // 🔒 根据需要选择读锁或写锁
    java.util.concurrent.locks.Lock lock = lockDirectory ? metadataLock.writeLock() : metadataLock.readLock();
    lock.lock();
    try (Closeable ignored = lockDirectory ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : () -> {}) {
        // 🔒 在锁保护下读取元数据
        return MetadataSnapshot.loadFromIndexCommit(commit, directory, logger);
    } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
        markStoreCorrupted(ex);
        throw ex;
    } finally {
        lock.unlock();  // 🔒 确保锁被释放
    }
}

📊 锁定时序与性能影响分析
#

🕐 时序阶段一:集群状态锁定期(T0 -> T0+500ms)
#

// 锁定事件
Time: T0 (快照请求提交)
Lock: ClusterState Write Lock (集群状态写锁)
Scope: 整个集群
Duration: 100-500ms
Impact Level: HIGH集群级影响

// 具体影响分析
Blocked Operations:
├──  IndicesService.createIndex()                     // 创建索引被阻塞
├──  IndicesService.deleteIndex()                     // 删除索引被阻塞  
├──  MetadataUpdateSettingsService.updateSettings()   // 更新设置被阻塞
├──  MetadataMappingService.putMapping()             // 更新映射被阻塞
├──  SnapshotsService.createSnapshot()               // 其他快照被阻塞
├──  RestoreService.restoreSnapshot()                // 快照恢复被阻塞
└──  Regular read/write operations                   // 正常读写(无影响)

🕑 时序阶段二:分片IndexCommit获取期(T1 -> T1+200ms)
#

// 锁定事件
Time: T1 (分片开始快照)
Lock: IndexCommit Reference Lock + Optional Flush Lock  
Scope: 单个分片
Duration: 50-200ms如果需要flush),否则几毫秒
Impact Level: MEDIUM分片级写操作可能短暂暂停

// Flush锁定分析(如果flushFirst=true)
if (flushFirst == true) {
    Duration: 50-200ms取决于未提交数据量
    Code: InternalEngine.flushHoldingLock()
    
    Temporarily Blocked Operations:
    ├──  IndexShard.index(IndexRequest)               // 新文档写入暂停
    ├──  IndexShard.delete(DeleteRequest)             // 文档删除暂停
    ├──  IndexShard.update(UpdateRequest)             // 文档更新暂停
    ├──  InternalEngine.forceMerge()                  // 强制合并暂停
    └──  IndexShard.get(GetRequest)                   // 文档读取(无影响)
}

// IndexCommit引用锁定分析(默认情况)
Duration: 整个快照过程几分钟到几小时
Impact Level: LOW主要是保护性锁定不阻止新操作
Code: CombinedDeletionPolicy.acquireIndexCommit()

Protected Resources:
├── 🔒 IndexCommit.getFileNames()                      // 文件列表保护
├── 🔒 Directory.openInput(fileName)                   // 文件读取保护  
├── 🔒 DeletionPolicy.onCommit()                       // 防止文件被删除策略清理
└── 🔒 Segment files (.si, .cfs, .cfe等)               // 段文件保护

🚨 关键澄清:写操作并未被完全禁止
#

// ❌ 常见误解
"分片快照期间不能对索引数据进行写入/修改"

// ✅ 实际情况  
"分片快照期间写入/修改操作可以正常进行,只是:
 1. 可选的短暂flush锁定(默认不开启)
 2. IndexCommit引用保护(不阻止新写入)
 3. I/O资源竞争(性能轻微下降)"

// 具体验证代码
public void verifyWriteOperationsDuringSnapshot() {
    // 快照进行中
    SnapshotIndexCommit snapshotCommit = new SnapshotIndexCommit(
        indexShard.acquireIndexCommitForSnapshot()
    );
    
    // ✅ 新的写入操作依然可以执行
    IndexRequest newDoc = new IndexRequest("test").id("new_doc").source("field", "value");
    IndexResult result = indexShard.index(newDoc);  // 成功执行
    
    // ✅ 新的删除操作依然可以执行  
    DeleteRequest deleteRequest = new DeleteRequest("test", "doc_to_delete");
    DeleteResult deleteResult = indexShard.delete(deleteRequest);  // 成功执行
    
    // ✅ 新的数据可以被搜索到
    SearchRequest searchRequest = new SearchRequest("test");
    // 搜索能找到刚刚添加的 new_doc
    
    // 🔒 只是旧的IndexCommit文件被保护不被删除
    // 新的写入会创建新的segments和新的IndexCommit
    // 快照读取的是旧的IndexCommit,新数据在新的IndexCommit中
}

关键技术细节
#

1. 增量快照机制
#

Elasticsearch通过以下方式实现增量快照:

  • 文件哈希比较:比较文件的MD5哈希值,相同的文件不重复上传
  • 文件元数据缓存:在分片快照元数据中记录已上传的文件信息
  • 代际管理:使用ShardGeneration跟踪快照版本

2. 分布式协调机制
#

  • 主节点协调:主节点负责创建快照条目和分配分片任务
  • 集群状态广播:通过集群状态变化触发各节点的分片快照
  • 分片级并行:每个节点并行处理分配给它的分片快照

3. 错误处理和中断机制
#

  • 快照状态跟踪:IndexShardSnapshotStatus跟踪每个分片的快照状态
  • 中断检查:在关键操作点检查快照是否被中断
  • 错误恢复:失败的分片可以重新调度到其他节点

4. S3存储结构
#

repository/
├── index-{generation}              # 仓库元数据
├── index.latest                    # 最新generation指针
├── indices/
│   └── {index-uuid}/
│       ├── meta-{generation}.dat   # 索引元数据
│       └── {shard-id}/
│           ├── index-{generation}  # 分片快照元数据
│           ├── snap-{uuid}.dat     # 快照信息
│           └── __{uuid}            # 实际数据文件
└── snap-{snapshot-uuid}.dat        # 全局快照信息

性能优化策略
#

1. 并发控制
#

  • 限制同时进行的快照数量(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING
  • 分片级并行处理
  • 异步I/O操作

2. 网络优化
#

  • 速率限制(MAX_SNAPSHOT_BYTES_PER_SEC
  • 分块上传大文件
  • 压缩传输数据

3. 存储优化
#

  • 增量备份减少数据传输
  • 文件级去重
  • 虚拟文件(哈希值直接存储在元数据中)

总结
#

Elasticsearch的快照创建流程是一个复杂的分布式协调过程,整个过程可以分为以下几个关键阶段:

快照生成过程的核心阶段
#

🔄 阶段一:请求接收与验证阶段
#

目标:接收用户请求,验证参数合法性,构建快照任务

  • REST层解析请求参数,构建CreateSnapshotRequest对象
  • 传输层决定同步或异步执行模式
  • SnapshotsService验证请求并生成快照ID

🎯 阶段二:集群状态协调阶段
#

目标:在集群级别协调快照任务,确保一致性

  • 主节点任务队列管理快照创建任务
  • 集群状态锁定,防止并发元数据操作
  • 创建SnapshotsInProgress条目,广播到所有节点

🚀 阶段三:分片并行快照阶段
#

目标:各数据节点并行执行分片级快照

  • 分片快照服务监听集群状态变化
  • 分片级IndexCommit锁定,保护数据一致性
  • 调用仓库层执行实际的数据快照

💾 阶段四:数据存储与元数据管理阶段
#

目标:将数据安全持久化到存储仓库

  • BlobStoreRepository执行增量快照逻辑
  • 文件级去重,避免重复数据传输
  • 更新快照元数据,完成快照流程

通过这样的多层次锁定机制和分阶段协调,Elasticsearch能够在保证数据一致性的同时,最大程度地减少对正常读写操作的影响,实现高效的分布式快照功能。

相关文章

ElasticSearch 源码调试
·692 字·4 分钟
ElasticSearch