springcloud gateway直接内存溢出

1. 错误描述

io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 939524103, max: 954728448)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:802)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:648)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:623)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:202)
        at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:186)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:394)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
        at org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
        at org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
        at org.springframework.core.codec.CharSequenceEncoder.encodeValue(CharSequenceEncoder.java:91)
        at org.springframework.core.codec.CharSequenceEncoder.lambda$encode$0(CharSequenceEncoder.java:75)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
      

我的环境配置如下:

2. 排查过程

2.1. 回顾Java进程内存分配

JVM内存区域划分位两块:堆区和非堆区

【需要注意的是:永久代(jdk1.8时彻底移除)存放JVM运行时使用的类,永久代的对象在full GC的时候才进行垃圾回收】

2.2. 什么是直接内存 DirectMemory

直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是《Java虚拟机规范》中定义的内存区域。但是这部分内存也被频繁地使用,而且也可能导致 OutOfMemoryError 异常出现,所以我们放到这里一起讲解。 在 JDK 1.4 中新加入了 NIO(New Input/Output)类,引入了一种基于通道(Channel)与缓冲区(Buffer)的 I/O 方式,它可以使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆里面的 DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在 Java 堆和 Native 堆中来回复制数据。 显然,本机直接内存的分配不会受到 Java 堆大小的限制,但是,既然是内存,则肯定还是会受到本机总内存(包括物理内存、SWAP分区或者分页文件)大小以及处理器寻址空间的限制,一般服务器管理员配置虚拟机参数时,会根据实际内存去设置 -Xmx 等参数信息,但经常忽略掉直接内存,使得各个内存区域总和大于物理内存限制(包括物理的和操作系统级的限制),从而导致动态扩展时出现 OutOfMemoryError 异常。


摘自:《深入理解 Java 虚拟机 第三版》2.2.7 小节 。

翻译一下上文可以得到下图:

实际上,栈中访问一个对象还是要借助堆,stack 寻求一个对象还是和以前一样,会问:”堆,请把对象xxx给我“,而不会向 native 堆索要。所以这个直接性是不彻底的。真正的实现是这样的,Java 程序仍然需要使用在 Java heap 中的一个对象(实际上规定为 DirectByteBuffer 类型对象来操作),但是这个对象(buffer)所持有的数据实际上存储于 native memory 中,而 Java 堆只仅仅拥有着对 native heap 的一个引用。

关于直接内存的具体介绍请参考:

https://cloud.tencent.com/developer/article/1586341

但作者的说法在当今的眼光看也是有点问题,比如:“直接内存的最大大小可以通过 -XX:MaxDirectMemorySize 来设置,默认是 64M。”

jdk8之后似乎就有变动,默认是初始的DirectMemorySize=0,最大为Xmx配置的大小,参考JDK文档

https://docs.oracle.com/javase/8/docs/technotes/tools/unix/java.html#-XX:MaxDirectMemorySize=size

2.3. 分析事故原因

考虑到DirectMemory是因为NIO的一些操作导致,而网关reactor底层依赖Netty实现,会大量使用NIO,刚好契合我们的现象;同时本着先怀疑自身而不是中间件的原则,先查看是否存在申请dataBuffer后未正常释放的case,还真有:

第一处问题:

因为有获取请求体验签及修改请求体进行加密串替换及日志打印等需求,我单独写了一个过滤器进行相关操作代码如下:

        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
        return bodyInserter.insert(outputMessage, new BodyInserterContext())
            .then(Mono.defer(() -> {
                // 重新封装请求
                ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage);
                // 记录响应日志
                ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, requestUri);
                return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build());
            }));

可以看到上述代码使用CachedBodyOutputMessage包装后,正常情况下,CachedBodyOutputMessage内部已实现dataBuffer释放的操作,但如遇到异常则需要手动释放,这里并未进行相关操作,更改后如下:

MyCachedBodyOutputMessage outputMessage = new MyCachedBodyOutputMessage(exchange, headers);
        return bodyInserter.insert(outputMessage, new BodyInserterContext())
            .then(Mono.defer(
                    () -> {
                        // 重新封装请求
                        ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage);
                        // 记录响应日志
                        ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, requestUri);
                        return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build());
                    }
                )
            ).onErrorResume((Function>) throwable -> release(exchange, outputMessage, throwable));

可以看到这里CachedBodyOutputMessage对象被替换给了自定义的MyCachedBodyOutputMessage,且集成了手动释放release(exchange, outputMessage, throwable)

为什么要自定义MyCachedBodyOutputMessage?

    /**
     * 释放dataBuffer
     *
     * @param exchange
     * @param outputMessage
     * @param throwable
     * @return
     */
    Mono release(ServerWebExchange exchange, MyCachedBodyOutputMessage outputMessage,
                       Throwable throwable) {
        if (outputMessage.isCached()) {
            return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable));
        }
        return Mono.error(throwable);
    }

如上述源码,释放前需要进行outputMessage.isCached()判断,而isCached()并不是public方法,无法直接调用,因为复制了CachedBodyOutputMessage的源码,仅仅改动了isCached()的修饰符形成了新的MyCachedBodyOutputMessage类,这样才可以顺利完成outputMessage.isCached()的调用

第二处问题:

 /**
     * 记录响应日志
     * 通过 DataBufferFactory 解决响应体分段传输问题。
     */
    private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, String requestUri) {
        ServerHttpResponse response = exchange.getResponse();
        DataBufferFactory bufferFactory = response.bufferFactory();
        return new ServerHttpResponseDecorator(response) {
            @Override
            public Mono writeWith(Publisher<? extends DataBuffer> body) {
                if (body instanceof Flux) {
                    // 获取响应类型,如果是 json 就打印
                    String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
                    if (ObjectUtil.equal(this.getStatusCode(), HttpStatus.OK)
                        && StrUtil.contains(originalResponseContentType, "application/json")) {
                        Flux<? extends DataBuffer> fluxBody = Flux.from(body);
                        return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
                            // 合并多个流集合,解决返回体分段传输
                            DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
                            DataBuffer join = dataBufferFactory.join(dataBuffers);
                            byte[] content = new byte[join.readableByteCount()];
                            join.read(content);
                            // 释放掉内存 注意这里,手动释放了,但实际上是没有释放成功的
                            DataBufferUtils.release(join);
                            String responseResult = new String(content, StandardCharsets.UTF_8);
                            // 设置响应时间
                            exchange.getAttributes().put(Constants.RESPONSE_TIME, System.currentTimeMillis());
                            // 设置响应体
                            exchange.getAttributes().put(Constants.RESPONSE_BODY_OBJECT_KEY, responseResult);
                      		  // 其他业务操作
                            log.info("=========== requestUri:{} ,responseResult:{}", requestUri, responseResult);
                            return bufferFactory.wrap(content);
                        }));
                    }
                }
                // if body is not a flux. never got there.
                return super.writeWith(body);
            }
        };
    }

这里确实调用了DataBufferUtils.release(join);手动释放,但查看其源码后发现,释放并未生效:

	/**
	 * Release the given data buffer, if it is a {@link PooledDataBuffer} and
	 * has been {@linkplain PooledDataBuffer#isAllocated() allocated}.
	 * @param dataBuffer the data buffer to release
	 * @return {@code true} if the buffer was released; {@code false} otherwise.
	 */
	public static boolean release(@Nullable DataBuffer dataBuffer) {
		if (dataBuffer instanceof PooledDataBuffer) {
			PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
			if (pooledDataBuffer.isAllocated()) {
				try {
					return pooledDataBuffer.release();
				}
				catch (IllegalStateException ex) {
					// Avoid dependency on Netty: IllegalReferenceCountException
					if (logger.isDebugEnabled()) {
						logger.debug("Failed to release PooledDataBuffer: " + dataBuffer, ex);
					}
					return false;
				}
			}
		}
		return false;
	}

可以看到如果想要生效dataBuffer必须是PooledDataBuffer类型的,而我的代码是DefaultDataBuffer类型的,因次并未成功释放;

修改后的代码如下

// 合并多个流集合,解决返回体分段传输
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// 释放掉内存 注意这里,手动释放了,但实际上是没有释放成功的
DataBufferUtils.release(join);

----------------------------------------------------------------------------------------------
// 合并多个流集合,解决返回体分段传输
DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// 释放掉内存 ,成功释放
DataBufferUtils.release(join);

2.4. 看下修改后的效果对比

修改前

修改后

因为释放掉了dataBuffer,整体内存占用都降低了很多,发布到线上后面持续在进行观察...

3. 思考

为什么这个问题是在非生产环境,而不是生产环境出现呢?

猜测原因:

首先要明确:fullGC的时候会释放掉直接内存的占用,

非生产环境因为只有内部测试时才会用到,流量较小,很难触发fullGC,因此也就一直无法释放直接内存。

生产环境流量依然远远大于非生产,因为产生了gc所以也就间接的释放掉了直接内存

展开阅读全文

页面更新:2024-04-25

标签:内存   源码   虚拟机   对象   大小   类型   操作   代码   环境   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top