如何通过 Docker 部署 Logstash 同步 Mysql 数据库数据到 ElasticSearch
在开发过程中,我们经常会遇到对业务数据进行模糊搜索的需求,例如电商网站对于商品的搜索,以及内容网站对于内容的关键字检索等等。对于这些高级的搜索功能,显然数据库的 Like 是不合适的,通常我们采用 ElasticSearch 来完成数据的搜索和分析,有了这个利器,我们可以轻松应对上述场景,实现关键字搜索等功能。
不过,由于增加了 ElasticSearch 作为搜索引擎,随之而来的问题就是,如何将业务中的数据同步到 ElasticSearch 中,主要有两种方式:
- 业务双写(具有侵入性)
- 数据库同步
由于业务双写需要更改业务代码,一般不建议采用此种方式,除非有强一致性要求,或者对业务侵入不敏感的系统可以采取此种方式:
- 强一致性:同步通过HTTP请求写入 ElasticSearch
- 最终一致性:
- 可采取业务写入日志,后端通过日志流数据过滤写入 ElasticSearch(ELK标准模式,推荐)
- 另一种方案就是同步写入 MQ,后端通过消费MQ异步写入 ElasticSearch
本文主要讨论非代码侵入的数据库同步方式,主要采用的是通过 LogStash 定时扫描数据库来增量同步数据的方案。
数据库脚本
数据库表结构中,需要有一个时间类型的字段作为增量更新的标识字段(例如 lastupdatetime),当该条数据更新时,必须同时更新该字段。
CREATE TABLE user ( `id` int(11) NOT NULL, `name` varchar(50) NOT NULL, `age` int(11) NOT NULL, `createtime` datetime(0) NOT NULL, `lastupdatetime` datetime(0) NOT NULL, PRIMARY KEY (`id`) USING BTREE ) INSERT INTO `user` VALUES(1,"jack",18,Now(),Now()) INSERT INTO `user` VALUES(2,"William",18,Now(),Now()) SELECT * from `user`
查询结果:
id | name | age | createtime | lastupdatetime |
---|---|---|---|---|
1 | jack | 18 | 2019-10-24 10:31:14 | 2019-10-24 10:31:14 |
2 | William | 18 | 2019-10-24 10:31:49 | 2019-10-24 10:31:49 |
LogStash 配置信息
logstash docker 安装脚本:
mkdir /opt/logstashsync/
mkdir /opt/logstashsync/pipeline
vi /opt/logstashsync/pipeline/logstash.conf
input { jdbc { jdbc_driver_library => "/app/mysql-connector-java-8.0.18.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://192.168.10.102:3306/synctest" jdbc_user => "root" jdbc_password => "123456" tracking_column => "unix_ts_in_secs" use_column_value => true schedule => "*/5 * * * * *" statement => "SELECT *, UNIX_TIMESTAMP(lastupdatetime) AS unix_ts_in_secs FROM user WHERE (UNIX_TIMESTAMP(lastupdatetime) > :sql_last_value AND lastupdatetime < NOW()) ORDER BY lastupdatetime ASC" } } filter { mutate { copy => { "id" => "[@metadata][_id]"} remove_field => ["id", "@version", "unix_ts_in_secs"] } } output { elasticsearch { hosts => "192.168.10.102:9200" index =>