ITPub博客

首页 > 应用开发 > Java > Java教程:nacos入门系列之配置中心

Java教程:nacos入门系列之配置中心

原创 Java 作者:千锋Python唐小强 时间:2020-07-07 17:43:51 0 删除 编辑

配置的发布与订阅

我们先来看看如何使用nacos提供的api来实现配置的发布与订阅
发布配置:

public 

class 
ConfigPub {


     public  static  void  main (String[] args)  throws NacosException {

         final String dataId= "test";

         final String group= "DEFAULT_GROUP";

        ConfigService configService= NacosFactory.createConfigService( "localhost:8848");

        configService.publishConfig(dataId,group, "test config body");
    }
}
订阅配置:
   

public 
static 
void 
main
(String[] args) 
throws NacosException, InterruptedException {


         final String dataId= "test";

         final String group= "DEFAULT_GROUP";

        ConfigService configService= NacosFactory.createConfigService( "localhost:8848");

        configService.addListener(dataId, group,  new Listener() {
             @Override
             public Executor  getExecutor () {
                 return  null;
            }

             @Override
             public  void  receiveConfigInfo (String configInfo) {

                System.out.println( "receiveConfigInfo:"+configInfo);
            }
        });

        Thread.sleep(Integer.MAX_VALUE);
    }
}
根据上面的demo可以看到通过dataId和group可以定位一个配置文件。
Java教程:nacos入门系列之配置中心

深入了解配置发布

1-发布的配置信息会通过http请求调用具体的服务

agent.httpPost(url, headers, params, encode, POST_TIMEOUT);
服务类为 ConfigController:处理配置相关的http请求
persistService

      .insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time,  false);
EventDispatcher.fireEvent(
       new ConfigDataChangeEvent( false, dataId,  group, tenant, tag,
            time.getTime()));

可以看到发布的配置首先会进行持久化,然后会触发变更通知。

持久化这里就不做分析,我们来看看fireEvent这个方法:

EventDispatcher.fireEvent:

static  public  void  fireEvent( Event  event) {
     if ( null ==  event) {
         throw  new IllegalArgumentException( "event is null");
    }

     for (AbstractEventListener listener : getEntry( event.getClass()).listeners) {
         try {
            listener.onEvent( event);
        }  catch (Exception e) {
            log.error(e.toString(), e);
        }
    }
}

这里可以看到具体调用了listener.onEvent( event);
这里只要找到AbstractEventListener 具体的实现类是哪个就可以。
AbstractEventListener主要有两个实现类:
AsyncNotifyService
LongPollingService

我们可以通过 event的类型去判断,因为这里onEvent的参数类型为ConfigDataChangeEvent,
所以我们可以清楚的知道我们要找的实现类是AsyncNotifyService。
每个AbstractEventListener初始化的时候都会先将自己加入到listeners中
final CopyOnWriteArrayList<AbstractEventListener> listeners;
public  AbstractEventListener( ) {
     /**
     * automatic register
     */
    EventDispatcher.addEventListener( this);
}

我们可以直接看看AsyncNotifyService的onEvent方法:
public  void  onEvent( Event  event) {

    // 并发产生 ConfigDataChangeEvent
   if (event instanceof ConfigDataChangeEvent) {
      ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
      long dumpTs = evt.lastModifiedTs;
      String dataId = evt.dataId;
      String group = evt.group;
      String tenant = evt.tenant;
      String tag = evt.tag;
      //Member{address='192.168.31.192:8848'}
      Collection<Member> ipList = memberManager.allMembers();

      // 其实这里任何类型队列都可以
      Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
      for (Member member : ipList) {
         queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs,
               member.getAddress(), evt.isBeta));
      }
      EXECUTOR.execute(new AsyncTask(httpclient, queue));
   }
}

上面的方法主要实现的是:
获取所有的nacos服务节点,然后对其执行异步任务AsyncTask。
AsyncTask中会从队列中获取每个节点的NotifySingleTask信息,然后进行http请求,调用通知配置信息改变
的服务。具体服务在CommunicationController中实现。

/**
 * 通知配置信息改变
 */
@GetMapping("/dataChange")

这个方法放在后面分析。

深入了解配置订阅

初始化:

NacosConfigService初始化的时候构造了ClientWorker,并且通过ClientWorker启动了两个线程池。

worker = new ClientWorker(agent, configFilterChainManager, properties);
第一个线程池每 10ms执行一次checkConfigInfo();
executor.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        try {
            checkConfigInfo();
        } catch (Throwable e) {
           LOGGER. error( "[" + agent.getName() +  "] [sub-check] rotate check 
           error", e);
        }
    }
},  1L,  10L, TimeUnit.MILLISECONDS);

我们来看看checkConfigInfo具体是做什么的
public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数,限制LongPollingRunnable处理配置的个数。
    int longingTaskCount =(int) Math. ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
     if (longingTaskCount > currentLongingTaskCount) {
         for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // 要判断任务是否在执行 这块需要好好想想。 
            //任务列表现在是无序的。变化过程可能有问题
            executorService. execute(new LongPollingRunnable(i));
            //这里的i就代表taskId
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

这里主要的作用是提交LongPollingRunnable任务到第二个线程池中去运行。
并且每个LongPollingRunnable只会处理 3000个配置。

我们来看看LongPollingRunnable的实现
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
    // check failover  config
     for (CacheData cacheData : cacheMap.get().values()) {
         if (cacheData.getTaskId() == taskId) {
            cacheDatas.add(cacheData);
            ...
        }
    }
cacheMap中保存了配置信息,从磁盘中加载获取。
通过taskId从 cacheMap中获取需要被当前LongPollingRunnable任务处理的配置,放入到cacheDatas集合。

我们来看看是在哪里设置的taskId
int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
cache.setTaskId(taskId);
可以看到这里和上面相对应,每 3000个配置的taskId是相同的。因为每个LongPollingRunnable线程会处理
3000个配置。


// check server  config  向服务端请求变化的配置
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

//从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);

这里订阅配置的客户端会向服务端发送http长轮询请求,来获取变化的配置信息
长轮询请求不会立刻返回结果,而是当有配置发生变化时返回,设置了超时时间 30s,如果超过了设置的
超时时间没有配置更新,则会默认返回。然后重新发起一次长轮询的请求。

HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH +  "/listener"
headers, params,
    agent.getEncode(), readTimeoutMs);

长轮询的周期默认为 30s:
timeout=Math. max(NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
    Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);

具体服务实现类在ConfigController中:
@PostMapping( "/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
      throws ServletException, IOException {
   ....

   //  do long-polling
   inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

doPollingConfig方法:
// 服务端处理长轮询请求
if (LongPollingService.isSupportLongPolling(request)) {
    longPollingService.addLongPollingClient(request, response, clientMd5Map, 
    probeRequestSize);
     return HttpServletResponse.SC_OK +  "";
}

使用线程池处理请求:
scheduler. execute(
    new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, 
    appName, tag));

接着来看ClientLongPolling是一个线程实现类
首先会触发一个延时任务,然后将自己加入到队列:allSubs.add(this);
allSubs中维护了所有长轮训请求。

那么肯定会有一个地方去消费allSubs队列中的请求.
这个消费的地方就是onEvent方法:
LongPollingService其实就是我们上面提到的AbstractEventListener,因此也实现了onEvent方法。

@Override
public void onEvent(Event event) {
     if (isFixedPolling()) {
        // ignore
    }  else {
         if (event instanceof LocalDataChangeEvent) {
            LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
            scheduler. execute(new DataChangeTask(evt.groupKey, evt.isBeta, 
            evt.betaIps));
        }
    }
}

这个event方法就是去处理配置变化的情况,主要逻辑在DataChangeTask中:
从allSubs获取维护的请求中相同dataId+group的请求,比如:(test+DEFAULT_GROUP)
然后进行这个对长轮询的请求进行返回。
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
    ClientLongPolling clientSub = iter. next();
    //groupKey test+DEFAULT_GROUP
     if (clientSub.clientMd5Map.containsKey(groupKey)) {
        ......
        iter. remove(); // 删除订阅关系
        LogUtil.clientLog.info( "{}|{}|{}|{}|{}|{}|{}",
        (System.currentTimeMillis() - changeTime),
         "in-advance",
        RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
             "polling",
            clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
        clientSub.sendResponse(Arrays.asList(groupKey));
    }
}


那是哪里触发了LongPollingService里面的onEvent 方法呢?
当然是在配置发布后进行触发的,还记得CommunicationController中的dataChange服务吗?
配置发布后会通过http请求调用nacos服务中的dataChange服务。通过dataChange服务就可以通知
nacos服务中保存的长轮训的请求了。

并且这个方法是获取所有nacos服务节点去遍历执行的,因此不管变更配置对应的长轮询保存在哪个节点,
都会可以被获取到。

/**
 * 通知配置信息改变
 */
@GetMapping( "/dataChange")

此处会调用DumpService中的方法保存配置文件到磁盘,并缓存md5.

DiskUtil.saveToDisk(dataId, group, tenant, content);

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
    CacheItem cache = makeSure(groupKey);
     if (cache.md5 == null || !cache.md5.equals(md5)) {
        cache.md5 = md5;
        cache.lastModifiedTs = lastModifiedTs;
        EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
    }
}

可以看到当配置变更,就会触发fireEvent的LocalDataChangeEvent事件。


总结:

到这里,配置中心整体是实现基本上告一段落,还要很多细节没有涉及到,需要在真正的使用过程中来探索和发现


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/69923331/viewspace-2703052/,如需转载,请注明出处,否则将追究法律责任。

下一篇: mysql主从复制
请登录后发表评论 登录
全部评论

注册时间:2019-05-14

  • 博文量
    995
  • 访问量
    622546