Refreshing the Eureka client cache via Kafka broadcast messages for fast awareness of services going online and offline



Retry on specific exceptions

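In the earlier article "Manually refreshing the Eureka client cache for fast awareness of services going online and offline", when a service call fails with a specific exception type (for example feign.FeignException.ServiceUnavailable or java.net.ConnectException), the Eureka client cache is refreshed manually so that the latest service list is fetched promptly and subsequent retries have the best possible chance of succeeding.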

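This approach solves both the problem of calling instances that have already gone offline and the problem of being unable to call newly started instances. Its drawbacks are that every failed call needs at least one retry, and that the refresh is only triggered by a failure: as long as at least one instance of the service can still serve requests, calls keep succeeding and callers do not discover newly started instances promptly.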
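The exception check described above can be sketched in plain Java. This is a minimal illustration rather than the refresher class shown later: the class name RefreshTrigger is hypothetical, and the cause-chain walk only approximates ExceptionUtils.getRootCause. It shows why a ConnectException wrapped by a client library still triggers a refresh.

```java
import java.net.ConnectException;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: decide whether a failed call should trigger a manual Eureka cache
// refresh by matching the exception itself or its root cause against trigger types.
class RefreshTrigger {

    // Follows getCause() to the innermost throwable; returns t itself if it has no cause.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static boolean shouldRefresh(Throwable t, List<Class<? extends Throwable>> triggers) {
        Throwable root = rootCause(t);
        return triggers.stream().anyMatch(type -> type.isInstance(t) || type.isInstance(root));
    }

    public static void main(String[] args) {
        List<Class<? extends Throwable>> triggers = Arrays.asList(ConnectException.class);
        // A connection failure wrapped by a client library
        Exception wrapped = new RuntimeException("call failed", new ConnectException("Connection refused"));
        System.out.println(shouldRefresh(wrapped, triggers));                          // true
        System.out.println(shouldRefresh(new IllegalStateException("bug"), triggers)); // false
    }
}
```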

Retry on the next instance

Starting with Spring Cloud Hoxton.SR9 (spring-cloud-commons v2.2.6.RELEASE), the LoadBalancer can be configured to retry on the next instance. An example configuration:

spring:
  cloud:
    loadbalancer:
      retry:
        enabled: true
        max-retries-on-same-service-instance: 0
        max-retries-on-next-service-instance: 1

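This configuration lets the caller switch quickly to the next instance and retry when the current instance is unavailable, but newly started instances are still not discovered promptly.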
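To make the limitation concrete, here is a toy model of what max-retries-on-same-service-instance: 0 and max-retries-on-next-service-instance: 1 amount to. It is a sketch under simplifying assumptions, not Spring Cloud LoadBalancer's retry implementation, and the class name and addresses are hypothetical. It can only pick from the instance list it was given, so an instance that started after that list was cached is never tried.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model (not Spring Cloud LoadBalancer code): no retry on the same instance,
// up to retriesOnNext further attempts, each on the next instance in the cached list.
class NextInstanceRetry {

    static String call(List<String> instances, int start, int retriesOnNext,
                       Function<String, String> invoke) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retriesOnNext; attempt++) {
            String instance = instances.get((start + attempt) % instances.size());
            try {
                return invoke.apply(instance);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next instance
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080");
        // Hypothetical: the first instance has just gone offline but is still in the cached list
        Function<String, String> invoke = i -> {
            if (i.startsWith("10.0.0.1")) {
                throw new RuntimeException("Connection refused: " + i);
            }
            return "ok from " + i;
        };
        System.out.println(call(instances, 0, 1, invoke)); // ok from 10.0.0.2:8080
    }
}
```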

Refreshing the client cache in real time via broadcast messages

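Both approaches above rely on retries. The approach below uses message broadcasting instead: each application sends a service-online message through Kafka when it starts and a service-offline message when it shuts down. All services listen to the same topic, and when a service receives an online or offline message from another service, it immediately refreshes the service list cached locally by its Eureka client.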
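The flow can be modeled without Kafka or Eureka to show who refreshes what. In this hypothetical in-memory sketch, a list of subscribers stands in for the topic and a counter stands in for the Eureka cache refresh; the check that skips messages from the instance's own service name mirrors the listener code shown later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy in-memory model of the broadcast flow (no Kafka, no Eureka): every subscribed
// instance receives every UP/DOWN event and refreshes its cache unless the event
// comes from its own service name.
class BroadcastFlow {

    static final class Event {
        final String serviceName;
        final String state; // "UP" or "DOWN"

        Event(String serviceName, String state) {
            this.serviceName = serviceName;
            this.state = state;
        }
    }

    static final class Instance {
        final String serviceName;
        int cacheRefreshes = 0; // stands in for a Eureka client cache refresh

        Instance(String serviceName) {
            this.serviceName = serviceName;
        }

        void onEvent(Event e) {
            // UP and DOWN are handled the same way: fetch the latest service list
            if (!serviceName.equalsIgnoreCase(e.serviceName)) {
                cacheRefreshes++;
            }
        }
    }

    final List<Consumer<Event>> subscribers = new ArrayList<>();

    void subscribe(Instance instance) {
        subscribers.add(instance::onEvent);
    }

    void publish(Event e) {
        subscribers.forEach(s -> s.accept(e));
    }

    public static void main(String[] args) {
        BroadcastFlow topic = new BroadcastFlow();
        Instance gateway = new Instance("gateway");
        Instance orderA = new Instance("order-service");
        topic.subscribe(gateway);
        topic.subscribe(orderA);

        topic.publish(new Event("order-service", "UP")); // a second order-service instance started
        System.out.println(gateway.cacheRefreshes + " " + orderA.cacheRefreshes); // 1 0
    }
}
```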

Relevant Maven dependency

<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.13</version>
</dependency>

The Spring Cloud and Spring Boot versions are Hoxton.SR12 and 2.3.8.RELEASE respectively.

Relevant configuration in application.yml

spring:
  kafka:
    bootstrap-servers: kafkanode:9092
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      batch-size: 0
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
  cloud:
    loadbalancer:
      cache:
        enabled: false
logging:
  level:
    org:
      springframework:
        kafka: ERROR
      apache:
        kafka: ERROR

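Note that the LoadBalancer cache is disabled in this configuration, because the code only refreshes the Eureka client cache; otherwise the LoadBalancer cache could keep serving the old instance list after the Eureka cache has been refreshed. Keeping two layers of cache is unnecessary anyway, as noted in the previous article.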

EurekaClientCacheManualRefresher, the service class that refreshes the Eureka client cache manually

import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.TimedSupervisorTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

@SuppressWarnings("unused")
@Slf4j
@Component
public class EurekaClientCacheManualRefresher {
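    // Ensures at most one refresh is queued or running at any time
    private final AtomicBoolean isRefreshing = new AtomicBoolean(false);
    // Refreshes run asynchronously so the caller (e.g. a failed Feign call) is not blocked
    private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
    // Private field of com.netflix.discovery.DiscoveryClient that holds the periodic cache refresh task
    private final String cacheRefreshTaskField = "cacheRefreshTask";
    private TimedSupervisorTask cacheRefreshTask;

    // Time of the last manual refresh; only accessed on the single refresh thread
    private long lastRefreshMillis = 0;
    // Minimum interval between two manual refreshes when enableInterval is true
    private final Duration refreshInterval = Duration.ofSeconds(3);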

    @Autowired
    private BeanFactory beanFactory;

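    /**
     * Refreshes the cache if e, or its root cause, is an instance of one of the given exception types.
     */
    public void refreshOnExceptions(Exception e, List<Class<? extends Exception>> clazzs, boolean enableInterval) {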
        if (null == clazzs || clazzs.size() == 0) {
            throw new IllegalArgumentException();
        }

        if (clazzs.stream().anyMatch(clazz -> clazz.isInstance(e) ||
                clazz.isInstance(ExceptionUtils.getRootCause(e)))) {
            refresh(enableInterval);
        }
    }

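    /**
     * Asynchronously runs DiscoveryClient's cacheRefreshTask once, the same task Eureka runs periodically.
     *
     * @param enableInterval if true, skip the refresh when the previous one ran less than refreshInterval ago
     */
    public void refresh(boolean enableInterval) {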
        if (isRefreshing.compareAndSet(false, true)) {
            refreshExecutor.execute(() -> {
                try {
                    if (enableInterval && System.currentTimeMillis() <= lastRefreshMillis + refreshInterval.toMillis()) {
                        log.warn("Not manually refresh eureka client cache as refresh interval was not exceeded:{}", refreshInterval.getSeconds());
                        return;
                    }

                    // Look up the private cacheRefreshTask field by reflection once, then reuse it
                    if (null == cacheRefreshTask) {
                        Field field = ReflectionUtils.findField(DiscoveryClient.class, cacheRefreshTaskField);
                        if (null != field) {
                            ReflectionUtils.makeAccessible(field);
                            DiscoveryClient discoveryClient = beanFactory.getBean(DiscoveryClient.class);
                            cacheRefreshTask = (TimedSupervisorTask) ReflectionUtils.getField(field, discoveryClient);
                        }
                    }

                    if (null == cacheRefreshTask) {
                        log.error("Field ({}) not found in class '{}'", cacheRefreshTaskField, DiscoveryClient.class.getSimpleName());
                        return;
                    }

                    lastRefreshMillis = System.currentTimeMillis();
                    cacheRefreshTask.run();
                    log.info("Manually refresh eureka client cache completed(DiscoveryClient.cacheRefreshTask#run())");
                } catch (Exception e) {
                    log.error("An exception occurred when manually refresh eureka client cache", e);
                } finally {
                    isRefreshing.set(false);
                }
            });
        }
        else {
            log.warn("Not manually refresh eureka client cache as another thread is refreshing it already");
        }
    }
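
    // Stop the refresh thread when the Spring context is destroyed
    @javax.annotation.PreDestroy
    public void shutdown() {
        refreshExecutor.shutdownNow();
    }
}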
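The two guards in refresh() (drop a request while another refresh is in flight, and optionally enforce a minimum interval) can be isolated from Eureka. The sketch below is a simplified, hypothetical GuardedRefresher in which a plain Runnable stands in for cacheRefreshTask.run(), so the behavior can be exercised without a DiscoveryClient.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of the guards in EurekaClientCacheManualRefresher.refresh();
// the Runnable task is a stand-in for cacheRefreshTask.run().
class GuardedRefresher {
    private final AtomicBoolean isRefreshing = new AtomicBoolean(false);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final long intervalMillis;
    private final Runnable task;
    private long lastRefreshMillis = 0; // only touched on the single executor thread

    GuardedRefresher(long intervalMillis, Runnable task) {
        this.intervalMillis = intervalMillis;
        this.task = task;
    }

    /** Returns false when the request was dropped because a refresh is already in flight. */
    boolean refresh(boolean enableInterval) {
        if (!isRefreshing.compareAndSet(false, true)) {
            return false;
        }
        executor.execute(() -> {
            try {
                long now = System.currentTimeMillis();
                if (enableInterval && now <= lastRefreshMillis + intervalMillis) {
                    return; // too soon after the previous refresh
                }
                lastRefreshMillis = now;
                task.run();
            } finally {
                isRefreshing.set(false); // allow the next request
            }
        });
        return true;
    }

    void shutdown() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        GuardedRefresher refresher = new GuardedRefresher(3000, runs::incrementAndGet);
        refresher.refresh(true);
        refresher.shutdown();
        System.out.println(runs.get()); // 1
    }
}
```

Because requests are dropped rather than queued, a burst of failed calls results in a single refresh, which is the intended effect of the compareAndSet guard.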

EurekaClientCacheRefreshMessage, the entity class for the broadcast message

import lombok.Builder;
import lombok.Data;
import lombok.ToString;

@Data
@Builder
@ToString
public class EurekaClientCacheRefreshMessage {
    private String serviceName;
    private ServiceState serviceState;
    private long timestamp;

    public enum ServiceState {
        UP, DOWN
    }
}

Configuration class defining the Kafka error handler and message converter

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;

@Slf4j
@Configuration
public class KafkaConfiguration {

    /*
     * Boot will autowire this into the container factory.
     */
    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        return new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer((KafkaOperations<Object, Object>) template), new FixedBackOff(1000L, 2));
    }

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }
}
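With FixedBackOff(1000L, 2), a record whose listener throws is retried twice, one second apart, so it is delivered three times in total before DeadLetterPublishingRecoverer publishes it to the dead-letter topic (by default the original topic name plus .DLT, here eurekaclient.cache.refresh.DLT). The counting is sketched below as a toy model; RetryThenRecover is a hypothetical name, and the seek and sleep performed by the real SeekToCurrentErrorHandler are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model (not spring-kafka code) of SeekToCurrentErrorHandler + FixedBackOff(interval, maxRetries).
class RetryThenRecover {

    /**
     * Delivers a record to the listener, retrying up to maxRetries times after the first failure,
     * then hands it to the recoverer. Returns the number of delivery attempts.
     */
    static int deliver(String record, long maxRetries,
                       Consumer<String> listener, Consumer<String> recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts;
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    // Retries exhausted: DeadLetterPublishingRecoverer would publish to "<topic>.DLT"
                    recoverer.accept(record);
                    return attempts;
                }
                // The real handler seeks back to this record and waits for the back-off interval here
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int attempts = deliver("not-json", 2,
                r -> { throw new IllegalArgumentException("bad payload"); },
                deadLetters::add);
        System.out.println(attempts + " " + deadLetters); // 3 [not-json]
    }
}
```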

EurekaClientCacheMessageHandler, the service class that listens for and publishes the messages

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class EurekaClientCacheMessageHandler implements ApplicationRunner, SmartApplicationListener {
    private static final String TOPIC_EUREKACLIENT_CACHE_REFRESH = "eurekaclient.cache.refresh";

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private EurekaClientCacheManualRefresher eurekaClientCacheManualRefresher;

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${spring.application.name}")
    private String applicationName;

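    /**
     * The listener id is used as the groupId by default.
     * In Kafka broadcast mode every service instance uses a different groupId; the initial offset of a
     * new group is determined by the consumer property auto.offset.reset (earliest or latest).
     *
     * @param messageBody JSON body of an EurekaClientCacheRefreshMessage
     */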
    @SneakyThrows
    @KafkaListener(
            topics = TOPIC_EUREKACLIENT_CACHE_REFRESH,
            id = TOPIC_EUREKACLIENT_CACHE_REFRESH + "-" + "#{T(java.util.UUID).randomUUID()}",
            properties = {"auto.offset.reset:latest"}
    )
    public void refresh(String messageBody) {
        EurekaClientCacheRefreshMessage message = objectMapper.readValue(messageBody, EurekaClientCacheRefreshMessage.class);
        if (!applicationName.equalsIgnoreCase(message.getServiceName())) {
            log.info("received message({}) from topic({})", messageBody, TOPIC_EUREKACLIENT_CACHE_REFRESH);
            eurekaClientCacheManualRefresher.refresh(false);
        }
    }

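    /**
     * Publishes the cache-refresh (UP) message once the application has fully started,
     * i.e. after the service has been registered with the registry.
     *
     * @param args application arguments (unused)
     */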
    @SneakyThrows
    @Override
    public void run(ApplicationArguments args) {
        EurekaClientCacheRefreshMessage message = EurekaClientCacheRefreshMessage.builder()
                .serviceName(applicationName)
                .serviceState(EurekaClientCacheRefreshMessage.ServiceState.UP)
                .timestamp(System.currentTimeMillis())
                .build();
        String messageBody = objectMapper.writeValueAsString(message);
        log.info("[{}] publish message({}) to topic({})", "ApplicationRunner", messageBody, TOPIC_EUREKACLIENT_CACHE_REFRESH);
        kafkaTemplate.send(TOPIC_EUREKACLIENT_CACHE_REFRESH, messageBody);
    }

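    /**
     * Publishes the cache-refresh (DOWN) message when the Spring container starts to close
     * (ContextClosedEvent), i.e. before the service is removed from the registry.
     *
     * @param event the application event (only ContextClosedEvent is supported)
     */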
    @SneakyThrows
    @Override
    public void onApplicationEvent(@NonNull ApplicationEvent event) {
        EurekaClientCacheRefreshMessage message = EurekaClientCacheRefreshMessage.builder()
                .serviceName(applicationName)
                .serviceState(EurekaClientCacheRefreshMessage.ServiceState.DOWN)
                .timestamp(System.currentTimeMillis())
                .build();
        String messageBody = objectMapper.writeValueAsString(message);
        log.info("[{}] publish message({}) to topic({})", event.getClass().getSimpleName(),
                messageBody, TOPIC_EUREKACLIENT_CACHE_REFRESH);
        if (event instanceof ContextClosedEvent) {
            kafkaTemplate.send(TOPIC_EUREKACLIENT_CACHE_REFRESH, messageBody);
        }
    }

    @Override
    public boolean supportsEventType(@NonNull Class<? extends ApplicationEvent> eventType) {
        return ContextClosedEvent.class.isAssignableFrom(eventType);
    }
}

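As EurekaClientCacheMessageHandler shows, Kafka is used in broadcast mode: the groupId of the KafkaListener is generated dynamically (the topic name plus a UUID) and differs in every service instance. Every consumer group subscribed to a topic receives each message, and each service instance has its own unique groupId, so every instance receives every message.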
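The difference between per-instance groupIds and a shared groupId can be illustrated with a toy model of consumer-group delivery. It is a simplification that assumes a single-partition topic, so only one member of a group receives each record; GroupDelivery and the group names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy model of consumer-group delivery on a single-partition topic.
class GroupDelivery {
    // groupId -> inboxes of the members of that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();

    // A consumer joins a group and gets its own inbox.
    List<String> join(String groupId) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Every group receives the record; within a group only one member (here the first) gets it.
    void publish(String record) {
        for (List<List<String>> members : groups.values()) {
            members.get(0).add(record);
        }
    }

    public static void main(String[] args) {
        String topic = "eurekaclient.cache.refresh";
        GroupDelivery delivery = new GroupDelivery();
        // Broadcast: one group per instance, like topic + "-" + UUID in the listener id
        List<String> gateway = delivery.join(topic + "-" + UUID.randomUUID());
        List<String> billing = delivery.join(topic + "-" + UUID.randomUUID());
        // Shared group: two instances competing for the same records
        List<String> shared1 = delivery.join("shared-group");
        List<String> shared2 = delivery.join("shared-group");

        delivery.publish("order-service UP");
        System.out.println(gateway.size() + " " + billing.size() + " "
                + (shared1.size() + shared2.size())); // 1 1 1
    }
}
```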

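Note also that the consumer property auto.offset.reset is set in the KafkaListener. For a new groupId, the initial offset is determined by this property, which can be earliest or latest; for an existing groupId, consumption resumes from that group's committed offset. Because the broadcast messages here only announce instances going online and offline in real time, there is no need to consume historical messages, so auto.offset.reset should be set to latest.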
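The effect of auto.offset.reset on a brand-new group can likewise be sketched with a toy log. OffsetReset is a hypothetical name, offsets are just list indexes, and committing right after each poll is a simplification of Kafka's offset commits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of where a consumer group starts reading: an existing group resumes from its
// committed offset; a new group starts at 0 ("earliest") or at the end of the log ("latest").
class OffsetReset {
    final List<String> log = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();

    List<String> poll(String groupId, String autoOffsetReset) {
        Integer start = committed.get(groupId);
        if (start == null) {
            start = "earliest".equals(autoOffsetReset) ? 0 : log.size();
        }
        List<String> records = new ArrayList<>(log.subList(start, log.size()));
        committed.put(groupId, log.size()); // simplification: commit everything just returned
        return records;
    }

    public static void main(String[] args) {
        OffsetReset topic = new OffsetReset();
        topic.log.add("old-service DOWN");                         // published before the instances started
        System.out.println(topic.poll("instance-1", "latest"));   // []
        System.out.println(topic.poll("instance-2", "earliest")); // [old-service DOWN]
        topic.log.add("order-service UP");
        System.out.println(topic.poll("instance-1", "latest"));   // [order-service UP]
    }
}
```

With latest, a message published before the new group's consumer is assigned its partition is never seen. That seems acceptable here, because an instance that is still starting fetches the full registry from Eureka on its own.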

In addition, EurekaClientCacheMessageHandler implements two interfaces: ApplicationRunner and SmartApplicationListener.

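The run method of ApplicationRunner publishes the current service's online message after the application has fully started, which ensures that the message is published after the service has been registered. If the message were published before registration completes, other services might refresh on receiving it and still get a service list that does not include this service.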

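The onApplicationEvent method of SmartApplicationListener listens for ContextClosedEvent and publishes the current service's offline message when the Spring container is closing. ContextClosedEvent is published at the start of ApplicationContext.close(), before lifecycle beans are stopped and singleton beans are destroyed, so the message goes out before the service is removed from the registry. When a service goes offline, the message should be published as soon as the application can no longer serve requests, to keep other services from calling the instance in the window between it going offline and the message being published.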

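Finally, note that application shutdown is detected through SpringContextShutdownHook, a hook that Spring Boot registers with the JVM. If the process is killed forcibly with kill -9, hooks registered with the JVM do not run, so this way of stopping the process should be avoided.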