在开发过程中,我们经常会遇到对业务数据进行模糊搜索的需求,例如电商网站对于商品的搜索,以及内容网站对于内容的关键字检索等等。对于这些高级的搜索功能,显然数据库的 Like 是不合适的,通常我们采用 ElasticSearch 来完成数据的搜索和分析,有了这个利器,我们可以轻松应对上述场景,实现关键字搜索等功能。

不过,由于增加了 ElasticSearch 作为搜索引擎,随之而来的问题就是,如何将业务中的数据同步到 ElasticSearch 中,主要有两种方式:

  1. 业务双写(具有侵入性)
  2. 数据库同步

由于业务双写需要更改业务代码,一般不建议采用此种方式,除非有强一致性要求,或者对业务侵入不敏感的系统可以采取此种方式:

  • 强一致性:同步通过HTTP请求写入 ElasticSearch
  • 最终一致性:
    • 可采取业务写入日志,后端通过日志流数据过滤写入 ElasticSearch(ELK标准模式,推荐)
    • 另一种方案就是同步写入 MQ,后端通过消费MQ异步写入 ElasticSearch

本文主要讨论非代码侵入的数据库同步方式,主要采用的是通过 LogStash 定时扫描数据库来增量同步数据的方案。

数据库脚本

数据库表结构中,需要有一个时间类型的字段作为增量更新的标识字段(例如 lastupdatetime),当该条数据更新时,必须同时更新该字段。

CREATE TABLE user  (   `id` int(11) NOT NULL,   `name` varchar(50) NOT NULL,   `age` int(11) NOT NULL,   `createtime` datetime(0) NOT NULL,   `lastupdatetime` datetime(0) NOT NULL,   PRIMARY KEY (`id`) USING BTREE )   INSERT INTO `user` VALUES(1,"jack",18,Now(),Now()) INSERT INTO `user` VALUES(2,"William",18,Now(),Now())  SELECT * from `user`

查询结果:

id name age createtime lastupdatetime
1 jack 18 2019-10-24 10:31:14 2019-10-24 10:31:14
2 William 18 2019-10-24 10:31:49 2019-10-24 10:31:49

LogStash 配置信息

logstash docker 安装脚本:
mkdir /opt/logstashsync/
mkdir /opt/logstashsync/pipeline
vi /opt/logstashsync/pipeline/logstash.conf

input {   jdbc {     jdbc_driver_library => "/app/mysql-connector-java-8.0.18.jar"     jdbc_driver_class => "com.mysql.jdbc.Driver"     jdbc_connection_string => "jdbc:mysql://192.168.10.102:3306/synctest"     jdbc_user => "root"     jdbc_password => "123456"     tracking_column => "unix_ts_in_secs"     use_column_value => true     schedule => "*/5 * * * * *"      statement => "SELECT *, UNIX_TIMESTAMP(lastupdatetime) AS unix_ts_in_secs FROM user WHERE (UNIX_TIMESTAMP(lastupdatetime) > :sql_last_value AND lastupdatetime < NOW()) ORDER BY lastupdatetime ASC"   } }  filter {   mutate {     copy => { "id" => "[@metadata][_id]"}      remove_field => ["id", "@version", "unix_ts_in_secs"]   } } output {    elasticsearch {                  hosts => "192.168.10.102:9200"                  index =>