博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
异步线程池内部MDC仍继承主线程值的解决方案
阅读量:7221 次
发布时间:2019-06-29

本文共 11715 字,大约阅读时间需要 39 分钟。

hot3.png

增加日志输出标识 

import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target;@Retention(RetentionPolicy.RUNTIME)@Target({ ElementType.METHOD })public @interface MDC {}

切面: 

import java.util.UUID;import lombok.extern.slf4j.Slf4j;import org.aspectj.lang.ProceedingJoinPoint;import org.aspectj.lang.annotation.Around;import org.slf4j.MDC;@Slf4jpublic abstract class MdcInterceptor {    protected final static String LOG_ID = "logId";    @Pointcut("execution(* com.noob.api..*.*(..)) || @annotation(com.noob.aspectj.MDC)")    public void pointCut() {        // do nothings    };    @Around(value = "pointCut()")    public Object invoke(ProceedingJoinPoint point) throws Throwable {        Object result;        try {            setTradeId();            result = point.proceed(point.getArgs());            removeTraceId();        } catch (Throwable throwable) {            throw throwable;        } finally {            removeTraceId();        }        return result;    }    /**     * 璁剧疆traceId     */    public static void setTradeId() {        try {            MDC.put(LOG_ID, UUID.randomUUID().toString().replace("-", ""));        } catch (Exception e) {            log.error("set log no exception", e);        }    }    /**     * remove traceId     */    public static void removeTraceId() {        try {            MDC.remove(LOG_ID);        } catch (Exception e) {            log.error("remove log no exception", e);        }    }}

@Sync继承MDC上下文

在使用@Sync时,Spring默认提供的TaskExecutor下的ThreadPoolTaskExecutor无法支持MDC在线程切换时上下文内容的继承。因为MDC本质上使用ThreadLocal来保存上下文。

需要注意的是:切面的织入一定要符合【外部调入】的原则!

解决方法:

重写ThreadPoolTaskExecutor,在callable(真实处理过程)前注入父线程内容。

import java.util.Map;import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.UUID;import lombok.extern.slf4j.Slf4j;import org.apache.commons.collections.MapUtils;import org.slf4j.MDC;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import com.google.common.base.Strings;import com.alibaba.fastjson.JSONObject;@Slf4jpublic class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {    private final static String LOG_ID           = "logId";    /**     *      */    private static final long   serialVersionUID = 1L;    @Override    public 
Future
submit(Callable
task) { Map
context = MDC.getCopyOfContextMap(); log.info("----MDC content:{}", JSONObject.toJSONString(context)); return super.submit(() -> { // 将父线程的MDC内容传给子线程 T result = null; if (MapUtils.isNotEmpty(context) && !Strings.isNullOrEmpty(context.get(LOG_ID))) { MDC.setContextMap(context); } else { MDC.put(LOG_ID, UUID.randomUUID().toString().replace("-", "")); //为空设置新值 } try { result = task.call(); } finally { try { MDC.clear(); } catch (Exception e2) { log.warn("mdc clear exception.", e2); } } return result; }); }}

源码分析

1> 在应用启动时,初始化对于@Sync的切面:Advisor、advice、pointcut

AsyncAnnotationBeanPostProcessor: extends AbstractBeanFactoryAwareAdvisingPostProcessor

通过BeanFactoryAware给AsyncAnnotationAdvisor注入beanFactory;  并初始化AsyncAnnotationAdvisor

@Override	public void setBeanFactory(BeanFactory beanFactory) {		super.setBeanFactory(beanFactory);		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);		if (this.asyncAnnotationType != null) {			advisor.setAsyncAnnotationType(this.asyncAnnotationType);		}		advisor.setBeanFactory(beanFactory);		this.advisor = advisor;	}

AsyncAnnotationAdvisor :

初始化Advisor: 含advice、pointcut 、设置asyncAnnotationTypes为Async.class;

在初始化advice时,真正使用的就是AnnotationAsyncExecutionInterceptor!

public AsyncAnnotationAdvisor(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {		Set
> asyncAnnotationTypes = new LinkedHashSet
>(2); asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class
) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } if (exceptionHandler != null) { this.exceptionHandler = exceptionHandler; } else { this.exceptionHandler = new SimpleAsyncUncaughtExceptionHandler(); } this.advice = buildAdvice(executor, this.exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes); } protected Advice buildAdvice(Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) { return new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler); } /** * Calculate a pointcut for the given async annotation types, if any. * @param asyncAnnotationTypes the async annotation types to introspect * @return the applicable Pointcut object, or {@code null} if none */ protected Pointcut buildPointcut(Set
> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class
asyncAnnotationType : asyncAnnotationTypes) { Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType); if (result == null) { result = new ComposablePointcut(cpc); } else { result.union(cpc); } result = result.union(mpc); } return result; }

AbstractAdvisingBeanPostProcessor: extends ProxyProcessorSupport implements BeanPostProcessor

通过BeanPostProcessor的postProcessAfterInitialization方法在bean对象初始化后创建异步的代理ProxyFactory,并填入advisor;

@Override	public Object postProcessAfterInitialization(Object bean, String beanName) {		if (bean instanceof AopInfrastructureBean) {			// Ignore AOP infrastructure such as scoped proxies.			return bean;		}		if (bean instanceof Advised) {			Advised advised = (Advised) bean;			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {				// Add our local Advisor to the existing proxy's Advisor chain...				if (this.beforeExistingAdvisors) {					advised.addAdvisor(0, this.advisor);				}				else {					advised.addAdvisor(this.advisor);				}				return bean;			}		}		if (isEligible(bean, beanName)) {			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);			if (!proxyFactory.isProxyTargetClass()) {				evaluateProxyInterfaces(bean.getClass(), proxyFactory);			}			proxyFactory.addAdvisor(this.advisor);			customizeProxyFactory(proxyFactory);			return proxyFactory.getProxy(getProxyClassLoader());		}		// No async proxy needed.		return bean;	}

2、从MethodInvocationProceedingJoinPoint的proceed()执行方法切面. MethodInterceptor 最终继承的是Advice!

AsyncExecutionAspectSupport

  1. 入口在 determineAsyncExecutor 来选取线程池AsyncTaskExecutor,getDefaultExecutor方法中获取容器中的Executor线程池对象:要么是TaskExecutor.class,要么是name=“taskExecutor”;
  2. doSubmit方法中调用TaskExecutor的submit执行Callable.
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {		AsyncTaskExecutor executor = this.executors.get(method);		if (executor == null) {			Executor targetExecutor;			String qualifier = getExecutorQualifier(method);			if (StringUtils.hasLength(qualifier)) {				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);			}			else {				targetExecutor = this.defaultExecutor;				if (targetExecutor == null) {					synchronized (this.executors) {						if (this.defaultExecutor == null) {							this.defaultExecutor = getDefaultExecutor(this.beanFactory);						}						targetExecutor = this.defaultExecutor;					}				}			}			if (targetExecutor == null) {				return null;			}			executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?					(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));			this.executors.put(method, executor);		}		return executor;	}protected Executor getDefaultExecutor(BeanFactory beanFactory) {		if (beanFactory != null) {			try {				// Search for TaskExecutor bean... not plain Executor since that would				// match with ScheduledExecutorService as well, which is unusable for				// our purposes here. TaskExecutor is more clearly designed for it.				return beanFactory.getBean(TaskExecutor.class);			}			catch (NoUniqueBeanDefinitionException ex) {				logger.debug("Could not find unique TaskExecutor bean", ex);				try {					return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);				}				catch (NoSuchBeanDefinitionException ex2) {					if (logger.isInfoEnabled()) {						logger.info("More than one TaskExecutor bean found within the context, and none is named " +								"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +								"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());					}				}			}			catch (NoSuchBeanDefinitionException ex) {				logger.debug("Could not find default TaskExecutor bean", ex);				try {					return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);				}				catch (NoSuchBeanDefinitionException ex2) {					logger.info("No task executor bean found for async processing: " +							"no bean of type TaskExecutor and no bean named 'taskExecutor' either");				}				// Giving up -> either using local default executor or none at all...			}		}		return null;	}	/**	 * Delegate for actually executing the given task with the chosen executor.	 * @param task the task to execute	 * @param executor the chosen executor	 * @param returnType the declared return type (potentially a {@link Future} variant)	 * @return the execution result (potentially a corresponding {@link Future} handle)	 */	protected Object doSubmit(Callable task, AsyncTaskExecutor executor, Class
returnType) { if (completableFuturePresent) { Future result = CompletableFutureDelegate.processCompletableFuture(returnType, task, executor); if (result != null) { return result; } } if (ListenableFuture.class.isAssignableFrom(returnType)) { return ((AsyncListenableTaskExecutor) executor).submitListenable(task); } else if (Future.class.isAssignableFrom(returnType)) { return executor.submit(task); } else { executor.submit(task); return null; } }

AsyncExecutionInterceptor: extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered

@Override	public Object invoke(final MethodInvocation invocation) throws Throwable {		Class
targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); //获取taskExecutor对象 if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } Callable task = new Callable() { @Override public Object call() throws Exception { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future
) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; } }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); //提交要执行的任务callable }

 

转载于:https://my.oschina.net/u/3434392/blog/2885718

你可能感兴趣的文章
phpcms 整合 discuz!
查看>>
转:说说JSON和JSONP
查看>>
Cozmo 机器人编程环境搭建
查看>>
四位科研牛人介绍的文献阅读经验
查看>>
1212: [HNOI2004]L语言
查看>>
VBS中的Asc/AscB/AscW和Chr/ChrB/ChrW函数之间的区别(转)
查看>>
ADO.NET 结构图
查看>>
js学习篇1--数组
查看>>
前端学习过程中做的错题集
查看>>
Linux C socket 编程之UDP
查看>>
MySQL的驱动和SQL server的驱动
查看>>
低价数字证书所引发网络信任危机
查看>>
C# ini文件读取、软件初始化和配置
查看>>
mysql 百万级数据的模糊查询 优化 笔记
查看>>
[Poi2012]Festival 差分约束+tarjan
查看>>
通过注册表修改默认打开方式
查看>>
结构体类型
查看>>
SQL SERVER数据库 三种 恢复模式
查看>>
android.os.NetworkOnMainThreadException的解决方案
查看>>
16、SpringBoot-CRUD错误处理机制(3)
查看>>