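The previous section solved trace id propagation within a single thread. But what about child threads and thread pools, and what about remote RPC calls?
In real projects we often go asynchronous to improve an endpoint's response time, but it is easy to overlook that the trace information gets lost along the way. Here we split the user lookup from the previous section into two methods. Both are slow and neither depends on the other, so they can run in parallel.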
@Service
@Slf4j
public class UserService {

    public Integer getAge(Long id) {
        log.info("getAge id:{}", id);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 18;
    }

    public String getName(Long id) {
        log.info("getName id:{}", id);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "加班写Bug";
    }
}
@GetMapping("/{id}")
public ResponseData<UserDTO> detail(@PathVariable Long id) {
    Preconditions.checkNotNull(id, "id is null");
    UserDTO userDTO = new UserDTO();
    userDTO.setId(id);
    CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> userService.getAge(id));
    userDTO.setUsername(userService.getName(id));
    try {
        userDTO.setAge(ageFuture.get());
    } catch (Exception e) {
        log.error("user service error:{}", e.getMessage(), e);
    }
    return ResponseData.success(userDTO);
}
Visit http://localhost:8080/user/1001 and check the logs: the line logged from the pool thread (ForkJoinPool.commonPool-worker-1) has an empty trace id.
2021-12-05 10:57:47.511 INFO 58592 --- [http-nio-8080-exec-1] [a71e8fad-0284-4d66-97fe-bb06d2dbc37c] com.example.demo.aop.ControllerHandler : UserController#detail args:{"id":1001}
2021-12-05 10:57:47.583 INFO 58592 --- [http-nio-8080-exec-1] [a71e8fad-0284-4d66-97fe-bb06d2dbc37c] com.example.demo.service.UserService : getName id:1001
2021-12-05 10:57:47.584 INFO 58592 --- [ForkJoinPool.commonPool-worker-1] [] com.example.demo.service.UserService : getAge id:1001
2021-12-05 10:57:48.600 INFO 58592 --- [http-nio-8080-exec-1] [a71e8fad-0284-4d66-97fe-bb06d2dbc37c] com.example.demo.aop.ControllerHandler : UserController#detail response:{"code":200,"data":{"age":18,"id":1001,"username":"加班写Bug"},"message":"OK","success":true,"traceId":"a71e8fad-0284-4d66-97fe-bb06d2dbc37c"}
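The empty `[]` follows from how the logging context is stored: it is kept per thread, so a worker thread does not see what the request thread set. A minimal sketch of that effect, using a plain `ThreadLocal` as a stand-in for MDC (the class and field names are hypothetical, and no logging library is involved):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-in for MDC: one value per thread, like the logging context.
final class ThreadLocalLossDemo {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Reads TRACE_ID on a pool thread and returns what that thread sees.
    static String readOnPoolThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = TRACE_ID::get;
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        TRACE_ID.set("a71e8fad");
        System.out.println("request thread: " + TRACE_ID.get());
        System.out.println("pool thread:    " + readOnPoolThread());
        TRACE_ID.remove();
    }
}
```

The pool thread sees no value at all, which is why the context has to be copied explicitly when a task is handed over.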
Approach 1: add a custom thread pool.
@Configuration
public class AsyncExecutorConfiguration {

    @Bean(name = "asyncExecutor1")
    public ExecutorService asyncExecutor1Service() {
        return new ThreadPoolExecutor(
                10,
                100,
                60L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(128),
                new ThreadFactoryBuilder().setNameFormat("async-executor-1-pool-%d").build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
Then wrap CompletableFuture so that the MDC context is passed on to the child thread.
@Component
public class CompletableFutureWrapper {

    @Autowired
    @Qualifier("asyncExecutor1")
    private ExecutorService asyncExecutorService;

    public interface Task<U> {
        U callback();
    }

    public <U> CompletableFuture<U> supplyAsync(Task<U> task) {
        // Captured on the calling thread, before the task is handed to the pool
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                return task.callback();
            } finally {
                // Pool threads are reused, so do not leave the context behind
                MDC.clear();
            }
        }, asyncExecutorService);
    }
}
CompletableFuture<Integer> ageFuture = completableFutureWrapper.supplyAsync(() -> userService.getAge(id));
Restart and look at the logs again:
2021-12-12 21:02:25.252 INFO 29358 --- [http-nio-8080-exec-2] [6f4c80ca-daf4-4a1d-93c6-21bd3212f215] com.example.demo.aop.ControllerHandler : UserController#detail args:{"id":1001}
2021-12-12 21:02:25.260 INFO 29358 --- [http-nio-8080-exec-2] [6f4c80ca-daf4-4a1d-93c6-21bd3212f215] com.example.demo.service.UserService : getName id:1001
2021-12-12 21:02:25.260 INFO 29358 --- [async-executor-1-pool-0] [6f4c80ca-daf4-4a1d-93c6-21bd3212f215] com.example.demo.service.UserService : getAge id:1001
2021-12-12 21:02:26.277 INFO 29358 --- [http-nio-8080-exec-2] [6f4c80ca-daf4-4a1d-93c6-21bd3212f215] com.example.demo.aop.ControllerHandler : UserController#detail response:{"code":200,"data":{"age":18,"id":1001,"sex":null,"username":"加班写Bug"},"message":"OK","success":true,"traceId":"6f4c80ca-daf4-4a1d-93c6-21bd3212f215"}
Approach 2: add another custom thread pool to AsyncExecutorConfiguration.
@Bean(name = "asyncExecutor2")
public ExecutorService asyncExecutor2Service() {
    return new ThreadPoolExecutorMdcWrapper(
            10,
            100,
            60,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(128),
            new ThreadFactoryBuilder().setNameFormat("async-executor-2-pool-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
}
Then extend ThreadPoolExecutor so that every submitted task is wrapped:
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    private <T> Callable<T> wrap(final Callable<T> callable) {
        Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            if (context != null) {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    private Runnable wrap(final Runnable runnable) {
        Map<String, String> context = MDC.getCopyOfContextMap();
        return () -> {
            if (context != null) {
                MDC.setContextMap(context);
            }
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(wrap(task), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(wrap(task));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task));
    }
}
Usage is then simply:
...
@Autowired
@Qualifier("asyncExecutor2")
ExecutorService executorService;
...
Future<Byte> sexFuture = executorService.submit(()-> studentService.getSex(id));
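One detail worth keeping in mind: the pools above use CallerRunsPolicy, so when the queue is full a task runs on the submitting thread. In that case the `MDC.clear()` in the `finally` block also wipes the caller's own trace id. A possible variation is to restore whatever the executing thread had before instead of clearing. The sketch below is not the code from this article: `TraceContext` is a hypothetical stand-in so that it runs without a logging library, and its `copy()`, `set(...)` and `clear()` are assumed to correspond to `MDC.getCopyOfContextMap()`, `MDC.setContextMap(...)` and `MDC.clear()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MDC so the sketch runs without a logging library.
final class TraceContext {
    private static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<>();

    static void put(String key, String value) {
        Map<String, String> map = CTX.get();
        if (map == null) {
            map = new HashMap<>();
            CTX.set(map);
        }
        map.put(key, value);
    }

    static String get(String key) {
        Map<String, String> map = CTX.get();
        return map == null ? null : map.get(key);
    }

    static Map<String, String> copy() {
        Map<String, String> map = CTX.get();
        return map == null ? null : new HashMap<>(map);
    }

    static void set(Map<String, String> map) {
        CTX.set(new HashMap<>(map));
    }

    static void clear() {
        CTX.remove();
    }
}

final class RestoringWrapper {
    // Captures the submitter's context now; on the executing thread, installs it
    // for the task and afterwards puts back whatever that thread had before.
    static Runnable wrap(Runnable task) {
        Map<String, String> captured = TraceContext.copy();
        return () -> {
            Map<String, String> previous = TraceContext.copy();
            apply(captured);
            try {
                task.run();
            } finally {
                apply(previous);
            }
        };
    }

    private static void apply(Map<String, String> context) {
        if (context == null) {
            TraceContext.clear();
        } else {
            TraceContext.set(context);
        }
    }
}
```

On a pool thread the previous context is normally empty, so this behaves like clearing. On the caller's thread it leaves the trace id intact.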
Approach 3: add a ThreadPoolTaskExecutor pool to AsyncExecutorConfiguration.
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(20);
    executor.setQueueCapacity(128);
    executor.setThreadNamePrefix("task-executor-pool-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.initialize();
    return executor;
}
Then implement the decorator, MdcTaskDecorator:
public class MdcTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable runnable) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return () -> {
            try {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}
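Mark the service method with @Async and point it at this pool (async support has to be switched on with @EnableAsync on a configuration class):
@Service
@Slf4j
public class StudentService {

    @Async("taskExecutor")
    public Future<Integer> getAge(Long id) {
        log.info("getAge id:{}", id);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return AsyncResult.forValue(18);
    }
}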
Usage:
Future<Integer> ageFuture = studentService.getAge(id);
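In practice, though, the same method often has to be called synchronously in some places and asynchronously in others, so I personally find approaches 1 and 2 more flexible.
Remember to monitor the thread pools we added: when tasks start queuing up in a pool and service performance drops, adjust the pool promptly. Taking the pool name from approach 1 as an example: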
$ jstack 20275|grep async-executor-1-pool
"async-executor-1-pool-0" #36 prio=5 os_prio=31 tid=0x00007fe92badd800 nid=0x520f waiting on condition [0x00007000110d9000]
...
$ jstack 20275|grep async-executor-1-pool|grep waiting|wc -l
100
$ jstack 20275|grep async-executor-1-pool|grep runnable|wc -l
0
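jstack gives a hand-taken snapshot. The same kind of numbers can also be read from the pool itself through the standard `ThreadPoolExecutor` getters, for example to log them periodically. A minimal sketch, assuming the pool is reachable as a `ThreadPoolExecutor` (the bean above is declared as `ExecutorService`, so it would need a cast or a narrower declared type); `PoolStats` and its output format are made up for illustration:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolStats {
    // One-line snapshot of a pool; the numbers are approximate while tasks are in flight.
    static String describe(String name, ThreadPoolExecutor pool) {
        return String.format("%s poolSize=%d active=%d queued=%d completed=%d",
                name, pool.getPoolSize(), pool.getActiveCount(),
                pool.getQueue().size(), pool.getCompletedTaskCount());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(128));
        System.out.println(describe("async-executor-1-pool", pool));
        pool.shutdown();
    }
}
```

A `queued` value that stays near the queue capacity is the signal that tasks are waiting and the pool needs attention.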
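Besides passing trace information between threads, it also has to be passed through across HTTP calls and custom RPC protocols. For HTTP it can go in a request header; for RPC, a dedicated trace field can be added to the RPC protocol header. It can even be recorded when SQL and Redis commands are executed.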
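For the HTTP case, a minimal sketch of both directions using the JDK's `java.net.http.HttpRequest` (Java 11+). The header name `X-Trace-Id` and the `TraceHeader` class are assumptions for illustration; any header name works as long as caller and callee agree on it.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Optional;
import java.util.UUID;

final class TraceHeader {
    // Hypothetical header name, not defined anywhere in this article.
    static final String NAME = "X-Trace-Id";

    // Outbound: attach the current trace id to the request.
    static HttpRequest outbound(String url, String traceId) {
        return HttpRequest.newBuilder(URI.create(url))
                .header(NAME, traceId)
                .GET()
                .build();
    }

    // Inbound: reuse the caller's trace id if present, otherwise start a new trace.
    static String inbound(Optional<String> headerValue) {
        return headerValue.filter(v -> !v.isBlank())
                .orElseGet(() -> UUID.randomUUID().toString());
    }
}
```

On the receiving side, the value returned by `inbound` would be put into MDC at the point where the trace id is currently generated, so that a request arriving from another service keeps the id it already has.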