diff --git a/notification-service/.env.eample b/notification-service/.env.eample index 80e8ebc..a537c26 100644 --- a/notification-service/.env.eample +++ b/notification-service/.env.eample @@ -7,4 +7,10 @@ AWS_SQS_QUEUE_NAME={NAME_OF_QUEUE} ## MONGO USER ## DATABASE_URL=mongodb://{USER}:{PASSWORD}@localhost:27017/{DB_NAME} -MONGO_DB_NAME={USER_DB_NAME} \ No newline at end of file +MONGO_DB_NAME={USER_DB_NAME} + +RABBITMQ_QUEUE_NAME=todo_queue +RABBITMQ_HOST=localhost +RABBITMQ_PORT=5672 +RABBITMQ_USERNAME=guest +RABBITMQ_PASSWORD=guest \ No newline at end of file diff --git a/notification-service/package-lock.json b/notification-service/package-lock.json index 81b600e..7c2c7fb 100644 --- a/notification-service/package-lock.json +++ b/notification-service/package-lock.json @@ -9,10 +9,27 @@ "version": "1.0.0", "license": "ISC", "dependencies": { + "amqplib": "^0.10.3", "aws-sdk": "^2.1413.0", "dotenv": "^16.3.1", "mongodb": "^5.7.0", "socket.io": "^4.7.1" + }, + "devDependencies": { + "@types/amqplib": "^0.10.1" + } + }, + "node_modules/@acuminous/bitsyntax": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz", + "integrity": "sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ==", + "dependencies": { + "buffer-more-ints": "~1.0.0", + "debug": "^4.3.4", + "safe-buffer": "~5.1.2" + }, + "engines": { + "node": ">=0.8" } }, "node_modules/@socket.io/component-emitter": { @@ -20,6 +37,15 @@ "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz", "integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==" }, + "node_modules/@types/amqplib": { + "version": "0.10.1", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.1.tgz", + "integrity": "sha512-j6ANKT79ncUDnAs/+9r9eDujxbeJoTjoVu33gHHcaPfmLQaMhvfbH2GqSe8KUM444epAp1Vl3peVOQfZk3UIqA==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/cookie": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz", @@ -64,6 +90,20 @@ "node": ">= 0.6" } }, + "node_modules/amqplib": { + "version": "0.10.3", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.3.tgz", + "integrity": "sha512-UHmuSa7n8vVW/a5HGh2nFPqAEr8+cD4dEZ6u9GjP91nHfr1a54RyAKyra7Sb5NH7NBKOUlyQSMXIp0qAixKexw==", + "dependencies": { + "@acuminous/bitsyntax": "^0.1.2", + "buffer-more-ints": "~1.0.0", + "readable-stream": "1.x >=1.1.9", + "url-parse": "~1.5.10" + }, + "engines": { + "node": ">=10" + } + }, "node_modules/available-typed-arrays": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/available-typed-arrays/-/available-typed-arrays-1.0.5.tgz", @@ -140,6 +180,11 @@ "isarray": "^1.0.0" } }, + "node_modules/buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==" + }, "node_modules/call-bind": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/call-bind/-/call-bind-1.0.2.tgz", @@ -160,6 +205,11 @@ "node": ">= 0.6" } }, + "node_modules/core-util-is": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz", + "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==" + }, "node_modules/cors": { "version": "2.8.5", "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", @@ -515,6 +565,37 @@ "node": ">=0.4.x" } }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==" + }, + "node_modules/readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ==", + "dependencies": { + "core-util-is": "~1.0.0", + "inherits": "~2.0.1", + "isarray": "0.0.1", + "string_decoder": "~0.10.x" + } + }, + "node_modules/readable-stream/node_modules/isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==" + }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==" + }, + "node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + }, "node_modules/saslprep": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/saslprep/-/saslprep-1.0.3.tgz", @@ -600,6 +681,11 @@ "memory-pager": "^1.0.2" } }, + "node_modules/string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==" + }, "node_modules/tr46": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/tr46/-/tr46-3.0.0.tgz", @@ -628,6 +714,15 @@ "querystring": "0.2.0" } }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "node_modules/util": { "version": "0.12.5", "resolved": "https://registry.npmjs.org/util/-/util-0.12.5.tgz", diff --git a/notification-service/package.json b/notification-service/package.json index a8074f2..2f3dbd3 100644 --- a/notification-service/package.json +++ b/notification-service/package.json @@ -10,9 +10,13 @@ "author": "", "license": "ISC", "dependencies": { + "amqplib": "^0.10.3", "aws-sdk": "^2.1413.0", "dotenv": "^16.3.1", "mongodb": "^5.7.0", "socket.io": "^4.7.1" + }, + "devDependencies": { + "@types/amqplib": "^0.10.1" } } diff --git a/notification-service/src/main.ts b/notification-service/src/main.ts index bb29262..09ffde4 100644 --- a/notification-service/src/main.ts +++ b/notification-service/src/main.ts @@ -1,14 +1,21 @@ -import { Sqs } from './aws/Sqs'; +// import { Sqs } from './aws/Sqs'; +import { RabbitMQ } from './rabbitmq/RabbitMQ'; export class NotificationService { - sqs: Sqs; + rabbitmq: RabbitMQ; constructor() { - this.sqs = new Sqs(); + this.rabbitmq = new RabbitMQ(); + this.rabbitmq.connect().then(() => { + console.log('RabbitMQ connected'); + }).catch((error) => { + console.error('Error connecting to RabbitMQ:', error); + }); + } startListener() { - this.sqs.startConsumer(); + this.rabbitmq.startConsumer(); } } diff --git a/notification-service/src/rabbitmq/RabbitMQ.ts b/notification-service/src/rabbitmq/RabbitMQ.ts new file mode 100644 index 0000000..7524519 --- /dev/null +++ b/notification-service/src/rabbitmq/RabbitMQ.ts @@ -0,0 +1,97 @@ +import amqp from 'amqplib'; +import { ITodo } from '../interfaces/ITodo'; +import { MongoDb } from '../mongodb/MongoDb'; + +const env = require('dotenv').config().parsed; + +export class RabbitMQ { + channel: amqp.Channel; + queueName: string; + + constructor() { + this.queueName = env.RABBITMQ_QUEUE_NAME; + this.connect().then(() => { + console.log('RabbitMQ connected'); + }).catch((error) => { + console.error('Error connecting to RabbitMQ:', error); + }); + } + + async connect() { + const connection = await amqp.connect({ + protocol: 'amqp', + hostname: env.RABBITMQ_HOST, + port: parseInt(env.RABBITMQ_PORT), + username: env.RABBITMQ_USERNAME, + password: env.RABBITMQ_PASSWORD, + }); + this.channel = await connection.createChannel(); + await this.channel.assertQueue(this.queueName, { durable: true }); + } + + async create(payload: ITodo, delayTimeForQueue: number) { + let reQueueTime = 0; + if (delayTimeForQueue > 900) { + reQueueTime = delayTimeForQueue - 900; + delayTimeForQueue = 900; + } + const message = JSON.stringify({ payload, delayTimeForQueue, reQueueTime }); + const options = { persistent: true, expiration: delayTimeForQueue.toString() }; + + try { + await this.channel.sendToQueue(this.queueName, Buffer.from(message), options); + console.log('Message sent to the queue'); + } catch (error) { + console.error('Error sending message to RabbitMQ:', error); + throw error; + } + } + + async startConsumer() { + try { + await this.channel.prefetch(1); + console.log('Consumer started, waiting for messages...'); + this.channel.consume(this.queueName, async (message) => { + if (message) { + const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.content.toString()) as { + payload: ITodo; + delayTimeForQueue: number; + reQueueTime: number; + }; + + console.log('Received notification:', payload); + + if (reQueueTime === 0) { + try { + await MongoDb.updateTodoStatus(payload); + } catch { + await this.create(payload, delayTimeForQueue); + this.channel.ack(message); + console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue); + return; + } + } else if (reQueueTime >= 900) { + const newDelayTime = 900; + const newReQueueTime = reQueueTime - 900; + + await this.create(payload, newDelayTime); + console.log('Published new message with delay:', newDelayTime); + this.channel.ack(message); + + if (newReQueueTime > 0) { + return; // Wait for the next consumer to process the message with the remaining delay + } + } else { + const newDelayTime = reQueueTime; + + await this.create(payload, newDelayTime); + console.log('Published new message with delay:', newDelayTime); + this.channel.ack(message); + } + } + }); + } catch (error) { + console.error('Error consuming messages from RabbitMQ:', error); + } + } +}