A Look at Elasticsearch's MonitorService
Published: 2019-06-27


This post examines Elasticsearch's MonitorService.

MonitorService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/MonitorService.java

public class MonitorService extends AbstractLifecycleComponent {

    private final JvmGcMonitorService jvmGcMonitorService;
    private final OsService osService;
    private final ProcessService processService;
    private final JvmService jvmService;
    private final FsService fsService;

    public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool,
                          ClusterInfoService clusterInfoService) throws IOException {
        this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool);
        this.osService = new OsService(settings);
        this.processService = new ProcessService(settings);
        this.jvmService = new JvmService(settings);
        this.fsService = new FsService(settings, nodeEnvironment, clusterInfoService);
    }

    public OsService osService() {
        return this.osService;
    }

    public ProcessService processService() {
        return this.processService;
    }

    public JvmService jvmService() {
        return this.jvmService;
    }

    public FsService fsService() {
        return this.fsService;
    }

    @Override
    protected void doStart() {
        jvmGcMonitorService.start();
    }

    @Override
    protected void doStop() {
        jvmGcMonitorService.stop();
    }

    @Override
    protected void doClose() {
        jvmGcMonitorService.close();
    }
}
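  • The MonitorService constructor creates jvmGcMonitorService, osService, processService, jvmService, and fsService. Its doStart, doStop, and doClose methods call start, stop, and close on jvmGcMonitorService respectively; the other four services are only exposed through accessor methods.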
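
The delegation pattern described above can be sketched in isolation. This is only an illustration: Component, RecordingChild, and Parent are hypothetical stand-ins, not Elasticsearch classes, and the base class here omits the state checks that a real lifecycle base class would perform.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleDelegationSketch {

    /** Simplified, hypothetical lifecycle base class (no state tracking). */
    abstract static class Component {
        final void start() { doStart(); }
        final void stop() { doStop(); }
        final void close() { doClose(); }

        protected abstract void doStart();
        protected abstract void doStop();
        protected abstract void doClose();
    }

    /** Plays the role of JvmGcMonitorService: records each lifecycle call it receives. */
    static class RecordingChild extends Component {
        final List<String> calls = new ArrayList<>();

        @Override protected void doStart() { calls.add("start"); }
        @Override protected void doStop() { calls.add("stop"); }
        @Override protected void doClose() { calls.add("close"); }
    }

    /** Plays the role of MonitorService: forwards lifecycle calls to one child only. */
    static class Parent extends Component {
        final RecordingChild gcMonitor = new RecordingChild();
        // Passive helper: exposed via an accessor, never started or stopped.
        private final Object osService = new Object();

        Object osService() { return osService; }

        @Override protected void doStart() { gcMonitor.start(); }
        @Override protected void doStop() { gcMonitor.stop(); }
        @Override protected void doClose() { gcMonitor.close(); }
    }

    /** Drives the parent through its lifecycle and returns what the child saw. */
    static List<String> run() {
        Parent parent = new Parent();
        parent.start();
        parent.stop();
        parent.close();
        return parent.gcMonitor.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The child sees exactly the calls the parent receives, in the same order, which is all MonitorService does for its GC monitor.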

JvmGcMonitorService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java

public class JvmGcMonitorService extends AbstractLifecycleComponent {
    private static final Logger logger = LogManager.getLogger(JvmGcMonitorService.class);

    private final ThreadPool threadPool;
    private final boolean enabled;
    private final TimeValue interval;
    private final Map<String, GcThreshold> gcThresholds;
    private final GcOverheadThreshold gcOverheadThreshold;

    private volatile Cancellable scheduledFuture;

    public static final Setting<Boolean> ENABLED_SETTING =
        Setting.boolSetting("monitor.jvm.gc.enabled", true, Property.NodeScope);
    public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING =
        Setting.timeSetting("monitor.jvm.gc.refresh_interval", TimeValue.timeValueSeconds(1),
            TimeValue.timeValueSeconds(1), Property.NodeScope);

    private static String GC_COLLECTOR_PREFIX = "monitor.jvm.gc.collector.";
    public static final Setting<Settings> GC_SETTING = Setting.groupSetting(GC_COLLECTOR_PREFIX, Property.NodeScope);

    public static final Setting<Integer> GC_OVERHEAD_WARN_SETTING =
        Setting.intSetting("monitor.jvm.gc.overhead.warn", 50, 0, 100, Property.NodeScope);
    public static final Setting<Integer> GC_OVERHEAD_INFO_SETTING =
        Setting.intSetting("monitor.jvm.gc.overhead.info", 25, 0, 100, Property.NodeScope);
    public static final Setting<Integer> GC_OVERHEAD_DEBUG_SETTING =
        Setting.intSetting("monitor.jvm.gc.overhead.debug", 10, 0, 100, Property.NodeScope);

    //......

    public JvmGcMonitorService(Settings settings, ThreadPool threadPool) {
        this.threadPool = threadPool;

        this.enabled = ENABLED_SETTING.get(settings);
        this.interval = REFRESH_INTERVAL_SETTING.get(settings);

        Map<String, GcThreshold> gcThresholds = new HashMap<>();
        Map<String, Settings> gcThresholdGroups = GC_SETTING.get(settings).getAsGroups();
        for (Map.Entry<String, Settings> entry : gcThresholdGroups.entrySet()) {
            String name = entry.getKey();
            TimeValue warn = getValidThreshold(entry.getValue(), entry.getKey(), "warn");
            TimeValue info = getValidThreshold(entry.getValue(), entry.getKey(), "info");
            TimeValue debug = getValidThreshold(entry.getValue(), entry.getKey(), "debug");
            gcThresholds.put(name, new GcThreshold(name, warn.millis(), info.millis(), debug.millis()));
        }
        gcThresholds.putIfAbsent(GcNames.YOUNG, new GcThreshold(GcNames.YOUNG, 1000, 700, 400));
        gcThresholds.putIfAbsent(GcNames.OLD, new GcThreshold(GcNames.OLD, 10000, 5000, 2000));
        gcThresholds.putIfAbsent("default", new GcThreshold("default", 10000, 5000, 2000));
        this.gcThresholds = unmodifiableMap(gcThresholds);

        if (GC_OVERHEAD_WARN_SETTING.get(settings) <= GC_OVERHEAD_INFO_SETTING.get(settings)) {
            final String message =
                String.format(
                    Locale.ROOT,
                    "[%s] must be greater than [%s] [%d] but was [%d]",
                    GC_OVERHEAD_WARN_SETTING.getKey(),
                    GC_OVERHEAD_INFO_SETTING.getKey(),
                    GC_OVERHEAD_INFO_SETTING.get(settings),
                    GC_OVERHEAD_WARN_SETTING.get(settings));
            throw new IllegalArgumentException(message);
        }
        if (GC_OVERHEAD_INFO_SETTING.get(settings) <= GC_OVERHEAD_DEBUG_SETTING.get(settings)) {
            final String message =
                String.format(
                    Locale.ROOT,
                    "[%s] must be greater than [%s] [%d] but was [%d]",
                    GC_OVERHEAD_INFO_SETTING.getKey(),
                    GC_OVERHEAD_DEBUG_SETTING.getKey(),
                    GC_OVERHEAD_DEBUG_SETTING.get(settings),
                    GC_OVERHEAD_INFO_SETTING.get(settings));
            throw new IllegalArgumentException(message);
        }

        this.gcOverheadThreshold = new GcOverheadThreshold(
            GC_OVERHEAD_WARN_SETTING.get(settings),
            GC_OVERHEAD_INFO_SETTING.get(settings),
            GC_OVERHEAD_DEBUG_SETTING.get(settings));

        logger.debug(
            "enabled [{}], interval [{}], gc_threshold [{}], overhead [{}, {}, {}]",
            this.enabled,
            this.interval,
            this.gcThresholds,
            this.gcOverheadThreshold.warnThreshold,
            this.gcOverheadThreshold.infoThreshold,
            this.gcOverheadThreshold.debugThreshold);
    }

    @Override
    protected void doStart() {
        if (!enabled) {
            return;
        }
        scheduledFuture = threadPool.scheduleWithFixedDelay(new JvmMonitor(gcThresholds, gcOverheadThreshold) {
            @Override
            void onMonitorFailure(Exception e) {
                logger.debug("failed to monitor", e);
            }

            @Override
            void onSlowGc(final Threshold threshold, final long seq, final SlowGcEvent slowGcEvent) {
                logSlowGc(logger, threshold, seq, slowGcEvent, JvmGcMonitorService::buildPools);
            }

            @Override
            void onGcOverhead(final Threshold threshold, final long current, final long elapsed, final long seq) {
                logGcOverhead(logger, threshold, current, elapsed, seq);
            }
        }, interval, Names.SAME);
    }

    @Override
    protected void doStop() {
        if (!enabled) {
            return;
        }
        scheduledFuture.cancel();
    }

    @Override
    protected void doClose() {
    }

    //......
}
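  • JvmGcMonitorService's doStart method registers a JvmMonitor as a recurring task via scheduleWithFixedDelay; its doStop method cancels that task. Both methods return immediately when monitor.jvm.gc.enabled is false.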
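
The start/stop behaviour can be approximated outside Elasticsearch with the JDK's ScheduledExecutorService. This is a sketch under stated assumptions: it swaps Elasticsearch's ThreadPool and Cancellable for the JDK scheduler and ScheduledFuture, and the class name FixedDelayMonitorSketch and the runDemo helper are hypothetical. Unlike JvmGcMonitorService, whose doClose is empty, this sketch owns its executor and therefore shuts it down on close.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedDelayMonitorSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final boolean enabled;
    private final long intervalMillis;
    private final Runnable monitor;
    private volatile ScheduledFuture<?> scheduledFuture;

    FixedDelayMonitorSketch(boolean enabled, long intervalMillis, Runnable monitor) {
        this.enabled = enabled;
        this.intervalMillis = intervalMillis;
        this.monitor = monitor;
    }

    void doStart() {
        if (!enabled) {
            return; // monitoring disabled: nothing is scheduled
        }
        // Fixed delay: the next run starts intervalMillis after the previous run finishes.
        scheduledFuture = scheduler.scheduleWithFixedDelay(
            monitor, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void doStop() {
        if (!enabled) {
            return;
        }
        scheduledFuture.cancel(false); // stop future runs without interrupting a running one
    }

    void doClose() {
        scheduler.shutdownNow(); // this sketch owns its executor, so it releases it here
    }

    /** Starts the monitor, waits for up to three runs, then stops it; returns the run count. */
    static int runDemo(boolean enabled) {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        FixedDelayMonitorSketch service = new FixedDelayMonitorSketch(enabled, 10, () -> {
            runs.incrementAndGet();
            threeRuns.countDown();
        });
        service.doStart();
        try {
            threeRuns.await(enabled ? 2000 : 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        service.doStop();
        service.doClose();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("enabled runs >= 3: " + (runDemo(true) >= 3));
        System.out.println("disabled runs: " + runDemo(false));
    }
}
```

With a fixed delay rather than a fixed rate, a slow monitoring pass pushes the next pass back instead of letting runs pile up.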

JvmMonitor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java

abstract static class JvmMonitor implements Runnable {

    enum Threshold { DEBUG, INFO, WARN }

    static class SlowGcEvent {

        final GarbageCollector currentGc;
        final long collectionCount;
        final TimeValue collectionTime;
        final long elapsed;
        final JvmStats lastJvmStats;
        final JvmStats currentJvmStats;
        final ByteSizeValue maxHeapUsed;

        SlowGcEvent(
            final GarbageCollector currentGc,
            final long collectionCount,
            final TimeValue collectionTime,
            final long elapsed,
            final JvmStats lastJvmStats,
            final JvmStats currentJvmStats,
            final ByteSizeValue maxHeapUsed) {
            this.currentGc = currentGc;
            this.collectionCount = collectionCount;
            this.collectionTime = collectionTime;
            this.elapsed = elapsed;
            this.lastJvmStats = lastJvmStats;
            this.currentJvmStats = currentJvmStats;
            this.maxHeapUsed = maxHeapUsed;
        }
    }

    private long lastTime = now();
    private JvmStats lastJvmStats = jvmStats();
    private long seq = 0;
    private final Map<String, GcThreshold> gcThresholds;
    final GcOverheadThreshold gcOverheadThreshold;

    JvmMonitor(final Map<String, GcThreshold> gcThresholds, final GcOverheadThreshold gcOverheadThreshold) {
        this.gcThresholds = Objects.requireNonNull(gcThresholds);
        this.gcOverheadThreshold = Objects.requireNonNull(gcOverheadThreshold);
    }

    @Override
    public void run() {
        try {
            monitorGc();
        } catch (Exception e) {
            onMonitorFailure(e);
        }
    }

    abstract void onMonitorFailure(Exception e);

    synchronized void monitorGc() {
        seq++;
        final long currentTime = now();
        JvmStats currentJvmStats = jvmStats();

        final long elapsed = TimeUnit.NANOSECONDS.toMillis(currentTime - lastTime);
        monitorSlowGc(currentJvmStats, elapsed);
        monitorGcOverhead(currentJvmStats, elapsed);

        lastTime = currentTime;
        lastJvmStats = currentJvmStats;
    }

    final void monitorSlowGc(JvmStats currentJvmStats, long elapsed) {
        for (int i = 0; i < currentJvmStats.getGc().getCollectors().length; i++) {
            GarbageCollector gc = currentJvmStats.getGc().getCollectors()[i];
            GarbageCollector prevGc = lastJvmStats.getGc().getCollectors()[i];

            // no collection has happened
            long collections = gc.getCollectionCount() - prevGc.getCollectionCount();
            if (collections == 0) {
                continue;
            }
            long collectionTime = gc.getCollectionTime().millis() - prevGc.getCollectionTime().millis();
            if (collectionTime == 0) {
                continue;
            }

            GcThreshold gcThreshold = gcThresholds.get(gc.getName());
            if (gcThreshold == null) {
                gcThreshold = gcThresholds.get("default");
            }

            long avgCollectionTime = collectionTime / collections;

            Threshold threshold = null;
            if (avgCollectionTime > gcThreshold.warnThreshold) {
                threshold = Threshold.WARN;
            } else if (avgCollectionTime > gcThreshold.infoThreshold) {
                threshold = Threshold.INFO;
            } else if (avgCollectionTime > gcThreshold.debugThreshold) {
                threshold = Threshold.DEBUG;
            }
            if (threshold != null) {
                onSlowGc(threshold, seq, new SlowGcEvent(
                    gc,
                    collections,
                    TimeValue.timeValueMillis(collectionTime),
                    elapsed,
                    lastJvmStats,
                    currentJvmStats,
                    JvmInfo.jvmInfo().getMem().getHeapMax()));
            }
        }
    }

    final void monitorGcOverhead(final JvmStats currentJvmStats, final long elapsed) {
        long current = 0;
        for (int i = 0; i < currentJvmStats.getGc().getCollectors().length; i++) {
            GarbageCollector gc = currentJvmStats.getGc().getCollectors()[i];
            GarbageCollector prevGc = lastJvmStats.getGc().getCollectors()[i];
            current += gc.getCollectionTime().millis() - prevGc.getCollectionTime().millis();
        }
        checkGcOverhead(current, elapsed, seq);
    }

    void checkGcOverhead(final long current, final long elapsed, final long seq) {
        final int fraction = (int) ((100 * current) / (double) elapsed);
        Threshold overheadThreshold = null;
        if (fraction >= gcOverheadThreshold.warnThreshold) {
            overheadThreshold = Threshold.WARN;
        } else if (fraction >= gcOverheadThreshold.infoThreshold) {
            overheadThreshold = Threshold.INFO;
        } else if (fraction >= gcOverheadThreshold.debugThreshold) {
            overheadThreshold = Threshold.DEBUG;
        }
        if (overheadThreshold != null) {
            onGcOverhead(overheadThreshold, current, elapsed, seq);
        }
    }

    JvmStats jvmStats() {
        return JvmStats.jvmStats();
    }

    long now() {
        return System.nanoTime();
    }

    abstract void onSlowGc(Threshold threshold, long seq, SlowGcEvent slowGcEvent);

    abstract void onGcOverhead(Threshold threshold, long total, long elapsed, long seq);
}
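  • JvmMonitor implements the Runnable interface. Its run method calls monitorGc and, if an exception is thrown, calls onMonitorFailure. JvmMonitor also declares the abstract methods onMonitorFailure, onSlowGc, and onGcOverhead, which subclasses must implement.
  • The monitorGc method increments seq, captures the current time and currentJvmStats, runs monitorSlowGc and monitorGcOverhead against the previous snapshot, and then stores the current time and stats as the new baseline.
  • For each collector, monitorSlowGc computes avgCollectionTime (the collection time accumulated since the last snapshot divided by the number of collections in that period) and checks whether it exceeds the warn, info, or debug threshold for that collector; if it does, it invokes the onSlowGc callback. monitorGcOverhead sums the GC time of all collectors over the interval, computes it as a percentage of the elapsed time, and invokes the onGcOverhead callback if that percentage reaches the warn, info, or debug threshold.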
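
To make the two checks concrete, here is a small stand-alone sketch that reproduces only their arithmetic. GcThresholdSketch and its two static methods are hypothetical names introduced for illustration; the comparisons and the integer truncation mirror the code quoted above, and the sample thresholds are the defaults shown there (1000/700/400 ms for the young collector, 50/25/10 percent for overhead).

```java
class GcThresholdSketch {

    enum Threshold { DEBUG, INFO, WARN }

    /**
     * Slow-GC check: average time per collection over the interval,
     * compared with strict ">" against the warn/info/debug limits (milliseconds).
     * Returns null when nothing should be reported.
     */
    static Threshold classifySlowGc(long collectionTimeMillis, long collections,
                                    long warn, long info, long debug) {
        if (collections == 0 || collectionTimeMillis == 0) {
            return null; // no collection happened, or it took no measurable time
        }
        long avgCollectionTime = collectionTimeMillis / collections; // integer division
        if (avgCollectionTime > warn) {
            return Threshold.WARN;
        } else if (avgCollectionTime > info) {
            return Threshold.INFO;
        } else if (avgCollectionTime > debug) {
            return Threshold.DEBUG;
        }
        return null;
    }

    /**
     * Overhead check: GC time as a whole-number percentage of elapsed time,
     * compared with ">=" against the warn/info/debug limits (percent).
     */
    static Threshold classifyOverhead(long gcMillis, long elapsedMillis,
                                      int warn, int info, int debug) {
        int fraction = (int) ((100 * gcMillis) / (double) elapsedMillis); // truncates toward zero
        if (fraction >= warn) {
            return Threshold.WARN;
        } else if (fraction >= info) {
            return Threshold.INFO;
        } else if (fraction >= debug) {
            return Threshold.DEBUG;
        }
        return null;
    }

    public static void main(String[] args) {
        // Two young collections totalling 1500 ms: average 750 ms, above info (700) only.
        System.out.println(classifySlowGc(1500, 2, 1000, 700, 400));   // INFO
        // Exactly 400 ms is not strictly greater than the debug limit.
        System.out.println(classifySlowGc(400, 1, 1000, 700, 400));    // null
        // 250 ms of GC in a 1000 ms interval is 25 percent, which reaches info.
        System.out.println(classifyOverhead(250, 1000, 50, 25, 10));   // INFO
        // 99 ms in 1000 ms truncates to 9 percent, below the debug limit.
        System.out.println(classifyOverhead(99, 1000, 50, 25, 10));    // null
    }
}
```

Note the asymmetry: a slow-GC average that exactly equals a limit is not reported, while an overhead percentage that exactly equals a limit is.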

Summary

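  • The MonitorService constructor creates jvmGcMonitorService, osService, processService, jvmService, and fsService; its doStart, doStop, and doClose methods call start, stop, and close on jvmGcMonitorService respectively.
  • JvmGcMonitorService's doStart method registers a JvmMonitor as a recurring task via scheduleWithFixedDelay; its doStop method cancels that task.
  • JvmMonitor implements the Runnable interface. Its run method calls monitorGc and, if an exception is thrown, calls onMonitorFailure. JvmMonitor also declares the abstract methods onMonitorFailure, onSlowGc, and onGcOverhead, which subclasses must implement.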


Reposted from: https://my.oschina.net/go4it/blog/3054300
