@synonymdev/blocktank-worker2
TypeScript icon, indicating that this package has built-in type declarations

0.2.8-alpha.1 • Public • Published

blocktank-worker2

Microservice module based on Grenache DHT and AMPQlib RabbitMQ messages. Written in Typescript, supports Javascript.

Usage

Run DHT for service discovery

npm i -g grenache-grape   # Only once
grape --dp 20001 --aph 30001 --bn '127.0.0.1:20002'
grape --dp 20002 --aph 40001 --bn '127.0.0.1:20001'

Run RabbitMQ for events

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:3.10.2-management

Open the dashboard http://localhost:15672/ and login with guest/guest.

Worker

A Worker consists of

  • a server that listens on method calls.
  • RabbitMQ event publisher (fanout exchange).
import { Worker, WorkerImplementation, waitOnSigint, GrenacheClient } from '@synonymdev/blocktank-worker2';

const client = new GrenacheClient()

class HelloWorldWorkerImplementation extends WorkerImplementation {
    /**
     * Every method defined in here can be called by other workers/clients.
     */
    async helloWorld(name1: string, name2: string) {
        return `Hello ${name1} and ${name2}`;
    }

    async callOtherWorkerUsdToBtc(usd: number) {
        const exchangeRate = client.encapsulateWorker('exchange_rate') // Get exchangeRate worker
        const btcUsd = await exchangeRate.getRate("BTCUSD") // Call method on exchangeRate worker.
        console.log('Current BTCUSD price is', btcUsd) 
        // Current BTCUSD price is $30,000
        return usd/btcUsd
    }
}

const runner = new Worker(new HelloWorldWorkerImplementation(), {
    name: 'HelloWorldService', // Name of the worker.
})
try {
    await runner.start();
    await waitOnSigint() // Wait on Ctrl+C
} finally {
    await runner.stop()
}

Class WorkerImplementation

  • Supports, async and sync and callback functions.
    • If callback functions are used, initialize the Worker with callbackSupport: true.
  • Automatically returns Errors.

Class Worker

constructor(worker, config?)

  • worker: WorkerImplementation
  • config? GrenacheServerConfig
    • name? string Name of the worker. Announced on DHT. Used to name RabbitMQ queues. Default: Random name.
    • grapeUrl? string URL to the grape DHT. Default: http://127.0.0.1:30001.
    • port? integer Server port. Default: Random port between 10,000 and 40,000.
    • callbackSupport? boolean Allows WorkerImplementation functions to be written with callbacks. Disables the method argument count check. Default: false
    • connection? amp.Connection RabbitMQ connection. Mutually exclusive with amqpUrl.
    • amqpUrl string RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.
    • namespace string RabbitMQ namespace. All objects like exchanges, queues will start with {namespace}.. Default: blocktank

async start() Starts the worker. Listens on given port.

async stop() Stops the worker. Graceful shutdown.

  • options? WorkerStopOptions
    • cleanupRabbitMq? boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.

Class GrenacheClient

GrenacheClient allows to call other workers without exposing your own server.

constructor(grapeUrl?)

  • grapeUrl string URL to the DHT. Default: http://127.0.0.1:30001.
import { GrenacheClient } from '@synonymdev/blocktank-worker2'

const client = new GrenaceClient()

// Method 1 - Call function
const method = 'helloWorld';
const args = ['Sepp', 'Pirmin'];
const response1 = await client.call('HelloWorldService', method, args)
console.log(response1) // Hello Sepp and Pirmin

async call(workerName, method, args?, opts?) call method of another worker. Returns the worker response.

  • workerName string Name of the worker you want to call.
  • method string Method name you want to call.
  • args? any[] List of arguments. Default: [].
  • opts?: Partial GrenacheClientCallOptions
    • timeoutMs? Request timeout in milliseconds. Default: 60,000.

encapsulateWorker(workerName) Conveninence wrapper. Returns a worker object that can be called with any worker method.

// Example
const helloWorldService = client.encapsulateWorker('HelloWorldService')
const response = await helloWorldService.helloWorld('Sepp', 'Pirmin')
// Hello Sepp and Pirmin

RabbitMQ / Events

RabbitPublisher and RabbitConsumer manage all events around the worker.

Events work on a "at least once" delivery basis. If an error is thrown, the even is retried with an exponential backoff.

Checkout RabbitMQ docs to get an overview on the exchange/queue structure.

Class RabbitConsumer

Consume events from RabbitMQ.

const myServiceName = 'MyServiceName'
const consumer = new RabbitConsumer(myServiceName)
await consumer.init()

try {
    await consumer.onMessage('HelloWorldService', 'invoicePaid', async event => {
        console.log('HelloWorldService.invoicePaid event:', event)
    })
    await waitOnCtrlC()
} finally {
    await consumer.stop() // Graceful shutdown
}

async init() Initializes the consumer. Creates the RabbitMQ exchanges and queues.

async stop(cleanupRabbitMq?, timeoutMs?) Stops consuming messages. Graceful shutdown.

  • cleanupRabbitMq? boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
  • timeoutMs? number Timeout in milliseconds to wait on currently consumed messages to finish. Default: 20,000.

async onMessage(sourceWorkerName, eventName, callback, options?)

  • sourceWorkerName string Name of the worker that emits the event.
  • eventName string Name of the event.
  • callback function Callback function that is called when the event is received.
    • Type: (msg: RabbitEventMessage) => any
    • May be async or sync.
  • options? RabbitConsumeOptions options for this specific event type.
    • backoffFunction: (attempt: number) => number Function that returns the backoff time in milliseconds. Default: exponential backoff.

Important properties of onMessage

  • At-Least-Once-Delivery: Messages can be delivered multiple times and potentially in a different order.
  • Retries: If an error is thrown, the event is retried with an exponential backoff. The backoff function can be customized.

Class RabbitPublisher

Publish events without a consumer.

constructor(myWorkerName, options?)

  • myWorkerName string Name of the worker that emits the event.
  • options? RabbitConnectionOptions
    • connection? amp.Connection RabbitMQ connection. Mutually exclusive with amqpUrl.
    • amqpUrl string RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.
    • namespace string RabbitMQ namespace. All objects like exchanges, queues will start with {namespace}.. Default: blocktank

async init() Initializes the producer. Creates the main RabbitMq Exchange.

async stop(cleanupRabbitMq?) Stops the connection.

  • cleanupRabbitMq? boolean Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.

async publish(eventName: string, data: any) Publishes an event.

  • eventName string Name of the event.
  • data any Any json serializable data that is sent with the event.

Logging

The Worker and RabbitConsumer take a logger option.

logger pino.Logger | boolean Default: false. If set to true, a default logger is used. If set to false, no logging is done. If set to a pino logger, this logger is used.

MongoDatabase

Experimental The goal of MongoDatabase is to provide convenient methods for testing. This class is not mature though so it might change in the future.

Run mongo locally:

docker run -it --rm -p 27017:27017 --name ma-mongo-db mongo:latest

Checkout the MongoDB Compass if you need a UI.

Define entities in its own folder:

import { Entity, PrimaryKey, Property, SerializedPrimaryKey } from "@mikro-orm/core";
import {randomUUID} from 'crypto'
import { ObjectId } from "@mikro-orm/mongodb";

@Entity()
export class SampleAuthor {
    @PrimaryKey({ name: "_id" })
    id: string = randomUUID();

    @Property()
    name!: string;
}

Create a mikro-orm.config.ts file to configure your database connection.

import { MikroORMOptions, ReflectMetadataProvider } from '@mikro-orm/core';
import { MongoDriver } from '@mikro-orm/mongodb';
import entities from './1_database/entities';
import { AppConfig } from './0_config/AppConfig';

const appConfig = AppConfig.get()

const config: Partial<MikroORMOptions<MongoDriver>> = {
  entities: entities,
  clientUrl: appConfig.dbUrl,
  metadataProvider: ReflectMetadataProvider,
  debug: false,
  type: 'mongo',
  migrations: {
    path: 'dist/1_database/migrations',
    pathTs: 'src/1_database/migrations',
    transactional: false
  }
};

export default config;
  • See this mikro-orm.config.ts for an example config.
  • Checkout the mikro-orm docs for more info to set up the ORM.
  • You may choose to use another ORM. In that case, make sure you manage test integrations yourself.
  • Checkout this example Entity SampleAuthor.ts.
import {MongoDatabase} from '@synonymdev/blocktank-worker2';
import config from './mikro-orm.config.ts'


try {
    await MongoDatabase.connect(config)
    await MongoDatabase.migrateUp()
    const em = MongoDatabase.createEntityManager()
    const author = new SampleAuthor()
    author.name = 'Sepp'
    await em.persistAndFlush(author)
} finally {
    await MongoDatabase.close()
}

MongoDatabase provides a InMemory database for testing. Checkout the example MongoDatabase.test.ts for more details on how to use the inMemory database to run independent tests.

CLI & Migrations

MikroORM comes with a cli. To use the cli, add this config to your package.json:

  "mikro-orm": {
    "useTsNode": true,
    "configPaths": [
      "./src/mikro-orm.config.ts",
      "./dist/mikro-orm.config.js"
    ]   
  },
  • Use npx mikro-orm migration:create to create a new migration.
  • Use npx mikro-orm migration:up to run migrations.

Development

Testing

  • Test: npm run test. Checkout vscode jest to selectively run tests.

Make tests independent + cleanup RabbitMQ:

import { Worker } from "@synonymdev/blocktank-worker2";

// Use a random RabbitMQ namespace to avoid any conflicts between tests:
const runner = new Worker(worker, {
    namespace: Worker.randomNamespace()
});

try {
    await runner.start()
    // Do your tests here
} finally {
    // Cleanup all existing rabbitMQ objects
    await runner.stop({cleanupRabbitMq: true})
}

Versioning

  1. Increase version in package.json.
  2. Add changes to CHANGELOG.md.
  3. Commit changes.
  4. Tag new version: git tag v0.1.0.
  5. Push tag git push origin v0.1.0.
  6. Publish to npm: npm publish.

Readme

Keywords

none

Package Sidebar

Install

npm i @synonymdev/blocktank-worker2

Weekly Downloads

103

Version

0.2.8-alpha.1

License

MIT

Unpacked Size

666 kB

Total Files

148

Last publish

Collaborators

  • dzdidi_restored
  • sebubu
  • rbndg
  • coreyphillips
  • jayvdb