Event-CRUD-Flask-python3-API/services/EventService.py
2024-01-09 18:30:30 +02:00

86 lines
No EOL
2.8 KiB
Python

from sqlalchemy.sql import exists
from models import db, Event, UserEventAssociation, User
from services.UserService import UserService
from datetime import datetime
from flask import g
from sqlalchemy import func
from datetime import timedelta
class EventService:
    """Static helpers for creating, querying, updating and soft-deleting events.

    Every method works through ``Event.query`` / ``db.session``. Deletion is
    "soft": a row is flagged ``deleted=True`` instead of being removed, and
    the lookups below skip flagged rows.
    """

    # Format accepted for the ``duedate`` string of incoming payloads.
    _DUEDATE_FORMAT = '%Y-%m-%dT%H:%M:%S'

    # Look-ahead window applied by ``get_upcoming_events``.
    _UPCOMING_WINDOW = timedelta(minutes=30)

    @staticmethod
    def _parse_duedate(raw):
        """Parse a ``YYYY-MM-DDTHH:MM:SS`` string into a naive ``datetime``.

        Raises:
            ValueError: If ``raw`` does not match ``_DUEDATE_FORMAT``.
        """
        return datetime.strptime(raw, EventService._DUEDATE_FORMAT)

    @staticmethod
    def create_event(data):
        """Create a new event from ``data`` and commit it.

        Args:
            data: Mapping with required keys ``title``, ``duedate`` (string in
                ``YYYY-MM-DDTHH:MM:SS`` form) and ``user_id``. The optional
                keys ``description`` and ``location`` default to ``''``.

        Returns:
            The committed ``Event`` instance.

        Raises:
            KeyError: If a required key is missing from ``data``.
            ValueError: If ``duedate`` does not match the expected format.
        """
        new_event = Event(
            title=data['title'],
            description=data.get('description', ''),
            location=data.get('location', ''),
            duedate=EventService._parse_duedate(data['duedate']),
            user_id=data['user_id'],
        )
        db.session.add(new_event)
        db.session.commit()
        return new_event

    @staticmethod
    def get_upcoming_events(location=None, sort_by=None):
        """Return non-deleted events due no later than 30 minutes from now.

        Args:
            location: Optional substring; when truthy, only events whose
                location contains it (case-insensitive ``ILIKE``) are kept.
            sort_by: Optional ordering: ``'date'`` (by ``duedate``),
                ``'popularity'`` (by number of user associations, descending)
                or ``'creation'`` (by ``created_at``). Any other value leaves
                the result unordered.

        Returns:
            A list with ``event.to_dict()`` for each matching event.
        """
        # The local must NOT be named ``timedelta``: doing so shadows the
        # imported class inside this function and raises UnboundLocalError.
        deadline = datetime.now() + EventService._UPCOMING_WINDOW

        # NOTE(review): there is no lower bound, so events whose due date is
        # already in the past are returned too - confirm this is intended.
        query = Event.query.filter(
            Event.deleted == False,  # noqa: E712 - SQLAlchemy column comparison
            Event.duedate <= deadline,
        )

        if location:
            query = query.filter(Event.location.ilike(f"%{location}%"))

        if sort_by == 'date':
            query = query.order_by(Event.duedate)
        elif sort_by == 'popularity':
            # NOTE(review): this is an inner join, so events without any
            # UserEventAssociation row are dropped from the result when
            # sorting by popularity - confirm whether an outer join is wanted.
            query = (
                query.join(
                    UserEventAssociation,
                    Event.id == UserEventAssociation.event_id,
                )
                .group_by(Event.id)
                .order_by(func.count(UserEventAssociation.user_id).desc())
            )
        elif sort_by == 'creation':
            query = query.order_by(Event.created_at)

        return [event.to_dict() for event in query.all()]

    @staticmethod
    def get_event_by_id(event_id, user_id):
        """Return one non-deleted event owned by ``user_id``.

        Args:
            event_id: Primary key of the event.
            user_id: Id of the owning user; events of other users do not match.

        Returns:
            ``event.to_dict()`` when found, otherwise an empty list (kept for
            backward compatibility with existing callers).
        """
        event = Event.query.filter_by(
            id=event_id, user_id=user_id, deleted=False
        ).first()
        if event:
            return event.to_dict()
        return []

    @staticmethod
    def update_event(event_id, data):
        """Update an event owned by ``g.user_id`` and commit the change.

        Args:
            event_id: Primary key of the event to update.
            data: Mapping with required keys ``title`` and ``duedate`` (string
                in ``YYYY-MM-DDTHH:MM:SS`` form). The optional keys
                ``description`` and ``location`` are reset to ``''`` when
                absent.

        Returns:
            The updated ``Event``, or ``None`` when no matching non-deleted
            event exists for the current user.

        Raises:
            KeyError: If a required key is missing from ``data``.
            ValueError: If ``duedate`` does not match the expected format.
        """
        # ``deleted=False`` matches get_event_by_id / delete_event, so
        # soft-deleted events cannot be edited.
        event = Event.query.filter_by(
            id=event_id, user_id=g.user_id, deleted=False
        ).first()
        if not event:
            return None

        event.title = data['title']
        event.description = data.get('description', '')
        event.location = data.get('location', '')
        # Write to ``duedate`` - the attribute create_event and the queries
        # use - rather than an unrelated ``time`` attribute.
        event.duedate = EventService._parse_duedate(data['duedate'])
        db.session.commit()
        return event

    @staticmethod
    def delete_event(event_id):
        """Soft-delete an event owned by ``g.user_id``.

        Args:
            event_id: Primary key of the event to flag as deleted.

        Returns:
            The flagged ``Event``, or ``None`` when no matching non-deleted
            event exists for the current user.
        """
        event = Event.query.filter_by(
            id=event_id, user_id=g.user_id, deleted=False
        ).first()
        if not event:
            return None

        event.deleted = True
        db.session.commit()
        return event

    @staticmethod
    def get_all_upcomming_events():
        """Return every non-deleted event as a list of dicts.

        NOTE(review): despite the name, no due-date filter is applied. The
        original code had a ``duedate <= now + 30 min`` condition commented
        out; confirm whether it should be restored.
        """
        events = Event.query.filter(
            Event.deleted == False  # noqa: E712 - SQLAlchemy column comparison
        ).all()
        return [event.to_dict() for event in events]