如何迁移 MongoDB?

所有的数据迁移工具基本都可以分成两部分,Source 和 Sink,Source 负责从源数据库获取数据,Sink 将获取到的数据写入到目标数据库。

Sink

Sink 实现比较简单,通过 MongoDB 驱动修改目标数据库就可以。

Source

Source 获取的数据又分为两种:

  • 快照数据(Snapshot)
  • 增量数据(Increment)

获取快照数据

快照数据就是数据库中已有的数据,使用 db.collection.find() 获取就可以,获取的过程中可能有修改操作发生,这就会造成这个快照没有一个一致性位置,当然你可以禁止所有客户端的修改操作,但这并不优雅和高效。

获取增量数据

MongoDB 提供了 Change Streams订阅数据库(除了 admin local , and  config .)实时更改,版本兼容性需要关注一下:

  • 4.0 之前只支持 collection 级别的订阅,这意味着你想监听多个 collection 需要建立多个订阅通道。
  • 4.0 开始支持 database 级别的订阅。
  • 4.0 开始支持整个实例级别的订阅,通过这个可以获取所有 database 的事件。
String connectionString = "mongodb://root:Admin%[email protected]:27017";  
try (MongoClient mongoClient = MongoClients.create(connectionString)) {  
    MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = mongoClient 
            .watch()  
            .cursor();  
    while (cursor.hasNext()) {  
        ChangeStreamDocument<Document> changeStreamDocument = cursor.next();  
        System.out.println(changeStreamDocument);  
    }  
}

ChangeStreamDocument 对应一个修改事件,可能是 update insert 等。

什么时候开始获取增量数据,如果快照完成之后再开始,那么可能丢失一些数据,MongoDB Change Streams 提供了指定事件位置的参数:

  • resumeAfter :指定 resumeToken ,监听之后的事件
  • startAtOperationTime(4.0) : 监听指定时间之后的事件
  • startAfter(4.2) :类似 startAtOperationTime ,支持 Invalidate Event

我们可以通过设置 resumeToken 或者 opertationTime 指定从哪个位置开始监听事件, resumeToken 是事件的 ID。

迁移流程

  1. 获取当前一致性位置,可以通过 oplog 获取最近事件的 resumeToken ,或者通过 db.hello() 获取 opertationTime
  2. 开始全量快照。
  3. 获取增量数据,获取的时候设置 resumeAfter 或者 startAtOperationTime
  4. Sink 负责将数据写入到目标数据库。

这样有没有问题?

在上面获取快照数据的时候可能有数据被修改,这个修改的结果可能包含在快照中,也可能不在,但是它一定会在订阅的增量数据中,所以这条数据不会丢失,但是可能重复(同时在快照和增量中),但这样影响最终的结果吗?并不影响!