still testing delayed queue
This commit is contained in:
parent
2333b78f8c
commit
e3268b4633
3 changed files with 23 additions and 3 deletions
|
@ -9,10 +9,17 @@ export class RabbitMQ {
|
||||||
channel: amqp.Channel;
|
channel: amqp.Channel;
|
||||||
queueName: string;
|
queueName: string;
|
||||||
mongoClient: MongoDbModel;
|
mongoClient: MongoDbModel;
|
||||||
|
exchange: string;
|
||||||
|
queue: string;
|
||||||
|
routingKey: string;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.mongoClient = new MongoDbModel();
|
this.mongoClient = new MongoDbModel();
|
||||||
|
|
||||||
|
this.exchange = 'delayed_exchange';
|
||||||
|
this.queue = 'delayed_queue';
|
||||||
|
this.routingKey = 'delayed_routing_key'
|
||||||
|
|
||||||
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||||
this.connect().then(() => {
|
this.connect().then(() => {
|
||||||
console.log('RabbitMQ connected');
|
console.log('RabbitMQ connected');
|
||||||
|
@ -30,6 +37,11 @@ export class RabbitMQ {
|
||||||
password: env.RABBITMQ_PASSWORD,
|
password: env.RABBITMQ_PASSWORD,
|
||||||
});
|
});
|
||||||
this.channel = await connection.createChannel();
|
this.channel = await connection.createChannel();
|
||||||
|
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||||
|
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||||
|
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
||||||
|
|
||||||
|
|
||||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ Content-Type: application/json
|
||||||
{
|
{
|
||||||
"title": "Go to the gymNEW!!!",
|
"title": "Go to the gymNEW!!!",
|
||||||
"description": "Go to the gym at 8pm",
|
"description": "Go to the gym at 8pm",
|
||||||
"due_date": "2023-07-08T15:05:00.000Z"
|
"due_date": "2023-07-08T15:24:00.000Z"
|
||||||
}
|
}
|
||||||
|
|
||||||
### update request
|
### update request
|
||||||
|
|
|
@ -6,10 +6,17 @@ const env = require('dotenv').config().parsed;
|
||||||
export class RabbitMQ {
|
export class RabbitMQ {
|
||||||
channel: amqp.Channel;
|
channel: amqp.Channel;
|
||||||
queueName: string;
|
queueName: string;
|
||||||
|
exchange: string;
|
||||||
|
queue: string;
|
||||||
|
routingKey: string;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||||
|
|
||||||
|
this.exchange = 'delayed_exchange';
|
||||||
|
this.queue = 'delayed_queue';
|
||||||
|
this.routingKey = 'delayed_routing_key'
|
||||||
|
|
||||||
this.connect().then(() => {
|
this.connect().then(() => {
|
||||||
console.log('RabbitMQ connected');
|
console.log('RabbitMQ connected');
|
||||||
}).catch((error) => {
|
}).catch((error) => {
|
||||||
|
@ -28,8 +35,9 @@ export class RabbitMQ {
|
||||||
password: env.RABBITMQ_PASSWORD,
|
password: env.RABBITMQ_PASSWORD,
|
||||||
});
|
});
|
||||||
this.channel = await connection.createChannel();
|
this.channel = await connection.createChannel();
|
||||||
await this.channel.assertExchange('delayed', 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||||
|
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
||||||
console.log('Channel and queue asserted successfully');
|
console.log('Channel and queue asserted successfully');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error connecting to RabbitMQ:', error);
|
console.error('Error connecting to RabbitMQ:', error);
|
||||||
|
@ -43,7 +51,7 @@ export class RabbitMQ {
|
||||||
const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue });
|
const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue });
|
||||||
const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } };
|
const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } };
|
||||||
try {
|
try {
|
||||||
await this.channel.sendToQueue(this.queueName, Buffer.from(message), options);
|
await this.channel.publish(this.exchange, this.routingKey, Buffer.from(message), options);
|
||||||
console.log('Message sent to the queue');
|
console.log('Message sent to the queue');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error sending message to RabbitMQ:', error);
|
console.error('Error sending message to RabbitMQ:', error);
|
||||||
|
|
Loading…
Reference in a new issue