replacing sqs with rabit consumer
This commit is contained in:
parent
fa04e992f3
commit
6253d0360c
5 changed files with 214 additions and 5 deletions
|
@ -8,3 +8,9 @@ AWS_SQS_QUEUE_NAME={NAME_OF_QUEUE}
|
|||
## MONGO USER ##
|
||||
DATABASE_URL=mongodb://{USER}:{PASSWORD}@localhost:27017/{DB_NAME}
|
||||
MONGO_DB_NAME={USER_DB_NAME}
|
||||
|
||||
RABBITMQ_QUEUE_NAME=todo_queue
|
||||
RABBITMQ_HOST=localhost
|
||||
RABBITMQ_PORT=5672
|
||||
RABBITMQ_USERNAME=guest
|
||||
RABBITMQ_PASSWORD=guest
|
95
notification-service/package-lock.json
generated
95
notification-service/package-lock.json
generated
|
@ -9,10 +9,27 @@
|
|||
"version": "1.0.0",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"amqplib": "^0.10.3",
|
||||
"aws-sdk": "^2.1413.0",
|
||||
"dotenv": "^16.3.1",
|
||||
"mongodb": "^5.7.0",
|
||||
"socket.io": "^4.7.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/amqplib": "^0.10.1"
|
||||
}
|
||||
},
|
||||
"node_modules/@acuminous/bitsyntax": {
|
||||
"version": "0.1.2",
|
||||
"resolved": "https://registry.npmjs.org/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz",
|
||||
"integrity": "sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ==",
|
||||
"dependencies": {
|
||||
"buffer-more-ints": "~1.0.0",
|
||||
"debug": "^4.3.4",
|
||||
"safe-buffer": "~5.1.2"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=0.8"
|
||||
}
|
||||
},
|
||||
"node_modules/@socket.io/component-emitter": {
|
||||
|
@ -20,6 +37,15 @@
|
|||
"resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz",
|
||||
"integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg=="
|
||||
},
|
||||
"node_modules/@types/amqplib": {
|
||||
"version": "0.10.1",
|
||||
"resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.1.tgz",
|
||||
"integrity": "sha512-j6ANKT79ncUDnAs/+9r9eDujxbeJoTjoVu33gHHcaPfmLQaMhvfbH2GqSe8KUM444epAp1Vl3peVOQfZk3UIqA==",
|
||||
"dev": true,
|
||||
"dependencies": {
|
||||
"@types/node": "*"
|
||||
}
|
||||
},
|
||||
"node_modules/@types/cookie": {
|
||||
"version": "0.4.1",
|
||||
"resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz",
|
||||
|
@ -64,6 +90,20 @@
|
|||
"node": ">= 0.6"
|
||||
}
|
||||
},
|
||||
"node_modules/amqplib": {
|
||||
"version": "0.10.3",
|
||||
"resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.3.tgz",
|
||||
"integrity": "sha512-UHmuSa7n8vVW/a5HGh2nFPqAEr8+cD4dEZ6u9GjP91nHfr1a54RyAKyra7Sb5NH7NBKOUlyQSMXIp0qAixKexw==",
|
||||
"dependencies": {
|
||||
"@acuminous/bitsyntax": "^0.1.2",
|
||||
"buffer-more-ints": "~1.0.0",
|
||||
"readable-stream": "1.x >=1.1.9",
|
||||
"url-parse": "~1.5.10"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=10"
|
||||
}
|
||||
},
|
||||
"node_modules/available-typed-arrays": {
|
||||
"version": "1.0.5",
|
||||
"resolved": "https://registry.npmjs.org/available-typed-arrays/-/available-typed-arrays-1.0.5.tgz",
|
||||
|
@ -140,6 +180,11 @@
|
|||
"isarray": "^1.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/buffer-more-ints": {
|
||||
"version": "1.0.0",
|
||||
"resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz",
|
||||
"integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg=="
|
||||
},
|
||||
"node_modules/call-bind": {
|
||||
"version": "1.0.2",
|
||||
"resolved": "https://registry.npmjs.org/call-bind/-/call-bind-1.0.2.tgz",
|
||||
|
@ -160,6 +205,11 @@
|
|||
"node": ">= 0.6"
|
||||
}
|
||||
},
|
||||
"node_modules/core-util-is": {
|
||||
"version": "1.0.3",
|
||||
"resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz",
|
||||
"integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ=="
|
||||
},
|
||||
"node_modules/cors": {
|
||||
"version": "2.8.5",
|
||||
"resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz",
|
||||
|
@ -515,6 +565,37 @@
|
|||
"node": ">=0.4.x"
|
||||
}
|
||||
},
|
||||
"node_modules/querystringify": {
|
||||
"version": "2.2.0",
|
||||
"resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz",
|
||||
"integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ=="
|
||||
},
|
||||
"node_modules/readable-stream": {
|
||||
"version": "1.1.14",
|
||||
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz",
|
||||
"integrity": "sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ==",
|
||||
"dependencies": {
|
||||
"core-util-is": "~1.0.0",
|
||||
"inherits": "~2.0.1",
|
||||
"isarray": "0.0.1",
|
||||
"string_decoder": "~0.10.x"
|
||||
}
|
||||
},
|
||||
"node_modules/readable-stream/node_modules/isarray": {
|
||||
"version": "0.0.1",
|
||||
"resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz",
|
||||
"integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ=="
|
||||
},
|
||||
"node_modules/requires-port": {
|
||||
"version": "1.0.0",
|
||||
"resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz",
|
||||
"integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ=="
|
||||
},
|
||||
"node_modules/safe-buffer": {
|
||||
"version": "5.1.2",
|
||||
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
|
||||
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
|
||||
},
|
||||
"node_modules/saslprep": {
|
||||
"version": "1.0.3",
|
||||
"resolved": "https://registry.npmjs.org/saslprep/-/saslprep-1.0.3.tgz",
|
||||
|
@ -600,6 +681,11 @@
|
|||
"memory-pager": "^1.0.2"
|
||||
}
|
||||
},
|
||||
"node_modules/string_decoder": {
|
||||
"version": "0.10.31",
|
||||
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz",
|
||||
"integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ=="
|
||||
},
|
||||
"node_modules/tr46": {
|
||||
"version": "3.0.0",
|
||||
"resolved": "https://registry.npmjs.org/tr46/-/tr46-3.0.0.tgz",
|
||||
|
@ -628,6 +714,15 @@
|
|||
"querystring": "0.2.0"
|
||||
}
|
||||
},
|
||||
"node_modules/url-parse": {
|
||||
"version": "1.5.10",
|
||||
"resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz",
|
||||
"integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==",
|
||||
"dependencies": {
|
||||
"querystringify": "^2.1.1",
|
||||
"requires-port": "^1.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/util": {
|
||||
"version": "0.12.5",
|
||||
"resolved": "https://registry.npmjs.org/util/-/util-0.12.5.tgz",
|
||||
|
|
|
@ -10,9 +10,13 @@
|
|||
"author": "",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"amqplib": "^0.10.3",
|
||||
"aws-sdk": "^2.1413.0",
|
||||
"dotenv": "^16.3.1",
|
||||
"mongodb": "^5.7.0",
|
||||
"socket.io": "^4.7.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/amqplib": "^0.10.1"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,21 @@
|
|||
import { Sqs } from './aws/Sqs';
|
||||
// import { Sqs } from './aws/Sqs';
|
||||
import { RabbitMQ } from './rabbitmq/RabbitMQ';
|
||||
|
||||
export class NotificationService {
|
||||
sqs: Sqs;
|
||||
rabbitmq: RabbitMQ;
|
||||
|
||||
constructor() {
|
||||
this.sqs = new Sqs();
|
||||
this.rabbitmq = new RabbitMQ();
|
||||
this.rabbitmq.connect().then(() => {
|
||||
console.log('RabbitMQ connected');
|
||||
}).catch((error) => {
|
||||
console.error('Error connecting to RabbitMQ:', error);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
startListener() {
|
||||
this.sqs.startConsumer();
|
||||
this.rabbitmq.startConsumer();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
97
notification-service/src/rabbitmq/RabbitMQ.ts
Normal file
97
notification-service/src/rabbitmq/RabbitMQ.ts
Normal file
|
@ -0,0 +1,97 @@
|
|||
import amqp from 'amqplib';
|
||||
import { ITodo } from '../interfaces/ITodo';
|
||||
import { MongoDb } from '../mongodb/MongoDb';
|
||||
|
||||
const env = require('dotenv').config().parsed;
|
||||
|
||||
export class RabbitMQ {
|
||||
channel: amqp.Channel;
|
||||
queueName: string;
|
||||
|
||||
constructor() {
|
||||
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||
this.connect().then(() => {
|
||||
console.log('RabbitMQ connected');
|
||||
}).catch((error) => {
|
||||
console.error('Error connecting to RabbitMQ:', error);
|
||||
});
|
||||
}
|
||||
|
||||
async connect() {
|
||||
const connection = await amqp.connect({
|
||||
protocol: 'amqp',
|
||||
hostname: env.RABBITMQ_HOST,
|
||||
port: parseInt(env.RABBITMQ_PORT),
|
||||
username: env.RABBITMQ_USERNAME,
|
||||
password: env.RABBITMQ_PASSWORD,
|
||||
});
|
||||
this.channel = await connection.createChannel();
|
||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||
}
|
||||
|
||||
async create(payload: ITodo, delayTimeForQueue: number) {
|
||||
let reQueueTime = 0;
|
||||
if (delayTimeForQueue > 900) {
|
||||
reQueueTime = delayTimeForQueue - 900;
|
||||
delayTimeForQueue = 900;
|
||||
}
|
||||
const message = JSON.stringify({ payload, delayTimeForQueue, reQueueTime });
|
||||
const options = { persistent: true, expiration: delayTimeForQueue.toString() };
|
||||
|
||||
try {
|
||||
await this.channel.sendToQueue(this.queueName, Buffer.from(message), options);
|
||||
console.log('Message sent to the queue');
|
||||
} catch (error) {
|
||||
console.error('Error sending message to RabbitMQ:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async startConsumer() {
|
||||
try {
|
||||
await this.channel.prefetch(1);
|
||||
console.log('Consumer started, waiting for messages...');
|
||||
this.channel.consume(this.queueName, async (message) => {
|
||||
if (message) {
|
||||
const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.content.toString()) as {
|
||||
payload: ITodo;
|
||||
delayTimeForQueue: number;
|
||||
reQueueTime: number;
|
||||
};
|
||||
|
||||
console.log('Received notification:', payload);
|
||||
|
||||
if (reQueueTime === 0) {
|
||||
try {
|
||||
await MongoDb.updateTodoStatus(payload);
|
||||
} catch {
|
||||
await this.create(payload, delayTimeForQueue);
|
||||
this.channel.ack(message);
|
||||
console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
|
||||
return;
|
||||
}
|
||||
} else if (reQueueTime >= 900) {
|
||||
const newDelayTime = 900;
|
||||
const newReQueueTime = reQueueTime - 900;
|
||||
|
||||
await this.create(payload, newDelayTime);
|
||||
console.log('Published new message with delay:', newDelayTime);
|
||||
this.channel.ack(message);
|
||||
|
||||
if (newReQueueTime > 0) {
|
||||
return; // Wait for the next consumer to process the message with the remaining delay
|
||||
}
|
||||
} else {
|
||||
const newDelayTime = reQueueTime;
|
||||
|
||||
await this.create(payload, newDelayTime);
|
||||
console.log('Published new message with delay:', newDelayTime);
|
||||
this.channel.ack(message);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error consuming messages from RabbitMQ:', error);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue