From 2333b78f8c893347d20e4f14bcd4abb0c8c95e98 Mon Sep 17 00:00:00 2001 From: Kfir Dayan Date: Sat, 8 Jul 2023 15:04:33 +0300 Subject: [PATCH] updating docker compose to init with script to enable plugin --- infra/docker/docker-compose.yaml | 11 +- .../enable_delayed_message_exchange.sh | 3 + notification-service/src/aws/Sqs.ts | 204 +++++++++--------- notification-service/src/mongodb/MongoDb.ts | 15 +- notification-service/src/rabbitmq/RabbitMQ.ts | 12 +- request.http | 2 +- .../src/controllers/todoController.ts | 4 +- todo-service/src/rabbitmq/RabbitMQ.ts | 13 +- 8 files changed, 137 insertions(+), 127 deletions(-) create mode 100644 infra/docker/rabbitmq-init-scripts/enable_delayed_message_exchange.sh diff --git a/infra/docker/docker-compose.yaml b/infra/docker/docker-compose.yaml index 7040782..74c31ab 100644 --- a/infra/docker/docker-compose.yaml +++ b/infra/docker/docker-compose.yaml @@ -15,18 +15,23 @@ services: platform: linux/arm64/v8 expose: - 27017 - rabitmq: - image: arm64v8/rabbitmq:3.8.3-management + rabbitmq: + image: heidiks/rabbitmq-delayed-message-exchange:latest restart: always ports: - 5672:5672 - 15672:15672 volumes: - rabbitmq_vol:/var/lib/rabbitmq + - ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh environment: - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER} - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS} - platform: linux/arm64/v8 + command: | + bash -c " + rabbitmq-plugins enable rabbitmq_delayed_message_exchange && \ + rabbitmq-server + " expose: - 5672 - 15672 diff --git a/infra/docker/rabbitmq-init-scripts/enable_delayed_message_exchange.sh b/infra/docker/rabbitmq-init-scripts/enable_delayed_message_exchange.sh new file mode 100644 index 0000000..93ce875 --- /dev/null +++ b/infra/docker/rabbitmq-init-scripts/enable_delayed_message_exchange.sh @@ -0,0 +1,3 @@ +#!/bin/bash +rabbitmq-plugins enable rabbitmq_delayed_message_exchange +rabbitmqctl restart \ No newline at end of file diff --git a/notification-service/src/aws/Sqs.ts b/notification-service/src/aws/Sqs.ts index 204a975..4157761 100644 --- a/notification-service/src/aws/Sqs.ts +++ b/notification-service/src/aws/Sqs.ts @@ -1,122 +1,122 @@ -import aws from 'aws-sdk'; -import { ITodo } from '../interfaces/ITodo'; -import { MongoDb } from '../mongodb/MongoDb'; +// import aws from 'aws-sdk'; +// import { ITodo } from '../interfaces/ITodo'; +// import { MongoDb } from '../mongodb/MongoDb'; -const env = require('dotenv').config().parsed; +// const env = require('dotenv').config().parsed; -export class Sqs { - sqs: aws.SQS; - queueUrl: string; +// export class Sqs { +// sqs: aws.SQS; +// queueUrl: string; - constructor() { - aws.config.update({ - accessKeyId: env.AWS_ACCESS_KEY_ID, - secretAccessKey: env.AWS_SECRET_ACCESS_KEY, - }); +// constructor() { +// aws.config.update({ +// accessKeyId: env.AWS_ACCESS_KEY_ID, +// secretAccessKey: env.AWS_SECRET_ACCESS_KEY, +// }); - this.sqs = new aws.SQS(); - this.queueUrl = env.AWS_SQS_URL + env.AWS_SQS_QUEUE_NAME; +// this.sqs = new aws.SQS(); +// this.queueUrl = env.AWS_SQS_URL + env.AWS_SQS_QUEUE_NAME; - if (this.sqs) { - console.log("SQS connected"); - } - } +// if (this.sqs) { +// console.log("SQS connected"); +// } +// } - async create(payload: ITodo, delayTimeForQueue: number) { - let reQueueTime = 0; - if (delayTimeForQueue > 900) { - reQueueTime = delayTimeForQueue - 900; - delayTimeForQueue = 900; - } +// async create(payload: ITodo, delayTimeForQueue: number) { +// let reQueueTime = 0; +// if (delayTimeForQueue > 900) { +// reQueueTime = delayTimeForQueue - 900; +// delayTimeForQueue = 900; +// } - const params = { - DelaySeconds: delayTimeForQueue, - MessageAttributes: {}, - MessageBody: JSON.stringify({ payload, delayTimeForQueue, reQueueTime }), - QueueUrl: this.queueUrl, - }; +// const params = { +// DelaySeconds: delayTimeForQueue, +// MessageAttributes: {}, +// MessageBody: JSON.stringify({ payload, delayTimeForQueue, reQueueTime }), +// QueueUrl: this.queueUrl, +// }; - try { - const data = await this.sqs.sendMessage(params).promise(); - console.log("Message sent to the queue", data.MessageId); - return data; - } catch (error) { - console.log("Error", error); - throw error; - } - } +// try { +// const data = await this.sqs.sendMessage(params).promise(); +// console.log("Message sent to the queue", data.MessageId); +// return data; +// } catch (error) { +// console.log("Error", error); +// throw error; +// } +// } - async startConsumer() { - while (true) { - const message = await this.getNextQueue(); - if (message) { - const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.Body) as { - payload: ITodo; - delayTimeForQueue: number; - reQueueTime: number; - }; +// async startConsumer() { +// while (true) { +// const message = await this.getNextQueue(); +// if (message) { +// const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.Body) as { +// payload: ITodo; +// delayTimeForQueue: number; +// reQueueTime: number; +// }; - console.log("Received notification:", payload); +// console.log("Received notification:", payload); - if (reQueueTime === 0) { - try { - await MongoDb.updateTodoStatus(payload); - } catch { - await this.create(payload, delayTimeForQueue); - await this.deleteMessage(message.ReceiptHandle); - console.log("Published new queue with delay, THE DB IS DOWN!:", delayTimeForQueue); - } - await this.deleteMessage(message.ReceiptHandle); - } else if (reQueueTime >= 900) { - const newDelayTime = 900; - const newReQueueTime = reQueueTime - 900; +// if (reQueueTime === 0) { +// try { +// await MongoDb.updateTodoStatus(payload); +// } catch { +// await this.create(payload, delayTimeForQueue); +// await this.deleteMessage(message.ReceiptHandle); +// console.log("Published new queue with delay, THE DB IS DOWN!:", delayTimeForQueue); +// } +// await this.deleteMessage(message.ReceiptHandle); +// } else if (reQueueTime >= 900) { +// const newDelayTime = 900; +// const newReQueueTime = reQueueTime - 900; - await this.create(payload, newDelayTime); - await this.deleteMessage(message.ReceiptHandle); +// await this.create(payload, newDelayTime); +// await this.deleteMessage(message.ReceiptHandle); - console.log("Published new queue with delay:", newDelayTime); - } else { - const newDelayTime = reQueueTime; +// console.log("Published new queue with delay:", newDelayTime); +// } else { +// const newDelayTime = reQueueTime; - await this.create(payload, newDelayTime); - await this.deleteMessage(message.ReceiptHandle); +// await this.create(payload, newDelayTime); +// await this.deleteMessage(message.ReceiptHandle); - console.log("Published new queue with delay:", newDelayTime); - } - } - } - } +// console.log("Published new queue with delay:", newDelayTime); +// } +// } +// } +// } - private getNextQueue = async () => { - const params = { - QueueUrl: this.queueUrl, - MaxNumberOfMessages: 1, - VisibilityTimeout: 30, - WaitTimeSeconds: 20, // Increase the WaitTimeSeconds for long polling - }; +// private getNextQueue = async () => { +// const params = { +// QueueUrl: this.queueUrl, +// MaxNumberOfMessages: 1, +// VisibilityTimeout: 30, +// WaitTimeSeconds: 20, // Increase the WaitTimeSeconds for long polling +// }; - try { - const data = await this.sqs.receiveMessage(params).promise(); - const message = data.Messages ? data.Messages[0] : null; - return message; - } catch (error) { - console.error("Error retrieving message from SQS:", error); - return null; - } - } +// try { +// const data = await this.sqs.receiveMessage(params).promise(); +// const message = data.Messages ? data.Messages[0] : null; +// return message; +// } catch (error) { +// console.error("Error retrieving message from SQS:", error); +// return null; +// } +// } - private deleteMessage = async (receiptHandle: string) => { - const params = { - QueueUrl: this.queueUrl, - ReceiptHandle: receiptHandle, - }; +// private deleteMessage = async (receiptHandle: string) => { +// const params = { +// QueueUrl: this.queueUrl, +// ReceiptHandle: receiptHandle, +// }; - try { - await this.sqs.deleteMessage(params).promise(); - console.log("Message deleted"); - } catch (error) { - console.error("Error deleting message from SQS:", error); - } - } +// try { +// await this.sqs.deleteMessage(params).promise(); +// console.log("Message deleted"); +// } catch (error) { +// console.error("Error deleting message from SQS:", error); +// } +// } -} +// } diff --git a/notification-service/src/mongodb/MongoDb.ts b/notification-service/src/mongodb/MongoDb.ts index ec6976d..8687de0 100644 --- a/notification-service/src/mongodb/MongoDb.ts +++ b/notification-service/src/mongodb/MongoDb.ts @@ -3,14 +3,17 @@ import { ITodo } from '../interfaces/ITodo'; const env = require('dotenv').config().parsed; -export class MongoDb { +export class MongoDbModel { + client: MongoClient; - public static updateTodoStatus = async (todo: ITodo) => { - const client = new MongoClient(env.DATABASE_URL); + constructor() { + this.client = new MongoClient(env.DATABASE_URL); + } + public updateTodoStatus = async (todo: ITodo) => { try { - await client.connect(); - const db = client.db(env.MONGO_DB_NAME); + await this.client.connect(); + const db = this.client.db(env.MONGO_DB_NAME); const todosCollection = db.collection('todos'); const result = await todosCollection.updateOne( { _id: new ObjectId(todo._id) }, @@ -20,7 +23,7 @@ export class MongoDb { } catch (error) { console.error("Error updating Todo status:", error); } finally { - await client.close(); + await this.client.close(); } } } \ No newline at end of file diff --git a/notification-service/src/rabbitmq/RabbitMQ.ts b/notification-service/src/rabbitmq/RabbitMQ.ts index 8dfecbc..0f5ea71 100644 --- a/notification-service/src/rabbitmq/RabbitMQ.ts +++ b/notification-service/src/rabbitmq/RabbitMQ.ts @@ -1,14 +1,18 @@ import amqp from 'amqplib'; import { ITodo } from '../interfaces/ITodo'; -import { MongoDb } from '../mongodb/MongoDb'; +import { MongoDbModel } from '../mongodb/MongoDb'; const env = require('dotenv').config().parsed; + export class RabbitMQ { channel: amqp.Channel; queueName: string; + mongoClient: MongoDbModel; constructor() { + this.mongoClient = new MongoDbModel(); + this.queueName = env.RABBITMQ_QUEUE_NAME; this.connect().then(() => { console.log('RabbitMQ connected'); @@ -44,10 +48,6 @@ export class RabbitMQ { async startConsumer() { try { - if (!this.channel) { - throw new Error('Channel is not initialized. Make sure to call connect before starting the consumer.'); - } - await this.channel.prefetch(1); console.log('Consumer started, waiting for messages...'); this.channel.consume(this.queueName, async (message) => { @@ -60,7 +60,7 @@ export class RabbitMQ { console.log('Received notification:', payload); try { - await MongoDb.updateTodoStatus(payload); + await this.mongoClient.updateTodoStatus(payload); console.log('Updated todo status in the DB'); } catch { await this.create(payload, delayTimeForQueue); diff --git a/request.http b/request.http index a2a08ef..941ea64 100644 --- a/request.http +++ b/request.http @@ -9,7 +9,7 @@ Content-Type: application/json { "title": "Go to the gymNEW!!!", "description": "Go to the gym at 8pm", - "due_date": "2023-07-08 12:00:00" + "due_date": "2023-07-08T15:05:00.000Z" } ### update request diff --git a/todo-service/src/controllers/todoController.ts b/todo-service/src/controllers/todoController.ts index bec1e0e..34111e6 100644 --- a/todo-service/src/controllers/todoController.ts +++ b/todo-service/src/controllers/todoController.ts @@ -39,9 +39,7 @@ export class TodoController { if (todo instanceof ApiError) { return next(todo); } - const delayTimeForQueue = Math.floor((new Date(todo.due_date).getTime() - new Date().getTime()) / 1000); - - this.queue.create(todo, delayTimeForQueue); + this.queue.create(todo); return res.json(todo); } catch { diff --git a/todo-service/src/rabbitmq/RabbitMQ.ts b/todo-service/src/rabbitmq/RabbitMQ.ts index 01dba89..8d2a4ee 100644 --- a/todo-service/src/rabbitmq/RabbitMQ.ts +++ b/todo-service/src/rabbitmq/RabbitMQ.ts @@ -1,4 +1,4 @@ -import amqp from 'amqplib'; +import amqp, { Options } from 'amqplib'; import { ITodo } from '../schemas/todoSchema'; const env = require('dotenv').config().parsed; @@ -18,6 +18,7 @@ export class RabbitMQ { } async connect() { + console.log("IN CONNECT") try { const connection = await amqp.connect({ protocol: 'amqp', @@ -27,6 +28,7 @@ export class RabbitMQ { password: env.RABBITMQ_PASSWORD, }); this.channel = await connection.createChannel(); + await this.channel.assertExchange('delayed', 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); await this.channel.assertQueue(this.queueName, { durable: true }); console.log('Channel and queue asserted successfully'); } catch (error) { @@ -35,12 +37,11 @@ export class RabbitMQ { } } - async create(payload: ITodo, delayTimeForQueue: number) { - console.log('Creating message to send to the queue') - console.log(`with delayTimeForQueue ${delayTimeForQueue}`) + async create(payload: ITodo) { + console.log("IN CREATE!") + const delayTimeForQueue: number = new Date(payload.due_date).getTime() - Date.now(); const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue }); - const options = { persistent: true, delayTimeForQueue: delayTimeForQueue.toString() }; - + const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } }; try { await this.channel.sendToQueue(this.queueName, Buffer.from(message), options); console.log('Message sent to the queue');