如何迁移 MongoDB?
所有的数据迁移工具基本都可以分成两部分,Source 和 Sink,Source 负责从源数据库获取数据,Sink 将获取到的数据写入到目标数据库。
Sink
Sink 实现比较简单,通过 MongoDB 驱动修改目标数据库就可以。
Source
Source 获取的数据又分为两种:
- 快照数据(Snapshot)
- 增量数据(Increment)
获取快照数据
快照数据就是数据库中已有的数据,使用  db.collection.find()  获取就可以,获取的过程中可能有修改操作发生,这就会造成这个快照没有一个一致性位置,当然你可以禁止所有客户端的修改操作,但这并不优雅和高效。
获取增量数据
MongoDB 提供了 Change Streams订阅数据库(除了  admin ,  local , and  config .)实时更改,版本兼容性需要关注一下:
- 4.0 之前只支持 collection 级别的订阅,这意味着你想监听多个 collection 需要建立多个订阅通道。
- 4.0 开始支持 database 级别的订阅。
- 4.0 开始支持整个实例级别的订阅,通过这个可以获取所有 database 的事件。
String connectionString = "mongodb://root:Admin%[email protected]:27017";  
try (MongoClient mongoClient = MongoClients.create(connectionString)) {  
    MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = mongoClient 
            .watch()  
            .cursor();  
    while (cursor.hasNext()) {  
        ChangeStreamDocument<Document> changeStreamDocument = cursor.next();  
        System.out.println(changeStreamDocument);  
    }  
} ChangeStreamDocument  对应一个修改事件,可能是  update , insert  等。
什么时候开始获取增量数据,如果快照完成之后再开始,那么可能丢失一些数据,MongoDB Change Streams 提供了指定事件位置的参数:
-  resumeAfter:指定resumeToken,监听之后的事件
-  startAtOperationTime(4.0): 监听指定时间之后的事件
-  startAfter(4.2):类似startAtOperationTime,支持 Invalidate Event
我们可以通过设置  resumeToken  或者  opertationTime 指定从哪个位置开始监听事件, resumeToken  是事件的 ID。
迁移流程
- 获取当前一致性位置,可以通过  oplog获取最近事件的resumeToken,或者通过db.hello()获取opertationTime。
- 开始全量快照。
- 获取增量数据,获取的时候设置  resumeAfter或者startAtOperationTime。
- Sink 负责将数据写入到目标数据库。
这样有没有问题?
在上面获取快照数据的时候可能有数据被修改,这个修改的结果可能包含在快照中,也可能不在,但是它一定会在订阅的增量数据中,所以这条数据不会丢失,但是可能重复(同时在快照和增量中),但这样影响最终的结果吗?并不影响!