first commit - CRUD python API for Events

This commit is contained in:
Kfir Dayan 2024-01-04 14:21:37 +02:00
commit 64c5a24817
14 changed files with 438 additions and 0 deletions

.gitignore vendored Normal file

@@ -0,0 +1,5 @@
## environment setup
venv
__pycache__
instance

README.md Normal file

@@ -0,0 +1,14 @@
To create the SQLite database, run these commands:
```
flask db init
flask db migrate -m "initial migration"
flask db upgrade
```
or
```
python3 -m flask db init
python3 -m flask db migrate -m "initial migration"
python3 -m flask db upgrade
```
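
Once the migrations have run, `python3 app.py` starts the server on Flask's default port 5000. A sample request against the `POST /events` route might then look like this (field names follow middlewares.py; the values are illustrative):
```
curl -X POST http://localhost:5000/events \
  -H "Content-Type: application/json" \
  -d '{"title": "Standup", "description": "Daily sync", "location": "Zoom", "duedate": "2024-01-05T09:30:00"}'
```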

app.py Normal file

@@ -0,0 +1,18 @@
from flask import Flask
from models import db
from flask_migrate import Migrate
from routes import api
import config
from celery_utils import make_celery

app = Flask(__name__)
app.config.from_object(config.Config)
db.init_app(app)
migrate = Migrate(app, db)
celery = make_celery(app)
app.register_blueprint(api)

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)
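
Note that app.py imports make_celery from a celery_utils module that does not appear among this commit's 14 files. A minimal sketch of what that factory conventionally looks like (the standard Flask/Celery context-task pattern; the config key and broker default are assumptions, not confirmed by the diff):
```
# celery_utils.py - hypothetical sketch, not part of this commit's diff.
from celery import Celery

def make_celery(app):
    # Assumed config key; falls back to the Redis service from
    # infra/docker-compose.yml.
    celery = Celery(
        app.import_name,
        broker=app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        # Run every task inside the Flask application context so that
        # extensions like SQLAlchemy are usable from task bodies.
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
```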

config.py Normal file

@@ -0,0 +1,2 @@
class Config:
    SQLALCHEMY_DATABASE_URI = 'sqlite:///events.db'
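
config.py stops at the database URI. Since make_celery(app) copies the Flask config into Celery (see the sketch above), a broker URL for the Redis service defined below would conventionally live in the same class; CELERY_BROKER_URL is an assumed key, not part of this commit:
```
# Hypothetical extension of config.py - not part of this commit.
class Config:
    SQLALCHEMY_DATABASE_URI = 'sqlite:///events.db'
    # Assumed addition: Celery broker pointing at the Redis container
    # from infra/docker-compose.yml.
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
```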

infra/docker-compose.yml Normal file

@@ -0,0 +1,7 @@
version: '3'
services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

middlewares.py Normal file

@@ -0,0 +1,36 @@
from functools import wraps
from flask import request, jsonify
from datetime import datetime

def validate_event_post_request(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        data = request.get_json()
        if not data:
            return jsonify({"message": "No input data provided"}), 400

        # Check required fields
        required_fields = ['title', 'duedate', 'location', 'description']
        if not all(field in data for field in required_fields):
            return jsonify({"message": "Please check your data, you're missing some required fields; visit our docs https://git.dayanhub.com/kfir"}), 400

        # Validate 'title'
        if not isinstance(data['title'], str) or not data['title'].strip():
            return jsonify({"message": "Invalid title"}), 400

        # Validate 'description'
        if not isinstance(data['description'], str):
            return jsonify({"message": "Invalid description"}), 400

        # Validate 'duedate' (ensure it's a valid datetime string)
        try:
            datetime.strptime(data['duedate'], '%Y-%m-%dT%H:%M:%S')
        except ValueError:
            return jsonify({"message": "Invalid duedate format. Use YYYY-MM-DDTHH:MM:SS"}), 400

        # Validate 'location'
        if not isinstance(data['location'], str) or not data['location'].strip():
            return jsonify({"message": "Invalid location"}), 400

        return f(*args, **kwargs)
    return decorated_function
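
A quick way to exercise the validator without a running server is Flask's built-in test client; a hedged smoke test (assumes app.py is importable and the /events route from routes.py below):
```
# Hypothetical smoke test for validate_event_post_request.
from app import app

client = app.test_client()

# Missing required fields are rejected before any handler code runs.
resp = client.post('/events', json={'title': 'Standup'})
print(resp.status_code, resp.get_json())  # 400, "Please check your data..."

# A well-formed payload passes validation and reaches create_event().
resp = client.post('/events', json={
    'title': 'Standup',
    'description': 'Daily sync',
    'location': 'Zoom',
    'duedate': '2024-01-05T09:30:00',
})
print(resp.status_code)  # 201, provided the database has been migrated
```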

migrations/README Normal file

@@ -0,0 +1 @@
Single-database configuration for Flask.

migrations/alembic.ini Normal file

@@ -0,0 +1,50 @@
# A generic, single database configuration.

[alembic]
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic,flask_migrate

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[logger_flask_migrate]
level = INFO
handlers =
qualname = flask_migrate

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

migrations/env.py Normal file

@@ -0,0 +1,113 @@
import logging
from logging.config import fileConfig

from flask import current_app

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')


def get_engine():
    try:
        # this works with Flask-SQLAlchemy<3 and Alchemical
        return current_app.extensions['migrate'].db.get_engine()
    except (TypeError, AttributeError):
        # this works with Flask-SQLAlchemy>=3
        return current_app.extensions['migrate'].db.engine


def get_engine_url():
    try:
        return get_engine().url.render_as_string(hide_password=False).replace(
            '%', '%%')
    except AttributeError:
        return str(get_engine().url).replace('%', '%%')


# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option('sqlalchemy.url', get_engine_url())
target_db = current_app.extensions['migrate'].db

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def get_metadata():
    if hasattr(target_db, 'metadatas'):
        return target_db.metadatas[None]
    return target_db.metadata


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url, target_metadata=get_metadata(), literal_binds=True
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """

    # this callback is used to prevent an auto-migration from being generated
    # when there are no changes to the schema
    # reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
    def process_revision_directives(context, revision, directives):
        if getattr(config.cmd_opts, 'autogenerate', False):
            script = directives[0]
            if script.upgrade_ops.is_empty():
                directives[:] = []
                logger.info('No changes in schema detected.')

    conf_args = current_app.extensions['migrate'].configure_args
    if conf_args.get("process_revision_directives") is None:
        conf_args["process_revision_directives"] = process_revision_directives

    connectable = get_engine()

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=get_metadata(),
            **conf_args
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

migrations/script.py.mako Normal file

@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}

migrations/versions/5569d39a87cf_initial_migration.py Normal file

@@ -0,0 +1,42 @@
"""initial migration
Revision ID: 5569d39a87cf
Revises:
Create Date: 2024-01-04 13:44:40.811421
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '5569d39a87cf'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('event', schema=None) as batch_op:
batch_op.add_column(sa.Column('title', sa.String(length=100), nullable=False))
batch_op.add_column(sa.Column('description', sa.String(length=200), nullable=True))
batch_op.add_column(sa.Column('location', sa.String(length=100), nullable=True))
batch_op.add_column(sa.Column('deleted', sa.Boolean(), nullable=True))
batch_op.add_column(sa.Column('duedate', sa.DateTime(), nullable=True))
batch_op.add_column(sa.Column('created_at', sa.DateTime(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('event', schema=None) as batch_op:
batch_op.drop_column('created_at')
batch_op.drop_column('duedate')
batch_op.drop_column('deleted')
batch_op.drop_column('location')
batch_op.drop_column('description')
batch_op.drop_column('title')
# ### end Alembic commands ###

models.py Normal file

@@ -0,0 +1,22 @@
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class Event(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    description = db.Column(db.String(200))
    location = db.Column(db.String(100))
    deleted = db.Column(db.Boolean, default=False)
    duedate = db.Column(db.DateTime)
    created_at = db.Column(db.DateTime, default=db.func.now())

    def to_dict(self):
        return {
            'id': self.id,
            'title': self.title,
            'description': self.description,
            'location': self.location,
            'duedate': self.duedate.isoformat() if self.duedate else None,
            'created_at': self.created_at.isoformat()
        }

routes.py Normal file

@@ -0,0 +1,61 @@
from flask import Blueprint, jsonify, request
from services import EventService
from middlewares import validate_event_post_request

api = Blueprint('api', __name__)

# Create new event
@api.route('/events', methods=['POST'])
@validate_event_post_request
def create_event():
    try:
        data = request.json
        new_event = EventService.create_event(data)
        if new_event:
            return jsonify(new_event.to_dict()), 201
        else:
            return jsonify({'error': 'Failed to create event'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500

# Get All Events
@api.route('/events', methods=['GET'])
def get_events():
    try:
        return {"events": EventService.get_all_events()}, 200
    except Exception as e:
        return {"error": str(e)}, 500

# Get Event by ID
@api.route('/events/<int:event_id>', methods=['GET'])
def get_event(event_id):
    try:
        return {"event": EventService.get_event_by_id(event_id)}, 200
    except Exception as e:
        return {"error": str(e)}, 500

# Update Event
@api.route('/events/<int:event_id>', methods=['PUT'])
@validate_event_post_request
def update_event(event_id):
    try:
        data = request.json
        updated_event = EventService.update_event(event_id, data)
        if updated_event:
            return jsonify(updated_event.to_dict()), 200
        else:
            return jsonify({'error': 'Failed to update event'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500

# DELETE Event
@api.route('/events/<int:event_id>', methods=['DELETE'])
def delete_event(event_id):
    try:
        deleted_event = EventService.delete_event(event_id)
        if deleted_event:
            return jsonify(deleted_event.to_dict()), 200
        else:
            return jsonify({'error': 'Failed to delete event'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500

services.py Normal file

@@ -0,0 +1,43 @@
from models import db, Event
from datetime import datetime

class EventService:
    @staticmethod
    def create_event(data):
        new_event = Event(
            title=data['title'],
            description=data.get('description', ''),
            location=data.get('location', ''),
            duedate=datetime.strptime(data['duedate'], '%Y-%m-%dT%H:%M:%S')
        )
        db.session.add(new_event)
        db.session.commit()
        return new_event

    @staticmethod
    def get_all_events():
        events = Event.query.filter_by(deleted=False).all()
        return [event.to_dict() for event in events]

    @staticmethod
    def get_event_by_id(event_id):
        # Guard against a missing or soft-deleted event to avoid an
        # AttributeError on None.
        event = Event.query.filter_by(id=event_id, deleted=False).first()
        return event.to_dict() if event else None

    @staticmethod
    def update_event(event_id, data):
        event = Event.query.get(event_id)
        if not event:
            return None
        event.title = data['title']
        event.description = data.get('description', '')
        event.location = data.get('location', '')
        # The model column is 'duedate'; assigning to 'event.time' would
        # silently set a non-column attribute and never persist the date.
        event.duedate = datetime.strptime(data['duedate'], '%Y-%m-%dT%H:%M:%S')
        db.session.commit()
        return event

    @staticmethod
    def delete_event(event_id):
        # Soft delete: flag the row instead of removing it.
        event = Event.query.filter_by(id=event_id, deleted=False).first()
        if event:
            event.deleted = True
            db.session.commit()
        else:
            return None
        return event
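
As a sketch of how the service layer behaves outside the HTTP layer (hypothetical flask-shell-style session; assumes the database was created with the README's migration commands and starts empty):
```
from app import app
from services import EventService

with app.app_context():
    event = EventService.create_event({
        'title': 'Retro',
        'location': 'Office',
        'duedate': '2024-01-12T16:00:00',
    })
    EventService.delete_event(event.id)            # soft delete: sets deleted=True
    print(EventService.get_all_events())           # [] - deleted rows are filtered out
    print(EventService.get_event_by_id(event.id))  # None - same deleted=False filter
```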