updating docker compose to init with script to enable plugin

This commit is contained in:
Kfir Dayan 2023-07-08 15:04:33 +03:00
parent c2695c7eb8
commit 2333b78f8c
8 changed files with 137 additions and 127 deletions

View file

@ -15,18 +15,23 @@ services:
platform: linux/arm64/v8 platform: linux/arm64/v8
expose: expose:
- 27017 - 27017
rabitmq: rabbitmq:
image: arm64v8/rabbitmq:3.8.3-management image: heidiks/rabbitmq-delayed-message-exchange:latest
restart: always restart: always
ports: ports:
- 5672:5672 - 5672:5672
- 15672:15672 - 15672:15672
volumes: volumes:
- rabbitmq_vol:/var/lib/rabbitmq - rabbitmq_vol:/var/lib/rabbitmq
- ./rabbitmq-init-scripts/init.sh:/docker-entrypoint-initdb.d/init.sh
environment: environment:
- RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER} - RABBITMQ_DEFAULT_USER=${RABBITMQ_DEFAULT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS} - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
platform: linux/arm64/v8 command: |
bash -c "
rabbitmq-plugins enable rabbitmq_delayed_message_exchange && \
rabbitmq-server
"
expose: expose:
- 5672 - 5672
- 15672 - 15672

View file

@ -0,0 +1,3 @@
#!/bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmqctl restart

View file

@ -1,122 +1,122 @@
import aws from 'aws-sdk'; // import aws from 'aws-sdk';
import { ITodo } from '../interfaces/ITodo'; // import { ITodo } from '../interfaces/ITodo';
import { MongoDb } from '../mongodb/MongoDb'; // import { MongoDb } from '../mongodb/MongoDb';
const env = require('dotenv').config().parsed; // const env = require('dotenv').config().parsed;
export class Sqs { // export class Sqs {
sqs: aws.SQS; // sqs: aws.SQS;
queueUrl: string; // queueUrl: string;
constructor() { // constructor() {
aws.config.update({ // aws.config.update({
accessKeyId: env.AWS_ACCESS_KEY_ID, // accessKeyId: env.AWS_ACCESS_KEY_ID,
secretAccessKey: env.AWS_SECRET_ACCESS_KEY, // secretAccessKey: env.AWS_SECRET_ACCESS_KEY,
}); // });
this.sqs = new aws.SQS(); // this.sqs = new aws.SQS();
this.queueUrl = env.AWS_SQS_URL + env.AWS_SQS_QUEUE_NAME; // this.queueUrl = env.AWS_SQS_URL + env.AWS_SQS_QUEUE_NAME;
if (this.sqs) { // if (this.sqs) {
console.log("SQS connected"); // console.log("SQS connected");
} // }
} // }
async create(payload: ITodo, delayTimeForQueue: number) { // async create(payload: ITodo, delayTimeForQueue: number) {
let reQueueTime = 0; // let reQueueTime = 0;
if (delayTimeForQueue > 900) { // if (delayTimeForQueue > 900) {
reQueueTime = delayTimeForQueue - 900; // reQueueTime = delayTimeForQueue - 900;
delayTimeForQueue = 900; // delayTimeForQueue = 900;
} // }
const params = { // const params = {
DelaySeconds: delayTimeForQueue, // DelaySeconds: delayTimeForQueue,
MessageAttributes: {}, // MessageAttributes: {},
MessageBody: JSON.stringify({ payload, delayTimeForQueue, reQueueTime }), // MessageBody: JSON.stringify({ payload, delayTimeForQueue, reQueueTime }),
QueueUrl: this.queueUrl, // QueueUrl: this.queueUrl,
}; // };
try { // try {
const data = await this.sqs.sendMessage(params).promise(); // const data = await this.sqs.sendMessage(params).promise();
console.log("Message sent to the queue", data.MessageId); // console.log("Message sent to the queue", data.MessageId);
return data; // return data;
} catch (error) { // } catch (error) {
console.log("Error", error); // console.log("Error", error);
throw error; // throw error;
} // }
} // }
async startConsumer() { // async startConsumer() {
while (true) { // while (true) {
const message = await this.getNextQueue(); // const message = await this.getNextQueue();
if (message) { // if (message) {
const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.Body) as { // const { payload, delayTimeForQueue, reQueueTime } = JSON.parse(message.Body) as {
payload: ITodo; // payload: ITodo;
delayTimeForQueue: number; // delayTimeForQueue: number;
reQueueTime: number; // reQueueTime: number;
}; // };
console.log("Received notification:", payload); // console.log("Received notification:", payload);
if (reQueueTime === 0) { // if (reQueueTime === 0) {
try { // try {
await MongoDb.updateTodoStatus(payload); // await MongoDb.updateTodoStatus(payload);
} catch { // } catch {
await this.create(payload, delayTimeForQueue); // await this.create(payload, delayTimeForQueue);
await this.deleteMessage(message.ReceiptHandle); // await this.deleteMessage(message.ReceiptHandle);
console.log("Published new queue with delay, THE DB IS DOWN!:", delayTimeForQueue); // console.log("Published new queue with delay, THE DB IS DOWN!:", delayTimeForQueue);
} // }
await this.deleteMessage(message.ReceiptHandle); // await this.deleteMessage(message.ReceiptHandle);
} else if (reQueueTime >= 900) { // } else if (reQueueTime >= 900) {
const newDelayTime = 900; // const newDelayTime = 900;
const newReQueueTime = reQueueTime - 900; // const newReQueueTime = reQueueTime - 900;
await this.create(payload, newDelayTime); // await this.create(payload, newDelayTime);
await this.deleteMessage(message.ReceiptHandle); // await this.deleteMessage(message.ReceiptHandle);
console.log("Published new queue with delay:", newDelayTime); // console.log("Published new queue with delay:", newDelayTime);
} else { // } else {
const newDelayTime = reQueueTime; // const newDelayTime = reQueueTime;
await this.create(payload, newDelayTime); // await this.create(payload, newDelayTime);
await this.deleteMessage(message.ReceiptHandle); // await this.deleteMessage(message.ReceiptHandle);
console.log("Published new queue with delay:", newDelayTime); // console.log("Published new queue with delay:", newDelayTime);
} // }
} // }
} // }
} // }
private getNextQueue = async () => { // private getNextQueue = async () => {
const params = { // const params = {
QueueUrl: this.queueUrl, // QueueUrl: this.queueUrl,
MaxNumberOfMessages: 1, // MaxNumberOfMessages: 1,
VisibilityTimeout: 30, // VisibilityTimeout: 30,
WaitTimeSeconds: 20, // Increase the WaitTimeSeconds for long polling // WaitTimeSeconds: 20, // Increase the WaitTimeSeconds for long polling
}; // };
try { // try {
const data = await this.sqs.receiveMessage(params).promise(); // const data = await this.sqs.receiveMessage(params).promise();
const message = data.Messages ? data.Messages[0] : null; // const message = data.Messages ? data.Messages[0] : null;
return message; // return message;
} catch (error) { // } catch (error) {
console.error("Error retrieving message from SQS:", error); // console.error("Error retrieving message from SQS:", error);
return null; // return null;
} // }
} // }
private deleteMessage = async (receiptHandle: string) => { // private deleteMessage = async (receiptHandle: string) => {
const params = { // const params = {
QueueUrl: this.queueUrl, // QueueUrl: this.queueUrl,
ReceiptHandle: receiptHandle, // ReceiptHandle: receiptHandle,
}; // };
try { // try {
await this.sqs.deleteMessage(params).promise(); // await this.sqs.deleteMessage(params).promise();
console.log("Message deleted"); // console.log("Message deleted");
} catch (error) { // } catch (error) {
console.error("Error deleting message from SQS:", error); // console.error("Error deleting message from SQS:", error);
} // }
} // }
} // }

View file

@ -3,14 +3,17 @@ import { ITodo } from '../interfaces/ITodo';
const env = require('dotenv').config().parsed; const env = require('dotenv').config().parsed;
export class MongoDb { export class MongoDbModel {
client: MongoClient;
public static updateTodoStatus = async (todo: ITodo) => { constructor() {
const client = new MongoClient(env.DATABASE_URL); this.client = new MongoClient(env.DATABASE_URL);
}
public updateTodoStatus = async (todo: ITodo) => {
try { try {
await client.connect(); await this.client.connect();
const db = client.db(env.MONGO_DB_NAME); const db = this.client.db(env.MONGO_DB_NAME);
const todosCollection = db.collection('todos'); const todosCollection = db.collection('todos');
const result = await todosCollection.updateOne( const result = await todosCollection.updateOne(
{ _id: new ObjectId(todo._id) }, { _id: new ObjectId(todo._id) },
@ -20,7 +23,7 @@ export class MongoDb {
} catch (error) { } catch (error) {
console.error("Error updating Todo status:", error); console.error("Error updating Todo status:", error);
} finally { } finally {
await client.close(); await this.client.close();
} }
} }
} }

View file

@ -1,14 +1,18 @@
import amqp from 'amqplib'; import amqp from 'amqplib';
import { ITodo } from '../interfaces/ITodo'; import { ITodo } from '../interfaces/ITodo';
import { MongoDb } from '../mongodb/MongoDb'; import { MongoDbModel } from '../mongodb/MongoDb';
const env = require('dotenv').config().parsed; const env = require('dotenv').config().parsed;
export class RabbitMQ { export class RabbitMQ {
channel: amqp.Channel; channel: amqp.Channel;
queueName: string; queueName: string;
mongoClient: MongoDbModel;
constructor() { constructor() {
this.mongoClient = new MongoDbModel();
this.queueName = env.RABBITMQ_QUEUE_NAME; this.queueName = env.RABBITMQ_QUEUE_NAME;
this.connect().then(() => { this.connect().then(() => {
console.log('RabbitMQ connected'); console.log('RabbitMQ connected');
@ -44,10 +48,6 @@ export class RabbitMQ {
async startConsumer() { async startConsumer() {
try { try {
if (!this.channel) {
throw new Error('Channel is not initialized. Make sure to call connect before starting the consumer.');
}
await this.channel.prefetch(1); await this.channel.prefetch(1);
console.log('Consumer started, waiting for messages...'); console.log('Consumer started, waiting for messages...');
this.channel.consume(this.queueName, async (message) => { this.channel.consume(this.queueName, async (message) => {
@ -60,7 +60,7 @@ export class RabbitMQ {
console.log('Received notification:', payload); console.log('Received notification:', payload);
try { try {
await MongoDb.updateTodoStatus(payload); await this.mongoClient.updateTodoStatus(payload);
console.log('Updated todo status in the DB'); console.log('Updated todo status in the DB');
} catch { } catch {
await this.create(payload, delayTimeForQueue); await this.create(payload, delayTimeForQueue);

View file

@ -9,7 +9,7 @@ Content-Type: application/json
{ {
"title": "Go to the gymNEW!!!", "title": "Go to the gymNEW!!!",
"description": "Go to the gym at 8pm", "description": "Go to the gym at 8pm",
"due_date": "2023-07-08 12:00:00" "due_date": "2023-07-08T15:05:00.000Z"
} }
### update request ### update request

View file

@ -39,9 +39,7 @@ export class TodoController {
if (todo instanceof ApiError) { if (todo instanceof ApiError) {
return next(todo); return next(todo);
} }
const delayTimeForQueue = Math.floor((new Date(todo.due_date).getTime() - new Date().getTime()) / 1000); this.queue.create(todo);
this.queue.create(todo, delayTimeForQueue);
return res.json(todo); return res.json(todo);
} catch { } catch {

View file

@ -1,4 +1,4 @@
import amqp from 'amqplib'; import amqp, { Options } from 'amqplib';
import { ITodo } from '../schemas/todoSchema'; import { ITodo } from '../schemas/todoSchema';
const env = require('dotenv').config().parsed; const env = require('dotenv').config().parsed;
@ -18,6 +18,7 @@ export class RabbitMQ {
} }
async connect() { async connect() {
console.log("IN CONNECT")
try { try {
const connection = await amqp.connect({ const connection = await amqp.connect({
protocol: 'amqp', protocol: 'amqp',
@ -27,6 +28,7 @@ export class RabbitMQ {
password: env.RABBITMQ_PASSWORD, password: env.RABBITMQ_PASSWORD,
}); });
this.channel = await connection.createChannel(); this.channel = await connection.createChannel();
await this.channel.assertExchange('delayed', 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } });
await this.channel.assertQueue(this.queueName, { durable: true }); await this.channel.assertQueue(this.queueName, { durable: true });
console.log('Channel and queue asserted successfully'); console.log('Channel and queue asserted successfully');
} catch (error) { } catch (error) {
@ -35,12 +37,11 @@ export class RabbitMQ {
} }
} }
async create(payload: ITodo, delayTimeForQueue: number) { async create(payload: ITodo) {
console.log('Creating message to send to the queue') console.log("IN CREATE!")
console.log(`with delayTimeForQueue ${delayTimeForQueue}`) const delayTimeForQueue: number = new Date(payload.due_date).getTime() - Date.now();
const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue }); const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue });
const options = { persistent: true, delayTimeForQueue: delayTimeForQueue.toString() }; const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } };
try { try {
await this.channel.sendToQueue(this.queueName, Buffer.from(message), options); await this.channel.sendToQueue(this.queueName, Buffer.from(message), options);
console.log('Message sent to the queue'); console.log('Message sent to the queue');