Compare commits
3 commits
e3268b4633
...
a0334bf886
Author | SHA1 | Date | |
---|---|---|---|
a0334bf886 | |||
9ffc77e95c | |||
c5164fdba1 |
8 changed files with 131 additions and 105 deletions
5
infra/docker/Dockerfile
Normal file
5
infra/docker/Dockerfile
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
FROM rabbitmq:3.8-management
|
||||||
|
RUN apt-get update && apt-get install -y wget unzip
|
||||||
|
RUN wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
|
||||||
|
RUN mv rabbitmq_delayed_message_exchange-3.8.0.ez $RABBITMQ_HOME/plugins/
|
||||||
|
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange
|
|
@ -16,25 +16,20 @@ services:
|
||||||
expose:
|
expose:
|
||||||
- 27017
|
- 27017
|
||||||
rabbitmq:
|
rabbitmq:
|
||||||
image: heidiks/rabbitmq-delayed-message-exchange:latest
|
image: todorabbit:latest
|
||||||
restart: always
|
restart: always
|
||||||
ports:
|
ports:
|
||||||
- 5672:5672
|
- 5672:5672
|
||||||
- 15672:15672
|
- 15672:15672
|
||||||
volumes:
|
volumes:
|
||||||
- rabbitmq_vol:/var/lib/rabbitmq
|
- rabbitmq_volume:/var/lib/rabbitmq
|
||||||
- ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh
|
- ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh
|
||||||
environment:
|
environment:
|
||||||
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
|
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
|
||||||
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
|
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
|
||||||
command: |
|
|
||||||
bash -c "
|
|
||||||
rabbitmq-plugins enable rabbitmq_delayed_message_exchange && \
|
|
||||||
rabbitmq-server
|
|
||||||
"
|
|
||||||
expose:
|
expose:
|
||||||
- 5672
|
- 5672
|
||||||
- 15672
|
- 15672
|
||||||
volumes:
|
volumes:
|
||||||
mongodb_vol:
|
mongodb_vol:
|
||||||
rabbitmq_vol:
|
rabbitmq_volume:
|
||||||
|
|
|
@ -1,26 +1,20 @@
|
||||||
import amqp from 'amqplib';
|
import amqp, { ConsumeMessage } from 'amqplib';
|
||||||
import { ITodo } from '../interfaces/ITodo';
|
import { ITodo } from '../interfaces/ITodo';
|
||||||
import { MongoDbModel } from '../mongodb/MongoDb';
|
import { MongoDbModel } from '../mongodb/MongoDb';
|
||||||
|
|
||||||
const env = require('dotenv').config().parsed;
|
const env = require('dotenv').config().parsed;
|
||||||
|
|
||||||
|
|
||||||
export class RabbitMQ {
|
export class RabbitMQ {
|
||||||
channel: amqp.Channel;
|
channel: amqp.Channel;
|
||||||
queueName: string;
|
queueName: string;
|
||||||
mongoClient: MongoDbModel;
|
exchangeName: string;
|
||||||
exchange: string;
|
|
||||||
queue: string;
|
|
||||||
routingKey: string;
|
|
||||||
|
|
||||||
|
mongoClient: MongoDbModel;
|
||||||
constructor() {
|
constructor() {
|
||||||
|
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||||
|
|
||||||
this.mongoClient = new MongoDbModel();
|
this.mongoClient = new MongoDbModel();
|
||||||
|
|
||||||
this.exchange = 'delayed_exchange';
|
|
||||||
this.queue = 'delayed_queue';
|
|
||||||
this.routingKey = 'delayed_routing_key'
|
|
||||||
|
|
||||||
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
|
||||||
this.connect().then(() => {
|
this.connect().then(() => {
|
||||||
console.log('RabbitMQ connected');
|
console.log('RabbitMQ connected');
|
||||||
}).catch((error) => {
|
}).catch((error) => {
|
||||||
|
@ -29,6 +23,7 @@ export class RabbitMQ {
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
|
try {
|
||||||
const connection = await amqp.connect({
|
const connection = await amqp.connect({
|
||||||
protocol: 'amqp',
|
protocol: 'amqp',
|
||||||
hostname: env.RABBITMQ_HOST,
|
hostname: env.RABBITMQ_HOST,
|
||||||
|
@ -36,13 +31,14 @@ export class RabbitMQ {
|
||||||
username: env.RABBITMQ_USERNAME,
|
username: env.RABBITMQ_USERNAME,
|
||||||
password: env.RABBITMQ_PASSWORD,
|
password: env.RABBITMQ_PASSWORD,
|
||||||
});
|
});
|
||||||
|
|
||||||
this.channel = await connection.createChannel();
|
this.channel = await connection.createChannel();
|
||||||
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
await this.channel.assertQueue(this.queueName);
|
||||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
console.log('Channel and queue asserted successfully #####');
|
||||||
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
} catch (error) {
|
||||||
|
console.error('Error connecting to RabbitMQ:', error);
|
||||||
|
throw error;
|
||||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async create(payload: ITodo, delayTimeForQueue: number) {
|
async create(payload: ITodo, delayTimeForQueue: number) {
|
||||||
|
@ -59,33 +55,43 @@ export class RabbitMQ {
|
||||||
}
|
}
|
||||||
|
|
||||||
async startConsumer() {
|
async startConsumer() {
|
||||||
try {
|
this.channel.assertQueue(this.queueName);
|
||||||
await this.channel.prefetch(1);
|
console.log("Consuming");
|
||||||
console.log('Consumer started, waiting for messages...');
|
this.channel.consume(this.queueName, (message: ConsumeMessage | null) => {
|
||||||
this.channel.consume(this.queueName, async (message) => {
|
|
||||||
if (message) {
|
if (message) {
|
||||||
const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
|
const todo = JSON.parse(message.content.toString());
|
||||||
payload: ITodo;
|
|
||||||
delayTimeForQueue: number;
|
|
||||||
};
|
|
||||||
|
|
||||||
console.log('Received notification:', payload);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await this.mongoClient.updateTodoStatus(payload);
|
|
||||||
console.log('Updated todo status in the DB');
|
|
||||||
} catch {
|
|
||||||
await this.create(payload, delayTimeForQueue);
|
|
||||||
this.channel.ack(message);
|
|
||||||
console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.channel.ack(message);
|
this.channel.ack(message);
|
||||||
|
console.log('Received notification:', todo);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (error) {
|
|
||||||
console.error('Error consuming messages from RabbitMQ:', error);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// try {
|
||||||
|
// console.log('Consumer started, waiting for messages...');
|
||||||
|
// this.channel.consume(this.queueName, async (message) => {
|
||||||
|
// if (message) {
|
||||||
|
// const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
|
||||||
|
// payload: ITodo;
|
||||||
|
// delayTimeForQueue: number;
|
||||||
|
// };
|
||||||
|
|
||||||
|
// console.log('Received notification:', payload);
|
||||||
|
|
||||||
|
// try {
|
||||||
|
// await this.mongoClient.updateTodoStatus(payload);
|
||||||
|
// console.log('Updated todo status in the DB');
|
||||||
|
// } catch {
|
||||||
|
// await this.create(payload, delayTimeForQueue);
|
||||||
|
// this.channel.ack(message);
|
||||||
|
// console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// this.channel.ack(message);
|
||||||
|
// }
|
||||||
|
// });
|
||||||
|
// } catch (error) {
|
||||||
|
// console.error('Error consuming messages from RabbitMQ:', error);
|
||||||
|
// }
|
||||||
}
|
}
|
|
@ -7,9 +7,9 @@ POST http://localhost:3000/todo
|
||||||
Content-Type: application/json
|
Content-Type: application/json
|
||||||
|
|
||||||
{
|
{
|
||||||
"title": "Go to the gymNEW!!!",
|
"title": "-2!!!!NEW!!!!",
|
||||||
"description": "Go to the gym at 8pm",
|
"description": "Go to the gym at 8pm",
|
||||||
"due_date": "2023-07-08T15:24:00.000Z"
|
"due_date": "2023-07-09T19:00:00.891Z"
|
||||||
}
|
}
|
||||||
|
|
||||||
### update request
|
### update request
|
||||||
|
|
|
@ -15,11 +15,14 @@
|
||||||
"aws-sdk": "^2.1413.0",
|
"aws-sdk": "^2.1413.0",
|
||||||
"dotenv": "^16.3.1",
|
"dotenv": "^16.3.1",
|
||||||
"express": "^4.18.2",
|
"express": "^4.18.2",
|
||||||
|
"luxon": "^3.3.0",
|
||||||
"moment": "^2.29.4",
|
"moment": "^2.29.4",
|
||||||
|
"moment-timezone": "^0.5.43",
|
||||||
"mongoose": "^7.3.1"
|
"mongoose": "^7.3.1"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@types/amqplib": "^0.10.1",
|
"@types/amqplib": "^0.10.1",
|
||||||
"@types/express": "^4.17.17"
|
"@types/express": "^4.17.17",
|
||||||
|
"@types/luxon": "^3.3.0"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,36 +1,40 @@
|
||||||
import { ApiError } from '../utils/ApiError';
|
|
||||||
import { Request, Response, NextFunction } from 'express';
|
import { Request, Response, NextFunction } from 'express';
|
||||||
import moment from 'moment';
|
import { ApiError } from '../utils/ApiError';
|
||||||
|
import { DateTime } from 'luxon';
|
||||||
|
import moment from 'moment-timezone';
|
||||||
|
|
||||||
const createTodoMiddleware = async (req: Request, res: Response, next: NextFunction) => {
|
const createTodoMiddleWare = async (req: Request, res: Response, next: NextFunction) => {
|
||||||
const { title, description, due_date } = req.body;
|
const { title, description, due_date } = req.body;
|
||||||
|
|
||||||
if (!title || !due_date) {
|
if (!title || !due_date) {
|
||||||
const error = new ApiError(`${!title ? 'title' : 'due_date'} is required`, 400, 'Bad Request');
|
const error = new ApiError(`${!title ? 'title' : 'due_date'} is required`, 400, 'Bad Request');
|
||||||
return next(error);
|
return next(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const inputDate = new Date(due_date);
|
if(new Date(due_date) < new Date()) {
|
||||||
if (isNaN(inputDate.getTime()) || inputDate < new Date()) {
|
const error = new ApiError(`due_date must be greater than current date`, 400, 'Bad Request');
|
||||||
const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request');
|
|
||||||
return next(error);
|
return next(error);
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
const parsedDate = moment(due_date, 'YYYY-MM-DD HH:mm:ss');
|
const error = new ApiError(`due_date must be ISO `, 400, 'Bad Request');
|
||||||
if (!parsedDate.isValid() || parsedDate < moment()) {
|
|
||||||
const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request');
|
|
||||||
return next(error);
|
return next(error);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (!description) {
|
if (!description) {
|
||||||
req.body.description = '';
|
req.body.description = '';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//check if date is valid, this is valid: 2023-07-08T14:00:00.000Z
|
||||||
|
const date = DateTime.fromISO(due_date);
|
||||||
|
if (!date.isValid) {
|
||||||
|
const error = new ApiError(`due_date must be a valid date Format`, 400, 'Bad Request');
|
||||||
|
return next(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
next();
|
next();
|
||||||
}
|
}
|
||||||
|
|
||||||
export {
|
export {
|
||||||
createTodoMiddleware
|
createTodoMiddleWare
|
||||||
}
|
}
|
|
@ -1,21 +1,18 @@
|
||||||
import amqp, { Options } from 'amqplib';
|
import amqp, { Options, ConsumeMessage, Channel } from 'amqplib';
|
||||||
import { ITodo } from '../schemas/todoSchema';
|
import { ITodo } from '../schemas/todoSchema';
|
||||||
|
|
||||||
const env = require('dotenv').config().parsed;
|
const env = require('dotenv').config().parsed;
|
||||||
|
|
||||||
export class RabbitMQ {
|
export class RabbitMQ {
|
||||||
|
connection: amqp.Connection;
|
||||||
channel: amqp.Channel;
|
channel: amqp.Channel;
|
||||||
queueName: string;
|
|
||||||
exchange: string;
|
|
||||||
queue: string;
|
queue: string;
|
||||||
routingKey: string;
|
exchange: string;
|
||||||
|
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
this.queue = env.RABBITMQ_QUEUE_NAME;
|
||||||
|
|
||||||
this.exchange = 'delayed_exchange';
|
this.exchange = 'delayed_exchange';
|
||||||
this.queue = 'delayed_queue';
|
|
||||||
this.routingKey = 'delayed_routing_key'
|
|
||||||
|
|
||||||
this.connect().then(() => {
|
this.connect().then(() => {
|
||||||
console.log('RabbitMQ connected');
|
console.log('RabbitMQ connected');
|
||||||
|
@ -25,20 +22,26 @@ export class RabbitMQ {
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
console.log("IN CONNECT")
|
|
||||||
try {
|
try {
|
||||||
const connection = await amqp.connect({
|
this.connection = await amqp.connect({
|
||||||
protocol: 'amqp',
|
protocol: 'amqp',
|
||||||
hostname: env.RABBITMQ_HOST,
|
hostname: env.RABBITMQ_HOST,
|
||||||
port: parseInt(env.RABBITMQ_PORT),
|
port: parseInt(env.RABBITMQ_PORT),
|
||||||
username: env.RABBITMQ_USERNAME,
|
username: env.RABBITMQ_USERNAME,
|
||||||
password: env.RABBITMQ_PASSWORD,
|
password: env.RABBITMQ_PASSWORD,
|
||||||
});
|
});
|
||||||
this.channel = await connection.createChannel();
|
|
||||||
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
this.channel = await this.connection.createChannel();
|
||||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
await this.channel.assertQueue(this.queue);
|
||||||
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
await this.channel.assertExchange(this.exchange, 'x-delayed-message', {
|
||||||
console.log('Channel and queue asserted successfully');
|
durable: true,
|
||||||
|
arguments: {
|
||||||
|
'x-delayed-type': 'direct'
|
||||||
|
}
|
||||||
|
});
|
||||||
|
await this.channel.bindQueue(this.queue, this.exchange, this.queue);
|
||||||
|
|
||||||
|
console.log('Channel and queue asserted successfully #####');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error connecting to RabbitMQ:', error);
|
console.error('Error connecting to RabbitMQ:', error);
|
||||||
throw error;
|
throw error;
|
||||||
|
@ -46,16 +49,26 @@ export class RabbitMQ {
|
||||||
}
|
}
|
||||||
|
|
||||||
async create(payload: ITodo) {
|
async create(payload: ITodo) {
|
||||||
console.log("IN CREATE!")
|
|
||||||
const delayTimeForQueue: number = new Date(payload.due_date).getTime() - Date.now();
|
const delayTimeForQueue = this.calculateDelayTimeForQueue(payload);
|
||||||
const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue });
|
console.log("The Queue will be delayed for: ", delayTimeForQueue, " ms");
|
||||||
const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } };
|
|
||||||
|
const message = JSON.stringify({ payload });
|
||||||
|
const options: Options.Publish = { headers: { 'x-delay': delayTimeForQueue } };
|
||||||
try {
|
try {
|
||||||
await this.channel.publish(this.exchange, this.routingKey, Buffer.from(message), options);
|
await this.channel.publish(this.exchange, this.queue, Buffer.from(message),
|
||||||
console.log('Message sent to the queue');
|
options
|
||||||
|
)
|
||||||
|
console.log(`Queue name is: ${this.queue}`)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error sending message to RabbitMQ:', error);
|
console.error('Error sending message to RabbitMQ:', error);
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
calculateDelayTimeForQueue(payload: ITodo) {
|
||||||
|
const delayTime = payload.due_date.getTime() - Date.now();
|
||||||
|
return delayTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import { Router } from 'express';
|
import { Router } from 'express';
|
||||||
import { TodoController } from '../controllers/todoController';
|
import { TodoController } from '../controllers/todoController';
|
||||||
import { createTodoMiddleware } from '../middleware/createTodoMiddleWare';
|
import { createTodoMiddleWare } from '../middleware/createTodoMiddleWare';
|
||||||
|
|
||||||
class TodoRouter {
|
class TodoRouter {
|
||||||
router: Router;
|
router: Router;
|
||||||
|
@ -15,8 +15,8 @@ class TodoRouter {
|
||||||
private setRoutes() {
|
private setRoutes() {
|
||||||
this.router.get('/', this.todoController.getAll);
|
this.router.get('/', this.todoController.getAll);
|
||||||
this.router.get('/:id', this.todoController.getOne);
|
this.router.get('/:id', this.todoController.getOne);
|
||||||
this.router.post('/', createTodoMiddleware, this.todoController.createOne);
|
this.router.post('/', createTodoMiddleWare, this.todoController.createOne);
|
||||||
this.router.put('/:id', createTodoMiddleware, this.todoController.updateOne);
|
this.router.put('/:id', createTodoMiddleWare, this.todoController.updateOne);
|
||||||
this.router.delete('/:id', this.todoController.deleteOne);
|
this.router.delete('/:id', this.todoController.deleteOne);
|
||||||
this.router.delete('/', this.todoController.removeAll)
|
this.router.delete('/', this.todoController.removeAll)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue