An HBase & HDFS Short-Circuit Read Odyssey

Huaxiang Sun and Michael Stack


We were asked to consult on an Apache HBase cluster. This deploy had RegionServers that were ABORTing under high load. On occasion, a RegionServer would hang without crashing, leaving its allotment of data temporarily offline. HDFS’s Short-Circuit Read (SCR) configuration turned out to be the ultimate culprit, but it took us a while to get there. Here is our odyssey, in the hope it helps others who find themselves in a similar bind.


The workloads on this multi-tenant cluster included big Apache Spark full-table-scan batch jobs running in the foreground, making transforms and writing the results back to the source table, as well as multiple Apache Kafka streaming applications reading, inserting, and updating. A multi-step, latency-sensitive Spark Streaming application in particular was backing up, and we were asked to help identify the root cause.


We assume the reader is familiar with HDFS’s Short-Circuit Read feature and its setup in HBase. If you are looking for a refresher, we suggest you read Sun Lisheng’s excellent writeup on how SCR works, HDFS短路读详解_Hadoop技术博文 (in case you do not read Chinese, machine translation tools do an adequate job of conveying the technical detail into English).

ABORT & Hang

We started with an investigation into the ABORTing RegionServers. Crash recovery moves Regions, and this movement causes any already-scheduled Spark and MapReduce tasks to read remotely instead of locally. On this cluster, remote tasks take much longer to complete, which drastically slows data processing.


The ABORTs looked like this:


2021-07-24 02:14:21,613 ERROR [regionserver/host.example.com:16020.logRoller] regionserver.HRegionServer: ***** ABORTING region server host.example.com,16020,1626231398827: IOE in log roller *****

java.io.IOException: Connection to 1.2.3.4/1.2.3.4:9866 closed

at org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput$AckHandler.lambda$channelInactive$2(FanOutOneBlockAsyncDFSOutput.java:286)…


The stack trace above shows an IOException thrown from the write-ahead log (WAL) subsystem. At the time, the WAL system was “rolling” the log file: flushing and closing the existing file in order to start a new one. This is a critical process for ensuring data durability, and the failure here brought on the ABORT. This is intentional behavior: when a write fails at this point, ABORTing the process is the data-safest thing to do. Our observation was that these ABORTs happened far too frequently, at a rate of a few servers each day. Occasionally, a variant of the above had the RegionServer hang in the WAL roll (see HBASE-26042 WAL lockup on 'sync failed' org.apache.hbase.thirdparty.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer).


The WAL roll was provoked by instances of an IOException reported earlier in the RegionServer log:


2021-07-16 14:08:29,210 WARN [AsyncFSWAL-0-hdfs://mycluster:8020/hbase] wal.AsyncFSWAL: sync failed


This message means that the process failed to sync an edit to the HDFS-hosted WAL. After reporting the failure back to the client, HBase tries to roll the WAL that the sync failed against. This action resets the WAL subsystem by replacing the WAL instance and its associated HDFS client state. Most of the time, the reset ran smoothly. Occasionally, during the WAL roll, the RegionServer reported the following IOException, which caused it to ABORT:


2021-07-16 14:08:29,210 WARN [AsyncFSWAL-0-hdfs://mycluster:8020/hbase] wal.AsyncFSWAL: sync failed

org.apache.hbase.thirdparty.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer


Our ‘peer’ in this case is the DataNode. It reset the connection. Why?


Looking in the local DataNode log turned up the cause of the occasional sync failure and of the connection reset. The DataNode logs reported lots of:


BP-1200783253-17.58.114.240-1581116871410:blk_3644043759_2570438962]] datanode.DataNode: host.example.com:9866:DataXceiver error processing WRITE_BLOCK operation src: /1.2.3.4:42287 dst: /1.2.3.5:9866

java.lang.NullPointerException

at sun.nio.ch.EPollArrayWrapper.isEventsHighKilled(EPollArrayWrapper.java:174)


When these exceptions clumped together, the DataNode, blaming the NullPointerExceptions, would exit:


2021-07-24 02:13:53,942 INFO [Socket Reader #1 for port 9867] util.ExitUtil: Exiting with status 1: Bug in read selector!


This DataNode process crash caused the RegionServer process to ABORT. Note that sometimes it was not the local DataNode peer but one of the remote nodes participating in the WAL “fanout” that had reset. For details on our WAL fanout implementation, see the release note on HBASE-14790 Implement a new DFSOutputStream for logging WAL only. DataNodes were crashing at a rate far in excess of the RegionServer crash rate, but usually the RegionServer would recover and continue on its merry way. It was when the crash broke the established connection (‘Connection reset by peer’) during a WAL roll that we’d see the RegionServer ABORT or, worse, on occasion, hang.


The above NullPointerException in #isEventsHighKilled is an old JVM bug seen by other folks who rely on the Netty project for their networking. Looking for mitigations for that bug led us to discover that the DataNodes were running with a process file descriptor ulimit of 16k. This was not intentional; it was a misconfiguration. Usually DataNodes in this cluster run with 2-4k active file descriptors, but something was causing the file descriptor count to spike in excess of the 16k ulimit. We upgraded the JDK from JDK8 to JDK11 and increased the file descriptor ulimit. These changes cured the chronic restarting of DataNode processes, which in turn cured the RegionServers of their crashing.


However, we hadn’t yet cured the cause of the file descriptor bloat.

HDFS SCR tuning

Curing the crashing servers improved the cluster’s overall health, but our multi-step Spark Streaming application was still backing up.


Thankfully, one of the application authors seemed to have a ‘nose’ for identifying ‘bad actors’; at a particular stage in the streaming job he would load the Master UI and sort the view by RegionServer hit rate. The server that was persistently riding with the highest hit rate, he claimed, was causing the backup.


An inspection of the identified RegionServer turned up some interesting traits: it exhibited a high hit rate but also suffered from high response latency, and its RPC queue was heavily backed up. The RegionServer and the DataNode processes reported bloated file descriptor counts in excess of 100k, as observed using lsof -p PID|wc -l. Thread dumps were also distinctive, with most handler threads, all serving reads, stuck in this state:


"RpcServer.default.FPBQ.Fifo.handler=0,queue=0,port=16020" #64 daemon prio=5 os_prio=0 cpu=1209403.19ms elapsed=114238.14s tid=0x00007ed9466bfc30 nid=0x3c12 waiting on condition [0x00007ed943dfd000]

java.lang.Thread.State: WAITING (parking)

at jdk.internal.misc.Unsafe.park(java.base@11.0.12/Native Method)

- parking to wait for <0x00007edad0f9bad8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)

at java.util.concurrent.locks.LockSupport.park(java.base@11.0.12/LockSupport.java:194)

at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(java.base@11.0.12/AbstractQueuedSynchronizer.java:2018)

at org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:245)

at org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:418)

at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1002)

at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:531)

at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:772)

at org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:709)

at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:480)

at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:358)

at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:645)

at org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1050)

at org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1002)

at org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1361)

at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1325)

at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:93)

at org.apache.hadoop.hbase.io.util.BlockIOUtils.preadWithExtra(BlockIOUtils.java:214)

at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readAtOffset(HFileBlock.java:1531)

at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readBlockDataInternal(HFileBlock.java:1755)

at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderImpl.readBlockData(HFileBlock.java:1563)

at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl.readBlock(HFileReaderImpl.java:1339)

at org.apache.hadoop.hbase.io.hfile.HFileBlockIndex$CellBasedKeyBlockIndexReader.loadDataBlockWithScanInfo(HFileBlockIndex.java:331)

at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl$HFileScannerImpl.seekTo(HFileReaderImpl.java:681)

at org.apache.hadoop.hbase.io.hfile.HFileReaderImpl$HFileScannerImpl.seekTo(HFileReaderImpl.java:633)

at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seekAtOrAfter(StoreFileScanner.java:315)

at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seek(StoreFileScanner.java:216)

at org.apache.hadoop.hbase.regionserver.StoreScanner.seekScanners(StoreScanner.java:426)

at org.apache.hadoop.hbase.regionserver.StoreScanner.<init>(StoreScanner.java:263)

at org.apache.hadoop.hbase.regionserver.HStore.createScanner(HStore.java:2175)

at org.apache.hadoop.hbase.regionserver.HStore.getScanner(HStore.java:2166)

at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.initializeScanners(HRegion.java:6618)

at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.<init>(HRegion.java:6598)

at org.apache.hadoop.hbase.regionserver.HRegion.instantiateRegionScanner(HRegion.java:3028)

at org.apache.hadoop.hbase.regionserver.HRegion.getScanner(HRegion.java:3008)

at org.apache.hadoop.hbase.regionserver.HRegion.getScanner(HRegion.java:2990)

at org.apache.hadoop.hbase.regionserver.HRegion.getScanner(HRegion.java:2984)

at org.apache.hadoop.hbase.regionserver.RSRpcServices.get(RSRpcServices.java:2679)

at org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:887)

at org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2829)

at org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:44870)

at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:393)

at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:133)

at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:338)

at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:318)


These reads were waiting to allocate a slot in a SCR shared memory segment (see the HDFS短路读详解_Hadoop技术博文 blog for background, and see the SCR code in HDFS). Our HDFS version was older, so we suspected we were experiencing some of the issues identified in Sun Lisheng’s blog post, many of which have been fixed in HDFS 3.3.1. We were also suspicious of the coarse lock on the SCR cache noted in HDFS-15202, which has also been fixed in HDFS 3.3.1.


To confirm our suspicions, we enabled TRACE-level logging in the org.apache.hadoop.hdfs.shortcircuit package on one of our production RegionServers. The output was profuse and hard to read. The pathological state occurred at high load only, which meant the rate of logging was so high that it overwhelmed the logging subsystem; log lines were cut off. We had to infer what a log line was trying to say by correlating the thread name and context with the SCR code. As best we could tell, within seconds, we were reloading file descriptors for the same blocks over and over again. How could this be? The file descriptors for the blocks were supposed to be cached. We fixated on this log line:


2021-07-31 08:26:24,723 DEBUG [ShortCircuitCache_Cleaner] shortcircuit.ShortCircuitCache: org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache$CacheCleaner@58c8bc43: finishing cache cleaner run started at 5052332994. Demoted 0 mmapped replicas; purged 0 replicas.


Repeatedly, this cache cleaner thread logged activity but in essence performed no work on the cache: ‘purged 0 replicas’.
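

As an aside, the verbose output above comes from a single logger setting; a minimal sketch, assuming the log4j 1.x properties format that HBase ships with (adjust to your logging setup, and expect a torrent of output at high load):


# Enable verbose HDFS Short-Circuit Read logging in the RegionServer's log4j.properties.
# Very chatty under load; enable briefly, then revert.
log4j.logger.org.apache.hadoop.hdfs.shortcircuit=TRACE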


In this cluster, dfs.client.read.shortcircuit.streams.cache.size was set to 1000. The default value of this configuration is 256. This parameter controls the size of the HDFS SCR cache of local block file descriptors. dfs.client.read.shortcircuit.streams.cache.expiry.ms was set to the default of five minutes. The cache cleaner runs on a dfs.client.read.shortcircuit.streams.cache.expiry.ms/4 interval. We noticed that when an SCR file descriptor is returned to the SCR cache, the code checks the cache size and purges entries if the size is in excess of the configured limit. This code path seemed to run too often. We speculated that the cluster read rate was such that the cache was up against its limit; when the cache cleaner ran, there was nothing for it to evict because every check-back-in of an element brought on a cache purge to make space for the returned item.
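

For reference, both settings are HDFS client-side properties, so they are read from the RegionServer’s configuration. A sketch of how they looked on this cluster (placing them in hbase-site.xml is our assumption; the values are the ones described above):


<!-- SCR file descriptor cache size; the default is 256 -->
<property>
  <name>dfs.client.read.shortcircuit.streams.cache.size</name>
  <value>1000</value>
</property>
<!-- SCR cache entry expiry; 300000 ms is the five-minute default -->
<property>
  <name>dfs.client.read.shortcircuit.streams.cache.expiry.ms</name>
  <value>300000</value>
</property>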

Analysis

Despite the cache being full, nearly every read required an expensive inline HDFS block open.


Some data that helped with our analysis:

  1. From Sun Lisheng’s blog: under heavy load, the 75th-percentile latency of allocating the local HDFS block ReplicaInfo SCR subclass that hosts the local block file descriptors is 50ms.

  2. On average, this cluster had ~4 HFiles per Region. Almost every file was greater than 256MB in size. The HDFS block size on this cluster was configured to be 256MB.

To serve a read request, assume the RegionServer needs to open ~2 HDFS blocks per HFile. On average the HFile size is greater than the configured block size, so an HFile comprises more than one HDFS block. On open of an HFile we need to consult the HFile trailer data structure, which is where the HFile metadata is stored. We then read the data, which could be in a different HDFS block from that of the trailer.


So, if we have approximately two HDFS blocks per HFile and each Region has four HFiles, then on average, we open roughly 8 HDFS blocks per Region. The overhead of setting up SCRs, at an extreme, could be ~50ms * 8 = 400ms, which is a huge portion of our read request budget!
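

As a back-of-the-envelope check of that estimate, here is the same arithmetic as a small Java sketch (the inputs are the figures from the two points above, not fresh measurements):


public class ScrOverheadEstimate {
    public static void main(String[] args) {
        int hfilesPerRegion = 4;          // observed average on this cluster
        int hdfsBlocksPerHFile = 2;       // HFiles > 256MB with a 256MB HDFS block size
        long allocSlotP75Millis = 50L;    // 75th-percentile allocSlot latency under load, per Sun Lisheng's blog
        long worstCaseMillis = hfilesPerRegion * hdfsBlocksPerHFile * allocSlotP75Millis;
        // Prints: SCR setup overhead per Region, worst case: ~400ms
        System.out.println("SCR setup overhead per Region, worst case: ~" + worstCaseMillis + "ms");
    }
}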


File descriptor counts in the DataNode and RegionServer processes tended to bloat under high load. This was happening because not only was file descriptor allocation slow, but so was the SCR file descriptor purge operation, as noted in HDFS短路读详解_Hadoop技术博文 (fixed in HDFS 3.3.1). Open file descriptors stuck around waiting for their clean-up to run.

Solution

We tried increasing dfs.client.read.shortcircuit.streams.cache.size from 1k to 4k, and then to 16k, and saw no change in behavior. It was only when we hit 64k that we started to see cache cleaner log lines reporting a non-zero purge. That is to say, file descriptors had been in the cache for at least dfs.client.read.shortcircuit.streams.cache.expiry.ms. The file descriptor cache in SCR now had enough capacity to hold an active working set. The cache had started working!


We shared our findings in the Apache HBase #user Slack channel, and a long-time contributor, Bryan Beaudreault, mentioned that he’d run into a similar issue and offered the following heuristic (which makes sense to us):

...nowadays, we tune dfs.client.read.shortcircuit.streams.cache.size to approx totalHdfsBlocks * 3 / numRegionServers. We periodically re-tune.


totalHdfsBlocks * 3 (the HDFS replication factor) is an estimate of the total count of physical blocks in the HDFS cluster. Thus, totalHdfsBlocks * 3 / numRegionServers is an estimate of the average number of HDFS blocks that a RegionServer (or, more precisely, a DataNode) hosts. With this configuration, the SCR cache is sized such that it can cache all of the HDFS block file descriptors managed by one RegionServer.
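

Plugging in made-up numbers shows the shape of the calculation; a sketch in Java where the block count and RegionServer count are purely illustrative (pull the real block total from your NameNode):


public class ScrCacheSizing {
    public static void main(String[] args) {
        long totalHdfsBlocks = 2_000_000L; // hypothetical: logical block count reported by the NameNode
        int replicationFactor = 3;         // HDFS default replication
        int numRegionServers = 100;        // hypothetical cluster size
        long scrCacheSize = totalHdfsBlocks * replicationFactor / numRegionServers;
        // With these illustrative inputs: 60000, i.e. roughly the ~60k discussed below
        System.out.println("dfs.client.read.shortcircuit.streams.cache.size ~ " + scrCacheSize);
    }
}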


For our cluster, Bryan’s equation suggests setting dfs.client.read.shortcircuit.streams.cache.size to ~60k. This configuration is set in the RegionServer process because it is the RegionServer that hosts the instance of the HDFS client, and thus an instance of the Short-Circuit Read cache. We applied this configuration change to all RegionServers in the cluster, and it fixed the slow read behavior seen in the multi-step Spark Streaming application.
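

Concretely, the fix boiled down to one property bump in the RegionServer-side configuration. A sketch of the change (the value shown reflects the ~60k target, not necessarily the exact figure we deployed):


<!-- Sized to roughly totalHdfsBlocks * 3 / numRegionServers for this cluster -->
<property>
  <name>dfs.client.read.shortcircuit.streams.cache.size</name>
  <value>60000</value>
</property>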


END


We’d like to thank our colleagues Cesar Delgado, Clara Xiong, Nick Dimiduk, and Sean Busbey for their helpful feedback and input.