avro-registry-client

A node client to interact with the avro registry. There are methods for updating your subjects and schemas, but also some convenience methods for encoding and decoding.

Installation

npm install avro-registry-client

Prerequisites

If you're planning on using the kafka consuming/producing functionality, you'll want to install kafka-node as well.

You should have a local copy of the Confluent Avro schema registry running, which means you also need Kafka, and possibly ZooKeeper, running locally. Documentation on how to get that running is forthcoming.

If you plan on using the .decode() method, take special note of the caveats described in the note under registry.decode (possibleSchemas, message), as they may affect how you need to restrict schemas in your schema registry.

tl;dr: All of your schemas should be unique to each subject/version. This means you should only use record types, and each schema should have a name/namespace combination that is unique to its subject.

Usage

const kafka = require('kafka-node')

const MY_TOPIC = 'my-topic'
const kafkaProducerClient = new kafka.KafkaClient({
  kafkaHost: 'localhost:9092'
})
const kafkaConsumerClient = new kafka.KafkaClient({
  kafkaHost: 'localhost:9092'
})
const kafkaProducer = new kafka.Producer(kafkaProducerClient)
const kafkaConsumer = new kafka.Consumer(
  kafkaConsumerClient,
  [{ topic: MY_TOPIC }],
  { encoding: 'buffer', keyEncoding: 'buffer' }
)

// Register the value and key schemas, then create a producer and a consumer
;(async function run() {
  const registry = require('avro-registry-client')('http://localhost:8081')
  await registry.createSubjectVersion('MySubject', {
    name: 'MySubject',
    type: 'record',
    fields: [
      {
        name: 'field1',
        type: 'string'
      }
    ]
  })
  await registry.createSubjectVersion('MySubjectId', {
    name: 'MySubjectId',
    type: 'record',
    fields: [
      {
        name: 'id',
        type: 'string'
      }
    ]
  })
  const mySubjectProducer = registry.createProducer(kafkaProducer, MY_TOPIC, {
    valueSubject: { subject: 'MySubject', version: 'latest' },
    keySubject: { subject: 'MySubjectId', version: 'latest' }
  })
  registry
    .createConsumer(kafkaConsumer, [
      {
        valueSubject: { subject: 'MySubject', version: 'latest' },
        keySubject: { subject: 'MySubjectId', version: 'latest' },
        handler({ key, value, message }) {
          console.log(key) // { id: '123' }
          console.log(value) // { field1: 'my field value' }
          console.log(message) // This is the raw message from kafka-node .on('message')
        }
      }
    ])
    .listen(err => {
      // This gets called if there is an error handling the message
    })
})()

API

createAvroRegistryClient (registryHost) (default export)

Creates a registry client whose methods are described below.

Parameters

  • registryHost (String) - The host of the avro registry

Example

const createAvroRegistryClient = require('avro-registry-client')

const registry = createAvroRegistryClient('http://localhost:8081')

createAvroRegistryClient.COMPATIBILITY_TYPES

An object whose properties are the allowed compatibility types when setting compatibility. Note that you don't have to use this object to put into the compatibility methods' argument. You can just use the raw string value. This object is just helpful for enumeration purposes.

Example

const createAvroRegistryClient = require('avro-registry-client')

const { COMPATIBILITY_TYPES } = createAvroRegistryClient
const registry = createAvroRegistryClient('http://localhost:8081')

registry.setGlobalCompatibility(COMPATIBILITY_TYPES.FULL).then(() => {})

createAvroRegistryClient.errors

An object whose values are the known errors that this library will produce when parsing errors from the schema registry responses.

Example

const createAvroRegistryClient = require('avro-registry-client')

const { errors } = createAvroRegistryClient
const registry = createAvroRegistryClient('http://localhost:8081')
const invalidSchema = {
  // no name property
  type: 'record',
  fields: [
    {
      name: 'field1',
      type: 'string'
    }
  ]
}

console.log(errors)
/**
 * { RouteNotFoundError: [Function: RouteNotFoundError],
 *   SubjectNotFoundError: [Function: SubjectNotFoundError],
 *   VersionNotFoundError: [Function: VersionNotFoundError],
 *   SchemaNotFoundError: [Function: SchemaNotFoundError],
 *   IncompatibleSchemaError: [Function: IncompatibleSchemaError],
 *   InvalidSchemaError: [Function: InvalidSchemaError],
 *   InvalidVersionError: [Function: InvalidVersionError],
 *   InvalidCompatibilityLevelError: [Function: InvalidCompatibilityLevelError] }
 */

registry.createSubjectVersion('MyInvalidSchema', invalidSchema).catch(err => {
  if (err instanceof errors.InvalidSchemaError) {
    // do something with it
  }
})

registry.createProducer(kafkaProducer, topic, options)

Creates a Kafka Producer that will encode the key and value with avro.

Parameters

  • kafkaProducer (kafka-node.Producer) - an instance of kafka-node.Producer
  • topic (String) - The topic this producer will publish to
  • options (Object) - An object that contains the keySubject and valueSubject definitions. See example for detailed usage.

Returns

  • (key: any, value: any, sendOptions: Object) => Promise - Returns a function that takes three arguments: key, value, and sendOptions. As the example shows, calling it returns a promise that resolves once the message has been sent.

Examples

const kafka = require('kafka-node')
const kafkaClient = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' })
const kafkaProducer = new kafka.Producer(kafkaClient)

const producer = registry.createProducer(kafkaProducer, 'my-topic', {
  valueSubject: { subject: 'MySubject', version: 1 }, // Or pin to a specific version
  keySubject: { subject: 'MySubjectId', version: 'latest' }
})
// You can also set the options this way:
const producer2 = registry
  .createProducer(kafkaProducer, 'my-topic')
  .valueSubject('MySubject', 1)
  .keySubject('MySubjectId') // You may omit the second argument if you just want 'latest'

// Same api for producer2
producer({ id: '123' }, { field1: 'body' }).then(() => {
  console.log('sent!')
})

registry.createConsumer(kafkaConsumer, messageTypes)

Creates a Kafka Consumer that will decode messages with avro.

Parameters

  • kafkaConsumer (kafka-node.Consumer|kafka-node.ConsumerGroup) - An instance of a kafka-node consumer. Can be a Consumer or ConsumerGroup. Note that when you configure the consumer, you will have already told it which topics to listen to. Also note that the consumer must be configured to handle the keys and values as buffers. See Example for details.
  • messageTypes (Object[]) - An array of message types that this consumer will be able to handle. Message Type definitions consist of a valueSubject, keySubject, and a handler. The reason this is an array is because it is possible to receive multiple message types on a single topic. For more details, see the note under registry.decode (possibleSchemas, message).

Example

const kafka = require('kafka-node')
const kafkaClient = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' })
const kafkaConsumer = new kafka.Consumer(kafkaClient, [{ topic: 'my-topic' }], {
  encoding: 'buffer',
  keyEncoding: 'buffer'
})

registry
  .createConsumer(kafkaConsumer, [{
    valueSubject: { subject: 'MySubject', version: 1 },
    keySubject: { subject: 'MySubjectId', version: 'latest' },
    handler({ key, value, message }) {
      // The `key` is the avro decoded key
      // The `value` is the avro decoded value
      // The `message` is the raw message from kafka-node .on('message') handler
    }
  }])
  .listen((err) => {
    // This will be called when an error occurs while handling a message.
    // It will also be called if a message comes through that you have
    // not set up a message type handler for.
  })

// You can also handle passing in message types this way:
registry
  .createConsumer(kafkaConsumer)
  .messageType({
    valueSubject: { subject: 'MySubject', version: 1 },
    keySubject: { subject: 'MySubjectId', version: 'latest' },
    handler({ key, value, message }) {}
  })
  .listen((err) => {})

// Or this way
registry
  .createConsumer(kafkaConsumer)
  .messageType()
  .valueSubject('MySubject', 1)
  .keySubject('MySubjectId') // No need to pass the second argument if you intend to use the latest version
  .handler(({ key, value, message }) => {})
  // Calling this breaks out of the messageType definition. Calling .messageType() also does this
  .listen((err) => {})

.listen() returns the result of calling kafkaConsumer.on('message'), which is the kafkaConsumer itself, so you can attach further listeners to it (for example, for error events).
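
Because the consumer itself comes back, further listeners can be chained onto the return value. The sketch below uses a plain Node EventEmitter as a stand-in for the kafka-node consumer (an assumption, so that the snippet runs without a broker). `fakeConsumer` and `listen` are hypothetical names and not part of this library.

```javascript
const { EventEmitter } = require('events')

// Stand-in for a kafka-node consumer, which is also an EventEmitter.
const fakeConsumer = new EventEmitter()

// Hypothetical stand-in for .listen(): it attaches a 'message' listener
// and returns whatever .on() returns, which is the emitter itself.
function listen(consumer, onMessage) {
  return consumer.on('message', onMessage)
}

const received = []
const errors = []

const returned = listen(fakeConsumer, message => received.push(message))
// Chain an 'error' listener onto the returned consumer.
returned.on('error', err => errors.push(err.message))

fakeConsumer.emit('message', { topic: 'my-topic', offset: 0 })
fakeConsumer.emit('error', new Error('broker unavailable'))

console.log(returned === fakeConsumer) // true
console.log(received.length, errors) // 1 [ 'broker unavailable' ]
```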

registry.encode (subject, version, value)

Encodes a value with the schema that has the given subject and subject version.

Parameters

  • subject (String) - The subject of the schema to encode with
  • version (Number|String) - The version of the subject to encode with. Must be a number or the string 'latest'
  • value (Any) - The value to encode

Returns

  • Promise< Buffer > - The encoded message

Example

registry.encode('MySchema', 'latest', { field1: 'foobar' }).then(response => {
  console.log(response) // <Buffer 00 00 00 00 01 ...>
})
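
The leading bytes of the buffer shown above are consistent with the Confluent wire format, in which an encoded message starts with a zero "magic byte" followed by a 4-byte big-endian schema ID, with the Avro payload after that. Assuming this library follows that framing, the header can be inspected with a small helper. `readHeader` is a hypothetical name and not part of this library.

```javascript
// Sketch: split a Confluent-framed message into schema ID and Avro payload.
function readHeader(buffer) {
  if (buffer.length < 5) {
    throw new Error('Message too short to contain a schema registry header')
  }
  const magicByte = buffer.readUInt8(0)
  if (magicByte !== 0) {
    throw new Error(`Unexpected magic byte: ${magicByte}`)
  }
  return {
    schemaId: buffer.readInt32BE(1), // bytes 1-4, big-endian
    payload: buffer.subarray(5) // the Avro-encoded body
  }
}

// 0x00 magic byte, schema ID 1, then the Avro encoding of { field1: 'foobar' }:
// a zig-zag varint length (6 -> 0x0c) followed by the UTF-8 bytes.
const encoded = Buffer.concat([
  Buffer.from([0x00, 0x00, 0x00, 0x00, 0x01, 0x0c]),
  Buffer.from('foobar', 'utf8')
])

const { schemaId, payload } = readHeader(encoded)
console.log(schemaId) // 1
console.log(payload.length) // 7
```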

registry.decode (possibleSchemas, message)

Decodes a message using one of the subjects given in possibleSchemas.

NOTE!!!!

The reason that you pass in an object (possibleSchemas) as the first argument is because it is possible to send multiple "Subjects" of messages down a single pipe, but the avro schema registry encoding only encodes the schema ID into the message, which by itself isn't enough information to map it to a handler for a specific subject. The possibleSchemas is kind of a "seed" to help the decoder figure out which subject it is mapped to so you can handle them differently.

For example:

You're sending PersonUpdated and PersonCreated encoded messages down a People kafka topic, and you want different behavior depending on whether you get a PersonCreated event or a PersonUpdated event. The messages themselves only have a schema ID encoded in them, which you can't use to look up which subject the message was intended to be written with (a limitation of the schema registry). We can get around that because there is a way to check whether a schema (which we can get by the schema ID) exists in a given subject. The problem is that the schema registry allows a schema (with a single schema ID) to belong to more than one subject/version, so it would be possible to map the message to the wrong subject.

For the above reason, this library is really only compatible with schema registries in which unique schemas are published to schema versions. With care, this can be done by following these tenets:

  • Put the "name" and "namespace" properties on the top-level of your schemas that together are unique to the "Subject".
  • Only use "record" types. Simple types in the top level of schemas (such as "string") don't use the "name" and "namespace" properties to calculate the "uniqueness" of a schema, for whatever reason. You can get around this by adding an unused property, like "x-id" or something with a unique value, which will then be used to calculate the "uniqueness" of a schema.
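
To make the ambiguity concrete, here is a minimal in-memory sketch of the lookup problem described above. It does not call the registry and is not this library's implementation. `register`, `resolveSubjects`, `schemaIds`, and `subjects` are hypothetical stand-ins, and the model assumes only that identical schemas share one ID.

```javascript
const schemaIds = new Map() // canonical schema JSON -> schema ID
const subjects = {} // subject -> list of schema IDs (one per version)

// Register a schema under a subject. Identical schemas share one ID,
// which is how a single ID can end up under several subjects.
function register(subject, schema) {
  const canonical = JSON.stringify(schema)
  if (!schemaIds.has(canonical)) schemaIds.set(canonical, schemaIds.size + 1)
  const id = schemaIds.get(canonical)
  subjects[subject] = (subjects[subject] || []).concat(id)
  return id
}

// Return every candidate subject that contains the schema with this ID.
function resolveSubjects(possibleSubjects, schemaId) {
  return possibleSubjects.filter(s => (subjects[s] || []).includes(schemaId))
}

const fields = [{ name: 'id', type: 'string' }]
// Same name under both subjects: the schemas are identical, so they share an ID.
const sharedA = register('PersonCreated', { type: 'record', name: 'Person', fields })
const sharedB = register('PersonUpdated', { type: 'record', name: 'Person', fields })
// Names unique to each subject: distinct schemas, distinct IDs.
const createdId = register('PersonCreated', { type: 'record', name: 'PersonCreated', fields })
const updatedId = register('PersonUpdated', { type: 'record', name: 'PersonUpdated', fields })

const candidates = ['PersonCreated', 'PersonUpdated']
console.log(sharedA === sharedB) // true
console.log(resolveSubjects(candidates, sharedA)) // [ 'PersonCreated', 'PersonUpdated' ]
console.log(resolveSubjects(candidates, createdId)) // [ 'PersonCreated' ]
console.log(resolveSubjects(candidates, updatedId)) // [ 'PersonUpdated' ]
```

With a shared name the ID matches both subjects, so a message cannot be routed reliably. With a name unique to each subject, every ID maps to exactly one subject.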

Parameters

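  • possibleSchemas (Object.<String, Number|String>) - A map of subject name to version (a number or 'latest') listing the subjects the message may have been encoded with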
  • message (Buffer) - The message to decode.

Returns

  • Promise< { subject: String, value: Any } > - The subject the message was matched to and the decoded value

Example

// message being something we got from kafka or wherever
registry.decode({ MySchema: 'latest' }, message).then(response => {
  console.log(response)
  /**
   * { subject: 'MySchema',
   *   value: MySchema { field1: 'test' } }
   */
})

registry.primeSubjectByVersion (subject, version)

Parameters

  • subject (String) - The subject to prime
  • version (Number|String) - The version to prime

Returns

  • Promise<null>

Example

registry.primeSubjectByVersion('MySubject', 1).catch(err => {
  console.log('Priming failed', err)
})

registry.primeSubjectByAllVersions (subject)

Parameters

  • subject (String) - The subject to prime

Returns

  • Promise<null>

Example

registry.primeSubjectByAllVersions('MySubject').catch(err => {
  console.log('Priming failed', err)
})

registry.getSchemaById (id)

Get a schema by its specific id (unique to all schemas within a registry)

Parameters

  • id (Number) - The id of the schema to fetch

Returns

  • Promise< { schema: String } > - An object with a "schema" property which is the stringified JSON object of the schema.

Example

registry.getSchemaById(1).then(response => {
  console.log(response) // { schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}' }
})
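
Since the schema property is a JSON string rather than an object, it usually needs to be parsed before its fields can be inspected. A small sketch using the response shape shown above, where the `response` literal stands in for a real registry reply:

```javascript
// Stand-in for the object getSchemaById resolves with.
const response = {
  schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}'
}

// Parse the stringified schema into a plain object.
const schema = JSON.parse(response.schema)
const fieldNames = schema.fields.map(field => field.name)

console.log(schema.name) // MySubject
console.log(fieldNames) // [ 'field1' ]
```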

registry.listSubjects ()

Lists all available subjects in the registry

Returns

  • Promise< [ String ] > - An array of subject names

Example

registry.listSubjects().then(response => {
  console.log(response) // [ 'MySubject' ]
})

registry.listSubjectVersions (subject)

Lists the version numbers of the versions under a given subject

Parameters

  • subject (String) - The subject whose versions we want to check

Returns

  • Promise< [ Number ] > - An array of version numbers

Example

registry.listSubjectVersions('MySubject').then(response => {
  console.log(response) // [ 1 ]
})

registry.deleteSubject (subject)

Deletes all of the versions from a subject

Parameters

  • subject (String) - The subject to delete

Returns

  • Promise< [ Number ] > - An array of version numbers that were deleted

Example

registry.deleteSubject('MySubject').then(response => {
  console.log(response) // [ 1 ]
})

registry.getSubjectByVersion (subject, version)

Get a schema by its subject and version number

Parameters

  • subject (String) - The subject to fetch
  • version (Number|String) - The specific version number to fetch, or 'latest' if you want the latest version. There is another method to fetch the latest which just calls this function with 'latest' as the version parameter

Returns

  • Promise< { subject: String, version: Number, id: Number, schema: String } > - An object with the properties describing the schema.

Example

registry.getSubjectByVersion('MySubject', 1).then(response => {
  console.log(response)
  /**
   * {
   *   subject: 'MySubject',
   *   version: 1,
   *   id: 1,
   *   schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}'
   * }
   */
})

registry.createSubjectVersion (subject, schema)

Registers a new version of the given subject

Parameters

  • subject (String) - The subject to add the schema version to
  • schema (Object) - The avro compliant schema to push up

Returns

  • Promise< { id: Number } > - An object with the unique schema id as the id property.

Example

const schema = {
  name: 'MySubject',
  type: 'record',
  fields: [
    {
      name: 'field1',
      type: 'string'
    }
  ]
}
registry.createSubjectVersion('MySubject', schema).then(response => {
  console.log(response) // { id: 1 }
})

registry.checkSchemaBySubject (subject, schema)

Checks to see if a schema exists in a subject

Parameters

  • subject (String) - The subject to check
  • schema (Object) - The avro compliant schema to check

Returns

  • Promise< { subject: String, version: Number, id: Number, schema: String } >

Example

const schema = {
  name: 'MySubject',
  type: 'record',
  fields: [
    {
      name: 'field1',
      type: 'string'
    }
  ]
}
registry.checkSchemaBySubject('MySubject', schema).then(response => {
  console.log(response)
  /**
   * {
   *   subject: 'MySubject',
   *   version: 1,
   *   id: 1,
   *   schema: '{"name":"MySubject","type":"record","fields":[{"name":"field1","type":"string"}]}'
   * }
   */
})

registry.deleteSubjectVersion (subject, version)

Deletes a version out of the given subject

Parameters

  • subject (String) - The subject to delete a version from
  • version (Number|String) - The specific version number to delete

Returns

  • Promise< Number > - The number is the version number that was deleted

Example

registry.deleteSubjectVersion('MySubject', 1).then(response => {
  console.log(response) // 1
})

registry.subjectVersionCompatibilityWithSchema (subject, version, schema)

Checks to see if a given schema is compatible with the schema at the given subject/version.

Parameters

  • subject (String) - The subject to check against
  • version (Number|String) - The specific version number to check against
  • schema (Object) - The schema to check

Returns

  • Promise< { is_compatible: Boolean } >

Example

registry
  .subjectVersionCompatibilityWithSchema('MySchema', 1, {
    name: 'MySchema',
    type: 'record',
    fields: [
      {
        name: 'field2',
        type: 'string'
      }
    ]
  })
  .then(response => {
    console.log(response) // { is_compatible: false }
  })
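
One common reason for an is_compatible: false result is a field added without a default: under backward compatibility, a reader using the new schema has no value to fill in for the new field when reading data written with the old schema. The helper below is a deliberately simplified local check for that single rule, not a replacement for the registry's own check, whose result also depends on the configured compatibility level. `fieldsAddedWithoutDefault` is a hypothetical name, and the old schema is assumed to be a MySchema record containing only field1.

```javascript
// Return the names of fields that exist in newSchema but not in oldSchema
// and that do not declare a default value.
function fieldsAddedWithoutDefault(oldSchema, newSchema) {
  const oldNames = new Set(oldSchema.fields.map(field => field.name))
  return newSchema.fields
    .filter(field => !oldNames.has(field.name))
    .filter(field => !Object.prototype.hasOwnProperty.call(field, 'default'))
    .map(field => field.name)
}

// Assumed existing version of the subject.
const oldSchema = {
  name: 'MySchema',
  type: 'record',
  fields: [{ name: 'field1', type: 'string' }]
}
// Candidate that swaps field1 for a new field with no default.
const newSchema = {
  name: 'MySchema',
  type: 'record',
  fields: [{ name: 'field2', type: 'string' }]
}
// Candidate that keeps field1 and adds field2 with a default.
const withDefault = {
  name: 'MySchema',
  type: 'record',
  fields: [
    { name: 'field1', type: 'string' },
    { name: 'field2', type: 'string', default: '' }
  ]
}

console.log(fieldsAddedWithoutDefault(oldSchema, newSchema)) // [ 'field2' ]
console.log(fieldsAddedWithoutDefault(oldSchema, withDefault)) // []
```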

registry.setGlobalCompatibility (compatibility)

Sets the default global compatibility level for all subjects (except for the ones that have their own set)

Parameters

  • compatibility (String) - Has to be 'NONE', 'BACKWARD', 'FORWARD', or 'FULL'

Returns

  • Promise< { compatibility: String } >

Example

registry.setGlobalCompatibility('FULL').then(response => {
  console.log(response) // { compatibility: 'FULL' }
})

registry.getGlobalCompatibility ()

Gets the default global compatibility level for all subjects (except for the ones that have their own set)

Returns

  • Promise< { compatibility: String } >

Example

registry.getGlobalCompatibility().then(response => {
  console.log(response) // { compatibility: 'FULL' }
})

registry.setSubjectCompatibility (subject, compatibility)

Sets the compatibility level for a specific subject

Parameters

  • subject (String) - The subject to set the compatibility level for
  • compatibility (String) - Has to be 'NONE', 'BACKWARD', 'FORWARD', or 'FULL'

Returns

  • Promise< { compatibility: String } >

Example

registry.setSubjectCompatibility('MySubject', 'FULL').then(response => {
  console.log(response) // { compatibility: 'FULL' }
})

registry.getSubjectCompatibility (subject)

Gets the compatibility level for a specific subject

Parameters

  • subject (String) - The subject whose compatibility level to fetch

Returns

  • Promise< { compatibility: String } >

Example

registry.getSubjectCompatibility('MySubject').then(response => {
  console.log(response) // { compatibility: 'FULL' }
})

Version

0.2.1

License

Apache-2.0

Collaborators

  • jantaylor
  • ksmithut
  • mjgyesme