如何迁移 MongoDB?
所有的数据迁移工具基本都可以分成两部分,Source 和 Sink,Source 负责从源数据库获取数据,Sink 将获取到的数据写入到目标数据库。
Sink
Sink 实现比较简单,通过 MongoDB 驱动修改目标数据库就可以。
Source
Source 获取的数据又分为两种:
- 快照数据(Snapshot)
- 增量数据(Increment)
获取快照数据
快照数据就是数据库中已有的数据,使用 db.collection.find()
获取就可以,获取的过程中可能有修改操作发生,这就会造成这个快照没有一个一致性位置,当然你可以禁止所有客户端的修改操作,但这并不优雅和高效。
获取增量数据
MongoDB 提供了 Change Streams订阅数据库(除了 admin
, local
, and config
.)实时更改,版本兼容性需要关注一下:
- 4.0 之前只支持 collection 级别的订阅,这意味着你想监听多个 collection 需要建立多个订阅通道。
- 4.0 开始支持 database 级别的订阅。
- 4.0 开始支持整个实例级别的订阅,通过这个可以获取所有 database 的事件。
String connectionString = "mongodb://root:Admin%[email protected]:27017";
try (MongoClient mongoClient = MongoClients.create(connectionString)) {
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = mongoClient
.watch()
.cursor();
while (cursor.hasNext()) {
ChangeStreamDocument<Document> changeStreamDocument = cursor.next();
System.out.println(changeStreamDocument);
}
}
ChangeStreamDocument
对应一个修改事件,可能是 update
, insert
等。
什么时候开始获取增量数据,如果快照完成之后再开始,那么可能丢失一些数据,MongoDB Change Streams 提供了指定事件位置的参数:
-
resumeAfter
:指定resumeToken
,监听之后的事件 -
startAtOperationTime(4.0)
: 监听指定时间之后的事件 -
startAfter(4.2)
:类似startAtOperationTime
,支持 Invalidate Event
我们可以通过设置 resumeToken
或者 opertationTime
指定从哪个位置开始监听事件, resumeToken
是事件的 ID。
迁移流程
- 获取当前一致性位置,可以通过
oplog
获取最近事件的resumeToken
,或者通过db.hello()
获取opertationTime
。 - 开始全量快照。
- 获取增量数据,获取的时候设置
resumeAfter
或者startAtOperationTime
。 - Sink 负责将数据写入到目标数据库。
这样有没有问题?
在上面获取快照数据的时候可能有数据被修改,这个修改的结果可能包含在快照中,也可能不在,但是它一定会在订阅的增量数据中,所以这条数据不会丢失,但是可能重复(同时在快照和增量中),但这样影响最终的结果吗?并不影响!