Skip to main content

Quarkus中使用Hibernate Reactive进行异步保存

· 7分钟阅读

环境

> java -version
openjdk version "11.0.19" 2023-04-18
OpenJDK Runtime Environment GraalVM CE 22.3.2 (build 11.0.19+7-jvmci-22.3-b18)
OpenJDK 64-Bit Server VM GraalVM CE 22.3.2 (build 11.0.19+7-jvmci-22.3-b18, mixed mode, sharing)

> quarkus -version
3.2.2.Final

quarkus extensions

<dependency>  
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-reactive-panache</artifactId>
</dependency>

诉求

整体项目是基于reactive方式,查询数据库通过Hibernate Reactive,所以调用链都是通过Uni<?>来传递;

具体来说,在登录成功后,需要进行登录信息的入库,而这个入库操作成功与否并不是很重要,完全可以作为一个异步操作;

这只是一个小的应用场景,不排除有其他异步操作,像异步日志,异步通知之类。

相关类

SysLoginInfoService.java
package com.xkyii.spry.web.service;  

import com.xkyii.spry.web.entity.SysLoginInfo;
import com.xkyii.spry.web.entity.SysUser;
import com.xkyii.spry.web.repository.SysLoginInfoRepository;
import io.quarkus.hibernate.reactive.panache.Panache;
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.jboss.logging.Logger;

import java.util.Date;

/**
 * Service that records a login-info row for a user through the reactive repository.
 */
@ApplicationScoped
public class SysLoginInfoService {

    @Inject
    Logger logger;

    @Inject
    SysLoginInfoRepository loginInfoRepository;

    /**
     * Builds a login record for the given user and hands it to the repository.
     *
     * @param user the user that logged in; only its user name is read here
     * @return the {@code Uni} produced by the repository's {@code persist} call
     */
    public Uni<SysLoginInfo> create(SysUser user) {
        final SysLoginInfo loginRecord = newLoginRecord(user);

        logger.info("创建登录日志");

        final Uni<SysLoginInfo> persisted = loginInfoRepository.persist(loginRecord);
        return persisted;
    }

    /** Populates a fresh record; everything except the user name is a hard-coded literal. */
    private SysLoginInfo newLoginRecord(SysUser user) {
        final SysLoginInfo loginRecord = new SysLoginInfo();
        loginRecord.setUserName(user.getUserName());
        loginRecord.setIpaddr("127.0.0.1");
        loginRecord.setLoginLocation("内网IP");
        loginRecord.setBrowser("Chrome 11");
        loginRecord.setOs("Unknown");
        loginRecord.setStatus("1");
        loginRecord.setMsg("成功");
        loginRecord.setLoginTime(new Date());
        return loginRecord;
    }
}

尝试

Java方式 ❌

尝试了几种:

import java.util.concurrent.Executor;  
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

@ApplicationScoped
public class SysUserService {

@Inject
Executor executor;

@Inject
ExecutorService executorService;

@Inject
ScheduledExecutorService scheduledExecutorService;

@WithTransaction
public Uni<LoginOutput> login(LoginCommand input) {

String username = input.getUsername();
return userRepository.find("userName", username).firstResult()
// 省略其他步骤
// ...
// 保存登录日志 (想要个异步效果)
.onItem().invoke(u -> {

// java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [245]: 'vert.x-eventloop-thread-3' current Thread [2456]: 'executor-thread-1'
executor.execute(() -> {
loginInfoService.create(u)
.subscribe().with(x -> logger.info("创建登录日志 with executor"), Throwable::printStackTrace);
});

// java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [244]: 'vert.x-eventloop-thread-2' current Thread [2589]: 'executor-thread-1'
executorService.execute(() -> {
loginInfoService.create(u)
.subscribe().with(x -> logger.info("创建登录日志 with executorService"), Throwable::printStackTrace);
});

// java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
scheduledExecutorService.schedule(() -> {
loginInfoService.create(u)
.subscribe().with(x -> logger.info("创建登录日志 with scheduledExecutorService"), Throwable::printStackTrace);
}, 10, TimeUnit.MILLISECONDS);
}
// 生成token
.onItem().transform(u -> new LoginOutput(tokenService.generateToken(u)))
;
}
}

这几种方式,用来运行常规的异步任务应该是没问题的,但是与hibernate reactive配合并不成功;

Vertx方式 ❌

@ApplicationScoped  
public class SysUserService {

@WithTransaction
public Uni<LoginOutput> login(LoginCommand input) {

String username = input.getUsername();
return userRepository.find("userName", username).firstResult()
// 省略其他步骤
// ...
// 保存登录日志 (想要个异步效果)
.onItem().invoke(u -> {

// 1
// 运行在同一个线程,没报错,但是也没有保存成功。
Vertx.currentContext().runOnContext(e -> {
loginInfoService.create(u)
.subscribe().with(x -> logger.info("创建登录日志 with Vertx.currentContext()"), Throwable::printStackTrace);
});

// 2
// java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [245]: 'vert.x-eventloop-thread-3' current Thread [1056]: 'vert.x-worker-thread-2'
vertx.executeBlocking(e -> {
loginInfoService.create(u)
.emitOn(Infrastructure.getDefaultWorkerPool())
.subscribe().with(x -> logger.info("创建登录日志 with vertx.executeBlocking"), Throwable::printStackTrace);
});
}
// 生成token
.onItem().transform(u -> new LoginOutput(tokenService.generateToken(u)))
;
}
}
  1. 没有报错,但是数据并没有成功入库,应该是没有正确开启session的缘故,并且由于和当前的调用一定会运行在同一个线程,所以并不能达到效果
  2. 报错了

Smallrye方式 ❌

@ApplicationScoped  
public class SysUserService {

@WithTransaction
public Uni<LoginOutput> login(LoginCommand input) {

String username = input.getUsername();
return userRepository.find("userName", username).firstResult()
// 省略其他步骤
// ...
// 保存登录日志 (想要个异步效果)
.onItem().invoke(u -> {

// 1
// 保存成功,但是实际上没有达到异步效果,只是subscribe在另一个线程而已
loginInfoService.create(u)
.emitOn(Infrastructure.getDefaultWorkerPool())
.subscribe().with(x -> logger.info("创建登录日志 emitOn Infrastructure.getDefaultWorkerPool()"), Throwable::printStackTrace);

// 2
// java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [244]: 'vert.x-eventloop-thread-2' current Thread [1911]: 'executor-thread-1'
loginInfoService.create(u)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.subscribe().with(x -> logger.info("创建登录日志 runSubscriptionOn Infrastructure.getDefaultWorkerPool()"), Throwable::printStackTrace);
}
// 生成token
.onItem().transform(u -> new LoginOutput(tokenService.generateToken(u)))
;
}
}
  1. 不是异步效果,执行仍然在当前线程,只是订阅结果到worker线程了
  2. 执行在异步线程,但是报错了

EventLoop ✔

经过前面的尝试,以及对报错内容的分析,其实已经大致找到了原因,简单来说就是Hibernate Reactive的session不是线程安全的,所以必须运行在Vert.x EventLoop thread上,不能是worker thread或者其他线程;

整理了一个方案,有点蹩脚,但是简单测试一下是可以用:

  
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;

import static io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.setContextSafe;

@ApplicationScoped
public class SysUserService {

@WithTransaction
public Uni<LoginOutput> login(LoginCommand input) {

String username = input.getUsername();
return userRepository.find("userName", username).firstResult()
// 省略其他步骤
// ...
// 保存登录日志 (想要个异步效果)
.onItem().invoke(u -> {

// 既然只能运行在EventLoop thread,而当前eventloop thread又达不到异步效果,就尝试在别的eventloop thread上运行
VertxInternal vxi = (VertxInternal) vertx;
Executor delegate = vertx.nettyEventLoopGroup();
EventLoopContext context = vxi.createEventLoopContext();
ContextInternal internal = (ContextInternal) VertxContext.getOrCreateDuplicatedContext(context);
setContextSafe(internal, true);
delegate.execute(() -> internal.dispatch(() -> {
loginInfoService.create(u)
.subscribe().with(x -> logger.info("创建登录日志 with getOrCreateDuplicatedContext"), Throwable::printStackTrace);
}));
}
// 生成token
.onItem().transform(u -> new LoginOutput(tokenService.generateToken(u)))
;
}
}

EventBus ✔

这是推荐方式了,业务分离,实现简单

  1. 建立一个事件消费端,这里直接把create方法调整了一下
package com.xkyii.spry.web.service;  

import com.xkyii.spry.web.entity.SysLoginInfo;
import com.xkyii.spry.web.entity.SysUser;
import com.xkyii.spry.web.repository.SysLoginInfoRepository;
import io.quarkus.hibernate.reactive.panache.common.WithTransaction;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.jboss.logging.Logger;

import java.util.Date;

/**
 * Event-bus consumer that stores a login-info row for the user carried by the message.
 */
@ApplicationScoped
public class SysLoginInfoService {

    /** Event-bus address this service consumes from. */
    private static final String CREATE_ADDRESS = "SysLoginInfoService-create-with-SysUser";

    @Inject
    Logger logger;

    @Inject
    SysLoginInfoRepository loginInfoRepository;

    /**
     * Consumes a user from the event bus, builds its login record and hands it to the
     * repository; the method is annotated to run with a transaction.
     *
     * @param user the user that logged in; only its user name is read here
     * @return the {@code Uni} produced by the repository's {@code persist} call
     */
    @ConsumeEvent(CREATE_ADDRESS)
    @WithTransaction
    public Uni<SysLoginInfo> create(SysUser user) {
        SysLoginInfo entry = buildEntry(user);

        logger.info("创建登录日志");

        return loginInfoRepository.persist(entry);
    }

    /** Fills a new record; every field except the user name is a hard-coded literal. */
    private SysLoginInfo buildEntry(SysUser user) {
        SysLoginInfo entry = new SysLoginInfo();
        entry.setUserName(user.getUserName());
        entry.setIpaddr("127.0.0.1");
        entry.setLoginLocation("内网IP");
        entry.setBrowser("Chrome 11");
        entry.setOs("Unknown");
        entry.setStatus("1");
        entry.setMsg("成功");
        entry.setLoginTime(new Date());
        return entry;
    }
}
  2. 通过EventBus调用即可:
  
import io.vertx.core.Vertx;

@ApplicationScoped
public class SysUserService {
@Inject
Vertx vertx;

@WithTransaction
public Uni<LoginOutput> login(LoginCommand input) {

String username = input.getUsername();
return userRepository.find("userName", username).firstResult()
// 省略其他步骤
// ...
// 保存登录日志 (想要个异步效果)
.onItem().invoke(u -> {
vertx.eventBus().publish("SysLoginInfoService-create-with-SysUser", u);
}
// 生成token
.onItem().transform(u -> new LoginOutput(tokenService.generateToken(u)))
;
}
}

参考