One minute
Pulsar Schema
pulsar相比于其他的mq产品, 支持了schema管理, 相比于其他MQ产品, 有很强的竞争力.
pulsar schema 主要支持 json、avro 和 protobuf, 还支持keyvalue 和 常见的基础类型. 定义了 Schema 接口统一了 decode encode行为. 通过SchemaInfo将schema信息传递服务端. (参考Commands#newSend Commands#newSubscribe Commands#newGetSchemaResponse)
admin平台: SchemasImpl: getSchemaInfo:
producer:
consumer: 启动的时候, 会发起 subcribe 交互协议. client 在 subscribe 协议中添加了 schema 参数, broker 接收到 subscribe 请求后, 会从 schemaStorage 获取 topic(如果是partition topic: my-topic-partition-1, 返回的是 my-topic) 最新版本的数据 检查schema 的兼容性,不兼容的情况下, 会返回错误. 不存在, 就会将 schema 存储到 schemaStorage 中。 检查兼容性需要满足一下任何一个条件:
- 原来有schema
- 有producer或者consuer在使用,
- 有发送过消息 (PersistentTopic检查ledger, NonPersistentTopic是检查本地记录) 否则, 都是添加schema
兼容性检查原理: 有两种schema检查方式, 满足任意一个就可以:
- 通过比较 schema data 的hash值
- 针对性比较: avro的兼容性通过 avro.SchemaValidator 进行. json区分两种版本, 都是老版本, 通过比较 schema 的id相等性. 如果一方使用新版本(avro), 那么, 兼容.
- 如果没有设置, 都是兼容的
如何做到版本区分: 存储的时候, 如果有历史版本, 会检查 schema 的版本兼容性,
SchemaStorageFormat.IndexEntry emptyIndex = SchemaStorageFormat.IndexEntry.newBuilder()
.setVersion(0)
.setHash(copyFrom(hash))
.setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
.setEntryId(-1L)
.setLedgerId(-1L)
).build();
newSubscribe
ClientCnx#handleGetSchemaResponse:
客户端:
服务器端:
schenma官方文档: https://pulsar.apache.org/docs/zh-CN/next/concepts-schema-registry/
schema 需要兼容, 不兼容的就不接受了 schema存储在 bookkeeper 上 client 是感知到 schema 的,producer java通过泛型支持 基本类型 avro pb 类型支持
优先支持 pb avro thrift
pulsar 客户端行为 broker存储 我们公司的实现
reference:
136 Words
2019-09-20 11:51 +0800