pulsar相比于其他的mq产品, 支持了schema管理, 相比于其他MQ产品, 有很强的竞争力.

pulsar schema 主要支持 json、avro 和 protobuf, 还支持keyvalue 和 常见的基础类型. 定义了 Schema 接口统一了 decode encode行为. 通过SchemaInfo将schema信息传递服务端. (参考Commands#newSend Commands#newSubscribe Commands#newGetSchemaResponse)

admin平台: SchemasImpl: getSchemaInfo:

producer:

consumer: 启动的时候, 会发起 subcribe 交互协议. client 在 subscribe 协议中添加了 schema 参数, broker 接收到 subscribe 请求后, 会从 schemaStorage 获取 topic(如果是partition topic: my-topic-partition-1, 返回的是 my-topic) 最新版本的数据 检查schema 的兼容性,不兼容的情况下, 会返回错误. 不存在, 就会将 schema 存储到 schemaStorage 中。 检查兼容性需要满足一下任何一个条件:

  1. 原来有schema
  2. 有producer或者consuer在使用,
  3. 有发送过消息 (PersistentTopic检查ledger, NonPersistentTopic是检查本地记录) 否则, 都是添加schema

兼容性检查原理: 有两种schema检查方式, 满足任意一个就可以:

  1. 通过比较 schema data 的hash值
  2. 针对性比较: avro的兼容性通过 avro.SchemaValidator 进行. json区分两种版本, 都是老版本, 通过比较 schema 的id相等性. 如果一方使用新版本(avro), 那么, 兼容.
  3. 如果没有设置, 都是兼容的

如何做到版本区分: 存储的时候, 如果有历史版本, 会检查 schema 的版本兼容性,

        SchemaStorageFormat.IndexEntry emptyIndex = SchemaStorageFormat.IndexEntry.newBuilder()
                        .setVersion(0)
                        .setHash(copyFrom(hash))
                        .setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
                                .setEntryId(-1L)
                                .setLedgerId(-1L)
                        ).build();
newSubscribe 
ClientCnx#handleGetSchemaResponse: 

客户端:

服务器端:

schenma官方文档: https://pulsar.apache.org/docs/zh-CN/next/concepts-schema-registry/

schema 需要兼容, 不兼容的就不接受了 schema存储在 bookkeeper 上 client 是感知到 schema 的,producer java通过泛型支持 基本类型 avro pb 类型支持

优先支持 pb avro thrift

pulsar 客户端行为 broker存储 我们公司的实现

reference:

  1. pulsar schema