Contents
  Change Stream Features
  Trying Out Change Streams
    Preparing the Environment
    Insert
    replace
    delete
    update
    update fullDocument
    resume change stream
  Change Stream Applications
    DDIA cdc
  Change Stream Implementation and Issues
  Summary
  References

Among the new features introduced in MongoDB 3.6, change streams are without doubt one of the most appealing.

  Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog.

Change streams let applications receive changes to MongoDB data in real time. This was a much-requested feature, and it can be used for ETL, cross-platform data synchronization, notification services, and so on. Before change streams existed, modifications could also be tracked by tailing the oplog, but that was a complex and risky workaround.

Original article: https://www.cnblogs.com/xybaby/p/9464328.html

Change Stream Features

The article an-introduction-to-change-streams summarizes several characteristics of change streams:

Targeted changes
  Changes can be filtered to provide relevant and targeted changes to listening applications.

Resumability
  Resumability was top of mind when building change streams to ensure that applications can see every change in a collection. (resume token)

Total ordering
  MongoDB 3.6 has a global logical clock that enables the server to order all changes across a sharded cluster.

Durability
  Change streams only include majority-committed changes.

Security
  Change streams are secure – users are only able to create change streams on collections to which they have been granted read access.

Ease of use
  Change streams are familiar – the API syntax takes advantage of the established MongoDB drivers and query language, and are independent of the underlying oplog format.

Idempotence
  All changes are transformed into a format that’s safe to apply multiple times. Listening applications can use a resume token from any prior change stream event, not just the most recent one, because reapplying operations is safe and will reach the same consistent state.
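
The idempotence property quoted above can be illustrated with a small sketch. It is not how MongoDB or any driver applies events: `applyChangeEvent` is a hypothetical helper that replays change events into an in-memory `Map`, and it assumes `updatedFields` contains only top-level field names (no dotted paths). The event shapes follow the change event format described in this article.

```javascript
// Hypothetical helper (not part of any MongoDB driver): applies one change
// event to an in-memory replica keyed by the stringified documentKey._id.
// Assumption: updatedFields/removedFields name top-level fields only.
function applyChangeEvent(replica, event) {
  const key = String(event.documentKey._id);
  switch (event.operationType) {
    case "insert":
    case "replace":
      // The event carries the whole document, so overwriting is safe to repeat.
      replica.set(key, { ...event.fullDocument });
      break;
    case "update": {
      if (!replica.has(key)) break; // nothing to update in this replica
      const doc = { ...replica.get(key) };
      const { updatedFields = {}, removedFields = [] } = event.updateDescription;
      Object.assign(doc, updatedFields); // absolute values, like $set
      for (const field of removedFields) delete doc[field];
      replica.set(key, doc);
      break;
    }
    case "delete":
      replica.delete(key); // deleting a missing key is a no-op
      break;
    default:
      break; // e.g. invalidate: nothing to apply
  }
  return replica;
}

// Sample events modelled on the insert/update pair shown in this article.
const sampleEvents = [
  {
    operationType: "insert",
    documentKey: { _id: "5b6d4a5cf8e670716bf152b2" },
    fullDocument: { _id: "5b6d4a5cf8e670716bf152b2", username: "test1", age: 18 },
  },
  {
    operationType: "update",
    documentKey: { _id: "5b6d4a5cf8e670716bf152b2" },
    updateDescription: { updatedFields: { age: 19 }, removedFields: [] },
  },
];

const sampleReplica = new Map();
for (const e of sampleEvents) applyChangeEvent(sampleReplica, e);
// Replaying the same events from the start leaves the replica unchanged.
for (const e of sampleEvents) applyChangeEvent(sampleReplica, e);
console.log(sampleReplica.get("5b6d4a5cf8e670716bf152b2"));
```

Because an update event records the resulting field values rather than the operator that produced them, replaying from any earlier resume token through to the latest event ends in the same state.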

Compared with tailing the oplog yourself, change streams have the following advantages:

  If a write has been persisted on only a single node, the corresponding oplog operation may still be rolled back, whereas change streams provide Durability (only majority-committed changes are delivered).
  In a sharded cluster, a change stream spans shards: you watch through mongos instead of tailing the oplog on each replica set separately.

Change streams place some requirements on the MongoDB deployment:

  They only work on replica sets or sharded clusters (in MongoDB 3.6 every shard must be a replica set). This is easy to understand, because change streams are themselves built on the oplog. On a sharded cluster you must connect through mongos.
  The WiredTiger storage engine and replica set protocol version 1 are required.

Trying Out Change Streams

The article "免费试用MongoDB云数据库 (MongoDB Atlas)教程" (a tutorial on trying the MongoDB Atlas cloud database for free) describes how to use the cloud database service provided by MongoDB Atlas. The free cluster happens to be a replica set running the WiredTiger engine, so this article uses that environment for testing. The tests cover all events supported by change streams (change events), the fullDocument feature, and the resume feature.

The change events are:

  insert
  delete
  replace
  update
  invalidate

Interestingly, compared with CRUD there is one extra event, replace. The difference between update and replace is:

  A replace operation uses the update command, and consists of two stages: Delete the original document with the documentKey and Insert the new document using the same documentkey

Test method: start two mongo shells, one to operate on the database and one to watch. Each block below is labelled Operate or Watch (the original post also distinguishes them by a light green and a grey background).

Preparing the Environment

Operate

    MongoDB Enterprise free-shard-0:PRIMARY> use engineering
    switched to db engineering

Watch

    MongoDB Enterprise free-shard-0:PRIMARY> use engineering
    switched to db engineering
    MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
    assert: command failed: {
        "operationTime" : Timestamp(1533888296, 2),
        "ok" : 0,
        "errmsg" : "cannot open $changeStream for non-existent database: engineering",
        "code" : 26,
        "codeName" : "NamespaceNotFound",
        "$clusterTime" : {
            "clusterTime" : Timestamp(1533888296, 2),
            "signature" : {
                "hash" : BinData(0,"fWTN4Kuv7cq9xCcC0vCF4AkTxuU="),
                "keyId" : NumberLong("6563302068054917121")
            }
        }
    } : aggregate failed

The watch error shows that you can only watch a database that already exists, so first insert a document to create the database and collection.

Operate

    MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'test1@gmail.con'})
    WriteResult({ "nInserted" : 1 })

Watch

    MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    2018-08-10T16:08:49.200+0800 E QUERY [thread1] Error: error hasNext: false :
    DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
    @(shell):1:1

At this point the cursor used for listening has been created, but there are no change events yet.

Insert

Operate

    MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test2', age: 19, 'email':'test2@gmail.con'})
    WriteResult({ "nInserted" : 1 })

Watch

    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"glttSC0AAAADRmRfaWQAZFttSCb45nBxa/FSsABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "insert",
        "fullDocument" : { "_id" : ObjectId("5b6d4826f8e670716bf152b0"), "username" : "test2", "age" : 19, "email" : "test2@gmail.con" },
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6d4826f8e670716bf152b0") }
    }

replace

Operate

    MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {age: 19})
    WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

Watch

    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"glttSSMAAAACRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "replace",
        "fullDocument" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af"), "age" : 19 },
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af") }
    }

As you can see, the operation used db.collection.update, yet the change event is replace. The reason is explained in replace-a-document-entirely:

  If the <update> document contains only field:value expressions, then:
    The update() method replaces the matching document with the <update> document. The update() method does not replace the _id value. For an example, see Replace All Fields.
    update() cannot update multiple documents.
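
The rule quoted above can be sketched as a small predicate. `expectedOperationType` is a hypothetical helper written for illustration, not a MongoDB API; it assumes the legacy `db.collection.update()` semantics used in this article and ignores later additions such as pipeline-style updates.

```javascript
// Hypothetical helper (not a MongoDB API): predicts which change event type
// a db.collection.update(query, updateDoc) call should produce, based only
// on the shape of updateDoc.
function expectedOperationType(updateDoc) {
  const keys = Object.keys(updateDoc);
  const operatorKeys = keys.filter((k) => k.startsWith("$"));
  if (operatorKeys.length === 0) {
    // Only field:value pairs: the whole document is replaced (except _id).
    return "replace";
  }
  if (operatorKeys.length === keys.length) {
    // Only update operators such as $set: an in-place update.
    return "update";
  }
  // Mixing operators and plain fields is not a valid update document.
  throw new Error("update document mixes operators and plain fields");
}

console.log(expectedOperationType({ age: 19 }));
console.log(expectedOperationType({ $set: { age: 19 } }));
```

The two calls mirror the replace test above and the `$set` form used in the update test later in the article.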

delete

Operate

    MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({ "_id" : ObjectId("5b6d47eaf8e670716bf152af")})
    WriteResult({ "nRemoved" : 1 })

Watch

    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"glttSfAAAAAFRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "delete",
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af") }
    }

update

Operate

    MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'test1@gmail.con'})
    WriteResult({ "nInserted" : 1 })
    MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 19}})
    WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

Watch

    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"glttSmQAAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "insert",
        "fullDocument" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"), "username" : "test1", "age" : 18, "email" : "test1@gmail.con" },
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") }
    }
    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"glttSn0AAAABRmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "update",
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") },
        "updateDescription" : { "updatedFields" : { "age" : 19 }, "removedFields" : [ ] }
    }

update fullDocument

db.collection.watch accepts a fullDocument option. When it is set to 'updateLookup', an update change event also returns the complete corresponding document.

    MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch([], {fullDocument:'updateLookup'} )

Operate

    MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 29}})
    WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

Watch

    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"glttS88AAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "update",
        "fullDocument" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"), "username" : "test1", "age" : 29, "email" : "test1@gmail.con" },
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") },
        "updateDescription" : { "updatedFields" : { "age" : 29 }, "removedFields" : [ ] }
    }

resume change stream

Operate

    MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
    WriteResult({ "nInserted" : 1 })
    MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
    WriteResult({ "nInserted" : 1 })
    MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({"username": "test3"})
    WriteResult({ "nRemoved" : 2 })

Watch

    MongoDB Enterprise free-shard-0:PRIMARY> ret = cursor.next()
    {
        "_id" : { "_data" : BinData(0,"gltusJ4AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "insert",
        "fullDocument" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c"), "username" : "test3", "age" : 14 },
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") }
    }
    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "insert",
        "fullDocument" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"), "username" : "test3", "age" : 14 },
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") }
    }
    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "delete",
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") }
    }
    MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
    {
        "_id" : { "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "delete",
        "ns" : { "db" : "engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") }
    }

Resume Watch

    MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor = db.users.watch([], {"resumeAfter": ret['_id']})
    {
        "_id" : { "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "insert",
        "fullDocument" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"), "username" : "test3", "age" : 14 },
        "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") }
    }
    {
        "_id" : { "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "delete",
        "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") }
    }
    {
        "_id" : { "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") },
        "operationType" : "delete",
        "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" },
        "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") }
    }
    MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor.next()
    2018-08-11T17:49:13.127+0800 E QUERY [thread1] Error: error hasNext: false :
    DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
    @(shell):1:1

For resume_cursor, resumeAfter is set to the _id (the resume token) of a change event returned earlier by the first cursor (ret, the first insert). When the new watch is opened, every change event after that token is returned at once, even though the first cursor had already consumed those events.

Change Stream Applications

DDIA cdc

In Designing Dat