From 9ffc77e95c370b55a7daa8248ad1c6ef7467168e Mon Sep 17 00:00:00 2001 From: Kfir Dayan Date: Sat, 8 Jul 2023 20:52:30 +0300 Subject: [PATCH] working but with bug in the delay --- infra/docker/docker-compose.yaml | 4 +-- notification-service/src/rabbitmq/RabbitMQ.ts | 10 ++---- request.http | 2 +- .../src/middleware/createTodoMiddleWare.ts | 35 ++++++++++--------- todo-service/src/rabbitmq/RabbitMQ.ts | 34 +++++++++--------- todo-service/src/routes/todoRouter.ts | 6 ++-- 6 files changed, 46 insertions(+), 45 deletions(-) diff --git a/infra/docker/docker-compose.yaml b/infra/docker/docker-compose.yaml index 6ec940f..302e019 100644 --- a/infra/docker/docker-compose.yaml +++ b/infra/docker/docker-compose.yaml @@ -22,7 +22,7 @@ services: - 5672:5672 - 15672:15672 volumes: - - rabbitmq_vol:/var/lib/rabbitmq + - rabbitmq_volume:/var/lib/rabbitmq - ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh environment: - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER} @@ -32,4 +32,4 @@ services: - 15672 volumes: mongodb_vol: - rabbitmq_vol: + rabbitmq_volume: diff --git a/notification-service/src/rabbitmq/RabbitMQ.ts b/notification-service/src/rabbitmq/RabbitMQ.ts index a4c8a41..2bada72 100644 --- a/notification-service/src/rabbitmq/RabbitMQ.ts +++ b/notification-service/src/rabbitmq/RabbitMQ.ts @@ -5,7 +5,7 @@ import { MongoDbModel } from '../mongodb/MongoDb'; const env = require('dotenv').config().parsed; -export class RabbitMQ { +export class RabbitMQ { channel: amqp.Channel; queueName: string; mongoClient: MongoDbModel; @@ -38,10 +38,7 @@ export class RabbitMQ { }); this.channel = await connection.createChannel(); await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); - await this.channel.assertQueue(this.queueName, { durable: true }); await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey); - - await this.channel.assertQueue(this.queueName, { durable: true }); } @@ -59,8 +56,7 @@ export class RabbitMQ { } async startConsumer() { - try { - await this.channel.prefetch(1); + try { console.log('Consumer started, waiting for messages...'); this.channel.consume(this.queueName, async (message) => { if (message) { @@ -88,4 +84,4 @@ export class RabbitMQ { console.error('Error consuming messages from RabbitMQ:', error); } } -} +} \ No newline at end of file diff --git a/request.http b/request.http index 309c54d..57354a4 100644 --- a/request.http +++ b/request.http @@ -9,7 +9,7 @@ Content-Type: application/json { "title": "Go to the gymNEW!!!", "description": "Go to the gym at 8pm", - "due_date": "2023-07-08T16:02:00.000Z" + "due_date": "2023-07-08T19:00:00.891Z" } ### update request diff --git a/todo-service/src/middleware/createTodoMiddleWare.ts b/todo-service/src/middleware/createTodoMiddleWare.ts index 30a4347..5e06736 100644 --- a/todo-service/src/middleware/createTodoMiddleWare.ts +++ b/todo-service/src/middleware/createTodoMiddleWare.ts @@ -1,36 +1,39 @@ -import { ApiError } from '../utils/ApiError'; import { Request, Response, NextFunction } from 'express'; -import moment from 'moment'; +import { ApiError } from '../utils/ApiError'; +import { DateTime } from 'luxon'; -const createTodoMiddleware = async (req: Request, res: Response, next: NextFunction) => { +const createTodoMiddleWare = async (req: Request, res: Response, next: NextFunction) => { const { title, description, due_date } = req.body; - if (!title || !due_date) { const error = new ApiError(`${!title ? 'title' : 'due_date'} is required`, 400, 'Bad Request'); return next(error); } - try { - const inputDate = new Date(due_date); - if (isNaN(inputDate.getTime()) || inputDate < new Date()) { - const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request'); + if(new Date(due_date) < new Date()) { + const error = new ApiError(`due_date must be greater than current date`, 400, 'Bad Request'); return next(error); - } + } } catch { - const parsedDate = moment(due_date, 'YYYY-MM-DD HH:mm:ss'); - if (!parsedDate.isValid() || parsedDate < moment()) { - const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request'); - return next(error); - } + const error = new ApiError(`due_date must be ISO `, 400, 'Bad Request'); + return next(error); } + if (!description) { req.body.description = ''; } + //check if date is valid, this is valid: 2023-07-08T14:00:00.000Z + const date = DateTime.fromISO(due_date); + if (!date.isValid) { + const error = new ApiError(`due_date must be a valid date Format`, 400, 'Bad Request'); + return next(error); + } + + next(); } export { - createTodoMiddleware -} + createTodoMiddleWare +} \ No newline at end of file diff --git a/todo-service/src/rabbitmq/RabbitMQ.ts b/todo-service/src/rabbitmq/RabbitMQ.ts index a0e0895..2cdda32 100644 --- a/todo-service/src/rabbitmq/RabbitMQ.ts +++ b/todo-service/src/rabbitmq/RabbitMQ.ts @@ -6,17 +6,9 @@ const env = require('dotenv').config().parsed; export class RabbitMQ { channel: amqp.Channel; queueName: string; - exchange: string; - queue: string; - routingKey: string; constructor() { this.queueName = env.RABBITMQ_QUEUE_NAME; - - this.exchange = 'delayed_exchange'; - this.queue = 'delayed_queue'; - this.routingKey = 'delayed_routing_key' - this.connect().then(() => { console.log('RabbitMQ connected'); }).catch((error) => { @@ -25,7 +17,6 @@ export class RabbitMQ { } async connect() { - console.log("IN CONNECT") try { const connection = await amqp.connect({ protocol: 'amqp', @@ -35,9 +26,8 @@ export class RabbitMQ { password: env.RABBITMQ_PASSWORD, }); this.channel = await connection.createChannel(); - await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); - await this.channel.assertQueue(this.queueName, { durable: true }); - await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey); + await this.channel.assertExchange(this.queueName, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); + await this.channel.assertQueue(this.queueName); console.log('Channel and queue asserted successfully'); } catch (error) { console.error('Error connecting to RabbitMQ:', error); @@ -46,16 +36,28 @@ export class RabbitMQ { } async create(payload: ITodo) { - console.log("IN CREATE!") - const delayTimeForQueue: number = new Date(payload.due_date).getTime() - Date.now(); - const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue }); + + const delayTimeForQueue = this.calculateDelayTimeForQueue(payload); + + console.log("The Queue will be delayed for: ", delayTimeForQueue, " ms"); + + const message = JSON.stringify({ payload }); const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } }; + console.log("Options: "); + console.log(options); try { - await this.channel.publish(this.exchange, this.routingKey, Buffer.from(message), options); + await this.channel.sendToQueue(this.queueName, Buffer.from(message), options); console.log('Message sent to the queue'); } catch (error) { console.error('Error sending message to RabbitMQ:', error); throw error; } } + + calculateDelayTimeForQueue(payload: ITodo) { + + const delayTimeForQueue = payload.due_date.getTime() - Date.now(); + return 30000; + // return delayTimeForQueue; + } } diff --git a/todo-service/src/routes/todoRouter.ts b/todo-service/src/routes/todoRouter.ts index 2b5567e..d671710 100644 --- a/todo-service/src/routes/todoRouter.ts +++ b/todo-service/src/routes/todoRouter.ts @@ -1,6 +1,6 @@ import { Router } from 'express'; import { TodoController } from '../controllers/todoController'; -import { createTodoMiddleware } from '../middleware/createTodoMiddleWare'; +import { createTodoMiddleWare } from '../middleware/createTodoMiddleWare'; class TodoRouter { router: Router; @@ -15,8 +15,8 @@ class TodoRouter { private setRoutes() { this.router.get('/', this.todoController.getAll); this.router.get('/:id', this.todoController.getOne); - this.router.post('/', createTodoMiddleware, this.todoController.createOne); - this.router.put('/:id', createTodoMiddleware, this.todoController.updateOne); + this.router.post('/', createTodoMiddleWare, this.todoController.createOne); + this.router.put('/:id', createTodoMiddleWare, this.todoController.updateOne); this.router.delete('/:id', this.todoController.deleteOne); this.router.delete('/', this.todoController.removeAll)