响应式网站导航,怎样做自己的销售网站,如何网站建设目标,福田网站建设结业论文#x1f44f;作者简介#xff1a;大家好#xff0c;我是爱吃芝士的土豆倪#xff0c;24届校招生Java选手#xff0c;很高兴认识大家#x1f4d5;系列专栏#xff1a;Spring源码、JUC源码、Kafka原理、分布式技术原理#x1f525;如果感觉博主的文章还不错的话#xff… 作者简介大家好我是爱吃芝士的土豆倪24届校招生Java选手很高兴认识大家系列专栏Spring源码、JUC源码、Kafka原理、分布式技术原理如果感觉博主的文章还不错的话请三连支持一下博主哦博主正在努力完成2023计划中源码溯源一探究竟联系方式nhs19990716加我进群大家一起学习一起进步一起对抗互联网寒冬 文章目录 Dubbo服务的注册流程服务发布步骤思考Dubbo源码分析Dubbo注解的解析流程DubboComponetScanServiceAnnotationBeanPostProcessor注册一个DubboBootstrapApplicationListenerregisterServiceBeansregisterServiceBeanbuildServiceBeanDefinition ServiceBean的初始化阶段DubboBootstrapApplicationListenerstartinitialize()exportServicesexportexporteddoExport doExportUrlsdoExportUrlsFor1ProtocolRegistryProtocol 服务发布流程doLocalExportprotocol.exportopenServerExchangers.bindheaderExchanger.bindgetTransporterNettyTransporter.bindNettyServerdoOpen 服务注册流程RegistryProtocolgetRegistryRegistryFactory$AdaptiveRegistryFactoryWrapperRegistryProtocol.registerListenerRegistryWrapper.registerZookeeperRegistryZookeeperRegistry.doRegister Invoker是什么ProxyFacotory.getInvokerProxyFactoryProxyFactory$AdaptiveJavassistProxyFactory.getInvokerjavassist生成的动态代理代码 Dubbo服务的注册流程
服务发布步骤
注解
DubboService(loadbalance random,cluster failover,retries 2)注解扫描
DubboComponentScan思考
首先需要扫描注解在扫描的过程中可以拿到注解中的数据注解的目的其实是做一个标记的功能可以通过不同的标记去对一些类做一个分类具体扫描哪一个注解取决于对其的关注度然后解析注解获取对应的配置。
解析完成后需要去发布服务像Dubbo是基于URL驱动的其会将所有的配置信息配置在URL的地址上所以这一步主要做的就是URL的组装。
后面要做的事情就是将其注册到zookeeper上相当于把服务端的相关配置信息和服务的地址信息都会保存到第三方的平台上到了第三方平台之后如果我的客户端要去调用的时候可以通过第三方平台知道服务端的调用机制是什么这些都可以在URL上识别到。
接下来就是启动服务了根据URL中配置的协议、配置的端口去发布对应的服务。
Dubbo源码分析
Dubbo发布其实有两种形式其实从前面介绍的博客上来说
分为 xml形式 dubbo:service 和 注解形式 DubboService/ Service
Dubbo注解的解析流程
DubboComponetScan
DubboComponentScan(basePackages com.gupaoedu.springboot.dubbo.springbootdubbosampleprovider.services)Target(ElementType.TYPE)
Retention(RetentionPolicy.RUNTIME)
Documented
Import(DubboComponentScanRegistrar.class)
public interface DubboComponentScan {
}// 这里面无非就是注册一个bean到 Spring IOC 里面
public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {// 这个就是获取我们在注解上 定义的 basePackagesSetString packagesToScan getPackagesToScan(importingClassMetadata);registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);// since 2.7.6 Register the common beansregisterCommonBeans(registry);}
---------------------------------------------------------------------------
private void registerServiceAnnotationBeanPostProcessor(SetString packagesToScan, BeanDefinitionRegistry registry) {// 在这里注册了 一个叫做 ServiceAnnotationBeanPostProcessor 的beanBeanDefinitionBuilder builder rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);// 传递一个构造参数 packagesToScan 这里也就意味着服务的注册流程和 ServiceAnnotationBeanPostProcessor bean有关系builder.addConstructorArgValue(packagesToScan);builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);AbstractBeanDefinition beanDefinition builder.getBeanDefinition();// 将这个bean注册BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);}ServiceAnnotationBeanPostProcessor
public ServiceAnnotationBeanPostProcessor(SetString packagesToScan) {super(packagesToScan);
}// 进入super 因为实现了 BeanDefinitionRegistryPostProcessor所以在bean装载完成之后会触发postProcessBeanDefinitionRegistry 方法public class ServiceClassPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,ResourceLoaderAware, BeanClassLoaderAware {public ServiceClassPostProcessor(SetString packagesToScan) {this.packagesToScan packagesToScan;}Overridepublic void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {// 注册一个基础的beanregisterInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class);SetString resolvedPackagesToScan resolvePackagesToScan(packagesToScan);// 判断我们传过来需要扫描的路径是不是空的如果不是空的会调用下面的方法if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {registerServiceBeans(resolvedPackagesToScan, registry);} else {if (logger.isWarnEnabled()) {logger.warn(packagesToScan is empty , ServiceBean registry will be ignored!);}}} 注册一个DubboBootstrapApplicationListener
这个bean会在spring 容器的上下文装载完成之后触发监听
public class DubboBootstrapApplicationListener extends OneTimeExecutionApplicationContextEventListenerimplements Ordered {/*** The bean name of {link DubboBootstrapApplicationListener}** since 2.7.6*/public static final String BEAN_NAME dubboBootstrapApplicationListener;private final DubboBootstrap dubboBootstrap;public DubboBootstrapApplicationListener() {this.dubboBootstrap DubboBootstrap.getInstance();}Overridepublic void onApplicationContextEvent(ApplicationContextEvent event) {// 上下文刷新的时候也就是bean装载完成的时候if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}registerServiceBeans
private void registerServiceBeans(SetString packagesToScan, BeanDefinitionRegistry registry) {// 定义一个scanner 用作扫描DubboClassPathBeanDefinitionScanner scanner new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);// 生成一个bean的名字BeanNameGenerator beanNameGenerator resolveBeanNameGenerator(registry);scanner.setBeanNameGenerator(beanNameGenerator);// 为了兼容老的版本实际上就是把需要扫描的注解类型设置到Scanner。// refactor since 2.7.7serviceAnnotationTypes.forEach(annotationType - {scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));});// 遍历给的包路径for (String packageToScan : packagesToScan) {// 扫描对应的路径// Registers Service Bean firstscanner.scan(packageToScan);// Finds all BeanDefinitionHolders of Service whether ComponentScan scans or not.// 查找Service的所有beandefinitionholder// 相当于扫描所有加了 DubboService注解的类SetBeanDefinitionHolder beanDefinitionHolders findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {registerServiceBean(beanDefinitionHolder, registry, scanner);}if (logger.isInfoEnabled()) {logger.info(beanDefinitionHolders.size() annotated Dubbos Service Components { beanDefinitionHolders } were scanned under package[ packageToScan ]);}} else {if (logger.isWarnEnabled()) {logger.warn(No Spring Bean annotating Dubbos Service was found under package[ packageToScan ]);}}}}registerServiceBean
该bean和服务有关的信息实际上都在我们刚刚定义的DubboService
DubboService(loadbalance random,cluster failover,retries 2)服务以什么协议发布服务的负载均衡策略服务的容错策略服务发布端口…
private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,DubboClassPathBeanDefinitionScanner scanner) {// 获取beanDefinitionHolder中的类对象Class? beanClass resolveClass(beanDefinitionHolder); // 在beanClass中查找Service注解的存在Annotation service findServiceAnnotation(beanClass);/*** The {link AnnotationAttributes} of Service annotation*///获取Service注解的属性信息包括interfaceClass等AnnotationAttributes serviceAnnotationAttributes getAnnotationAttributes(service, false, false);// 根据Service注解的属性信息解析服务接口类Class? interfaceClass resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);// 使用Service注解的属性信息接口类和注解的服务Bean名称构建服务Bean定义String annotatedServiceBeanName beanDefinitionHolder.getBeanName();// 根据Service注解的属性信息和接口类生成服务Bean的名称AbstractBeanDefinition serviceBeanDefinition buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);// ServiceBean Bean name// 使用扫描器检查候选的Bean名称和服务Bean定义是否重复String beanName generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean// 如果候选Bean通过检查将服务Bean定义注册到Bean定义注册表中// 通过 buildServiceBeanDefinition 得知这里面注册的就是ServiceBeanregistry.registerBeanDefinition(beanName, serviceBeanDefinition);// 据注册结果输出相应的日志信息包括注册成功和重复注册的警告信息if (logger.isInfoEnabled()) {logger.info(The BeanDefinition[ serviceBeanDefinition ] of ServiceBean has been registered with name : beanName);}} else {if (logger.isWarnEnabled()) {logger.warn(The Duplicated BeanDefinition[ serviceBeanDefinition ] of ServiceBean[ bean name : beanName ] was be found , Did DubboComponentScan scan to same package in many times?);}}}buildServiceBeanDefinition
private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,AnnotationAttributes serviceAnnotationAttributes,Class? interfaceClass,String annotatedServiceBeanName) {// 通过大体阅读可以看到其会将很多的配置信息构建到一个叫做 ServiceBean 里面 BeanDefinitionBuilder builder rootBeanDefinition(ServiceBean.class);AbstractBeanDefinition beanDefinition builder.getBeanDefinition();MutablePropertyValues propertyValues beanDefinition.getPropertyValues();String[] ignoreAttributeNames of(provider, monitor, application, module, registry, protocol,interface, interfaceName, parameters);propertyValues.addPropertyValues(new AnnotationPropertyValuesAdapter(serviceAnnotation, environment, ignoreAttributeNames));// References ref property to annotated-Service BeanaddPropertyReference(builder, ref, annotatedServiceBeanName);// Set interfacebuilder.addPropertyValue(interface, interfaceClass.getName());// Convert parameters into mapbuilder.addPropertyValue(parameters, convertParameters(serviceAnnotationAttributes.getStringArray(parameters)));// Add methods parametersListMethodConfig methodConfigs convertMethodConfigs(serviceAnnotationAttributes.get(methods));if (!methodConfigs.isEmpty()) {builder.addPropertyValue(methods, methodConfigs);}/*** Add {link org.apache.dubbo.config.ProviderConfig} Bean reference*/String providerConfigBeanName serviceAnnotationAttributes.getString(provider);if (StringUtils.hasText(providerConfigBeanName)) {addPropertyReference(builder, provider, providerConfigBeanName);}/*** Add {link org.apache.dubbo.config.MonitorConfig} Bean reference*/String monitorConfigBeanName serviceAnnotationAttributes.getString(monitor);if (StringUtils.hasText(monitorConfigBeanName)) {addPropertyReference(builder, monitor, monitorConfigBeanName);}/*** Add {link org.apache.dubbo.config.ApplicationConfig} Bean reference*/String applicationConfigBeanName serviceAnnotationAttributes.getString(application);if (StringUtils.hasText(applicationConfigBeanName)) {addPropertyReference(builder, application, applicationConfigBeanName);}/*** Add {link org.apache.dubbo.config.ModuleConfig} Bean reference*/String moduleConfigBeanName serviceAnnotationAttributes.getString(module);if (StringUtils.hasText(moduleConfigBeanName)) {addPropertyReference(builder, module, moduleConfigBeanName);}/*** Add {link org.apache.dubbo.config.RegistryConfig} Bean reference*/String[] registryConfigBeanNames serviceAnnotationAttributes.getStringArray(registry);ListRuntimeBeanReference registryRuntimeBeanReferences toRuntimeBeanReferences(registryConfigBeanNames);if (!registryRuntimeBeanReferences.isEmpty()) {builder.addPropertyValue(registries, registryRuntimeBeanReferences);}/*** Add {link org.apache.dubbo.config.ProtocolConfig} Bean reference*/String[] protocolConfigBeanNames serviceAnnotationAttributes.getStringArray(protocol);ListRuntimeBeanReference protocolRuntimeBeanReferences toRuntimeBeanReferences(protocolConfigBeanNames);if (!protocolRuntimeBeanReferences.isEmpty()) {builder.addPropertyValue(protocols, protocolRuntimeBeanReferences);}return builder.getBeanDefinition();}最终通过上述代码讲一个 dubbo中提供的ServiceBean注入到Spring IOC容器
ServiceBean的初始化阶段
因为我们向spring注入了一个ServiceBean 那么在spring最后实例化阶段即执行到 finishBeanFactoryInitialization 方法的时候就会调用到getBean方法从而通过反射去实例化那么就会调用到ServiceBean 的构造方法。看看其构造函数
public ServiceBean() {super();this.service null;}当ServiceBean初始化完成之后会调用下面的方法.
Override
public void afterPropertiesSet() throws Exception {if (StringUtils.isEmpty(getPath())) {if (StringUtils.isNotEmpty(beanName) StringUtils.isNotEmpty(getInterface()) beanName.startsWith(getInterface())) {setPath(beanName);}}
}DubboBootstrapApplicationListener
在Dubbo中DubboBootstrapApplicationListener是一个Spring应用程序监听器它在Spring应用程序启动时会监听Dubbo的启动事件。
当启动 Dubbo服务时。
public class DubboBootstrapApplicationListener extends OnceApplicationContextEventListener implements Ordered {/*** The bean name of {link DubboBootstrapApplicationListener}** since 2.7.6*/public static final String BEAN_NAME dubboBootstrapApplicationListener;private final DubboBootstrap dubboBootstrap;public DubboBootstrapApplicationListener(ApplicationContext applicationContext) {super(applicationContext);this.dubboBootstrap DubboBootstrap.getInstance();}Overridepublic void onApplicationContextEvent(ApplicationContextEvent event) {if (event instanceof ContextRefreshedEvent) {onContextRefreshedEvent((ContextRefreshedEvent) event);} else if (event instanceof ContextClosedEvent) {onContextClosedEvent((ContextClosedEvent) event);}}private void onContextRefreshedEvent(ContextRefreshedEvent event) {dubboBootstrap.start();}private void onContextClosedEvent(ContextClosedEvent event) {dubboBootstrap.stop();}Overridepublic int getOrder() {return LOWEST_PRECEDENCE;}
}// 监听的时候会进入到 onContextRefreshedEvent 里面当开启start的时候可能会做的配置
元数据/远程配置信息的初始化拼接url如果是dubbo协议则启动netty server服务注册
start
public DubboBootstrap start() {// 首先通过compareAndSet方法确保started标识为false避免重复执行启动操作if (started.compareAndSet(false, true)) {ready.set(false);// 调用initialize方法进行初始化。initialize();if (logger.isInfoEnabled()) {logger.info(NAME is starting...);}// 1. export Dubbo Services// 导出Dubbo服务即将服务暴露出去。exportServices();// Not only provider register// 如果不仅仅是注册提供者并且已经导出了服务那么还会导出MetadataService。// 如果需要注册本地ServiceInstance。if (!isOnlyRegisterProvider() || hasExportedServices()) {// 2. export MetadataServiceexportMetadataService();//3. Register the local ServiceInstance if requiredregisterServiceInstance();}referServices();// 如果存在异步导出的服务会启动一个新的线程来等待异步导出完成。if (asyncExportingFutures.size() 0) {new Thread(() - {try {this.awaitFinish();} catch (Exception e) {logger.warn(NAME exportAsync occurred an exception.);}// 最后设置ready标识为true表示Dubbo框架已经准备就绪。ready.set(true);if (logger.isInfoEnabled()) {logger.info(NAME is ready.);}}).start();} else {ready.set(true);if (logger.isInfoEnabled()) {logger.info(NAME is ready.);}}if (logger.isInfoEnabled()) {logger.info(NAME has started.);}}return this;}initialize()
public void initialize() {if (!initialized.compareAndSet(false, true)) {return;}// 初始化ApplicationModel初始化应用模型其中会维护很多配置信息。ApplicationModel.initFrameworkExts();// 启动配置中心用于管理和获取配置信息。startConfigCenter();// 加载远程配置信息。loadRemoteConfigs();// 检查全局配置信息。checkGlobalConfigs();// since 2.7.8// 在2.7.8版本之后的Dubbo中启动元数据中心。startMetadataCenter();// 初始化元数据服务。initMetadataService();// 初始化事件监听器。initEventListener();// 打印初始化完成的日志信息。if (logger.isInfoEnabled()) {logger.info(NAME has been initialized!);}}exportServices
发布Dubbo服务
private void exportServices() {// 这里面有我们要发布服务的列表configManager.getServices().forEach(sc - {// TODO, compatible with ServiceConfig.export()ServiceConfig serviceConfig (ServiceConfig) sc;serviceConfig.setBootstrap(this);// 是否异步发布还是同步发布if (exportAsync) {// 使用线程池来异步发布ExecutorService executor executorRepository.getServiceExporterExecutor();Future? future executor.submit(() - {sc.export();exportedServices.add(sc);});asyncExportingFutures.add(future);} else {sc.export();exportedServices.add(sc);}});
}遍历所有dubbo服务进行服务发布.
dubbo:service beanNameServiceBean:com.gupaoedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoService /
dubbo:service beanNameServiceBean:com.gupaoedu.springboot.dubbo.ISayHelloService /dubbo://ip:port?com.gupaoedu.springboot.dubbo.springbootdubbosampleprovider.services.IDemoServicexxxxxx
dubbo://ip:port?com.gupaoedu.springboot.dubbo.ISayHelloServicexxxxxx一个dubbo服务需要发布几次取决于协议的配置数如果一个dubbo服务配置了3个协议rest、webservice、dubbo。
这个时候实际上就会生成三个地址
dubbo://
rest://
webservice://
export
public synchronized void export() {// 首先检查 是否为空如果为空则获取 DubboBootstrap 的实例并进行初始化。if (bootstrap null) {bootstrap DubboBootstrap.getInstance();bootstrap.initialize();}// 调用 方法用于检查和更新子配置。checkAndUpdateSubConfigs();//init serviceMetadata// 初始化 设置服务的版本、分组、接口类型、接口名称和目标引用。serviceMetadata.setVersion(getVersion());serviceMetadata.setGroup(getGroup());serviceMetadata.setDefaultGroup(getGroup());serviceMetadata.setServiceType(getInterfaceClass());serviceMetadata.setServiceInterfaceName(getInterface());serviceMetadata.setTarget(getRef());// 如果不应该导出服务则直接返回。if (!shouldExport()) {return;}// 如果应该延迟导出服务则使用 延迟执行 方法延迟时间由 方法返回时间单位为毫秒。DELAY_EXPORT_EXECUTORdoExportgetDelay()/*为什么延迟发布呢之前老的版本里面考虑spring配置的装载和 dubbo服务启动配置之间会有一个先后的关系如果说spring的一些配置还没有加载但是dubbo服务已经启动了这个时候就会导致一定的问题有的时候就是为了延迟几秒钟等到spring环境加载好了再去启动dubbo。也是为了保障服务启动的安全性。*/if (shouldDelay()) {DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);} else {// 如果不需要延迟导出则直接调用 方法导出服务。doExport();}// 最后调用 方法表示服务已经导出。exported()exported();}exported
public void exported() {ListURL exportedURLs this.getExportedUrls();exportedURLs.forEach(url - {MapString, String parameters getApplication().getParameters();ServiceNameMapping.getExtension(parameters ! null ? parameters.get(MAPPING_KEY) : null).map(url);});// dispatch a ServiceConfigExportedEvent since 2.7.4// 发布完成以后 会发布一个事件服务配置启动成功的事件dispatch(new ServiceConfigExportedEvent(this));}doExport
protected synchronized void doExport() {if (unexported) {throw new IllegalStateException(The service interfaceClass.getName() has already unexported!);}if (exported) {return;}exported true;if (StringUtils.isEmpty(path)) {path interfaceName;}doExportUrls();}doExportUrls
去发布这个url也就是基于url的驱动去进行服务的发布也就到了最关键的阶段了。
private void doExportUrls() {// 构建一个 ServiceRepository将一些服务的描述信息都存储到这里后续如果要去用的话就从这里面拿到ServiceRepository repository ApplicationModel.getServiceRepository();ServiceDescriptor serviceDescriptor repository.registerService(getInterfaceClass());repository.registerProvider(getUniqueServiceName(),ref,serviceDescriptor,this,serviceMetadata);// 在这里先去拿到注册中心的url之前前面提到过一个服务可以配置多个注册中心也可以配置多个协议ListURL registryURLs ConfigValidationUtils.loadRegistries(this, true);// 遍历所有的协议如果有多个协议的话就采用不同的协议去发布for (ProtocolConfig protocolConfig : protocols) {String pathKey URL.buildKey(getContextPath(protocolConfig).map(p - p / path).orElse(path), group, version);// In case user specified path, register service one more time to map it to path.repository.registerService(pathKey, interfaceClass);// TODO, uncomment this line once service key is unifiedserviceMetadata.setServiceKey(pathKey);// 这里将对应的协议 和 多个注册中心传递过去doExportUrlsFor1Protocol(protocolConfig, registryURLs);}
}doExportUrlsFor1Protocol
到了这里也就是最核心的操作
生成url根据url中配置的协议类型调用指定协议进行服务的发布 启动服务注册服务
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, ListURL registryURLs) {// 如果我们配置协议默认采用Dubbo协议发布String name protocolConfig.getName();if (StringUtils.isEmpty(name)) {name DUBBO;}//用来存储所有的配置信息/* dubbo:servicedubbo:methoddubbo:argument*/MapString, String map new HashMapString, String();map.put(SIDE_KEY, PROVIDER_SIDE);ServiceConfig.appendRuntimeParameters(map);AbstractConfig.appendParameters(map, getMetrics());AbstractConfig.appendParameters(map, getApplication());AbstractConfig.appendParameters(map, getModule());// remove default. prefix for configs from ProviderConfig// appendParameters(map, provider, Constants.DEFAULT_KEY);AbstractConfig.appendParameters(map, provider);AbstractConfig.appendParameters(map, protocolConfig);AbstractConfig.appendParameters(map, this);MetadataReportConfig metadataReportConfig getMetadataReportConfig();if (metadataReportConfig ! null metadataReportConfig.isValid()) {map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);}if (CollectionUtils.isNotEmpty(getMethods())) {// 去遍历解析所有的methodsfor (MethodConfig method : getMethods()) {AbstractConfig.appendParameters(map, method, method.getName());String retryKey method.getName() .retry;if (map.containsKey(retryKey)) {String retryValue map.remove(retryKey);if (false.equals(retryValue)) {map.put(method.getName() .retries, 0);}}ListArgumentConfig arguments method.getArguments();if (CollectionUtils.isNotEmpty(arguments)) {// 然后再去遍历解析里面的 argumentsfor (ArgumentConfig argument : arguments) {// convert argument typeif (argument.getType() ! null argument.getType().length() 0) {Method[] methods interfaceClass.getMethods();// visit all methodsif (methods.length 0) {for (int i 0; i methods.length; i) {String methodName methods[i].getName();// target the method, and get its signatureif (methodName.equals(method.getName())) {Class?[] argtypes methods[i].getParameterTypes();// one callback in the methodif (argument.getIndex() ! -1) {if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {AbstractConfig.appendParameters(map, argument, method.getName() . argument.getIndex());} else {throw new IllegalArgumentException(Argument config error : the index attribute and type attribute not match :index : argument.getIndex() , type: argument.getType());}} else {// multiple callbacks in the methodfor (int j 0; j argtypes.length; j) {Class? argclazz argtypes[j];if (argclazz.getName().equals(argument.getType())) {AbstractConfig.appendParameters(map, argument, method.getName() . j);if (argument.getIndex() ! -1 argument.getIndex() ! j) {throw new IllegalArgumentException(Argument config error : the index attribute and type attribute not match :index : argument.getIndex() , type: argument.getType());}}}}}}}} else if (argument.getIndex() ! -1) {AbstractConfig.appendParameters(map, argument, method.getName() . argument.getIndex());} else {throw new IllegalArgumentException(Argument config must set index or type attribute.eg: dubbo:argument index0 .../ or dubbo:argument typexxx .../);}}}} // end of methods for}// 最终这个if结束完了之后参数也就组装完成了。// 针对泛化添加的参数if (ProtocolUtils.isGeneric(generic)) {map.put(GENERIC_KEY, generic);map.put(METHODS_KEY, ANY_VALUE);} else {String revision Version.getVersion(interfaceClass, version);if (revision ! null revision.length() 0) {map.put(REVISION_KEY, revision);}String[] methods Wrapper.getWrapper(interfaceClass).getMethodNames();if (methods.length 0) {logger.warn(No method found in service interface interfaceClass.getName());map.put(METHODS_KEY, ANY_VALUE);} else {map.put(METHODS_KEY, StringUtils.join(new HashSetString(Arrays.asList(methods)), ,));}}// 针对token添加的参数/*** Here the token value configured by the provider is used to assign the value to ServiceConfig#token*/if(ConfigUtils.isEmpty(token) provider ! null) {token provider.getToken();}if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {map.put(TOKEN_KEY, UUID.randomUUID().toString());} else {map.put(TOKEN_KEY, token);}}//init serviceMetadata attachmentsserviceMetadata.getAttachments().putAll(map);在这之前实际上都是参数的组装 这些数据就是service的元数据也就是要将这些数据组装成元数据 // export service// 最终获取到 host的地址 以及 端口号String host findConfigedHosts(protocolConfig, registryURLs, map);Integer port findConfigedPorts(protocolConfig, name, map);// 然后就是组装url了URL url new URL(name, host, port, getContextPath(protocolConfig).map(p - p / path).orElse(path), map); 这也就是我们注册到服务中心的一个地址但是还没有注册 和 发布
// You can customize Configurator to append extra parameters
// 这里就到了扩展点如果有扩展的配置需要去装载的话有的话会根据扩展的配置去比对url替换url里面的各种参数。// 当有了扩展点的知识后现在看这里就清晰很多了
/*
首先代码通过 ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) 获取 ConfiguratorFactory 的扩展加载器。然后它检查是否有针对指定 URL 协议的 ConfiguratorFactory 扩展。这里的 url.getProtocol() 可能是获取 URL 的协议部分比如 http、https 等。如果存在针对该协议的 ConfiguratorFactory 扩展代码会调用它的 getConfigurator 方法来获取一个 Configurator 实例并且使用该实例来对原始的 URL 进行配置得到新的 URL。
*/f (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {url ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}// 根据其范围来发布String scope url.getParameter(SCOPE_KEY);// 如果scope nullif (!SCOPE_NONE.equalsIgnoreCase(scope)) {// 如果其不等于远程就发送一个本地if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {exportLocal(url);-----------------------------------------------------------------------------/*
这段代码也是一个 Java 代码片段看起来是在使用 Dubbo 框架进行服务导出export的过程。让我来解释一下首先代码通过 URLBuilder.from(url) 创建了一个新的 URLBuilder 对象并将传入的 url 作为初始 URL。接着它使用 URLBuilder 的 setProtocol、setHost、setPort 方法分别设置了协议LOCAL_PROTOCOL、主机LOCALHOST_VALUE和端口0。然后通过调用 build() 方法将 URLBuilder 对象构建为一个新的 URL 对象即 local。接下来代码使用 PROXY_FACTORY.getInvoker 方法根据 ref、interfaceClass 和 local 创建了一个 Invoker 对象。 PROXY_FACTORY 可能是一个代理工厂用于创建服务的代理对象。然后通过 PROTOCOL.export 方法将该 Invoker 导出为一个 Exporter 对象。 PROTOCOL 可能是 Dubbo 的协议实现用于处理服务的导出和通信。最后代码将导出的 Exporter 对象添加到 exporters 集合中并打印日志信息记录导出的服务接口类名和本地注册的 URL。简而言之这段代码的作用是将一个服务根据指定的 URL 导出到本地注册中心以供本地消费者调用。它会构建一个新的本地 URL创建 Invoker 对象并使用 Dubbo 的协议实现导出服务最后记录日志并将 Exporter 对象添加到 exporters 集合中。*/private void exportLocal(URL url) {URL local URLBuilder.from(url).setProtocol(LOCAL_PROTOCOL).setHost(LOCALHOST_VALUE).setPort(0).build();Exporter? exporter PROTOCOL.export(PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);logger.info(Export dubbo service interfaceClass.getName() to local registry url : local);}
-----------------------------------------------------------------------------}// 发布远程服务if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {if (CollectionUtils.isNotEmpty(registryURLs)) {// 判断发布的服务要注册到那几个注册中心// 循环遍历配置的注册中心的列表for (URL registryURL : registryURLs) {//if protocol is only injvm ,not registerif (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {continue;}// 添加参数url url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));URL monitorUrl ConfigValidationUtils.loadMonitor(this, registryURL);if (monitorUrl ! null) {url url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());}if (logger.isInfoEnabled()) {if (url.getParameter(REGISTER_KEY, true)) {logger.info(Register dubbo service interfaceClass.getName() url url to registry registryURL);} else {logger.info(Export dubbo service interfaceClass.getName() to url url);}}// For providers, this is used to enable custom proxy to generate invokerString proxy url.getParameter(PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {// registry://ip:portregistryURL registryURL.addParameter(PROXY_KEY, proxy);}
Invoker 调用器. 服务提供者、服务的消费者。 // 生成一个InvokerInvoker? invoker PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));DelegateProviderMetaDataInvoker wrapperInvoker new DelegateProviderMetaDataInvoker(invoker, this);// 将这个发布出去Exporter? exporter PROTOCOL.export(wrapperInvoker);exporters.add(exporter);}} else {if (logger.isInfoEnabled()) {logger.info(Export dubbo service interfaceClass.getName() to url url);}Invoker? invoker PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker new DelegateProviderMetaDataInvoker(invoker, this);Exporter? exporter PROTOCOL.export(wrapperInvoker);exporters.add(exporter);}/*** since 2.7.0* ServiceData Store*/WritableMetadataService metadataService WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));if (metadataService ! null) {metadataService.publishServiceDefinition(url);}}}this.urls.add(url);
}
-------------------------------------------------------------------------------
Protocol PROTOCOL ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// 其得到的是一个自适应的扩展点,会动态生成Protocol$Adaptive 然后会调用这个实例里面的export方法
// 理论上应该返回 extension RegistryProtocol
// 实际返回的是这个QosProtocolWrapper(ProtocolFilterWrapper(ProtocolListenerWrapper(RegistryProtocolWrapper包装
在ExtensionLoader.loadClass这个方法中有一段这样的判断如果当前这个类是一个wrapper包装类也就是这个wrapper中有构造方法参数是当前被加载的扩展点的类型则把这个wrapper类加入到cacheWrapperClass缓存中。
else if (isWrapperClass(clazz)) {cacheWrapperClass(clazz);
}
private boolean isWrapperClass(Class? clazz) {try {clazz.getConstructor(type);return true;} catch (NoSuchMethodException e) {return false;}
}
//上面的判断是说只要针对当前扩展点的类如果存在一个构造方法参数是当前需要加载的扩展点的对
象那么就会进行包装
public ProtocolListenerWrapper(Protocol protocol) {if (protocol null) {throw new IllegalArgumentException(protocol null);}this.protocol protocol;
}我们可以在dubbo的配置文件中找到三个Wrapper org.apache.dubbo.rpc.Protocol 。
qosorg.apache.dubbo.qos.protocol.QosProtocolWrapper
filterorg.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listenerorg.apache.dubbo.rpc.protocol.ProtocolListenerWrapperProtocolListenerWrapper 用于服务export时候插入监听机制
QosprotocolWrapper 如果当前配置了注册中心则会启动一个Qos server.qos是dubbo的在线运维命令dubbo2.5.8新版本重构了telnet模块提供了新的telnet命令支持新版本的telnet端口与dubbo协议的端口是不同的端口默认为22222
ProtocolFilterWrapper对invoker进行filter的包装实现请求的过滤
接着在getExtension-createExtension方法中会对cacheWrapperClass集合进行判断如果集合不为空则进行包装
SetClass? wrapperClasses cachedWrapperClasses;if (CollectionUtils.isNotEmpty(wrapperClasses)) {for (Class? wrapperClass : wrapperClasses) {instance injectExtension((T)wrapperClass.getConstructor(type).newInstance(instance));}}这三个扩展点在注册场景中都不会生效执行的逻辑中会先判断当前是否是注册协议如果是则直接基 于协议去发布服务
public T ExporterT export(InvokerT invoker) throws RpcException {if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);}return protocol.export(buildInvokerChain(invoker,Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol{public void destroy() {throw new UnsupportedOperationException(The method public abstractvoid org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!);}public int getDefaultPort() {throw new UnsupportedOperationException(The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!);}public Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {if (arg0 null) throw new IllegalArgumentException(org.apache.dubbo.rpc.Invoker argument null);if (arg0.getUrl() null) throw new
IllegalArgumentException(org.apache.dubbo.rpc.Invoker argument getUrl()
null);URL url arg0.getUrl();String extName ( url.getProtocol() null ? dubbo :url.getProtocol() );if(extName null) throw new IllegalStateException(Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ( url.toString() ) use keys([protocol]));Protocol extension
(org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dub
bo.rpc.Protocol.class).getExtension(extName);// 实际上调用的是动态生成的适配类中的export();return extension.export(arg0);
}public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0,
org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {if (arg1 null) throw new IllegalArgumentException(url null);
org.apache.dubbo.common.URL url arg1;String extName ( url.getProtocol() null ? dubbo :url.getProtocol() );if(extName null) throw new IllegalStateException(Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url ( url.toString() ) use keys([protocol]));org.apache.dubbo.rpc.Protocol extension (org.apache.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);return extension.refer(arg0, arg1);
}public java.util.List getServers() {throw new UnsupportedOperationException(The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!);}
}------------------------------------------------------------------------------- RegistryProtocol
public T ExporterT export(final InvokerT originInvoker) throws RpcException {URL registryUrl getRegistryUrl(originInvoker);// url to export locallyURL providerUrl getProviderUrl(originInvoker);// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call// the same service. Because the subscribed is cached key with the name of the service, it causes the// subscription information to cover.final URL overrideSubscribeUrl getSubscribedOverrideUrl(providerUrl);final OverrideListener overrideSubscribeListener new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);providerUrl overrideUrlWithConfig(providerUrl, overrideSubscribeListener);//export invoker// 到这里才是真正意义上启动一个Netty Server,发布Dubbo协议的服务// dubbo还是基于url驱动所有每次执行都会去改变urlfinal ExporterChangeableWrapperT exporter doLocalExport(originInvoker, providerUrl);// url to registryfinal Registry registry getRegistry(originInvoker);final URL registeredProviderUrl getUrlToRegistry(providerUrl, registryUrl);// decide if we need to delay publishboolean register providerUrl.getParameter(REGISTER_KEY, true);if (register) {// 然后到这里才注册了服务register(registryUrl, registeredProviderUrl);}// register stated url on provider modelregisterStatedUrl(registryUrl, registeredProviderUrl, register);exporter.setRegisterUrl(registeredProviderUrl);exporter.setSubscribeUrl(overrideSubscribeUrl);// Deprecated! Subscribe to override rules in 2.6.x or before.registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);notifyExport(exporter);//Ensure that a new exporter instance is returned every time exportreturn new DestroyableExporter(exporter);}服务发布流程
doLocalExport
服务启动过程
private T ExporterChangeableWrapperT doLocalExport(final InvokerT originInvoker, URL providerUrl) {String key getCacheKey(originInvoker);return (ExporterChangeableWrapperT) bounds.computeIfAbsent(key, s - {Invoker? invokerDelegate new InvokerDelegate(originInvoker, providerUrl);return new ExporterChangeableWrapper((ExporterT) protocol.export(invokerDelegate), originInvoker);});}protocol.export
protocol是通过依赖注入来初始化的一个协议扩展点并且我们可以看到这个protocol.export()方法上 增加了Adaptive注解表示它是一个动态适配的扩展点意味着最终的执行链路应该是
ProtocolListenerWrapper -QosProtocolWrapper -ProtocolFilterWrapper-DubboProtocol 所以这里又回到了自适应扩展
如果 ProviderUrl: dubbo:// 是这样那么就会选择 DubboProtocol
所以最终会去调用 DubboProtocol.export()
public T ExporterT export(InvokerT invoker) throws RpcException {URL url invoker.getUrl();// export service.String key serviceKey(url);DubboExporterT exporter new DubboExporterT(invoker, key, exporterMap);// 把服务对应的invoker 存储将来调用的时候从map中拿到即可exporterMap.put(key, exporter);//export an stub service for dispatching eventBoolean isStubSupportEvent url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);Boolean isCallbackservice url.getParameter(IS_CALLBACK_SERVICE, false);if (isStubSupportEvent !isCallbackservice) {String stubServiceMethods url.getParameter(STUB_EVENT_METHODS_KEY);if (stubServiceMethods null || stubServiceMethods.length() 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException(consumer [ url.getParameter(INTERFACE_KEY) ], has set stubproxy support event ,but no stub methods founded.));}}}// 开启一个服务openServer(url);// 优化序列化optimizeSerialization(url);return exporter;}openServer
往下看这个过程进入到openServer()从名字来看它是用来开启一个服务。
去开启一个服务并且放入到缓存中-在同一台机器上单网卡同一个端口上仅允许启动一个服务器实例。
private void openServer(URL url) {// 获取 host:port并将其作为服务器实例的 key用于标识当前的服务器实例String key url.getAddress();//client 也可以暴露一个只有server可以调用的服务boolean isServer url.getParameter(IS_SERVER_KEY, true);if (isServer) {//是否在serverMap中缓存了ProtocolServer server serverMap.get(key);if (server null) {synchronized (this) {server serverMap.get(key);if (server null) {// 创建服务器实例serverMap.put(key, createServer(url));}}} else {// 服务器已创建则根据 url 中的配置重置服务器server.reset(url);}}}createServer
创建服务. 在很多地方这个地址一直伴随着dubbo的启动、消费、以及整个生命周期中。
private ProtocolServer createServer(URL url) {//组装url在url中添加心跳时间、编解码参数url URLBuilder.from(url)// 当服务关闭以后发送一个只读的事件默认是开启状态.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())// 启动心跳配置.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)).addParameter(CODEC_KEY, DubboCodec.NAME).build();String str url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);//通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展不存在则抛出异常if (str ! null str.length() 0 !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {throw new RpcException(Unsupported server type: str , url: url);}//创建ExchangeServer.ExchangeServer server;try {server Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException(Fail to start server(url: url ) e.getMessage(), e);}str url.getParameter(CLIENT_KEY);if (str ! null str.length() 0) {SetString supportedTypes ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();if (!supportedTypes.contains(str)) {throw new RpcException(Unsupported client type: str);}}return new DubboProtocolServer(server);}Exchangers.bind
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url null) {throw new IllegalArgumentException(url null);}if (handler null) {throw new IllegalArgumentException(handler null);}//获取 Exchanger默认为 HeaderExchanger。//调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例url url.addParameterIfAbsent(Constants.CODEC_KEY, exchange);return getExchanger(url).bind(url, handler);}headerExchanger.bind
这里面包含多个逻辑:
new DecodeHandler(new HeaderExchangeHandler(handler))Transporters.bindnew HeaderExchangeServer
这段代码是一个 Java 方法其作用是将一个 ExchangeHandler 对象绑定到某个 URL 上以创建一个 ExchangeServer 对象并返回。ExchangeServer 是一个 Dubbo 框架中的概念表示一个 Dubbo 服务提供方所使用的网络通信服务器。
具体来说这个方法接收两个参数
URL url表示要绑定到的目标 URL。这个 URL 包含了一些必要的信息比如主机名、端口号、协议类型等。ExchangeHandler handler表示要绑定的 ExchangeHandler 对象它是 Dubbo 框架中的核心组件之一负责处理网络通信协议的编解码、消息的序列化和反序列化等工作。
在实现中这个方法首先通过 Transporters.bind() 方法创建一个网络服务器该方法会根据传入的 URL 中指定的协议类型比如 TCP、HTTP 等创建对应的网络服务器。然后将这个新创建的服务器对象传递给一个 DecodeHandler 对象进行进一步的处理。DecodeHandler 是 Dubbo 框架中的一个组件用于对网络数据进行解码和转换。最后将这个 DecodeHandler 对象封装在一个 HeaderExchangeHandler 对象中完成 Dubbo 协议头的添加和解析工作。HeaderExchangeHandler 是 Dubbo 框架中的一个核心组件负责处理 Dubbo 协议头的生成和解析。
最终这个方法会返回一个新的 HeaderExchangeServer 对象该对象封装了之前创建的网络服务器以及协议头解析器等组件。这个 HeaderExchangeServer 对象可以被用来启动 Dubbo 服务提供方的网络通信服务。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url null) {throw new IllegalArgumentException(url null);}if (handlers null || handlers.length 0) {throw new IllegalArgumentException(handlers null);}ChannelHandler handler;if (handlers.length 1) {handler handlers[0];} else {handler new ChannelHandlerDispatcher(handlers);}return getTransporter().bind(url, handler);}getTransporter
getTransporter是一个自适应扩展点它针对bind方法添加了自适应注解意味着bing方法的具体实现会基于Transporter$Adaptive方法进行适配那么在这里面默认的通信协议是netty所以它会采用netty4的实现也就是 org.apache.dubbo.remoting.transport.netty4.NettyTransporter。
public static Transporter getTransporter() { return
ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}NettyTransporter.bind
创建一个nettyserver
public Server bind(URL url, ChannelHandler listener) throws RemotingException {return new NettyServer(url, listener);
}NettyServer
初始化一个nettyserver并且从url中获得相应的ip/ port。然后调用 doOpen();
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));}public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);localAddress getUrl().toInetSocketAddress();// 获取 ip 和端口String bindIp getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());int bindPort getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp ANYHOST_VALUE;}bindAddress new InetSocketAddress(bindIp, bindPort);this.accepts url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);this.idleTimeout url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);try {doOpen(); // 调用模板方法 doOpen 启动服务器if (logger.isInfoEnabled()) {logger.info(Start getClass().getSimpleName() bind getBindAddress() , export getLocalAddress());}} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, Failed to bind getClass().getSimpleName() on getLocalAddress() , cause: t.getMessage(), t);}executor executorRepository.createExecutorIfAbsent(url);}doOpen
开启netty服务
总体来说这段代码的主要功能是使用 Netty 框架创建一个服务器并进行一系列的初始化设置包括线程池的创建、网络通道的初始化、服务器选项的设置以及管道工厂的创建等。最终将服务器绑定到指定地址上准备接受客户端的连接请求。
// doOpen() 方法是一个受保护的方法用于在子类中被调用以进行服务器的打开操作。
protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory(); // 这行代码用于设置 Netty 框架的日志工厂以便配置日志记录器。ExecutorService boss Executors.newCachedThreadPool(new NamedThreadFactory(NettyServerBoss, true)); // 创建一个用于处理接受连接的线程池。ExecutorService worker Executors.newCachedThreadPool(new NamedThreadFactory(NettyServerWorker, true)); // 创建一个用于处理网络 IO 事件的线程池。// 使用 NIO 方式创建 ServerSocketChannel 工厂其中包含了 boss 线程池和 worker 线程池。ChannelFactory channelFactory new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));// 使用上一步创建的 ChannelFactory 初始化 ServerBootstrap 对象。bootstrap new ServerBootstrap(channelFactory);// NettyHandler 负责处理 Netty 的网络事件这里创建了一个 NettyHandler 对象并获取其通道列表。final NettyHandler nettyHandler new NettyHandler(getUrl(), this);channels nettyHandler.getChannels();/*设置服务器选项设置 child.tcpNoDelay 为 true表示禁用 Nagle 算法即数据立即发送。
设置 backlog指定了未完成连接队列的最大长度。*/bootstrap.setOption(child.tcpNoDelay, true);bootstrap.setOption(backlog, getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG));bootstrap.setPipelineFactory(new ChannelPipelineFactory() {Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ChannelPipeline pipeline Channels.pipeline();pipeline.addLast(decoder, adapter.getDecoder());pipeline.addLast(encoder, adapter.getEncoder());pipeline.addLast(handler, nettyHandler);return pipeline;}});// bind// 使用 ServerBootstrap 绑定到指定的地址并返回一个 ChannelFuture 对象。channel bootstrap.bind(getBindAddress());}然后需要要注意的是它这里用到了一个handler来处理客户端传递过来的请求:
nettyServerHandler
NettyServerHandler nettyServerHandler new NettyServerHandler(getUrl(), this);这个handler是一个链路它的正确组成应该是
MultiMessageHandler(heartbeatHandler(AllChannelHandler(DecodeHandler(HeaderExchangeHeadler(DubboProtocol))))
后续接收到的请求会一层一层的处理。比较繁琐
服务注册流程
RegistryProtocol
public T ExporterT export(final InvokerT originInvoker) throws RpcException {URL registryUrl getRegistryUrl(originInvoker);// url to export locallyURL providerUrl getProviderUrl(originInvoker);......// 到这里才是真正意义上启动一个Netty Server,发布Dubbo协议的服务// dubbo还是基于url驱动所有每次执行都会去改变urlfinal ExporterChangeableWrapperT exporter doLocalExport(originInvoker, providerUrl);......if (register) {// 然后到这里才注册了服务register(registryUrl, registeredProviderUrl);}......}了解了服务的发布之后我们继续来看一下服务是如何发起注册的。
服务注册实际上就是把dubbo的协议url地址保存到第三方注册中心上。
private void register(URL registryUrl, URL registeredProviderUrl) {Registry registry registryFactory.getRegistry(registryUrl);registry.register(registeredProviderUrl);}getRegistry
把url转化为对应配置的注册中心的具体协议根据具体协议从registryFactory中获得指定的注册中心实现
那么这个registryFactory具体是怎么赋值的呢
private Registry getRegistry(final Invoker? originInvoker) {//把url转化为配置的具体协议比如zookeeper://ip:port. 这样后续获得的注册中心就会是基于zk的实现URL registryUrl getRegistryUrl(originInvoker);return registryFactory.getRegistry(registryUrl);
}在RegistryProtocol中存在一段这样的代码很明显这是通过依赖注入来实现的扩展点。
private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {this.registryFactory registryFactory;
}按照扩展点的加载规则我们可以先看看/META-INF/dubbo/internal路径下找到RegistryFactory的配置文件.这个factory有多个扩展点的实现。
dubboorg.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicastorg.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeperorg.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redisorg.apache.dubbo.registry.redis.RedisRegistryFactory
consulorg.apache.dubbo.registry.consul.ConsulRegistryFactoryetcd3org.apache.dubbo.registry.etcd.EtcdRegistryFactory接着找到RegistryFactory的实现, 发现它里面有一个自适应的方法根据url中protocol传入的值进行适配
SPI(dubbo)
public interface RegistryFactory {Adaptive({protocol})Registry getRegistry(URL url);RegistryFactory$Adaptive
由于在前面的代码中url中的protocol已经改成了zookeeper那么这个时候根据zookeeper获得的spi扩展点应该是RegistryFactoryWrapper
import org.apache.dubbo.common.extension.ExtensionLoader;public class RegistryFactory$Adaptive implements
org.apache.dubbo.registry.RegistryFactory {public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {if (arg0 null) throw new IllegalArgumentException(url null);
org.apache.dubbo.common.URL url arg0;String extName ( url.getProtocol() null ? dubbo :
url.getProtocol() );if(extName null) throw new IllegalStateException(Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url ( url.toString() ) use keys([protocol]));org.apache.dubbo.registry.RegistryFactory extension
(org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);return extension.getRegistry(arg0);}
}RegistryFactoryWrapper
而registryFactory.getRegistry(url)中由于此时的registryFactory已经是ZookeeperRegistryFactory所以这里会得到一个zookeeperRegistry。
public class RegistryFactoryWrapper implements RegistryFactory {private RegistryFactory registryFactory;public RegistryFactoryWrapper(RegistryFactory registryFactory) {this.registryFactory registryFactory;}Overridepublic Registry getRegistry(URL url) {return new ListenerRegistryWrapper(registryFactory.getRegistry(url),Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class).getActivateExtension(url, registry.listeners)));}
}因此最终返回的RegistryListenerRegistryWrapper。
下面这段代码的含义是
获得注册的服务提供者地址调用register发起注册
final URL registeredProviderUrl getUrlToRegistry(providerUrl, registryUrl);// decide if we need to delay publish
boolean register providerUrl.getParameter(REGISTER_KEY, true);if (register) {register(registryUrl, registeredProviderUrl);}RegistryProtocol.register
发起注册流程registry对象的实例是ListenerRegistryWrapper。所以调用这个对象的register方法。
private void register(URL registryUrl, URL registeredProviderUrl) {Registry registry registryFactory.getRegistry(registryUrl);registry.register(registeredProviderUrl);
}ListenerRegistryWrapper.register
这里做包装的目的其实应该就是增加了一个监听器的处理过程。
Overridepublic void register(URL url) {try {registry.register(url);} finally {if (CollectionUtils.isNotEmpty(listeners)) {RuntimeException exception null;for (RegistryServiceListener listener : listeners) {if (listener ! null) {try {listener.onRegister(url);} catch (RuntimeException t) {logger.error(t.getMessage(), t);exception t;}}}if (exception ! null) {throw exception;}}}}ZookeeperRegistry
这个方法中并没有register方法而ZookeeperRegsitry继承了FailbackRegistry所以直接进入到FailbackRegistry这个类
FailbackRegistry从名字上来看是一个失败重试机制调用父类的register方法讲当前url添加到缓存集合中
public void register(URL url) {if (!acceptable(url)) {logger.info(URL url will not be registered to Registry. Registry url does not accept service of this protocol type.);return;}super.register(url);removeFailedRegistered(url);removeFailedUnregistered(url);try {// Sending a registration request to the server sidedoRegister(url);} catch (Exception e) {Throwable t e;// If the startup detection is opened, the Exception is thrown directly.boolean check getUrl().getParameter(Constants.CHECK_KEY, true) url.getParameter(Constants.CHECK_KEY, true) !CONSUMER_PROTOCOL.equals(url.getProtocol());boolean skipFailback t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t t.getCause();}throw new IllegalStateException(Failed to register url to registry getUrl().getAddress() , cause: t.getMessage(), t);} else {logger.error(Failed to register url , waiting for retry, cause: t.getMessage(), t);}// Record a failed registration request to a failed list, retry regularlyaddFailedRegistered(url);}}ZookeeperRegistry.doRegister
通过curator客户端把服务地址写入到注册中心。
Override
public void doRegister(URL url) {try {zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException(Failed to register url to zookeeper getUrl() , cause: e.getMessage(), e);}
}Invoker是什么
Invoker翻译成中文是调用器它在Dubbo中其实是一个比较重要的领域对象最核心的是在服务的发布和调用中都是以Invoker的形态存在。
在刚刚的服务发布过程中整体分为三个阶段
第一个阶段会创造一个invoker第二个阶段会把经历过一系列处理的invoker各种包装在DubboProtocol中保存到exporterMap中第三个阶段把dubbo协议的url地址注册到注册中心上
而Invoker的作用就是收到客户端请求的时候根据接口的全路径作为key找到实例方法然后通过反射去调用。
前面没有分析Invoker我们来简单看看Invoker到底是一个啥东西。
Invoker? invoker PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));ProxyFacotory.getInvoker
这个是一个代理工程用来生成invoker从它的定义来看它是一个自适应扩展点看到这样的扩展点我们几乎可以不假思索的想到它会存在一个动态适配器类
ProxyFactory proxyFactory
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();ProxyFactory
这个方法的简单解读为 它是一个spi扩展点并且默认的扩展实现是javassit, 这个接口中有三个方法并且都是加了Adaptive的自适应扩展点。所以如果调用getInvoker方法应该会返回一个ProxyFactory$Adaptive
SPI(javassist)
public interface ProxyFactory {Adaptive({Constants.PROXY_KEY})T T getProxy(InvokerT invoker) throws RpcException;Adaptive({Constants.PROXY_KEY})T T getProxy(InvokerT invoker, boolean generic) throws RpcException;Adaptive({Constants.PROXY_KEY})T InvokerT getInvoker(T proxy, ClassT type, URL url) throws RpcException;
ProxyFactory$Adaptive
这个自适应扩展点做了两件事情
通过ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)获取了一个指定名称的扩展点。在dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory中定义了javassisJavassisProxyFactory调用JavassisProxyFactory的getInvoker方法
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory
{public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws
org.apache.dubbo.rpc.RpcException {if (arg0 null) throw new
IllegalArgumentException(org.apache.dubbo.rpc.Invoker argument null);if (arg0.getUrl() null) throw new
IllegalArgumentException(org.apache.dubbo.rpc.Invoker argument getUrl() null);org.apache.dubbo.common.URL url arg0.getUrl();String extName url.getParameter(proxy, javassist);if(extName null) throw new IllegalStateException(Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url ( url.toString() ) use keys([proxy]));org.apache.dubbo.rpc.ProxyFactory extension
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);return extension.getProxy(arg0);}public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean
arg1) throws org.apache.dubbo.rpc.RpcException {if (arg0 null) throw new
IllegalArgumentException(org.apache.dubbo.rpc.Invoker argument null);if (arg0.getUrl() null) throw new
IllegalArgumentException(org.apache.dubbo.rpc.Invoker argument getUrl() null);org.apache.dubbo.common.URL url arg0.getUrl();String extName url.getParameter(proxy, javassist);if(extName null) throw new IllegalStateException(Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url ( url.toString() ) use keys([proxy]));org.apache.dubbo.rpc.ProxyFactory extension
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);return extension.getProxy(arg0, arg1);
}public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0,
java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws
org.apache.dubbo.rpc.RpcException {if (arg2 null) throw new IllegalArgumentException(url null);org.apache.dubbo.common.URL url arg2;String extName url.getParameter(proxy, javassist);if(extName null) throw new IllegalStateException(Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url ( url.toString() ) use keys([proxy]));org.apache.dubbo.rpc.ProxyFactory extension
(org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache
.dubbo.rpc.ProxyFactory.class).getExtension(extName);return extension.getInvoker(arg0, arg1, arg2);}
}JavassistProxyFactory.getInvoker
javassist是一个动态类库用来实现动态代理的。
proxy:接口的实现: com.gupaoedu.practice.dubbo.SayHelloServiceImpl
type:接口全称 com.gupaoedu.dubbo.ISayHelloService
Override
public T InvokerT getInvoker(T proxy, ClassT type, URL url) {final Wrapper wrapper Wrapper.getWrapper(proxy.getClass().getName().indexOf($) 0 ? proxy.getClass() : type);return new AbstractProxyInvokerT(proxy, type, url) {Overrideprotected Object doInvoke(T proxy, String methodName,Class?[] parameterTypes,
Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);}};
}javassist生成的动态代理代码
通过断点的方式在Wrapper.getWrapper中的makeWrapper会创建一个动态代理核心的方法invokeMethod代码如下
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws
java.lang.reflect.InvocationTargetException {com.gupaoedu.dubbo.practice.ISayHelloService w;try {w ((com.gupaoedu.dubbo.practice.ISayHelloService) $1);} catch (Throwable e) {throw new IllegalArgumentException(e);}try {if (sayHello.equals($2) $3.length 1) {return ($w) w.sayHello((java.lang.String) $4[0]);}} catch (Throwable e) {throw new java.lang.reflect.InvocationTargetException(e);}throw new org.apache.dubbo.common.bytecode.NoSuchMethodException(Not
found method \ $2 \ in class com.gupaoedu.dubbo.practice.ISayHelloService.);
}
构建好了代理类之后返回一个AbstractproxyInvoker,并且它实现了doInvoke方法这个地方似乎看到了dubbo消费者调用过来的时候触发的影子因为wrapper.invokeMethod本质上就是触发上面动态代理类的方法invokeMethod.
return new AbstractProxyInvokerT(proxy, type, url) {Overrideprotected Object doInvoke(T proxy, String methodName,Class?[] parameterTypes,
Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);}
};所以简单总结一下Invoke本质上应该是一个代理经过层层包装最终进行了发布。当消费者发起请求的时候会获得这个invoker进行调用。
最终发布出去的invoker, 也不是单纯的一个代理也是经过多层包装
InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))