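The earlier article covered quickly detecting services going up and down by manually refreshing the Eureka client cache. In that approach, when a service call fails with a specific exception type (for example `feign.FeignException.ServiceUnavailable` or `java.net.ConnectException`), the Eureka client cache is refreshed manually so that the latest service list is fetched promptly. This gives subsequent retries the best chance of succeeding.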
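This approach solves two problems: calling instances that have already gone offline, and being unable to call instances that have just come online. It has two drawbacks. Every failed call needs at least one retry. And as long as at least one existing instance of a service can still serve requests, the caller will not promptly notice newly started instances.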
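The retry-based approach above can be sketched in plain Java. `RefreshOnExceptionRetry` and its methods are hypothetical names, not part of the original code. The matching rule mirrors `refreshOnExceptions`: the exception itself or its root cause must be an instance of one of the listed types. For simplicity the refresh runs synchronously here. `EurekaClientCacheManualRefresher` further below runs it on a background thread, so a real retry may still hit the stale list.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper illustrating "refresh the cache on specific exceptions, then retry once".
final class RefreshOnExceptionRetry {

    private RefreshOnExceptionRetry() {
    }

    /** Returns the deepest cause in the chain, or t itself when it has no cause. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /** True when t, or its root cause, is an instance of one of the given types. */
    static boolean matches(Throwable t, List<Class<? extends Throwable>> types) {
        Throwable root = rootCause(t);
        return types.stream().anyMatch(type -> type.isInstance(t) || type.isInstance(root));
    }

    /**
     * Calls the service. On a matching failure, runs the (hypothetical) cache refresh
     * synchronously and retries exactly once. Non-matching failures are rethrown as-is.
     */
    static <T> T callWithRefresh(Supplier<T> call, Runnable refreshCache,
                                 List<Class<? extends Throwable>> types) {
        try {
            return call.get();
        } catch (RuntimeException e) {
            if (!matches(e, types)) {
                throw e;
            }
            refreshCache.run();
            return call.get();
        }
    }
}
```

For example, a `RuntimeException` whose cause is a `ConnectException` triggers one refresh followed by one retry. An unrelated `IllegalStateException` is rethrown immediately.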
Starting with Spring Cloud `Hoxton.SR9` (`spring-cloud-commons` `v2.2.6.RELEASE`), LoadBalancer can be configured to retry on the next instance. For example:
```yaml
spring:
  cloud:
    loadbalancer:
      retry:
        # retries must stay enabled (true is also the default)
        enabled: true
        max-retries-on-same-service-instance: 0
        max-retries-on-next-service-instance: 1
```
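With this configuration, when the current instance is unavailable the call quickly switches to the next instance and retries there. However, newly started instances are still not discovered promptly.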
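The following is a simplified model of what `max-retries-on-same-service-instance: 0` and `max-retries-on-next-service-instance: 1` aim for. It tries one instance, then at most one other instance from the same cached list. This is only an illustration, not the spring-cloud-commons retry policy, and `NextInstanceFailover` is a hypothetical name. Note that an instance missing from the cached list is never tried. That is exactly the problem that remains.

```java
import java.util.List;
import java.util.function.Function;

// Simplified illustration of "retry on the next instance"; not spring-cloud-commons' implementation.
final class NextInstanceFailover {

    private NextInstanceFailover() {
    }

    /**
     * Tries the first cached instance, then up to maxRetriesOnNextInstance further instances.
     * Rethrows the last failure when every attempt fails.
     */
    static <I, R> R call(List<I> cachedInstances, int maxRetriesOnNextInstance, Function<I, R> invoker) {
        int attempts = Math.min(cachedInstances.size(), maxRetriesOnNextInstance + 1);
        RuntimeException lastFailure = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return invoker.apply(cachedInstances.get(i));
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and move on to the next instance
            }
        }
        if (lastFailure == null) {
            throw new IllegalStateException("no cached instances to call");
        }
        throw lastFailure;
    }
}
```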
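Both approaches above rely on retries. The following approach uses message broadcasting instead. Kafka publishes a service UP message when an application starts and a DOWN message when it shuts down. All services listen on the same topic. When a service receives an UP or DOWN message from another service, it automatically refreshes the service list cached locally by its Eureka client.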
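Before wiring in Kafka, the broadcast idea can be modeled in memory. `UpDownBroadcastModel` is a hypothetical stand-in, not part of the original code. Every instance subscribes to the same topic, and an instance refreshes only when the message comes from a different service. This is the same rule that `EurekaClientCacheMessageHandler` applies further below.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of the up/down broadcast; no Kafka involved.
final class UpDownBroadcastModel {

    enum ServiceState { UP, DOWN }

    static final class Instance {
        final String serviceName;
        int refreshCount;

        Instance(String serviceName) {
            this.serviceName = serviceName;
        }

        void onMessage(String fromService, ServiceState state) {
            // As in the original handler, the state is not inspected;
            // only messages published by this instance's own service are ignored
            if (!serviceName.equalsIgnoreCase(fromService)) {
                refreshCount++; // stands in for eurekaClientCacheManualRefresher.refresh(false)
            }
        }
    }

    private final List<Instance> subscribers = new ArrayList<>();

    /** Adds an instance that listens on the shared topic. */
    Instance join(String serviceName) {
        Instance instance = new Instance(serviceName);
        subscribers.add(instance);
        return instance;
    }

    /** Delivers one UP/DOWN message to every subscriber (broadcast). */
    void publish(String fromService, ServiceState state) {
        for (Instance subscriber : subscribers) {
            subscriber.onMessage(fromService, state);
        }
    }
}
```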
```xml
<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.13</version>
</dependency>
```
The Spring Cloud and Spring Boot versions used are `Hoxton.SR12` and `2.3.8.RELEASE` respectively.
```yaml
spring:
  kafka:
    bootstrap-servers: kafkanode:9092
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      batch-size: 0
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
  cloud:
    loadbalancer:
      cache:
        enabled: false
logging:
  level:
    org:
      springframework:
        kafka: ERROR
      apache:
        kafka: ERROR
```
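Note that the LoadBalancer cache is disabled in this configuration, because the code only refreshes the Eureka client cache. Keeping two layers of caching is unnecessary anyway, as explained in the previous article.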
```java
import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.TimedSupervisorTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

@SuppressWarnings("unused")
@Slf4j
@Component
public class EurekaClientCacheManualRefresher {

    // Ensures only one refresh task is queued at a time
    private final AtomicBoolean isRefreshing = new AtomicBoolean(false);
    // All refreshes run on this single background thread
    private final ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
    // Name of the private field in com.netflix.discovery.DiscoveryClient holding the cache refresh task
    private final String cacheRefreshTaskField = "cacheRefreshTask";
    private TimedSupervisorTask cacheRefreshTask;
    // Only read and written on refreshExecutor's thread
    private long lastRefreshMillis = 0;
    // Minimum interval between two refreshes when enableInterval is true
    private final Duration refreshInterval = Duration.ofSeconds(3);

    @Autowired
    private BeanFactory beanFactory;

    /**
     * Refreshes the cache if e, or its root cause, is an instance of one of the given exception types.
     */
    public void refreshOnExceptions(Exception e, List<Class<? extends Exception>> clazzs, boolean enableInterval) {
        if (null == clazzs || clazzs.isEmpty()) {
            throw new IllegalArgumentException("clazzs must not be null or empty");
        }
        if (clazzs.stream().anyMatch(clazz -> clazz.isInstance(e) ||
                clazz.isInstance(ExceptionUtils.getRootCause(e)))) {
            refresh(enableInterval);
        }
    }

    /**
     * Asynchronously runs the Eureka client's own cache refresh task once.
     */
    public void refresh(boolean enableInterval) {
        if (isRefreshing.compareAndSet(false, true)) {
            refreshExecutor.execute(() -> {
                try {
                    if (enableInterval && System.currentTimeMillis() <= lastRefreshMillis + refreshInterval.toMillis()) {
                        log.warn("Not manually refresh eureka client cache as refresh interval was not exceeded:{}", refreshInterval.getSeconds());
                        return;
                    }
                    // Look up the private cacheRefreshTask field via reflection once and keep it
                    if (null == cacheRefreshTask) {
                        Field field = ReflectionUtils.findField(DiscoveryClient.class, cacheRefreshTaskField);
                        if (null != field) {
                            ReflectionUtils.makeAccessible(field);
                            DiscoveryClient discoveryClient = beanFactory.getBean(DiscoveryClient.class);
                            cacheRefreshTask = (TimedSupervisorTask) ReflectionUtils.getField(field, discoveryClient);
                        }
                    }
                    if (null == cacheRefreshTask) {
                        log.error("Field ({}) not found in class '{}'", cacheRefreshTaskField, DiscoveryClient.class.getSimpleName());
                        return;
                    }
                    lastRefreshMillis = System.currentTimeMillis();
                    // Note: TimedSupervisorTask#run() re-schedules itself on its scheduler in a finally block,
                    // so every manual call also adds another periodic refresh chain. Invoking DiscoveryClient's
                    // package-private refreshRegistry() reflectively would avoid that side effect.
                    cacheRefreshTask.run();
                    log.info("Manually refresh eureka client cache completed(DiscoveryClient.cacheRefreshTask#run())");
                } catch (Exception e) {
                    log.error("An exception occurred when manually refresh eureka client cache", e);
                } finally {
                    isRefreshing.set(false);
                }
            });
        } else {
            log.warn("Not manually refresh eureka client cache as another thread is refreshing it already");
        }
    }
}
```
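```java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// @NoArgsConstructor lets Jackson deserialize the message; @AllArgsConstructor keeps @Builder working.
// @Data already generates toString().
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EurekaClientCacheRefreshMessage {

    private String serviceName;
    private ServiceState serviceState;
    private long timestamp;

    public enum ServiceState {
        UP, DOWN
    }
}
```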
```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;

@Slf4j
@Configuration
public class KafkaConfiguration {

    /*
     * Boot will autowire this into the container factory.
     * A failed record is redelivered twice, 1 s apart, then published to "<topic>.DLT".
     */
    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        return new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer((KafkaOperations<Object, Object>) template), new FixedBackOff(1000L, 2));
    }

    /*
     * Boot also wires this into the container factory; it converts the JSON record value
     * into the listener's parameter type.
     */
    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }
}
```
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class EurekaClientCacheMessageHandler implements ApplicationRunner, SmartApplicationListener {

    private static final String TOPIC_EUREKACLIENT_CACHE_REFRESH = "eurekaclient.cache.refresh";

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private EurekaClientCacheManualRefresher eurekaClientCacheManualRefresher;

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${spring.application.name}")
    private String applicationName;

    /**
     * By default the listener id is also used as the groupId.
     * In Kafka broadcast mode the groupId differs in every service instance; for such a new group
     * the initial offset is determined by the consumer property auto.offset.reset (earliest or latest).
     *
     * @param messageBody JSON-serialized EurekaClientCacheRefreshMessage
     */
    @SneakyThrows
    @KafkaListener(
            topics = TOPIC_EUREKACLIENT_CACHE_REFRESH,
            // SpEL yields a random id per instance, e.g. "eurekaclient.cache.refresh-<uuid>"
            id = TOPIC_EUREKACLIENT_CACHE_REFRESH + "-" + "#{T(java.util.UUID).randomUUID()}",
            properties = {"auto.offset.reset:latest"}
    )
    public void refresh(String messageBody) {
        EurekaClientCacheRefreshMessage message = objectMapper.readValue(messageBody, EurekaClientCacheRefreshMessage.class);
        // Ignore messages published by instances of this same service
        if (!applicationName.equalsIgnoreCase(message.getServiceName())) {
            log.info("received message({}) from topic({})", messageBody, TOPIC_EUREKACLIENT_CACHE_REFRESH);
            eurekaClientCacheManualRefresher.refresh(false);
        }
    }

    /**
     * Publishes the refresh (UP) message after the application has fully started,
     * i.e. after the service has been registered with the registry.
     *
     * @param args application arguments (unused)
     */
    @SneakyThrows
    @Override
    public void run(ApplicationArguments args) {
        EurekaClientCacheRefreshMessage message = EurekaClientCacheRefreshMessage.builder()
                .serviceName(applicationName)
                .serviceState(EurekaClientCacheRefreshMessage.ServiceState.UP)
                .timestamp(System.currentTimeMillis())
                .build();
        String messageBody = objectMapper.writeValueAsString(message);
        log.info("[{}] publish message({}) to topic({})", "ApplicationRunner", messageBody, TOPIC_EUREKACLIENT_CACHE_REFRESH);
        kafkaTemplate.send(TOPIC_EUREKACLIENT_CACHE_REFRESH, messageBody);
    }

    /**
     * Publishes the refresh (DOWN) message when the Spring context is closing. ContextClosedEvent is
     * published at the start of context shutdown, before lifecycle beans are stopped and the
     * service is removed from the registry.
     *
     * @param event the application event (only ContextClosedEvent is supported)
     */
    @SneakyThrows
    @Override
    public void onApplicationEvent(@NonNull ApplicationEvent event) {
        if (!(event instanceof ContextClosedEvent)) {
            return;
        }
        EurekaClientCacheRefreshMessage message = EurekaClientCacheRefreshMessage.builder()
                .serviceName(applicationName)
                .serviceState(EurekaClientCacheRefreshMessage.ServiceState.DOWN)
                .timestamp(System.currentTimeMillis())
                .build();
        String messageBody = objectMapper.writeValueAsString(message);
        log.info("[{}] publish message({}) to topic({})", event.getClass().getSimpleName(),
                messageBody, TOPIC_EUREKACLIENT_CACHE_REFRESH);
        kafkaTemplate.send(TOPIC_EUREKACLIENT_CACHE_REFRESH, messageBody);
    }

    @Override
    public boolean supportsEventType(@NonNull Class<? extends ApplicationEvent> eventType) {
        return ContextClosedEvent.class.isAssignableFrom(eventType);
    }
}
```
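`EurekaClientCacheMessageHandler` uses Kafka in broadcast mode. The listener `id` in `@KafkaListener`, which Spring Kafka also uses as the consumer groupId by default, is generated dynamically from the topic name plus a UUID. As a result it differs in every service instance. Because every instance subscribes to the same topic with its own unique groupId, each instance receives every message.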
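Why a unique groupId turns the topic into a broadcast can be shown with a simplified model of consumer-group delivery on a single-partition topic. The single partition is an assumption: with more partitions, each partition is still consumed by only one member of a group. `ConsumerGroupModel` is a hypothetical name. Its `broadcastGroupId` builds ids with the same shape as the listener id above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Simplified model: one record on a single-partition topic reaches one member of each group.
final class ConsumerGroupModel {

    private final Map<String, List<String>> groups = new LinkedHashMap<>();

    /** Same shape as the listener id: "<topic>-<random UUID>". */
    static String broadcastGroupId(String topic) {
        return topic + "-" + UUID.randomUUID();
    }

    void subscribe(String groupId, String consumerName) {
        groups.computeIfAbsent(groupId, k -> new ArrayList<>()).add(consumerName);
    }

    /** Returns the consumers that receive one record: one member per group (the model picks the first). */
    List<String> deliver() {
        List<String> receivers = new ArrayList<>();
        for (List<String> members : groups.values()) {
            receivers.add(members.get(0));
        }
        return receivers;
    }
}
```

With a shared groupId only one of two instances sees the UP/DOWN message. With per-instance UUID groups, both do.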
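Note also that `@KafkaListener` sets the consumer property `auto.offset.reset`. For a new groupId, this property determines the initial offset and can be `earliest` or `latest`. For an existing groupId, consumption resumes from the group's current committed offset. The broadcast only announces instances going up or down in real time, so there is no need to consume historical messages, and `auto.offset.reset` should be set to `latest`.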
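The offset rule can be written as a small function. `StartOffsetModel` is a hypothetical simplification of how a consumer picks its starting position on one partition. It ignores the case of a committed offset that is out of range, where Kafka also falls back to `auto.offset.reset`.

```java
import java.util.OptionalLong;

// Simplified model of where a consumer starts reading one partition.
final class StartOffsetModel {

    enum AutoOffsetReset { EARLIEST, LATEST }

    static long startOffset(OptionalLong committedOffset, AutoOffsetReset reset,
                            long logStartOffset, long logEndOffset) {
        if (committedOffset.isPresent()) {
            // Existing group: resume from its committed offset
            return committedOffset.getAsLong();
        }
        // New group (no committed offset): auto.offset.reset decides
        return reset == AutoOffsetReset.EARLIEST ? logStartOffset : logEndOffset;
    }
}
```

With `LATEST`, an instance that starts with a fresh UUID group begins at the log end offset. It therefore skips UP/DOWN messages that were published before it started.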
`EurekaClientCacheMessageHandler` also implements two interfaces: `ApplicationRunner` and `SmartApplicationListener`.
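The `run` method of `ApplicationRunner` publishes the current service's UP message after the application has fully started. This ensures the message is published after the service has been registered. If the message were published before registration completed, other services could refresh on receipt and still get a service list that does not contain this service.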
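The `onApplicationEvent` method of `SmartApplicationListener` listens for `ContextClosedEvent` and publishes the current service's DOWN message when the Spring container shuts down. Spring publishes this event at the very beginning of context shutdown, before lifecycle beans are stopped and before the service is removed from the registry. A DOWN message should be published as soon as the application can no longer serve requests. This minimizes the window in which other services still call the departed instance, between it going down and the message being published.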
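Finally, note that detecting application shutdown relies on the `SpringContextShutdownHook` that Spring registers with the JVM. If the process is forcibly killed with `kill -9`, JVM shutdown hooks do not run and no DOWN message is sent, so avoid stopping services this way.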
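The dependency on a JVM shutdown hook can be seen with plain `Runtime.addShutdownHook`. This is the same mechanism Spring's `registerShutdownHook()` uses for its `SpringContextShutdownHook` thread. `ShutdownHookSketch`, `registerDownNotifier` and the publish callback are hypothetical. The hook runs on normal exit or `kill <pid>` (SIGTERM). It does not run on `kill -9` (SIGKILL), because the JVM is terminated without executing any hooks.

```java
// Plain-JVM sketch: the DOWN message can only be sent if the shutdown hook actually runs.
final class ShutdownHookSketch {

    private ShutdownHookSketch() {
    }

    /** Registers a shutdown hook that runs the (hypothetical) DOWN-message publisher. */
    static Thread registerDownNotifier(Runnable publishDownMessage) {
        Thread hook = new Thread(publishDownMessage, "publish-down-message");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```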