Compare commits
9 commits
6ba0fd0ffe
...
e3268b4633
Author | SHA1 | Date | |
---|---|---|---|
e3268b4633 | |||
2333b78f8c | |||
c2695c7eb8 | |||
8bc7e1d8a0 | |||
f1d02aadc0 | |||
44feb4d01c | |||
6253d0360c | |||
fa04e992f3 | |||
adc62a94e9 |
21 changed files with 479 additions and 143 deletions
|
@ -55,8 +55,8 @@ npm run dev
|
||||||
```
|
```
|
||||||
|
|
||||||
## Requests
|
## Requests
|
||||||
|
Examples are located in request.http - can be run in VSCode with the REST Client extension.
|
||||||
### Examples are located in request.http - can be run in VSCode with the REST Client extension.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -8,3 +8,7 @@ DB_USERNAME=user
|
||||||
DB_PASSWORD=12345
|
DB_PASSWORD=12345
|
||||||
DB_NAME=user_db
|
DB_NAME=user_db
|
||||||
DB_USER_ROLE=readWrite
|
DB_USER_ROLE=readWrite
|
||||||
|
|
||||||
|
## RABITMQ ##
|
||||||
|
RABBITMQ_DEFAULT_USER=guest
|
||||||
|
RABBITMQ_DEFAULT_PASS=guest
|
||||||
|
|
|
@ -7,7 +7,7 @@ services:
|
||||||
- 27017:27017
|
- 27017:27017
|
||||||
volumes:
|
volumes:
|
||||||
- mongodb_vol:/data/db
|
- mongodb_vol:/data/db
|
||||||
- ./init-scripts/init.js:/docker-entrypoint-initdb.d/mongo-init.js
|
- ./mongo-init-scripts/init.js:/docker-entrypoint-initdb.d/mongo-init.js
|
||||||
environment:
|
environment:
|
||||||
- MONGO_INITDB_DATABASE=${MONGO_INITDB_DATABASE}
|
- MONGO_INITDB_DATABASE=${MONGO_INITDB_DATABASE}
|
||||||
- MONGO_INITDB_ROOT_USERNAME=${MONGO_INITDB_ROOT_USERNAME}
|
- MONGO_INITDB_ROOT_USERNAME=${MONGO_INITDB_ROOT_USERNAME}
|
||||||
|
@ -15,5 +15,26 @@ services:
|
||||||
platform: linux/arm64/v8
|
platform: linux/arm64/v8
|
||||||
expose:
|
expose:
|
||||||
- 27017
|
- 27017
|
||||||
|
rabbitmq:
|
||||||
|
image: heidiks/rabbitmq-delayed-message-exchange:latest
|
||||||
|
restart: always
|
||||||
|
ports:
|
||||||
|
- 5672:5672
|
||||||
|
- 15672:15672
|
||||||
|
volumes:
|
||||||
|
- rabbitmq_vol:/var/lib/rabbitmq
|
||||||
|
- ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh
|
||||||
|
environment:
|
||||||
|
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
|
||||||
|
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
|
||||||
|
command: |
|
||||||
|
bash -c "
|
||||||
|
rabbitmq-plugins enable rabbitmq_delayed_message_exchange && \
|
||||||
|
rabbitmq-server
|
||||||
|
"
|
||||||
|
expose:
|
||||||
|
- 5672
|
||||||
|
- 15672
|
||||||
volumes:
|
volumes:
|
||||||
mongodb_vol:
|
mongodb_vol:
|
||||||
|
rabbitmq_vol:
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
#!/bin/bash
|
||||||
|
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
||||||
|
rabbitmqctl restart
|
|
@ -8,3 +8,9 @@ AWS_SQS_QUEUE_NAME={NAME_OF_QUEUE}
|
||||||
## MONGO USER ##
|
## MONGO USER ##
|
||||||
DATABASE_URL=mongodb://{USER}:{PASSWORD}@localhost:27017/{DB_NAME}
|
DATABASE_URL=mongodb://{USER}:{PASSWORD}@localhost:27017/{DB_NAME}
|
||||||
MONGO_DB_NAME={USER_DB_NAME}
|
MONGO_DB_NAME={USER_DB_NAME}
|
||||||
|
|
||||||
|
RABBITMQ_QUEUE_NAME=todo_queue
|
||||||
|
RABBITMQ_HOST=localhost
|
||||||
|
RABBITMQ_PORT=5672
|
||||||
|
RABBITMQ_USERNAME=guest
|
||||||
|
RABBITMQ_PASSWORD=guest
|
95
notification-service/package-lock.json
generated
95
notification-service/package-lock.json
generated
|
@ -9,10 +9,27 @@
|
||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"amqplib": "^0.10.3",
|
||||||
"aws-sdk": "^2.1413.0",
|
"aws-sdk": "^2.1413.0",
|
||||||
"dotenv": "^16.3.1",
|
"dotenv": "^16.3.1",
|
||||||
"mongodb": "^5.7.0",
|
"mongodb": "^5.7.0",
|
||||||
"socket.io": "^4.7.1"
|
"socket.io": "^4.7.1"
|
||||||
|
},
|
||||||
|
"devDependencies": {
|
||||||
|
"@types/amqplib": "^0.10.1"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"node_modules/@acuminous/bitsyntax": {
|
||||||
|
"version": "0.1.2",
|
||||||
|
"resolved": "https://registry.npmjs.org/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz",
|
||||||
|
"integrity": "sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ==",
|
||||||
|
"dependencies": {
|
||||||
|
"buffer-more-ints": "~1.0.0",
|
||||||
|
"debug": "^4.3.4",
|
||||||
|
"safe-buffer": "~5.1.2"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": ">=0.8"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/@socket.io/component-emitter": {
|
"node_modules/@socket.io/component-emitter": {
|
||||||
|
@ -20,6 +37,15 @@
|
||||||
"resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz",
|
"resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz",
|
||||||
"integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg=="
|
"integrity": "sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg=="
|
||||||
},
|
},
|
||||||
|
"node_modules/@types/amqplib": {
|
||||||
|
"version": "0.10.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.1.tgz",
|
||||||
|
"integrity": "sha512-j6ANKT79ncUDnAs/+9r9eDujxbeJoTjoVu33gHHcaPfmLQaMhvfbH2GqSe8KUM444epAp1Vl3peVOQfZk3UIqA==",
|
||||||
|
"dev": true,
|
||||||
|
"dependencies": {
|
||||||
|
"@types/node": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/@types/cookie": {
|
"node_modules/@types/cookie": {
|
||||||
"version": "0.4.1",
|
"version": "0.4.1",
|
||||||
"resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz",
|
"resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz",
|
||||||
|
@ -64,6 +90,20 @@
|
||||||
"node": ">= 0.6"
|
"node": ">= 0.6"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/amqplib": {
|
||||||
|
"version": "0.10.3",
|
||||||
|
"resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.3.tgz",
|
||||||
|
"integrity": "sha512-UHmuSa7n8vVW/a5HGh2nFPqAEr8+cD4dEZ6u9GjP91nHfr1a54RyAKyra7Sb5NH7NBKOUlyQSMXIp0qAixKexw==",
|
||||||
|
"dependencies": {
|
||||||
|
"@acuminous/bitsyntax": "^0.1.2",
|
||||||
|
"buffer-more-ints": "~1.0.0",
|
||||||
|
"readable-stream": "1.x >=1.1.9",
|
||||||
|
"url-parse": "~1.5.10"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": ">=10"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/available-typed-arrays": {
|
"node_modules/available-typed-arrays": {
|
||||||
"version": "1.0.5",
|
"version": "1.0.5",
|
||||||
"resolved": "https://registry.npmjs.org/available-typed-arrays/-/available-typed-arrays-1.0.5.tgz",
|
"resolved": "https://registry.npmjs.org/available-typed-arrays/-/available-typed-arrays-1.0.5.tgz",
|
||||||
|
@ -140,6 +180,11 @@
|
||||||
"isarray": "^1.0.0"
|
"isarray": "^1.0.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/buffer-more-ints": {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz",
|
||||||
|
"integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg=="
|
||||||
|
},
|
||||||
"node_modules/call-bind": {
|
"node_modules/call-bind": {
|
||||||
"version": "1.0.2",
|
"version": "1.0.2",
|
||||||
"resolved": "https://registry.npmjs.org/call-bind/-/call-bind-1.0.2.tgz",
|
"resolved": "https://registry.npmjs.org/call-bind/-/call-bind-1.0.2.tgz",
|
||||||
|
@ -160,6 +205,11 @@
|
||||||
"node": ">= 0.6"
|
"node": ">= 0.6"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/core-util-is": {
|
||||||
|
"version": "1.0.3",
|
||||||
|
"resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.3.tgz",
|
||||||
|
"integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ=="
|
||||||
|
},
|
||||||
"node_modules/cors": {
|
"node_modules/cors": {
|
||||||
"version": "2.8.5",
|
"version": "2.8.5",
|
||||||
"resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz",
|
"resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz",
|
||||||
|
@ -515,6 +565,37 @@
|
||||||
"node": ">=0.4.x"
|
"node": ">=0.4.x"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/querystringify": {
|
||||||
|
"version": "2.2.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz",
|
||||||
|
"integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ=="
|
||||||
|
},
|
||||||
|
"node_modules/readable-stream": {
|
||||||
|
"version": "1.1.14",
|
||||||
|
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz",
|
||||||
|
"integrity": "sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ==",
|
||||||
|
"dependencies": {
|
||||||
|
"core-util-is": "~1.0.0",
|
||||||
|
"inherits": "~2.0.1",
|
||||||
|
"isarray": "0.0.1",
|
||||||
|
"string_decoder": "~0.10.x"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"node_modules/readable-stream/node_modules/isarray": {
|
||||||
|
"version": "0.0.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz",
|
||||||
|
"integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ=="
|
||||||
|
},
|
||||||
|
"node_modules/requires-port": {
|
||||||
|
"version": "1.0.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz",
|
||||||
|
"integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ=="
|
||||||
|
},
|
||||||
|
"node_modules/safe-buffer": {
|
||||||
|
"version": "5.1.2",
|
||||||
|
"resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz",
|
||||||
|
"integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g=="
|
||||||
|
},
|
||||||
"node_modules/saslprep": {
|
"node_modules/saslprep": {
|
||||||
"version": "1.0.3",
|
"version": "1.0.3",
|
||||||
"resolved": "https://registry.npmjs.org/saslprep/-/saslprep-1.0.3.tgz",
|
"resolved": "https://registry.npmjs.org/saslprep/-/saslprep-1.0.3.tgz",
|
||||||
|
@ -600,6 +681,11 @@
|
||||||
"memory-pager": "^1.0.2"
|
"memory-pager": "^1.0.2"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/string_decoder": {
|
||||||
|
"version": "0.10.31",
|
||||||
|
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz",
|
||||||
|
"integrity": "sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ=="
|
||||||
|
},
|
||||||
"node_modules/tr46": {
|
"node_modules/tr46": {
|
||||||
"version": "3.0.0",
|
"version": "3.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/tr46/-/tr46-3.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/tr46/-/tr46-3.0.0.tgz",
|
||||||
|
@ -628,6 +714,15 @@
|
||||||
"querystring": "0.2.0"
|
"querystring": "0.2.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/url-parse": {
|
||||||
|
"version": "1.5.10",
|
||||||
|
"resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz",
|
||||||
|
"integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==",
|
||||||
|
"dependencies": {
|
||||||
|
"querystringify": "^2.1.1",
|
||||||
|
"requires-port": "^1.0.0"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/util": {
|
"node_modules/util": {
|
||||||
"version": "0.12.5",
|
"version": "0.12.5",
|
||||||
"resolved": "https://registry.npmjs.org/util/-/util-0.12.5.tgz",
|
"resolved": "https://registry.npmjs.org/util/-/util-0.12.5.tgz",
|
||||||
|
|
|
@ -10,9 +10,13 @@
|
||||||
"author": "",
|
"author": "",
|
||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"amqplib": "^0.10.3",
|
||||||
"aws-sdk": "^2.1413.0",
|
"aws-sdk": "^2.1413.0",
|
||||||
"dotenv": "^16.3.1",
|
"dotenv": "^16.3.1",
|
||||||
"mongodb": "^5.7.0",
|
"mongodb": "^5.7.0",
|
||||||
"socket.io": "^4.7.1"
|
"socket.io": "^4.7.1"
|
||||||
|
},
|
||||||
|
"devDependencies": {
|
||||||
|
"@types/amqplib": "^0.10.1"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,122 +1,122 @@
|
||||||
import aws from 'aws-sdk';
|
// import aws from 'aws-sdk';
|
||||||
import { ITodo } from '../interfaces/ITodo';
|
// import { ITodo } from '../interfaces/ITodo';
|
||||||
import { MongoDb } from '../mongodb/MongoDb';
|
// import { MongoDb } from '../mongodb/MongoDb';
|
||||||
|
|
||||||
const env = require('dotenv').config().parsed;
|
// const env = require('dotenv').config().parsed;
|
||||||
|
|
||||||
export class Sqs {
|
// export class Sqs {
|
||||||
sqs: aws.SQS;
|
// sqs: aws.SQS;
|
||||||
queueUrl: string;
|
// queueUrl: string;
|
||||||
|
|
||||||
constructor() {
|
// constructor() {
|
||||||
aws.config.update({
|
// aws.config.update({
|
||||||
accessKeyId: env.AWS_ACCESS_KEY_ID,
|
// accessKeyId: env.AWS_ACCESS_KEY_ID,
|
||||||
secretAccessKey: env.AWS_SECRET_ACCESS_KEY,
|
// secretAccessKey: env.AWS_SECRET_ACCESS_KEY,
|
||||||
});
|
// });
|
||||||
|
|
||||||
this.sqs = new aws.SQS();
|
// this.sqs = new aws.SQS();
|
||||||
this.queueUrl = env.AWS_SQS_URL + env.AWS_SQS_QUEUE_NAME;
|
// this.queueUrl = env.AWS_SQS_URL + env.AWS_SQS_QUEUE_NAME;
|
||||||
|
|
||||||
if (this.sqs) {
|
// if (this.sqs) {
|
||||||
console.log("SQS connected");
|
// console.log("SQS connected");
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
async create(payload: ITodo, delayTimeForQueue: number) {
|
// async create(payload: ITodo, delayTimeForQueue: number) {
|
||||||
let reQueueTime = 0;
|
// let reQueueTime = 0;
|
||||||
if (delayTimeForQueue > 900) {
|
// if (delayTimeForQueue > 900) {
|
||||||
reQueueTime = delayTimeForQueue - 900;
|
// reQueueTime = delayTimeForQueue - 900;
|
||||||
delayTimeForQueue = 900;
|
// delayTimeForQueue = 900;
|
||||||
}
|
// }
|
||||||
|
|
||||||
const params = {
|
// const params = {
|
||||||
DelaySeconds: delayTimeForQueue,
|
// DelaySeconds: delayTimeForQueue,
|
||||||
MessageAttributes: {},
|
// MessageAttributes: {},
|
||||||
MessageBody: JSON.stringify({ payload, delayTimeForQueue, reQueueTime }),
|
// MessageBody: JSON.stringify({ payload, delayTimeForQueue, reQueueTime }),
|
||||||
QueueUrl: this.queueUrl,
|
// QueueUrl: this.queueUrl,
|
||||||
};
|
// };
|
||||||
|
|
||||||
try {
|
// try {
|
||||||
const data = await this.sqs.sendMessage(params).promise();
|
// const data = await this.sqs.sendMessage(params).promise();
|
||||||
console.log("Message sent to the queue", data.MessageId);
|
// console.log("Message sent to the queue", data.MessageId);
|
||||||
return data;
|
// return data;
|
||||||
} catch (error) {
|
// } catch (error) {
|
||||||
console.log("Error", error);
|
// console.log("Error", error);
|
||||||
throw error;
|
// throw error;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
async startConsumer() {
|
// async startConsumer() {
|
||||||
while (true) {
|
// while (true) {
|
||||||
const message = await this.getNextQueue();
|
// const message = await this.getNextQueue();
|
||||||
if (message) {
|
// if (message) {
|
||||||
const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.Body) as {
|
// const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.Body) as {
|
||||||
payload: ITodo;
|
// payload: ITodo;
|
||||||
delayTimeForQueue: number;
|
// delayTimeForQueue: number;
|
||||||
reQueueTime: number;
|
// reQueueTime: number;
|
||||||
};
|
// };
|
||||||
|
|
||||||
console.log("Received notification:", payload);
|
// console.log("Received notification:", payload);
|
||||||
|
|
||||||
if (reQueueTime === 0) {
|
// if (reQueueTime === 0) {
|
||||||
try {
|
// try {
|
||||||
await MongoDb.updateTodoStatus(payload);
|
// await MongoDb.updateTodoStatus(payload);
|
||||||
} catch {
|
// } catch {
|
||||||
await this.create(payload, delayTimeForQueue);
|
// await this.create(payload, delayTimeForQueue);
|
||||||
await this.deleteMessage(message.ReceiptHandle);
|
// await this.deleteMessage(message.ReceiptHandle);
|
||||||
console.log("Published new queue with delay, THE DB IS DOWN!:", delayTimeForQueue);
|
// console.log("Published new queue with delay, THE DB IS DOWN!:", delayTimeForQueue);
|
||||||
}
|
// }
|
||||||
await this.deleteMessage(message.ReceiptHandle);
|
// await this.deleteMessage(message.ReceiptHandle);
|
||||||
} else if (reQueueTime >= 900) {
|
// } else if (reQueueTime >= 900) {
|
||||||
const newDelayTime = 900;
|
// const newDelayTime = 900;
|
||||||
const newReQueueTime = reQueueTime - 900;
|
// const newReQueueTime = reQueueTime - 900;
|
||||||
|
|
||||||
await this.create(payload, newDelayTime);
|
// await this.create(payload, newDelayTime);
|
||||||
await this.deleteMessage(message.ReceiptHandle);
|
// await this.deleteMessage(message.ReceiptHandle);
|
||||||
|
|
||||||
console.log("Published new queue with delay:", newDelayTime);
|
// console.log("Published new queue with delay:", newDelayTime);
|
||||||
} else {
|
// } else {
|
||||||
const newDelayTime = reQueueTime;
|
// const newDelayTime = reQueueTime;
|
||||||
|
|
||||||
await this.create(payload, newDelayTime);
|
// await this.create(payload, newDelayTime);
|
||||||
await this.deleteMessage(message.ReceiptHandle);
|
// await this.deleteMessage(message.ReceiptHandle);
|
||||||
|
|
||||||
console.log("Published new queue with delay:", newDelayTime);
|
// console.log("Published new queue with delay:", newDelayTime);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
private getNextQueue = async () => {
|
// private getNextQueue = async () => {
|
||||||
const params = {
|
// const params = {
|
||||||
QueueUrl: this.queueUrl,
|
// QueueUrl: this.queueUrl,
|
||||||
MaxNumberOfMessages: 1,
|
// MaxNumberOfMessages: 1,
|
||||||
VisibilityTimeout: 30,
|
// VisibilityTimeout: 30,
|
||||||
WaitTimeSeconds: 20, // Increase the WaitTimeSeconds for long polling
|
// WaitTimeSeconds: 20, // Increase the WaitTimeSeconds for long polling
|
||||||
};
|
// };
|
||||||
|
|
||||||
try {
|
// try {
|
||||||
const data = await this.sqs.receiveMessage(params).promise();
|
// const data = await this.sqs.receiveMessage(params).promise();
|
||||||
const message = data.Messages ? data.Messages[0] : null;
|
// const message = data.Messages ? data.Messages[0] : null;
|
||||||
return message;
|
// return message;
|
||||||
} catch (error) {
|
// } catch (error) {
|
||||||
console.error("Error retrieving message from SQS:", error);
|
// console.error("Error retrieving message from SQS:", error);
|
||||||
return null;
|
// return null;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
private deleteMessage = async (receiptHandle: string) => {
|
// private deleteMessage = async (receiptHandle: string) => {
|
||||||
const params = {
|
// const params = {
|
||||||
QueueUrl: this.queueUrl,
|
// QueueUrl: this.queueUrl,
|
||||||
ReceiptHandle: receiptHandle,
|
// ReceiptHandle: receiptHandle,
|
||||||
};
|
// };
|
||||||
|
|
||||||
try {
|
// try {
|
||||||
await this.sqs.deleteMessage(params).promise();
|
// await this.sqs.deleteMessage(params).promise();
|
||||||
console.log("Message deleted");
|
// console.log("Message deleted");
|
||||||
} catch (error) {
|
// } catch (error) {
|
||||||
console.error("Error deleting message from SQS:", error);
|
// console.error("Error deleting message from SQS:", error);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
}
|
// }
|
||||||
|
|
|
@ -1,16 +1,18 @@
|
||||||
import { Sqs } from './aws/Sqs';
|
// import { Sqs } from './aws/Sqs';
|
||||||
|
import { RabbitMQ } from './rabbitmq/RabbitMQ';
|
||||||
|
|
||||||
export class NotificationService {
|
export class NotificationService {
|
||||||
sqs: Sqs;
|
rabbitmq: RabbitMQ;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.sqs = new Sqs();
|
this.rabbitmq = new RabbitMQ();
|
||||||
|
this.startListener();
|
||||||
}
|
}
|
||||||
|
|
||||||
startListener() {
|
async startListener() {
|
||||||
this.sqs.startConsumer();
|
await this.rabbitmq.connect();
|
||||||
|
this.rabbitmq.startConsumer();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const notificationService = new NotificationService();
|
const notificationService = new NotificationService();
|
||||||
notificationService.startListener();
|
|
||||||
|
|
|
@ -3,14 +3,17 @@ import { ITodo } from '../interfaces/ITodo';
|
||||||
|
|
||||||
const env = require('dotenv').config().parsed;
|
const env = require('dotenv').config().parsed;
|
||||||
|
|
||||||
export class MongoDb {
|
export class MongoDbModel {
|
||||||
|
client: MongoClient;
|
||||||
|
|
||||||
public static updateTodoStatus = async (todo: ITodo) => {
|
constructor() {
|
||||||
const client = new MongoClient(env.DATABASE_URL);
|
this.client = new MongoClient(env.DATABASE_URL);
|
||||||
|
}
|
||||||
|
|
||||||
|
public updateTodoStatus = async (todo: ITodo) => {
|
||||||
try {
|
try {
|
||||||
await client.connect();
|
await this.client.connect();
|
||||||
const db = client.db(env.MONGO_DB_NAME);
|
const db = this.client.db(env.MONGO_DB_NAME);
|
||||||
const todosCollection = db.collection('todos');
|
const todosCollection = db.collection('todos');
|
||||||
const result = await todosCollection.updateOne(
|
const result = await todosCollection.updateOne(
|
||||||
{ _id: new ObjectId(todo._id) },
|
{ _id: new ObjectId(todo._id) },
|
||||||
|
@ -20,7 +23,7 @@ export class MongoDb {
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Error updating Todo status:", error);
|
console.error("Error updating Todo status:", error);
|
||||||
} finally {
|
} finally {
|
||||||
await client.close();
|
await this.client.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
91
notification-service/src/rabbitmq/RabbitMQ.ts
Normal file
91
notification-service/src/rabbitmq/RabbitMQ.ts
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
import amqp from 'amqplib';
|
||||||
|
import { ITodo } from '../interfaces/ITodo';
|
||||||
|
import { MongoDbModel } from '../mongodb/MongoDb';
|
||||||
|
|
||||||
|
const env = require('dotenv').config().parsed;
|
||||||
|
|
||||||
|
|
||||||
|
export class RabbitMQ {
|
||||||
|
channel: amqp.Channel;
|
||||||
|
queueName: string;
|
||||||
|
mongoClient: MongoDbModel;
|
||||||
|
exchange: string;
|
||||||
|
queue: string;
|
||||||
|
routingKey: string;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.mongoClient = new MongoDbModel();
|
||||||
|
|
||||||
|
this.exchange = 'delayed_exchange';
|
||||||
|
this.queue = 'delayed_queue';
|
||||||
|
this.routingKey = 'delayed_routing_key'
|
||||||
|
|
||||||
|
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||||
|
this.connect().then(() => {
|
||||||
|
console.log('RabbitMQ connected');
|
||||||
|
}).catch((error) => {
|
||||||
|
console.error('Error connecting to RabbitMQ:', error);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async connect() {
|
||||||
|
const connection = await amqp.connect({
|
||||||
|
protocol: 'amqp',
|
||||||
|
hostname: env.RABBITMQ_HOST,
|
||||||
|
port: parseInt(env.RABBITMQ_PORT),
|
||||||
|
username: env.RABBITMQ_USERNAME,
|
||||||
|
password: env.RABBITMQ_PASSWORD,
|
||||||
|
});
|
||||||
|
this.channel = await connection.createChannel();
|
||||||
|
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||||
|
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||||
|
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
||||||
|
|
||||||
|
|
||||||
|
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
async create(payload: ITodo, delayTimeForQueue: number) {
|
||||||
|
const message = JSON.stringify({ payload, delayTimeForQueue });
|
||||||
|
const options = { persistent: true, expiration: delayTimeForQueue.toString() };
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.channel.sendToQueue(this.queueName, Buffer.from(message), options);
|
||||||
|
console.log('Message sent to the queue');
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error sending message to RabbitMQ:', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async startConsumer() {
|
||||||
|
try {
|
||||||
|
await this.channel.prefetch(1);
|
||||||
|
console.log('Consumer started, waiting for messages...');
|
||||||
|
this.channel.consume(this.queueName, async (message) => {
|
||||||
|
if (message) {
|
||||||
|
const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
|
||||||
|
payload: ITodo;
|
||||||
|
delayTimeForQueue: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
console.log('Received notification:', payload);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.mongoClient.updateTodoStatus(payload);
|
||||||
|
console.log('Updated todo status in the DB');
|
||||||
|
} catch {
|
||||||
|
await this.create(payload, delayTimeForQueue);
|
||||||
|
this.channel.ack(message);
|
||||||
|
console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.channel.ack(message);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error consuming messages from RabbitMQ:', error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -7,7 +7,7 @@
|
||||||
"moduleResolution": "node",
|
"moduleResolution": "node",
|
||||||
"sourceMap": true,
|
"sourceMap": true,
|
||||||
"outDir": "./dist",
|
"outDir": "./dist",
|
||||||
"rootDir": "./src",
|
"rootDir": "./src"
|
||||||
"noImplicitAny": true,
|
// "noImplicitAny": true,
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -7,9 +7,9 @@ POST http://localhost:3000/todo
|
||||||
Content-Type: application/json
|
Content-Type: application/json
|
||||||
|
|
||||||
{
|
{
|
||||||
"title": "Go to the gym",
|
"title": "Go to the gymNEW!!!",
|
||||||
"description": "Go to the gym at 8pm",
|
"description": "Go to the gym at 8pm",
|
||||||
"due_date": "2023-07-08 01:21:00"
|
"due_date": "2023-07-08T15:24:00.000Z"
|
||||||
}
|
}
|
||||||
|
|
||||||
### update request
|
### update request
|
||||||
|
|
41
todo-service/package-lock.json
generated
41
todo-service/package-lock.json
generated
|
@ -10,13 +10,19 @@
|
||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@aws-sdk/types": "^3.357.0",
|
"@aws-sdk/types": "^3.357.0",
|
||||||
|
"amqplib": "^0.10.3",
|
||||||
"aws-sdk": "^2.1413.0",
|
"aws-sdk": "^2.1413.0",
|
||||||
"dotenv": "^16.3.1",
|
"dotenv": "^16.3.1",
|
||||||
"express": "^4.18.2",
|
"express": "^4.18.2",
|
||||||
|
"luxon": "^3.3.0",
|
||||||
|
"moment": "^2.29.4",
|
||||||
|
"moment-timezone": "^0.5.43",
|
||||||
"mongoose": "^7.3.1"
|
"mongoose": "^7.3.1"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@types/express": "^4.17.17"
|
"@types/amqplib": "^0.10.1",
|
||||||
|
"@types/express": "^4.17.17",
|
||||||
|
"@types/luxon": "^3.3.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"node_modules/@acuminous/bitsyntax": {
|
"node_modules/@acuminous/bitsyntax": {
|
||||||
|
@ -127,6 +133,12 @@
|
||||||
"integrity": "sha512-/K3ds8TRAfBvi5vfjuz8y6+GiAYBZ0x4tXv1Av6CWBWn0IlADc+ZX9pMq7oU0fNQPnBwIZl3rmeLp6SBApbxSQ==",
|
"integrity": "sha512-/K3ds8TRAfBvi5vfjuz8y6+GiAYBZ0x4tXv1Av6CWBWn0IlADc+ZX9pMq7oU0fNQPnBwIZl3rmeLp6SBApbxSQ==",
|
||||||
"dev": true
|
"dev": true
|
||||||
},
|
},
|
||||||
|
"node_modules/@types/luxon": {
|
||||||
|
"version": "3.3.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/@types/luxon/-/luxon-3.3.0.tgz",
|
||||||
|
"integrity": "sha512-uKRI5QORDnrGFYgcdAVnHvEIvEZ8noTpP/Bg+HeUzZghwinDlIS87DEenV5r1YoOF9G4x600YsUXLWZ19rmTmg==",
|
||||||
|
"dev": true
|
||||||
|
},
|
||||||
"node_modules/@types/mime": {
|
"node_modules/@types/mime": {
|
||||||
"version": "1.3.2",
|
"version": "1.3.2",
|
||||||
"resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.2.tgz",
|
"resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.2.tgz",
|
||||||
|
@ -731,6 +743,14 @@
|
||||||
"node": ">=12.0.0"
|
"node": ">=12.0.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/luxon": {
|
||||||
|
"version": "3.3.0",
|
||||||
|
"resolved": "https://registry.npmjs.org/luxon/-/luxon-3.3.0.tgz",
|
||||||
|
"integrity": "sha512-An0UCfG/rSiqtAIiBPO0Y9/zAnHUZxAMiCpTd5h2smgsj7GGmcenvrvww2cqNA8/4A5ZrD1gJpHN2mIHZQF+Mg==",
|
||||||
|
"engines": {
|
||||||
|
"node": ">=12"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/media-typer": {
|
"node_modules/media-typer": {
|
||||||
"version": "0.3.0",
|
"version": "0.3.0",
|
||||||
"resolved": "https://registry.npmjs.org/media-typer/-/media-typer-0.3.0.tgz",
|
"resolved": "https://registry.npmjs.org/media-typer/-/media-typer-0.3.0.tgz",
|
||||||
|
@ -788,6 +808,25 @@
|
||||||
"node": ">= 0.6"
|
"node": ">= 0.6"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"node_modules/moment": {
|
||||||
|
"version": "2.29.4",
|
||||||
|
"resolved": "https://registry.npmjs.org/moment/-/moment-2.29.4.tgz",
|
||||||
|
"integrity": "sha512-5LC9SOxjSc2HF6vO2CyuTDNivEdoz2IvyJJGj6X8DJ0eFyfszE0QiEd+iXmBvUP3WHxSjFH/vIsA0EN00cgr8w==",
|
||||||
|
"engines": {
|
||||||
|
"node": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"node_modules/moment-timezone": {
|
||||||
|
"version": "0.5.43",
|
||||||
|
"resolved": "https://registry.npmjs.org/moment-timezone/-/moment-timezone-0.5.43.tgz",
|
||||||
|
"integrity": "sha512-72j3aNyuIsDxdF1i7CEgV2FfxM1r6aaqJyLB2vwb33mXYyoyLly+F1zbWqhA3/bVIoJ4szlUoMbUnVdid32NUQ==",
|
||||||
|
"dependencies": {
|
||||||
|
"moment": "^2.29.4"
|
||||||
|
},
|
||||||
|
"engines": {
|
||||||
|
"node": "*"
|
||||||
|
}
|
||||||
|
},
|
||||||
"node_modules/mongodb": {
|
"node_modules/mongodb": {
|
||||||
"version": "5.6.0",
|
"version": "5.6.0",
|
||||||
"resolved": "https://registry.npmjs.org/mongodb/-/mongodb-5.6.0.tgz",
|
"resolved": "https://registry.npmjs.org/mongodb/-/mongodb-5.6.0.tgz",
|
||||||
|
|
|
@ -11,12 +11,15 @@
|
||||||
"license": "ISC",
|
"license": "ISC",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@aws-sdk/types": "^3.357.0",
|
"@aws-sdk/types": "^3.357.0",
|
||||||
|
"amqplib": "^0.10.3",
|
||||||
"aws-sdk": "^2.1413.0",
|
"aws-sdk": "^2.1413.0",
|
||||||
"dotenv": "^16.3.1",
|
"dotenv": "^16.3.1",
|
||||||
"express": "^4.18.2",
|
"express": "^4.18.2",
|
||||||
|
"moment": "^2.29.4",
|
||||||
"mongoose": "^7.3.1"
|
"mongoose": "^7.3.1"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
"@types/amqplib": "^0.10.1",
|
||||||
"@types/express": "^4.17.17"
|
"@types/express": "^4.17.17"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,17 +2,18 @@ import { NextFunction, Request, Response } from 'express';
|
||||||
import { ApiError } from '../utils/ApiError';
|
import { ApiError } from '../utils/ApiError';
|
||||||
import { ITodo } from '../schemas/todoSchema';
|
import { ITodo } from '../schemas/todoSchema';
|
||||||
import { TodoModel } from '../models/todoModel';
|
import { TodoModel } from '../models/todoModel';
|
||||||
import { Sqs } from '../aws/Sqs';
|
// import { Sqs } from '../aws/Sqs';
|
||||||
|
import { RabbitMQ } from '../rabbitmq/RabbitMQ';
|
||||||
|
|
||||||
const env = require('dotenv').config().parsed;
|
const env = require('dotenv').config().parsed;
|
||||||
|
|
||||||
export class TodoController {
|
export class TodoController {
|
||||||
private todoModel: TodoModel;
|
private todoModel: TodoModel;
|
||||||
queue: Sqs;
|
queue: RabbitMQ;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.todoModel = new TodoModel();
|
this.todoModel = new TodoModel();
|
||||||
this.queue = new Sqs();
|
this.queue = new RabbitMQ();
|
||||||
}
|
}
|
||||||
|
|
||||||
public getAll = async (req: Request, res: Response, next: NextFunction) => {
|
public getAll = async (req: Request, res: Response, next: NextFunction) => {
|
||||||
|
@ -38,10 +39,7 @@ export class TodoController {
|
||||||
if (todo instanceof ApiError) {
|
if (todo instanceof ApiError) {
|
||||||
return next(todo);
|
return next(todo);
|
||||||
}
|
}
|
||||||
const id = todo._id;
|
this.queue.create(todo);
|
||||||
const delayTimeForQueue = Math.floor((new Date(todo.due_date).getTime() - new Date().getTime()) / 1000);
|
|
||||||
|
|
||||||
this.queue.create(todo, delayTimeForQueue);
|
|
||||||
|
|
||||||
return res.json(todo);
|
return res.json(todo);
|
||||||
} catch {
|
} catch {
|
||||||
|
|
|
@ -1,23 +1,29 @@
|
||||||
import { Request, Response, NextFunction } from 'express';
|
|
||||||
import { ApiError } from '../utils/ApiError';
|
import { ApiError } from '../utils/ApiError';
|
||||||
|
import { Request, Response, NextFunction } from 'express';
|
||||||
|
import moment from 'moment';
|
||||||
|
|
||||||
const createTodoMiddleWare = async (req: Request, res: Response, next: NextFunction) => {
|
const createTodoMiddleware = async (req: Request, res: Response, next: NextFunction) => {
|
||||||
const { title, description, due_date } = req.body;
|
const { title, description, due_date } = req.body;
|
||||||
|
|
||||||
if (!title || !due_date) {
|
if (!title || !due_date) {
|
||||||
const error = new ApiError(`${!title ? 'title' : 'due_date'} is required`, 400, 'Bad Request');
|
const error = new ApiError(`${!title ? 'title' : 'due_date'} is required`, 400, 'Bad Request');
|
||||||
return next(error);
|
return next(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if(new Date(due_date) < new Date()) {
|
const inputDate = new Date(due_date);
|
||||||
const error = new ApiError(`due_date must be greater than current date`, 400, 'Bad Request');
|
if (isNaN(inputDate.getTime()) || inputDate < new Date()) {
|
||||||
|
const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request');
|
||||||
return next(error);
|
return next(error);
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
const error = new ApiError(`due_date must be a valid date`, 400, 'Bad Request');
|
const parsedDate = moment(due_date, 'YYYY-MM-DD HH:mm:ss');
|
||||||
return next(error);
|
if (!parsedDate.isValid() || parsedDate < moment()) {
|
||||||
|
const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request');
|
||||||
|
return next(error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (!description) {
|
if (!description) {
|
||||||
req.body.description = '';
|
req.body.description = '';
|
||||||
}
|
}
|
||||||
|
@ -26,5 +32,5 @@ const createTodoMiddleWare = async (req: Request, res: Response, next: NextFunct
|
||||||
}
|
}
|
||||||
|
|
||||||
export {
|
export {
|
||||||
createTodoMiddleWare
|
createTodoMiddleware
|
||||||
}
|
}
|
61
todo-service/src/rabbitmq/RabbitMQ.ts
Normal file
61
todo-service/src/rabbitmq/RabbitMQ.ts
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
import amqp, { Options } from 'amqplib';
|
||||||
|
import { ITodo } from '../schemas/todoSchema';
|
||||||
|
|
||||||
|
const env = require('dotenv').config().parsed;
|
||||||
|
|
||||||
|
export class RabbitMQ {
|
||||||
|
channel: amqp.Channel;
|
||||||
|
queueName: string;
|
||||||
|
exchange: string;
|
||||||
|
queue: string;
|
||||||
|
routingKey: string;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||||
|
|
||||||
|
this.exchange = 'delayed_exchange';
|
||||||
|
this.queue = 'delayed_queue';
|
||||||
|
this.routingKey = 'delayed_routing_key'
|
||||||
|
|
||||||
|
this.connect().then(() => {
|
||||||
|
console.log('RabbitMQ connected');
|
||||||
|
}).catch((error) => {
|
||||||
|
console.error('Error connecting to RabbitMQ:', error);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async connect() {
|
||||||
|
console.log("IN CONNECT")
|
||||||
|
try {
|
||||||
|
const connection = await amqp.connect({
|
||||||
|
protocol: 'amqp',
|
||||||
|
hostname: env.RABBITMQ_HOST,
|
||||||
|
port: parseInt(env.RABBITMQ_PORT),
|
||||||
|
username: env.RABBITMQ_USERNAME,
|
||||||
|
password: env.RABBITMQ_PASSWORD,
|
||||||
|
});
|
||||||
|
this.channel = await connection.createChannel();
|
||||||
|
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||||
|
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||||
|
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
||||||
|
console.log('Channel and queue asserted successfully');
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error connecting to RabbitMQ:', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async create(payload: ITodo) {
|
||||||
|
console.log("IN CREATE!")
|
||||||
|
const delayTimeForQueue: number = new Date(payload.due_date).getTime() - Date.now();
|
||||||
|
const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue });
|
||||||
|
const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } };
|
||||||
|
try {
|
||||||
|
await this.channel.publish(this.exchange, this.routingKey, Buffer.from(message), options);
|
||||||
|
console.log('Message sent to the queue');
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error sending message to RabbitMQ:', error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
import { Router } from 'express';
|
import { Router } from 'express';
|
||||||
import { TodoController } from '../controllers/todoController';
|
import { TodoController } from '../controllers/todoController';
|
||||||
import { createTodoMiddleWare } from '../middleware/createTodoMiddleWare';
|
import { createTodoMiddleware } from '../middleware/createTodoMiddleWare';
|
||||||
|
|
||||||
class TodoRouter {
|
class TodoRouter {
|
||||||
router: Router;
|
router: Router;
|
||||||
|
@ -15,8 +15,8 @@ class TodoRouter {
|
||||||
private setRoutes() {
|
private setRoutes() {
|
||||||
this.router.get('/', this.todoController.getAll);
|
this.router.get('/', this.todoController.getAll);
|
||||||
this.router.get('/:id', this.todoController.getOne);
|
this.router.get('/:id', this.todoController.getOne);
|
||||||
this.router.post('/', createTodoMiddleWare, this.todoController.createOne);
|
this.router.post('/', createTodoMiddleware, this.todoController.createOne);
|
||||||
this.router.put('/:id', createTodoMiddleWare, this.todoController.updateOne);
|
this.router.put('/:id', createTodoMiddleware, this.todoController.updateOne);
|
||||||
this.router.delete('/:id', this.todoController.deleteOne);
|
this.router.delete('/:id', this.todoController.deleteOne);
|
||||||
this.router.delete('/', this.todoController.removeAll)
|
this.router.delete('/', this.todoController.removeAll)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue