Schema Registry
What is a Schema Registry ?
According to the confluent doc (https://docs.confluent.io/current/schema-registry/index.html), a schema registry will store a versioned history of your avro schema.
How it's work ?
You should have a least a producer and a consumer using kafka. When you produce a message, you should encode it using avro.
When you produce a message using an avro schema for the first time it should be post to the subject in the schema registry. You'll get an uniq id if this is your first schema for this subject or if the new schema is compatible with the old one (see https://docs.confluent.io/current/schema-registry/avro.html#compatibility-types)
When encoding your message especially with a schema registry, you'll need to insert before it a byte called magic byte
and the id of the schema from the schema registry as an integer (4 bytes).
You'll have :
Description | Bytes |
---|---|
Magic Number | N |
Schema Id | 0 |
Schema Id | 0 |
Schema Id | 0 |
Schema Id | 12 |
Your encoded message | ... |
Your encoded message | ... |
Your encoded message | ... |
... | ... |
When consuming the message coming from kafka in your consumer, you'll need to read the schema id
first. For example you get the id 12
(Like the schema abov).
Then you can get the schema which encode the message by it's id from the schema registry.
All routes to the schema registry are documented in the confluent doc : https://docs.confluent.io/current/schema-registry/develop/api.html
Environment config
Access to the schema registry using Basic HTTP auth
-
SCHEMA_REGISTRY_URL
: Url of the schema registry. Default :http://schema-registry:8081
-
SCHEMA_REGISTRY_USERNAME
: Username for Basic HTTP auth. Default :none
-
SCHEMA_REGISTRY_PASSWORD
: Password for Basic HTTP auth. Default :none
How to use it ?
Installation
npm install @nimley/avro-schema-registry --save
Decode your message
From your consumer :
SchemaRegistry = require('@nimley/avro-schema-registry');
...
# Consumer function
const onMessage = async ({ topic, partition, message }) => {
try {
const mess = await SchemaRegistry.decode(message.value)
} catch(e) {
console.log(e)
}
}
Encode your message
From your producer :
SchemaRegistry = require('@nimley/avro-schema-registry');
...
try {
const buffer = await SchemaRegistry.encode(entity, schema, data);
# send 'buffer' variable in kafka
} catch (e) {
console.log(e)
}
entity
: Subject name in the schema registry
schema
: Avro schema. Use require('../avro/entity}')
to get the local file
data
: Data to encode