我们可以把loghub当作一个消息中间件来使用。如果能知道当前的消费进度,自然好了,否则消费情况一无所知,总是有点慌!

  loghub消费分两种情况,一是普通消费,二是消费组消费;

  消费组消费,loghub服务端会记录消费情况,这时可以通过调用服务端API进行偏移信息查询。

  普通消费则不同,需要自行维护偏移量,即只有自己知道偏移信息,自己处理延迟。我们主要讨论这种情况。

一、 消费loghub数据的样例如下:

复制代码
    // 普通消费    private static void consumeDataFromShard(int shardId) throws Exception {         String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor();         System.out.println("cursor = " +cursor);         try {             while (true) {                 PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor);                 PullLogsResponse response = client.pullLogs(request);                 List<LogGroupData> logGroups = response.getLogGroups();                 if (logGroups.isEmpty()) {                     return;                 }                  System.out.println(response.getCount());                 System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor());                 logGroups.forEach(rec1 -> {                     // do your biz                });                 cursor = response.getNextCursor();                 Thread.sleep(200);             }         }         catch(LogException e) {             System.out.println(e.GetRequestId() + e.GetErrorMessage());         }     }
复制代码

  因为消费一直在进行,想要进行监控,就插入一些埋点。我们可以使用的 Map 来保存每个 shard 的消费延迟情况。用一个 LoghubCursorDelayTransformer 描述具体信息。

复制代码
        /**      * 消费偏移控制容器      */    public static final ConcurrentMap<Integer, LoghubCursorDelayTransformer> CONSUME_CURSOR_DELAY_TRANSFORMER = new ConcurrentHashMap<>();      /**  * loghub 分区延迟管理器  *  * @author weiy  * @date 2019/11/27  */public class LoghubCursorDelayTransformer {     /**      * 最后一次消费 loghub 数据的时间(大约)      */    private int lastConsumeDataTime;      /**      * 消费延迟 (s)      */    private int delay;      /**      * 分区 shard      */    private int shard;      /**      * 记录创建时间,如果创建时间已很久,说明该消费延迟应已失效      */    private long recordTime = System.currentTimeMillis();      public LoghubCursorDelayTransformer(int lastConsumeDataTime, int delay, int shard) {         this.lastConsumeDataTime = lastConsumeDataTime;         this.delay = delay;         this.shard = shard;     }      public int getLastConsumeDataTime() {         return lastConsumeDataTime;     }