- Installation
- Introduce
- Usage
- Test
- Stay in touch
- License
## npm
npm install --save nestjs-redis-stream-microservice
yarn add nestjs-redis-stream-microservice
Note: The library was developed based on the nestjs-redis-streams package.
- The official Redis Transporter provided by NestJs is implemented based on Redis PUB/SUB and is not suitable for use when the SUB server is a distributed system.
- We implemented a redis-stream based Custom Transporter to ensure that messages are not duplicated and that messages are received or not.
What I wanted to improve from the library I referenced
- Inconvenience of having to format response data
- Message handler must be configured with the specified decorator
- User needs to know technical concepts for setting up redis-stream
The library implements the NestJs Custom Transporter, which can be used in a NestJs-based MSA distributed environment.
-
Server Mode Support
- server mode is the mode used by the Responder server on the right side of the image.
- mode used when processing requests from requesters and configuring responses and returning them.
- e.g. nestjs Micro Service App
-
Client Mode Support
- client mode is the mode used by the requester server on the left side of the image.
- You can configure and send a message to the responder, receive a response, or forget it after sending (Fire&Forgot);
- e.g. nestjs API Gateway App or other Micro Service App
Basically, it is designed to leverage Redis Stream capabilities to implement the Custom Transporter interface provided by Nestjs.
// each module that you want to send a message to
@Module({
imports: [
// sync mode
RedisStreamClientModule.register({
connection: {
host: '127.0.0.1',
port: 6388,
password: 'beobwoo',
},
}),
// async mode
RedisStreamClientModule.registerAsync({
useFactory: (configService: ConfigService) => ({
connection: {
host: configService.get('HOST'),
port: configService.get('PORT'),
password: configService.get('PASSWORD'),
},
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}
Note : When using client mode, it will be modified so that it can only be registered once in the root module globally.
- If necessary, you can register using environment variables according to the nestjs Factory Provider registration method.
- The module registers the RedisStreamClient(ClientFroxy) instance globally.
// main.ts
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// NOTE : ShutDownHook must be enabled to automatically delete the consumer of redis-stream.
// If you do not enable it, you must manually delete the consumer in the redis-stream.
app.enableShutdownHooks();
await app.listen(3000);
}
- To avoid wasting Redis resources, it includes the ability to automatically clean up unnecessary Redis Stream resources upon server shutdown.
- You must enable enableShutdownHooks to ensure that the resource cleanup function works correctly.
// your service or controller
import { ClientProxy } from '@nestjs/microservices';
constructor(
@InjectRedisStreamClient()
private readonly client: ClientProxy,
) {}
If you received the client instance that you registered in course 1, you can use the client instance to send a message to the server. Depending on the purpose, the message can be sent in two ways.
Note : Emit a message without waiting for a response
@Controller()
export class Requestor1Controller {
constructor(
@InjectRedisStreamClient()
private readonly client: ClientProxy,
) {}
@Post()
emit() {
this.clientProxy.emit('stream:name:hear', data);
return 'Emit Success';
}
}
@Controller()
export class Requestor1Controller {
constructor(
@InjectRedisStreamClient()
private readonly client: ClientProxy,
) {}
@Post()
send() {
const response$ = this.clientProxy.send('stream:name:hear', data);
const response = await lastValueFrom(observable); // get the last value from the observable
return JSON.stringify(response);
}
}
- Internally, it generates its own response stream.
- Even if the server that sent the request is deployed, the response is returned only to the exact server that sent the request.
Note : use uuid v4 to identify the requester and cause extremely low probability of collision.
// main.ts
async function bootstrap() {
const app = await NestFactory.create(Responder1Module);
app.connectMicroservice({
strategy: new RedisStreamServer({
connection: {
host: '127.0.0.1',
port: 6388,
password: 'beobwoo',
},
option: {
// The logical group that wants to receive the message.
// NOTE : Give the same group name if it is a distributed server with the same functionality
consumerGroup: 'my-server',
},
}),
});
// NOTE : ShutDownHook must be enabled to automatically delete the consumer of redis-stream.
// If you do not enable it, you must manually delete the consumer in the redis-stream.
app.enableShutdownHooks();
await app.startAllMicroservices();
await app.listen(3080);
}
bootstrap();
@Controller()
export class WelcomeEmailController {
@MessagePattern('stream-1')
async returnObservable(@Payload() data: any) {
return of([1, 2, 3]);
}
@MessagePattern('stream-2')
async returnObject(@Payload() data: any) {
return {
status: 'good',
};
}
@MessagePattern('stream-3')
async returnPrimitiveType(@Payload() data: any) {
return true;
}
}
Note : The feature to get request metadata using
@Ctx()
will be added in the next version.
- Return data configurations are designed to support return of response data configurations in all possible ways, including
Observable
,Object
, andPrimary type
, to free themselves without a fixed format.
The entire git project includes an e2e
test set to verify normal behavior in a distributed environment.
The following procedures must be followed to run the test.
Note : Don't worry! The npm package only has a necessary source code.
yarn test
yarn test:set-up
# start Requestor 1
yarn start:dev requestor-1
# start Requestor 2
yarn start:dev requestor-2
# start Responder 1
yarn start:dev responder-1
# start Responder 2
yarn start:dev responder-2
yarn test:e2e
Author/Developer - KIMBEOBWOO
Nest is MIT licensed.