Elasticsearch的乐观并发控制


0 前言


Elasticsearch 采用的是乐观锁并发控制,基于 Elasticsearch 提供的 _version 字段或我们自己定义的外部版本号,可以方便地实现乐观并发控制。

扩展阅读:并发控制(乐观锁、悲观锁)


1 基于 _version 的乐观并发控制


1.1 内部版本号(_version)

_version 是 ES 中提供的一个字段,它表明当前数据的版本号。我们可根据它来对比数据的版本是否一致。

1.2 操作实例

(1)构造一条数据。

PUT test_index/test_type/7
{
  "test_field":"test test"
}

(2)模拟两个客户端,获取同一条数据。

GET test_index/test_type/7

执行结果:两个客户端都能读到 id 为 7 的数据,版本号为 1 。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "test test"
  }
}

(3)一个客户端先更新数据,同时带上数据的版本号(?version=1)。确保 ES 中的数据版本号与客户端中的数据版本号是相同的才能修改。

PUT test_index/test_type/7?version=1
{
  "test_field":"test client 1"
}

执行结果:修改成功,数据的版本号变为 2 。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}

(4)另一个客户端尝试基于 version=1 的数据进行修改,同样带上版本号(?version=1)。

PUT test_index/test_type/7?version=1
{
  "test_field":"test client 2"
}

执行结果:修改失败,因为版本号不一致。

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
        "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
        "shard": "3",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
    "shard": "3",
    "index": "test_index"
  },
  "status": 409
}

(5)在乐观锁成功阻止并发问题之后,尝试正确地完成更新。

GET /test_index/test_type/7

执行结果:版本号为 2 。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 2,
  "found": true,
  "_source": {
    "test_field": "test client 1"
  }
}

(6)基于最新的数据和版本号进行修改,只有版本号一致才能修改数据。因为在多线程并发更新同一条数据很频繁的情况下,数据的版本号可能会一直变动,所以这个步骤可能会需要反复执行多次,直至版本号一致才能成功。

PUT /test_index/test_type/7?version=2 
{
  "test_field": "test client 2"
}

执行结果:更新成功,因为版本号一致。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "7",
  "_version": 3,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}


2 基于 external version 的乐观并发控制


2.1 外部版本号(external version)

外部版本号( external version):我们可以不采用 ES 内部提供的 _version 版本号进行并发控制,可以基于自己维护的版本号进行并发控制。

?version=1                          //ES内部版本号
?version=1&version_type=external    //外部版本号
  • 在 _version 下,只有当你提供的 version 与 ES 中的 _version 一模一样的时候才可以进行修改,否则就报错。

  • 在 version_type=external 的情况下,只有当你提供的 version 比 ES 中的 _version 大的时候(等于也不行)才能进行修改。

也就是,

  • ES 中 _version=1 ,只有 ?version=1 ,数据才能更新成功;

  • ES 中 _version=1 ,只要 ?version>1&version_type=external ,数据就能更新成功,如 ?version=2&version_type=external ,因为 2 大于 1 。

2.2 操作实例

(1)构造一条数据。

PUT /test_index/test_type/8
{
  "test_field": "test"
}

执行结果:id 为 8 的数据版本号为 1 。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

(2)模拟两个客户端,同时查询数据。

GET /test_index/test_type/8

执行结果:注意版本号为 1 。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "test"
  }
}

(3)第一个客户端先进行修改,此时客户端程序是在自己的数据库中获取到了这条数据的最新版本号,比如 2 。

PUT /test_index/test_type/8?version=2&version_type=external
{
  "test_field": "test client 1"
}

执行结果:更新成功,因为外部版本号(2)大于 _version (1)。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 2,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}

(4)模拟第二个客户端,其外部版本号也为 2 ,同时基于 version=2 发起修改。

PUT /test_index/test_type/8?version=2&version_type=external
{
  "test_field": "test client 2"
}

执行结果:更新失败,因为版本号(2)与 _version (2)相等。

{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
        "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
        "shard": "1",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
    "index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
    "shard": "1",
    "index": "test_index"
  },
  "status": 409
}

(5)获取最新版本号。

GET /test_index/test_type/8

执行结果:版本号为 2 。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 2,
  "found": true,
  "_source": {
    "test_field": "test client 1"
  }
}

(6)在并发控制成功后,重新基于最新的版本号发起更新。

PUT /test_index/test_type/8?version=3&version_type=external
{
  "test_field": "test client 2"
}

执行结果:更新成功,因为版本号(3)大于 _version (2)。

{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "8",
  "_version": 3,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": false
}


3 结语


以上就是 ES 中实现乐观并发控制的思路,我们围绕版本号来控制并发操作,以保证数据安全。版本号可以是 ES 内部提供的 _version 字段,也可以是我们自己维护的外部版本号。


参考资料:http://www.roncoo.com/article/detail/128304


 
comments powered by Disqus