
本文共 11149 字,大约阅读时间需要 37 分钟。
MongoDB事务详解
事务简介
数据库事务需要包含4个基本特性,常称为ACID。具体如下:
在MongoDB中,对单文档的操作是原子的。由于可以在单个文档中使用内嵌文档或数组来获取数据之间的关系,而不必跨多个文档或集合进行范式化。对于那些需要对多个文档(在单个或多个集合中)进行原子性读写的场景,MongoDB支持多文档事务。MongoDB4.2版本全面支持多文档事务。
MongoDB对事务的支持
事务属性 | 支持程度 |
---|---|
Atomocity | 单表单文档:1.x 就支持;复制集多表多行:4.0;分片集群多表多行:4.2 |
Consistency | writeConcern, readConcern (3.2) |
Isolation | readConcern (3.2) |
Durability | Journal日志机制 和 Replication副本集机制 |
writeConcern
writeConcern决定一个写操作落到多少个节点上才算成功。
语法格式:
{ w:, j: , wtimeout: }
- w:请求确认写入操作已传播到指定数量的实例或带有指定标签的实例。值可以为“majority”或数字。
- j:写入操作的journal持久化后才向客户端确认。
- wtimeout:指定写入超时时间,仅w的值大于1时有效。
MongoDB5.0版本开始,默认是w: majority
。当w: majority
时,j
的默认值取决于writeConcernMajorityJournalDefault
的值,它默认为true,即j: true
确认时要求对磁盘上日志进行写入操作。
w: 数据写入到number个节点才向用客户端确认
{w: 0}
:对客户端的写入不需要发送任何确认,适用于性能要求高,但不关注正确性的场景。{w: 1}
:默认的writeConcern,数据写入到Primary就向客户端发送确认。{w: “majority”}
:数据写入到副本集大多数成员后向客户端发送确认,适用于对数据安全性要求比较高的场景,该选项会降低写入性能。
j: 写入操作的journal持久化后才向客户端确认
j
未指定:确认取决于j: true
时要求对磁盘上日志进行写入操作,默认为j: true
。j: true
:确认时要求对磁盘上日志进行写入操作。j: false
:确认要求在内存中进行写入操作。
wtimeout: 写入超时时间,仅w的值大于1时有效。
- 当指定
{w: }
时,数据需要成功写入number
个节点才算成功。如果写入过程中有节点故障,可能导致这个条件一直不能满足,从而一直不能向客户端发送确认结果,针对这种情况,客户端可设置wtimeout
选项来指定超时时间,当写入过程持续超过该时间仍未结束,则认为写入失败。
- 当指定
readPreference
readPreFerence决定使用哪一个节点进行读请求。
可选值包括:
- primary: 只选择主节点,默认模式;
- primaryPreferred:优先选择主节点,如果主节点不可用则选择从节点;
- secondary:只选择从节点;
- secondaryPreferred:优先选择从节点, 如果从节点不可用则选择主节点;
- nearest:根据客户端对节点的Ping值判断节点的远近,选择从最近的节点读取。
场景举例
- 用户订单下单后马上跳转至订单详情页——primary/primaryPreferred。因为此时从节点可能还没复制到新订单;
- 用户查询自己下过的订单——secondary/secondaryPreferred。查询历史订单对时效性通常没有太高要求;
- 生成报表——secondary。报表对时效性要求不高,但资源需求大,可以在从节点单独处理,避免对线上用户造成影响;
- 将用户上传的图片分发到全世界,让各地用户能够就近读取——nearest。每个地区的应用选择最近的节点读取数据。
配置
通过MongoDB的连接串参数:
# 连接参数后面添加readPreference=secondary 表示从备节点读取数据mongodb://host1:27107,host2:27107,host3:27017/?replicaSet=rs0&readPreference=secondary
通过MongoDB 驱动程序 API:
MongoCollection.withReadPreference(ReadPreference readPref)
通过Mongo Shell:
db.collection.find().readPref( "secondary" )
从节点读取测试
- 主节点写入{count:1},观察该条数据在各个节点均可见。
- 运行以下命令:
[root@localhost ~]# mongosh --host rs0/localhost:28017 -u hushang -p 123456rs0 [primary] test> db.user.insertOne({ name: "hs2"},{ writeConcern: { w: 1}})
- 在两个从节点分别执行
db.fsyncLock()
来锁定写入(模拟同步延迟):rs0 [direct: secondary] test> db.fsyncLock()
- 主节点插入一条新数据:
rs0 [primary] test> db.user.insert({ name: "hs3"},{ writeConcern: { w: 1}})
- 从节点查询不到数据:
rs0 [primary] test> db.user.find().readPref("secondary")
- 解除从节点锁定:
rs0 [direct: secondary] test> db.fsyncUnlock()
- 此时主节点再查询就能查询到数据了:
rs0 [primary] test> db.user.find().readPref("secondary")
扩展:Tag
readPreference 只能控制使用一类节点。Tag 则可以将节点选择控制到一个或几个节点。
考虑以下场景:
- 一个5个节点的复制集;
- 3个节点硬件较好,专用于服务线上客户;
- 2个节点硬件较差,专用于生成报表。
可以使用Tag来达到这样的控制目的:
- 为3个较好的节点打上
{purpose: “online”}
; - 为2个较差的节点打上
{purpose: “analyze”}
; - 在线应用读取时指定 online,报表读取时指定 analyze。
注意事项
- 指定 readPreference 时也应注意高可用问题。例如将 readPreference 指定 primary,则发生故障转移不存在 primary 期间将没有节点可读。如果业务允许,则应选择 primaryPreferred;
- 使用 Tag 时也会遇到同样的问题,如果只有一个节点拥有一个特定 Tag,则在这个节点失效时将无节点可读。这在有时候是期望的结果,有时候不是。例如:
- 如果报表使用的节点失效,即使不生成报表,通常也不希望将报表负载转移到其他节点上,此时只有一个节点有报表 Tag 是合理的选择;
- 如果线上节点失效,通常希望有替代节点,所以应该保持多个节点有同样的 Tag;
- Tag 有时需要与优先级、选举权综合考虑。例如做报表的节点通常不会希望它成为主节点,则优先级应为 0。
readConcern
在 readPreference 选择了指定的节点后,readConcern 决定这个节点上的数据哪些是可读的,类似于关系数据库的隔离级别。可选值包括:
- available:读取所有可用的数据;
- local:读取所有可用且属于当前分片的数据;
- majority:读取在大多数节点上提交完成的数据;
- linearizable:可线性化读取文档,仅支持从主节点读;
- snapshot:读取最近快照中的数据,仅可用于多文档事务。
local 和 available
在复制集中 local 和 available 是没有区别的,两者的区别主要体现在分片集上。
当发生数据迁移时,分片A把一部分数据迁移至分片B,迁移进行中此时分片B存在了一部分的数据。当readConcern为available时这一部分数据是能读取到的,当readConcern为local时这一部分数据读取不到。
majority
只读取大多数数据节点上都提交了的数据。考虑如下场景:
- 集合中原有文档 {x: 0};
- 将x值更新为1;
对于readConcern为majority时,primary必须要到t3时刻才能读取到{x: 1}的值;secondary1必须要到t5时刻才能读取到{x: 1}的值;secondary必须要到t6时刻。
实现方式
节点上维护多个x版本(类似于MVCC机制),MongoDB通过维护多个快照来链接不同的版本:
- 每个被大多数节点确认过的版本都将是一个快照;
- 快照持续到没有人使用为止才被删除。
测试readConcern: majority vs local
- 将复制集中的两个从节点使用
db.fsyncLock()
锁住写入(模拟同步延迟):rs0 [direct: secondary] test> db.fsyncLock()
- 主节点插入一条数据:
rs0 [primary] test> db.user.insert({ name: "hs"},{ writeConcern: { w: 1}})
- readConcern为local的查询结果,能查询到数据:
rs0 [primary] test> db.user.find().readConcern("local")[ { _id: ObjectId("66a9bc98765490df764272fc"), name: 'hs' } ]
- readConcern为majority的查询结果,查询不到数据:
rs0 [primary] test> db.user.find().readConcern("majority")rs0 [primary] test>
- 解除从节点锁定:
rs0 [direct: secondary] test> db.fsyncUnlock()
- 此时主节点再查询就能查询到数据了:
rs0 [primary] test> db.user.find().readConcern("majority")
readConcern: majority 与脏读
MongoDB中的回滚:
- 写操作到达大多数节点之前都是不安全的,一旦主节点崩溃,而从节点还没复制到该次操作,刚才的写操作就丢失了;
- 把一次写操作视为一个事务,从事务的角度,可以认为事务被回滚了。
所以从分布式系统的角度来看,事务的提交被提升到了分布式集群的多个节点级别的“提交”,而不再是单个节点上的“提交”。
在可能发生回滚的前提下考虑脏读问题:
- 如果在一次写操作到达大多数节点前读取了这个写操作,然后因为系统故障该操作回滚了,则发生了脏读问题;
- 所以使用
{readConcern: “majority”}
可以有效避免脏读。
如何安全的读写分离
考虑如下场景:
- 向主节点写入一条数据;
- 立即从从节点读取这条数据。
思考:如何保证自己能够读到刚刚写入的数据?
- 使用
{writeConcern: “majority”}
和{readConcern: “majority”}
可以解决。 - 下述方式有可能读不到刚写入的订单:
db.orders.insert({ oid:101,sku:"kite",q:1})db.orders.find({ oid:101}).readPref("secondary")
- 使用
{writeConcern: “majority”}
和{readConcern: “majority”}
:db.orders.insert({ oid:101,sku:"kite",q:1},{ writeConcern:{ w:"majority"}}) db.orders.find({ oid:101}).readPref("secondary").readConcern("majority")
linearizable
只读取大多数节点确认过的数据。和 majority 最大差别是保证绝对的操作线性顺序。
- 在写操作自然时间后面的发生的读,一定可以读到之前的写;
- 只对读取单个文档时有效;
- 可能导致非常慢的读,因此总是建议配合使用
maxTimeMS
。
snapshot
{readConcern: “snapshot”}
只在多文档事务中生效。
将一个事务的 readConcern 设置为 snapshot,将保证在事务中的读:
- 不出现脏读;
- 不出现不可重复读;
- 不出现幻读。
因为所有的读都将使用同一个快照,直到事务提交为止该快照才被释放。
小结
- available:读取所有可用的数据;
- local:读取所有可用且属于当前分片的数据,默认设置;
- majority:数据读一致性的充分保证,可能你最需要关注的;
- linearizable:增强处理 majority 情况下主节点失联时候的例外情况;
- snapshot:最高隔离级别,接近于关系型数据库的Serializable。
事务隔离级别
开启事务语句:
var session = db.getMongo().startSession()session.startTransaction({ readConcern: { level: "majority"}, writeConcern: { w: "majority"}})var coll = session.getDatabase("数据库名").getCollection("集合名")
事务完成前,事务外的操作对该事务所做的修改不可访问:
db.tx.insertMany([{ x: 1 }, { x: 2 }]) var session = db.getMongo().startSession()session.startTransaction()var coll = session.getDatabase("test").getCollection("tx")coll.updateOne({ x: 1}, { $set: { y: 1}})coll.findOne({ x: 1}) //{ x:1, y:1}db.tx.findOne({ x: 1}) //{ x:1}session.commitTransaction()
如果事务内使用{readConcern: “snapshot”}
,则可以达到可重复读 Repeatable Read。
事务超时机制
在执行事务的过程中,如果操作太多,或者存在一些长时间的等待,则可能会产生如下异常:
rs0 [direct: primary] test> session.commitTransaction()MongoServerError: Transaction with { txnNumber: 1 } has been aborted.
原因在于,默认情况下MongoDB会为每个事务设置1分钟的超时时间,如果在该时间内没有提交,就会强制将其终止。该超时时间可以通过transactionLifetimeLimitSecond
变量设定。
事务错误处理机制
- 两个事务修改同一个文档,晚修改的事务会报异常。异常 – 解决方案:重新开启事务即可;
- 如果一个事务已经开始修改一个文档,在事务以外尝试修改同一个文档,则事务以外的修改会等待事务完成才能继续进行。
测试
- 开启事务:
var session = db.getMongo().startSession()session.startTransaction({ readConcern: { level: "majority"}, writeConcern: { w: "majority"}})var coll = session.getDatabase('test').getCollection("tx")
- 执行修改语句:
coll.update({ _id: ObjectId("66a9bfdb765490df764272fd")},{ $set: { y: 2}})
- 在事务外执行:
db.tx.update({ _id: ObjectId("66a9bfdb765490df764272fd")},{ $set: { y: 3}})db.tx.find()coll.find()
- 提交事务:
session.commitTransaction()
- 如果把
int i = 1/0;
注释掉,则能看到正常事务提交的执行结果。
注意事项
- 可以实现和关系型数据库类似的事务场景;
- 必须使用与 MongoDB 4.2及以上兼容的驱动;
- 事务默认必须在60秒(可调)内完成,否则将被取消;
- 参与事务的分片不能使用仲裁节点;
- 事务会影响 chunk 迁移效率。正在迁移的 chunk 也可能造成事务提交失败(重试即可);
- 多文档事务中的读操作必须使用主节点读;
- readConcern 只应该在事务级别设置,不能设置在每次读写操作上。
SpringBoot整合Mongodb事务操作
编程式事务
import com.mongodb.ReadConcern;import com.mongodb.ReadPreference;import com.mongodb.TransactionOptions;import com.mongodb.WriteConcern;import com.mongodb.client.ClientSession;import com.mongodb.client.MongoClient;import com.mongodb.client.MongoClients;import com.mongodb.client.MongoCollection;import com.mongodb.client.model.Filters;import com.mongodb.client.model.Updates;import org.bson.Document;import org.junit.jupiter.api.Test;
@Testpublic void updateTest() { // 连接复制集 MongoClient mongoClient = MongoClients.create("mongodb://hushang:123456@192.168.75.100:28017,192.168.75.100:28018,192.168.75.100:28019/test?authSource=admin&replicaSet=rs0"); // 获取两个collection集合对象 MongoCollection empCollection = mongoClient.getDatabase("test").getCollection("emp"); MongoCollection eventsCollection = mongoClient.getDatabase("test").getCollection("events"); // 事务操作配置 TransactionOptions transactionOptions = TransactionOptions.builder() .writeConcern(WriteConcern.MAJORITY) .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.MAJORITY) .build(); try (ClientSession clientSession = mongoClient.startSession()) { // 开启事务 clientSession.startTransaction(); try { empCollection.updateOne(clientSession, Filters.eq("_id", "66a9dbb8bb75fd99739d6aea"), Updates.set("name", "hushang")); int i = 1 / 0; eventsCollection.insertOne(clientSession, new Document("username", "hs1").append("status", new Document("new", "inactive").append("old", "Active"))); // 提交事务 clientSession.commitTransaction(); } catch (Exception e) { // 回滚事务 clientSession.abortTransaction(); } }}
- 进行测试,事务会正常回滚;如果把
int i = 1/0;
注释掉,则能看到正常事务提交的执行结果。
声明式事务
package com.hs.learn.config;import com.mongodb.ReadConcern;import com.mongodb.ReadPreference;import com.mongodb.TransactionOptions;import com.mongodb.WriteConcern;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.mongodb.MongoDatabaseFactory;import org.springframework.data.mongodb.MongoTransactionManager;
@Configurationpublic class MongodbConfig { @Bean public MongoTransactionManager transactionManager(MongoDatabaseFactory factory) { TransactionOptions transactionOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.MAJORITY) .writeConcern(WriteConcern.MAJORITY) .build(); return new MongoTransactionManager(factory, transactionOptions); }}
编程测试service
package com.hs.learn.service;import com.hs.learn.entity.Employee;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.mongodb.core.MongoTemplate;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import java.util.Date;
@Service@Transactional(rollbackFor = Exception.class)public class EmployeeService { @Autowired private MongoTemplate mongoTemplate; public void addEmployee() { Employee employee1 = new Employee(1, "hushang1", 25, new Date(), "测试数据1"); Employee employee2 = new Employee(2, "hushang2", 25, new Date(), "测试数据2"); mongoTemplate.insert(employee1); int i = 1 / 0; mongoTemplate.insert(employee2); }}
测试
@Autowiredprivate EmployeeService employeeService;@Testpublic void test() { employeeService.addEmployee();}
发表评论
最新留言
关于作者
