ITPub博客

首页 > 应用开发 > Java > Java 多线程上下文传递在复杂场景下的实践

Java 多线程上下文传递在复杂场景下的实践

原创 Java 作者:vivo互联网技术 时间:2021-02-01 10:48:56 0 删除 编辑

一、引言

海外商城从印度做起,慢慢的会有一些其他国家的诉求,这个时候需要我们针对当前的商城做一个改造,可以支撑多个国家的商城,这里会涉及多个问题,多语言,多国家,多时区,本地化等等。在多国家的情况下如何把识别出来的国家信息传递下去,一层一层直到代码执行的最后一步。甚至还有一些多线程的场景需要处理。

二、背景技术

2.1 ThreadLocal

ThreadLocal是最容易想到了,入口识别到国家信息后,丢进ThreadLocal,这样后续代码、redis、DB等做国家区分的时候都能使用到。

这里先简单介绍一下ThreadLocal:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
  * Sets the current thread's copy of this thread-local variable
  * to the specified value.  Most subclasses will have no need to
  * override this method, relying solely on the {@link #initialValue}
  * method to set the values of thread-locals.
  *
  * @param value the value to be stored in the current thread's copy of
  *        this thread-local.
  */
public  void  set(T value) {
     Thread t = Thread.currentThread();
     ThreadLocalMap map = getMap(t);
     if  (map !=  null )
         map.set( this , value);
     else
         createMap(t, value);
}
  
  
/**
  * Returns the value in the current thread's copy of this
  * thread-local variable.  If the variable has no value for the
  * current thread, it is first initialized to the value returned
  * by an invocation of the {@link #initialValue} method.
  *
  * @return the current thread's value of this thread-local
  */
public  T get() {
     Thread t = Thread.currentThread();
     ThreadLocalMap map = getMap(t);
     if  (map !=  null ) {
         ThreadLocalMap.Entry e = map.getEntry( this );
         if  (e !=  null ) {
             @SuppressWarnings ( "unchecked" )
             T result = (T)e.value;
             return  result;
         }
     }
     return  setInitialValue();
}
  
  
/**
  * Get the map associated with a ThreadLocal. Overridden in
  * InheritableThreadLocal.
  *
  * @param  t the current thread
  * @return the map
  */
ThreadLocalMap getMap(Thread t) {
     return  t.threadLocals;
}
  
  
/**
  * Get the entry associated with key.  This method
  * itself handles only the fast path: a direct hit of existing
  * key. It otherwise relays to getEntryAfterMiss.  This is
  * designed to maximize performance for direct hits, in part
  * by making this method readily inlinable.
  *
  * @param  key the thread local object
  * @return the entry associated with key, or null if no such
  */
private  Entry getEntry(ThreadLocal<?> key) {
     int  i = key.threadLocalHashCode & (table.length -  1 );
     Entry e = table[i];
     if  (e !=  null  && e.get() == key)
         return  e;
     else
         return  getEntryAfterMiss(key, i, e);
}
  • 每一个Thread线程都有属于自己的threadLocals(ThreadLocalMap),里面有一个弱引用的Entry(ThreadLocal,Object)。
  • get方法首先通过Thread.currentThread得到当前线程,然后拿到线程的threadLocals(ThreadLocalMap),再从Entry中取得当前线程存储的value。
  • set值的时候更改当前线程的threadLocals(ThreadLocalMap)中Entry对应的value值。

实际使用中除了同步方法之外,还有起异步线程处理的场景,这个时候就需要把ThreadLocal的内容从父线程传递给子线程,这个怎么办呢?

不急,Java 还有InheritableThreadLocal来帮我们解决这个问题。

2.2 InheritableThreadLoca

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public  class  InheritableThreadLocal<T>  extends  ThreadLocal<T> {
     /**
      * Computes the child's initial value for this inheritable thread-local
      * variable as a function of the parent's value at the time the child
      * thread is created.  This method is called from within the parent
      * thread before the child is started.
      * <p>
      * This method merely returns its input argument, and should be overridden
      * if a different behavior is desired.
      *
      * @param parentValue the parent thread's value
      * @return the child thread's initial value
      */
     protected  T childValue(T parentValue) {
         return  parentValue;
     }
  
     /**
      * Get the map associated with a ThreadLocal.
      *
      * @param t the current thread
      */
     ThreadLocalMap getMap(Thread t) {
        return  t.inheritableThreadLocals;
     }
  
     /**
      * Create the map associated with a ThreadLocal.
      *
      * @param t the current thread
      * @param firstValue value for the initial entry of the table.
      */
     void  createMap(Thread t, T firstValue) {
         t.inheritableThreadLocals =  new  ThreadLocalMap( this , firstValue);
     }
}
  • java.lang.Thread#init(java.lang.ThreadGroup, java.lang.Runnable, java.lang.String, long, java.security.AccessControlContext, boolean)
1
2
3
if  (inheritThreadLocals && parent.inheritableThreadLocals !=  null )
     this .inheritableThreadLocals =
         ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
  • InheritableThreadLocal操作的是inheritableThreadLocals这个变量,而不是ThreadLocal操作的threadLocals变量。
  • 创建新线程的时候会检查父线程中parent.inheritableThreadLocals变量是否为null,如果不为null则复制一份parent.inheritableThreadLocals的数据到子线程的this.inheritableThreadLocals中去。
  • 因为复写了getMap(Thread)和CreateMap()方法直接操作inheritableThreadLocals,这样就实现了在子线程中获取父线程ThreadLocal值。

现在在使用多线程的时候,都是通过线程池来做的,这个时候用InheritableThreadLocal可以吗?会有什么问题吗?先看下下面的代码的执行情况:

  • test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static  InheritableThreadLocal<String> inheritableThreadLocal =  new  InheritableThreadLocal<>();
  
public  static  void  main(String[] args)  throws  InterruptedException {
  
     ExecutorService executorService = Executors.newFixedThreadPool( 1 );
  
     inheritableThreadLocal.set( "i am a inherit parent" );
     executorService.execute( new  Runnable() {
         @Override
         public  void  run() {
  
             System.out.println(inheritableThreadLocal.get());
         }
     });
  
     TimeUnit.SECONDS.sleep( 1 );
     inheritableThreadLocal.set( "i am a new inherit parent" ); // 设置新的值
  
     executorService.execute( new  Runnable() {
         @Override
         public  void  run() {
  
             System.out.println(inheritableThreadLocal.get());
         }
     });
}
  
  
i am a inherit parent
i am a inherit parent
  
  
public  static  void  main(String[] args)  throws  InterruptedException {
  
     ExecutorService executorService = Executors.newFixedThreadPool( 1 );
  
     inheritableThreadLocal.set( "i am a inherit parent" );
     executorService.execute( new  Runnable() {
         @Override
         public  void  run() {
  
             System.out.println(inheritableThreadLocal.get());
             inheritableThreadLocal.set( "i am a old inherit parent" ); // 子线程中设置新的值
  
  
         }
     });
  
     TimeUnit.SECONDS.sleep( 1 );
     inheritableThreadLocal.set( "i am a new inherit parent" ); // 主线程设置新的值
  
     executorService.execute( new  Runnable() {
         @Override
         public  void  run() {
  
             System.out.println(inheritableThreadLocal.get());
         }
     });
}
  
  
i am a inherit parent
i am a old inherit parent 

这里看第一个执行结果,发现主线程第二次设置的值,没有改掉,还是第一次设置的值“i am a inherit parent”,这是什么原因呢?

再看第二个例子的执行结果,发现在第一个任务中设置的“i am a old inherit parent"的值,在第二个任务中打印出来了。这又是什么原因呢?

回过头来看看上面的源码,在线程池的情况下,第一次创建线程的时候会从父线程中copy inheritableThreadLocals中的数据,所以第一个任务成功拿到了父线程设置的”i am a inherit parent“,第二个任务执行的时候复用了第一个任务的线程,并不会触发复制父线程中的inheritableThreadLocals操作,所以即使在主线程中设置了新的值,也会不生效。同时get()方法是直接操作inheritableThreadLocals这个变量的,所以就直接拿到了第一个任务设置的值。

那遇到线程池应该怎么办呢?

2.3 TransmittableThreadLocal

TransmittableThreadLocal(TTL)这个时候就派上用场了。这是阿里开源的一个组件,我们来看看它怎么解决线程池的问题,先来一段代码,在上面的基础上修改一下,使用TransmittableThreadLocal。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static  TransmittableThreadLocal<String> transmittableThreadLocal =  new  TransmittableThreadLocal<>(); // 使用TransmittableThreadLocal
  
  
public  static  void  main(String[] args)  throws  InterruptedException {
  
     ExecutorService executorService = Executors.newFixedThreadPool( 1 );
     executorService = TtlExecutors.getTtlExecutorService(executorService);  // 用TtlExecutors装饰线程池
  
     transmittableThreadLocal.set( "i am a transmittable parent" );
     executorService.execute( new  Runnable() {
         @Override
         public  void  run() {
  
             System.out.println(transmittableThreadLocal.get());
             transmittableThreadLocal.set( "i am a old transmittable parent" ); // 子线程设置新的值
  
         }
     });
     System.out.println(transmittableThreadLocal.get());
  
     TimeUnit.SECONDS.sleep( 1 );
     transmittableThreadLocal.set( "i am a new transmittable parent" ); // 主线程设置新的值
  
     executorService.execute( new  Runnable() {
         @Override
         public  void  run() {
  
             System.out.println(transmittableThreadLocal.get());
         }
     });
}
  
  
i am a transmittable parent
i am a transmittable parent
i am a  new  transmittable parent

执行代码后发现,使用TransmittableThreadLocalTtlExecutors.getTtlExecutorService(executorService)装饰线程池之后,在每次调用任务的时,都会将当前的主线程的TransmittableThreadLocal数据copy到子线程里面,执行完成后,再清除掉。同时子线程里面的修改回到主线程时其实并没有生效。这样可以保证每次任务执行的时候都是互不干涉的。这是怎么做到的呢?来看源码。

  • TtlExecutors和TransmittableThreadLocal源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
private  TtlRunnable(Runnable runnable,  boolean  releaseTtlValueReferenceAfterRun) {
     this .capturedRef =  new  AtomicReference<Object>(capture());
     this .runnable = runnable;
     this .releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
  
com.alibaba.ttl.TtlRunnable#run
/**
  * wrap method {@link Runnable#run()}.
  */
@Override
public  void  run() {
     Object captured = capturedRef.get(); // 获取线程的ThreadLocalMap
     if  (captured ==  null  || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured,  null )) {
         throw  new  IllegalStateException( "TTL value reference is released after run!" );
     }
  
     Object backup = replay(captured); // 暂存当前子线程的ThreadLocalMap到backup
     try  {
         runnable.run();
     finally  {
         restore(backup); // 恢复线程执行时被改版的Threadlocal对应的值
     }
}
  
  
com.alibaba.ttl.TransmittableThreadLocal.Transmitter#replay
  
  
/**
  * Replay the captured {@link TransmittableThreadLocal} values from {@link #capture()},
  * and return the backup {@link TransmittableThreadLocal} values in current thread before replay.
  *
  * @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()}
  * @return the backup {@link TransmittableThreadLocal} values before replay
  * @see #capture()
  * @since 2.3.0
  */
public  static  Object replay(Object captured) {
     @SuppressWarnings ( "unchecked" )
     Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
     Map<TransmittableThreadLocal<?>, Object> backup =  new  HashMap<TransmittableThreadLocal<?>, Object>();
  
     for  (Iterator<?  extends  Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
          iterator.hasNext(); ) {
         Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
         TransmittableThreadLocal<?> threadLocal = next.getKey();
  
         // backup
         backup.put(threadLocal, threadLocal.get());
  
         // clear the TTL value only in captured
         // avoid extra TTL value in captured, when run task.
         if  (!capturedMap.containsKey(threadLocal)) {
             iterator.remove();
             threadLocal.superRemove();
         }
     }
  
     // set value to captured TTL
     for  (Map.Entry<TransmittableThreadLocal<?>, Object> entry : capturedMap.entrySet()) {
         @SuppressWarnings ( "unchecked" )
         TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
         threadLocal.set(entry.getValue());
     }
  
     // call beforeExecute callback
     doExecuteCallback( true );
  
     return  backup;
}
  
  
  
com.alibaba.ttl.TransmittableThreadLocal.Transmitter#restore
  
/**
  * Restore the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}.
  *
  * @param backup the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}
  * @since 2.3.0
  */
public  static  void  restore(Object backup) {
     @SuppressWarnings ( "unchecked" )
     Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
     // call afterExecute callback
     doExecuteCallback( false );
  
     for  (Iterator<?  extends  Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
          iterator.hasNext(); ) {
         Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
         TransmittableThreadLocal<?> threadLocal = next.getKey();
  
         // clear the TTL value only in backup
         // avoid the extra value of backup after restore
         if  (!backupMap.containsKey(threadLocal)) {
             iterator.remove();
             threadLocal.superRemove();
         }
     }
  
     // restore TTL value