
2.10.1 • Public • Published


Support for the Nats message broker in the CAP Node.js runtime.


Get Started

Install the dependency:

npm i -S cds-nats

Then refer to the Process Environment document to configure the Nats connection options.


Nats Messaging Service

Use Nats as a message broker

Queue and Subscription

Unlike the default behavior of the CAP Messaging Service, cds-nats prefers to listen for events on a Queue Group rather than a general Subscription. In a micro-service architecture, a Queue/Consumer Group is usually what is needed, so that each message is consumed by only a single service instance.

graph LR

Producer_1[Application A, Instance 1] -->|Message 1| Nats
Producer_2[Application A, Instance 2] -->|Message 2| Nats
Producer_3[Application B, Instance 1] -->|Message 3| Nats

Nats(Nats: Queue XXXX)

Nats -->|Message 3| Consumer_1[Application C, Instance 1]
Nats -->|Message 1| Consumer_2[Application C, Instance 2]
Nats -->|Message 2| Consumer_3[Application C, Instance 3]

Also, if you want to apply the Publisher/Subscriber - Broadcast pattern, just add the @topic annotation to the event (the @queue annotation is also supported).

graph LR

Publisher_1[Application A, Instance 1] -->|Message 1| Nats
Publisher_2[Application A, Instance 2] -->|Message 2| Nats
Publisher_3[Application B, Instance 1] -->|Message 3| Nats

Nats(Nats: Topic YYYY)

Nats -->|Message 1, Message 2, Message 3| Subscriber_1[Application C, Instance 1]
Nats -->|Message 1, Message 2, Message 3| Subscriber_2[Application C, Instance 2]
Nats -->|Message 1, Message 2, Message 3| Subscriber_3[Application D, Instance 1]

service PeopleService {

  // queue group event, producer/consumer exclusive consume
  // @queue: 'queueName' annotation is also supported
  event changeAmount {
    peopleID : UUID;
    amount   : Decimal;
  }

  // subscription event, publisher/subscriber broadcast consume
  @topic : ''
  event updateName {
    peopleID : UUID;
    Name     : String;
    Age      : Integer;
  }

  @topic : ''
  event updateAge {
    peopleID : UUID;
    Name     : String;
    Age      : Integer;
  }

}

{
  "cds": {
    "requires": {
      "messaging": {
        "kind": "nats"
      },
      "nats": {
        "impl": "cds-nats"
      }
    }
  }
}
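
The difference between the two delivery modes described in this section can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `MiniBroker`, `MessageHandler`, and `demoDelivery` are hypothetical names, nothing here calls cds-nats or the Nats client, and the round-robin choice inside a queue group is a simplification chosen to make the result predictable.

```typescript
// In-memory model of the two delivery modes; it does not use the Nats client.
type MessageHandler = (message: string) => void;

interface QueueGroup {
  members: MessageHandler[];
  delivered: number; // messages delivered so far, used as a round-robin cursor
}

class MiniBroker {
  // subject -> handlers that each receive every message (broadcast)
  private subscribers = new Map<string, MessageHandler[]>();
  // subject -> queue group name -> members sharing the messages
  private queueGroups = new Map<string, Map<string, QueueGroup>>();

  subscribe(subject: string, handler: MessageHandler, queueGroup?: string): void {
    if (queueGroup === undefined) {
      const handlers: MessageHandler[] = this.subscribers.get(subject) ?? [];
      handlers.push(handler);
      this.subscribers.set(subject, handlers);
      return;
    }
    const groups = this.queueGroups.get(subject) ?? new Map<string, QueueGroup>();
    const group: QueueGroup = groups.get(queueGroup) ?? { members: [], delivered: 0 };
    group.members.push(handler);
    groups.set(queueGroup, group);
    this.queueGroups.set(subject, groups);
  }

  publish(subject: string, message: string): void {
    // Broadcast: every plain subscriber of the subject gets a copy.
    (this.subscribers.get(subject) ?? []).forEach((handler) => handler(message));
    // Queue groups: exactly one member of each group gets the message.
    const groups = this.queueGroups.get(subject);
    if (groups === undefined) return;
    groups.forEach((group) => {
      const member = group.members[group.delivered % group.members.length];
      group.delivered += 1;
      if (member !== undefined) member(message);
    });
  }
}

function demoDelivery() {
  const broker = new MiniBroker();
  const received = {
    consumer1: [] as string[],
    consumer2: [] as string[],
    subscriber1: [] as string[],
    subscriber2: [] as string[],
  };
  // Two instances in one queue group share the changeAmount events.
  broker.subscribe("changeAmount", (m) => received.consumer1.push(m), "PeopleService");
  broker.subscribe("changeAmount", (m) => received.consumer2.push(m), "PeopleService");
  // Two plain subscribers each receive every updateName event.
  broker.subscribe("updateName", (m) => received.subscriber1.push(m));
  broker.subscribe("updateName", (m) => received.subscriber2.push(m));
  for (const message of ["Message 1", "Message 2", "Message 3"]) {
    broker.publish("changeAmount", message);
    broker.publish("updateName", message);
  }
  return received;
}
```

In this model each queue-group consumer sees only part of the `changeAmount` stream, while both plain subscribers see all three `updateName` messages.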

Nats KV Service

Use Nats as a KV store

This is an experimental feature of Nats; you MUST enable the JetStream feature on the Nats server.

const kv = await cds.connect.to("kv") as NatsKVService;

const id = cds.utils.uuid()
const v = await kv.get(id)

expect(await kv.keys()).toHaveLength(0)

await kv.set(id, "v1")

expect(await kv.keys()).toHaveLength(1)

expect(await kv.get(id)).toBe("v1")
await kv.set(id, "v2")
expect(await kv.get(id)).toBe("v2")
await kv.remove(id)
expect(await kv.get(id)).toBeNull()
expect(await kv.keys()).toHaveLength(0)

expect(await kv.get("k3", () => "v4")).toBe("v4")
expect(await kv.get("k3")).toBe("v4")

await kv.removeAll()


Configure the KV service:

{
  "cds": {
    "requires": {
      "kv": {
        "kind": "nats-kv",
        "ttl": 100
      },
      "kv5000": {
        "kind": "nats-kv",
        "ttl": 5000
      },
      "nats-kv": {
        "impl": "cds-nats/lib/NatsKVService"
      }
    }
  }
}

  • ttl: the maximum validity period of each key, in milliseconds.
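
To make the `ttl` setting and the "get with provider" call more concrete, here is a minimal in-memory sketch of the same semantics. It is an illustration only, not the NatsKVService implementation: `MiniTtlStore` is a hypothetical name, the store is synchronous (the real service is awaited), and the injectable `now` clock is an assumption added so that expiry can be demonstrated without waiting.

```typescript
// In-memory sketch of a key-value store with a per-key time to live (TTL).
class MiniTtlStore {
  private entries = new Map<string, { value: string; expiresAt: number }>();
  private ttl: number;
  private now: () => number;

  // ttl is in milliseconds; now defaults to the system clock.
  constructor(ttl: number, now: () => number = () => Date.now()) {
    this.ttl = ttl;
    this.now = now;
  }

  set(key: string, value: string): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttl });
  }

  // Returns the live value, or null when the key is missing or expired.
  // When a provider is given, a missing value is computed, stored, and returned.
  get(key: string, provider?: () => string): string | null {
    const entry = this.entries.get(key);
    if (entry !== undefined && entry.expiresAt > this.now()) return entry.value;
    this.entries.delete(key); // drop the expired entry, if any
    if (provider === undefined) return null;
    const value = provider();
    this.set(key, value);
    return value;
  }

  remove(key: string): void {
    this.entries.delete(key);
  }

  // Lists only the keys that have not expired yet.
  keys(): string[] {
    const live: string[] = [];
    this.entries.forEach((entry, key) => {
      if (entry.expiresAt > this.now()) live.push(key);
    });
    return live;
  }
}
```

With a `ttl` of 100, a value written at time 0 is readable until just before time 100 and reads as `null` afterwards, which mirrors the configuration of the `kv` service above.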

Nats Distributed Lock Service

Use Nats as a distributed lock service

This is an experimental feature of Nats; you MUST enable the JetStream feature on the Nats server.


{
  "cds": {
    "requires": {
      "lock": {
        "kind": "nats-lock",
        "check": {
          "interval": 10
        },
        "lock": {
          "acquire": 10000
        }
      },
      "nats-lock": {
        "impl": "cds-nats/lib/NatsLockService"
      }
    }
  }
}

  • check.interval: NatsLockService checks the lock by polling; this is the interval between checks.
  • lock.acquire: if the lock on the target resource cannot be acquired immediately, NatsLockService waits; if the lock still cannot be acquired within this timeout, NatsLockService throws an error to avoid waiting too long.
  • lock.timeout: the maximum time a single lock may be held; if a resource stays locked for longer than this, a client will forcibly acquire it. The default value is 1 hour.
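
The interplay of the three options above can be sketched with a small in-process lock. This is a hedged illustration, not the NatsLockService code or its API: `MiniLock`, `LockOptions`, `acquire`, and `isLocked` are hypothetical names, the state lives in a local `Map` instead of Nats, and all durations are assumed to be milliseconds.

```typescript
// In-process sketch of a polling lock with an acquire timeout and a lock timeout.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

interface LockOptions {
  checkInterval: number;  // pause between two polling attempts
  acquireTimeout: number; // how long acquire() keeps trying before it throws
  lockTimeout: number;    // age after which a held lock may be taken over
}

class MiniLock {
  private held = new Map<string, { token: number; lockedAt: number }>();
  private nextToken = 1;
  private options: LockOptions;

  constructor(options: LockOptions) {
    this.options = options;
  }

  isLocked(resource: string): boolean {
    return this.held.has(resource);
  }

  // Resolves with a release function once the lock is held by the caller.
  async acquire(resource: string): Promise<() => void> {
    const deadline = Date.now() + this.options.acquireTimeout;
    for (;;) {
      const now = Date.now();
      const current = this.held.get(resource);
      const stale = current !== undefined && now - current.lockedAt >= this.options.lockTimeout;
      if (current === undefined || stale) {
        const token = this.nextToken++;
        this.held.set(resource, { token, lockedAt: now });
        // Release only if this caller is still the holder (it may have been taken over).
        return () => {
          const holder = this.held.get(resource);
          if (holder !== undefined && holder.token === token) this.held.delete(resource);
        };
      }
      if (now >= deadline) throw new Error(`could not acquire lock on '${resource}' in time`);
      await sleep(this.options.checkInterval);
    }
  }
}
```

A caller would typically wrap its critical section in `try`/`finally` and call the returned release function in the `finally` branch, so that other waiters are not left to depend on the lock timeout.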

Nats RFC Service

Use Nats as an RFC communication tool

To use NatsRFCService, you MUST first enable the Nats Messaging Service.

graph LR

ServiceA[Service A Instance 1]
ServiceA -->|1. Run CQN XXX| Nats
Nats(Nats Queue Group - ServiceC)
Nats -->|2. RUN CQN XXX| ServiceC
ServiceC -->|3. Execution Result| Nats
Nats -->|4. Execution Result| ServiceA
ServiceC[Service C instance 1]


const cds = cwdRequireCDS()
const { INSERT } = cds.ql
const messaging = await cds.connect.to("rfc") as NatsRFCService
const remoteApp ="demo-app-micro-service");
const remotePeopleService = remoteApp.service("")
const newPeople = await
  INSERT.into("People").entries({ Name: cds.utils.uuid() })
const updatedPeople = await remotePeopleService.updateWeight(newPeople.ID, 12.3)

await expect(() => remotePeopleService.notExistFunction())
  .toThrow("method/action/function 'notExistFunction' is not existed on the service ''")


{
  "cds": {
    "requires": {
      "messaging": {
        "kind": "nats"
      },
      "rfc": {
        "kind": "nats-rfc",
        "app": {
          "name": "demo-app-micro-service"
        },
        "invoke": {
          "timeout": 180000
        }
      },
      "nats": {
        "impl": "cds-nats"
      },
      "nats-rfc": {
        "impl": "cds-nats/lib/NatsRFCService"
      }
    }
  }
}

  • app.name - the app name of the current application; it is an identifier that will be used for RFC
  • invoke.timeout - the timeout of each invocation; for example, if the remote server does not respond within 3 minutes, NatsRFCService will stop waiting and throw an error
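
The invocation timeout described above follows a common request/reply pattern: send the request, then race the reply against a timer. The sketch below illustrates that pattern only; it is not how NatsRFCService is implemented. `withTimeout`, `MiniRpcServer`, and `invoke` are hypothetical names, and the broker round trip is simulated with `setTimeout`.

```typescript
// Sketch of request/reply with an invocation timeout; no Nats connection is used.
type RpcHandler = (...args: number[]) => number;

// Rejects if the work does not settle within ms milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`no response within ${ms} ms`)), ms);
    work.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (error) => { clearTimeout(timer); reject(error); },
    );
  });
}

class MiniRpcServer {
  private handlers = new Map<string, RpcHandler>();
  private delayMs: number;

  // delayMs simulates the time the request and reply spend in transit.
  constructor(delayMs: number) {
    this.delayMs = delayMs;
  }

  register(name: string, handler: RpcHandler): void {
    this.handlers.set(name, handler);
  }

  handle(method: string, args: number[]): Promise<number> {
    return new Promise<number>((resolve, reject) => {
      setTimeout(() => {
        const handler = this.handlers.get(method);
        if (handler === undefined) {
          reject(new Error(`unknown method '${method}'`));
          return;
        }
        resolve(handler(...args));
      }, this.delayMs);
    });
  }
}

// Client side: one remote call guarded by the invocation timeout.
function invoke(server: MiniRpcServer, method: string, args: number[], timeoutMs: number): Promise<number> {
  return withTimeout(server.handle(method, args), timeoutMs);
}
```

With a server that answers after 5 ms and a 100 ms timeout the call resolves normally; with a server that needs 200 ms and a 20 ms timeout the caller gets the timeout error instead of waiting for the reply.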


  • [x] Nats Messaging Service
    • [x] Pub/Sub
      • [ ] complex test case
    • [x] Produce/Consume
      • [x] basic support and test case
    • [x] tenant aware
      • [x] tenant recover
      • [x] user recover
        • [x] user-attr recover
    • [ ] messaging
      • [ ] srv.on
      • [x] srv.emit
    • [ ] Outbox enable
    • [ ] Nats options documentation
  • [x] Nats KV Store
    • [ ] tenant aware
    • [x] get
      • [x] get with provider
    • [x] set
    • [x] delete
    • [ ] Nats options documentation
  • [x] Nats Distributed Lock Service
    • [ ] tenant aware
    • [x] 100 values test
    • [x] acquire timeout
    • [x] lock timeout (dead lock)
    • [x] synchronized method (high level API)
    • [ ] Nats options documentation
  • [x] Nats RFC Service
    • [x] tenant aware
    • [x] OData Service query
    • [x] OData Unbounded Function/Action
    • [ ] Rest Adapter operation
    • [x] Error handler
    • [ ] Demo Micro Service

Compatibility Table

| @sap/cds version | cds-nats version |
| --- | --- |
| 5.x | 2.9.x |
| 6.x | 2.10.x |



  • suntao