bug of delaying the message

This commit is contained in:
Kfir Dayan 2023-07-08 11:29:50 +03:00
parent 44feb4d01c
commit f1d02aadc0
3 changed files with 29 additions and 45 deletions

View file

@ -30,12 +30,7 @@ export class RabbitMQ {
} }
async create(payload: ITodo, delayTimeForQueue: number) { async create(payload: ITodo, delayTimeForQueue: number) {
let reQueueTime = 0; const message = JSON.stringify({ payload, delayTimeForQueue });
if (delayTimeForQueue > 900) {
reQueueTime = delayTimeForQueue - 900;
delayTimeForQueue = 900;
}
const message = JSON.stringify({ payload, delayTimeForQueue, reQueueTime });
const options = { persistent: true, expiration: delayTimeForQueue.toString() }; const options = { persistent: true, expiration: delayTimeForQueue.toString() };
try { try {
@ -53,41 +48,24 @@ export class RabbitMQ {
console.log('Consumer started, waiting for messages...'); console.log('Consumer started, waiting for messages...');
this.channel.consume(this.queueName, async (message) => { this.channel.consume(this.queueName, async (message) => {
if (message) { if (message) {
const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.content.toString()) as { const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
payload: ITodo; payload: ITodo;
delayTimeForQueue: number; delayTimeForQueue: number;
reQueueTime: number;
}; };
console.log('Received notification:', payload); console.log('Received notification:', payload);
if (reQueueTime === 0) {
try { try {
await MongoDb.updateTodoStatus(payload); await MongoDb.updateTodoStatus(payload);
console.log('Updated todo status in the DB');
} catch { } catch {
await this.create(payload, delayTimeForQueue); await this.create(payload, delayTimeForQueue);
this.channel.ack(message); this.channel.ack(message);
console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue); console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
return; return;
} }
} else if (reQueueTime >= 900) {
const newDelayTime = 900;
const newReQueueTime = reQueueTime - 900;
await this.create(payload, newDelayTime);
console.log('Published new message with delay:', newDelayTime);
this.channel.ack(message); this.channel.ack(message);
if (newReQueueTime > 0) {
return; // Wait for the next consumer to process the message with the remaining delay
}
} else {
const newDelayTime = reQueueTime;
await this.create(payload, newDelayTime);
console.log('Published new message with delay:', newDelayTime);
this.channel.ack(message);
}
} }
}); });
} catch (error) { } catch (error) {

View file

@ -9,7 +9,7 @@ Content-Type: application/json
{ {
"title": "Go to the gym", "title": "Go to the gym",
"description": "Go to the gym at 8pm", "description": "Go to the gym at 8pm",
"due_date": "2023-07-08 10:55:00" "due_date": "2023-07-08 12:00:00"
} }
### update request ### update request

View file

@ -18,6 +18,7 @@ export class RabbitMQ {
} }
async connect() { async connect() {
try {
const connection = await amqp.connect({ const connection = await amqp.connect({
protocol: 'amqp', protocol: 'amqp',
hostname: env.RABBITMQ_HOST, hostname: env.RABBITMQ_HOST,
@ -27,11 +28,16 @@ export class RabbitMQ {
}); });
this.channel = await connection.createChannel(); this.channel = await connection.createChannel();
await this.channel.assertQueue(this.queueName, { durable: true }); await this.channel.assertQueue(this.queueName, { durable: true });
console.log('Channel and queue asserted successfully');
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
throw error;
}
} }
async create(payload: ITodo, delayTimeForQueue: number) { async create(payload: ITodo, delayTimeForQueue: number) {
const message = JSON.stringify({ payload, delayTimeForQueue }); const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue });
const options = { persistent: true, expiration: delayTimeForQueue.toString() }; const options = { persistent: true, delayTimeForQueue: delayTimeForQueue.toString() };
try { try {
await this.channel.sendToQueue(this.queueName, Buffer.from(message), options); await this.channel.sendToQueue(this.queueName, Buffer.from(message), options);