Compare commits

...

4 commits

11 changed files with 190 additions and 70 deletions

View file

@ -1,4 +1,4 @@
import { ObjectId } from "mongodb"; import { ObjectId } from 'mongodb';
export interface ITodo { export interface ITodo {
_id: ObjectId; _id: ObjectId;

View file

@ -1,18 +1,51 @@
// import { Sqs } from './aws/Sqs'; import { ITodo } from "./interfaces/ITodo";
import { RabbitMQ } from './rabbitmq/RabbitMQ'; import { RabbitMQ } from "./rabbitmq/RabbitMQ";
import { EnvService } from "./services/EnvService";
import { MongoDbModel } from "./mongodb/MongoDb";
export class NotificationService { export class NotificationService {
rabbitmq: RabbitMQ; rabbitmq: RabbitMQ;
mongoModle: MongoDbModel;
constructor() { constructor() {
this.rabbitmq = new RabbitMQ(); this.rabbitmq = new RabbitMQ();
this.mongoModle = new MongoDbModel();
this.startListener(); this.startListener();
} }
async startListener() { async startListener() {
if (this.IsUserConnected()) {
await this.rabbitmq.connect(); await this.rabbitmq.connect();
this.rabbitmq.startConsumer(); this.rabbitmq.startConsumer();
} }
}
async newMessageValidator(message: ITodo) {
const todo = await this.mongoModle.getTodoById(message._id.toString());
if (todo) {
if (
todo.status === "pending" &&
todo.due_date < new Date() &&
todo.due_date === message.due_date
) {
await this.mongoModle.updateTodoStatus(todo);
await this.sendNotification(todo); // Send notification to user
} else {
console.log("Todo is not valid for notification");
console.log(`todo.status === "pending" - ${todo.status === "pending"}`);
console.log(`todo.due_date < new Date() - ${todo.due_date < new Date()}`);
console.log(`todo.due_date === message.due_date - ${todo.due_date === message.due_date}`);
}
}
}
private IsUserConnected() {
return true;
}
private async sendNotification(todo: ITodo) {
console.log("Sending notification for Todo:", todo._id);
}
} }
const notificationService = new NotificationService(); const notificationService = new NotificationService();

View file

@ -1,19 +1,24 @@
import { MongoClient, ObjectId } from "mongodb"; import { MongoClient, ObjectId } from "mongodb";
import { ITodo } from "../interfaces/ITodo"; import { ITodo } from "../interfaces/ITodo";
import { EnvService } from "../services/EnvService";
const env = require("dotenv").config().parsed;
export class MongoDbModel { export class MongoDbModel {
client: MongoClient; client: MongoClient;
envSerivce: EnvService;
constructor() { constructor() {
this.client = new MongoClient(env.DATABASE_URL); this.envSerivce = EnvService.getInstance();
this.client = new MongoClient(
this.envSerivce.getEnvVariable("DATABASE_URL")
);
} }
public updateTodoStatus = async (todo: ITodo) => { public updateTodoStatus = async (todo: ITodo) => {
try { try {
await this.client.connect(); await this.client.connect();
const db = this.client.db(env.MONGO_DB_NAME); const db = this.client.db(
this.envSerivce.getEnvVariable("MONGO_DB_NAME")
);
const todosCollection = db.collection("todos"); const todosCollection = db.collection("todos");
const result = await todosCollection.updateOne( const result = await todosCollection.updateOne(
{ _id: new ObjectId(todo._id) }, { _id: new ObjectId(todo._id) },
@ -26,4 +31,38 @@ export class MongoDbModel {
await this.client.close(); await this.client.close();
} }
}; };
public getTodoById = async (id: string): Promise<ITodo | void> => {
try {
await this.client.connect();
const db = this.client.db(
this.envSerivce.getEnvVariable("MONGO_DB_NAME")
);
const todosCollection = db.collection("todos");
const todo = await todosCollection.findOne({
_id: new ObjectId(id),
});
// Convert the document to ITodo type
if (todo) {
const convertedTodo: ITodo = {
_id: new ObjectId(todo._id),
title: todo.title,
description: todo.description,
due_date: todo.due_date,
createAt: todo.createAt,
updateAt: todo.updateAt,
status: todo.status,
};
return convertedTodo;
}
return undefined;
} catch (error) {
console.error("Error getting Todo by id:", error);
} finally {
await this.client.close();
}
};
} }

View file

@ -1,8 +1,7 @@
import amqp, { ConsumeMessage } from "amqplib"; import amqp, { ConsumeMessage } from "amqplib";
import { ITodo } from "../interfaces/ITodo"; import { ITodo } from "../interfaces/ITodo";
import { MongoDbModel } from "../mongodb/MongoDb"; import { MongoDbModel } from "../mongodb/MongoDb";
import { EnvService } from "../services/EnvService";
const env = require("dotenv").config().parsed;
export class RabbitMQ { export class RabbitMQ {
channel: amqp.Channel; channel: amqp.Channel;
@ -10,8 +9,11 @@ export class RabbitMQ {
exchangeName: string; exchangeName: string;
mongoClient: MongoDbModel; mongoClient: MongoDbModel;
envService: EnvService;
constructor() { constructor() {
this.queueName = env.RABBITMQ_QUEUE_NAME; this.envService = EnvService.getInstance();
this.queueName = this.envService.getEnvVariable('RABBITMQ_QUEUE_NAME');
this.mongoClient = new MongoDbModel(); this.mongoClient = new MongoDbModel();
@ -28,10 +30,10 @@ export class RabbitMQ {
try { try {
const connection = await amqp.connect({ const connection = await amqp.connect({
protocol: "amqp", protocol: "amqp",
hostname: env.RABBITMQ_HOST, hostname: this.envService.getEnvVariable('RABBITMQ_HOST'),
port: parseInt(env.RABBITMQ_PORT), port: parseInt(this.envService.getEnvVariable('RABBITMQ_PORT')),
username: env.RABBITMQ_USERNAME, username: this.envService.getEnvVariable('RABBITMQ_USERNAME'),
password: env.RABBITMQ_PASSWORD, password: this.envService.getEnvVariable('RABBITMQ_PASSWORD'),
}); });
this.channel = await connection.createChannel(); this.channel = await connection.createChannel();
@ -63,43 +65,14 @@ export class RabbitMQ {
} }
} }
async startConsumer() { async startConsumer(): Promise<ITodo | void> {
this.channel.assertQueue(this.queueName); this.channel.assertQueue(this.queueName);
console.log("Consuming");
this.channel.consume(this.queueName, (message: ConsumeMessage | null) => { this.channel.consume(this.queueName, (message: ConsumeMessage | null) => {
if (message) { if (message) {
const todo = JSON.parse(message.content.toString()); const todo: ITodo = JSON.parse(message.content.toString());
this.channel.ack(message); this.channel.ack(message);
console.log("Received notification:", todo); return todo;
} }
}); });
} }
// try {
// console.log('Consumer started, waiting for messages...');
// this.channel.consume(this.queueName, async (message) => {
// if (message) {
// const { payload, delayTimeForQueue } = JSON.parse(message.content.toString()) as {
// payload: ITodo;
// delayTimeForQueue: number;
// };
// console.log('Received notification:', payload);
// try {
// await this.mongoClient.updateTodoStatus(payload);
// console.log('Updated todo status in the DB');
// } catch {
// await this.create(payload, delayTimeForQueue);
// this.channel.ack(message);
// console.log('Published new message with delay, THE DB IS DOWN!: ', delayTimeForQueue);
// return;
// }
// this.channel.ack(message);
// }
// });
// } catch (error) {
// console.error('Error consuming messages from RabbitMQ:', error);
// }
} }

View file

@ -0,0 +1,25 @@
import dotenv from "dotenv";
// Load environment variables from .env file
dotenv.config();
export class EnvService {
private static instance: EnvService;
private env: any;
private constructor() {
// Get the parsed environment variables
this.env = dotenv.config().parsed;
}
public static getInstance(): EnvService {
if (!EnvService.instance) {
EnvService.instance = new EnvService();
}
return EnvService.instance;
}
public getEnvVariable(name: string): string | undefined {
return this.env[name];
}
}

View file

@ -2,11 +2,8 @@ import { NextFunction, Request, Response } from "express";
import { ApiError } from "../utils/ApiError"; import { ApiError } from "../utils/ApiError";
import { ITodo } from "../schemas/todoSchema"; import { ITodo } from "../schemas/todoSchema";
import { TodoModel } from "../models/todoModel"; import { TodoModel } from "../models/todoModel";
// import { Sqs } from '../aws/Sqs';
import { RabbitMQ } from "../rabbitmq/RabbitMQ"; import { RabbitMQ } from "../rabbitmq/RabbitMQ";
const env = require("dotenv").config().parsed;
export class TodoController { export class TodoController {
private todoModel: TodoModel; private todoModel: TodoModel;
queue: RabbitMQ; queue: RabbitMQ;

View file

@ -1,15 +1,15 @@
import express from "express"; import express from "express";
import mongoose from "mongoose"; import mongoose from "mongoose";
import todoRouter from "./routes/todoRouter"; import todoRouter from "./routes/todoRouter";
import { EnvService } from "./services/EnvService";
import { ApiError } from "./utils/ApiError"; import { ApiError } from "./utils/ApiError";
const env = require("dotenv").config().parsed;
class TodoApp { class TodoApp {
app: express.Application; app: express.Application;
envService: EnvService;
constructor() { constructor() {
this.envService = EnvService.getInstance();
this.connectToDB(); this.connectToDB();
} }
@ -29,7 +29,7 @@ class TodoApp {
} }
private startServer() { private startServer() {
const PORT = env.PORT || 3000; const PORT = this.envService.getEnvVariable("PORT") || 3000;
this.app.listen(PORT, () => { this.app.listen(PORT, () => {
console.log(`Server started on port ${PORT}`); console.log(`Server started on port ${PORT}`);
}); });
@ -40,7 +40,7 @@ class TodoApp {
} }
private connectToDB() { private connectToDB() {
mongoose.connect(env.DATABASE_URL); mongoose.connect(this.envService.getEnvVariable("DATABASE_URL"));
const db = mongoose.connection; const db = mongoose.connection;
// Check for DB connection // Check for DB connection
db.on("error", () => { db.on("error", () => {

View file

@ -1,7 +1,7 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { ApiError } from "../utils/ApiError"; import { ApiError } from "../utils/ApiError";
import { DateTime } from "luxon"; import { DateTime } from "luxon";
import moment from "moment-timezone"; import mongoose from 'mongoose';
const createTodoMiddleWare = async ( const createTodoMiddleWare = async (
req: Request, req: Request,
@ -49,4 +49,24 @@ const createTodoMiddleWare = async (
next(); next();
}; };
export { createTodoMiddleWare }; const paramIdMiddleware = async (
req: Request,
res: Response,
next: NextFunction
) => {
// check if it's a valid mongo id
const { id } = req.params;
if (!mongoose.Types.ObjectId.isValid(id)) {
const error = new ApiError(
`id must be a valid mongo id`,
400,
"Bad Request"
);
return next(error);
}
next();
};
export { createTodoMiddleWare, paramIdMiddleware };

View file

@ -1,16 +1,17 @@
import amqp, { Options, ConsumeMessage, Channel } from "amqplib"; import amqp, { Options, ConsumeMessage, Channel } from "amqplib";
import { ITodo } from "../schemas/todoSchema"; import { ITodo } from "../schemas/todoSchema";
import { EnvService } from "../services/EnvService";
const env = require("dotenv").config().parsed;
export class RabbitMQ { export class RabbitMQ {
connection: amqp.Connection; connection: amqp.Connection;
channel: amqp.Channel; channel: amqp.Channel;
queue: string; queue: string;
exchange: string; exchange: string;
envService: EnvService;
constructor() { constructor() {
this.queue = env.RABBITMQ_QUEUE_NAME; this.envService = EnvService.getInstance();
this.queue = this.envService.getEnvVariable("RABBITMQ_QUEUE_NAME");
this.exchange = "delayed_exchange"; this.exchange = "delayed_exchange";
this.connect() this.connect()
@ -26,10 +27,10 @@ export class RabbitMQ {
try { try {
this.connection = await amqp.connect({ this.connection = await amqp.connect({
protocol: "amqp", protocol: "amqp",
hostname: env.RABBITMQ_HOST, hostname: this.envService.getEnvVariable("RABBITMQ_HOST"),
port: parseInt(env.RABBITMQ_PORT), port: parseInt(this.envService.getEnvVariable('RABBITMQ_PORT')),
username: env.RABBITMQ_USERNAME, username: this.envService.getEnvVariable('RABBITMQ_USERNAME'),
password: env.RABBITMQ_PASSWORD, password: this.envService.getEnvVariable('RABBITMQ_PASSWORD'),
}); });
this.channel = await this.connection.createChannel(); this.channel = await this.connection.createChannel();

View file

@ -1,6 +1,9 @@
import { Router } from "express"; import { Router } from "express";
import { TodoController } from "../controllers/todoController"; import { TodoController } from "../controllers/todoController";
import { createTodoMiddleWare } from "../middleware/createTodoMiddleWare"; import {
createTodoMiddleWare,
paramIdMiddleware,
} from "../middleware/createTodoMiddleWare";
class TodoRouter { class TodoRouter {
router: Router; router: Router;
@ -14,14 +17,18 @@ class TodoRouter {
private setRoutes() { private setRoutes() {
this.router.get("/", this.todoController.getAll); this.router.get("/", this.todoController.getAll);
this.router.get("/:id", this.todoController.getOne); this.router.get("/:id", paramIdMiddleware, this.todoController.getOne);
this.router.post("/", createTodoMiddleWare, this.todoController.createOne); this.router.post("/", createTodoMiddleWare, this.todoController.createOne);
this.router.put( this.router.put(
"/:id", "/:id",
createTodoMiddleWare, createTodoMiddleWare,
this.todoController.updateOne this.todoController.updateOne
); );
this.router.delete("/:id", this.todoController.deleteOne); this.router.delete(
"/:id",
paramIdMiddleware,
this.todoController.deleteOne
);
this.router.delete("/", this.todoController.removeAll); this.router.delete("/", this.todoController.removeAll);
} }

View file

@ -0,0 +1,25 @@
import dotenv from "dotenv";
// Load environment variables from .env file
dotenv.config();
export class EnvService {
private static instance: EnvService;
private env: any;
private constructor() {
// Get the parsed environment variables
this.env = dotenv.config().parsed;
}
public static getInstance(): EnvService {
if (!EnvService.instance) {
EnvService.instance = new EnvService();
}
return EnvService.instance;
}
public getEnvVariable(name: string): string | undefined {
return this.env[name];
}
}