egg-amqpx

1.0.4 • Public • Published

egg-amqpx

NPM version build status Test coverage David deps Known Vulnerabilities npm download

Install

$ npm i egg-amqpx --save

Usage

// {app_root}/config/plugin.js
exports.amqpx = {
  enable: true,
  package: 'egg-amqpx',
};

Configuration

// {app_root}/config/config.default.js
exports.amqpx = {
  client: {
    url: 'amqp://localhost',
    consumer: {
      // ...
    },
    producer: {
      // ...
    },
  },
};

Queue Mode

定义了一个名为 task 的消息队列

// {app_root}/config/config.default.js
{
  consumer: {
    tasks: {
      mode: 'queue',
      queue: {
        name: 'tasks', // 队列名称
        options: { durable: true },
        prefetch: 1, // 预先取几条消费
        noAck: false
      }
    },
  },
  producer: {
    tasks: {
      mode: 'queue',
      queue: {
        name: 'tasks',
        options: { durable: true }
      }
    },
  }
}
// {app_root}/app/controller/test.js
async test() {
  const message = { queue: 'tasks', content: JSON.stringify({ now: Date.now() }), option: { persistent: true } };
  const result = await this.app.amqpx.send(message); // send to tasks queue
  console.log(`send queue ${result} : ${JSON.stringify(message)}`);
}

PubSub Mode

定义一个发布订阅模式的消费者、生产者

{
  consumer: {
    orders: {
      mode: 'pubsub',
      exchange: {
        name: 'logs', // 交换机名称
        type: 'fanout', // 交换机类型
        options: { durable: false },
      },
      queues: [ // 一个队列对应一个消费者,一个队列可以匹配多条规则
        {
          // name: 'A', // 队列名称
          options: { exclusive: true },
        },
        {
          // name: 'B',
          options: { exclusive: true },
        }
      ]
    }
  },
  producer: {
    orders: {
      mode: 'pubsub',
      exchange: {
        name: 'logs', // 交换机名称
        type: 'fanout', // 交换机类型
        options: { durable: false },
      }
    },
  }
}

Router Mode

{
  consumer: {
    logger: {
      mode: 'router',
      exchange: {
        name: 'logger', // 交换机名称
        type: 'direct', // 交换机类型
        options: { durable: false },
      },
      queues: [ // 一个队列对应一个消费者,一个队列可以匹配多条规则
        {
          // name: 'A', // 队列名称
          router: ['info', 'warning', 'error', 'debug'], // 匹配规则
          options: { exclusive: true },
        },
        {
          // name: 'B',
          router: ['error'],
          options: { exclusive: true },
        }
      ]
    }
  },
  producer: {
    logger: {
      mode: 'router',
      exchange: {
        name: 'logger', // 交换机名称
        type: 'direct', // 交换机类型
        options: { durable: false },
      },
    }
  }
}

Topic Mode

定义按主题模式模式的消费者、生产者

// {app_root}/config/config.default.js
{
  consumer: {
    orders: {
      mode: 'topic',
      exchange: {
        name: 'orders', // 交换机名称,主题 topic
        type: 'topic', // 交换机类型
        options: { durable: false },
      },
      queue: {
        name: 'A', // 队列名称
        subscriber: 'orders', // 消费者名称,对应文件名
        rules: ['files.cn.hz.#'], // 匹配规则
        options: { exclusive: true },
      },
    }
  },
  producer: {
    orders: {
      mode: 'topic',
      exchange: {
        name: 'orders', // 交换机名称,主题 topic
        type: 'topic', // 交换机类型
        options: { durable: false },
      },
    }
  }
}
// {app_root}/app/controller/test.js
const message = {
  exchange: 'orders',
  type: 'topic',
  key: 'files.cn.hz.store',
  content: JSON.stringify({ now: Date.now() }),
};
const result = await this.app.amqpx.publish(message); // send to orders exchange
console.log(`send exchange ${result} : ${JSON.stringify(message)}`);

Example

Questions & Suggestions

Please open an issue here.

License

MIT

Package Sidebar

Install

npm i egg-amqpx

Weekly Downloads

1

Version

1.0.4

License

MIT

Unpacked Size

23.2 kB

Total Files

8

Last publish

Collaborators

  • msmaocom