working but with bug in the delay
This commit is contained in:
parent
c5164fdba1
commit
9ffc77e95c
6 changed files with 46 additions and 45 deletions
|
@ -22,7 +22,7 @@ services:
|
|||
- 5672:5672
|
||||
- 15672:15672
|
||||
volumes:
|
||||
- rabbitmq_vol:/var/lib/rabbitmq
|
||||
- rabbitmq_volume:/var/lib/rabbitmq
|
||||
- ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh
|
||||
environment:
|
||||
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
|
||||
|
@ -32,4 +32,4 @@ services:
|
|||
- 15672
|
||||
volumes:
|
||||
mongodb_vol:
|
||||
rabbitmq_vol:
|
||||
rabbitmq_volume:
|
||||
|
|
|
@ -38,10 +38,7 @@ export class RabbitMQ {
|
|||
});
|
||||
this.channel = await connection.createChannel();
|
||||
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
||||
|
||||
|
||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||
}
|
||||
|
||||
|
@ -60,7 +57,6 @@ export class RabbitMQ {
|
|||
|
||||
async startConsumer() {
|
||||
try {
|
||||
await this.channel.prefetch(1);
|
||||
console.log('Consumer started, waiting for messages...');
|
||||
this.channel.consume(this.queueName, async (message) => {
|
||||
if (message) {
|
||||
|
|
|
@ -9,7 +9,7 @@ Content-Type: application/json
|
|||
{
|
||||
"title": "Go to the gymNEW!!!",
|
||||
"description": "Go to the gym at 8pm",
|
||||
"due_date": "2023-07-08T16:02:00.000Z"
|
||||
"due_date": "2023-07-08T19:00:00.891Z"
|
||||
}
|
||||
|
||||
### update request
|
||||
|
|
|
@ -1,36 +1,39 @@
|
|||
import { ApiError } from '../utils/ApiError';
|
||||
import { Request, Response, NextFunction } from 'express';
|
||||
import moment from 'moment';
|
||||
import { ApiError } from '../utils/ApiError';
|
||||
import { DateTime } from 'luxon';
|
||||
|
||||
const createTodoMiddleware = async (req: Request, res: Response, next: NextFunction) => {
|
||||
const createTodoMiddleWare = async (req: Request, res: Response, next: NextFunction) => {
|
||||
const { title, description, due_date } = req.body;
|
||||
|
||||
if (!title || !due_date) {
|
||||
const error = new ApiError(`${!title ? 'title' : 'due_date'} is required`, 400, 'Bad Request');
|
||||
return next(error);
|
||||
}
|
||||
|
||||
try {
|
||||
const inputDate = new Date(due_date);
|
||||
if (isNaN(inputDate.getTime()) || inputDate < new Date()) {
|
||||
const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request');
|
||||
if(new Date(due_date) < new Date()) {
|
||||
const error = new ApiError(`due_date must be greater than current date`, 400, 'Bad Request');
|
||||
return next(error);
|
||||
}
|
||||
} catch {
|
||||
const parsedDate = moment(due_date, 'YYYY-MM-DD HH:mm:ss');
|
||||
if (!parsedDate.isValid() || parsedDate < moment()) {
|
||||
const error = new ApiError('due_date must be greater than current date', 400, 'Bad Request');
|
||||
return next(error);
|
||||
}
|
||||
const error = new ApiError(`due_date must be ISO `, 400, 'Bad Request');
|
||||
return next(error);
|
||||
}
|
||||
|
||||
|
||||
if (!description) {
|
||||
req.body.description = '';
|
||||
}
|
||||
|
||||
//check if date is valid, this is valid: 2023-07-08T14:00:00.000Z
|
||||
const date = DateTime.fromISO(due_date);
|
||||
if (!date.isValid) {
|
||||
const error = new ApiError(`due_date must be a valid date Format`, 400, 'Bad Request');
|
||||
return next(error);
|
||||
}
|
||||
|
||||
|
||||
next();
|
||||
}
|
||||
|
||||
export {
|
||||
createTodoMiddleware
|
||||
createTodoMiddleWare
|
||||
}
|
|
@ -6,17 +6,9 @@ const env = require('dotenv').config().parsed;
|
|||
export class RabbitMQ {
|
||||
channel: amqp.Channel;
|
||||
queueName: string;
|
||||
exchange: string;
|
||||
queue: string;
|
||||
routingKey: string;
|
||||
|
||||
constructor() {
|
||||
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||
|
||||
this.exchange = 'delayed_exchange';
|
||||
this.queue = 'delayed_queue';
|
||||
this.routingKey = 'delayed_routing_key'
|
||||
|
||||
this.connect().then(() => {
|
||||
console.log('RabbitMQ connected');
|
||||
}).catch((error) => {
|
||||
|
@ -25,7 +17,6 @@ export class RabbitMQ {
|
|||
}
|
||||
|
||||
async connect() {
|
||||
console.log("IN CONNECT")
|
||||
try {
|
||||
const connection = await amqp.connect({
|
||||
protocol: 'amqp',
|
||||
|
@ -35,9 +26,8 @@ export class RabbitMQ {
|
|||
password: env.RABBITMQ_PASSWORD,
|
||||
});
|
||||
this.channel = await connection.createChannel();
|
||||
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
||||
await this.channel.assertExchange(this.queueName, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||
await this.channel.assertQueue(this.queueName);
|
||||
console.log('Channel and queue asserted successfully');
|
||||
} catch (error) {
|
||||
console.error('Error connecting to RabbitMQ:', error);
|
||||
|
@ -46,16 +36,28 @@ export class RabbitMQ {
|
|||
}
|
||||
|
||||
async create(payload: ITodo) {
|
||||
console.log("IN CREATE!")
|
||||
const delayTimeForQueue: number = new Date(payload.due_date).getTime() - Date.now();
|
||||
const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue });
|
||||
|
||||
const delayTimeForQueue = this.calculateDelayTimeForQueue(payload);
|
||||
|
||||
console.log("The Queue will be delayed for: ", delayTimeForQueue, " ms");
|
||||
|
||||
const message = JSON.stringify({ payload });
|
||||
const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } };
|
||||
console.log("Options: ");
|
||||
console.log(options);
|
||||
try {
|
||||
await this.channel.publish(this.exchange, this.routingKey, Buffer.from(message), options);
|
||||
await this.channel.sendToQueue(this.queueName, Buffer.from(message), options);
|
||||
console.log('Message sent to the queue');
|
||||
} catch (error) {
|
||||
console.error('Error sending message to RabbitMQ:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
calculateDelayTimeForQueue(payload: ITodo) {
|
||||
|
||||
const delayTimeForQueue = payload.due_date.getTime() - Date.now();
|
||||
return 30000;
|
||||
// return delayTimeForQueue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { Router } from 'express';
|
||||
import { TodoController } from '../controllers/todoController';
|
||||
import { createTodoMiddleware } from '../middleware/createTodoMiddleWare';
|
||||
import { createTodoMiddleWare } from '../middleware/createTodoMiddleWare';
|
||||
|
||||
class TodoRouter {
|
||||
router: Router;
|
||||
|
@ -15,8 +15,8 @@ class TodoRouter {
|
|||
private setRoutes() {
|
||||
this.router.get('/', this.todoController.getAll);
|
||||
this.router.get('/:id', this.todoController.getOne);
|
||||
this.router.post('/', createTodoMiddleware, this.todoController.createOne);
|
||||
this.router.put('/:id', createTodoMiddleware, this.todoController.updateOne);
|
||||
this.router.post('/', createTodoMiddleWare, this.todoController.createOne);
|
||||
this.router.put('/:id', createTodoMiddleWare, this.todoController.updateOne);
|
||||
this.router.delete('/:id', this.todoController.deleteOne);
|
||||
this.router.delete('/', this.todoController.removeAll)
|
||||
|
||||
|
|
Loading…
Reference in a new issue