Motan

Motan - 服务注册中心

  • RegistryService 接口: 注册、设置 URL 的可用
  • DiscoveryService 接口: 订阅 URL
  • Registry 接口: 一个 URL 就是一个 Registry
1
2
3
4
public interface Registry
extends RegistryService, DiscoveryService {
URL getUrl();
}
  • RegistryFactory 接口: 根据 URL 创建 Registry

Motan - 代理

  • ProxyFactory 接口: 创建某个类的代理

关注实现类 RefererInvocationHandlerinvoke 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {

if ( 是本地方法( method ) ) {
throw new Exception();
}

boolean async = false;
if ( method 以 'Async' 结尾 ||
method 返回 ResponseFuture.class ) {
async = true;
}

Request 请求 = new Request( 请求 Id );

for ( 集群: 所有集群 ) {
if ( 集群关了 )
continue;

try {
Response r = 集群.call( 请求 );
if ( async )
return response;
else
return response.value;
} catch( RuntimeException e ) {
if ( 业务方逻辑抛出的异常 )
throw e;
}
}
}

Motan - 数据传输

异步请求

在源码编译阶段,让编译器创建一个类名为 {interfaceName}Async ,同时所有方法以 Async 结尾的生成类,并默认放在目录 target/generated-sources/annotations/ 下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public boolean process(Set<? extends TypeElement> annotations,
RoundEnvironment roundEnv) {

for ( Element c: 所有注解了 MotanAsync.class 的类) {
if ( c 是接口 ) {



} else {
throw new Exception();
}
}

}

传输

  • Transport 接口: 将 byte[] 请求从这头发送到那一头,并返回 byte[]
  • Channel 接口: 将 Request 请求从这头发送到另一头,并返回 Response
  • Endpoint 接口: 继承 Channel
  • Clent 接口: 继承 Endpoint,异步发送心跳包请求
  • Server 接口: 继承 Endpoint,可以得到所有的 Channels
  • MessageHandler 接口: 处理一个 Channel 上的 message,并返回 Object 类型
  • EndpointFactory 接口: 可以创建 ServerClient
  • EndpointManager 接口: 可以初始化/销毁,也可以添加/移除 Endpoint
  • HeartbeatFactory 接口: 创建心跳包请求,对 MessageHandler 进行包装

下面看 AbstractClient 的实现:

1
2
3
4
5
6
7
8
public abstract class AbstractClient
implements Client {

protected InetSocketAddress localAddress;
protected InetSocketAddress remoteAddress;
protected volatile ChannelState state = '未初始化状态';

}

AbstractServer 的实现和 AbstractClient 的实现大同小异,不再赘述。

AbstractPoolClient 的实现,实际上内部使用了 org.apache.commons.pool.impl.GenericObjectPool 作为默认对象池的实现,其可以支持如下机制:

  • 设置最小空闲对象个数
  • 设置最大空闲对象个数
  • 设置最大活跃对象个数
  • 设置最大等待对象个数
  • 设置以 LIFO 或 FIFO 机制来借对象
    • LIFO: 借对象的时候会返回上次归还的 idle 对象
    • FIFO: 借对象的时候会返回很久未使用过的 idle 对象
  • 如果非延迟初始化,则会添加最小空闲个数量的对象
  • 借对象
  • 使对象失效
  • 还对象

DefaultRpcHeartbeatFactory 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class DefaultRpcHeartbeatFactory
implements HeartbeatFactory {

@Override
public Request createRequest() {
Request request = new Request();
request.接口名("com.weibo.api.motan.rpc.heartbeat");
request.方法名("heartbeat");
request.参数("void");
return request;
}

@Override
public MessageHandler wrapMessageHandler(MessageHandler handler) {
return new HeartMessageHandleWrapper(handler);
}

private class HeartMessageHandleWrapper
implements MessageHandler {

@Override
public Object handle(Channel channel, Object message) {
// 根据接口名、方法名、参数来判断是否是心跳包请求
if ( 是心跳包请求(message) ) {
DefaultResponse response = new DefaultResponse();
response.setValue("heartbeat");
return response;
}

return messageHandler.handle(channel, message);
}

}

}

HeartbeatClientEndpointManager 中有一个核心数据结构:

1
private ConcurrentMap<Client, HeartbeatFactory> endpoints;

然后在构造器中又开启了一个 ScheduledExecutorService 服务,每隔 500 ms 检查一下 Client 是否联通:

1
2
3
4
5
6
7
// 如果节点是存活状态,那么没必要走心跳
if (endpoint.isAvailable()) {
continue;
}

HeartbeatFactory factory = entry.getValue();
endpoint.heartbeat(factory.createRequest());

Motan - 集群

负载均衡

  • LoadBalance<T> 接口: 对请求选择 Referer
  • AbstractLoadBalance<T> 抽象类: 实现了 LoadBalance<T> 接口,主要对可用的 Referer<T> 在数量上进行了一些检查,并交由子类来实现 doSelect 方法

Motan 主要提供了如下几种负载均衡算法,所有算法均继承自 AbstractLoadBalance<T>:

ActiveWeightLoadBalance<T>: 低并发优化负载均衡

  • 随机抽取 10 台服务器,筛选出 activeRefererCount 值最小的可用的 Referer

ConsistentHashLoadBalance<T>: 一致性 Hash

生成一致性 Hash 数组算法:

  • 把所有 referers 全部打乱
  • 每打乱一次,就放进数组里面,
  • 上述两步骤重复 1000 次
  • 然后最终使 consistentHashReferers 指向这个数组

Hash 算法的生成:

  • 无参数,就基于 Request 对象本身生成 hashCode
  • 有参数,就使用 Arrays.hashCode(request.getArguments()) 来生成 hashCode
  • 最后,使用 0x7fffffff & originValue 来得到最终的 hash

请求选择 Referer:

  • 遍历所有的 referers
  • 然后进行下面的取余操作
1
ref = consistentHashReferers.get((hash + i) % consistentHashReferers.size());
  • 如果这个 ref 可用,那么直接返回

LocalFirstLoadBalance<T>: 本地服务优先

referers 里面包含本地暴露的服务时,并此服务为 available 的情况下,优先使用此服务。当不存在本地暴露的服务时,默认使用低并发 ActiveWeight 负载均衡策略。

referers 根据 ip 顺序查找本地服务,多存在多个本地服务,获取 Active 最小的本地服务进行服务。当不存在本地服务,但是存在远程 RPC 服务,则根据 ActivWeight 获取远程 RPC 服务,当两者都存在,所有本地服务都应优先于远程服务,本地 RPC 服务与远程 RPC 服务内部则根据 ActiveWeight 进行.

获取本地 IP 算法: 检查缓存 -> 检查 Hostname -> 根据 Socket -> 根据网口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
IP 得到本地IP() {
if (本地缓存了)
return 缓存的;

if (InetAddress.getLocalHost())
return InetAddress.getLocalHost();

if (创建 Socket 连接指定目标地址)
return socket.getLocalAddress();

if (NetworkInterface.getNetworkInterfaces())
return 轮询到的 IP;

return null;
}

找到本地 IP 后,会将 Referres 里面所有等于这个 IPReferer 筛选出来,然后从这几个 Referer 中找到 Referer.activeRefererCount() 数量最少的的 Referer

RandomLoadBalance<T>: 随机负载均衡

随机从 referers 中挑选一个可用的 Referer 返回

RoundRobinLoadBalance<T>: 轮询负载均衡

referers 上轮询可用的 Referer,其中获取下一个索引的算法如下:

1
2
3
4
private AtomicInteger idx = new AtomicInteger(0);
private int getNextPositive() {
return MathUtil.getPositive(idx.incrementAndGet());
}

ConfigurableWeightLoadBalance<T>: 权重可配置的负载均衡

TODO

HaStrategy: 高可用策略

HaStrategy<T> 接口定义如下:

1
2
3
4
public interface HaStrategy<T> {
void setUrl(URL url);
Response call(Request request, LoadBalance<T> loadBalance);
}

AbstractHaStrategy<T> 抽象类主要是定义了一个 URL 字段,目前主要提供两种策略:

  • FailfastHaStrategy<T>: 快速失败策略
1
2
3
4
5
@Override
public Response call(Request request, LoadBalance<T> loadBalance) {
Referer<T> refer = loadBalance.select(request);
return refer.call(request);
}
  • FailoverHaStrategy<T>: 失败重试策略

先使用负载均衡筛选出一批符合策略的 referers,然后轮询重试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (int i = 0; i <= tryCount; i++) {
Referer<T> refer = referers.get(i % referers.size());
try {
request.setRetries(i);
return refer.call(request);
} catch (RuntimeException e) {
// 对于业务异常,直接抛出
if (ExceptionUtil.isBizException(e)) {
throw e;
} else if (i >= tryCount) {
throw e;
}
}
}

Motan - 序列化

实现 Objectbyte[] 之间的相互转化:

  • FactJson 序列化
  • Hessian2 序列化

Motan - 服务开关

  • SwitchListener 接口: 通知服务开/关了
  • Switcher: 代表一个开关

两个核心数据结构:

1
2
private static ConcurrentMap<String, Switcher> switchers;
private Map<String, List<SwitcherListener>> listenerMap;

每次 setValue 的时候,都会遍历这个开关上的所有 SwitchListener 来通知值变了:

1
2
3
4
5
6
7
8
9
10
public void setValue(String switcherName, boolean value) {
putSwitcher(new Switcher(switcherName, value));

List<SwitcherListener> listeners = listenerMap.get(switcherName);
if(listeners != null) {
for (SwitcherListener listener : listeners) {
listener.onValueChanged(switcherName, value);
}
}
}

SPI 扩展

  • DefaultThreadFactory: 默认线程工厂
  • Spi 注解:
1
2
3
4
5
6
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Spi {
Scope scope() default Scope.PROTOTYPE;
}
  • 作用域可以设置为:

    • Scope.SINGLETON: 单例模式
    • Scope.PROTOTYPE: 多例模式
  • SpiMeta 注解: Spi 元信息

  • Activation 注解: 用来对 Spi 的实现根据条件进行过滤、排序等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Activation {

/** seq号越小,在返回的list<Instance>中的位置越靠前,尽量使用 0-100以内的数字 */
int sequence() default 20;

/** spi 的key,获取spi列表时,根据key进行匹配,当key中存在待过滤的search-key时,匹配成功 */
String[] key() default "";

/** 是否支持重试的时候也调用 */
boolean retry() default true;
}
  • ActivationComparator: 比较器
  • ExtensionLoader<T>: 自己实现的 ServiceProvider

构造器,需要传入想要实例化的接口,如 com.weibo.api.motan.cluster.HaStrategy.class:

1
2
3
4
private ExtensionLoader(Class<T> type) {
this.type = type;
this.classLoader = Thread.currentThread().getContextClassLoader();
}

从资源文件夹加载 URL :

1
2
String fullName = "META-INF/services/" + type.getName();
Enumeration<URL> urls = classLoader.getResources(fullName);

然后解析文件中的所有声明的实现类:

1
2
3
4
while ((line = reader.readLine()) != null) {
indexNumber++;
parseLine(type, url, line, indexNumber, classNames);
}

解析完类之后,装载类:

1
clz = (Class<T>) Class.forName(className, true, classLoader);

这里的 true 参数代表所有位于这个类中的所有静态块都被初始化

接着检查刚装载的类是否符合实现要求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// check is class public
if (!Modifier.isPublic(clz.getModifiers())) {
failThrows(clz, "Error is not a public class");
}

// check is constructor public
Constructor<?>[] constructors = clz.getConstructors();
if (constructors == null || constructors.length == 0) {
failThrows(clz, "Error has no public no-args constructor");
}
for (Constructor<?> constructor : constructors) {
if (Modifier.isPublic(constructor.getModifiers()) && constructor.getParameterTypes().length == 0) {
return;
}
}

// check inherit
if (!type.isAssignableFrom(clz)) {
failThrows(clz, "Error is not instanceof " + type.getName());
}

获取单例:

1
2
3
4
5
6
7
8
9
10
11
12
13
Spi spi = type.getAnnotation(Spi.class);
if (spi.scope() == Scope.SINGLETON) {
synchronized (singletonInstances) {
obj = singletonInstances.get(name);
if (obj != null) {
return obj;
}

obj = clz.newInstance();
singletonInstances.put(name, obj);
}
return obj;
}

获取 PROTOTYPE 类:

1
2
Class<T> clz = extensionClasses.get(name);
return clz.newInstance();

获取所有扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
List<T> exts = new ArrayList<T>(extensionClasses.size());
for (Map.Entry<String, Class<T>> entry : extensionClasses.entrySet()) {
Activation activation = entry.getValue().getAnnotation(Activation.class);
if (StringUtils.isBlank(key)) {
exts.add(getExtension(entry.getKey()));
} else if (activation != null && activation.key() != null) {
for (String k : activation.key()) {
if (key.equals(k)) {
exts.add(getExtension(entry.getKey()));
break;
}
}
}
}
Collections.sort(exts, new ActivationComparator<T>());
return exts;

Motan - Config 配置

  • AbstractConfig:

从像下面的这样的方法中提取 key 以及执行方法获取 value:

1
2
3
4
5
6
7
public boolean isXXX() {
return false;
}

public String getName() {
return "";
}
  • ProtocolConfig:
属性
name 服务协议
serialization 序列化方式
codec 协议编码
iothreads IO 线程池大小
requestTimeout 请求超时
minClientConnection 最小连接数
maxClientConnection 最大连接数
minWorkerThread 最小工作池线程数
maxWorkerThread 最大工作池线程数
maxContentLength 请求响应包的最大长度限制
maxServerConnection Server 支持的最大连接数
poolLifo 连接池管理方式,是否 LIFO
lazyInit 是否延迟
endpointFactory endpointFactory
cluster 采用哪种 cluster 的实现
loadbalance 负载均衡方式
haStrategy high available strategy
workerQueueSize server worker queue size
server accept connections count acceptConnections
proxy proxy type, like jdk or javassist
filter filter, 多个filter用”,”分割,blank string 表示采用默认的filter配置
retries retry count if call failure
async if the request is called async, a taskFuture result will be sent back
isDefault 是否缺省配置
Map parameters 扩展参数

Motan - Spring 扩展

先简单介绍一下几个扩展 Spring 的重要接口:

InitializingBean

1
2
3
public interface InitializingBean {
void afterPropertiesSet() throws Exception;
}

Bean 的所有属性均被设置完的时候,此方法会被调用

DisposableBean

1
2
3
public interface DisposableBean {
void destroy() throws Exception;
}

Spring 容器释放这个 Bean 的时候,次方法会被调用

BeanFactoryAware

Interface to be implemented by beans that wish to be aware of their owning BeanFactory.

1
2
3
4
5
6
public interface BeanFactoryAware extends Aware {
void setBeanFactory(BeanFactory var1) throws BeansException;
}

public interface Aware {
}

BeanPostProcessor

The BeanPostProcessor interface defines callback methods that you can implement to provide your own (or override the container’s default) instantiation logic, dependency-resolution logic, and so forth. If you want to implement some custom logic after the Spring container finishes instantiating, configuring, and initializing a bean, you can plug in one or more BeanPostProcessor implementations.

Motan - log

  • LogService: 常见的 Log 方法

默认实现 DefaultLogService 采用的是: org.slf4j.Logger

参考

推荐文章