# Event-CRUD-Flask-python3-API/app.py
#
# 83 lines
# 2.8 KiB
# Python

from flask import Flask
from models import db
from flask_migrate import Migrate
from routes.userRoutes import userRoutes
from routes.eventRoutes import eventRoutes
import config
from flask_jwt_extended import JWTManager
from middlewares.errorHandlers import handle_auth_error, handle_invalid_token
from flask_jwt_extended.exceptions import NoAuthorizationError
from jwt.exceptions import InvalidTokenError
from datetime import timedelta
from flask_apscheduler import APScheduler
from services.EventNotifyerService import EventNotifyerService
class App:
    """Assemble the Flask application and the extensions it depends on.

    Construction wires up, in order: configuration, the database and
    migrations, JWT handling, the user/event blueprints, the auth error
    handlers, and an (unstarted) ``APScheduler`` instance. The scheduler is
    only initialised and started from ``run``.
    """

    class SchadulerConfig(object):
        """Settings object loaded into ``app.config`` by ``setup_scheduler``.

        This must be a class (not a function) so that calling
        ``self.SchadulerConfig()`` yields an object whose upper-case
        attributes ``Config.from_object`` can copy; a plain function would
        return ``None`` and nothing would be applied.
        """

        # NOTE(review): this turns on Flask-APScheduler's HTTP API; confirm
        # that exposing it is intended and adequately protected.
        SCHEDULER_API_ENABLED = True

    def __init__(self):
        """Create the Flask app and register every extension and route."""
        self.app = Flask(__name__)
        self.set_config()
        self.set_up_db()
        self.set_up_jwt()
        self.register_blueprints()
        self.setup_error_handlers()
        self.scheduler = APScheduler()

    def setup_scheduler(self):
        """Bind the scheduler to the app, start it, and register the job.

        Registers ``notifyUpcommingEvents`` to run every 5 minutes under the
        job id ``'notifyUpcommingEvents'``.
        """
        self.app.config.from_object(self.SchadulerConfig())
        self.scheduler.init_app(self.app)
        self.scheduler.start()
        # Schedule the job
        self.scheduler.add_job(
            id='notifyUpcommingEvents',
            func=self.notifyUpcommingEvents,
            trigger='interval',
            minutes=5,
        )
        print("Scheduler started")

    def set_config(self):
        """Load the base settings from ``config.Config``."""
        self.app.config.from_object(config.Config)

    def set_up_db(self):
        """Attach the shared ``db`` object to the app and enable migrations."""
        db.init_app(self.app)
        self.migrate = Migrate(self.app, db)

    def setup_error_handlers(self):
        """Register handlers for missing and invalid JWT errors."""
        self.app.register_error_handler(NoAuthorizationError, handle_auth_error)
        self.app.register_error_handler(InvalidTokenError, handle_invalid_token)

    def set_up_jwt(self):
        """Configure cookie-based JWT authentication and create the manager."""
        # Settings are written before the manager is created so the extension
        # never observes a partially configured app.
        self.app.config['JWT_TOKEN_LOCATION'] = ['cookies']
        self.app.config['JWT_COOKIE_NAME'] = 'access_token_cookie'
        # NOTE(review): CSRF protection is disabled while tokens travel in
        # cookies, and access tokens live for 30 days. Both weaken security;
        # confirm they are deliberate before deploying.
        self.app.config['JWT_COOKIE_CSRF_PROTECT'] = False
        self.app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(days=30)
        self.jwt_manager = JWTManager(self.app)

    def register_blueprints(self):
        """Mount the user routes under ``/user`` and event routes under ``/event``."""
        self.app.register_blueprint(userRoutes, url_prefix='/user')
        self.app.register_blueprint(eventRoutes, url_prefix='/event')

    def print_endpoints(self):
        """Print every registered URL rule with the name of its view function."""
        print("Endpoints and their functions:")
        for rule in self.app.url_map.iter_rules():
            print(f"Endpoint: {rule.endpoint}, Path: {rule}")
            function_name = self.app.view_functions[rule.endpoint].__name__
            print(f" Function: {function_name}")

    def notifyUpcommingEvents(self):
        """Scheduled job: call the event notifier inside an app context.

        The scheduler invokes this outside of any request, so an application
        context is pushed explicitly before delegating to
        ``EventNotifyerService.notifyUpcommingEvents``.
        """
        with self.app.app_context():
            EventNotifyerService.notifyUpcommingEvents()

    def run(self, *, port=8000, debug=True):
        """Create the database tables, start the scheduler, and serve the app.

        Args:
            port: TCP port for the development server.
            debug: Whether to run Flask in debug mode (which also enables the
                auto-reloader).
        """
        import os

        with self.app.app_context():
            db.create_all()

        # In debug mode the Werkzeug reloader re-executes this script in a
        # child process (marked with WERKZEUG_RUN_MAIN="true") while the
        # parent only watches files. Starting the scheduler in both would
        # fire every job twice, so start it only in the serving process.
        if not debug or os.environ.get("WERKZEUG_RUN_MAIN") == "true":
            self.setup_scheduler()  # Setup scheduler

        self.app.run(port=port, debug=debug)
# Built at import time so the wrapper and the underlying Flask object are
# both available as module attributes.
app_class_instance = App()
app_instance = app_class_instance.app

# Executed directly: list the registered routes, then start the server.
if __name__ == "__main__":
    app_class_instance.print_endpoints()
    app_class_instance.run()