Compare commits
No commits in common. "ac122916054ee5d14b1f6f7277e623cbe2e2dd7c" and "3e91c159d71655ad52bf62de9b45b6897911a601" have entirely different histories.
ac12291605
...
3e91c159d7
11 changed files with 70 additions and 190 deletions
|
@ -1,4 +1,4 @@
|
||||||
import { ObjectId } from 'mongodb';
|
import { ObjectId } from "mongodb";
|
||||||
|
|
||||||
export interface ITodo {
|
export interface ITodo {
|
||||||
_id: ObjectId;
|
_id: ObjectId;
|
||||||
|
|
|
@ -1,50 +1,17 @@
|
||||||
import { ITodo } from "./interfaces/ITodo";
|
// import { Sqs } from './aws/Sqs';
|
||||||
import { RabbitMQ } from "./rabbitmq/RabbitMQ";
|
import { RabbitMQ } from './rabbitmq/RabbitMQ';
|
||||||
import { EnvService } from "./services/EnvService";
|
|
||||||
import { MongoDbModel } from "./mongodb/MongoDb";
|
|
||||||
|
|
||||||
export class NotificationService {
|
export class NotificationService {
|
||||||
rabbitmq: RabbitMQ;
|
rabbitmq: RabbitMQ;
|
||||||
mongoModle: MongoDbModel;
|
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.rabbitmq = new RabbitMQ();
|
this.rabbitmq = new RabbitMQ();
|
||||||
this.mongoModle = new MongoDbModel();
|
|
||||||
this.startListener();
|
this.startListener();
|
||||||
}
|
}
|
||||||
|
|
||||||
async startListener() {
|
async startListener() {
|
||||||
if (this.IsUserConnected()) {
|
await this.rabbitmq.connect();
|
||||||
await this.rabbitmq.connect();
|
this.rabbitmq.startConsumer();
|
||||||
this.rabbitmq.startConsumer();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async newMessageValidator(message: ITodo) {
|
|
||||||
const todo = await this.mongoModle.getTodoById(message._id.toString());
|
|
||||||
if (todo) {
|
|
||||||
if (
|
|
||||||
todo.status === "pending" &&
|
|
||||||
todo.due_date < new Date() &&
|
|
||||||
todo.due_date === message.due_date
|
|
||||||
) {
|
|
||||||
await this.mongoModle.updateTodoStatus(todo);
|
|
||||||
await this.sendNotification(todo); // Send notification to user
|
|
||||||
} else {
|
|
||||||
console.log("Todo is not valid for notification");
|
|
||||||
console.log(`todo.status === "pending" - ${todo.status === "pending"}`);
|
|
||||||
console.log(`todo.due_date < new Date() - ${todo.due_date < new Date()}`);
|
|
||||||
console.log(`todo.due_date === message.due_date - ${todo.due_date === message.due_date}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private IsUserConnected() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async sendNotification(todo: ITodo) {
|
|
||||||
console.log("Sending notification for Todo:", todo._id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,24 +1,19 @@
|
||||||
import { MongoClient, ObjectId } from "mongodb";
|
import { MongoClient, ObjectId } from "mongodb";
|
||||||
import { ITodo } from "../interfaces/ITodo";
|
import { ITodo } from "../interfaces/ITodo";
|
||||||
import { EnvService } from "../services/EnvService";
|
|
||||||
|
const env = require("dotenv").config().parsed;
|
||||||
|
|
||||||
export class MongoDbModel {
|
export class MongoDbModel {
|
||||||
client: MongoClient;
|
client: MongoClient;
|
||||||
envSerivce: EnvService;
|
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.envSerivce = EnvService.getInstance();
|
this.client = new MongoClient(env.DATABASE_URL);
|
||||||
this.client = new MongoClient(
|
|
||||||
this.envSerivce.getEnvVariable("DATABASE_URL")
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public updateTodoStatus = async (todo: ITodo) => {
|
public updateTodoStatus = async (todo: ITodo) => {
|
||||||
try {
|
try {
|
||||||
await this.client.connect();
|
await this.client.connect();
|
||||||
const db = this.client.db(
|
const db = this.client.db(env.MONGO_DB_NAME);
|
||||||
this.envSerivce.getEnvVariable("MONGO_DB_NAME")
|
|
||||||
);
|
|
||||||
const todosCollection = db.collection("todos");
|
const todosCollection = db.collection("todos");
|
||||||
const result = await todosCollection.updateOne(
|
const result = await todosCollection.updateOne(
|
||||||
{ _id: new ObjectId(todo._id) },
|
{ _id: new ObjectId(todo._id) },
|
||||||
|
@ -31,38 +26,4 @@ export class MongoDbModel {
|
||||||
await this.client.close();
|
await this.client.close();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
public getTodoById = async (id: string): Promise<ITodo | void> => {
|
|
||||||
try {
|
|
||||||
await this.client.connect();
|
|
||||||
const db = this.client.db(
|
|
||||||
this.envSerivce.getEnvVariable("MONGO_DB_NAME")
|
|
||||||
);
|
|
||||||
const todosCollection = db.collection("todos");
|
|
||||||
const todo = await todosCollection.findOne({
|
|
||||||
_id: new ObjectId(id),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Convert the document to ITodo type
|
|
||||||
if (todo) {
|
|
||||||
const convertedTodo: ITodo = {
|
|
||||||
_id: new ObjectId(todo._id),
|
|
||||||
title: todo.title,
|
|
||||||
description: todo.description,
|
|
||||||
due_date: todo.due_date,
|
|
||||||
createAt: todo.createAt,
|
|
||||||
updateAt: todo.updateAt,
|
|
||||||
status: todo.status,
|
|
||||||
};
|
|
||||||
|
|
||||||
return convertedTodo;
|
|
||||||
}
|
|
||||||
|
|
||||||
return undefined;
|
|
||||||
} catch (error) {
|
|
||||||
console.error("Error getting Todo by id:", error);
|
|
||||||
} finally {
|
|
||||||
await this.client.close();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
import amqp, { ConsumeMessage } from "amqplib";
|
import amqp, { ConsumeMessage } from "amqplib";
|
||||||
import { ITodo } from "../interfaces/ITodo";
|
import { ITodo } from "../interfaces/ITodo";
|
||||||
import { MongoDbModel } from "../mongodb/MongoDb";
|
import { MongoDbModel } from "../mongodb/MongoDb";
|
||||||
import { EnvService } from "../services/EnvService";
|
|
||||||
|
const env = require("dotenv").config().parsed;
|
||||||
|
|
||||||
export class RabbitMQ {
|
export class RabbitMQ {
|
||||||
channel: amqp.Channel;
|
channel: amqp.Channel;
|
||||||
|
@ -9,11 +10,8 @@ export class RabbitMQ {
|
||||||
exchangeName: string;
|
exchangeName: string;
|
||||||
|
|
||||||
mongoClient: MongoDbModel;
|
mongoClient: MongoDbModel;
|
||||||
envService: EnvService;
|
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.envService = EnvService.getInstance();
|
this.queueName = env.RABBITMQ_QUEUE_NAME;
|
||||||
this.queueName = this.envService.getEnvVariable('RABBITMQ_QUEUE_NAME');
|
|
||||||
|
|
||||||
this.mongoClient = new MongoDbModel();
|
this.mongoClient = new MongoDbModel();
|
||||||
|
|
||||||
|
@ -30,10 +28,10 @@ export class RabbitMQ {
|
||||||
try {
|
try {
|
||||||
const connection = await amqp.connect({
|
const connection = await amqp.connect({
|
||||||
protocol: "amqp",
|
protocol: "amqp",
|
||||||
hostname: this.envService.getEnvVariable('RABBITMQ_HOST'),
|
hostname: env.RABBITMQ_HOST,
|
||||||
port: parseInt(this.envService.getEnvVariable('RABBITMQ_PORT')),
|
port: parseInt(env.RABBITMQ_PORT),
|
||||||
username: this.envService.getEnvVariable('RABBITMQ_USERNAME'),
|
username: env.RABBITMQ_USERNAME,
|
||||||
password: this.envService.getEnvVariable('RABBITMQ_PASSWORD'),
|
password: env.RABBITMQ_PASSWORD,
|
||||||
});
|
});
|
||||||
|
|
||||||
this.channel = await connection.createChannel();
|
this.channel = await connection.createChannel();
|
||||||
|
@ -65,14 +63,43 @@ export class RabbitMQ {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async startConsumer(): Promise<ITodo | void> {
|
async startConsumer() {
|
||||||
this.channel.assertQueue(this.queueName);
|
this.channel.assertQueue(this.queueName);
|
||||||
|
console.log("Consuming");
|
||||||
this.channel.consume(this.queueName, (message: ConsumeMessage | null) => {
|
this.channel.consume(this.queueName, (message: ConsumeMessage | null) => {
|
||||||
if (message) {
|
if (message) {
|
||||||
const todo: ITodo = JSON.parse(message.content.toString());
|
const todo = JSON.parse(message.content.toString());
|
||||||
this.channel.ack(message);
|
this.channel.ack(message);
|
||||||
return todo;
|
console.log("Received notification:", todo);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// try {
|
||||||
|
// console.log('Consumer started, waiting for messages...');
|
||||||
|
// this.channel.consume(this.queueName, async (message) => {
|
||||||
|
// if (message) {
|
||||||
|
// const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
|
||||||
|
// payload: ITodo;
|
||||||
|
// delayTimeForQueue: number;
|
||||||
|
// };
|
||||||
|
|
||||||
|
// console.log('Received notification:', payload);
|
||||||
|
|
||||||
|
// try {
|
||||||
|
// await this.mongoClient.updateTodoStatus(payload);
|
||||||
|
// console.log('Updated todo status in the DB');
|
||||||
|
// } catch {
|
||||||
|
// await this.create(payload, delayTimeForQueue);
|
||||||
|
// this.channel.ack(message);
|
||||||
|
// console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// this.channel.ack(message);
|
||||||
|
// }
|
||||||
|
// });
|
||||||
|
// } catch (error) {
|
||||||
|
// console.error('Error consuming messages from RabbitMQ:', error);
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,25 +0,0 @@
|
||||||
import dotenv from "dotenv";
|
|
||||||
|
|
||||||
// Load environment variables from .env file
|
|
||||||
dotenv.config();
|
|
||||||
|
|
||||||
export class EnvService {
|
|
||||||
private static instance: EnvService;
|
|
||||||
private env: any;
|
|
||||||
|
|
||||||
private constructor() {
|
|
||||||
// Get the parsed environment variables
|
|
||||||
this.env = dotenv.config().parsed;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static getInstance(): EnvService {
|
|
||||||
if (!EnvService.instance) {
|
|
||||||
EnvService.instance = new EnvService();
|
|
||||||
}
|
|
||||||
return EnvService.instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
public getEnvVariable(name: string): string | undefined {
|
|
||||||
return this.env[name];
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -2,8 +2,11 @@ import { NextFunction, Request, Response } from "express";
|
||||||
import { ApiError } from "../utils/ApiError";
|
import { ApiError } from "../utils/ApiError";
|
||||||
import { ITodo } from "../schemas/todoSchema";
|
import { ITodo } from "../schemas/todoSchema";
|
||||||
import { TodoModel } from "../models/todoModel";
|
import { TodoModel } from "../models/todoModel";
|
||||||
|
// import { Sqs } from '../aws/Sqs';
|
||||||
import { RabbitMQ } from "../rabbitmq/RabbitMQ";
|
import { RabbitMQ } from "../rabbitmq/RabbitMQ";
|
||||||
|
|
||||||
|
const env = require("dotenv").config().parsed;
|
||||||
|
|
||||||
export class TodoController {
|
export class TodoController {
|
||||||
private todoModel: TodoModel;
|
private todoModel: TodoModel;
|
||||||
queue: RabbitMQ;
|
queue: RabbitMQ;
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
import express from "express";
|
import express from "express";
|
||||||
import mongoose from "mongoose";
|
import mongoose from "mongoose";
|
||||||
import todoRouter from "./routes/todoRouter";
|
import todoRouter from "./routes/todoRouter";
|
||||||
import { EnvService } from "./services/EnvService";
|
|
||||||
import { ApiError } from "./utils/ApiError";
|
import { ApiError } from "./utils/ApiError";
|
||||||
|
|
||||||
|
const env = require("dotenv").config().parsed;
|
||||||
|
|
||||||
class TodoApp {
|
class TodoApp {
|
||||||
app: express.Application;
|
app: express.Application;
|
||||||
envService: EnvService;
|
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.envService = EnvService.getInstance();
|
|
||||||
this.connectToDB();
|
this.connectToDB();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ class TodoApp {
|
||||||
}
|
}
|
||||||
|
|
||||||
private startServer() {
|
private startServer() {
|
||||||
const PORT = this.envService.getEnvVariable("PORT") || 3000;
|
const PORT = env.PORT || 3000;
|
||||||
this.app.listen(PORT, () => {
|
this.app.listen(PORT, () => {
|
||||||
console.log(`Server started on port ${PORT}`);
|
console.log(`Server started on port ${PORT}`);
|
||||||
});
|
});
|
||||||
|
@ -40,7 +40,7 @@ class TodoApp {
|
||||||
}
|
}
|
||||||
|
|
||||||
private connectToDB() {
|
private connectToDB() {
|
||||||
mongoose.connect(this.envService.getEnvVariable("DATABASE_URL"));
|
mongoose.connect(env.DATABASE_URL);
|
||||||
const db = mongoose.connection;
|
const db = mongoose.connection;
|
||||||
// Check for DB connection
|
// Check for DB connection
|
||||||
db.on("error", () => {
|
db.on("error", () => {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { Request, Response, NextFunction } from "express";
|
import { Request, Response, NextFunction } from "express";
|
||||||
import { ApiError } from "../utils/ApiError";
|
import { ApiError } from "../utils/ApiError";
|
||||||
import { DateTime } from "luxon";
|
import { DateTime } from "luxon";
|
||||||
import mongoose from 'mongoose';
|
import moment from "moment-timezone";
|
||||||
|
|
||||||
const createTodoMiddleWare = async (
|
const createTodoMiddleWare = async (
|
||||||
req: Request,
|
req: Request,
|
||||||
|
@ -49,24 +49,4 @@ const createTodoMiddleWare = async (
|
||||||
next();
|
next();
|
||||||
};
|
};
|
||||||
|
|
||||||
const paramIdMiddleware = async (
|
export { createTodoMiddleWare };
|
||||||
req: Request,
|
|
||||||
res: Response,
|
|
||||||
next: NextFunction
|
|
||||||
) => {
|
|
||||||
// check if it's a valid mongo id
|
|
||||||
const { id } = req.params;
|
|
||||||
if (!mongoose.Types.ObjectId.isValid(id)) {
|
|
||||||
const error = new ApiError(
|
|
||||||
`id must be a valid mongo id`,
|
|
||||||
400,
|
|
||||||
"Bad Request"
|
|
||||||
);
|
|
||||||
return next(error);
|
|
||||||
}
|
|
||||||
next();
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
export { createTodoMiddleWare, paramIdMiddleware };
|
|
||||||
|
|
|
@ -1,17 +1,16 @@
|
||||||
import amqp, { Options, ConsumeMessage, Channel } from "amqplib";
|
import amqp, { Options, ConsumeMessage, Channel } from "amqplib";
|
||||||
import { ITodo } from "../schemas/todoSchema";
|
import { ITodo } from "../schemas/todoSchema";
|
||||||
import { EnvService } from "../services/EnvService";
|
|
||||||
|
const env = require("dotenv").config().parsed;
|
||||||
|
|
||||||
export class RabbitMQ {
|
export class RabbitMQ {
|
||||||
connection: amqp.Connection;
|
connection: amqp.Connection;
|
||||||
channel: amqp.Channel;
|
channel: amqp.Channel;
|
||||||
queue: string;
|
queue: string;
|
||||||
exchange: string;
|
exchange: string;
|
||||||
envService: EnvService;
|
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.envService = EnvService.getInstance();
|
this.queue = env.RABBITMQ_QUEUE_NAME;
|
||||||
this.queue = this.envService.getEnvVariable("RABBITMQ_QUEUE_NAME");
|
|
||||||
this.exchange = "delayed_exchange";
|
this.exchange = "delayed_exchange";
|
||||||
|
|
||||||
this.connect()
|
this.connect()
|
||||||
|
@ -27,10 +26,10 @@ export class RabbitMQ {
|
||||||
try {
|
try {
|
||||||
this.connection = await amqp.connect({
|
this.connection = await amqp.connect({
|
||||||
protocol: "amqp",
|
protocol: "amqp",
|
||||||
hostname: this.envService.getEnvVariable("RABBITMQ_HOST"),
|
hostname: env.RABBITMQ_HOST,
|
||||||
port: parseInt(this.envService.getEnvVariable('RABBITMQ_PORT')),
|
port: parseInt(env.RABBITMQ_PORT),
|
||||||
username: this.envService.getEnvVariable('RABBITMQ_USERNAME'),
|
username: env.RABBITMQ_USERNAME,
|
||||||
password: this.envService.getEnvVariable('RABBITMQ_PASSWORD'),
|
password: env.RABBITMQ_PASSWORD,
|
||||||
});
|
});
|
||||||
|
|
||||||
this.channel = await this.connection.createChannel();
|
this.channel = await this.connection.createChannel();
|
||||||
|
|
|
@ -1,9 +1,6 @@
|
||||||
import { Router } from "express";
|
import { Router } from "express";
|
||||||
import { TodoController } from "../controllers/todoController";
|
import { TodoController } from "../controllers/todoController";
|
||||||
import {
|
import { createTodoMiddleWare } from "../middleware/createTodoMiddleWare";
|
||||||
createTodoMiddleWare,
|
|
||||||
paramIdMiddleware,
|
|
||||||
} from "../middleware/createTodoMiddleWare";
|
|
||||||
|
|
||||||
class TodoRouter {
|
class TodoRouter {
|
||||||
router: Router;
|
router: Router;
|
||||||
|
@ -17,18 +14,14 @@ class TodoRouter {
|
||||||
|
|
||||||
private setRoutes() {
|
private setRoutes() {
|
||||||
this.router.get("/", this.todoController.getAll);
|
this.router.get("/", this.todoController.getAll);
|
||||||
this.router.get("/:id", paramIdMiddleware, this.todoController.getOne);
|
this.router.get("/:id", this.todoController.getOne);
|
||||||
this.router.post("/", createTodoMiddleWare, this.todoController.createOne);
|
this.router.post("/", createTodoMiddleWare, this.todoController.createOne);
|
||||||
this.router.put(
|
this.router.put(
|
||||||
"/:id",
|
"/:id",
|
||||||
createTodoMiddleWare,
|
createTodoMiddleWare,
|
||||||
this.todoController.updateOne
|
this.todoController.updateOne
|
||||||
);
|
);
|
||||||
this.router.delete(
|
this.router.delete("/:id", this.todoController.deleteOne);
|
||||||
"/:id",
|
|
||||||
paramIdMiddleware,
|
|
||||||
this.todoController.deleteOne
|
|
||||||
);
|
|
||||||
this.router.delete("/", this.todoController.removeAll);
|
this.router.delete("/", this.todoController.removeAll);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,25 +0,0 @@
|
||||||
import dotenv from "dotenv";
|
|
||||||
|
|
||||||
// Load environment variables from .env file
|
|
||||||
dotenv.config();
|
|
||||||
|
|
||||||
export class EnvService {
|
|
||||||
private static instance: EnvService;
|
|
||||||
private env: any;
|
|
||||||
|
|
||||||
private constructor() {
|
|
||||||
// Get the parsed environment variables
|
|
||||||
this.env = dotenv.config().parsed;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static getInstance(): EnvService {
|
|
||||||
if (!EnvService.instance) {
|
|
||||||
EnvService.instance = new EnvService();
|
|
||||||
}
|
|
||||||
return EnvService.instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
public getEnvVariable(name: string): string | undefined {
|
|
||||||
return this.env[name];
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue