基于loghub的消息消费延迟监控
我们可以把loghub当作一个消息中间件来使用。如果能知道当前的消费进度,自然好了,否则消费情况一无所知,总是有点慌!
loghub消费分两种情况,一是普通消费,二是消费组消费;
消费组消费,loghub服务端会记录消费情况,这时可以通过调用服务端API进行偏移信息查询。
普通消费则不同,需要自行维护偏移量,即只有自己知道偏移信息,自己处理延迟。我们主要讨论这种情况。
一、 消费loghub数据的样例如下:
// 普通消费 private static void consumeDataFromShard(int shardId) throws Exception { String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor(); System.out.println("cursor = " +cursor); try { while (true) { PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor); PullLogsResponse response = client.pullLogs(request); List<LogGroupData> logGroups = response.getLogGroups(); if (logGroups.isEmpty()) { return; } System.out.println(response.getCount()); System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor()); logGroups.forEach(rec1 -> { // do your biz }); cursor = response.getNextCursor(); Thread.sleep(200); } } catch(LogException e) { System.out.println(e.GetRequestId() + e.GetErrorMessage()); } }
因为消费一直在进行,想要进行监控,就插入一些埋点。我们可以使用的 Map 来保存每个 shard 的消费延迟情况。用一个 LoghubCursorDelayTransformer 描述具体信息。
/** * 消费偏移控制容器 */ public static final ConcurrentMap<Integer, LoghubCursorDelayTransformer> CONSUME_CURSOR_DELAY_TRANSFORMER = new ConcurrentHashMap<>(); /** * loghub 分区延迟管理器 * * @author weiy * @date 2019/11/27 */public class LoghubCursorDelayTransformer { /** * 最后一次消费 loghub 数据的时间(大约) */ private int lastConsumeDataTime; /** * 消费延迟 (s) */ private int delay; /** * 分区 shard */ private int shard; /** * 记录创建时间,如果创建时间已很久,说明该消费延迟应已失效 */ private long recordTime = System.currentTimeMillis(); public LoghubCursorDelayTransformer(int lastConsumeDataTime, int delay, int shard) { this.lastConsumeDataTime = lastConsumeDataTime; this.delay = delay; this.shard = shard; } public int getLastConsumeDataTime() { return lastConsumeDataTime; }