done with consumer
This commit is contained in:
parent
9ffc77e95c
commit
a0334bf886
5 changed files with 97 additions and 72 deletions
|
@ -1,26 +1,20 @@
|
|||
import amqp from 'amqplib';
|
||||
import amqp, { ConsumeMessage } from 'amqplib';
|
||||
import { ITodo } from '../interfaces/ITodo';
|
||||
import { MongoDbModel } from '../mongodb/MongoDb';
|
||||
|
||||
const env = require('dotenv').config().parsed;
|
||||
|
||||
|
||||
export class RabbitMQ {
|
||||
export class RabbitMQ {
|
||||
channel: amqp.Channel;
|
||||
queueName: string;
|
||||
exchangeName: string;
|
||||
|
||||
mongoClient: MongoDbModel;
|
||||
exchange: string;
|
||||
queue: string;
|
||||
routingKey: string;
|
||||
|
||||
constructor() {
|
||||
this.mongoClient = new MongoDbModel();
|
||||
|
||||
this.exchange = 'delayed_exchange';
|
||||
this.queue = 'delayed_queue';
|
||||
this.routingKey = 'delayed_routing_key'
|
||||
|
||||
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||
|
||||
this.mongoClient = new MongoDbModel();
|
||||
|
||||
this.connect().then(() => {
|
||||
console.log('RabbitMQ connected');
|
||||
}).catch((error) => {
|
||||
|
@ -29,17 +23,22 @@ export class RabbitMQ {
|
|||
}
|
||||
|
||||
async connect() {
|
||||
const connection = await amqp.connect({
|
||||
protocol: 'amqp',
|
||||
hostname: env.RABBITMQ_HOST,
|
||||
port: parseInt(env.RABBITMQ_PORT),
|
||||
username: env.RABBITMQ_USERNAME,
|
||||
password: env.RABBITMQ_PASSWORD,
|
||||
});
|
||||
this.channel = await connection.createChannel();
|
||||
await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||
await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey);
|
||||
await this.channel.assertQueue(this.queueName, { durable: true });
|
||||
try {
|
||||
const connection = await amqp.connect({
|
||||
protocol: 'amqp',
|
||||
hostname: env.RABBITMQ_HOST,
|
||||
port: parseInt(env.RABBITMQ_PORT),
|
||||
username: env.RABBITMQ_USERNAME,
|
||||
password: env.RABBITMQ_PASSWORD,
|
||||
});
|
||||
|
||||
this.channel = await connection.createChannel();
|
||||
await this.channel.assertQueue(this.queueName);
|
||||
console.log('Channel and queue asserted successfully #####');
|
||||
} catch (error) {
|
||||
console.error('Error connecting to RabbitMQ:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async create(payload: ITodo, delayTimeForQueue: number) {
|
||||
|
@ -56,32 +55,43 @@ export class RabbitMQ {
|
|||
}
|
||||
|
||||
async startConsumer() {
|
||||
try {
|
||||
console.log('Consumer started, waiting for messages...');
|
||||
this.channel.consume(this.queueName, async (message) => {
|
||||
if (message) {
|
||||
const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
|
||||
payload: ITodo;
|
||||
delayTimeForQueue: number;
|
||||
};
|
||||
|
||||
console.log('Received notification:', payload);
|
||||
|
||||
try {
|
||||
await this.mongoClient.updateTodoStatus(payload);
|
||||
console.log('Updated todo status in the DB');
|
||||
} catch {
|
||||
await this.create(payload, delayTimeForQueue);
|
||||
this.channel.ack(message);
|
||||
console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
|
||||
return;
|
||||
}
|
||||
|
||||
this.channel.ack(message);
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error consuming messages from RabbitMQ:', error);
|
||||
}
|
||||
this.channel.assertQueue(this.queueName);
|
||||
console.log("Consuming");
|
||||
this.channel.consume(this.queueName, (message: ConsumeMessage | null) => {
|
||||
if (message) {
|
||||
const todo = JSON.parse(message.content.toString());
|
||||
this.channel.ack(message);
|
||||
console.log('Received notification:', todo);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// try {
|
||||
// console.log('Consumer started, waiting for messages...');
|
||||
// this.channel.consume(this.queueName, async (message) => {
|
||||
// if (message) {
|
||||
// const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
|
||||
// payload: ITodo;
|
||||
// delayTimeForQueue: number;
|
||||
// };
|
||||
|
||||
// console.log('Received notification:', payload);
|
||||
|
||||
// try {
|
||||
// await this.mongoClient.updateTodoStatus(payload);
|
||||
// console.log('Updated todo status in the DB');
|
||||
// } catch {
|
||||
// await this.create(payload, delayTimeForQueue);
|
||||
// this.channel.ack(message);
|
||||
// console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
|
||||
// return;
|
||||
// }
|
||||
|
||||
// this.channel.ack(message);
|
||||
// }
|
||||
// });
|
||||
// } catch (error) {
|
||||
// console.error('Error consuming messages from RabbitMQ:', error);
|
||||
// }
|
||||
}
|
|
@ -7,9 +7,9 @@ POST http://localhost:3000/todo
|
|||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"title": "Go to the gymNEW!!!",
|
||||
"title": "-2!!!!NEW!!!!",
|
||||
"description": "Go to the gym at 8pm",
|
||||
"due_date": "2023-07-08T19:00:00.891Z"
|
||||
"due_date": "2023-07-09T19:00:00.891Z"
|
||||
}
|
||||
|
||||
### update request
|
||||
|
|
|
@ -15,11 +15,14 @@
|
|||
"aws-sdk": "^2.1413.0",
|
||||
"dotenv": "^16.3.1",
|
||||
"express": "^4.18.2",
|
||||
"luxon": "^3.3.0",
|
||||
"moment": "^2.29.4",
|
||||
"moment-timezone": "^0.5.43",
|
||||
"mongoose": "^7.3.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/amqplib": "^0.10.1",
|
||||
"@types/express": "^4.17.17"
|
||||
"@types/express": "^4.17.17",
|
||||
"@types/luxon": "^3.3.0"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { Request, Response, NextFunction } from 'express';
|
||||
import { ApiError } from '../utils/ApiError';
|
||||
import { DateTime } from 'luxon';
|
||||
import moment from 'moment-timezone';
|
||||
|
||||
const createTodoMiddleWare = async (req: Request, res: Response, next: NextFunction) => {
|
||||
const { title, description, due_date } = req.body;
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
import amqp, { Options } from 'amqplib';
|
||||
import amqp, { Options, ConsumeMessage, Channel } from 'amqplib';
|
||||
import { ITodo } from '../schemas/todoSchema';
|
||||
|
||||
const env = require('dotenv').config().parsed;
|
||||
|
||||
export class RabbitMQ {
|
||||
connection: amqp.Connection;
|
||||
channel: amqp.Channel;
|
||||
queueName: string;
|
||||
queue: string;
|
||||
exchange: string;
|
||||
|
||||
|
||||
constructor() {
|
||||
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||
this.queue = env.RABBITMQ_QUEUE_NAME;
|
||||
this.exchange = 'delayed_exchange';
|
||||
|
||||
this.connect().then(() => {
|
||||
console.log('RabbitMQ connected');
|
||||
}).catch((error) => {
|
||||
|
@ -18,17 +23,25 @@ export class RabbitMQ {
|
|||
|
||||
async connect() {
|
||||
try {
|
||||
const connection = await amqp.connect({
|
||||
this.connection = await amqp.connect({
|
||||
protocol: 'amqp',
|
||||
hostname: env.RABBITMQ_HOST,
|
||||
port: parseInt(env.RABBITMQ_PORT),
|
||||
username: env.RABBITMQ_USERNAME,
|
||||
password: env.RABBITMQ_PASSWORD,
|
||||
});
|
||||
this.channel = await connection.createChannel();
|
||||
await this.channel.assertExchange(this.queueName, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
|
||||
await this.channel.assertQueue(this.queueName);
|
||||
console.log('Channel and queue asserted successfully');
|
||||
|
||||
this.channel = await this.connection.createChannel();
|
||||
await this.channel.assertQueue(this.queue);
|
||||
await this.channel.assertExchange(this.exchange, 'x-delayed-message', {
|
||||
durable: true,
|
||||
arguments: {
|
||||
'x-delayed-type': 'direct'
|
||||
}
|
||||
});
|
||||
await this.channel.bindQueue(this.queue, this.exchange, this.queue);
|
||||
|
||||
console.log('Channel and queue asserted successfully #####');
|
||||
} catch (error) {
|
||||
console.error('Error connecting to RabbitMQ:', error);
|
||||
throw error;
|
||||
|
@ -38,26 +51,24 @@ export class RabbitMQ {
|
|||
async create(payload: ITodo) {
|
||||
|
||||
const delayTimeForQueue = this.calculateDelayTimeForQueue(payload);
|
||||
|
||||
console.log("The Queue will be delayed for: ", delayTimeForQueue, " ms");
|
||||
|
||||
const message = JSON.stringify({ payload });
|
||||
const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } };
|
||||
console.log("Options: ");
|
||||
console.log(options);
|
||||
const options: Options.Publish = { headers: { 'x-delay': delayTimeForQueue } };
|
||||
try {
|
||||
await this.channel.sendToQueue(this.queueName, Buffer.from(message), options);
|
||||
console.log('Message sent to the queue');
|
||||
await this.channel.publish(this.exchange, this.queue, Buffer.from(message),
|
||||
options
|
||||
)
|
||||
console.log(`Queue name is: ${this.queue}`)
|
||||
} catch (error) {
|
||||
console.error('Error sending message to RabbitMQ:', error);
|
||||
throw error;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
calculateDelayTimeForQueue(payload: ITodo) {
|
||||
|
||||
const delayTimeForQueue = payload.due_date.getTime() - Date.now();
|
||||
return 30000;
|
||||
// return delayTimeForQueue;
|
||||
const delayTime = payload.due_date.getTime() - Date.now();
|
||||
return delayTime;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue