From a0334bf88644bab733b945c3a3e14e89bbaa118e Mon Sep 17 00:00:00 2001 From: Kfir Dayan Date: Sat, 8 Jul 2023 22:35:20 +0300 Subject: [PATCH] done with consumer --- notification-service/src/rabbitmq/RabbitMQ.ts | 112 ++++++++++-------- request.http | 4 +- todo-service/package.json | 5 +- .../src/middleware/createTodoMiddleWare.ts | 1 + todo-service/src/rabbitmq/RabbitMQ.ts | 47 +++++--- 5 files changed, 97 insertions(+), 72 deletions(-) diff --git a/notification-service/src/rabbitmq/RabbitMQ.ts b/notification-service/src/rabbitmq/RabbitMQ.ts index 2bada72..7cfedcd 100644 --- a/notification-service/src/rabbitmq/RabbitMQ.ts +++ b/notification-service/src/rabbitmq/RabbitMQ.ts @@ -1,26 +1,20 @@ -import amqp from 'amqplib'; +import amqp, { ConsumeMessage } from 'amqplib'; import { ITodo } from '../interfaces/ITodo'; import { MongoDbModel } from '../mongodb/MongoDb'; const env = require('dotenv').config().parsed; - -export class RabbitMQ { +export class RabbitMQ { channel: amqp.Channel; queueName: string; + exchangeName: string; + mongoClient: MongoDbModel; - exchange: string; - queue: string; - routingKey: string; - constructor() { - this.mongoClient = new MongoDbModel(); - - this.exchange = 'delayed_exchange'; - this.queue = 'delayed_queue'; - this.routingKey = 'delayed_routing_key' - this.queueName = env.RABBITMQ_QUEUE_NAME; + + this.mongoClient = new MongoDbModel(); + this.connect().then(() => { console.log('RabbitMQ connected'); }).catch((error) => { @@ -29,17 +23,22 @@ export class RabbitMQ { } async connect() { - const connection = await amqp.connect({ - protocol: 'amqp', - hostname: env.RABBITMQ_HOST, - port: parseInt(env.RABBITMQ_PORT), - username: env.RABBITMQ_USERNAME, - password: env.RABBITMQ_PASSWORD, - }); - this.channel = await connection.createChannel(); - await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); - await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey); - await this.channel.assertQueue(this.queueName, { durable: true }); + try { + const connection = await amqp.connect({ + protocol: 'amqp', + hostname: env.RABBITMQ_HOST, + port: parseInt(env.RABBITMQ_PORT), + username: env.RABBITMQ_USERNAME, + password: env.RABBITMQ_PASSWORD, + }); + + this.channel = await connection.createChannel(); + await this.channel.assertQueue(this.queueName); + console.log('Channel and queue asserted successfully #####'); + } catch (error) { + console.error('Error connecting to RabbitMQ:', error); + throw error; + } } async create(payload: ITodo, delayTimeForQueue: number) { @@ -56,32 +55,43 @@ export class RabbitMQ { } async startConsumer() { - try { - console.log('Consumer started, waiting for messages...'); - this.channel.consume(this.queueName, async (message) => { - if (message) { - const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as { - payload: ITodo; - delayTimeForQueue: number; - }; - - console.log('Received notification:', payload); - - try { - await this.mongoClient.updateTodoStatus(payload); - console.log('Updated todo status in the DB'); - } catch { - await this.create(payload, delayTimeForQueue); - this.channel.ack(message); - console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue); - return; - } - - this.channel.ack(message); - } - }); - } catch (error) { - console.error('Error consuming messages from RabbitMQ:', error); - } + this.channel.assertQueue(this.queueName); + console.log("Consuming"); + this.channel.consume(this.queueName, (message: ConsumeMessage | null) => { + if (message) { + const todo = JSON.parse(message.content.toString()); + this.channel.ack(message); + console.log('Received notification:', todo); + } + }); } + + + // try { + // console.log('Consumer started, waiting for messages...'); + // this.channel.consume(this.queueName, async (message) => { + // if (message) { + // const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as { + // payload: ITodo; + // delayTimeForQueue: number; + // }; + + // console.log('Received notification:', payload); + + // try { + // await this.mongoClient.updateTodoStatus(payload); + // console.log('Updated todo status in the DB'); + // } catch { + // await this.create(payload, delayTimeForQueue); + // this.channel.ack(message); + // console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue); + // return; + // } + + // this.channel.ack(message); + // } + // }); + // } catch (error) { + // console.error('Error consuming messages from RabbitMQ:', error); + // } } \ No newline at end of file diff --git a/request.http b/request.http index 57354a4..91d46c5 100644 --- a/request.http +++ b/request.http @@ -7,9 +7,9 @@ POST http://localhost:3000/todo Content-Type: application/json { - "title": "Go to the gymNEW!!!", + "title": "-2!!!!NEW!!!!", "description": "Go to the gym at 8pm", - "due_date": "2023-07-08T19:00:00.891Z" + "due_date": "2023-07-09T19:00:00.891Z" } ### update request diff --git a/todo-service/package.json b/todo-service/package.json index 22d2802..e07a6e0 100644 --- a/todo-service/package.json +++ b/todo-service/package.json @@ -15,11 +15,14 @@ "aws-sdk": "^2.1413.0", "dotenv": "^16.3.1", "express": "^4.18.2", + "luxon": "^3.3.0", "moment": "^2.29.4", + "moment-timezone": "^0.5.43", "mongoose": "^7.3.1" }, "devDependencies": { "@types/amqplib": "^0.10.1", - "@types/express": "^4.17.17" + "@types/express": "^4.17.17", + "@types/luxon": "^3.3.0" } } diff --git a/todo-service/src/middleware/createTodoMiddleWare.ts b/todo-service/src/middleware/createTodoMiddleWare.ts index 5e06736..acebf9d 100644 --- a/todo-service/src/middleware/createTodoMiddleWare.ts +++ b/todo-service/src/middleware/createTodoMiddleWare.ts @@ -1,6 +1,7 @@ import { Request, Response, NextFunction } from 'express'; import { ApiError } from '../utils/ApiError'; import { DateTime } from 'luxon'; +import moment from 'moment-timezone'; const createTodoMiddleWare = async (req: Request, res: Response, next: NextFunction) => { const { title, description, due_date } = req.body; diff --git a/todo-service/src/rabbitmq/RabbitMQ.ts b/todo-service/src/rabbitmq/RabbitMQ.ts index 2cdda32..01a28ee 100644 --- a/todo-service/src/rabbitmq/RabbitMQ.ts +++ b/todo-service/src/rabbitmq/RabbitMQ.ts @@ -1,14 +1,19 @@ -import amqp, { Options } from 'amqplib'; +import amqp, { Options, ConsumeMessage, Channel } from 'amqplib'; import { ITodo } from '../schemas/todoSchema'; const env = require('dotenv').config().parsed; export class RabbitMQ { + connection: amqp.Connection; channel: amqp.Channel; - queueName: string; + queue: string; + exchange: string; + constructor() { - this.queueName = env.RABBITMQ_QUEUE_NAME; + this.queue = env.RABBITMQ_QUEUE_NAME; + this.exchange = 'delayed_exchange'; + this.connect().then(() => { console.log('RabbitMQ connected'); }).catch((error) => { @@ -18,17 +23,25 @@ export class RabbitMQ { async connect() { try { - const connection = await amqp.connect({ + this.connection = await amqp.connect({ protocol: 'amqp', hostname: env.RABBITMQ_HOST, port: parseInt(env.RABBITMQ_PORT), username: env.RABBITMQ_USERNAME, password: env.RABBITMQ_PASSWORD, }); - this.channel = await connection.createChannel(); - await this.channel.assertExchange(this.queueName, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); - await this.channel.assertQueue(this.queueName); - console.log('Channel and queue asserted successfully'); + + this.channel = await this.connection.createChannel(); + await this.channel.assertQueue(this.queue); + await this.channel.assertExchange(this.exchange, 'x-delayed-message', { + durable: true, + arguments: { + 'x-delayed-type': 'direct' + } + }); + await this.channel.bindQueue(this.queue, this.exchange, this.queue); + + console.log('Channel and queue asserted successfully #####'); } catch (error) { console.error('Error connecting to RabbitMQ:', error); throw error; @@ -38,26 +51,24 @@ export class RabbitMQ { async create(payload: ITodo) { const delayTimeForQueue = this.calculateDelayTimeForQueue(payload); - console.log("The Queue will be delayed for: ", delayTimeForQueue, " ms"); const message = JSON.stringify({ payload }); - const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } }; - console.log("Options: "); - console.log(options); + const options: Options.Publish = { headers: { 'x-delay': delayTimeForQueue } }; try { - await this.channel.sendToQueue(this.queueName, Buffer.from(message), options); - console.log('Message sent to the queue'); + await this.channel.publish(this.exchange, this.queue, Buffer.from(message), + options + ) + console.log(`Queue name is: ${this.queue}`) } catch (error) { console.error('Error sending message to RabbitMQ:', error); throw error; } + } calculateDelayTimeForQueue(payload: ITodo) { - - const delayTimeForQueue = payload.due_date.getTime() - Date.now(); - return 30000; - // return delayTimeForQueue; + const delayTime = payload.due_date.getTime() - Date.now(); + return delayTime; } }