大纲
5.服务变更时怎样关照订阅的客户端
6.微服务实例信息怎样同步集群节点
6.微服务实例信息怎样同步集群节点
(1)服务端处理服务注册时会发布一个ClientChangedEvent变乱
(2)ClientChangedEvent变乱的处理源码
(3)集群节点处理数据同步哀求的源码
(1)服务端处理服务注册时会发布一个ClientChangedEvent变乱
ClientChangedEvent变乱的作用就是向集群节点同步服务实例数据的。- //Instance request handler.
- @Component
- public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
- private final EphemeralClientOperationServiceImpl clientOperationService;
-
- public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
- this.clientOperationService = clientOperationService;
- }
-
- @Override
- @Secured(action = ActionTypes.WRITE)
- public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
- //根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名
- Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
- switch (request.getType()) {
- case NamingRemoteConstants.REGISTER_INSTANCE:
- //注册实例
- return registerInstance(service, request, meta);
- case NamingRemoteConstants.DE_REGISTER_INSTANCE:
- //注销实例
- return deregisterInstance(service, request, meta);
- default:
- throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));
- }
- }
-
- private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
- //调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;
- //参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名
- //参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息
- //参数meta.getConnectionId():这个参数很关键,它是连接ID
- clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
- return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
- }
- ...
- }
- @Component("ephemeralClientOperationService")
- public class EphemeralClientOperationServiceImpl implements ClientOperationService {
- private final ClientManager clientManager;
-
- public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
- this.clientManager = clientManager;
- }
-
- @Override
- public void registerInstance(Service service, Instance instance, String clientId) {
- //从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象
- Service singleton = ServiceManager.getInstance().getSingleton(service);
- if (!singleton.isEphemeral()) {
- throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));
- }
- //从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象
- Client client = clientManager.getClient(clientId);
- if (!clientIsLegal(client, clientId)) {
- return;
- }
- //将请求中的instance实例信息封装为InstancePublishInfo对象
- InstancePublishInfo instanceInfo = getPublishInfo(instance);
- //往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法
- client.addServiceInstance(singleton, instanceInfo);
- //设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间
- client.setLastUpdatedTime();
- //发布客户端注册服务实例的事件
- NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
- //发布服务实例元数据的事件
- NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
- }
- ...
- }
- //Nacos naming client based ip and port.
- //The client is bind to the ip and port users registered. It's a abstract content to simulate the tcp session client.
- public class IpPortBasedClient extends AbstractClient {
- ...
- @Override
- public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
- return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
- }
- ...
- }
- //Abstract implementation of {@code Client}.
- public abstract class AbstractClient implements Client {
- //publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务
- //存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象
- //key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象
- protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
- //subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务
- //subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)
- protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
- ...
- @Override
- public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
- //服务注册时,如果是第一次put进去Service对象,会返回null
- if (null == publishers.put(service, instancePublishInfo)) {
- //监视器记录
- MetricsMonitor.incrementInstanceCount();
- }
- //发布客户端改变事件,用于处理集群间的数据同步
- NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
- Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
- return true;
- }
- ...
- }
复制代码 (2)ClientChangedEvent变乱的处理源码
DistroClientDataProcessor的onEvent()方法会相应ClientChangedEvent。该方法如果判断出变乱范例为ClientChangedEvent变乱,那么就会实行DistroClientDataProcessor的syncToAllServer()方法,然后调用DistroProtocol的sync()方法进行集群节点同步处理。
DistroProtocol的sync()方法会遍历集群中除自身节点外的其他节点,然后对遍历到的每个节点实行DistroProtocol的syncToTarget()方法。
在DistroProtocol的syncToTarget()方法中,起首把要同步的集群节点targetServer包装成DistroKey对象,然后根据DistroKey对象创建DistroDelayTask延迟使命,接着调用NacosDelayTaskExecuteEngine的addTask()方法,往延迟使命实行引擎的tasks中添加使命。
NacosDelayTaskExecuteEngine在初始化时会启动一个定时使命,这个定时使命会定时实行ProcessRunnable的run()方法。而ProcessRunnable的run()方法会不断从使命池tasks中取出延迟使命处理,处理DistroDelayTask使命时会调用DistroDelayTaskProcessor的process()方法。
在实行DistroDelayTaskProcessor的process()方法时,会先根据DistroDelayTask使命封装一个DistroSyncChangeTask使命,然后调用NacosExecuteTaskExecuteEngine的addTask()方法。也就是调用TaskExecuteWorker的process()方法,将DistroSyncChangeTask使命添加到TaskExecuteWorker的阻塞队列中,同时创建TaskExecuteWorker时会启动线程不断从队列中取出使命处理。因此终极会实行DistroSyncChangeTask的run()方法。- public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
- private final ClientManager clientManager;
- private final DistroProtocol distroProtocol;
- ...
- @Override
- public void onEvent(Event event) {
- ...
- if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
- syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
- } else {
- syncToAllServer((ClientEvent) event);
- }
- }
-
- private void syncToAllServer(ClientEvent event) {
- Client client = event.getClient();
- //Only ephemeral data sync by Distro, persist client should sync by raft.
- //临时实例使用Distro协议,持久化实例使用Raft协议
- //ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步
- if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
- return;
- }
- if (event instanceof ClientEvent.ClientDisconnectEvent) {
- //如果event是客户端注销实例时需要进行集群节点同步的事件
- DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
- distroProtocol.sync(distroKey, DataOperation.DELETE);
- } else if (event instanceof ClientEvent.ClientChangedEvent) {
- //如果event是客户端注册实例时需要进行集群节点同步的事件
- DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
- distroProtocol.sync(distroKey, DataOperation.CHANGE);
- }
- }
- ...
- }
- @Component
- public class DistroProtocol {
- private final ServerMemberManager memberManager;
- private final DistroTaskEngineHolder distroTaskEngineHolder;
- ...
- //Start to sync by configured delay.
- public void sync(DistroKey distroKey, DataOperation action) {
- sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
- }
-
- //Start to sync data to all remote server.
- public void sync(DistroKey distroKey, DataOperation action, long delay) {
- //遍历集群中除自身节点外的其他节点
- for (Member each : memberManager.allMembersWithoutSelf()) {
- syncToTarget(distroKey, action, each.getAddress(), delay);
- }
- }
-
- //Start to sync to target server.
- public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
- //先把要同步的集群节点targetServer包装成DistroKey对象
- DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);
- //然后根据DistroKey对象创建DistroDelayTask任务
- DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
- //接着调用NacosDelayTaskExecuteEngine.addTask()方法
- //往延迟任务执行引擎DistroDelayTaskExecuteEngine中添加延迟任务DistroDelayTask
- distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
- if (Loggers.DISTRO.isDebugEnabled()) {
- Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
- }
- }
- ...
- }
- //延迟任务执行引擎
- public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {
- private final ScheduledExecutorService processingExecutor;
- protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任务池
-
- public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
- super(logger);
- tasks = new ConcurrentHashMap<>(initCapacity);
- processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
- //开启定时任务,即启动ProcessRunnable线程任务
- processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
- }
- ...
- @Override
- public void addTask(Object key, AbstractDelayTask newTask) {
- lock.lock();
- try {
- AbstractDelayTask existTask = tasks.get(key);
- if (null != existTask) {
- newTask.merge(existTask);
- }
- //最后放入到任务池中
- tasks.put(key, newTask);
- } finally {
- lock.unlock();
- }
- }
-
- protected void processTasks() {
- //获取tasks中所有的任务,然后进行遍历
- Collection<Object> keys = getAllTaskKeys();
- for (Object taskKey : keys) {
- //通过任务key,获取具体的任务,并且从任务池中移除掉
- AbstractDelayTask task = removeTask(taskKey);
- if (null == task) {
- continue;
- }
- //通过任务key获取对应的NacosTaskProcessor延迟任务处理器
- NacosTaskProcessor processor = getProcessor(taskKey);
- if (null == processor) {
- getEngineLog().error("processor not found for task, so discarded. " + task);
- continue;
- }
- try {
- //ReAdd task if process failed
- //调用获取到的NacosTaskProcessor延迟任务处理器的process()方法
- if (!processor.process(task)) {
- //如果失败了,会重试添加task回tasks这个map中
- retryFailedTask(taskKey, task);
- }
- } catch (Throwable e) {
- getEngineLog().error("Nacos task execute error ", e);
- retryFailedTask(taskKey, task);
- }
- }
- }
- ...
- private class ProcessRunnable implements Runnable {
- @Override
- public void run() {
- try {
- processTasks();
- } catch (Throwable e) {
- getEngineLog().error(e.toString(), e);
- }
- }
- }
- }
- //Distro delay task processor.
- public class DistroDelayTaskProcessor implements NacosTaskProcessor {
- ...
- @Override
- public boolean process(NacosTask task) {
- if (!(task instanceof DistroDelayTask)) {
- return true;
- }
- DistroDelayTask distroDelayTask = (DistroDelayTask) task;
- DistroKey distroKey = distroDelayTask.getDistroKey();
- switch (distroDelayTask.getAction()) {
- case DELETE:
- //处理客户端注销实例时的延迟任务(同步数据到集群节点)
- //根据DistroDelayTask任务封装一个DistroSyncTask任务
- DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
- //调用NacosExecuteTaskExecuteEngine.addTask()方法
- distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
- return true;
- case CHANGE:
- case ADD:
- //处理客户端注册实例时的延迟任务(同步数据到集群节点)
- //根据DistroDelayTask任务封装一个DistroSyncChangeTask任务
- DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
- //调用NacosExecuteTaskExecuteEngine.addTask()方法
- distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
- return true;
- default:
- return false;
- }
- }
- }
- //任务执行引擎
- public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
- private final TaskExecuteWorker[] executeWorkers;
-
- public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
- super(logger);
- executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
- for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
- executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
- }
- }
- ...
- @Override
- public void addTask(Object tag, AbstractExecuteTask task) {
- //根据tag获取到TaskExecuteWorker
- NacosTaskProcessor processor = getProcessor(tag);
- if (null != processor) {
- processor.process(task);
- return;
- }
- TaskExecuteWorker worker = getWorker(tag);
- //调用TaskExecuteWorker.process()方法把AbstractExecuteTask任务放入到队列当中去
- worker.process(task);
- }
-
- private TaskExecuteWorker getWorker(Object tag) {
- int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
- return executeWorkers[idx];
- }
- ...
- }
- public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {
- private final BlockingQueue<Runnable> queue;//任务存储容器
-
- public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
- ...
- this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
- new InnerWorker(name).start();
- }
-
- @Override
- public boolean process(NacosTask task) {
- if (task instanceof AbstractExecuteTask) {
- //把NacosTask任务放入到阻塞队列中
- putTask((Runnable) task);
- }
- return true;
- }
-
- private void putTask(Runnable task) {
- try {
- //把NacosTask任务放入到阻塞队列中
- queue.put(task);
- } catch (InterruptedException ire) {
- log.error(ire.toString(), ire);
- }
- }
- ...
- private class InnerWorker extends Thread {
- InnerWorker(String name) {
- setDaemon(false);
- setName(name);
- }
-
- @Override
- public void run() {
- while (!closed.get()) {
- try {
- //一直取阻塞队列中的任务
- Runnable task = queue.take();
- long begin = System.currentTimeMillis();
- //调用NacosTask中的run方法
- task.run();
- long duration = System.currentTimeMillis() - begin;
- if (duration > 1000L) {
- log.warn("task {} takes {}ms", task, duration);
- }
- } catch (Throwable e) {
- log.error("[TASK-FAILED] " + e.toString(), e);
- }
- }
- }
- }
- }
复制代码 实行DistroSyncChangeTask的run()方法,实在就是实行AbstractDistroExecuteTask的run()方法。AbstractDistroExecuteTask的run()方法会先获取哀求数据,然后调用DistroClientTransportAgent的syncData()方法同步集群节点,也就是调用ClusterRpcClientProxy的sendRequest()方法发送数据同步哀求,终极会调用RpcClient的request()方法 -> GrpcConnection的request()方法。- public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
- ...
- @Override
- protected void doExecuteWithCallback(DistroCallback callback) {
- String type = getDistroKey().getResourceType();
- //获取请求数据
- DistroData distroData = getDistroData(type);
- if (null == distroData) {
- Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
- return;
- }
- //默认调用DistroClientTransportAgent.syncData()方法同步集群节点
- getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);
- }
-
- private DistroData getDistroData(String type) {
- DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());
- if (null != result) {
- result.setType(OPERATION);
- }
- return result;
- }
- ...
- }
- public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {
- ...
- @Override
- public void run() {
- //Nacos:Naming:v2:ClientData
- String type = getDistroKey().getResourceType();
- //获取DistroClientTransportAgent对象
- DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
- if (null == transportAgent) {
- Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
- return;
- }
- Loggers.DISTRO.info("[DISTRO-START] {}", toString());
- //默认返回true
- if (transportAgent.supportCallbackTransport()) {
- //默认执行子类的doExecuteWithCallback()方法
- doExecuteWithCallback(new DistroExecuteCallback());
- } else {
- executeDistroTask();
- }
- }
-
- protected abstract void doExecuteWithCallback(DistroCallback callback);
- ...
- }
- public class DistroClientTransportAgent implements DistroTransportAgent {
- private final ClusterRpcClientProxy clusterRpcClientProxy;
- private final ServerMemberManager memberManager;
- ...
- @Override
- public boolean syncData(DistroData data, String targetServer) {
- if (isNoExistTarget(targetServer)) {
- return true;
- }
- //创建请求对象
- DistroDataRequest request = new DistroDataRequest(data, data.getType());
- //找到集群节点
- Member member = memberManager.find(targetServer);
- if (checkTargetServerStatusUnhealthy(member)) {
- Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
- return false;
- }
- try {
- //向集群节点发送RPC异步请求
- Response response = clusterRpcClientProxy.sendRequest(member, request);
- return checkResponse(response);
- } catch (NacosException e) {
- Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
- }
- return false;
- }
- ...
- }
- @Service
- public class ClusterRpcClientProxy extends MemberChangeListener {
- ...
- //send request to member.
- public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {
- RpcClient client = RpcClientFactory.getClient(memberClientKey(member));
- if (client != null) {
- //调用RpcClient.request()方法
- return client.request(request, timeoutMills);
- } else {
- throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);
- }
- }
- ...
- }
- public abstract class RpcClient implements Closeable {
- //在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,
- //会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性
- protected volatile Connection currentConnection;
- ...
- //send request.
- public Response request(Request request, long timeoutMills) throws NacosException {
- int retryTimes = 0;
- Response response;
- Exception exceptionThrow = null;
- long start = System.currentTimeMillis();
- while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
- ...
- //发起gRPC请求,调用GrpcConnection.request()方法
- response = this.currentConnection.request(request, timeoutMills);
- ...
- }
- ...
- }
- ...
- }
复制代码 (3)集群节点处理数据同步哀求的源码
通过DistroClientTransportAgent的syncData()方法发送的数据同步哀求,会被DistroDataRequestHandler的handle()方法处理。然后会调用DistroDataRequestHandler的handleSyncData()方法,接着调用DistroProtocol的onReceive()方法,于是终极会调用到DistroClientDataProcessor.processData()方法。
在实行DistroClientDataProcessor的processData()方法时,如果是同步服务实例新增、修改后的数据,则实行DistroClientDataProcessor的handlerClientSyncData()方法。该方法会和处理服务注册时一样,发布一个客户端注册服务实例的变乱。如果是同步服务实例删除后的数据,则调用EphemeralIpPortClientManager的clientDisconnected()方法。起首移除客户端对象信息,然后发布一个客户端注销服务实例的变乱。
此中客户端注销服务实例的变乱ClientDisconnectEvent,起首会被ClientServiceIndexesManager的onEvent()方法进行处理,处理时会调用ClientServiceIndexesManager的handleClientDisconnect()方法,移除ClientServiceIndexesManager订阅者列表的元素和注册表的元素。然后会被DistroClientDataProcessor的onEvent()方法进行处理,进行集群节点之间的数据同步。- @Component
- public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {
- private final DistroProtocol distroProtocol;
- ...
- @Override
- public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
- try {
- switch (request.getDataOperation()) {
- case VERIFY:
- return handleVerify(request.getDistroData(), meta);
- case SNAPSHOT:
- return handleSnapshot();
- case ADD:
- case CHANGE:
- case DELETE:
- //服务实例新增、修改、删除的同步,都会由DistroDataRequestHandler.handleSyncData()方法处理
- return handleSyncData(request.getDistroData());
- case QUERY:
- return handleQueryData(request.getDistroData());
- default:
- return new DistroDataResponse();
- }
- } catch (Exception e) {
- Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
- DistroDataResponse result = new DistroDataResponse();
- result.setErrorCode(ResponseCode.FAIL.getCode());
- result.setMessage("handle distro request with exception");
- return result;
- }
- }
-
- private DistroDataResponse handleSyncData(DistroData distroData) {
- DistroDataResponse result = new DistroDataResponse();
- //调用DistroProtocol.onReceive()方法
- if (!distroProtocol.onReceive(distroData)) {
- result.setErrorCode(ResponseCode.FAIL.getCode());
- result.setMessage("[DISTRO-FAILED] distro data handle failed");
- }
- return result;
- }
- ...
- }
- @Component
- public class DistroProtocol {
- ...
- //Receive synced distro data, find processor to process.
- public boolean onReceive(DistroData distroData) {
- Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());
- //Nacos:Naming:v2:ClientData
- String resourceType = distroData.getDistroKey().getResourceType();
- //获取DistroClientDataProcessor处理对象
- DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
- if (null == dataProcessor) {
- Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
- return false;
- }
- //调用DistroClientDataProcessor.processData()方法
- return dataProcessor.processData(distroData);
- }
- ...
- }
- public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
- ...
- @Override
- public boolean processData(DistroData distroData) {
- switch (distroData.getType()) {
- case ADD:
- case CHANGE:
- //服务实例添加和改变时的执行逻辑
- ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);
- handlerClientSyncData(clientSyncData);
- return true;
- case DELETE:
- //服务实例删除时的执行逻辑
- String deleteClientId = distroData.getDistroKey().getResourceKey();
- Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
- //调用EphemeralIpPortClientManager.clientDisconnected()方法
- clientManager.clientDisconnected(deleteClientId);
- return true;
- default:
- return false;
- }
- }
-
- private void handlerClientSyncData(ClientSyncData clientSyncData) {
- Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
- clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
- Client client = clientManager.getClient(clientSyncData.getClientId());
- upgradeClient(client, clientSyncData);
- }
-
- private void upgradeClient(Client client, ClientSyncData clientSyncData) {
- List<String> namespaces = clientSyncData.getNamespaces();
- List<String> groupNames = clientSyncData.getGroupNames();
- List<String> serviceNames = clientSyncData.getServiceNames();
- List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
- Set<Service> syncedService = new HashSet<>();
-
- for (int i = 0; i < namespaces.size(); i++) {
- Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
- Service singleton = ServiceManager.getInstance().getSingleton(service);
- syncedService.add(singleton);
- InstancePublishInfo instancePublishInfo = instances.get(i);
- //如果和当前不一样才发布事件
- if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
- client.addServiceInstance(singleton, instancePublishInfo);
- //发布客户端注册服务实例的事件,与客户端进行服务注册时一样
- NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
- }
- }
-
- for (Service each : client.getAllPublishedService()) {
- if (!syncedService.contains(each)) {
- client.removeServiceInstance(each);
- NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
- }
- }
- }
- ...
- }
- @Component("ephemeralIpPortClientManager")
- public class EphemeralIpPortClientManager implements ClientManager {
- //key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象
- private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
- ...
- @Override
- public boolean clientDisconnected(String clientId) {
- Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
- //移除客户端信息
- IpPortBasedClient client = clients.remove(clientId);
- if (null == client) {
- return true;
- }
- //发布客户端注销服务实例的事件
- NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
- client.release();
- return true;
- }
- ...
- }
- @Component
- public class ClientServiceIndexesManager extends SmartSubscriber {
- //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId
- private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
- //订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientId
- private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
- ...
- @Override
- public void onEvent(Event event) {
- if (event instanceof ClientEvent.ClientDisconnectEvent) {
- handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
- } else if (event instanceof ClientOperationEvent) {
- handleClientOperation((ClientOperationEvent) event);
- }
- }
-
- private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
- Client client = event.getClient();
- for (Service each : client.getAllSubscribeService()) {
- //移除订阅者列表的元素
- removeSubscriberIndexes(each, client.getClientId());
- }
- for (Service each : client.getAllPublishedService()) {
- //移除注册表的元素
- removePublisherIndexes(each, client.getClientId());
- }
- }
- ...
- }
- public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {
- ...
- @Override
- public void onEvent(Event event) {
- ...
- if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
- syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
- } else {
- syncToAllServer((ClientEvent) event);
- }
- }
-
- private void syncToAllServer(ClientEvent event) {
- Client client = event.getClient();
- //Only ephemeral data sync by Distro, persist client should sync by raft.
- //临时实例使用Distro协议,持久化实例使用Raft协议
- //ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步
- if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
- return;
- }
- if (event instanceof ClientEvent.ClientDisconnectEvent) {
- //如果event是客户端注销实例时需要进行集群节点同步的事件
- DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
- distroProtocol.sync(distroKey, DataOperation.DELETE);
- } else if (event instanceof ClientEvent.ClientChangedEvent) {
- //如果event是客户端注册实例时需要进行集群节点同步的事件
- DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
- distroProtocol.sync(distroKey, DataOperation.CHANGE);
- }
- }
- ...
- }
复制代码 (4)总结
一.实行引擎的总结
延时使命实行引擎的实现原理是引擎有一个Map范例的tasks使命池,这个使命池可以根据key映射对应的使命处理器。引擎会定时从使命池中获取使命,实行使命处理器的处理方法处理使命。
使命实行引擎的实现原理是会创建多个使命实行Worker,每个使命实行Worker都会有一个阻塞队列。向使命实行引擎添加使命时会将使命添加到此中一个Woker的阻塞队列中,Worker在初始化时就会启动一个线程不断取出阻塞队列中的使命来处理。所以使命实行引擎会通过阻塞队列 + 异步使命的方式来实现。
二.用于向集群节点同步数据的客户端改变变乱的处理流程总结
步调一:先创建DistroDelayTask延迟使命放入到延迟使命实行引擎的使命池,DistroDelayTask延迟使命会由DistroDelayTaskProcessor处理器处理。
步调二:DistroDelayTaskProcessor处理时会创建DistroSyncChangeTask使命,然后再将使命分发添加到实行引擎中的使命实行Worker的阻塞队列中。
步调三:使命实行Worker会从队列中获取并实行DistroSyncChangeTask使命,也就是实行引擎会触发调用AbstractDistroExecuteTask的run()方法,从而调用DistroSyncChangeTask的doExecuteWithCallback()方法。
步调四:doExecuteWithCallback()方法会获取最新的微服务实例列表,然后通过DistroClientTransportAgent的syncData()方法发送数据同步哀求。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |