From e3268b463349b8adcb7d2b4bfc01160657a93c64 Mon Sep 17 00:00:00 2001 From: Kfir Dayan Date: Sat, 8 Jul 2023 15:26:13 +0300 Subject: [PATCH] still testing delayed queue --- notification-service/src/rabbitmq/RabbitMQ.ts | 12 ++++++++++++ request.http | 2 +- todo-service/src/rabbitmq/RabbitMQ.ts | 12 ++++++++++-- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/notification-service/src/rabbitmq/RabbitMQ.ts b/notification-service/src/rabbitmq/RabbitMQ.ts index 0f5ea71..a4c8a41 100644 --- a/notification-service/src/rabbitmq/RabbitMQ.ts +++ b/notification-service/src/rabbitmq/RabbitMQ.ts @@ -9,9 +9,16 @@ export class RabbitMQ { channel: amqp.Channel; queueName: string; mongoClient: MongoDbModel; + exchange: string; + queue: string; + routingKey: string; constructor() { this.mongoClient = new MongoDbModel(); + + this.exchange = 'delayed_exchange'; + this.queue = 'delayed_queue'; + this.routingKey = 'delayed_routing_key' this.queueName = env.RABBITMQ_QUEUE_NAME; this.connect().then(() => { @@ -30,6 +37,11 @@ export class RabbitMQ { password: env.RABBITMQ_PASSWORD, }); this.channel = await connection.createChannel(); + await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); + await this.channel.assertQueue(this.queueName, { durable: true }); + await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey); + + await this.channel.assertQueue(this.queueName, { durable: true }); } diff --git a/request.http b/request.http index 941ea64..8940f6e 100644 --- a/request.http +++ b/request.http @@ -9,7 +9,7 @@ Content-Type: application/json { "title": "Go to the gymNEW!!!", "description": "Go to the gym at 8pm", - "due_date": "2023-07-08T15:05:00.000Z" + "due_date": "2023-07-08T15:24:00.000Z" } ### update request diff --git a/todo-service/src/rabbitmq/RabbitMQ.ts b/todo-service/src/rabbitmq/RabbitMQ.ts index 8d2a4ee..a0e0895 100644 --- a/todo-service/src/rabbitmq/RabbitMQ.ts +++ b/todo-service/src/rabbitmq/RabbitMQ.ts @@ -6,10 +6,17 @@ const env = require('dotenv').config().parsed; export class RabbitMQ { channel: amqp.Channel; queueName: string; + exchange: string; + queue: string; + routingKey: string; constructor() { this.queueName = env.RABBITMQ_QUEUE_NAME; + this.exchange = 'delayed_exchange'; + this.queue = 'delayed_queue'; + this.routingKey = 'delayed_routing_key' + this.connect().then(() => { console.log('RabbitMQ connected'); }).catch((error) => { @@ -28,8 +35,9 @@ export class RabbitMQ { password: env.RABBITMQ_PASSWORD, }); this.channel = await connection.createChannel(); - await this.channel.assertExchange('delayed', 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); + await this.channel.assertExchange(this.exchange, 'x-delayed-message', { durable: true, autoDelete: false, arguments: { 'x-delayed-type': 'direct' } }); await this.channel.assertQueue(this.queueName, { durable: true }); + await this.channel.bindQueue(this.queueName, this.exchange, this.routingKey); console.log('Channel and queue asserted successfully'); } catch (error) { console.error('Error connecting to RabbitMQ:', error); @@ -43,7 +51,7 @@ export class RabbitMQ { const message = JSON.stringify({ payload, delayTimeForQueue: delayTimeForQueue }); const options: Options.Publish = { persistent: true, headers: { 'x-delay': delayTimeForQueue } }; try { - await this.channel.sendToQueue(this.queueName, Buffer.from(message), options); + await this.channel.publish(this.exchange, this.routingKey, Buffer.from(message), options); console.log('Message sent to the queue'); } catch (error) { console.error('Error sending message to RabbitMQ:', error);