Nacos Service Registration: Source Code Analysis
Analyzing Nacos Service Registration
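To analyze source code, we first need to find an entry point. The Nacos source tree contains a module called nacos-example. As the name suggests, it is a sample project meant to help us get started quickly. It has three test launcher classes, and the one we care about is NamingExample. Its main method contains the two key lines of code: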
// Create a NamingService through NamingFactory
NamingService naming = NamingFactory.createNamingService(properties);
// Use the naming service to register this service instance with the registry
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");
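Let's first trace NamingFactory#createNamingService. The method is simple: it loads NacosNamingService by name through reflection and creates an instance by calling the constructor that takes a Properties argument.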
/**
 * Create a new naming service.
 *
 * @param properties naming service properties
 * @return new naming service
 * @throws NacosException nacos exception
 */
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
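One point deserves attention here: NacosNamingService is created through its constructor, and the constructor should never be underestimated. If you are used to writing Spring projects, you usually hand objects to the container and let Spring inject their properties for you. In non-Spring code, however, many fields have to be created and initialized by hand, and that work usually happens in static initializer blocks or in constructors.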
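createNamingService never references NacosNamingService at compile time: it loads the class by name and calls its Properties constructor, which in turn does all of the initialization. A minimal JDK-only sketch of that combination follows. Greeter, SimpleGreeter and ReflectiveFactory are hypothetical names, not Nacos classes, and the demo passes SimpleGreeter.class.getName() where Nacos hard-codes the class name as a string literal.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical API interface, playing the role of NamingService
interface Greeter {
    String greet(String name);
}

// Hypothetical implementation, playing the role of NacosNamingService
class SimpleGreeter implements Greeter {
    private final String prefix;

    // All initialization happens in the constructor, like NacosNamingService#init
    public SimpleGreeter(Properties properties) {
        this.prefix = properties.getProperty("prefix", "Hello");
    }

    @Override
    public String greet(String name) {
        return prefix + ", " + name;
    }
}

class ReflectiveFactory {
    // Load the implementation by class name and call its (Properties) constructor
    static Greeter create(String implClassName, Properties properties) {
        try {
            Class<?> implClass = Class.forName(implClassName);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (Greeter) constructor.newInstance(properties);
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Nacos wraps failures in NacosException; this sketch uses an unchecked exception
            throw new IllegalArgumentException("Cannot create " + implClassName, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("prefix", "Hi");
        // Nacos hard-codes the name as a string; getName() keeps this demo package-independent
        Greeter greeter = create(SimpleGreeter.class.getName(), props);
        System.out.println(greeter.greet("nacos")); // Hi, nacos
    }
}
```

Because the API side only needs a class name, the implementation can live in a separate module. The trade-off is that a missing class or constructor only surfaces at runtime, which is why the factory converts every failure into a single exception type.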
Next, let's analyze the registration method, registerInstance, which is the part that really matters.
Here is the implementation of registerInstance:
@Override
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
        throws NacosException {
    Instance instance = new Instance();
    instance.setIp(ip);
    instance.setPort(port);
    instance.setWeight(1.0);
    instance.setClusterName(clusterName);
    // An Instance has been created. Its package is com.alibaba.nacos.api.naming.pojo,
    // i.e. it is the entity class that carries the attributes to be sent
    registerInstance(serviceName, groupName, instance);
}

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // Validate the instance
    NamingUtils.checkInstanceIsLegal(instance);
    // Register the service through the client proxy
    clientProxy.registerService(serviceName, groupName, instance);
}
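The five-argument overload only assembles an Instance with defaults (weight 1.0) and forwards it to the canonical overload, which validates once and then hands off to the proxy. A minimal sketch of this overload-delegation shape, assuming a hypothetical SimpleInstance class and OverloadingRegistry, and a hypothetical port-range check that is not the actual rule set of NamingUtils.checkInstanceIsLegal:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical entity, standing in for com.alibaba.nacos.api.naming.pojo.Instance
class SimpleInstance {
    String ip;
    int port;
    double weight;
    String clusterName;
}

class OverloadingRegistry {
    // Hypothetical sink, standing in for clientProxy.registerService(...)
    final List<String> registered = new ArrayList<>();

    // Convenience overload: build the entity with defaults, then delegate
    void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName) {
        SimpleInstance instance = new SimpleInstance();
        instance.ip = ip;
        instance.port = port;
        instance.weight = 1.0; // same default weight as in NacosNamingService
        instance.clusterName = clusterName;
        registerInstance(serviceName, groupName, instance);
    }

    // Canonical overload: validate once, then hand off
    void registerInstance(String serviceName, String groupName, SimpleInstance instance) {
        // Hypothetical check; NamingUtils.checkInstanceIsLegal applies its own rules
        if (instance.port <= 0 || instance.port > 65535) {
            throw new IllegalArgumentException("illegal port: " + instance.port);
        }
        registered.add(groupName + "/" + serviceName + " " + instance.ip + ":" + instance.port);
    }

    public static void main(String[] args) {
        OverloadingRegistry registry = new OverloadingRegistry();
        // "DEFAULT_GROUP" is just an illustrative group name here
        registry.registerInstance("nacos.test.3", "DEFAULT_GROUP", "11.11.11.11", 8888, "TEST1");
        System.out.println(registry.registered); // [DEFAULT_GROUP/nacos.test.3 11.11.11.11:8888]
    }
}
```

Keeping validation in the canonical overload means every entry point passes through the same check, however many convenience overloads are added.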
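When we try to step into clientProxy.registerService, the IDE asks us to choose among several implementation classes, as shown in the figure.
So which class should we follow?
Here are two approaches.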
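- Code analysis: if there are several implementation classes, the concrete one must have been created in some earlier step, because an object never appears out of nowhere. Program code is strictly deterministic: it does exactly what you tell it and nothing else. As the saying goes, when something looks odd, there is always a reason behind it.
- Breakpoint debugging: simple and blunt. Start a debug session and step into the call; whichever class the debugger lands in is the class that handles it.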
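As a lightweight complement to the two approaches above, logging getClass() on an interface-typed field also reveals which implementation was actually assigned. A minimal sketch with hypothetical types (RegistryProxy, GrpcLikeProxy, HttpLikeProxy, ProxyHolder) that mimic how a constructor chooses the concrete class:

```java
// Hypothetical interface, playing the role of NamingClientProxy
interface RegistryProxy {
    void register(String service);
}

// Hypothetical implementations, standing in for the gRPC and HTTP proxies
class GrpcLikeProxy implements RegistryProxy {
    @Override
    public void register(String service) {
        // no-op in this demo
    }
}

class HttpLikeProxy implements RegistryProxy {
    @Override
    public void register(String service) {
        // no-op in this demo
    }
}

// Hypothetical holder: its constructor picks the implementation,
// much as NacosNamingService#init assigns clientProxy
class ProxyHolder {
    final RegistryProxy clientProxy;

    ProxyHolder(boolean useGrpc) {
        this.clientProxy = useGrpc ? new GrpcLikeProxy() : new HttpLikeProxy();
    }

    // The declared type is RegistryProxy, but getClass() reports the runtime class
    String concreteProxyName() {
        return clientProxy.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(new ProxyHolder(true).concreteProxyName()); // GrpcLikeProxy
    }
}
```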
Let's go back and pin this down through code analysis. Remember how the NamingService was created above? Let's step into the constructor of its implementation class, NacosNamingService.
public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

private void init(Properties properties) throws NacosException {
    final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
    ValidatorUtils.checkInitParam(nacosClientProperties);
    this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(nacosClientProperties);
    initLogName(nacosClientProperties);
    this.notifierEventScope = UUID.randomUUID().toString();
    this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
    // Create the client proxy
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties,
            changeNotifier);
}
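The constructor shows that clientProxy is created in the init method and that its concrete class is NamingClientProxyDelegate. Let's follow this implementation class.
The name tells us this is a proxy, i.e. the Proxy pattern from design patterns. If you are not familiar with the Proxy pattern, it is worth brushing up on it first: when you know the design patterns, you can grasp the author's intent much faster while reading source code.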
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    // Pick the concrete proxy and let it perform the registration
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}

private NamingClientProxy getExecuteClientProxy(Instance instance) {
    // Instances are ephemeral by default (ephemeral = true), so grpcClientProxy is returned;
    // Nacos 2.0 replaced the HTTP requests with gRPC
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
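Let's return to the class name NamingClientProxyDelegate, literally a "naming client proxy delegate". Why is it called a proxy delegate?
It turns out this class is not the real proxy. The real proxies are grpcClientProxy and httpClientProxy; this class only delegates, handing each call to one of those two proxies. That is why it is called a proxy delegate.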
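The delegate's only job is routing: it owns both real proxies and forwards each call according to Instance#isEphemeral. A minimal sketch of that routing, assuming hypothetical types (ClientProxy, RecordingProxy, ClientProxyDelegate) and reducing the instance to its ephemeral flag; it is not the NamingClientProxyDelegate source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical proxy contract, playing the role of NamingClientProxy
interface ClientProxy {
    void registerService(String serviceName, boolean ephemeral);
}

// A "real" proxy that just records what it was asked to do
class RecordingProxy implements ClientProxy {
    final String transport;
    final List<String> calls = new ArrayList<>();

    RecordingProxy(String transport) {
        this.transport = transport;
    }

    @Override
    public void registerService(String serviceName, boolean ephemeral) {
        calls.add(transport + ":" + serviceName);
    }
}

// The delegate does no transport work itself; it only picks a proxy and forwards
class ClientProxyDelegate implements ClientProxy {
    private final ClientProxy grpcClientProxy;
    private final ClientProxy httpClientProxy;

    ClientProxyDelegate(ClientProxy grpcClientProxy, ClientProxy httpClientProxy) {
        this.grpcClientProxy = grpcClientProxy;
        this.httpClientProxy = httpClientProxy;
    }

    @Override
    public void registerService(String serviceName, boolean ephemeral) {
        getExecuteClientProxy(ephemeral).registerService(serviceName, ephemeral);
    }

    // Ephemeral instances go over gRPC, persistent ones over HTTP
    private ClientProxy getExecuteClientProxy(boolean ephemeral) {
        return ephemeral ? grpcClientProxy : httpClientProxy;
    }

    public static void main(String[] args) {
        RecordingProxy grpc = new RecordingProxy("grpc");
        RecordingProxy http = new RecordingProxy("http");
        ClientProxy delegate = new ClientProxyDelegate(grpc, http);
        delegate.registerService("nacos.test.3", true);
        delegate.registerService("persistent.svc", false);
        System.out.println(grpc.calls + " " + http.calls); // [grpc:nacos.test.3] [http:persistent.svc]
    }
}
```

Callers depend only on the ClientProxy interface, so adding or swapping a transport changes the delegate, not the code that calls registerService.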
Now that we know the concrete proxy is grpcClientProxy (an instance of NamingGrpcClientProxy), let's look at how its registerService is implemented:
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
    // Cache the instance so the redo service can re-register it after a reconnect
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    // Continue with the actual registration
    doRegisterService(serviceName, groupName, instance);
}

public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // Build the request object
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    // Send the request to the server to register
    requestToServer(request, Response.class);
    // Mark the cached instance as registered
    redoService.instanceRegistered(serviceName, groupName);
}

private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    try {
        request.putAllHeader(
                getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        // rpcClient performs the RPC; returning without an exception means the call succeeded
        Response response =
                requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
    } catch (NacosException e) {
        throw e;
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
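requestToServer accepts a reply only if its result code is SUCCESS and it is an instance of the expected response class; everything else becomes an exception. A minimal sketch of that two-step check, assuming hypothetical BaseResponse and InstanceResponseLike classes, 200 as the success code (the article does not show ResponseCode's values), and IllegalStateException in place of NacosException:

```java
// Hypothetical base response, standing in for Nacos's Response
class BaseResponse {
    static final int SUCCESS = 200; // assumed success code for this sketch
    final int resultCode;
    final String message;

    BaseResponse(int resultCode, String message) {
        this.resultCode = resultCode;
        this.message = message;
    }
}

// Hypothetical concrete response type
class InstanceResponseLike extends BaseResponse {
    InstanceResponseLike() {
        super(SUCCESS, "ok");
    }
}

class ResponseChecker {
    // Same order as requestToServer: result code first, then the expected type
    static <T extends BaseResponse> T check(BaseResponse response, Class<T> responseClass) {
        if (response.resultCode != BaseResponse.SUCCESS) {
            throw new IllegalStateException("server error: " + response.message);
        }
        if (responseClass.isInstance(response)) {
            return responseClass.cast(response);
        }
        throw new IllegalStateException("unexpected response type: " + response.getClass().getName());
    }

    public static void main(String[] args) {
        InstanceResponseLike ok = check(new InstanceResponseLike(), InstanceResponseLike.class);
        System.out.println(ok.message); // ok
    }
}
```

Class#isInstance plus Class#cast performs the same test as responseClass.isAssignableFrom(response.getClass()) but avoids the unchecked (T) cast.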
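At this point the overall registration flow is clear: a NamingService is created first; it calls NamingClientProxyDelegate, which hands the call to grpcClientProxy, and grpcClientProxy invokes the server remotely over gRPC. Once the call succeeds, the instance is registered. Next, let's dig into rpcClient#request to see how Nacos actually makes the call.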
The RPC call
public Response request(Request request) throws NacosException {
    // Use the configured timeout; if none is configured, it defaults to 3 s
    return request(request, rpcClientConfig.timeOutMills());
}

public Response request(Request request, long timeoutMills) throws NacosException {
    int retryTimes = 0;
    Response response;
    Throwable exceptionThrow = null;
    long start = System.currentTimeMillis();
    while (retryTimes < rpcClientConfig.retryTimes() && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                        "Client not connected, current status:" + rpcClientStatus.get());
            }
            // Send the request over the current connection
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            if (response instanceof ErrorResponse) {
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                    "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                    currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            switchServerAsync();
                        }
                    }
                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            // return response.
            lastActiveTimeStamp = System.currentTimeMillis();
            return response;
        } catch (Throwable e) {
            if (waitReconnect) {
                try {
                    // wait client to reconnect.
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {
                    // Do nothing.
                }
            }
            LoggerUtils.printIfErrorEnabled(LOGGER,
                    "Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes,
                    e.getMessage());
            exceptionThrow = e;
        }
        retryTimes++;
    }
    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        switchServerAsyncOnRequestFail();
    }
    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
    }
}
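The while condition in RpcClient#request bounds retries in two ways at once: by rpcClientConfig.retryTimes() and by the deadline start + timeoutMills; while waiting for a reconnect, the loop sleeps min(100, timeoutMills / 3) ms. A minimal sketch of that loop shape, assuming a hypothetical RetryingCaller that takes one Supplier per attempt. Unlike RpcClient, it backs off after every failure rather than only when waitReconnect is set, and it does not switch servers.

```java
import java.util.function.Supplier;

class RetryingCaller {
    // Retry until either the attempt budget or the time budget is used up,
    // mirroring the loop condition in RpcClient#request
    static <T> T callWithRetry(Supplier<T> attempt, int retryTimes, long timeoutMills) {
        RuntimeException lastError = null;
        long start = System.currentTimeMillis();
        int tried = 0;
        while (tried < retryTimes && System.currentTimeMillis() < start + timeoutMills) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                lastError = e;
                try {
                    // Back off briefly, using the same formula RpcClient uses before a reconnect
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            tried++;
        }
        // Rethrow the last failure, or report that no attempt was made
        throw lastError != null ? lastError : new IllegalStateException("Request fail, unknown Error");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("not connected");
            }
            return "ok";
        }, 5, 3000);
        System.out.println(result + " after " + calls[0] + " attempts"); // ok after 3 attempts
    }
}
```

Checking both budgets keeps a slow server from consuming all retries past the caller's timeout, and keeps a fast-failing server from being hammered until the timeout expires.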
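RpcClient#request raises two questions: where does currentConnection come from, and how does currentConnection send a request? Let's start with where currentConnection comes from.
How currentConnection is created
Searching the code, we find the following snippet in com.alibaba.nacos.common.remote.client.RpcClient#start: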
Connection connectToServer = null;
rpcClientStatus.set(RpcClientStatus.STARTING);
int startUpRetryTimes = rpcClientConfig.retryTimes();
while (startUpRetryTimes > 0 && connectToServer == null) {
    try {
        startUpRetryTimes--;
        ServerInfo serverInfo = nextRpcServer();
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}",
                rpcClientConfig.name(), serverInfo);
        // Try to obtain a connection to this server
        connectToServer = connectToServer(serverInfo);
    } catch (Throwable e) {
        LoggerUtils.printIfWarnEnabled(LOGGER,
                "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
    }
}
// If a connection was established
if (connectToServer != null) {
    LoggerUtils
            .printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
                    rpcClientConfig.name(), connectToServer.serverInfo.getAddress(),
                    connectToServer.getConnectionId());
    // Assign currentConnection
    this.currentConnection = connectToServer;
    rpcClientStatus.set(RpcClientStatus.RUNNING);
    eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
} else {
    // No connection on start up: switch servers asynchronously
    switchServerAsync();
}
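On startup the client tries at most retryTimes servers, taking the next one from the server list each round (nextRpcServer), stops at the first successful connection, and otherwise falls back to switchServerAsync(). A minimal sketch, assuming a hypothetical StartupConnector whose nextRpcServer is plain round-robin and whose connect step is a Predicate; the asynchronous fallback is reduced to returning null.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class StartupConnector {
    private final List<String> servers;
    private int index = 0;

    StartupConnector(List<String> servers) {
        this.servers = servers;
    }

    // Hypothetical round-robin stand-in for nextRpcServer()
    String nextRpcServer() {
        String server = servers.get(index % servers.size());
        index++;
        return server;
    }

    // Try up to retryTimes servers; return the first one that accepts, or null
    String connectOnStartup(int retryTimes, Predicate<String> connect) {
        String connected = null;
        int startUpRetryTimes = retryTimes;
        while (startUpRetryTimes > 0 && connected == null) {
            startUpRetryTimes--;
            String server = nextRpcServer();
            if (connect.test(server)) {
                connected = server;
            }
        }
        // In RpcClient a null result leads to switchServerAsync()
        return connected;
    }

    public static void main(String[] args) {
        StartupConnector connector = new StartupConnector(Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848"));
        String server = connector.connectOnStartup(3, s -> s.startsWith("10.0.0.2"));
        System.out.println(server); // 10.0.0.2:8848
    }
}
```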
To trace where start() is called, keep searching upward: it turns out that start() is already invoked from the NamingGrpcClientProxy constructor.
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
        NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    super(securityProxy);
    this.namespaceId = namespaceId;
    this.uuid = UUID.randomUUID().toString();
    this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
    Map<String, String> labels = new HashMap<>();
    labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
    labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
    this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
    this.redoService = new NamingGrpcRedoService(this);
    // Called here
    start(serverListFactory, serviceInfoHolder);
}

private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    rpcClient.serverListFactory(serverListFactory);
    rpcClient.registerConnectionListener(redoService);
    rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
    // RpcClient#start is called here
    rpcClient.start();
    NotifyCenter.registerSubscriber(this);
}
Now back to connectToServer(serverInfo), to see how the connection is obtained.
// Abstract method in RpcClient, implemented by subclasses
public abstract Connection connectToServer(ServerInfo serverInfo) throws Exception;

// Implementation in GrpcClient
@Override
public Connection connectToServer(ServerInfo serverInfo) {
    try {
        if (grpcExecutor == null) {
            // Create the thread pool
            this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
        }
        int port = serverInfo.getServerPort() + rpcPortOffset();
        // Create a gRPC ManagedChannel from the IP and port
        ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
        // Create the gRPC request stub
        RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
        if (newChannelStubTemp != null) {
            Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
            if (response == null || !(response instanceof ServerCheckResponse)) {
                shuntDownChannel(managedChannel);
                return null;
            }
            BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
                    .newStub(newChannelStubTemp.getChannel());
            // Wrap it all in a GrpcConnection; grpcExecutor is the thread pool for async request callbacks
            GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
            grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
            //create stream request and bind connection event to this connection.
            StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
            // stream observer to send response to server
            grpcConn.setPayloadStreamObserver(payloadStreamObserver);
            grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
            grpcConn.setChannel(managedChannel);
            //send a setup request.
            ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
            conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
            conSetupRequest.setLabels(super.getLabels());
            conSetupRequest.setAbilities(super.clientAbilities);
            conSetupRequest.setTenant(super.getTenant());
            grpcConn.sendRequest(conSetupRequest);
            //wait to register connection setup
            Thread.sleep(100L);
            // Return the connection, which wraps the channel and the request stub
            return grpcConn;
        }
        return null;
    } catch (Exception e) {
        LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
    }
    return null;
}
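Two details of connectToServer are worth isolating. The gRPC port is not the configured server port but serverPort + rpcPortOffset(); with the client SDK's default offset of 1000, a server on port 8848 is reached over gRPC on 9848. And the channel is only wrapped into a GrpcConnection after serverCheck succeeds; otherwise it is shut down and null is returned. The sketch below models both steps with hypothetical types (ChannelLike, ConnectionLike, GrpcConnectSketch) and an IntFunction standing in for the server check; it is not the GrpcClient source.

```java
import java.util.function.IntFunction;

// Hypothetical stand-in for a gRPC ManagedChannel
class ChannelLike {
    final String address;
    boolean shutdown = false;

    ChannelLike(String address) {
        this.address = address;
    }
}

// Hypothetical stand-in for GrpcConnection
class ConnectionLike {
    final ChannelLike channel;
    final String connectionId;

    ConnectionLike(ChannelLike channel, String connectionId) {
        this.channel = channel;
        this.connectionId = connectionId;
    }
}

class GrpcConnectSketch {
    static final int SDK_PORT_OFFSET = 1000; // default client SDK offset: 8848 -> 9848

    // serverCheck stands in for the server check round trip; it returns a connection id or null
    static ConnectionLike connectToServer(String ip, int serverPort, IntFunction<String> serverCheck) {
        int port = serverPort + SDK_PORT_OFFSET;
        ChannelLike channel = new ChannelLike(ip + ":" + port);
        String connectionId = serverCheck.apply(port);
        if (connectionId == null) {
            // Check failed: release the channel and report no connection
            channel.shutdown = true;
            return null;
        }
        // Check passed: wrap the channel together with the id the server assigned
        return new ConnectionLike(channel, connectionId);
    }

    public static void main(String[] args) {
        ConnectionLike conn = connectToServer("127.0.0.1", 8848, port -> port == 9848 ? "conn-1" : null);
        System.out.println(conn.channel.address + " " + conn.connectionId); // 127.0.0.1:9848 conn-1
    }
}
```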
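The RPC request
Sending the RPC request is comparatively simple: with the gRPC connection and the request stub in hand, GrpcConnection#request just makes the call directly.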
@Override
public Response request(Request request, long timeouts) throws NacosException {
    // Convert the request object into a gRPC Payload
    Payload grpcRequest = GrpcUtils.convert(request);
    // Send the gRPC request
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    Payload grpcResponse;
    try {
        // Wait for the result, up to the timeout
        grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
    return (Response) GrpcUtils.parse(grpcResponse);
}
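GrpcConnection#request turns the asynchronous stub call into a blocking one by waiting on the returned future with a timeout, and wraps any failure (timeout, interruption, execution error) into a single exception type. The same pattern is sketched below with the JDK's CompletableFuture standing in for the ListenableFuture returned by the gRPC future stub; BlockingCaller is a hypothetical name and IllegalStateException replaces NacosException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingCaller {
    // Block on an async result for at most timeoutMillis and wrap every failure in one exception type
    static String request(CompletableFuture<String> requestFuture, long timeoutMillis) {
        try {
            return requestFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before wrapping
            Thread.currentThread().interrupt();
            throw new IllegalStateException("request interrupted", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("request failed", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "pong");
        System.out.println(request(fast, 1000)); // pong

        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        try {
            request(never, 50);
        } catch (IllegalStateException e) {
            System.out.println("timed out: " + e.getCause().getClass().getSimpleName()); // TimeoutException
        }
    }
}
```

The Nacos code catches Exception as a whole; splitting out InterruptedException here only adds the step of restoring the thread's interrupt flag.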
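Summary
This article only covers how the client sends requests to the server, and the overall flow is fairly clear and easy to follow. In practice, applications depend on the Nacos client, which accesses the server on their behalf, while Spring Cloud defines the top-level interfaces that decide when the server gets called. The key question, however, is how the server handles client requests, which is the core of Nacos.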