1053 lines
37 KiB
Markdown
1053 lines
37 KiB
Markdown
|
||
|
||
因收到Google相关通知,网站将会择期关闭。相关通知内容
|
||
|
||
|
||
44 元数据方案深度剖析,如何避免注册中心数据量膨胀?
|
||
在上一课时,我们详细介绍了 Dubbo 传统架构面临的挑战,以及 Dubbo 2.7.5 版本引入的服务自省方案是如何应对这些挑战的。
|
||
|
||
本课时我们将从服务自省方案的基础设施开始介绍其具体实现。我们首先会介绍元数据相关的基础类的定义,然后介绍元数据的上报以及元数据服务的相关内容,同时还会介绍 Service ID 与 Service Name 是如何映射的。
|
||
|
||
ServiceInstance
|
||
|
||
Service Instance 唯一标识一个服务实例,在 Dubbo 的源码中对应 ServiceInstance 接口,该接口的具体定义如下:
|
||
|
||
public interface ServiceInstance extends Serializable {
|
||
|
||
// 唯一标识
|
||
|
||
String getId();
|
||
|
||
// 获取当前ServiceInstance所属的Service Name
|
||
|
||
String getServiceName();
|
||
|
||
// 获取当前ServiceInstance的host
|
||
|
||
String getHost();
|
||
|
||
// 获取当前ServiceInstance的port
|
||
|
||
Integer getPort();
|
||
|
||
// 当前ServiceInstance的状态
|
||
|
||
default boolean isEnabled() {
|
||
|
||
return true;
|
||
|
||
}
|
||
|
||
// 检测当前ServiceInstance的状态
|
||
|
||
default boolean isHealthy() {
|
||
|
||
return true;
|
||
|
||
}
|
||
|
||
// 获取当前ServiceInstance关联的元数据,这些元数据以KV格式存储
|
||
|
||
Map<String, String> getMetadata();
|
||
|
||
// 计算当前ServiceInstance对象的hashCode值
|
||
|
||
int hashCode();
|
||
|
||
// 比较两个ServiceInstance对象
|
||
|
||
boolean equals(Object another);
|
||
|
||
}
|
||
|
||
|
||
DefaultServiceInstance 是 ServiceInstance 的唯一实现,DefaultServiceInstance 是一个普通的 POJO 类,其中的核心字段如下。
|
||
|
||
|
||
id(String 类型):ServiceInstance 唯一标识。
|
||
serviceName(String 类型):ServiceInstance 关联的 Service Name。
|
||
host(String 类型):ServiceInstance 的 host。
|
||
port(Integer 类型):ServiceInstance 的 port。
|
||
enabled(boolean 类型):ServiceInstance 是否可用的状态。
|
||
healthy(boolean 类型):ServiceInstance 的健康状态。
|
||
metadata(Map 类型):ServiceInstance 关联的元数据。
|
||
|
||
|
||
ServiceDefinition
|
||
|
||
Dubbo 元数据服务与我们业务中发布的 Dubbo 服务无异,Consumer 端可以调用一个 ServiceInstance 的元数据服务获取其发布的全部服务的元数据。
|
||
|
||
说到元数据,就不得不提到 ServiceDefinition 这个类,它可以来描述一个服务接口的定义,其核心字段如下。
|
||
|
||
|
||
canonicalName(String 类型):接口的完全限定名称。
|
||
codeSource(String 类型):服务接口所在的完整路径。
|
||
methods(List 类型):接口中定义的全部方法描述信息。在 MethodDefinition 中记录了方法的名称、参数类型、返回值类型以及方法参数涉及的所有 TypeDefinition。
|
||
types(List 类型):接口定义中涉及的全部类型描述信息,包括方法的参数和字段,如果遇到复杂类型,TypeDefinition 会递归获取复杂类型内部的字段。在 dubbo-metadata-api 模块中,提供了多种类型对应的 TypeBuilder 用于创建对应的 TypeDefinition,对于没有特定 TypeBuilder 实现的类型,会使用 DefaultTypeBuilder。
|
||
|
||
|
||
|
||
|
||
TypeBuilder 接口实现关系图
|
||
|
||
在服务发布的时候,会将服务的 URL 中的部分数据封装为 FullServiceDefinition 对象,然后作为元数据存储起来。FullServiceDefinition 继承了 ServiceDefinition,并在 ServiceDefinition 基础之上扩展了 params 集合(Map 类型),用来存储 URL 上的参数。
|
||
|
||
MetadataService
|
||
|
||
接下来看 MetadataService 接口,在上一讲我们提到Dubbo 中的每个 ServiceInstance 都会发布 MetadataService 接口供 Consumer 端查询元数据,下图展示了 MetadataService 接口的继承关系:
|
||
|
||
|
||
|
||
MetadataService 接口继承关系图
|
||
|
||
在 MetadataService 接口中定义了查询当前 ServiceInstance 发布的元数据的相关方法,具体如下所示:
|
||
|
||
public interface MetadataService {
|
||
|
||
String serviceName(); // 获取当前ServiceInstance所属服务的名称
|
||
|
||
default String version() {
|
||
|
||
return VERSION; // 获取当前MetadataService接口的版本
|
||
|
||
}
|
||
|
||
// 获取当前ServiceInstance订阅的全部URL
|
||
|
||
default SortedSet<String> getSubscribedURLs(){
|
||
|
||
throw new UnsupportedOperationException("This operation is not supported for consumer.");
|
||
|
||
}
|
||
|
||
// 获取当前ServiceInstance发布的全部URL
|
||
|
||
default SortedSet<String> getExportedURLs() {
|
||
|
||
return getExportedURLs(ALL_SERVICE_INTERFACES);
|
||
|
||
}
|
||
|
||
// 根据服务接口查找当前ServiceInstance暴露的全部接口
|
||
|
||
default SortedSet<String> getExportedURLs(String serviceInterface) {
|
||
|
||
return getExportedURLs(serviceInterface, null);
|
||
|
||
}
|
||
|
||
// 根据服务接口和group两个条件查找当前ServiceInstance暴露的全部接口
|
||
|
||
default SortedSet<String> getExportedURLs(String serviceInterface, String group) {
|
||
|
||
return getExportedURLs(serviceInterface, group, null);
|
||
|
||
}
|
||
|
||
// 根据服务接口、group和version三个条件查找当前ServiceInstance暴露的全部接口
|
||
|
||
default SortedSet<String> getExportedURLs(String serviceInterface, String group, String version) {
|
||
|
||
return getExportedURLs(serviceInterface, group, version, null);
|
||
|
||
}
|
||
|
||
// 根据服务接口、group、version和protocol四个条件查找当前ServiceInstance暴露的全部接口
|
||
|
||
SortedSet<String> getExportedURLs(String serviceInterface, String group, String version, String protocol);
|
||
|
||
// 根据指定条件查询ServiceDefinition
|
||
|
||
String getServiceDefinition(String interfaceName, String version, String group);
|
||
|
||
String getServiceDefinition(String serviceKey);
|
||
|
||
}
|
||
|
||
|
||
在 MetadataService 接口中定义的都是查询元数据的方法,在其子接口 WritableMetadataService 中添加了一些发布元数据的写方法,具体定义如下:
|
||
|
||
@SPI(DEFAULT_METADATA_STORAGE_TYPE)
|
||
|
||
public interface WritableMetadataService extends MetadataService {
|
||
|
||
@Override
|
||
|
||
default String serviceName() {
|
||
|
||
// ServiceName默认是从ApplicationModel中获取
|
||
|
||
// ExtensionLoader、DubboBootstrap以及ApplicationModel是单个Dubbo进程范围内的单例对象,
|
||
|
||
// ExtensionLoader用于Dubbo SPI机制加载扩展实现,DubboBootstrap用于启动Dubbo进程,
|
||
|
||
// ApplicationModel用于表示一个Dubbo实例,其中维护了多个ProviderModel对象表示当前Dubbo实例发布的服务,
|
||
|
||
// 维护了多个ConsumerModel对象表示当前Dubbo实例引用的服务。
|
||
|
||
return ApplicationModel.getApplication();
|
||
|
||
}
|
||
|
||
boolean exportURL(URL url); // 发布该URL所代表的服务
|
||
|
||
boolean unexportURL(URL url); // 注销该URL所代表的服务
|
||
|
||
default boolean refreshMetadata(String exportedRevision, String subscribedRevision) {
|
||
|
||
return true; // 刷新元数据
|
||
|
||
}
|
||
|
||
boolean subscribeURL(URL url); // 订阅该URL所代表的服务
|
||
|
||
boolean unsubscribeURL(URL url); // 取消订阅该URL所代表的服务
|
||
|
||
// 发布Provider端的ServiceDefinition
|
||
|
||
void publishServiceDefinition(URL providerUrl);
|
||
|
||
// 获取WritableMetadataService的默认扩展实现
|
||
|
||
static WritableMetadataService getDefaultExtension() {
|
||
|
||
return getExtensionLoader(WritableMetadataService.class).getDefaultExtension();
|
||
|
||
}
|
||
|
||
// 获取WritableMetadataService接口指定的扩展实现(无指定扩展名称,则返回默认扩展实现)
|
||
|
||
static WritableMetadataService getExtension(String name) {
|
||
|
||
return getExtensionLoader(WritableMetadataService.class).getOrDefaultExtension(name);
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
WritableMetadataService 接口被 @SPI 注解修饰,是一个扩展接口,在前面的继承关系图中也可以看出,它有两个比较基础的扩展实现,分别是 InMemoryWritableMetadataService(默认扩展实现) 和 RemoteWritableMetadataServiceDelegate,对应扩展名分别是 local 和 remote。
|
||
|
||
下面我们先来看 InMemoryWritableMetadataService 的实现,其中维护了三个核心集合。
|
||
|
||
|
||
exportedServiceURLs(ConcurrentSkipListMap`> 类型):用于记录当前 ServiceInstance 发布的 URL 集合,其中 Key 是 ServiceKey(即 interface、group 和 version 三部分构成),Value 是对应的 URL 集合。
|
||
subscribedServiceURLs(ConcurrentSkipListMap`> 类型):用于记录当前 ServiceInstance 引用的 URL 集合,其中 Key 是 ServiceKey(即 interface、group 和 version 三部分构成),Value 是对应的 URL 集合。
|
||
serviceDefinitions(ConcurrentSkipListMap 类型):用于记录当前 ServiceInstance 发布的 ServiceDefinition 信息,其中 Key 为 Provider URL 的ServiceKey,Value 为对应的 ServiceDefinition 对象序列化之后的 JSON 字符串。
|
||
|
||
|
||
InMemoryWritableMetadataService 对 getExportedURLs()、getSubscribedURLs() 以及 getServiceDefinition() 方法的实现,就是查询上述三个集合的数据;对 (un)exportURL()、(un)subscribeURL() 和 publishServiceDefinition() 方法的实现,就是增删上述三个集合的数据。
|
||
|
||
(un)exportURL()、(un)subscribeURL() 等方法都是非常简单的集合操作,我们就不再展示,你若感兴趣的话可以参考源码进行学习。 这里我们重点来看一下 publishServiceDefinition() 方法对 ServiceDefinition 的处理:
|
||
|
||
public void publishServiceDefinition(URL providerUrl) {
|
||
|
||
// 获取服务接口
|
||
|
||
String interfaceName = providerUrl.getParameter(INTERFACE_KEY);
|
||
|
||
if (StringUtils.isNotEmpty(interfaceName)
|
||
|
||
&& !ProtocolUtils.isGeneric(providerUrl.getParameter(GENERIC_KEY))) {
|
||
|
||
Class interfaceClass = Class.forName(interfaceName);
|
||
|
||
// 创建服务接口对应的ServiceDefinition对象
|
||
|
||
ServiceDefinition serviceDefinition = ServiceDefinitionBuilder.build(interfaceClass);
|
||
|
||
Gson gson = new Gson();
|
||
|
||
// 将ServiceDefinition对象序列化为JSON对象
|
||
|
||
String data = gson.toJson(serviceDefinition);
|
||
|
||
// 将ServiceDefinition对象序列化之后的JSON字符串记录到serviceDefinitions集合
|
||
|
||
serviceDefinitions.put(providerUrl.getServiceKey(), data);
|
||
|
||
return;
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
在 RemoteWritableMetadataService 实现中封装了一个 InMemoryWritableMetadataService 对象,并对 publishServiceDefinition() 方法进行了覆盖,具体实现如下:
|
||
|
||
public void publishServiceDefinition(URL url) {
|
||
|
||
// 获取URL中的side参数值,决定调用publishProvider()还是publishConsumer()方法
|
||
|
||
String side = url.getParameter(SIDE_KEY);
|
||
|
||
if (PROVIDER_SIDE.equalsIgnoreCase(side)) {
|
||
|
||
publishProvider(url);
|
||
|
||
} else {
|
||
|
||
publishConsumer(url);
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
在 publishProvider() 方法中,首先会根据 Provider URL 创建对应的 FullServiceDefinition 对象,然后通过 MetadataReport 进行上报,具体实现如下:
|
||
|
||
private void publishProvider(URL providerUrl) throws RpcException {
|
||
|
||
// 删除pid、timestamp、bind.ip、bind.port等参数
|
||
|
||
providerUrl = providerUrl.removeParameters(PID_KEY, TIMESTAMP_KEY, Constants.BIND_IP_KEY,
|
||
|
||
Constants.BIND_PORT_KEY, TIMESTAMP_KEY);
|
||
|
||
// 获取服务接口名称
|
||
|
||
String interfaceName = providerUrl.getParameter(INTERFACE_KEY);
|
||
|
||
if (StringUtils.isNotEmpty(interfaceName)) {
|
||
|
||
Class interfaceClass = Class.forName(interfaceName); // 反射
|
||
|
||
// 创建服务接口对应的FullServiceDefinition对象,URL中的参数会记录到FullServiceDefinition的params集合中
|
||
|
||
FullServiceDefinition fullServiceDefinition = ServiceDefinitionBuilder.buildFullDefinition(interfaceClass,
|
||
|
||
providerUrl.getParameters());
|
||
|
||
// 获取MetadataReport并上报FullServiceDefinition
|
||
|
||
getMetadataReport().storeProviderMetadata(new MetadataIdentifier(providerUrl.getServiceInterface(),
|
||
|
||
providerUrl.getParameter(VERSION_KEY), providerUrl.getParameter(GROUP_KEY),
|
||
|
||
PROVIDER_SIDE, providerUrl.getParameter(APPLICATION_KEY)), fullServiceDefinition);
|
||
|
||
return;
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
publishConsumer() 方法则相对比较简单:首先会清理 Consumer URL 中 pid、timestamp 等参数,然后将 Consumer URL 中的参数集合进行上报。
|
||
|
||
不过,在 RemoteWritableMetadataService 中的 exportURL()、subscribeURL()、getExportedURLs()、getServiceDefinition() 等一系列方法都是空实现,这是为什么呢?其实我们从 RemoteWritableMetadataServiceDelegate 中就可以找到答案,注意,RemoteWritableMetadataServiceDelegate 才是 MetadataService 接口的 remote 扩展实现。
|
||
|
||
在 RemoteWritableMetadataServiceDelegate 中同时维护了一个 InMemoryWritableMetadataService 对象和 RemoteWritableMetadataService 对象,exportURL()、subscribeURL() 等发布订阅相关的方法会同时委托给这两个 MetadataService 对象,getExportedURLs()、getServiceDefinition() 等查询方法则只会调用 InMemoryWritableMetadataService 对象进行查询。这里我们以 exportURL() 方法为例进行说明:
|
||
|
||
public boolean exportURL(URL url) {
|
||
|
||
return doFunction(WritableMetadataService::exportURL, url);
|
||
|
||
}
|
||
|
||
private boolean doFunction(BiFunction<WritableMetadataService, URL, Boolean> func, URL url) {
|
||
|
||
// 同时调用InMemoryWritableMetadataService对象和RemoteWritableMetadataService对象的exportURL()方法
|
||
|
||
return func.apply(defaultWritableMetadataService, url) && func.apply(remoteWritableMetadataService, url);
|
||
|
||
}
|
||
|
||
|
||
MetadataReport
|
||
|
||
元数据中心是 Dubbo 2.7.0 版本之后新增的一项优化,其主要目的是将 URL 中的一部分内容存储到元数据中心,从而减少注册中心的压力。
|
||
|
||
元数据中心的数据只是给本端自己使用的,改动不需要告知对端,例如,Provider 修改了元数据,不需要实时通知 Consumer。这样,在注册中心存储的数据量减少的同时,还减少了因为配置修改导致的注册中心频繁通知监听者情况的发生,很好地减轻了注册中心的压力。
|
||
|
||
MetadataReport 接口是 Dubbo 节点与元数据中心交互的桥梁,其继承关系如下图所示:
|
||
|
||
|
||
|
||
MetadataReport 继承关系图
|
||
|
||
我们先来看一下 MetadataReport 接口的核心定义:
|
||
|
||
public interface MetadataReport {
|
||
|
||
// 存储Provider元数据
|
||
|
||
void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition);
|
||
|
||
// 存储Consumer元数据
|
||
|
||
void storeConsumerMetadata(MetadataIdentifier consumerMetadataIdentifier, Map<String, String> serviceParameterMap);
|
||
|
||
// 存储、删除Service元数据
|
||
|
||
void saveServiceMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url);
|
||
|
||
void removeServiceMetadata(ServiceMetadataIdentifier metadataIdentifier);
|
||
|
||
// 查询暴露的URL
|
||
|
||
List<String> getExportedURLs(ServiceMetadataIdentifier metadataIdentifier);
|
||
|
||
// 查询订阅数据
|
||
|
||
void saveSubscribedData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, Set<String> urls);
|
||
|
||
List<String> getSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier);
|
||
|
||
// 查询ServiceDefinition
|
||
|
||
String getServiceDefinition(MetadataIdentifier metadataIdentifier);
|
||
|
||
}
|
||
|
||
|
||
了解了 MetadataReport 接口定义的核心行为之后,接下来我们就按照其实现的顺序来介绍:先来分析 AbstractMetadataReport 抽象类提供的公共实现,然后以 ZookeeperMetadataReport 这个具体实现为例,介绍 MetadataReport 如何与 ZooKeeper 配合实现元数据上报。
|
||
|
||
1. AbstractMetadataReport
|
||
|
||
AbstractMetadataReport 中提供了所有 MetadataReport 的公共实现,其核心字段如下:
|
||
|
||
private URL reportURL; // 元数据中心的URL,其中包含元数据中心的地址
|
||
|
||
// 本地磁盘缓存,用来缓存上报的元数据
|
||
|
||
File file;
|
||
|
||
final Properties properties = new Properties();
|
||
|
||
// 内存缓存
|
||
|
||
final Map<MetadataIdentifier, Object> allMetadataReports = new ConcurrentHashMap<>(4);
|
||
|
||
// 该线程池除了用来同步本地内存缓存与文件缓存,还会用来完成异步上报的功能
|
||
|
||
private final ExecutorService reportCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveMetadataReport", true));
|
||
|
||
// 用来暂存上报失败的元数据,后面会有定时任务进行重试
|
||
|
||
final Map<MetadataIdentifier, Object> failedReports = new ConcurrentHashMap<>(4);
|
||
|
||
boolean syncReport; // 是否同步上报元数据
|
||
|
||
// 记录最近一次元数据上报的版本,单调递增
|
||
|
||
private final AtomicLong lastCacheChanged = new AtomicLong();
|
||
|
||
// 用于重试的定时任务
|
||
|
||
public MetadataReportRetry metadataReportRetry;
|
||
|
||
// 当前MetadataReport实例是否已经初始化
|
||
|
||
private AtomicBoolean initialized = new AtomicBoolean(false);
|
||
|
||
|
||
在 AbstractMetadataReport 的构造方法中,首先会初始化本地的文件缓存,然后创建 MetadataReportRetry 重试任务,并启动一个周期性刷新的定时任务,具体实现如下:
|
||
|
||
public AbstractMetadataReport(URL reportServerURL) {
|
||
|
||
setUrl(reportServerURL);
|
||
|
||
// 默认的本地文件缓存
|
||
|
||
String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-metadata-" + reportServerURL.getParameter(APPLICATION_KEY) + "-" + reportServerURL.getAddress().replaceAll(":", "-") + ".cache";
|
||
|
||
String filename = reportServerURL.getParameter(FILE_KEY, defaultFilename);
|
||
|
||
File file = null;
|
||
|
||
if (ConfigUtils.isNotEmpty(filename)) {
|
||
|
||
file = new File(filename);
|
||
|
||
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
|
||
|
||
if (!file.getParentFile().mkdirs()) {
|
||
|
||
throw new IllegalArgumentException("...");
|
||
|
||
}
|
||
|
||
}
|
||
|
||
if (!initialized.getAndSet(true) && file.exists()) {
|
||
|
||
file.delete();
|
||
|
||
}
|
||
|
||
}
|
||
|
||
this.file = file;
|
||
|
||
// 将file文件中的内容加载到properties字段中
|
||
|
||
loadProperties();
|
||
|
||
// 是否同步上报元数据
|
||
|
||
syncReport = reportServerURL.getParameter(SYNC_REPORT_KEY, false);
|
||
|
||
// 创建重试任务
|
||
|
||
metadataReportRetry = new MetadataReportRetry(reportServerURL.getParameter(RETRY_TIMES_KEY, DEFAULT_METADATA_REPORT_RETRY_TIMES),
|
||
|
||
reportServerURL.getParameter(RETRY_PERIOD_KEY, DEFAULT_METADATA_REPORT_RETRY_PERIOD));
|
||
|
||
// 是否周期性地上报元数据
|
||
|
||
if (reportServerURL.getParameter(CYCLE_REPORT_KEY, DEFAULT_METADATA_REPORT_CYCLE_REPORT)) {
|
||
|
||
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboMetadataReportTimer", true));
|
||
|
||
// 默认每隔1天将本地元数据全部刷新到元数据中心
|
||
|
||
scheduler.scheduleAtFixedRate(this::publishAll, calculateStartTime(), ONE_DAY_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
在 AbstractMetadataReport.storeProviderMetadata() 方法中,首先会根据 syncReport 字段值决定是同步上报还是异步上报:如果是同步上报,则在当前线程执行上报操作;如果是异步上报,则在 reportCacheExecutor 线程池中执行上报操作。具体的上报操作是在storeProviderMetadataTask() 方法中完成的:
|
||
|
||
private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
|
||
|
||
try {
|
||
|
||
// 将元数据记录到allMetadataReports集合
|
||
|
||
allMetadataReports.put(providerMetadataIdentifier, serviceDefinition);
|
||
|
||
// 如果之前上报失败,则在failedReports集合中有记录,这里上报成功之后会将其删除
|
||
|
||
failedReports.remove(providerMetadataIdentifier);
|
||
|
||
// 将元数据序列化成JSON字符串
|
||
|
||
Gson gson = new Gson();
|
||
|
||
String data = gson.toJson(serviceDefinition);
|
||
|
||
// 上报序列化后的元数据
|
||
|
||
doStoreProviderMetadata(providerMetadataIdentifier, data);
|
||
|
||
// 将序列化后的元数据保存到本地文件缓存中
|
||
|
||
saveProperties(providerMetadataIdentifier, data, true, !syncReport);
|
||
|
||
} catch (Exception e) {
|
||
|
||
// 如果上报失败,则在failedReports集合中进行记录,然后由metadataReportRetry任务中进行重试
|
||
|
||
failedReports.put(providerMetadataIdentifier, serviceDefinition);
|
||
|
||
metadataReportRetry.startRetryTask();
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
我们可以看到这里调用了 doStoreProviderMetadata() 方法和 saveProperties() 方法。其中, doStoreProviderMetadata() 方法是一个抽象方法,对于不同的元数据中心实现有不同的实现,这个方法的具体实现在后面会展开分析。saveProperties() 方法中会更新 properties 字段,递增本地缓存文件的版本号,最后(同步/异步)执行 SaveProperties 任务,更新本地缓存文件的内容,具体实现如下:
|
||
|
||
private void saveProperties(MetadataIdentifier metadataIdentifier, String value, boolean add, boolean sync) {
|
||
|
||
if (file == null) {
|
||
|
||
return;
|
||
|
||
}
|
||
|
||
if (add) { // 更新properties中的元数据
|
||
|
||
properties.setProperty(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY), value);
|
||
|
||
} else {
|
||
|
||
properties.remove(metadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY));
|
||
|
||
}
|
||
|
||
// 递增版本
|
||
|
||
long version = lastCacheChanged.incrementAndGet();
|
||
|
||
if (sync) { // 同步更新本地缓存文件
|
||
|
||
new SaveProperties(version).run();
|
||
|
||
} else { // 异步更新本地缓存文件
|
||
|
||
reportCacheExecutor.execute(new SaveProperties(version));
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
下面我们再来看 SaveProperties 任务的核心方法—— doSaveProperties() 方法,该方法中实现了刷新本地缓存文件的全部操作。
|
||
|
||
private void doSaveProperties(long version) {
|
||
|
||
if (version < lastCacheChanged.get()) { // 对比当前版本号和此次SaveProperties任务的版本号
|
||
|
||
return;
|
||
|
||
}
|
||
|
||
if (file == null) { // 检测本地缓存文件是否存在
|
||
|
||
return;
|
||
|
||
}
|
||
|
||
try {
|
||
|
||
// 创建lock文件
|
||
|
||
File lockfile = new File(file.getAbsolutePath() + ".lock");
|
||
|
||
if (!lockfile.exists()) {
|
||
|
||
lockfile.createNewFile();
|
||
|
||
}
|
||
|
||
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
|
||
|
||
FileChannel channel = raf.getChannel()) {
|
||
|
||
FileLock lock = channel.tryLock(); // 对lock文件加锁
|
||
|
||
if (lock == null) {
|
||
|
||
throw new IOException("Can not lock the metadataReport cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.metadata.file=xxx.properties");
|
||
|
||
}
|
||
|
||
try {
|
||
|
||
if (!file.exists()) { // 保证本地缓存文件存在
|
||
|
||
file.createNewFile();
|
||
|
||
}
|
||
|
||
// 将properties中的元数据保存到本地缓存文件中
|
||
|
||
try (FileOutputStream outputFile = new FileOutputStream(file)) {
|
||
|
||
properties.store(outputFile, "Dubbo metadataReport Cache");
|
||
|
||
}
|
||
|
||
} finally {
|
||
|
||
lock.release(); // 释放lock文件上的锁
|
||
|
||
}
|
||
|
||
}
|
||
|
||
} catch (Throwable e) {
|
||
|
||
if (version < lastCacheChanged.get()) { // 比较版本号
|
||
|
||
return;
|
||
|
||
} else { // 如果写文件失败,则重新提交SaveProperties任务,再次尝试
|
||
|
||
reportCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
|
||
|
||
}
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
了解了刷新本地缓存文件的核心逻辑之后,我们再来看 AbstractMetadataReport 中失败重试的逻辑。MetadataReportRetry 中维护了如下核心字段:
|
||
|
||
// 执行重试任务的线程池
|
||
|
||
ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(0, new NamedThreadFactory("DubboMetadataReportRetryTimer", true));
|
||
|
||
// 重试任务关联的Future对象
|
||
|
||
volatile ScheduledFuture retryScheduledFuture;
|
||
|
||
// 记录重试任务的次数
|
||
|
||
final AtomicInteger retryCounter = new AtomicInteger(0);
|
||
|
||
// 重试任务的时间间隔
|
||
|
||
long retryPeriod;
|
||
|
||
// 无失败上报的元数据之后,重试任务会再执行600次,才会销毁
|
||
|
||
int retryTimesIfNonFail = 600;
|
||
|
||
// 失败重试的次数上限,默认为100次,即重试失败100次之后会放弃
|
||
|
||
int retryLimit;
|
||
|
||
|
||
在 startRetryTask() 方法中,MetadataReportRetry 会创建一个重试任务,并提交到 retryExecutor 线程池中等待执行(如果已存在重试任务,则不会创建新任务)。在重试任务中会调用 AbstractMetadataReport.retry() 方法完成重新上报,当然也会判断 retryLimit 等执行条件,具体实现比较简单,这里就不再展示,你若感兴趣的话可以参考源码进行学习。
|
||
|
||
AbstractMetadataReport.retry() 方法的具体实现如下:
|
||
|
||
public boolean retry() {
|
||
|
||
return doHandleMetadataCollection(failedReports);
|
||
|
||
}
|
||
|
||
private boolean doHandleMetadataCollection(Map<MetadataIdentifier, Object> metadataMap) {
|
||
|
||
if (metadataMap.isEmpty()) { // 没有上报失败的元数据
|
||
|
||
return true;
|
||
|
||
}
|
||
|
||
// 遍历failedReports集合中失败上报的元数据,逐个调用storeProviderMetadata()方法或storeConsumerMetadata()方法重新上报
|
||
|
||
Iterator<Map.Entry<MetadataIdentifier, Object>> iterable = metadataMap.entrySet().iterator();
|
||
|
||
while (iterable.hasNext()) {
|
||
|
||
Map.Entry<MetadataIdentifier, Object> item = iterable.next();
|
||
|
||
if (PROVIDER_SIDE.equals(item.getKey().getSide())) {
|
||
|
||
this.storeProviderMetadata(item.getKey(), (FullServiceDefinition) item.getValue());
|
||
|
||
} else if (CONSUMER_SIDE.equals(item.getKey().getSide())) {
|
||
|
||
this.storeConsumerMetadata(item.getKey(), (Map) item.getValue());
|
||
|
||
}
|
||
|
||
}
|
||
|
||
return false;
|
||
|
||
}
|
||
|
||
|
||
在 AbstractMetadataReport 的构造方法中,会根据 reportServerURL(也就是后面的 metadataReportURL)参数启动一个“天”级别的定时任务,该定时任务会执行 publishAll() 方法,其中会通过 doHandleMetadataCollection() 方法将 allMetadataReports 集合中的全部元数据重新进行上报。该定时任务默认是在凌晨 02:00~06:00 启动,每天执行一次。
|
||
|
||
到此为止,AbstractMetadataReport 为子类实现的公共能力就介绍完了,其他方法都是委托给了相应的 do() 方法,这些 do() 方法都是在 AbstractMetadataReport 子类中实现的。
|
||
|
||
|
||
|
||
2. BaseMetadataIdentifier
|
||
|
||
在 AbstractMetadataReport 上报元数据的时候,元数据对应的 Key 都是BaseMetadataIdentifier 类型的对象,其继承关系如下图所示:
|
||
|
||
|
||
|
||
BaseMetadataIdentifier 继承关系图
|
||
|
||
|
||
MetadataIdentifier 中包含了服务接口、version、group、side 和 application 五个核心字段。
|
||
ServiceMetadataIdentifier 中包含了服务接口、version、group、side、revision 和 protocol 六个核心字段。
|
||
SubscriberMetadataIdentifier 中包含了服务接口、version、group、side 和 revision 五个核心字段。
|
||
|
||
|
||
3. MetadataReportFactory & MetadataReportInstance
|
||
|
||
MetadataReportFactory 是用来创建 MetadataReport 实例的工厂,具体定义如下:
|
||
|
||
@SPI("redis")
|
||
|
||
public interface MetadataReportFactory {
|
||
|
||
@Adaptive({"protocol"})
|
||
|
||
MetadataReport getMetadataReport(URL url);
|
||
|
||
}
|
||
|
||
|
||
MetadataReportFactory 是个扩展接口,从 @SPI 注解的默认值可以看出Dubbo 默认使用 Redis 实现元数据中心。
|
||
Dubbo 提供了针对 ZooKeeper、Redis、Consul 等作为元数据中心的 MetadataReportFactory 实现,如下图所示:
|
||
|
||
|
||
|
||
MetadataReportFactory 继承关系图
|
||
|
||
这些 MetadataReportFactory 实现都继承了 AbstractMetadataReportFactory,在 AbstractMetadataReportFactory 提供了缓存 MetadataReport 实现的功能,并定义了一个 createMetadataReport() 抽象方法供子类实现。另外,AbstractMetadataReportFactory 实现了 MetadataReportFactory 接口的 getMetadataReport() 方法,下面我们就来简单看一下该方法的实现:
|
||
|
||
public MetadataReport getMetadataReport(URL url) {
|
||
|
||
// 清理export、refer参数
|
||
|
||
url = url.setPath(MetadataReport.class.getName())
|
||
|
||
.removeParameters(EXPORT_KEY, REFER_KEY);
|
||
|
||
String key = url.toServiceString();
|
||
|
||
LOCK.lock();
|
||
|
||
try {
|
||
|
||
// 从SERVICE_STORE_MAP集合(ConcurrentHashMap<String, MetadataReport>类型)中查询是否已经缓存有对应的MetadataReport对象
|
||
|
||
MetadataReport metadataReport = SERVICE_STORE_MAP.get(key);
|
||
|
||
if (metadataReport != null) { // 直接返回缓存的MetadataReport对象
|
||
|
||
return metadataReport;
|
||
|
||
}
|
||
|
||
// 创建新的MetadataReport对象,createMetadataReport()方法由子类具体实现
|
||
|
||
metadataReport = createMetadataReport(url);
|
||
|
||
// 将MetadataReport缓存到SERVICE_STORE_MAP集合中
|
||
|
||
SERVICE_STORE_MAP.put(key, metadataReport);
|
||
|
||
return metadataReport;
|
||
|
||
} finally {
|
||
|
||
LOCK.unlock();
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
MetadataReportInstance 是一个单例对象,其中会获取 MetadataReportFactory 的适配器,并根据 init() 方法传入的 metadataReportURL 选择对应的 MetadataReportFactory 创建 MetadataReport 实例,这也是当前 Dubbo 进程全局唯一的 MetadataReport 实例。
|
||
|
||
MetadataReportInstance 的具体实现比较简单,这里就不再展示,你若感兴趣的话可以参考源码进行学习。
|
||
|
||
4. ZookeeperMetadataReport
|
||
|
||
下面我们来看 dubbo-metadata-report-zookeeper 模块是如何接入 ZooKeeper 作为元数据中心的。
|
||
|
||
我们首先关注 dubbo-metadata-report-zookeeper 模块的 SPI 文件,可以看到 ZookeeperMetadataReportFactory 的扩展名称是 zookeeper:
|
||
|
||
zookeeper=org.apache.dubbo.metadata.store.zookeeper.ZookeeperMetadataReportFactory
|
||
|
||
|
||
在 ZookeeperMetadataReportFactory 的 createMetadataReport() 方法中会创建 ZookeeperMetadataReport 这个 MetadataReport 实现类的对象。
|
||
|
||
在 ZookeeperMetadataReport 中维护了一个 ZookeeperClient 实例用来和 ZooKeeper 进行交互。ZookeeperMetadataReport 读写元数据的根目录是 metadataReportURL 的 group 参数值,默认值为 dubbo。
|
||
|
||
下面再来看 ZookeeperMetadataReport 对 AbstractMetadataReport 中各个 do*() 方法的实现,这些方法核心都是通过 ZookeeperClient 创建、查询、删除对应的 ZNode 节点,没有什么复杂的逻辑,关键是明确一下操作的 ZNode 节点的 path 是什么。
|
||
|
||
doStoreProviderMetadata() 方法和 doStoreConsumerMetadata() 方法会调用 storeMetadata() 创建相应的 ZNode 节点:
|
||
|
||
private void storeMetadata(MetadataIdentifier metadataIdentifier, String v) {
|
||
|
||
zkClient.create(getNodePath(metadataIdentifier), v, false);
|
||
|
||
}
|
||
|
||
String getNodePath(BaseMetadataIdentifier metadataIdentifier) {
|
||
|
||
return toRootDir() + metadataIdentifier.getUniqueKey(KeyTypeEnum.PATH);
|
||
|
||
}
|
||
|
||
|
||
MetadataIdentifier 对象对应 ZNode 节点的 path 默认格式是 :
|
||
|
||
/dubbo/metadata/服务接口/version/group/side/application
|
||
|
||
|
||
对应 ZNode 节点的 Value 是 ServiceDefinition 序列化后的 JSON 字符串。
|
||
|
||
doSaveMetadata()、doRemoveMetadata() 以及 doGetExportedURLs() 方法参数是 ServiceMetadataIdentifier 对象,对应的 ZNode 节点 path 是:
|
||
|
||
/dubbo/metadata/服务接口/version/group/side/protocol/revision
|
||
|
||
|
||
doSaveSubscriberData()、doGetSubscribedURLs() 方法的参数是 SubscriberMetadataIdentifier 对象,对应的 ZNode 节点 path 是:
|
||
|
||
/dubbo/metadata/服务接口/version/group/side/revision
|
||
|
||
|
||
MetadataServiceExporter
|
||
|
||
了解了 MetadataService 接口的核心功能和底层实现之后,我们接着再来看 MetadataServiceExporter 接口,这个接口负责将 MetadataService 接口作为一个 Dubbo 服务发布出去。
|
||
|
||
下面来看 MetadataServiceExporter 接口的具体定义:
|
||
|
||
public interface MetadataServiceExporter {
|
||
|
||
// 将MetadataService作为一个Dubbo服务发布出去
|
||
|
||
MetadataServiceExporter export();
|
||
|
||
// 注销掉MetadataService服务
|
||
|
||
MetadataServiceExporter unexport();
|
||
|
||
// MetadataService可能以多种协议发布,这里返回发布MetadataService服务的所有URL
|
||
|
||
List<URL> getExportedURLs();
|
||
|
||
// 检测MetadataService服务是否已经发布
|
||
|
||
boolean isExported();
|
||
|
||
}
|
||
|
||
|
||
MetadataServiceExporter 只有 ConfigurableMetadataServiceExporter 这一个实现,如下图所示:
|
||
|
||
|
||
|
||
MetadataServiceExporter 继承关系图
|
||
|
||
ConfigurableMetadataServiceExporter 的核心实现是 export() 方法,其中会创建一个 ServiceConfig 对象完成 MetadataService 服务的发布:
|
||
|
||
public ConfigurableMetadataServiceExporter export() {
|
||
|
||
if (!isExported()) {
|
||
|
||
// 创建ServiceConfig对象
|
||
|
||
ServiceConfig<MetadataService> serviceConfig = new ServiceConfig<>();
|
||
|
||
serviceConfig.setApplication(getApplicationConfig());
|
||
|
||
serviceConfig.setRegistries(getRegistries());
|
||
|
||
serviceConfig.setProtocol(generateMetadataProtocol()); // 设置Protocol(默认是Dubbo)
|
||
|
||
serviceConfig.setInterface(MetadataService.class); // 设置服务接口
|
||
|
||
serviceConfig.setRef(metadataService); // 设置MetadataService对象
|
||
|
||
serviceConfig.setGroup(getApplicationConfig().getName()); // 设置group
|
||
|
||
serviceConfig.setVersion(metadataService.version()); // 设置version
|
||
|
||
// 发布MetadataService服务,ServiceConfig发布服务的流程在前面已经详细分析过了,这里不再展开
|
||
|
||
serviceConfig.export();
|
||
|
||
this.serviceConfig = serviceConfig;
|
||
|
||
} else {
|
||
|
||
... // 输出日志
|
||
|
||
}
|
||
|
||
return this;
|
||
|
||
}
|
||
|
||
|
||
ServiceNameMapping
|
||
|
||
ServiceNameMapping 接口的主要功能是实现 Service ID 到 Service Name 之间的转换,底层会依赖配置中心实现数据存储和查询。ServiceNameMapping 接口的定义如下:
|
||
|
||
@SPI("default")
|
||
|
||
public interface ServiceNameMapping {
|
||
|
||
// 服务接口、group、version、protocol四部分构成了Service ID,并与当前Service Name之间形成映射,记录到配置中心
|
||
|
||
void map(String serviceInterface, String group, String version, String protocol);
|
||
|
||
// 根据服务接口、group、version、protocol四部分构成的Service ID,查询对应的Service Name
|
||
|
||
Set<String> get(String serviceInterface, String group, String version, String protocol);
|
||
|
||
// 获取默认的ServiceNameMapping接口的扩展实现
|
||
|
||
static ServiceNameMapping getDefaultExtension() {
|
||
|
||
return getExtensionLoader(ServiceNameMapping.class).getDefaultExtension();
|
||
|
||
}
|
||
|
||
}
|
||
|
||
|
||
DynamicConfigurationServiceNameMapping 是 ServiceNameMapping 的默认实现,也是唯一实现,其中会依赖 DynamicConfiguration 读写配置中心,完成 Service ID 和 Service Name 的映射。首先来看 DynamicConfigurationServiceNameMapping 的 map() 方法:
|
||
|
||
public void map(String serviceInterface, String group, String version, String protocol) {
|
||
|
||
// 跳过MetadataService接口的处理
|
||
|
||
if (IGNORED_SERVICE_INTERFACES.contains(serviceInterface)) {
|
||
|
||
return;
|
||
|
||
}
|
||
|
||
// 获取DynamicConfiguration对象
|
||
|
||
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
|
||
|
||
// 从ApplicationModel中获取Service Name
|
||
|
||
String key = getName();
|
||
|
||
String content = valueOf(System.currentTimeMillis());
|
||
|
||
execute(() -> {
|
||
|
||
// 在配置中心创建映射关系,这里的buildGroup()方法虽然接收四个参数,但是只使用了serviceInterface
|
||
|
||
// 也就是使用创建了服务接口到Service Name的映射
|
||
|
||
// 可以暂时将配置中心理解为一个KV存储,这里的Key是buildGroup()方法返回值+Service Name构成的,value是content(即时间戳)
|
||
|
||
dynamicConfiguration.publishConfig(key, buildGroup(serviceInterface, group, version, protocol), content);
|
||
|
||
});
|
||
|
||
}
|
||
|
||
|
||
在 DynamicConfigurationServiceNameMapping.get() 方法中,会根据传入的服务接口名称、group、version、protocol 组成 Service ID,查找对应的 Service Name,如下所示:
|
||
|
||
public Set<String> get(String serviceInterface, String group, String version, String protocol) {
|
||
|
||
// 获取DynamicConfiguration对象
|
||
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
|
||
|
||
// 根据Service ID从配置查找Service Name
|
||
Set<String> serviceNames = new LinkedHashSet<>();
|
||
|
||
execute(() -> {
|
||
Set<String> keys = dynamicConfiguration.getConfigKeys(buildGroup(serviceInterface, group, version, protocol));
|
||
serviceNames.addAll(keys);
|
||
|
||
});
|
||
|
||
// 返回查找到的全部Service Name
|
||
return Collections.unmodifiableSet(serviceNames);
|
||
|
||
}
|
||
|
||
|
||
总结
|
||
|
||
本课时我们重点介绍了服务自省架构中元数据相关的实现。
|
||
|
||
|
||
首先我们介绍了 ServiceInstance 和 ServiceDefinition 是如何抽象一个服务实例以及服务定义的。
|
||
紧接着讲解了元数据服务接口的定义,也就是 MetadataService 接口及核心的扩展实现。
|
||
接下来详细分析了 MetadataReport 接口的实现,了解了它是如何与元数据中心配合,实现元数据上报功能的。
|
||
然后还说明了 MetadataServiceExporter 的实现,了解了发布元数据服务的核心原理。
|
||
最后,我们介绍了 ServiceNameMapping 接口以及其默认实现,它实现了 Service ID 与 Service Name 的映射,也是服务自省架构中不可或缺的一部分。
|
||
|
||
|
||
|
||
|
||
|