graphql-snssqs-subscriptions
This package implements the PubSubEngine Interface from the graphql-subscriptions package. Once initiated this library automatically create subscriptions between SNS and SQS by the given configuration.
npm install -g graphql-snssqs-subscriptions
Usage
import { SNSSQSPubSub } from ' graphql-snssqs-subscriptions ' ;
import env from ' ../utils/env ' ;
export type SNSSQSPubSubType = SNSSQSPubSub ;
let awsEndpoints = { } ;
if ( ! env . SERVICE_PRODUCTION ) {
awsEndpoints = {
sns : {
endpoint : ` ${ env . AWS_SNS_ENDPOINT } ` ,
} ,
sqs : {
endpoint : ` ${ env . AWS_SQS_ENDPOINT } ` ,
} ,
} ;
}
export const getPubSub = async ( ) : Promise < SNSSQSPubSub > => {
const pubsub = new SNSSQSPubSub (
{
accessKeyId : env . AWS_ACCESS_KEY_ID ,
secretAccessKey : env . AWS_SECRET_ACCESS_KEY ,
region : env . AWS_REGION ,
... awsEndpoints ,
} ,
{
serviceName : env . SERVICE_NAME ,
}
) ;
await pubsub . init ( ) ;
return pubsub ;
} ;
const bootstrap = async ( ) => {
const pubSub = await getPubSub ( ) ;
const server = new ApolloServer ( {
schema ,
context : ( req : any ) : MyServiceContext => ( {
... req ,
pubSub ,
} ) ,
} ) ;
await server . listen ( env . SERVICE_PORT ) ;
logger . info ( ` Service is listening on port: ${ env . SERVICE_PORT } ` ) ;
} ;
bootstrap ( ) . catch ( logger . error ) ;
Simple usage in graphql context with TypeGraphQL
import { MessageAttributes } from ' graphql-snssqs-subscriptions ' ;
class SimpleMessageDTO {
readonly $name = ` ${ env . APP_DOMAIN } / ${ env . MY_SERVICE_NAME } /message-subject-or-anything ` ;
readonly $version = 1 ;
msgDataString : string ;
constructor ( msgDataString : string ) {
this . msgDataString = msgDataString ;
}
}
export class Resolver {
Mutation ( ( ) => UpdateSomethingResponse)
async updateSomething (
@ Ctx ( ) ctx : MyServiceContext ,
@ Arg ( ' input ' ) inputData : UpdateSomethigInput
) : Promise < UpdateSomethingResponse > {
ctx . pubSub . publish (
env . MY_SERVICE_NAME ,
new SimpleMessageDTO ( {
msgDataString : ' some data in message ' ,
} ) ,
new MessageAttributes ( {
correlationId : ` ${ ctx . account . id } ` ,
} )
) ;
return UpdateSomethingResponse ( true ) ;
}
@ Subscription ( ( ) => Notification , { topics : env . MY_SERVICE_NAME , nullable : true } )
simpleSubscription ( @ Root ( ) { msgDataString } : NotificationPayload ) {
return { msgDataString } ;
}
}
Simple usage with TypeGraphQL and @node-ts/bus-workflow
import { MessageAttributes } from ' graphql-snssqs-subscriptions ' ;
class SimpleMessageDTO {
readonly $name = ` ${ env . APP_DOMAIN } / ${ env . MY_SERVICE_NAME } /message-subject-or-anything ` ;
readonly $version = 1 ;
msgDataString : string ;
constructor ( msgDataString : string ) {
this . msgDataString = msgDataString ;
}
}
export class Resolver {
Mutation ( ( ) => UpdateSomethingResponse)
async updateSomething (
@ Ctx ( ) ctx : MyServiceContext ,
@ Arg ( ' input ' ) inputData : UpdateSomethigInput
) : Promise < UpdateSomethingResponse > {
ctx . pubSub . sendEvent (
new SimpleMessageDTO ( {
msgDataString : ' some data in message ' ,
} ) ,
new MessageAttributes ( {
correlationId : ` ${ ctx . account . id } ` ,
} )
) ;
return UpdateSomethingResponse ( true ) ;
}
}
class SimpleMessageDTO {
readonly $name = ` ${ env . APP_DOMAIN } / ${ env . MY_SERVICE_NAME } /message-subject-or-anything ` ;
readonly $version = 1 ;
msgDataString : string ;
constructor ( msgDataString : string ) {
super ( ) ;
this . msgDataString = msgDataString ;
}
}
@ injectable ( )
export class MyWorkflow extends Workflow < MyWorkflowData > {
constructor (
@ inject ( BUS_SYMBOLS . Bus ) private readonly bus : Bus ,
@ inject ( LOGGER_SYMBOLS . Logger ) private readonly logger : Logger
) {
super ( ) ;
}
@ StartedBy < SimpleMessageDTO , MyWorkflowData , ' handleSimpleMessage ' > (
SimpleMessageDTO
)
async handlesSimpleMessage (
event : SimpleMessageDTO ,
_ : MyWorkflowData ,
messageAttributes : MessageAttributes
) : Promise < Partial < MyWorkflowData > > {
const { msgDataString } = event ;
this . bus . send ( new SomeOtherMessageDto ( ) )
return {
msgDataString ,
correlationId : messageAttributes . correlationId ,
} ;
}
@ Handles <
SomeOtherMessageDto ,
MyWorkflowData ,
' someNewMessageHandler '
> ( SomeOtherMessageDto , ( event , attributes ) => attributes.correlationId , ' correlationId ' )
someNewMessageHandler ( ) : Partial < MyWorkflowData > {
this . bus . publish ( new MessageToSomeIntegrationServiceMaybe ( ) ) ;
this . complete ( ) ;
}
}
Benefits
Automatically creates subscriptions from SNS to SQS.
Automatically creates Dead Letter Queues.
Automatically maps MessageAttributes
Fully compatable with @node-ts/bus package
Typescript Based
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/cto2bOpenSource/graphql-snssqs-subscriptions/issues . This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.
License
The gem is available as open source under the terms of the MIT License .
Code of Conduct
Everyone interacting in the graphql-snssqs-subscriptions project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct .