Motan - 服务注册中心
RegistryService
接口: 注册、设置 URL 的可用DiscoveryService
接口: 订阅 URLRegistry
接口: 一个 URL 就是一个Registry
1 | public interface Registry |
RegistryFactory
接口: 根据 URL 创建Registry
Motan - 代理
ProxyFactory
接口: 创建某个类的代理
关注实现类 RefererInvocationHandler
的 invoke
方法:
1 | public Object invoke(Object proxy, Method method, Object[] args) |
Motan - 数据传输
异步请求
在源码编译阶段,让编译器创建一个类名为 {interfaceName}Async
,同时所有方法以 Async
结尾的生成类,并默认放在目录 target/generated-sources/annotations/
下面:
1 |
|
传输
Transport
接口: 将byte[]
请求从这头发送到那一头,并返回byte[]
Channel
接口: 将Request
请求从这头发送到另一头,并返回Response
Endpoint
接口: 继承Channel
Clent
接口: 继承Endpoint
,异步发送心跳包请求Server
接口: 继承Endpoint
,可以得到所有的Channels
MessageHandler
接口: 处理一个Channel
上的message
,并返回Object
类型EndpointFactory
接口: 可以创建Server
和Client
EndpointManager
接口: 可以初始化/销毁,也可以添加/移除Endpoint
HeartbeatFactory
接口: 创建心跳包请求,对MessageHandler
进行包装
下面看 AbstractClient
的实现:
1 | public abstract class AbstractClient |
AbstractServer
的实现和 AbstractClient
的实现大同小异,不再赘述。
AbstractPoolClient
的实现,实际上内部使用了 org.apache.commons.pool.impl.GenericObjectPool
作为默认对象池的实现,其可以支持如下机制:
- 设置最小空闲对象个数
- 设置最大空闲对象个数
- 设置最大活跃对象个数
- 设置最大等待对象个数
- 设置以 LIFO 或 FIFO 机制来借对象
- LIFO: 借对象的时候会返回上次归还的
idle
对象 - FIFO: 借对象的时候会返回很久未使用过的
idle
对象
- LIFO: 借对象的时候会返回上次归还的
- 如果非延迟初始化,则会添加最小空闲个数量的对象
- 借对象
- 使对象失效
- 还对象
DefaultRpcHeartbeatFactory
的实现如下:
1 | public class DefaultRpcHeartbeatFactory |
在 HeartbeatClientEndpointManager
中有一个核心数据结构:
1 | private ConcurrentMap<Client, HeartbeatFactory> endpoints; |
然后在构造器中又开启了一个 ScheduledExecutorService
服务,每隔 500 ms 检查一下 Client
是否联通:
1 | // 如果节点是存活状态,那么没必要走心跳 |
Motan - 集群
负载均衡
LoadBalance<T>
接口: 对请求选择Referer
AbstractLoadBalance<T>
抽象类: 实现了LoadBalance<T>
接口,主要对可用的Referer<T>
在数量上进行了一些检查,并交由子类来实现doSelect
方法
Motan 主要提供了如下几种负载均衡算法,所有算法均继承自 AbstractLoadBalance<T>
:
ActiveWeightLoadBalance<T>
: 低并发优化负载均衡
- 随机抽取 10 台服务器,筛选出
activeRefererCount
值最小的可用的Referer
ConsistentHashLoadBalance<T>
: 一致性 Hash
生成一致性 Hash
数组算法:
- 把所有
referers
全部打乱 - 每打乱一次,就放进数组里面,
- 上述两步骤重复 1000 次
- 然后最终使
consistentHashReferers
指向这个数组
Hash
算法的生成:
- 无参数,就基于
Request
对象本身生成hashCode
- 有参数,就使用
Arrays.hashCode(request.getArguments())
来生成hashCode
- 最后,使用
0x7fffffff & originValue
来得到最终的hash
值
请求选择 Referer
:
- 遍历所有的
referers
- 然后进行下面的取余操作
1 | ref = consistentHashReferers.get((hash + i) % consistentHashReferers.size()); |
- 如果这个
ref
可用,那么直接返回
LocalFirstLoadBalance<T>
: 本地服务优先
当 referers
里面包含本地暴露的服务时,并此服务为 available
的情况下,优先使用此服务。当不存在本地暴露的服务时,默认使用低并发 ActiveWeight
负载均衡策略。
对 referers
根据 ip
顺序查找本地服务,多存在多个本地服务,获取 Active
最小的本地服务进行服务。当不存在本地服务,但是存在远程 RPC
服务,则根据 ActivWeight
获取远程 RPC
服务,当两者都存在,所有本地服务都应优先于远程服务,本地 RPC
服务与远程 RPC
服务内部则根据 ActiveWeight
进行.
获取本地 IP 算法: 检查缓存 -> 检查 Hostname
-> 根据 Socket
-> 根据网口
1 | IP 得到本地IP() { |
找到本地 IP 后,会将 Referres
里面所有等于这个 IP
的 Referer
筛选出来,然后从这几个 Referer
中找到 Referer.activeRefererCount()
数量最少的的 Referer
RandomLoadBalance<T>
: 随机负载均衡
随机从 referers
中挑选一个可用的 Referer
返回
RoundRobinLoadBalance<T>
: 轮询负载均衡
在 referers
上轮询可用的 Referer
,其中获取下一个索引的算法如下:
1 | private AtomicInteger idx = new AtomicInteger(0); |
ConfigurableWeightLoadBalance<T>
: 权重可配置的负载均衡
TODO
HaStrategy: 高可用策略
HaStrategy<T>
接口定义如下:
1 | public interface HaStrategy<T> { |
AbstractHaStrategy<T>
抽象类主要是定义了一个 URL
字段,目前主要提供两种策略:
FailfastHaStrategy<T>
: 快速失败策略
1 |
|
FailoverHaStrategy<T>
: 失败重试策略
先使用负载均衡筛选出一批符合策略的 referers
,然后轮询重试:
1 | for (int i = 0; i <= tryCount; i++) { |
Motan - 序列化
实现 Object
和 byte[]
之间的相互转化:
FactJson
序列化Hessian2
序列化
Motan - 服务开关
SwitchListener
接口: 通知服务开/关了- Switcher: 代表一个开关
两个核心数据结构:
1 | private static ConcurrentMap<String, Switcher> switchers; |
每次 setValue
的时候,都会遍历这个开关上的所有 SwitchListener
来通知值变了:
1 | public void setValue(String switcherName, boolean value) { |
SPI 扩展
DefaultThreadFactory
: 默认线程工厂Spi
注解:
1 |
|
作用域可以设置为:
Scope.SINGLETON
: 单例模式Scope.PROTOTYPE
: 多例模式
SpiMeta
注解: Spi 元信息Activation
注解: 用来对 Spi 的实现根据条件进行过滤、排序等
1 |
|
ActivationComparator
: 比较器ExtensionLoader<T>
: 自己实现的ServiceProvider
构造器,需要传入想要实例化的接口,如 com.weibo.api.motan.cluster.HaStrategy.class
:
1 | private ExtensionLoader(Class<T> type) { |
从资源文件夹加载 URL
:
1 | String fullName = "META-INF/services/" + type.getName(); |
然后解析文件中的所有声明的实现类:
1 | while ((line = reader.readLine()) != null) { |
解析完类之后,装载类:
1 | clz = (Class<T>) Class.forName(className, true, classLoader); |
这里的 true
参数代表所有位于这个类中的所有静态块都被初始化。
接着检查刚装载的类是否符合实现要求:
1 | // check is class public |
获取单例:
1 | Spi spi = type.getAnnotation(Spi.class); |
获取 PROTOTYPE 类:
1 | Class<T> clz = extensionClasses.get(name); |
获取所有扩展:
1 | List<T> exts = new ArrayList<T>(extensionClasses.size()); |
Motan - Config 配置
AbstractConfig
:
从像下面的这样的方法中提取 key
以及执行方法获取 value
:
1 | public boolean isXXX() { |
ProtocolConfig
:
属性 | 值 |
---|---|
name | 服务协议 |
serialization | 序列化方式 |
codec | 协议编码 |
iothreads | IO 线程池大小 |
requestTimeout | 请求超时 |
minClientConnection | 最小连接数 |
maxClientConnection | 最大连接数 |
minWorkerThread | 最小工作池线程数 |
maxWorkerThread | 最大工作池线程数 |
maxContentLength | 请求响应包的最大长度限制 |
maxServerConnection | Server 支持的最大连接数 |
poolLifo | 连接池管理方式,是否 LIFO |
lazyInit | 是否延迟 |
endpointFactory | endpointFactory |
cluster | 采用哪种 cluster 的实现 |
loadbalance | 负载均衡方式 |
haStrategy | high available strategy |
workerQueueSize | server worker queue size |
server accept connections count | acceptConnections |
proxy | proxy type, like jdk or javassist |
filter | filter, 多个filter用”,”分割,blank string 表示采用默认的filter配置 |
retries | retry count if call failure |
async | if the request is called async, a taskFuture result will be sent back |
isDefault | 是否缺省配置 |
Map |
扩展参数 |
Motan - Spring 扩展
先简单介绍一下几个扩展 Spring
的重要接口:
InitializingBean
1 | public interface InitializingBean { |
当 Bean
的所有属性均被设置完的时候,此方法会被调用
DisposableBean
1 | public interface DisposableBean { |
当 Spring
容器释放这个 Bean
的时候,次方法会被调用
BeanFactoryAware
Interface to be implemented by beans that wish to be aware of their owning BeanFactory
.
1 | public interface BeanFactoryAware extends Aware { |
BeanPostProcessor
The BeanPostProcessor
interface defines callback methods that you can implement to provide your own (or override the container’s default) instantiation logic, dependency-resolution logic, and so forth. If you want to implement some custom logic after the Spring container finishes instantiating, configuring, and initializing a bean, you can plug in one or more BeanPostProcessor implementations.
Motan - log
LogService
: 常见的 Log 方法
默认实现 DefaultLogService
采用的是: org.slf4j.Logger