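I. Preface
Why design a Kafka alerting scheme? A quick search turns up plenty of off-the-shelf monitoring projects, such as KafkaOffsetMonitor, KafkaManager, and Burrow; for details see: Kafka message backlog monitoring. Because the Kafka cluster used by our team's project is not managed by the company's kafka-manager, we had to build a simple alert ourselves.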
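II. Alerting scheme
Two scheduled tasks are needed, and they communicate through a delay queue.
The scheduled task on the left periodically scans the configured Topic-Consumer list, fetches the consumption details through the Kafka API, and checks whether the message backlog exceeds the threshold. If the threshold check fails, the Topic-Consumer pair is put into the delay queue.
The scheduled task on the right periodically takes one Topic-Consumer pair from the real queue that backs the delay queue and runs the threshold check again. An alert SMS is sent only if this second check also fails.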
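The two-stage check described above can be sketched with the JDK's in-process `java.util.concurrent.DelayQueue`. This is only an illustration under assumptions: the class `TwoStageAlarmSketch`, the `lagLookup` function (a stand-in for the Kafka API call), and the `alerts` list (a stand-in for sending an SMS) are hypothetical names, and a single-JVM queue replaces the distributed delay queue the scheme actually relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.ToLongFunction;

/** Two-stage backlog check: suspects wait in a delay queue and are re-checked before alerting. */
final class TwoStageAlarmSketch {

    /** A topic-consumer pair that poll() returns only after its delay has elapsed. */
    static final class DelayedPair implements Delayed {
        final String topicConsumer;
        private final long readyAtNanos;

        DelayedPair(String topicConsumer, long delay, TimeUnit unit) {
            this.topicConsumer = topicConsumer;
            this.readyAtNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedPair> suspects = new DelayQueue<>();
    private final ToLongFunction<String> lagLookup; // hypothetical stand-in for the Kafka API call
    private final long threshold;
    final List<String> alerts = new ArrayList<>();  // hypothetical stand-in for sending an SMS

    TwoStageAlarmSketch(ToLongFunction<String> lagLookup, long threshold) {
        this.lagLookup = lagLookup;
        this.threshold = threshold;
    }

    /** Left task: scan the configured pairs and park every pair whose lag exceeds the threshold. */
    void scan(List<String> pairs, long delay, TimeUnit unit) {
        for (String pair : pairs) {
            if (lagLookup.applyAsLong(pair) > threshold) {
                suspects.offer(new DelayedPair(pair, delay, unit));
            }
        }
    }

    /** Right task: take one expired pair, re-check it, and alert only if it is still over the threshold. */
    void recheck() {
        DelayedPair pair = suspects.poll(); // null while no element has expired
        if (pair != null && lagLookup.applyAsLong(pair.topicConsumer) > threshold) {
            alerts.add(pair.topicConsumer);
        }
    }
}
```

The point of the delay is that a short spike which clears before the second check never produces an SMS; only a backlog that persists across both checks does.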
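III. Preparation
1. Depending on the configuration center
The configuration center is a key part of the alerting scheme: through it, the alert-related properties can be fetched dynamically and the corresponding Java bean refreshed. The configuration bean for alerting is shown below.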
@ConfigCenterBean
@ConfigurationProperties(prefix = "wmhcontrol.alarm")
@Component
public class AlarmConstants extends BaseConfigCenterBean {
private static final Logger LOGGER = LoggerFactory.getLogger(AlarmConstants.class);
// Phone numbers that receive alerts
@ConfigField
private String[] phones;
// SMS template ID
@ConfigField
private String templateId;
// Delay time
@ConfigField
private Integer delay = 600;
// Polling period (seconds)
@ConfigField
private Integer period = 60;
// URL for fetching topic-consumer consumption details
@ConfigField
private String tcsr;
// URL for viewing topic-consumer consumption details
@ConfigField
private String tclr;
// Global default threshold
@ConfigField
private Integer threshold = 1000;
// List of topic-consumer relationships
@ConfigField
private List<TCR> tcrs;
@ToInitial
private void refreshProperties() {
try {
super.doBind();
LOGGER.info(String.format("%s refreshed successfully, current config=%s", this.getModuleName(), this));
} catch (Exception e) {
LOGGER.error("Failed to bind AlarmConstants properties", e);
}
}
private void toRefresh() {
try {
if (isCfgCenterEffect()) {
ZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource();
if (ConfigCenterUtils.propertySourceShouldRefresh(this.getModuleName(), propertySource)) {
this.refreshProperties();
}
}
} catch (Exception e) {
LOGGER.error("Failed to refresh AlarmConstants properties", e);
}
}
@ToRefresh
public Integer getThreshold() {
return threshold;
}
public void setThreshold(Integer threshold) {
this.threshold = threshold;
}
@ToRefresh
public List<TCR> getTcrs() {
return tcrs;
}
public void setTcrs(List<TCR> tcrs) {
this.tcrs = tcrs;
}
@ToRefresh
public String getTcsr() {
return tcsr;
}
public void setTcsr(String tcsr) {
this.tcsr = tcsr;
}
@ToRefresh
public Integer getPeriod() {
return period;
}
public void setPeriod(Integer period) {
this.period = period;
}
@ToRefresh
public Integer getDelay() {
return delay;
}
public void setDelay(Integer delay) {
this.delay = delay;
}
@ToRefresh
public String[] getPhones() {
return phones;
}
public void setPhones(String[] phones) {
this.phones = phones;
}
@ToRefresh
public String getTemplateId() {
return templateId;
}
public void setTemplateId(String templateId) {
this.templateId = templateId;
}
@ToRefresh
public String getTclr() {
return tclr;
}
public void setTclr(String tclr) {
this.tclr = tclr;
}
@Override
public String toString() {
return ReflectionToStringBuilder.toString(this
, ToStringStyle.JSON_STYLE
, false
, false
, AlarmConstants.class);
}
@Override
public String getDefaultResourcePath() {
return "config/alarm.properties";
}
@Override
public String getConfigPrefix() {
return "wmhcontrol.alarm";
}
@Override
public String getModuleName() {
return "告警配置"; // "alarm configuration"
}
@Override
public void refreshForEvent() {
this.refreshProperties();
}
/**
* Entity describing the relationship between a topic and a consumer
*/
public static class TCR {
private String topic;
private String consumer;
private String channel;
private Integer threshold;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getConsumer() {
return consumer;
}
public void setConsumer(String consumer) {
this.consumer = consumer;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public Integer getThreshold() {
return threshold;
}
public void setThreshold(Integer threshold) {
this.threshold = threshold;
}
@Override
public String toString() {
return "TCR{" +
"topic='" + topic + '\'' +
", consumer='" + consumer + '\'' +
", channel='" + channel + '\'' +
", threshold=" + threshold +
'}';
}
}
public static class TopicConsumerDetail {
private String group;
private String topic;
private Integer pid;
private Long offset;
private Long logsize;
private Long lag;
private String owner;
@Override
public String toString() {
return "TopicConsumerDetail{" +
"group='" + group + '\'' +
", topic='" + topic + '\'' +
", pid=" + pid +
", offset=" + offset +
", logsize=" + logsize +
", lag=" + lag +
", owner='" + owner + '\'' +
'}';
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Integer getPid() {
return pid;
}
public void setPid(Integer pid) {
this.pid = pid;
}
public Long getOffset() {
return offset;
}
public void setOffset(Long offset) {
this.offset = offset;
}
public Long getLogsize() {
return logsize;
}
public void setLogsize(Long logsize) {
this.logsize = logsize;
}
public Long getLag() {
return lag;
}
public void setLag(Long lag) {
this.lag = lag;
}
public String getOwner() {
return owner;
}
public void setOwner(String owner) {
this.owner = owner;
}
}
}
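Alerting has one global threshold, and each topic can specify its own threshold.
For how the configuration center is associated with the Java bean, see: Refreshing the properties of @ConfigurationProperties-annotated beans via the configuration center.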
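How a global threshold and a per-topic threshold might be combined can be shown in a few lines. This is a minimal sketch, assuming that a `TCR` entry whose `threshold` is left unset (null) falls back to the global value from `AlarmConstants`; the source does not show this resolution logic, and the class and method names here are hypothetical.

```java
/** Resolves which threshold applies to one topic-consumer pair. */
final class ThresholdSketch {

    /**
     * Returns the pair-specific threshold when one is configured,
     * otherwise the global default (assumed fallback rule).
     */
    static int effectiveThreshold(Integer pairThreshold, int globalThreshold) {
        return pairThreshold != null ? pairThreshold : globalThreshold;
    }
}
```

With the bean above this would be called as `ThresholdSketch.effectiveThreshold(tcr.getThreshold(), alarmConstants.getThreshold())`.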
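2. Making the scheduled task period dynamically configurable
This relies on org.springframework.scheduling.annotation.SchedulingConfigurer.
It is an optional interface to be implemented by @Configuration classes annotated with @EnableScheduling. It is typically used to set a specific TaskScheduler bean for executing scheduled tasks, or to register scheduled tasks programmatically instead of declaratively with the @Scheduled annotation. For example, this may be necessary when implementing trigger-based tasks, which the @Scheduled annotation does not support.
The basic outline of the code is as follows.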
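@Configuration
public class MessageCenterAlarmTask implements SchedulingConfigurer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageCenterAlarmTask.class);

    @Autowired
    private AlarmConstants alarmConstants;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        try {
            // Check the queue every 5s
            taskRegistrar.addFixedRateTask(() -> {
                // take one topic-consumer pair from the queue and re-check it
            }, 5 * 1000L);
            // The period is re-read on every run, so it can be changed dynamically
            taskRegistrar.addTriggerTask(() -> {
                // scan the configured topic-consumer list
            }, triggerContext -> new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS).nextExecutionTime(triggerContext));
        } catch (Exception e) {
            LOGGER.error("Failed to initialize the message center topic-consumer scheduled tasks", e);
        }
    }
}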
The two scheduled tasks in the code above correspond to the right and left tasks of the alerting scheme, respectively.
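Why a trigger-based task can change its period at runtime, while a fixed-rate task cannot, comes down to when the period is read. The following stand-alone sketch imitates the idea without Spring: the next execution time is computed anew after every run from whatever the period is at that moment. `DynamicPeriodSketch` and `nextExecution` are hypothetical names, and the real `PeriodicTrigger` has more options (initial delay, fixed rate versus fixed delay) than this simplification covers.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.IntSupplier;

/** Computes the next run time from a period that is looked up on every call. */
final class DynamicPeriodSketch {

    private final IntSupplier periodSeconds; // e.g. alarmConstants::getPeriod

    DynamicPeriodSketch(IntSupplier periodSeconds) {
        this.periodSeconds = periodSeconds;
    }

    /**
     * First run: immediately. Later runs: previous run plus the period
     * as configured right now, not as configured at registration time.
     */
    Instant nextExecution(Instant lastRun, Instant now) {
        if (lastRun == null) {
            return now;
        }
        return lastRun.plus(Duration.ofSeconds(periodSeconds.getAsInt()));
    }
}
```

A fixed-rate registration, by contrast, captures its interval once when the task is registered, which is why the 5-second queue check above stays at 5 seconds.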
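3. Implementing the delay queue
Use Redisson's distributed delayed queue, or Java DelayQueue + RedisTemplate, to implement a distributed delay queue.
Reference: Redisson distributed delayed queue official documentation
Reference: How Redisson DelayedQueue works
The Redisson cluster-mode configuration is as follows.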
public class RedissonBuilder {
private static Logger LOGGER = LoggerFactory.getLogger(RedissonBuilder.class);
public static RedissonClient getRedisson(String cluster) {
String[] nodes = cluster.split(",");
for (int i = 0; i < nodes.length; i++) {
nodes[i] = "redis://" + nodes[i];
}
Config config = new Config();
config.useClusterServers() // use cluster-mode servers
.setScanInterval(2000) // cluster state scan interval, in milliseconds
.setConnectTimeout(2000)
.addNodeAddress(nodes);
try {
RedissonClient rc = Redisson.create(config);
return rc;
} catch (Exception e) {
LOGGER.error("RedissonClient getRedisson errors...", e);
return null;
}
}
}
@Configuration
public class RedissonConfig {
private static Logger LOGGER = LoggerFactory.getLogger(RedissonConfig.class);
@Bean
public RedissonClient redissonClient(@Value("${redisAddress}") String cacheAddress) {
RedissonClient rc = RedissonBuilder.getRedisson(cacheAddress);
try {
if (!Objects.isNull(rc)) {
LOGGER.info(rc.getConfig().toJSON());
}
} catch (IOException e) {
LOGGER.error("RedissonConfig redissonClient errors... ", e);
}
return rc;
}
}
Creating the delay queue with Redisson:
RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(commonRedisKey + "message_center_tcrs");
RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue = redissonClient.getDelayedQueue(topicConsumerQueue);
The destination queue is created first, and the delay queue is then obtained from it.
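4. Processing the data returned by the Kafka API
Reference: A simple wrapper around the Kafka-related APIs
Given a topic and a consumer, the API returns a whitespace-separated table whose header row corresponds to the bean fields (group, topic, pid, offset, logsize, lag, owner). The Lag column is the number of messages not yet consumed.
What remains is to map this data into the AlarmConstants.TopicConsumerDetail Java bean with the help of Spring's BeanWrapperImpl, as follows.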
private static List<AlarmConstants.TopicConsumerDetail> retrieveDetail(String detailResponse) {
    List<AlarmConstants.TopicConsumerDetail> result = new ArrayList<>();
    try (Scanner scanner = new Scanner(detailResponse.replace("", StringUtils.EMPTY).replace("", StringUtils.EMPTY))) {
        String[] propertyNames = null;
        // The first line holds the Java bean field names
        if (scanner.hasNextLine()) {
            String nameLine = scanner.nextLine();
            if (StringUtils.isBlank(nameLine)) {
                return result;
            }
            propertyNames = Arrays.stream(nameLine.trim().split("\\s+"))
                    .map(String::toLowerCase)
                    .toArray(String[]::new);
        }
        // Each remaining line holds the field values of one bean
        while (scanner.hasNextLine()) {
            String valueLine = scanner.nextLine();
            if (StringUtils.isBlank(valueLine)) {
                continue;
            }
            AlarmConstants.TopicConsumerDetail tcd = new AlarmConstants.TopicConsumerDetail();
            BeanWrapper br = new BeanWrapperImpl(tcd);
            String[] propertyValues = valueLine.trim().split("\\s+");
            // Never read past the header, even if a row has extra columns
            for (int i = 0; i < Math.min(propertyNames.length, propertyValues.length); i++) {
                br.setPropertyValue(propertyNames[i], propertyValues[i]);
            }
            result.add(tcd);
        }
        LOGGER.info("Message center extracted topic-consumer details: " + result);
    } catch (Exception e) {
        LOGGER.error("Message center failed to extract topic-consumer details: " + detailResponse, e);
    }
    return result;
}
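Once the rows are mapped, the backlog can be compared with the threshold. The sketch below is one possible way to do that, not the author's code: it assumes that the lag of a partition is `logsize - offset` (which matches the sample values in this post) and that the per-partition lags are summed before the comparison. The source does not say whether the check is per partition or on the total, and `LagCheckSketch` and `PartitionLag` are hypothetical names.

```java
import java.util.List;

/** Sums per-partition lag and compares the total with a threshold. */
final class LagCheckSketch {

    /** Minimal view of one row of the consumption details. */
    static final class PartitionLag {
        final int pid;
        final long offset;
        final long logsize;

        PartitionLag(int pid, long offset, long logsize) {
            this.pid = pid;
            this.offset = offset;
            this.logsize = logsize;
        }

        /** Messages written to the partition but not yet consumed. */
        long lag() {
            return logsize - offset;
        }
    }

    static long totalLag(List<PartitionLag> partitions) {
        long total = 0;
        for (PartitionLag p : partitions) {
            total += p.lag();
        }
        return total;
    }

    /** Assumed rule: the check fails when the summed lag is greater than the threshold. */
    static boolean overThreshold(List<PartitionLag> partitions, long threshold) {
        return totalLag(partitions) > threshold;
    }
}
```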
The list returned by retrieveDetail looks like this.
[TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=0, offset=10956087, logsize=10956091, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=1, offset=10950487, logsize=10950502, lag=15, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=2, offset=10958523, logsize=10958529, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=3, offset=10955709, logsize=10955717, lag=8, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=4, offset=10956550, logsize=10956563, lag=13, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=5, offset=10956343, logsize=10956347, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=6, offset=10954124, logsize=10954128, la