From f1d02aadc097bf2dd8ee6bd0abefe23e31f568da Mon Sep 17 00:00:00 2001 From: Kfir Dayan Date: Sat, 8 Jul 2023 11:29:50 +0300 Subject: [PATCH] bug of delaying the message --- notification-service/src/rabbitmq/RabbitMQ.ts | 44 +++++-------------- request.http | 2 +- todo-service/src/rabbitmq/RabbitMQ.ts | 28 +++++++----- 3 files changed, 29 insertions(+), 45 deletions(-) diff --git a/notification-service/src/rabbitmq/RabbitMQ.ts b/notification-service/src/rabbitmq/RabbitMQ.ts index 7524519..4402b63 100644 --- a/notification-service/src/rabbitmq/RabbitMQ.ts +++ b/notification-service/src/rabbitmq/RabbitMQ.ts @@ -30,12 +30,7 @@ export class RabbitMQ { } async create(payload: ITodo, delayTimeForQueue: number) { - let reQueueTime = 0; - if (delayTimeForQueue > 900) { - reQueueTime = delayTimeForQueue - 900; - delayTimeForQueue = 900; - } - const message = JSON.stringify({ payload, delayTimeForQueue, reQueueTime }); + const message = JSON.stringify({ payload, delayTimeForQueue }); const options = { persistent: true, expiration: delayTimeForQueue.toString() }; try { @@ -53,41 +48,24 @@ export class RabbitMQ { console.log('Consumer started, waiting for messages...'); this.channel.consume(this.queueName, async (message) => { if (message) { - const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.content.toString()) as { + const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as { payload: ITodo; delayTimeForQueue: number; - reQueueTime: number; }; console.log('Received notification:', payload); - if (reQueueTime === 0) { - try { - await MongoDb.updateTodoStatus(payload); - } catch { - await this.create(payload, delayTimeForQueue); - this.channel.ack(message); - console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue); - return; - } - } else if (reQueueTime >= 900) { - const newDelayTime = 900; - const newReQueueTime = reQueueTime - 900; - - await this.create(payload, newDelayTime); - console.log('Published new message with delay:', newDelayTime); - this.channel.ack(message); - - if (newReQueueTime > 0) { - return; // Wait for the next consumer to process the message with the remaining delay - } - } else { - const newDelayTime = reQueueTime; - - await this.create(payload, newDelayTime); - console.log('Published new message with delay:', newDelayTime); + try { + await MongoDb.updateTodoStatus(payload); + console.log('Updated todo status in the DB'); + } catch { + await this.create(payload, delayTimeForQueue); this.channel.ack(message); + console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue); + return; } + + this.channel.ack(message); } }); } catch (error) { diff --git a/request.http b/request.http index 77b48ef..5ce3897 100644 --- a/request.http +++ b/request.http @@ -9,7 +9,7 @@ Content-Type: application/json { "title": "Go to the gym", "description": "Go to the gym at 8pm", - "due_date": "2023-07-08 10:55:00" + "due_date": "2023-07-08 12:00:00" } ### update request diff --git a/todo-service/src/rabbitmq/RabbitMQ.ts b/todo-service/src/rabbitmq/RabbitMQ.ts index da6aeb3..9d31cb9 100644 --- a/todo-service/src/rabbitmq/RabbitMQ.ts +++ b/todo-service/src/rabbitmq/RabbitMQ.ts @@ -18,20 +18,26 @@ export class RabbitMQ { } async connect() { - const connection = await amqp.connect({ - protocol: 'amqp', - hostname: env.RABBITMQ_HOST, - port: parseInt(env.RABBITMQ_PORT), - username: env.RABBITMQ_USERNAME, - password: env.RABBITMQ_PASSWORD, - }); - this.channel = await connection.createChannel(); - await this.channel.assertQueue(this.queueName, { durable: true }); + try { + const connection = await amqp.connect({ + protocol: 'amqp', + hostname: env.RABBITMQ_HOST, + port: parseInt(env.RABBITMQ_PORT), + username: env.RABBITMQ_USERNAME, + password: env.RABBITMQ_PASSWORD, + }); + this.channel = await connection.createChannel(); + await this.channel.assertQueue(this.queueName, { durable: true }); + console.log('Channel and queue asserted successfully'); + } catch (error) { + console.error('Error connecting to RabbitMQ:', error); + throw error; + } } async create(payload: ITodo, delayTimeForQueue: number) { - const message = JSON.stringify({ payload, delayTimeForQueue }); - const options = { persistent: true, expiration: delayTimeForQueue.toString() }; + const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue }); + const options = { persistent: true, delayTimeForQueue: delayTimeForQueue.toString() }; try { await this.channel.sendToQueue(this.queueName, Buffer.from(message), options);