@castore/in-memory-message-queue-adapter
TypeScript icon, indicating that this package has built-in type declarations

1.25.3 • Public • Published

In Memory Message Queue Adapter

DRY Castore MessageQueue definition using FastQ.

📥 Installation

# npm
npm install @castore/in-memory-message-queue-adapter

# yarn
yarn add @castore/in-memory-message-queue-adapter

This package has @castore/core as peer dependency, so you will have to install it as well:

# npm
npm install @castore/core

# yarn
yarn add @castore/core

👩‍💻 Usage

The simplest way to use this adapter is to use the attachTo static method:

import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';

const messageQueueAdapter =
  InMemoryMessageQueueAdapter.attachTo(appMessageQueue);

This will make your messageQueueAdapter inherit from your appMessageQueue types while plugging them together 🙌

You can also instanciate one on its own, but notice the code duplication:

import type { MessageQueueMessage } from '@castore/core';
import { InMemoryMessageQueueAdapter } from '@castore/in-memory-message-queue-adapter';

const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>();

appMessageQueue.messageQueueAdapter = messageQueueAdapter;

🤖 Set worker

You can provide an async worker for the queue at construction time, or in context later:

const messageQueueAdapter = InMemoryMessageQueueAdapter.attachTo(
  appMessageQueue,
  {
    worker: async message => {
      // 🙌 Correctly typed!
      const { eventStoreId, event } = message;
    },
  },
);

// 👇 Alternatively
const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>({
  worker: async message => {
    // 🙌 Correctly typed!
    const { eventStoreId, event } = message;
  },
});

// 👇 Also alternatively
messageQueueAdapter.worker = async message => {
  // 🙌 Correctly typed!
  const { eventStoreId, event } = message;
};

Only one worker at a time can be set up

For more control, the worker has access to more context through its second argument:

messageQueueAdapter.worker = async (message, context) => {
  const { eventStoreId, event } = message;
  const {
    // 👇 See "Retry policy" section below
    attempt,
    retryAttemptsLeft,
    // 👇 If event is replayed
    replay,
  } = context;

  ...
};

♻️ Retry policy

This adapter will retry failed messages handling. You can specify a different retry policy than the default one via its constructor arguments:

  • retryAttempts (?number = 2): The maximum number of retry attempts for a message in case of worker execution failure. If all the retries fail, the error is logged with console.error, and the message ignored.
  • retryDelayInMs (?number = 30000): The minimum delay in milliseconds between the worker execution failure and its first retry.
  • retryBackoffRate (?number = 2): A factor applied to the retryDelayInMs at each subsequent retry.
const messageQueueAdapter = InMemoryMessageQueueAdapter.attachTo(appMessageQueue, {
  retryAttempts: 3,
  retryDelayInMs: 10000,
  retryBackoffRate: 1.5,
});

// 👇 Alternatively
const messageQueueAdapter = new InMemoryMessageQueueAdapter<
  MessageQueueMessage<typeof appMessageQueue>
>({
  retryAttempts: 3,
  retryDelayInMs: 10000,
  retryBackoffRate: 1.5,
});

For instance, if the worker is continously failing for a specific message, the sequence of code execution (with the default retry policy) will look like this:

  • Worker execution: ❌ Failure
  • 30 seconds of delay
  • Worker execution: ❌ Failure
  • 60 seconds of delay (30x2)
  • Worker execution: ❌ Failure
  • No more retry attempt, error is logged

Package Sidebar

Install

npm i @castore/in-memory-message-queue-adapter

Weekly Downloads

1

Version

1.25.3

License

MIT

Unpacked Size

143 kB

Total Files

23

Last publish

Collaborators

  • thomasaribart
  • valentinbeggi
  • charlesgery
  • julietteff