Prometheus监控有所思:多标签埋点及Mbean
使用 grafana+prometheus+jmx 作为普通的监控手段,是比较有用的。我之前的文章介绍了相应的实现办法。https://www.cnblogs.com/yougewe/p/11140129.html
但是,按照之前的实现,我们更多的只能是监控 单值型的数据,如请求量,tps 等等,对于复杂组合型的指标却不容易监控。
这种情况一般带有一定的业务属性,比如想监控mq中的每个topic的消费情况,每类产品的实时订单情况等等。当然,对于看过完整的 prometheus 的监控数据的同学来说,会觉得很正常,因为你会看到如下的数据:
复制代码
# HELP java_lang_MemoryPool_PeakUsage_max java.lang.management.MemoryUsage (java.langmax)
# TYPE java_lang_MemoryPool_PeakUsage_max untyped
java_lang_MemoryPool_PeakUsage_max{name="Metaspace",} -1.0
java_lang_MemoryPool_PeakUsage_max{name="PS Old Gen",} 1.415053312E9
java_lang_MemoryPool_PeakUsage_max{name="PS Eden Space",} 6.96778752E8
java_lang_MemoryPool_PeakUsage_max{name="Code Cache",} 2.5165824E8
java_lang_MemoryPool_PeakUsage_max{name="Compressed Class Space",} 1.073741824E9
java_lang_MemoryPool_PeakUsage_max{name="PS Survivor Space",} 5242880.0
复制代码
这里面的 name 就是普通标签嘛,同理于其他埋点咯。应该是可以实现的。
是的,prometheus 是方便实现这玩意的,但是我们之前不是使用 jmx_exportor 作为导出工具嘛,使用的埋点组件是 io.dropwizard.metrics:metrics-core 。
而它则是重在单值的监控,所以,用它我们是实现不了带指标的数据的监控了。
那怎么办呢?三个办法!
1. 直接替换原有的 metrics-core 组件为 prometheus 的client 组件,因为官方是支持这种操作的;
2. 使用 prometheus-client 组件与 metrics-core 组件配合,各自使用各自的功能;
3. 自行实现带标签的埋点,这可能是基于 MBean 的;
以上这几种方案,各有优劣。方案1可能改动太大,而且可能功能不兼容不可行; 方案2可能存在整合不了或者功能冲突情况,当然如果能整合,绝对是最好的; 方案3实现复杂度就高了,比如监控值维护、线程安全、MBean数据吐出方式等等。
好吧,不管怎么样,我们还是都看看吧。
一、 使用 prometheus-client 埋点实现带标签的监控
1. 引入 pom 依赖
复制代码
io.prometheus
simpleclient
0.8.0
io.prometheus
simpleclient_hotspot
0.8.0
io.prometheus
simpleclient_servlet
0.8.0
复制代码
2. 框架注册监控
复制代码
@Configuration
public class PrometheusConfig {
@Bean
public ServletRegistrationBean servletRegistrationBean(){
// 将埋点指标吐出到 /metrics 节点
return new ServletRegistrationBean(new MetricsServlet(), "/metrics");
}
}
复制代码
3. 业务埋点数据
复制代码
// 注册指标实例
io.prometheus.client.Counter c = io.prometheus.client.Counter.build()
.name("jmx_test_abc_ffff")
.labelNames("topic")
.help("topic counter usage.")
.register();
public void incTopicMetric(String topic) {
// c.labels("test").inc(); // for test
}
复制代码
4. 获取埋点数据信息
复制代码
curl http://localhost:8080/metrics
# 对外暴露http接口调用,结果如下
# HELP jmx_test_abc_ffff counter usage.
# TYPE jmx_test_abc_ffff counter
jmx_test_abc_ffff{topic="bbb",} 1.0
jmx_test_abc_ffff{topic="2",} 2.0
jmx_test_abc_ffff{topic="test",} 1.0
复制代码
可以看出,效果咱们是实现了。但是,对于已经运行的东西,要改这玩意可能不是那么友好。主要有以下几点:
1. 暴露数据方式变更,原来由javaagent进行统一处理的数据,现在可能由于应用端口的不一,导致收集的配置会变更,不一定符合运维场景;
2. 需要将原来的埋点进行替换;
二、 prometheus-client 与 metrics-core 混合埋点
不处理以前的监控,将新监控带标签数据吐入到 jmx_exportor 中。
我们试着使用如上的埋点方式:
复制代码
// 注册指标实例
io.prometheus.client.Counter c = io.prometheus.client.Counter.build()
.name("jmx_test_abc_ffff")
.labelNames("topic")
.help("topic counter usage.")
.register();
public void incTopicMetric(String topic) {
// c.labels("test").inc(); // for test
}
复制代码
好像数据是不会进入的到 jmx_exportor 的。这也不奇怪,毕竟咱们也不了解其原理,难道想靠运气取胜??
细去查看 metrics-core 组件的埋点实现方案,发现其是向 MBean 中吐入数据,从而被 jmx_exportor 抓取的。
复制代码
// com.codahale.metrics.jmx.JmxReporter.JmxListener#onCounterAdded
@Override
public void onCounterAdded(String name, Counter counter) {
try {
if (filter.matches(name, counter)) {
final ObjectName objectName = createName("counters", name);
registerMBean(new JmxCounter(counter, objectName), objectName);
}
} catch (InstanceAlreadyExistsException e) {
LOGGER.debug("Unable to register counter", e);
} catch (JMException e) {
LOGGER.warn("Unable to register counter", e);
}
}
// 向 mBeanServer 注册监控实例
// 默认情况下 mBeanServer = ManagementFactory.getPlatformMBeanServer();
private void registerMBean(Object mBean, ObjectName objectName) throws InstanceAlreadyExistsException, JMException {
ObjectInstance objectInstance = mBeanServer.registerMBean(mBean, objectName);
if (objectInstance != null) {
// the websphere mbeanserver rewrites the objectname to include
// cell, node & server info
// make sure we capture the new objectName for unregistration
registered.put(objectName, objectInstance.getObjectName());
} else {
registered.put(objectName, objectName);
}
}
复制代码
而 prometheus-client 则是通过 CollectorRegistry.defaultRegistry 进行注册实例的。
复制代码
// io.prometheus.client.SimpleCollector.Builder#register()
/**
* Create and register the Collector with the default registry.
*/
public C register() {
return register(CollectorRegistry.defaultRegistry);
}
/**
* Create and register the Collector with the given registry.
*/
public C register(CollectorRegistry registry) {
C sc = create();
registry.register(sc);
return sc;
}
复制代码
所以,好像原理上来讲是不同的。至于到底为什么不能监控到数据,那还不好说。至少,你可以学习 metrics-core 使用 MBean 的形式将数据导出。这是我们下一个方案要讨论的事。
这里我可以给到一个最终简单又不失巧合的方式,实现两个监控组件的兼容,同时向 jmx_exportor 进行导出。如下:
1. 引入 javaagent 依赖包
复制代码
io.prometheus.jmx
jmx_prometheus_javaagent
0.12.0
复制代码
2. 使用 agent 的工具类进行埋点
因为 javaagent 里面提供一套完整的 client 工具包,所以,我们可以使用。
复制代码
// 注册指标实例
// 将 io.prometheus.client.Counter 包替换为 io.prometheus.jmx.shaded.io.prometheus.client.Counter
io.prometheus.client.Counter c = io.prometheus.client.Counter.build()
.name("jmx_test_abc_ffff")
.labelNames("topic")
.help("topic counter usage.")
.register();
public void incTopicMetric(String topic) {
// c.labels("test").inc(); // for test
}
复制代码
3. 原样使用 jmx_exportor 就可以导出监控数据了
为什么换一个包这样就可以了?
因为 jmx_exportor 也是通过注册 CollectorRegistry.defaultRegistry 来进行收集数据的,我们只要保持与其实例一致,就可以做到在同一个jvm内共享数据了。
三、 基于 MBean自行实现带标签的埋点
复制代码
// 测试类
public class PrometheusMbeanMetricsMain {
private static ConcurrentHashMap topicContainer = new ConcurrentHashMap<>();
private static MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
public static void main(String[] args) throws Exception {
// 模拟某个topic
String commingTopic = "test_topic";
AtomicInteger myTopic1Counter = getMetricCounter(commingTopic);
System.out.println("jmx started!");
while(true){
System.out.println("---");
// 计数增加
myTopic1Counter.incrementAndGet();
Thread.sleep(10000);
}
}
private static AtomicInteger getMetricCounter(String topic) throws MalformedObjectNameException, NotCompliantMBeanException, InstanceAlreadyExistsException, MBeanRegistrationException {
AtomicInteger myTopic1Counter = topicContainer.get(topic);
if(myTopic1Counter == null) {
myTopic1Counter = new AtomicInteger(0);
Hashtable tab = new Hashtable<>();
tab.put("topic", topic);
// 占位符,虽然不知道什么意思,但是感觉很厉害的样子
tab.put("_", "_value");
ObjectName objectName = new ObjectName("mydomain_test", tab);
// 注册监控实例 到 MBeanServer 中
ObjectInstance objectInstance = mBeanServer.registerMBean(new JmxCounter(myTopic1Counter, objectName), objectName);
}
return myTopic1Counter;
}
}
// JmxCounter, MBean 要求: 1. 接口必须定义成Public的; 2. 接口命名规范符合要求, 即接口名叫 XYZMBean ,那么实现名就必须一定是XYZ;
// DynamicMBean
public interface JmxCounterMBean {
public Object getCount() throws Exception;
}
public class JmxCounter implements JmxCounterMBean {
private AtomicInteger metric;
private ObjectName objectName;
public JmxCounter(AtomicInteger metric, ObjectName objectName) {
this.objectName = objectName;
this.metric = metric;
}
@Override
public Object getCount() throws Exception {
// 返回监控结果
return metric.get();
}
}
复制代码
最后,见证奇迹的时刻。结果如下:
复制代码
# HELP mydomain_test_value_Count Attribute exposed for management (mydomain_test<_=_value, topic=b_topic><>Count)
# TYPE mydomain_test_value_Count untyped
mydomain_test_value_Count{topic="b_topic",} 1.0
mydomain_test_value_Count{topic="a_topic",} 88.0
复制代码
很明显,这是一个糟糕的实现,不要学他。仅为了演示效果。
所以,总结下来,自然是使用方案2了。两个组件兼容,实现简单,性能也不错。如果只是为了使用,到此就可以了。不过你得明白,以上方案有取巧的成分在。
四、 原理: jmx_exportor 是如何获取数据的?
jmx_exportor 也是可以通过 http_server 暴露数据。
复制代码
// io.prometheus.client.exporter.HTTPServer
/**
* Start a HTTP server serving Prometheus metrics from the given registry.
*/
public HTTPServer(InetSocketAddress addr, CollectorRegistry registry, boolean daemon) throws IOException {
server = HttpServer.create();
server.bind(addr, 3);
// 使用 HTTPMetricHandler 处理请求
HttpHandler mHandler = new HTTPMetricHandler(registry);
// 绑定到 /metrics 地址上
server.createContext("/", mHandler);
server.createContext("/metrics", mHandler);
executorService = Executors.newFixedThreadPool(5, DaemonThreadFactory.defaultThreadFactory(daemon));
server.setExecutor(executorService);
start(daemon);
}
/**
* Start a HTTP server by making sure that its background thread inherit proper daemon flag.
*/
private void start(boolean daemon) {
if (daemon == Thread.currentThread().isDaemon()) {
server.start();
} else {
FutureTask startTask = new FutureTask(new Runnable() {
@Override
public void run() {
server.start();
}
}, null);
DaemonThreadFactory.defaultThreadFactory(daemon).newThread(startTask).start();
try {
startTask.get();
} catch (ExecutionException e) {
throw new RuntimeException("Unexpected exception on starting HTTPSever", e);
} catch (InterruptedException e) {
// This is possible only if the current tread has been interrupted,
// but in real use cases this should not happen.
// In any case, there is nothing to do, except to propagate interrupted flag.
Thread.currentThread().interrupt();
}
}
}
复制代码
所以,可以主要逻辑是 HTTPMetricHandler 处理。来看看。
复制代码
// io.prometheus.client.exporter.HTTPServer.HTTPMetricHandler#handle
public void handle(HttpExchange t) throws IOException {
String query = t.getRequestURI().getRawQuery();
ByteArrayOutputStream response = this.response.get();
response.reset();
OutputStreamWriter osw = new OutputStreamWriter(response);
// 主要由该 TextFormat 进行格式化输出
// registry.filteredMetricFamilySamples() 进行数据收集
TextFormat.write004(osw,
registry.filteredMetricFamilySamples(parseQuery(query)));
osw.flush();
osw.close();
response.flush();
response.close();
t.getResponseHeaders().set("Content-Type",
TextFormat.CONTENT_TYPE_004);
if (shouldUseCompression(t)) {
t.getResponseHeaders().set("Content-Encoding", "gzip");
t.sendResponseHeaders(HttpURLConnection.HTTP_OK, 0);
final GZIPOutputStream os = new GZIPOutputStream(t.getResponseBody());
response.writeTo(os);
os.close();
} else {
t.getResponseHeaders().set("Content-Length",
String.valueOf(response.size()));
t.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.size());
// 写向客户端
response.writeTo(t.getResponseBody());
}
t.close();
}
}
复制代码
五、 原理: jmx_exportor 是如何获取Mbean 的数据的?
jmx_exportor 有一个 JmxScraper, 专门用于处理 MBean 的值。
复制代码
// io.prometheus.jmx.JmxScraper#doScrape
/**
* Get a list of mbeans on host_port and scrape their values.
*
* Values are passed to the receiver in a single thread.
*/
public void doScrape() throws Exception {
MBeanServerConnection beanConn;
JMXConnector jmxc = null;
// 默认直接获取本地的 jmx 信息
// 即是通过共享 ManagementFactory.getPlatformMBeanServer() 变量来实现通信的
if (jmxUrl.isEmpty()) {
beanConn = ManagementFactory.getPlatformMBeanServer();
} else {
Map environment = new HashMap