目 录CONTENT

文章目录

HDFS 短路本地读取系列(二):你以为的「本地读」和真正的「本地读」—getLegacy vs getBlockReaderLocal 的本质差异

醉酒的行者
2026-06-07 / 0 评论 / 0 点赞 / 1 阅读 / 0 字

HDFS 短路本地读取系列(二):你以为的「本地读」和真正的「本地读」—getLegacy vs getBlockReaderLocal 的本质差异

导语:在 HDFS 的读取路径中,BlockReaderFactory.build() 是客户端选择读取策略的总入口。在这个四级降级链路中,第二级和第三级分别由 getLegacyBlockReaderLocal()getBlockReaderLocal() 接管。一个基于文件路径直接打开(HDFS-2246),一个基于 Unix Domain Socket 文件描述符传递(HDFS-347)。本文将深入 Hadoop 3.x源码,彻底解读这两个方法的内部机制、调用链路与设计哲学。


一、背景:为什么需要短路本地读取?

传统 HDFS 读取路径中,即使客户端与 DataNode 同机,数据也要经过:

磁盘 → Page Cache → DataNode 用户态 Buffer → TCP Send Buffer → TCP Recv Buffer → 客户端用户态 Buffer

这是 3 次 CPU 拷贝 的马拉松。当 Spark 扫描百亿行 Parquet、HBase 承载百万 QPS 时,这 3 次拷贝会被放大为整个系统的性能。

短路本地读取(Short-Circuit Local Read) 的核心思想:客户端与 DataNode 同机时,让客户端直接读取本地块文件,绕过 DataNode 的 TCP 协议栈。

Hadoop 实现了两套短路读取机制,分别由 getLegacyBlockReaderLocal()getBlockReaderLocal() 触发。

读取路径对比:传统 TCP 路径 vs 短路本地读取路径

1.1 内核环回 vs 普通 TCP:第 3 次 CPU 拷贝的特殊性

传统路径中的 "第 3 次 CPU 拷贝"标注为"内核环回",这里详细解释它和普通跨机器 TCP 的区别。

内核环回(Kernel Loopback)指的是同机 TCP 通信时,数据走 127.0.0.1 回环接口。很多人误以为 127.0.0.1 就是"短路"——但实际上它仍然要完整遍历 TCP 协议栈

DataNode write()
  → tcp_sendmsg()          // 分段、拷贝到 sk_buff
  → tcp_transmit_skb()     // 构造 TCP 头、计算校验和(纯 CPU 算!)
  → ip_queue_xmit()        // IP 层路由
  → loopback_xmit()        // 发现目标在本地,直接丢入接收队列
  → netif_rx()             // 触发软中断(NET_RX_SOFTIRQ)
  → tcp_v4_rcv()           // TCP 入站处理
  → tcp_rcv_established()  // 重组、ACK、窗口
  → sock_def_readable()    // 通知接收 socket
Client read()
维度普通 TCP(跨机器)内核环回(同机 127.0.0.1)
数据路径内核 → NIC 驱动 → 网线 → 对端 NIC → 内核全程在内核内部兜一圈
DMA 次数2 次(发送 NIC + 接收 NIC 各读/写内存)0 次(根本不出网卡)
TCP 协议栈完整走一遍完整走一遍(一行代码不少)
校验和硬件 offload 到网卡(TCP Segmentation Offload)CPU 自己算
数据是否拷贝通过 sk_buff 跨机器传输同机器时 skb_clone() 共享数据页,不一定 literal memcpy()
主要瓶颈网卡带宽(10G/25G/100G)内核 TCP 栈 CPU 处理能力

关键认知:内核环回未必执行字面意义上的 memcpy() 数据拷贝(现代内核通过 skb_clone() 共享数据页),但 TCP 协议栈处理本身(分段、校验和、ACK、窗口管理)就是巨大的 CPU 开销。图中标注的"第 3 次 CPU 拷贝(内核环回)",指的是数据走完整 TCP 栈所消耗的 CPU 代价——这才是短路读取真正要绕过的。

而短路读取的优势恰恰在于:Client 根本不走 socket,拿到 FD 后直接 pread() / mmap() 操作块文件——连 TCP 栈的影子都没有。这就是为什么延迟能从 ~0.5ms 降到 ~0.1ms。


二、总入口:BlockReaderFactory.build() 的四级降级链路

// BlockReaderFactory.java — build() 方法(简化)
public BlockReader build() throws IOException {
    // 第0级:外部插件(ReplicaAccessorBuilder)
    BlockReader reader = tryToCreateExternalBlockReader();
    if (reader != null) return reader;

    final ShortCircuitConf scConf = conf.getShortCircuitConf();
    try {
        if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
            if (clientContext.getUseLegacyBlockReaderLocal()) {
                // 第一级短路:Legacy 路径(本文主角之一)
                reader = getLegacyBlockReaderLocal();
                if (reader != null) return reader;
            } else {
                // 第二级短路:新式 fd 传递路径(本文主角之二)
                reader = getBlockReaderLocal();
                if (reader != null) return reader;
            }
        }
        if (scConf.isDomainSocketDataTraffic()) {
            // 第三级:Domain Socket 数据流(仍走 DataNode,但无 TCP)
            reader = getRemoteBlockReaderFromDomain();
            if (reader != null) return reader;
        }
    } catch (IOException e) { /* 降级 */ }
    
    // 最终兜底:TCP 远程读取
    return getRemoteBlockReaderFromTcp();
}

关键配置开关

配置项默认值作用
dfs.client.read.shortcircuitfalse是否启用短路读取
dfs.client.use.legacy.blockreader.localfalsetrue → 走 getLegacyBlockReaderLocal()
dfs.domain.socket.pathDomain Socket 路径模板(如 /var/run/hadoop-hdfs/dn._PORT

BlockReaderFactory.build() 四级降级链路

BlockReaderFactory.build() 四级降级链路

各级降级触发条件对照表

降级级别方法触发条件配置开关
第0级tryToCreateExternalBlockReader()存在外部 ReplicaAccessorBuilder 插件
第1级(Legacy)getLegacyBlockReaderLocal()dfs.client.read.shortcircuit=true dfs.client.use.legacy.blockreader.local=truedfs.client.use.legacy.blockreader.local
第1级(新版)getBlockReaderLocal()dfs.client.read.shortcircuit=true dfs.client.use.legacy.blockreader.local=false(默认)dfs.domain.socket.path
第2级getRemoteBlockReaderFromDomain()dfs.domain.socket.data.traffic=true 且 Domain Socket 可用dfs.domain.socket.data.traffic
第3级(兜底)getRemoteBlockReaderFromTcp()以上全部失败

设计哲学:每次尝试失败都静默降级try...catch),不会抛异常,直到找到可用的读取方式或最终走到 TCP 兜底。


三、getLegacyBlockReaderLocal深度解读

3.1 方法签名与调用时机

// BlockReaderFactory.java — getLegacyBlockReaderLocal()
private BlockReader getLegacyBlockReaderLocal() throws IOException

触发条件(必须全部满足):

  1. dfs.client.read.shortcircuit = true
  2. clientContext.getUseLegacyBlockReaderLocal() = true
  3. 目标 DataNode 地址是本地地址(DFSUtilClient.isLocalAddress()
  4. clientContext.getDisableLegacyBlockReaderLocal() = false(未被上次失败禁用)

3.2 核心流程源码解读

private BlockReader getLegacyBlockReaderLocal() throws IOException {
    // 【守卫条件1】目标必须为本机
    if (!DFSUtilClient.isLocalAddress(inetSocketAddress)) {
        return null;
    }
    
    // 【守卫条件2】未被上次失败禁用
    if (clientContext.getDisableLegacyBlockReaderLocal()) {
        return null;
    }
    
    try {
        // 【核心】通过 RPC 获取块文件路径,然后直接打开
        return BlockReaderLocalLegacy.newBlockReader(conf,
            userGroupInformation, configuration, fileName, block, token,
            datanode, startOffset, length, storageType);
    } catch (RemoteException re) {
        ioe = re.unwrapRemoteException(
            InvalidToken.class, AccessControlException.class);
    }
    
    // 【失败处理】权限异常说明用户不在白名单,禁用 Legacy 路径
    if ((!(ioe instanceof AccessControlException)) && isSecurityException(ioe)) {
        throw ioe;  // 非权限异常,直接抛出(阻止降级)
    }
    LOG.warn("Disabling legacy local reads.", ioe);
    clientContext.setDisableLegacyBlockReaderLocal();  // 禁用
    return null;
}

3.3 BlockReaderLocalLegacy.newBlockReader的 RPC 交互

// BlockReaderLocalLegacy.java — newBlockReader() 核心片段
static BlockReaderLocalLegacy newBlockReader(...) throws IOException {
    // 【步骤1】从本地缓存获取块路径(LRU,最多 10000 条)
    LocalDatanodeInfo localDatanodeInfo = 
        getLocalDatanodeInfo(node.getIpcPort());
    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
    
    if (pathinfo == null) {
        // 【步骤2】缓存未命中,发起 RPC 调用
        // 调用 DataNode 的 ClientDatanodeProtocol.getBlockLocalPathInfo()
        pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
            configuration, conf.getSocketTimeout(), token,
            conf.isConnectToDnViaHostname(), storageType);
    }
    
    // 【步骤3】直接打开本地文件(这是"短路"的本质)
    File blkfile = new File(pathinfo.getBlockPath());
    dataIn = new FileInputStream(blkfile);  // 直接 open()
    
    if (!skipChecksumCheck) {
        File metafile = new File(pathinfo.getMetaPath());
        checksumIn = new FileInputStream(metafile);
        // 解析 checksum 头
        final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(...);
    }
    
    return new BlockReaderLocalLegacy(scConf, file, blk,
        startOffset, checksum, verifyChecksum, dataIn, ..., checksumIn);
}

调用链路图

getLegacyBlockReaderLocal() 调用链路

3.4 Legacy 路径的致命缺陷

  1. 权限问题:客户端需要直接读取 DataNode 数据目录的权限,通常需要将用户加入 dfs.block.local-path-access.user 白名单,运维复杂。
  2. 安全问题:绕过 DataNode 直接读文件,DataNode 无法感知读取行为,也无法做节流控制。
  3. 缓存一致性LocalDatanodeInfo 的 LRU 缓存在 DataNode 删除块后可能返回过期路径。

四、getBlockReaderLocal深度解读

4.1 方法签名与架构革新

// BlockReaderFactory.java — getBlockReaderLocal()
private BlockReader getBlockReaderLocal() throws IOException

对应 JIRA HDFS-347,核心是:通过 Unix Domain Socket 让 DataNode 把块文件的 FileDescriptor 传递给客户端

这样客户端不需要直接访问数据目录的权限——DataNode 作为文件所有者打开 FD,然后通过内核级 SCM_RIGHTS 机制把 FD "发送" 给客户端。

4.2 核心流程源码解读

private BlockReader getBlockReaderLocal() throws IOException {
    // 【步骤1】获取 Domain Socket 路径状态(带缓存)
    if (pathInfo == null) {
        pathInfo = clientContext.getDomainSocketFactory()
            .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
    }
    
    // 【步骤2】检查路径是否可用
    if (!pathInfo.getPathState().getUsableForShortCircuit()) {
        return null;
    }
    
    // 【步骤3】从 ShortCircuitCache 获取或创建 ShortCircuitReplica
    // (这是整个流程的灵魂)
    ShortCircuitCache cache = clientContext.getShortCircuitCache();
    ExtendedBlockId key = new ExtendedBlockId(
        block.getBlockId(), block.getBlockPoolId());
    
    // 【关键】fetchOrCreate() 是线程安全的 Fetch-Or-Create 模式
    // 如果缓存中没有,会回调 this.createShortCircuitReplicaInfo()
    // (this 实现了 ShortCircuitReplicaCreator 接口)
    ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
    
    if (info.getInvalidTokenException() != null) {
        throw info.getInvalidTokenException();
    }
    if (info.getReplica() == null) {
        return null;  // 创建失败,返回 null 触发降级
    }
    
    // 【步骤4】使用 Builder 模式构造 BlockReaderLocal
    return new BlockReaderLocal.Builder(conf.getShortCircuitConf())
        .setFilename(fileName)
        .setBlock(block)
        .setStartOffset(startOffset)
        .setShortCircuitReplica(info.getReplica())  // 注入 replica(含 data/meta fd)
        .setVerifyChecksum(verifyChecksum)
        .setCachingStrategy(cachingStrategy)
        .setStorageType(storageType)
        .build();
}

4.3 ShortCircuitCache.fetchOrCreate的并发艺术

这是整个短路读取中最精妙的设计之一:多线程请求同一 Block 时,只创建一个 ShortCircuitReplica,其余线程等待

// ShortCircuitCache.java — fetchOrCreate()
public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
    ShortCircuitReplicaCreator creator) {
    
    lock.lock();
    try {
        // 【快路径】缓存命中
        Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key);
        if (waitable != null) {
            info = fetch(key, waitable);  // 等待或直接获取
            if (info != null) return info;
        }
        
        // 【慢路径】缓存未命中:放入 Waitable,让其他线程等待
        newWaitable = new Waitable<>(lock.newCondition());
        replicaInfoMap.put(key, newWaitable);
    } finally { lock.unlock(); }
    
    // 【释放锁后】回调 creator.createShortCircuitReplicaInfo()
    // 即 BlockReaderFactory.createShortCircuitReplicaInfo()
    return create(key, creator, newWaitable);
}

Waitable 的作用是:多个线程可以同时 await() 同一个 Waitable,当创建线程调用 provide() 时,所有等待线程被唤醒并获得同一个 ShortCircuitReplicaInfo

4.4 createShortCircuitReplicaInfo FD 传递的真正发生地

当缓存未命中时,ShortCircuitCache.create() 会调用 BlockReaderFactory.createShortCircuitReplicaInfo()

// BlockReaderFactory.java — createShortCircuitReplicaInfo()
@Override
public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
    // 【步骤1】获取或创建 Domain Socket 连接
    DomainPeer peer = nextDomainPeer();
    
    // 【步骤2】在共享内存中分配 slot(与 DataNode 同步状态)
    ShortCircuitReplica.ShareMemorySlot slot = cache.allocShmSlot(...);
    
    // 【步骤3】向 DataNode 发送短路读取请求
    // 底层:Sender.requestShortCircuitFds()
    requestShortCircuitFds(peer, slot, ...);
    
    // 【步骤4】接收 DataNode 通过 SCM_RIGHTS 传递的 FD
    // 底层:DomainSocket.recvFileInputStreams()
    FileInputStream[] fis = peer.getDomainSocket().recvFileInputStreams(...);
    
    // 【步骤5】封装为 ShortCircuitReplica
    ShortCircuitReplica replica = new ShortCircuitReplica(
        key, fis[0], fis[1], slot, ...);  // fis[0]=data, fis[1]=meta
    
    return new ShortCircuitReplicaInfo(replica);
}

4.5 DataNode侧 SCM_RIGHTS FD 传递的内核之旅

DataNode 的 DataXceiver.requestShortCircuitFds() 处理客户端请求:

// DataXceiver.java — requestShortCircuitFds()(DataNode 侧)
public void requestShortCircuitFds(...) throws IOException {
    // 【步骤1】权限校验(验证 Block Token)
    checkAccess(...);
    
    // 【步骤2】在共享内存中注册 slot
    datanode.getShortCircuitRegistry().registerSlot(...);
    
    // 【步骤3】打开块文件(DataNode 有权限,因为它是文件 owner)
    FileInputStream[] fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
    
    // 【步骤4】通过 SCM_RIGHTS 发送 FD
    // 内核处理:为客户端进程分配新 FD,指向同一个 struct file
    DomainSocket sock = peer.getDomainSocket();
    sock.sendFileDescriptors(
        new FileDescriptor[]{fis[0].getFD(), fis[1].getFD()},
        buf, 0, buf.length);
}

SCM_RIGHTS 内核机制(简述):

  • 发送方调用 sendmsg(),将 FD 放入辅助数据(cmsg_type = SCM_RIGHTS
  • 内核在接收方进程的文件描述符表中分配新条目,指向发送方 FD 对应的同一个 struct file
  • 接收方拿到的新 FD 与发送方 FD 值不同,但指向同一文件偏移和状态

FD 传递全流程(SCM_RIGHTS 机制)

下面用图展示 getBlockReaderLocal() 完整调用链,从客户端发起请求到拿到 FD 的全过程:

getBlockReaderLocal() FD 传递全流程

SCM_RIGHTS 内核机制简述

步骤发送方(DataNode)接收方(客户端)内核动作
1调用 sendmsg(),FD 放入辅助数据 cmsg_type=SCM_RIGHTS识别 SCM_RIGHTS,找到 struct file
2调用 recvmsg() 接收在接收方 FD 表中分配新条目,指向同一 struct file;返回新 FD 值(与发送方 FD 值不同)
3拿到 new FileInputStream(fd)后续 IO 直接操作同一文件偏移量

结果:客户端拿到的新 FD,和 DataNode 打开时的 FD 值不同,但指向同一文件描述符结构,共享文件偏移量和状态。

4.6 Unix Domain Socket 深度解析

上文反复提到 "Domain Socket",它到底是什么?和 TCP 环回有什么本质区别?.sock 文件在里面扮演什么角色?本节彻底讲清楚。

4.6.1 什么是 Domain Socket?

Unix Domain Socket(UDS) 是同机进程间通信(IPC)机制,数据只在内核内存中传递,不绕网卡,不走 TCP 协议栈。

维度TCP Socket(127.0.0.1)Unix Domain Socket
地址族AF_INET / AF_INET6AF_UNIX / AF_LOCAL
寻址方式IP:Port文件系统路径(如 /var/run/hdfs/dn.sock
内核协议栈完整 TCP/IP 栈(校验和、分段、ACK、拥塞控制)直接内核内存拷贝,无协议处理
SCM_RIGHTS❌ 不支持✅ 支持 FD 传递(这才是关键!)
延迟~0.05ms(协议栈开销)~0.005ms(纯内存搬运)
安全模型IP 层面的访问控制文件系统权限(0600)+ SO_PEERCRED 获取对端 UID

4.6.2 .sock 文件的本质

/var/run/hdfs/dn.sock 不存储数据,它只做「门牌号」——让客户端通过文件系统路径找到服务端绑定的内核 struct socket 对象。

bind() 时内核做了什么?

DataNode 调用 bind("/var/run/hdfs/dn.sock"):

  VFS 层:
    → 在路径上创建一个 inode
    → inode->i_mode  = S_IFSOCK  (socket 类型,不是普通文件!)
    → inode->i_fop   = &socket_file_ops

  内核内存:
    → 分配 struct socket + struct sock(AF_UNIX 协议族)
    → 服务端 fd 指向这个 struct socket
    → 把路径字符串和 struct socket 的对应关系记录到 Unix socket 哈希表

  ls -l 输出:
    srwx------ 1 hdfs hdfs ... /var/run/hdfs/dn.sock
    ↑ s 开头 = socket 文件类型

connect() 时内核做了什么?

客户端 connect("/var/run/hdfs/dn.sock"):

  1. VFS 解析路径 → 找到 S_IFSOCK 类型的 inode
  2. 通过 inode 找到 DN 绑定的 struct socket
  3. 分配客户端自己的 struct socket
  4. 内核将两端的 struct socket 配对:
     → 服务端 struct unix_sock->peer 指向客户端
     → 客户端 struct unix_sock->peer 指向服务端
     → 两端 sk_receive_queue 互相关联

  结果:
    客户端 fd → struct socket ←→ struct socket ← DN accept fd
                 ↑                     ↑
            完全在内核内存中,不走网卡

一句话.sock 文件是一个特殊的 inode(S_IFSOCK),内核通过它找到服务端绑定的 struct socketconnect() 的本质是让客户端的 struct socket 和服务端的 struct socket 配对,之后的通信直接在内核内存里完成。

4.6.3 谁做服务端?为什么?

DataNode 是服务端(bind + listen + accept),客户端是连接方(connect)。

正确的方向(HDFS 实际实现):

DataNode 启动:
  ├─ socket() → bind(/var/run/hdfs/dn.sock) → listen()
  └─ accept() 阻塞等待

客户端发起短路读取:
  ├─ step1: connect(/var/run/hdfs/dn.sock)   ← 主动连接 DN
  ├─ step2: allocShmSlot()                     ← 分配共享内存
  ├─ step3: sendmsg() 发送读取请求(普通数据)
  └─ DN 收到请求后:
      ├─ checkAccess(token)                    ← 权限校验
      ├─ openBlockFile()                       ← 打开块文件,拿到 FD
      └─ sendmsg(SCM_RIGHTS)                  ← 把 FD 发回客户端

为什么必须是 DN 做服务端?安全原因:

假设客户端做服务端(不可行):
  → DN 需要 connect() 到客户端的 socket
  → 客户端怎么告诉 DN "我的 socket 路径在哪"?
  → 任意进程都可以伪装成客户端...

正确设计(DN 做服务端):
  → /var/run/hdfs/dn.sock 固定路径,权限 0600
  → 客户端 connect() 时,DN 通过 SO_PEERCRED 获取客户端 UID
  → 结合 block token 校验,确保只有合法客户端能拿到 FD

4.6.4 全双工 & SCM_RIGHTS 双向

UDS 一旦建立,是全双工的:

UDS 连接建立后:

  客户端 ←─────── 全双工 ──────→ DataNode
            (同一条连接,双向同时可用)

  客户端可以:sendmsg(SCM_RIGHTS) → DN 收
  DN 也可以:sendmsg(SCM_RIGHTS) → 客户端收

SCM_RIGHTS 没有方向限制——只要在已连接的 socket 上调用 sendmsg() 即可。HDFS 实际只用了 DN → 客户端 这个方向(把块文件 FD 发给客户端),但反向在技术上完全可行。

4.6.5 SCM_RIGHTS 是系统调用

SCM_RIGHTS 本身不是系统调用,而是一个常量标志位。但通过 UDS 传递 SCM_RIGHTS整个动作,必须经过 sendmsg() / recvmsg() 这一对系统调用完成:

应用层:
  sendmsg(sockfd, &msg, 0)       ← 一次系统调用,从用户态陷入内核态
    │
    │  msg.msg_control 包含:
    │  struct cmsghdr {
    │      cmsg_type = SCM_RIGHTS;   ← 常量标志,告诉内核"我要传 FD"
    │      cmsg_data = {block_fd};   ← 要传递的 FD 值
    │  }
    │
    ▼ 内核态
  __sys_sendmsg() → ___sys_sendmsg()
    └─ scm_send()
       └─ case SCM_RIGHTS:
          scm_fp_copy(cmsg_data)
            1. fget(fd) → 获取 struct file
            2. 在接收方 FD 表中分配新条目,指向同一 struct file
            3. 新 FD 值写回接收方的 cmsg_data

HDFS 短路读取中,这一步是关键路径——一次 sendmsg() + 一次 recvmsg(),FD 从 DataNode 进程传递到客户端进程,没有数据拷贝,但确实走了两次系统调用


五、BlockReaderLocal 的读取路径:两种 I/O 模式

5.1 read() 方法的双分支

// BlockReaderLocal.java — read(ByteBuffer buf)
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
    boolean canSkipChecksum = createNoChecksumContext();
    try {
        if (canSkipChecksum && zeroReadaheadRequested) {
            // 【路径 A】直接操作 FileChannel,无 bounce buffer
            nRead = readWithoutBounceBuffer(buf);
        } else {
            // 【路径 B,默认】通过 bounce buffer 预读 + 可选 checksum 验证
            nRead = readWithBounceBuffer(buf, canSkipChecksum);
        }
        return nRead;
    } finally {
        if (canSkipChecksum) releaseNoChecksumContext();
    }
}

5.2 两个分支的触发条件(源码真相)

先说结论:readWithoutBounceBuffer() 在生产环境中几乎不会触发。

// BlockReaderLocal 构造函数(源码第 257-265 行)
int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
    ((Math.min(builder.bufferSize, builder.maxReadahead) +
        bytesPerChecksum - 1) / bytesPerChecksum);

if (maxReadaheadChunks == 0) {
    this.zeroReadaheadRequested = true;    // ← 唯一触发条件
    maxReadaheadChunks = 1;
} else {
    this.zeroReadaheadRequested = false;   // ← 正常 checksum 类型,永远 false
}
变量含义何时为 true
zeroReadaheadRequested是否禁用预读(走直接读取)bytesPerChecksum == 0(checksum 类型为 NULL)
canSkipChecksum是否可以跳过 checksum 验证verifyChecksum=falseaddNoChecksumAnchor() 成功

bytesPerChecksum == 0 意味着什么?

dfs.checksum.type=NULL,写入的块完全没有校验和

→ 这是一个几乎没人用的配置(数据损坏无法检测)

→ 所以 readWithoutBounceBuffer() 在生产中基本是死代码

dfs.client.read.shortcircuit.skip.checksum=true 有什么用?

// Builder 构造函数(第 89 行)
this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
//                              ↑ dfs.client.read.shortcircuit.skip.checksum

设为 true 后:

  • verifyChecksum = false
  • createNoChecksumContext() 返回 true
  • zeroReadaheadRequested 仍为 false
  • 结果:走 readWithBounceBuffer()只是跳过了 checksum 验证,并没有零拷贝

5.3 readWithBounceBuffer():默认路径(2 次 CPU 拷贝)

重要澄清:readWithBounceBuffer ≠ 零拷贝。 名字里的 "bounce" 已经揭示了真相——数据需要先"弹"到一个中间缓冲区,再"弹"到用户目标缓冲区。

5.3.1 bounce buffer 在哪里?

// BlockReaderLocal.java 第 68 行
private static final DirectBufferPool bufferPool = new DirectBufferPool();

// 第 224 行
private ByteBuffer dataBuf;  // ← 这就是 bounce buffer

// 第 284-286 行:懒初始化
private synchronized void createDataBufIfNeeded() {
    if (dataBuf == null) {
        dataBuf = bufferPool.getBuffer(
            maxAllocatedChunks * bytesPerChecksum);
    }
}

// DirectBufferPool.java — 底层是 allocateDirect
public ByteBuffer getBuffer(int size) {
    // ...先从池里尝试复用...
    return ByteBuffer.allocateDirect(size);  // ← Direct ByteBuffer!
}

dataBuf 是 DirectByteBuffer,位于用户进程的堆外 native 内存,不在内核空间:

Bounce Buffer 内存布局

选 DirectByteBuffer 而不是堆内 byte[] 的原因:JVM 做 I/O 时,堆内 byte[] 可能被 GC 移动,需要先 pin(钉住)或隐式拷贝到 native 缓冲区。DirectByteBuffer 天然避免了这个问题。

5.3.2 数据流:2 次 CPU 拷贝

数据流:2 次 CPU 拷贝

步骤方式位于
内核 PageCachebounce bufferFileInputStream.read() / pread()内核 → 用户空间 native
bounce buffer用户传入的缓冲区ByteBuffer.get() / ByteBuffer.put()用户空间 native → JVM 堆/用户 buf

与传统的 TCP 远程读(4 次拷贝)相比,短路 bounce buffer 已经省掉了内核 Socket 缓冲区的 2 次拷贝。但这不叫零拷贝,只是"短路"——数据还是要经过用户空间手递手转交一次。

5.4 真正的零拷贝:HDFS Zero-Copy Read(mmap)

BlockReaderLocal.read() 的两个分支都不是真正的零拷贝

Hadoop 2.6+ 提供了另一条路径——通过 mmap 实现真正的零拷贝,走的是 DFSInputStreamHasEnhancedByteBufferAccess 接口:

// DFSInputStream.java — read(pool, maxLen, opts) 完整实现
// 这才是真实代码,下面两个方法共同完成零拷贝读

// ===== 方法一:对外入口 =====
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
        int maxLength, EnumSet<ReadOption> opts)
            throws IOException, UnsupportedOperationException {

    // 确保当前有可用的 blockReader
    if ((blockReader == null) || (blockEnd == -1)) {
        if (pos >= getFileLength())  return null;
        if ((!seekToBlockSource(pos)) || (blockReader == null))
            throw new IOException(...);
    }

    ByteBuffer buffer = null;

    // ① 先尝试 mmap 零拷贝(受 dfs.client.mmap.enabled 控制)
    if (dfsClient.getConf()
            .getShortCircuitConf().isShortCircuitMmapEnabled()) {
        buffer = tryReadZeroCopy(maxLength, opts);  // ← 真正调 getClientMmap() 的地方
    }

    // ② mmap 成功,直接返回
    if (buffer != null) {
        return buffer;  // ← 零拷贝路径,buffer 直接映射 PageCache
    }

    // ③ mmap 不可用或不支持,退回普通短路读(bounce buffer,2次拷贝)
    buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
    if (buffer != null) {
        getExtendedReadBuffers().put(buffer, bufferPool);
    }
    return buffer;
}

// ===== 方法二:零拷贝的实际执行者 =====
private synchronized ByteBuffer tryReadZeroCopy(
        int maxLength, EnumSet<ReadOption> opts) throws IOException {

    final long curPos   = pos;
    final long blockPos = curPos
            - currentLocatedBlock.getStartOffset();  // ← block 内偏移

    // 截断到 block 末尾,防止越界
    long length63 = (curPos + maxLength <= blockEnd + 1)
            ? maxLength
            : (blockEnd - curPos + 1);
    if (length63 <= 0) return null;

    // MappedByteBuffer 上限 2GB(31-bit),超出的部分本次不 mmap
    int length;
    if (blockPos + length63 <= Integer.MAX_VALUE)
        length = (int) length63;
    else
        return null;

    // ★ 真正发起 mmap 的地方
    final ClientMmap clientMmap = blockReader.getClientMmap(opts);
    if (clientMmap == null)  return null;  // 走普通短路读(bounce buffer,2次拷贝)
    ...
}

5.4.1 BlockReaderLocal.getClientMmap() — mmap 的底层入口

tryReadZeroCopy() 真正调 getClientMmap(),而这个方法在 BlockReaderLocal 的实现是:

// BlockReaderLocal.java 第 677 行
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {

    // 需要校验 checksum 且没传 SKIP_CHECKSUMS 时,必须先 anchoring
    boolean anchor = verifyChecksum
            && !opts.contains(ReadOption.SKIP_CHECKSUMS);

    if (anchor) {
        if (!createNoChecksumContext()) {
            return null;  // 无法 anchoring(block 未 mlock),放弃 mmap
        }
    }

    ClientMmap clientMmap = null;
    try {
        clientMmap = replica.getOrCreateClientMmap(anchor);
        //            ↑ 最终走到 FileChannel.map(POSIX_MMAP, offset, length)
        //              → MappedByteBuffer(真正的零拷贝!)
    } finally {
        if (clientMmap == null && anchor) {
            releaseNoChecksumContext();  // 失败回滚
        }
    }
    return clientMmap;
}

只有 BlockReaderLocal 实现了 getClientMmap() 如果当前是远程 BlockReaderRemote,该方法返回 null,tryReadZeroCopy() 直接失败退回 bounce buffer。

5.4.2 dfs.client.mmap.enabled — 零拷贝总开关

tryReadZeroCopy() 是否被调用,受一个总开关控制:

// 唯一生效位置:DFSInputStream.read(pool, maxLen, opts)
if (dfsClient.getConf()
        .getShortCircuitConf().isShortCircuitMmapEnabled()) {
    buffer = tryReadZeroCopy(maxLength, opts);
}
参数默认值作用
dfs.client.mmap.enabledtrue总开关,控制是否尝试 mmap
dfs.client.mmap.cache.size256mmap 映射的 LRU 缓存条目数
dfs.client.mmap.cache.timeout.ms60000缓存条目过期时间
dfs.client.mmap.retry.timeout.ms300000mmap 失败后的重试间隔

设为 false 时,所有调用方的零拷贝尝试都被跳过,直接 fallback 到 ByteBufferUtil.fallbackRead()readWithBounceBuffer()

5.4.3 tryReadZeroCopy vs readWithoutBounceBuffer:别混淆

两者名字都带"Zero/WithoutBuffer",但本质完全不同:

维度tryReadZeroCopy()readWithoutBounceBuffer()
所在类DFSInputStreamBlockReaderLocal
触发入口read(pool, maxLen, opts)read(ByteBuffer) / read(byte[]) 内部
底层机制mmapMappedByteBufferpread() 直接读入用户 buffer
CPU 拷贝0 次1 次(省掉 bounce,但不是零)
生产可用性✅ mmap 可用时自动触发❌ 需 bytesPerChecksum==0(几乎不会触发)
互相关系完全独立,无调用关系完全独立,无调用关系

5.4.4 mmap 为什么是真正的零拷贝:页表共享原理

这不是修辞手法,而是物理页框级别的共享——用户空间的虚拟地址和内核空间的虚拟地址,通过各自的页表,指向同一个物理页框

普通读取 vs mmap:页表共享原理

更深一层:数据是惰性加载的——缺页中断机制:

① FileChannel.map() 执行后
   用户页表项全部为空(Present=0),未分配物理页框
   此时 "映射" 只是一个 VMA(虚拟内存区域)记录

② 用户第一次读 MappedByteBuffer.get()
   CPU 查页表 → Present=0 → 触发 Page Fault(缺页中断)
       │
       ▼
   内核缺页处理程序:
     1. 分配一块物理页框(4KB)
     2. 通过 DMA 从磁盘读 Block 数据到该页框
     3. 填写用户页表:PTE.paddr = 页框地址, Present=1
     4. 同时填写内核 PageCache 映射
     5. iret 返回用户态,CPU 重试刚才的指令
     6. 这次页表命中 → 数据已在内存 → 零拷贝读到!

③ 后续读同一页 → MMU 硬件直接转译
   整个过程:用户态执行一条 mov 指令读取内存
   没有 read() 系统调用,没有内核态切换,没有拷贝!

5.4.5 为什么不是所有短路读都用 mmap?

mmap 这条路径有严格的约束条件:

限制说明
必须跳过 checksummmap 返回的 buffer 直接暴露内核内存,无法逐 chunk 校验。如果传入 SKIP_CHECKSUMSgetClientMmap() 内部还需要 anchoring(mlock 防止 DN 删除文件)
MappedByteBuffer 2GB 限制Java 用 int 索引,单段 mmap 不能超过 31-bit 范围(HDFS-5101)
文件描述符消耗每个 mmap 映射消耗一个 FD + VMA,所以需要 ShortCircuitCache 做 LRU 淘汰
并发安全需要 anchoring 机制阻止 DN 在客户端读取期间删除对应 block 文件

bounce buffer 的最大存在理由:checksum 校验不可分割。 校验时需要一个独立的工作区,不能直接操作用户缓冲区——万一校验失败,用户缓冲区已经被污染了。

5.4.6 完整调用链总结

mmap 零拷贝完整调用链

mmap 的触发条件回顾:

1. dfs.client.read.shortcircuit=true
2. dfs.client.mmap.enabled=true(默认)
3. BlockReader 是 BlockReaderLocal(短路读已生效)
4. Replica 支持 mmap(数据在本地文件系统)
5. 用户通过 DFSInputStream.read(pool, maxLen, opts) 调用
   → 普通 FSDataInputStream.read(byte[]) / read(ByteBuffer) 不走 mmap!

5.5 三种 DFSInputStream.read() 与新版短路读的关系

客户端收到 FD 后,走新版短路读——但最终执行哪个读取方法,取决于调用方用的是哪种 read() 接口。

5.5.1 三种读方法的调用链

DFSInputStream
    │
    ├── read(byte[] buf, int off, int len)      
    │     │
    │     └── ByteArrayStrategy
    │           │
    │           └── readWithStrategy()           ← 统一调度引擎
    │                 │
    │                 ├── blockSeekTo()
    │                 │     └── BlockReaderFactory.build()
    │                 │           └── getBlockReaderLocal()  ✅ 新版短路
    │                 │
    │                 └── BlockReaderLocal.read(byte[], int, int) 
    │                       │
    │                       └── readWithBounceBuffer()  ← 2 次 CPU 拷贝
    │
    ├── read(ByteBuffer buf)                    
    │     │
    │     └── ByteBufferStrategy
    │           │
    │           └── readWithStrategy()             ← 同一个引擎!
    │                 │
    │                 ├── blockSeekTo()
    │                 │     └── BlockReaderFactory.build()
    │                 │           └── getBlockReaderLocal()  ✅ 新版短路
    │                 │
    │                 └── BlockReaderLocal.read(ByteBuffer)
    │                       │
    │                       └── readWithBounceBuffer()  ← 2 次 CPU 拷贝
    │
    └── read(ByteBufferPool pool, int maxLen,
             EnumSet<ReadOption> opts)          
          │
          └── seekToBlockSource()
                │
                └── BlockReaderFactory.build()
                      └── getBlockReaderLocal()  ✅ 新版短路
          │
          ├── tryReadZeroCopy()
          │     └── getClientMmap()              ← 0 次拷贝 ✅
          │
          └── fallback: ByteBufferUtil.fallbackRead()
                └── DFSInputStream.read(ByteBuffer)  ← 退回 2 次拷贝

5.5.2 三种方法的本质差异

维度read(byte[])read(ByteBuffer)read(pool, maxLen, opts)
接口来源java.io.InputStreamByteBufferReadable(Hadoop 扩展)HasEnhancedByteBufferAccess(Hadoop 扩展)
缓冲区来源调用者提供的 byte[]调用者提供的 ByteBufferByteBufferPool 借出的 ByteBuffer
调度引擎readWithStrategy()readWithStrategy()独立调度
最终执行方法read(byte[], int, int)read(ByteBuffer)getClientMmap() → mmap
CPU 拷贝次数2 次2 次0 次
典型使用者hadoop fs -cat、MapReduceHBase、Netty 等 NIO 框架Impala、Presto 等 MPP 引擎

5.5.3 设计哲学

三个方法体现了 "兼容层 → 扩展层 → 性能层" 的递进设计:

  • read(byte[]):兼容标准 java.io.InputStream,让任何使用传统 IO 的框架无缝接入
  • read(ByteBuffer):拥抱 Java NIO,让 NIO 框架减少一次 byte[] ↔ ByteBuffer 转换
  • read(pool, maxLen, opts):为追求极致性能的 MPP 引擎开辟专用零拷贝通道

三种方法共享同一个 BlockReaderFactory——无论走哪条路,获取 BlockReader 时都经过:

BlockReaderFactory.build() 优先级链

所以新版短路读取并不是绑定到某个特定 read() 方法上,而是一种 BlockReader 获取策略。拿到 BlockReaderLocal 后,read(byte[])read(ByteBuffer)readWithBounceBuffer()read(pool, maxLen, opts)tryReadZeroCopy()getClientMmap()

一句话:绝大多数 HDFS 客户端的短路读最终执行的是 BlockReaderLocal.read(byte[], int, int)readWithBounceBuffer()(2 次拷贝),因为 hadoop fs、MapReduce、Spark 等生态都通过标准 InputStream.read(byte[]) 接口读取。零拷贝 getClientMmap() 是 MPP 引擎的高速专用道。

六、两种实现的全面对比

维度getLegacyBlockReaderLocal()getBlockReaderLocal()
JIRAHDFS-2246HDFS-347
FD 获取方式RPC 获取路径 → new FileInputStream(path)Domain Socket + SCM_RIGHTS FD 传递
权限要求客户端需加入 dfs.block.local-path-access.user无(DataNode 代为打开 FD)
安全性低(绕过 DataNode 访问控制)高(DataNode 验证 Token 后才传递 FD)
mmap 支持不支持支持(通过 ShortCircuitReplica.loadMmapInternal()
缓存机制LocalDatanodeInfo(LRU 10000)ShortCircuitCache(引用计数 + Waitable)
引用计数无(每次 new FileInputStream有(refCount,支撑 evictable 管理)
配置开关dfs.client.use.legacy.blockreader.local=true同上 =false(默认)
推荐程度已废弃(仅兼容旧版)✅ 生产推荐

七、实战:生产环境配置建议

7.1 推荐配置(hdfs-site.xml)

<!-- 启用短路本地读取 -->
<property>
    <name>dfs.client.read.shortcircuit</name>
    <value>true</value>
</property>

<!-- 使用新版 fd 传递机制(默认即 false,显式写出更清晰) -->
<property>
    <name>dfs.client.use.legacy.blockreader.local</name>
    <value>false</value>
</property>

<!-- Domain Socket 路径模板 -->
<property>
    <name>dfs.domain.socket.path</name>
    <value>/var/lib/hadoop-hdfs/dn._PORT</value>
</property>

<!-- 跳过 checksum(当 checksum 已由底层存储保证时,可选) -->
<property>
    <name>dfs.client.read.shortcircuit.skip.checksum</name>
    <value>false</value>
</property>

<!-- ShortCircuitCache 配置 -->
<property>
    <name>dfs.client.read.shortcircuit.streams.cache.size</name>
    <value>4096</value>  <!-- 默认 256,大内存机器可调大 -->
</property>

<!-- mmap 零拷贝相关 -->
<property>
    <name>dfs.client.mmap.enabled</name>
    <value>true</value>  <!-- 总开关:是否尝试 mmap 零拷贝,默认已开启 -->
</property>
<property>
    <name>dfs.client.mmap.cache.size</name>
    <value>256</value>  <!-- mmap 映射 LRU 缓存条目数 -->
</property>
<property>
    <name>dfs.client.mmap.cache.timeout.ms</name>
    <value>60000</value>  <!-- mmap 缓存过期时间,默认 1 分钟 -->
</property>

7.2 踩坑记录

  1. Domain Socket 路径权限问题:DataNode 需要对该路径有读写权限,且客户端需要执行权限(才能 connect)。建议将 Socket 文件放在 /var/run/hadoop-hdfs/ 下,并由 hdfs 用户拥有。

  2. Native 库未加载libhadoop.so 未正确加载时,Domain Socket 不可用,会自动降级为 TCP 读取。检查:hadoop checknative -a

  3. mmap 零拷贝不生效:确保调用方使用 DFSInputStream.read(ByteBufferPool, maxLen, opts) 接口而非标准 InputStream.read(byte[])hadoop fs -cat 和 MapReduce 默认走的都是标准接口,零拷贝只在 MPP 引擎(Impala、Presto)中生效。

  4. 注意 mmap 的 FD 消耗:每个 mmap 映射消耗一个文件描述符。dfs.client.mmap.cache.size=256 意味着最多同时持有 256 个 mmap 映射。高并发场景下可适当增大,但要确保 ulimit -n 足够大。

  5. mmap 导致虚拟内存耗尽:大量使用 mmap 时,注意 vm.max_map_count 设置(sysctl vm.max_map_count,建议 ≥ 655300)。


八、总结

getLegacyBlockReaderLocal()getBlockReaderLocal() 代表了 HDFS 短路本地读取的两个时代:

  • Legacy 路径(HDFS-2246)简单直接——RPC 拿路径,直接 open()。但权限管理复杂,安全性弱,已不建议在新集群中使用。

  • 新版路径(HDFS-347)通过 Unix Domain Socket 的 SCM_RIGHTS 机制实现 FD 传递,巧妙地让 DataNode 保持了对读取的控制权,同时让客户端获得了直接读取的能力。

  • 真正的零拷贝BlockReaderLocal.read() 的两个分支都不是零拷贝。readWithBounceBuffer() 是默认路径(2 次 CPU 拷贝),bounce buffer 位于用户空间堆外 native 内存(DirectByteBuffer)。readWithoutBounceBuffer() 在生产中几乎不触发(需要 dfs.checksum.type=NULL)。真正的零拷贝方案是 HDFS Zero-Copy ReadDFSInputStream.read(pool, maxLen, opts) + mmap + getClientMmap()),通过页表共享让用户虚拟地址和内核 PageCache 指向同一块物理页框,CPU 拷贝次数为 0。

  • 三种读取接口的定位read(byte[]) 兼容标准 IO、read(ByteBuffer) 拥抱 NIO、read(pool, maxLen, opts) 为 MPP 引擎留高速专用道。三者共享同一个 BlockReaderFactory 获取 BlockReader,但最终执行的读取方法不同——前两者走 readWithBounceBuffer(),后者走 getClientMmap() mmap 零拷贝。

设计哲学:真正的性能优化,不是堆砌新特性,而是让数据走最短的路径——让内核做它最擅长的事(Page Cache 管理、SCM_RIGHTS FD 传递、mmap 页表映射),让用户态代码保持简单而强大。

0

评论区