Nacos配置中心设计思想
Nacos致力于帮助您发现、配置和管理微服务。Nacos提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos是什么
Nacos的介绍我就引用官网的一段解释:https://nacos.io/zh-cn/docs/what-is-nacos.html
Nacos致力于帮助您发现、配置和管理微服务。Nacos提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos环境搭建
官方建议的预备环境:
Nacos 依赖 Java 环境来运行。如果您是从代码开始构建并运行Nacos,还需要为此配置 Maven环境,请确保是在以下版本环境中安装使用:
- 64 bit OS,支持 Linux/Unix/Mac/Windows,推荐选用 Linux/Unix/Mac。
- 64 bit JDK 1.8+;下载 & 配置。
- Maven 3.2.x+;下载 & 配置。
Nacos服务端的搭建有两种方式:
- 通过源码构建
- 下载Raelease包
本文通过源码构建方式来构建,并且通过阿里云服务器方式环境部署
官网地址下载最新版的nacos-1.1.3.tar.gz
https://github.com/alibaba/nacos/releases
步骤如下:
1.解压tar文件到指定文件夹
tar zxvf nacos-1.1.3.tar.gz -C /usr/local/
到解压文件中执行编译命令
mvn -Prelease-nacos clean install -U
2.
cd /usr/local/nacos-1.1.3/distribution/target
解压:nacos-server-1.1.3.tar.gz 文件
tar zxvf nacos-server-1.1.3.tar.gz -C /usr/local/
3. 启动Nacos server
cd /usr/local/nacos/bin
启动命令(standalone代表着单机模式运行,非集群模式):
sh startup.sh -m standalone
关闭服务器
sh shutdown.sh
访问控制台:http://ip:8848/nacos/index.html 用户名:nacos 密码:nacos
注:访问ip为大家部署环境云服务器的ip
源码导入idea:
访问官网https://github.com/alibaba/nacos/releases
下载:nacos-1.1.3.zip文件解压然后导入idea工具即可
聊聊动态配置管理
访问到控制台后,在配置管理中就可以实现配置信息的动态管理设置。
适用的场景
了解动态配置管理的设置之后,我们知道了大概原理就是Nacos服务端保存了配置信息,客户端连接到服务端之后,根据dataID,group可以获取到具体的配置信息,当服务端的配置发生变更时,客户端会收到通知。当客户端拿到变更后的最新配置信息后,就可以自己做处理。所有需要使用配置的场景都可以通过Nacos来进行做管理。
如Nacos适用的场景,包括但不限于以下:
- 数据库,缓存,消息等连接信息
- 限流规则和降级开关
- 流量的动态调度等
Nacos客户端是如何实时动态获取服务端的最新配置信息
大家熟知客户端和服务端之间的数据交互,大概就两种情况
- 服务端推数据给客户端
- 客户端从数据段拉数据
那么Nacos是通过推送或者拉取呢?
从客户端接收服务端配置信息来看是Nacos客户端通过启动一个线程来Listener来监听接收最新数据来看感觉好像是服务器端数据变化后会主动推送数据;但是到底是不是这样,需要从源码中获取答案。
Nacos client连接服务端配置中心demo
public class NacosSdkDemo {
public static void main(String[] args) {
//连接到目标服务的地址
//指定dataid、 groupid
String serverAddr="ip:8848";
String dataId="example";
String groupId="DEFAULT_GROUP";
Properties properties=new Properties();
properties.put("serverAddr",serverAddr);
try {
//ConfigService-> NacosConfigService
ConfigService configService=NacosFactory.createConfigService(properties);
String content=configService.getConfig(dataId,groupId,3000);
System.out.println(content);
configService.addListener(dataId, groupId, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("configInfo:"+configInfo);
}
});
System.in.read();
} catch (NacosException | IOException e) {
e.printStackTrace();
}
}
}
createConfigService
从代码中看到是通过反射调用了NacosConfigService的构造方法创建ConfigService
/**
* Create Config
*
* @param properties init param
* @return ConfigService
* @throws NacosException Exception
*/
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
ConfigService实例化
/**
* http agent
*/
private HttpAgent agent;
/**
* longpolling 长轮询方式
*/
private ClientWorker worker;
private String namespace;
private String encode;
private ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager();
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
initNamespace(properties);
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
agent.start();
worker = new ClientWorker(agent, configFilterChainManager, properties);
}
实例化主要初始化上述中两个对象
-
HttpAgent
其中HttpAgent采用了装饰器模式,ServerHttpAgent是主要工作类,MetricsHttpAgent是做了一些统计操作
-
ClientWorker
ClientWorker
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
//创建了一个定时任务的线程池
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
//创建了一个保持长连接的线程池
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
//创建了一个延迟任务线程池每隔10ms来检查配置信息的线程池
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
-
此类构造方法中中传入了HttpAgent
-
还创建了三个个线程池
1.第一个线程池用来执行定时任务的executor,executor每隔10ms就会执行一次checkConfigInfo()方法。
2.第二个线程池是做长轮询的
3.一个普通定时任务的线程池
public void checkConfigInfo() {
// 分任务
int listenerSize = cacheMap.get().size();
// 向上取整为批数
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
checkConfigInfo的检查配置的逻辑:
分批次处理任务思想,一批次3000个
取出一批次任务提交给executorService线程池执行,执行的任务就是LongPollingRunnable,每个任务都有一个taskId
设计思想:如果在pull数据,数据太多的时候采用分批次处理方案
LongPollingRunnable中主要做了两件事情:
1.检查本地配置信息
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
......省略部分代码
首先取出与改taskId相关的CacheData,然后对CacheData进行检查
checkLocalConfig();
包括本地检查和监听器的md5的检查,本地检查主要做一些高可用故障容错的处理,当服务端挂掉后,Nacos客户端可以从本地的文件系统中获取相关的配置信息。
checkLocalConfig代码如下:
private void checkLocalConfig(CacheData cacheData) {
final String dataId = cacheData.dataId;
final String group = cacheData.group;
final String tenant = cacheData.tenant;
File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
// 没有 -> 有
//判断是否使用本地配置 && 是否存在本地文件
//C:\Users\fanshuang\nacos\config\fixed-120.79.232.219_8848_nacos\data\config- data\DEFAULT_GROUP\example
if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
// 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
cacheData.setUseLocalConfigInfo(false);
LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
dataId, group, tenant);
return;
}
// 有变更
if (cacheData.isUseLocalConfigInfo() && path.exists()
&& cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
}
}
本地检查逻辑中有三种情况
- 没有设置本地配置 && 本地文件存在
- 设置本地配置 && 本地文件不存在
- 设置本地配置 && 文件存在 && 配置信息有变更
发现在本地保存了配置信息的快照信息
C:\Users\fanshuang\nacos\config\fixed-120.79.232.219_8848_nacos\snapshot\DEFAULT_GROUP
2.服务端检查
// check server config 检查服务端的配置
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
这里会去获取一个发生变化的GroupKeys 集合:
/**
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
*/
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// cacheData 首次出现在cacheMap中&首次check更新
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
这里将可能发生变化的配置信息封装,然后调用checkUpdateConfigStr
/**
* 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
*/
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
List<String> headers = new ArrayList<String>(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
}
} catch (IOException e) {
setHealthServer(false);
LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
throw e;
}
return Collections.emptyList();
}
以上从server获取变化DataId列表,获取到列表后就遍历这个列表然后去服务器端获取变更后对应的配置值,然后将配置设置到缓存
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
String message = String.format(
"[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
发起请求从服务器获取配置getServerConfig
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
String message = String.format(
"[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
dataId, group, tenant);
LOGGER.error(message, e);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
switch (result.code) {
case HttpURLConnection.HTTP_OK:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
return result.content;
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return null;
case HttpURLConnection.HTTP_CONFLICT: {
LOGGER.error(
"[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
+ "tenant={}", agent.getName(), dataId, group, tenant);
throw new NacosException(NacosException.CONFLICT,
"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,
group, tenant);
throw new NacosException(result.code, result.content);
}
default: {
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,
group, tenant, result.code);
throw new NacosException(result.code,
"http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
}
}
当从远程获取服务器的配置之后,还有一个循环操作
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
就是对变化的配置进行监听回调用 cacheData.checkListenerMd5()#safeNotifyListener
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
至此基本上大概流程就此结束。
思考问题
1.客户端发起的长轮询超时时间为什么是30s,为何如此设计
2.在超时时间内数据发生变化,客户端为何会立即受到最新推送变化?
带着这些问题我们继续探究服务端源码来寻找答案
继续分析服务端
客户端发起长轮询http请求:/v1/cs/configs/listener
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,agent.getEncode(), timeout);
timeout是在ClientWorker#init()初始化方法中默认超时时间是30s,当然也可以自定义自己修改此时间
但是我打开服务器$NACOS_HOME/nacos/logs/config-client-request.log 日志文件中发现
2019-09-04 09:39:47,168|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:40:16,681|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:40:46,193|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:41:15,704|29501|timeout|218.17.101.98|polling|1|55
2019-09-04 09:41:45,215|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:42:14,730|29502|timeout|218.17.101.98|polling|1|55
2019-09-04 09:42:44,243|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:43:13,755|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:43:45,364|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:44:15,329|29501|timeout|218.17.101.98|polling|1|55
2019-09-04 09:44:44,842|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:45:14,353|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:45:43,865|29501|timeout|218.17.101.98|polling|1|55
2019-09-04 09:46:13,378|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:46:42,893|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:47:12,404|29500|timeout|218.17.101.98|polling|1|55
2019-09-04 09:47:41,924|29503|timeout|218.17.101.98|polling|1|55
2019-09-04 09:48:11,436|29501|timeout|218.17.101.98|polling|1|55
可以看到一个显现,在配置文件没有发生变化情况先客户端会等29.5s以上,才请求到服务器的结果,这又是为何?带着疑问继续看源码分析答案。
我们找到/v1/cs/configs/listener这个接口所在的ConfigController中
/**
* 比较MD5
*/
@RequestMapping(value = "/listener", method = RequestMethod.POST)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
Nacos的服务端是通过spring对外提供http服务,对HttpServletRequest中参数进行转换后调用inner.doPollingConfig,轮询接口,我们这里只看长轮询接口
/**
* 轮询接口
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize)
throws IOException, ServletException {
// 长轮询
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// else 兼容短轮询逻辑
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// 兼容短轮询result
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
…………//省略代码
}
分析长轮询代码块
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance
*/
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// do nothing but set fix polling timeout
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
clientMd5Map.size(), probeRequestSize, changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// 一定要由HTTP线程调用,否则离开后容器会立即发送响应
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
asyncContext.setTimeout(0L);
scheduler.execute(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
该方法逻辑是从方法名可以猜出应该是把客户端的长轮询请求添加到一个任务中取。
获取到客户端传递来的超时时间,减去0.5s提前返回相应,这也就能解释为什么客户端相应的超时时间是29.5s+了。
逻辑中有根据客户端请求过来md5和服务器端对应的group下对应内容的md5进行比较如果不一致,则通过generateResponse返回结果,如果配置文件没有发生变化,则通过scheduler.execute启动一个定时任务,将客户端的一个长轮询封装成一个CliengLongPolling交给scheduler去执行。
说明:当isFixedPolling()方法为true时,timeout将会是一个固定的间隔时间。
下面我们去看ClientLongPolling中run方法
class ClientLongPolling implements Runnable {
@Override
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
@Override
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
/**
* 删除订阅关系
*/
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
List<String> changedGroups = MD5Util.compareMd5(
(HttpServletRequest)asyncContext.getRequest(),
(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - createTime),
"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
"polling",
clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);
}
ClientLongPolling被提交给scheduler执行之后,主要做了一下几件事、
1.创建一个调度的任务,调度的延时时间为29.5s
2.将该CliengLongPolling自身的实例添加到一个allSubs队列中
3.延时时间到了后,首先将该cliengLongPolling自身的实例从allSubs中移除
代码中注释说“删除订阅关系”,从这里我们可知allSubs和ClientLongPolling之间维持了一种订阅关系,而ClientLongPolling是被订阅的,删除订阅关系之后,订阅方就无法对被订阅方进行通知了
4.获取服务端中保存的对应客户端请求groupKeys是否发生变更,如果发现groupKey的md5值还不是最新的,则则说明客户端的配置项还没发生变更,所以将gruoupKey放到一个changedGroupKeys列表中,最后将changeGroupKeys将结果写入response返回给客户端。
代码中有个allSubs队列,该队列是一个ConcurrentLinkedQueue队列。
此时将有一个疑问allSubs是干什么的呢?
allSubs干么事?
服务端直到调度任务的延时时间到了之前,ClientLongPolling都不会干其它事情,该allSubs队列肯定会有其它事情处理。
我们联想到我们在Nacos的dashboard上修改配置后,客户端立即得到结果响应,由此可以联系猜测allSubs跟这个配置变更有点关系。
在Nacos的dashboard的修改配置POST请求url为:v1/cs/configs, ConfigController#publishConfig
/**
* 增加或更新非聚合数据。
*
* @throws NacosException
*/
@RequestMapping(method = RequestMethod.POST)
@ResponseBody
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam("content") String content,
@RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema)
throws NacosException {
final String srcIp = RequestUtil.getRemoteIp(request);
String requestIpApp = RequestUtil.getAppName(request);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
…………//省略代码
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
此段代码逻辑比较主要也就是服务端首先将配置的值进行持久化层的保存,然后触发ConfigDataChangeEvent的事件
public class EventDispatcher {
/**
* add event listener
*/
static public void addEventListener(AbstractEventListener listener) {
for (Class<? extends Event> type : listener.interest()) {
getEntry(type).listeners.addIfAbsent(listener);
}
}
/**
* fire event, notify listeners.
*/
static public void fireEvent(Event event) {
if (null == event) {
throw new IllegalArgumentException();
}
for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
log.error(e.toString(), e);
}
}
}
fireEvent方法实际上触发的是AbstractEventListener的onEvent方法,而所有的listener是保存在listeners对象中,被触发的AbstractEventListener对象则是通过addEventListener方法添加到listeners中的,所以我们找到addEventListener方法在何处被调用的,就知道有那些AbstractEventListener需要被触发onEvent回调方法了。
找到AbstractEventListener类的构造方法中,将自身注册进去了如下代码:
static public abstract class AbstractEventListener {
public AbstractEventListener() {
/**
* automatic register
*/
EventDispatcher.addEventListener(this);
}
/**
* 感兴趣的事件列表
*
* @return event list
*/
abstract public List<Class<? extends Event>> interest();
/**
* 处理事件
*
* @param event event
*/
abstract public void onEvent(Event event);
}
然而AbstractEventListener是一个抽象类,所以实际注册应该是此抽象类的子类,所以我们只需要找到所有继承它的子类 LongPollingService
所以到这里终于真相大白,我们从dashboard中修改配置项后,实际上会调用LongPollingService的onEvent方法。
现在我们看LongPollingService中,查看一下onEvent方法如下
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
发现触发LongPollingService的onEvent方法时,实际上是执行了一个DataChangeTask的任务,应该是通过改任务来通知客户端服务端的数据已经发生了变更。然后我们看看DataChangeTash代码
class DataChangeTask implements Runnable {
@Override
public void run() {
try {
ConfigService.getContentBetaMd5(groupKey);
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// 如果beta发布且不在beta列表直接跳过
if (isBeta && !betaIps.contains(clientSub.ip)) {
continue;
}
// 如果tag发布且不在tag列表直接跳过
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
continue;
}
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // 删除订阅关系
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
(System.currentTimeMillis() - changeTime),
"in-advance",
RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
"polling",
clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
}
}
上述代码很简单逻辑如下
1.遍历allSubs队列,该队列中维持的是所有客户端请求的任务,需要找到与当前发生变更的配置项的groupKey相同的ClientLongPolling任务
2.向客户端写响应数据
引申一个问题:
如果DataChangeTask任务完成数据的推送之后,ClientLongPolling中调度任务又重复执行改如何解决?
只需要在进行推送操作之前先将原来等待执行的调度任务取消掉就可以了,这样就防止推送操作写完响应数据之后调度任务又要重复去写相应数据,肯定会报错的。
从源码sendResponse方法中也是这样处理的
void sendResponse(List<String> changedGroups) {
/**
* 取消超时任务
*/
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
generateResponse(changedGroups);
}
源码分析到此时,应该文章之前我们产生的疑问应该都迎刃而解了。
疑问的解答
1.客户端发起的长轮询超时时间为什么是30s,为何如此设计
这个应该是一个生产上经验设置值,客户端发起长轮询请求,在29.5s只需要等待,最后0.5s进行配置变更检查;如果超时设置时间太短,那么配置变更比较频繁,那么很可能在等待期无法完成对客户端做推送,而是下放到超时时间到达时对数据进行检查后才能将数据变更相应给客户端。超时时间后检查配置操作涉及到IO操作,IO操作是非常消耗资源的,所以我们尽量在等待期中就把数据变更直接“推送”给客户端。
2.在超时时间内数据发生变化,客户端为何会立即收到最新推送变化?
其实这个就是设计巧妙的地方,因为服务端更改配置信息之后,找到对应长轮询中保持的http强求,然后将结果写入respinse返回给客户端,以为此时客户端的保持的长轮询请求可能在29.5s等待,借助此连接就像服务端对客户端进行数据“推送”一样,所以客户端会很快得到相应。
总结
到这里我们基本上大概了解了,到底谁是采用推还是拉方式来实时获取最新配置更新数据了。
1.客户端的请求到服务端后,服务端将该请求加入到allSubs队列中,等待配置发生变化时,DataCHangeTask主动去触发,并将变更后的数据写入对应客户端长轮询保持http响应对象。
2.与此同时,服务器端也将该请求封装成一个调度任务去执行,等待调度期间就是等待DataChangeTask主动触发,如果延时时间到了DataChangeTask还没有触发,则调度任务开始执行数据变更检查,然后将检查结果写入相应对象。
3.所以客户端能够实时的感知配置的变化,是建立咋客户端拉和服务器端的“推”想配合基础上进行的,这里推是一个类push所以打上引号,因为服务端和客户端直接是通过http进行数据通讯,之所以说推,是因为服务端借助客户端长轮询的http请求主动将比变更后的数据写入的response对象。这就是设计的巧妙之处
由此我们应该很清楚,Nacos的配置中心实时更新原理应该很清楚了。
Nacos的集群选举
Nacos支持集群模式,大家知道一旦涉及集群肯定就设计到主从模式,下面就分析一下Nacos采用什么方式来实现集群。
Nacos采用zookeeper,它分为leader和follower角色,从这个定义上可以看出Nacos集群存在自己的选举机制。因为按照自己经验来看,如果中间件自己不具备选举功能,集群角色命名一般就是master,slave
选举算法
Nacos集群采用raft算法实现其实和zookeeper的zab协议大同小异,只是更简单一点。
我在网上找了一个raft算法的动画演示网站,这个动画可以更有助于理解raft协议,非常直观
raft算法动画演示地址:http://thesecretlivesofdata.com/raft/
在raft中,节点有三种角色定义:
- leader:负责接收客户端的请求
- Candidate:用于选举leader的一种角色
- Follower:负责响应来自leader或者Candidate的请求
选举时机
- 服务启动的时候
- leader挂了的时候
说明:所有节点启动时候都是follower状态
如果在一段时间内如果没有收到leader的心跳(可能是服务刚启动没有leader,或者leader挂了),那么follower会变成Candidate然后发起选举,选举之前会term+1,这个term类似于zookeeper中epoch朝代概念。
投票规则
-
单个节点最多只能投票一次
-
选举时候每个follower都维护一个lection timeout随机在150ms-300ms
-
follower节点谁最先达到超时时间都会变成Candidate
follower会投自己一票,并且给其他节点发送票据vote,等到其他节点回复这个过程中,可能会出现
1.收到过半的票数通过,则成为leader
2.被告知其他节点已经成为leader,则自己切换follower
3.一段时间内没有收到过半投票,则重新发起选举
选举过程中可能会存在以下几种情况
假若有三个节点A,B,C
1.节点A,B同时发起选举,A的选举vote信息先到达C,C给A投了一票,此时B的消息到达C后,因为每个节点只能投一票约束,C不会给B投票,而A,B都不会给对方投票,则A选举胜出会给B,C发心跳信息,节点B发现节点A的term不低于自己term,知道已经有leader,所以自己转换成follower
2.平票的情况,四个节点A,B,C,D中C,D节点同时成为候选节点Candudate,节点A给C投了一票,节点B 给D投了一票,就出现平票的情况,此时直到超时时间到了重新发起选举。
如果出现平票情况就相应的延长了系统的不可用的时间,是不可取的要避免,因为raft设计引入了
The election timeout is randomized to be between 150ms and 300ms 来避免发生平票情况