无法在后备方法中从父线程访问InheritableThreadLocal对象

问题描述 投票:1回答:1

我有InheritableThreadLocal<ConcurrentHashMap<String, Object>>线程,当请求来自过滤器并在其中设置一些transaction_id时初始化。

现在在服务层,我通过CompletableFuture调用10个不同的API调用。所有API服务类都有一个execute方法,该方法使用RestTempate进行API调用。我把@HystrixCommand放在execute方法上。

execute方法是void类型,但它将API响应放在InheritableThreadLocal对象中。

问题是当API调用失败时Hystrix调用FallBackMethod,当我在InheritableThreadLocal中放入错误响应时,我无法将错误响应发送到客户端。

ThreadLocalUtil.class

public class ThreadLocalUtil {

    private static InheritableThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal = new InheritableThreadLocal<>();

    public static void addDataToThreadLocalMap(String key, Object value) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (value != null) {
            existingDataMap.put(key, value);
        }
    }

    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        return existingDataMap.get(key);
    }

    public static void clearThreadLocalDataMap() {
        if (transmittableThreadLocal != null) 
            transmittableThreadLocal.remove();
    }

    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }

    public static void initThreadLocals() {
        ConcurrentHashMap<String, Object> dataForDataMap = new ConcurrentHashMap<String, Object>();
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        dataForDataMap.put("REQUEST_ID", requestId);
        transmittableThreadLocal.set(dataForDataMap);
    }
}

CommonFilter.class

@Component
@Order(1)
public class CommonFilter extends OncePerRequestFilter {

  @Override
  protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
          throws ServletException, IOException {
      try {
          ThreadLocalUtil.initThreadLocals();
          filterChain.doFilter(request, response);
      } catch (Exception e) {
          if (e instanceof ServletException) {
              throw (ServletException) e;
          }
      } finally {
          ThreadLocalUtil.clearThreadLocalDataMap();
      }

  }

EmployeeService.class

@Component
public abstract class EmployeeService {

    @Autowired
    private ThreadLocalUtil threadLocalUtil;

    public abstract void getEmployee(int employeeId);

    public void fallbackMethod(int employeeid) {
        threadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}

EmployeeServiceImpl.class

@Service
public class EmployeeServiceImpl extends EmployeeService {

    @HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
            @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
            @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
    public void getEmployee(int employeeId) {
        System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + threadLocalUtil.getDataFromThreadLocalMap("EMPLOYE_ID"));
        String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
                HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
                }, employeeId).getBody();

        threadLocalUtil.addDataToThreadLocalMap("Response", response);
    }

    @Autowired
    RestTemplate restTemplate;

    @Autowired
    private ThreadLocalUtil threadLocalUtil;
}
java spring-boot hystrix thread-local
1个回答
1
投票

所以,首先,因为内部Hystrix使用ThreadPoolExecutor(线程创建一次并重复使用),所以使用InheritableThreadLocal是错误的。

从上面的问题以及你在我的blog中提出的问题,我明白你的问题是

InheritableThreadLocal在hystrix后备方法中变为null

进一步添加到此(您可以验证这一点)

只有在超时的情况下,InheritableThreadLocal才会在hystrix回退方法中变为null,而在任何其他异常的情况下都不会

我建议其他人参考我的blog。在超时的情况下,Hystrix回退发生在hystrix-timer线程中。 Hystrix fallback execution thread您可以通过记录Thread.currentThread().getName()来验证这一点

由于hystrix-timer thread的父级不是您的调用线程,因此您的transmittableThreadLocal.get()变为null。

为了解决这个问题,我建议使用HystrixCommandExecutionHookHystrixRequestVariableDefault。使用它你可以实现像onStart, onExecutionStart, onFallbackStart等钩子,你需要获取/设置threadLocal变量。有关更多详细信息,请参阅博客的最后一部分。

更新:对于您的用例,您可以按如下方式修改代码:

thread local UT IL.Java

public class ThreadLocalUtil {

    private static ThreadLocal<ConcurrentHashMap<String, Object>> transmittableThreadLocal = new ThreadLocal<>();

    public static ConcurrentHashMap<String, Object> getThreadLocalData() {
        return transmittableThreadLocal.get();
    }

    public static void setThreadLocalData(ConcurrentHashMap<String, Object> data) {
        transmittableThreadLocal.set(data);
    }

    public static void addDataToThreadLocalMap(String key, Object value) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (value != null) {
            existingDataMap.put(key, value);
        }
    }

    public static Object getDataFromThreadLocalMap(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        return existingDataMap.get(key);
    }

    public static void clearThreadLocalDataMap() {
        if (transmittableThreadLocal != null) 
            transmittableThreadLocal.remove();
    }

    public static Object getRequestData(String key) {
        Map<String, Object> existingDataMap = transmittableThreadLocal.get();
        if (existingDataMap != null) {
            return existingDataMap.get(key);
        }
        return "-1";
    }



    public static void initThreadLocals() {
        transmittableThreadLocal.set(new ConcurrentHashMap<>());
        String requestId = "REQUEST_ID_" + System.currentTimeMillis();
        addDataToThreadLocalMap("REQUEST_ID", requestId);
    }
}

employee service.Java

@Component
public abstract class EmployeeService {
    public abstract void getEmployee(int employeeId);

    public void fallbackMethod(int employeeid) {
        threadLocalUtil.addDataToThreadLocalMap("ErrorResponse", "Fallback response:: No employee details available temporarily");
    }
}

employee service imp了.Java

@Service
public class EmployeeServiceImpl extends EmployeeService {

    @HystrixCommand(fallbackMethod = "fallbackMethod", commandProperties = {
            @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "900"),
            @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10") })
    public void getEmployee(int employeeId) {
        System.out.println("Getting Employee details for " + employeeId + ", threadLocalUtil : " + threadLocalUtil.getDataFromThreadLocalMap("EMPLOYEE_ID"));
        String response = restTemplate.exchange("http://localhost:8011/findEmployeeDetails/{employeeid}",
                HttpMethod.GET, null, new ParameterizedTypeReference<String>() {
                }, employeeId).getBody();

        threadLocalUtil.addDataToThreadLocalMap("Response", response);
    }

    @Autowired
    RestTemplate restTemplate;
}

H YST日系hook.Java

public class HystrixHook extends HystrixCommandExecutionHook {

    private HystrixRequestVariableDefault<ConcurrentHashMap<String, Object>> hrv = new HystrixRequestVariableDefault<>();

    @Override
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.initializeContext();
        getThreadLocals();
    }

    @Override
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }


    @Override
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        setThreadLocals();
    }


    @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        super.onSuccess(commandInstance);
    }

    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        return super.onError(commandInstance, failureType, e);
    }

    private void getThreadLocals() {
        hrv.set(ThreadLocalUtil.getThreadLocalData());
    }

    private void setThreadLocals() {
        ThreadLocalUtil.setThreadLocalData(hrv.get());
    }
}

ABC application.Java

public class AbcApplication {
    public static void main(String[] args) {
        HystrixPlugins.getInstance().registerCommandExecutionHook(new HystrixHook());
        SpringApplication.run(Abc.class, args);
    }
}

希望这可以帮助

© www.soinside.com 2019 - 2024. All rights reserved.