Compare commits

..

3 commits

Author SHA1 Message Date
a0334bf886 done with consumer 2023-07-08 22:35:20 +03:00
9ffc77e95c working but with bug in the delay 2023-07-08 20:52:30 +03:00
c5164fdba1 added Dockerfile for cost rabbit image 2023-07-08 16:08:35 +03:00
8 changed files with 131 additions and 105 deletions

5
infra/docker/Dockerfile Normal file
View file

@ -0,0 +1,5 @@
FROM rabbitmq:3.8-management
RUN apt-get update && apt-get install -y wget unzip
RUN wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
RUN mv rabbitmq_delayed_message_exchange-3.8.0.ez $RABBITMQ_HOME/plugins/
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

View file

@ -16,25 +16,20 @@ services:
expose: expose:
- 27017 - 27017
rabbitmq: rabbitmq:
image: heidiks/rabbitmq-delayed-message-exchange:latest image: todorabbit:latest
restart: always restart: always
ports: ports:
- 5672:5672 - 5672:5672
- 15672:15672 - 15672:15672
volumes: volumes:
- rabbitmq_vol:/var/lib/rabbitmq - rabbitmq_volume:/var/lib/rabbitmq
- ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh - ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh
environment: environment:
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER} - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS} - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
command: |
bash -c "
rabbitmq-plugins enable rabbitmq_delayed_message_exchange && \
rabbitmq-server
"
expose: expose:
- 5672 - 5672
- 15672 - 15672
volumes: volumes:
mongodb_vol: mongodb_vol:
rabbitmq_vol: rabbitmq_volume:

View file

@ -1,26 +1,20 @@
import amqp from 'amqplib'; import amqp, { ConsumeMessage } from 'amqplib';
import { ITodo } from '../interfaces/ITodo'; import { ITodo } from '../interfaces/ITodo';
import { MongoDbModel } from '../mongodb/MongoDb'; import { MongoDbModel } from '../mongodb/MongoDb';
const env = require('dotenv').config().parsed; const env = require('dotenv').config().parsed;
export class RabbitMQ { export class RabbitMQ {
channel: amqp.Channel; channel: amqp.Channel;
queueName: string; queueName: string;
exchangeName: string;
mongoClient: MongoDbModel; mongoClient: MongoDbModel;
exchange: string;
queue: string;
routingKey: string;
constructor() { constructor() {
this.mongoClient = new MongoDbModel();
this.exchange = 'delayed_exchange';
this.queue = 'delayed_queue';
this.routingKey = 'delayed_routing_key'
this.queueName = env.RABBITMQ_QUEUE_NAME; this.queueName = env.RABBITMQ_QUEUE_NAME;
this.mongoClient = new MongoDbModel();
this.connect().then(() => { this.connect().then(() => {
console.log('RabbitMQ connected'); console.log('RabbitMQ connected');
}).catch((error) => { }).catch((error) => {
@ -29,20 +23,22 @@ export class RabbitMQ {
} }
async connect() { async connect() {
const connection = await amqp.connect({ try {
protocol: 'amqp', const connection = await amqp.connect({
hostname: env.RABBITMQ_HOST, protocol: 'amqp',
port: parseInt(env.RABBITMQ_PORT), hostname: env.RABBITMQ_HOST,
username: env.RABBITMQ_USERNAME, port: parseInt(env.RABBITMQ_PORT),
password: env.RABBITMQ_PASSWORD, username: env.RABBITMQ_USERNAME,
}); password: env.RABBITMQ_PASSWORD,
this.channel = await connection.createChannel(); });
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
await this.channel.assertQueue(this.queueName, { durable: true });
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
this.channel = await connection.createChannel();
await this.channel.assertQueue(this.queueName, { durable: true }); await this.channel.assertQueue(this.queueName);
console.log('Channel and queue asserted successfully #####');
} catch (error) {
console.error('Error connecting to RabbitMQ:', error);
throw error;
}
} }
async create(payload: ITodo, delayTimeForQueue: number) { async create(payload: ITodo, delayTimeForQueue: number) {
@ -59,33 +55,43 @@ export class RabbitMQ {
} }
async startConsumer() { async startConsumer() {
try { this.channel.assertQueue(this.queueName);
await this.channel.prefetch(1); console.log("Consuming");
console.log('Consumer started, waiting for messages...'); this.channel.consume(this.queueName, (message: ConsumeMessage | null) => {
this.channel.consume(this.queueName, async (message) => { if (message) {
if (message) { const todo = JSON.parse(message.content.toString());
const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as { this.channel.ack(message);
payload: ITodo; console.log('Received notification:', todo);
delayTimeForQueue: number; }
}; });
console.log('Received notification:', payload);
try {
await this.mongoClient.updateTodoStatus(payload);
console.log('Updated todo status in the DB');
} catch {
await this.create(payload, delayTimeForQueue);
this.channel.ack(message);
console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
return;
}
this.channel.ack(message);
}
});
} catch (error) {
console.error('Error consuming messages from RabbitMQ:', error);
}
} }
}
// try {
// console.log('Consumer started, waiting for messages...');
// this.channel.consume(this.queueName, async (message) => {
// if (message) {
// const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
// payload: ITodo;
// delayTimeForQueue: number;
// };
// console.log('Received notification:', payload);
// try {
// await this.mongoClient.updateTodoStatus(payload);
// console.log('Updated todo status in the DB');
// } catch {
// await this.create(payload, delayTimeForQueue);
// this.channel.ack(message);
// console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
// return;
// }
// this.channel.ack(message);
// }
// });
// } catch (error) {
// console.error('Error consuming messages from RabbitMQ:', error);
// }
}

View file

@ -7,9 +7,9 @@ POST http://localhost:3000/todo
Content-Type: application/json Content-Type: application/json
{ {
"title": "Go to the gymNEW!!!", "title": "-2!!!!NEW!!!!",
"description": "Go to the gym at 8pm", "description": "Go to the gym at 8pm",
"due_date": "2023-07-08T15:24:00.000Z" "due_date": "2023-07-09T19:00:00.891Z"
} }
### update request ### update request

View file

@ -15,11 +15,14 @@
"aws-sdk": "^2.1413.0", "aws-sdk": "^2.1413.0",
"dotenv": "^16.3.1", "dotenv": "^16.3.1",
"express": "^4.18.2", "express": "^4.18.2",
"luxon": "^3.3.0",
"moment": "^2.29.4", "moment": "^2.29.4",
"moment-timezone": "^0.5.43",
"mongoose": "^7.3.1" "mongoose": "^7.3.1"
}, },
"devDependencies": { "devDependencies": {
"@types/amqplib": "^0.10.1", "@types/amqplib": "^0.10.1",
"@types/express": "^4.17.17" "@types/express": "^4.17.17",
"@types/luxon": "^3.3.0"
} }
} }

View file

@ -1,36 +1,40 @@
import { ApiError } from '../utils/ApiError';
import { Request, Response, NextFunction } from 'express'; import { Request, Response, NextFunction } from 'express';
import moment from 'moment'; import { ApiError } from '../utils/ApiError';
import { DateTime } from 'luxon';
import moment from 'moment-timezone';
const createTodoMiddleware = async (req: Request, res: Response, next: NextFunction) => { const createTodoMiddleWare = async (req: Request, res: Response, next: NextFunction) => {
const { title, description, due_date } = req.body; const { title, description, due_date } = req.body;
if (!title || !due_date) { if (!title || !due_date) {
const error = new ApiError(`${!title ? 'title' : 'due_date'} is required`, 400, 'Bad Request'); const error = new ApiError(`${!title ? 'title' : 'due_date'} is required`, 400, 'Bad Request');
return next(error); return next(error);
} }
try { try {
const inputDate = new Date(due_date); if(new Date(due_date) < new Date()) {
if (isNaN(inputDate.getTime()) || inputDate < new Date()) { const error = new ApiError(`due_date must be greater than current date`, 400, 'Bad Request');
const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request');
return next(error); return next(error);
} }
} catch { } catch {
const parsedDate = moment(due_date, 'YYYY-MM-DD HH:mm:ss'); const error = new ApiError(`due_date must be ISO `, 400, 'Bad Request');
if (!parsedDate.isValid() || parsedDate < moment()) { return next(error);
const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request');
return next(error);
}
} }
if (!description) { if (!description) {
req.body.description = ''; req.body.description = '';
} }
//check if date is valid, this is valid: 2023-07-08T14:00:00.000Z
const date = DateTime.fromISO(due_date);
if (!date.isValid) {
const error = new ApiError(`due_date must be a valid date Format`, 400, 'Bad Request');
return next(error);
}
next(); next();
} }
export { export {
createTodoMiddleware createTodoMiddleWare
} }

View file

@ -1,21 +1,18 @@
import amqp, { Options } from 'amqplib'; import amqp, { Options, ConsumeMessage, Channel } from 'amqplib';
import { ITodo } from '../schemas/todoSchema'; import { ITodo } from '../schemas/todoSchema';
const env = require('dotenv').config().parsed; const env = require('dotenv').config().parsed;
export class RabbitMQ { export class RabbitMQ {
connection: amqp.Connection;
channel: amqp.Channel; channel: amqp.Channel;
queueName: string;
exchange: string;
queue: string; queue: string;
routingKey: string; exchange: string;
constructor() { constructor() {
this.queueName = env.RABBITMQ_QUEUE_NAME; this.queue = env.RABBITMQ_QUEUE_NAME;
this.exchange = 'delayed_exchange'; this.exchange = 'delayed_exchange';
this.queue = 'delayed_queue';
this.routingKey = 'delayed_routing_key'
this.connect().then(() => { this.connect().then(() => {
console.log('RabbitMQ connected'); console.log('RabbitMQ connected');
@ -25,20 +22,26 @@ export class RabbitMQ {
} }
async connect() { async connect() {
console.log("IN CONNECT")
try { try {
const connection = await amqp.connect({ this.connection = await amqp.connect({
protocol: 'amqp', protocol: 'amqp',
hostname: env.RABBITMQ_HOST, hostname: env.RABBITMQ_HOST,
port: parseInt(env.RABBITMQ_PORT), port: parseInt(env.RABBITMQ_PORT),
username: env.RABBITMQ_USERNAME, username: env.RABBITMQ_USERNAME,
password: env.RABBITMQ_PASSWORD, password: env.RABBITMQ_PASSWORD,
}); });
this.channel = await connection.createChannel();
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); this.channel = await this.connection.createChannel();
await this.channel.assertQueue(this.queueName, { durable: true }); await this.channel.assertQueue(this.queue);
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey); await this.channel.assertExchange(this.exchange, 'x-delayed-message', {
console.log('Channel and queue asserted successfully'); durable: true,
arguments: {
'x-delayed-type': 'direct'
}
});
await this.channel.bindQueue(this.queue, this.exchange, this.queue);
console.log('Channel and queue asserted successfully #####');
} catch (error) { } catch (error) {
console.error('Error connecting to RabbitMQ:', error); console.error('Error connecting to RabbitMQ:', error);
throw error; throw error;
@ -46,16 +49,26 @@ export class RabbitMQ {
} }
async create(payload: ITodo) { async create(payload: ITodo) {
console.log("IN CREATE!")
const delayTimeForQueue: number = new Date(payload.due_date).getTime() - Date.now(); const delayTimeForQueue = this.calculateDelayTimeForQueue(payload);
const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue }); console.log("The Queue will be delayed for: ", delayTimeForQueue, " ms");
const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } };
const message = JSON.stringify({ payload });
const options: Options.Publish = { headers: { 'x-delay': delayTimeForQueue } };
try { try {
await this.channel.publish(this.exchange, this.routingKey, Buffer.from(message), options); await this.channel.publish(this.exchange, this.queue, Buffer.from(message),
console.log('Message sent to the queue'); options
)
console.log(`Queue name is: ${this.queue}`)
} catch (error) { } catch (error) {
console.error('Error sending message to RabbitMQ:', error); console.error('Error sending message to RabbitMQ:', error);
throw error; throw error;
} }
}
calculateDelayTimeForQueue(payload: ITodo) {
const delayTime = payload.due_date.getTime() - Date.now();
return delayTime;
} }
} }

View file

@ -1,6 +1,6 @@
import { Router } from 'express'; import { Router } from 'express';
import { TodoController } from '../controllers/todoController'; import { TodoController } from '../controllers/todoController';
import { createTodoMiddleware } from '../middleware/createTodoMiddleWare'; import { createTodoMiddleWare } from '../middleware/createTodoMiddleWare';
class TodoRouter { class TodoRouter {
router: Router; router: Router;
@ -15,8 +15,8 @@ class TodoRouter {
private setRoutes() { private setRoutes() {
this.router.get('/', this.todoController.getAll); this.router.get('/', this.todoController.getAll);
this.router.get('/:id', this.todoController.getOne); this.router.get('/:id', this.todoController.getOne);
this.router.post('/', createTodoMiddleware, this.todoController.createOne); this.router.post('/', createTodoMiddleWare, this.todoController.createOne);
this.router.put('/:id', createTodoMiddleware, this.todoController.updateOne); this.router.put('/:id', createTodoMiddleWare, this.todoController.updateOne);
this.router.delete('/:id', this.todoController.deleteOne); this.router.delete('/:id', this.todoController.deleteOne);
this.router.delete('/', this.todoController.removeAll) this.router.delete('/', this.todoController.removeAll)