from collections import deque
from typing import List, Optional

from sqlalchemy import func
from sqlalchemy.orm import Session, joinedload

from . import models, schemas

# Status value that marks a task as completed. Used by parent auto-completion
# and by the "actionable tasks" query.
# NOTE(review): this is hard-coded and not validated against a project's
# configured statuses — confirm every project keeps a "done" status.
_DONE_STATUS = "done"


# Project CRUD
def create_project(db: Session, project: schemas.ProjectCreate) -> models.Project:
    """Insert a new project and return the persisted row.

    If the payload carries no ``statuses`` (missing or ``None``), the
    project is created with ``models.DEFAULT_STATUSES``.
    """
    project_data = project.model_dump()

    # Ensure statuses has a default value if not provided
    if project_data.get("statuses") is None:
        project_data["statuses"] = models.DEFAULT_STATUSES

    db_project = models.Project(**project_data)
    db.add(db_project)
    db.commit()
    db.refresh(db_project)
    return db_project


def get_project(db: Session, project_id: int) -> Optional[models.Project]:
    """Return the project with the given id, or ``None`` if it does not exist."""
    return db.query(models.Project).filter(models.Project.id == project_id).first()


def get_projects(
    db: Session,
    skip: int = 0,
    limit: int = 100,
    archived: Optional[bool] = None,
) -> List[models.Project]:
    """Return a page of projects.

    Args:
        db: Active database session.
        skip: Number of rows to skip (query offset).
        limit: Maximum number of rows to return.
        archived: When not ``None``, only projects whose ``is_archived``
            equals this value are returned; ``None`` applies no filter.
    """
    query = db.query(models.Project)

    # Filter by archive status if specified
    if archived is not None:
        query = query.filter(models.Project.is_archived == archived)

    return query.offset(skip).limit(limit).all()


def update_project(
    db: Session, project_id: int, project: schemas.ProjectUpdate
) -> Optional[models.Project]:
    """Apply the explicitly-set fields of ``project`` to an existing project.

    Returns the refreshed project, or ``None`` if ``project_id`` is unknown.
    """
    db_project = get_project(db, project_id)
    if not db_project:
        return None

    # exclude_unset: only touch fields the caller actually sent.
    update_data = project.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(db_project, key, value)

    db.commit()
    db.refresh(db_project)
    return db_project


def delete_project(db: Session, project_id: int) -> bool:
    """Delete a project. Returns ``False`` if it does not exist, else ``True``."""
    db_project = get_project(db, project_id)
    if not db_project:
        return False

    db.delete(db_project)
    db.commit()
    return True


# Task CRUD
def create_task(db: Session, task: schemas.TaskCreate) -> models.Task:
    """Insert a new task and return the persisted row.

    The task's status is validated against its project's ``statuses`` when
    the project exists. When no ``sort_order`` is supplied (missing, ``None``
    or ``0``), the task is placed after its current siblings.

    Raises:
        ValueError: If the status is not one of the project's statuses.
    """
    # Validate status against project's statuses
    project = get_project(db, task.project_id)
    if project and task.status not in project.statuses:
        raise ValueError(
            f"Invalid status '{task.status}'. Must be one of: {', '.join(project.statuses)}"
        )

    task_data = task.model_dump()

    if not task_data.get("sort_order"):
        # Place the task after its siblings. Use MAX(sort_order) + 1 rather
        # than COUNT(*): once a sibling has been deleted, the count can equal
        # a sort_order that is still in use.
        highest_query = db.query(func.max(models.Task.sort_order))
        if task.parent_task_id:
            highest_query = highest_query.filter(
                models.Task.parent_task_id == task.parent_task_id
            )
        else:
            highest_query = highest_query.filter(
                models.Task.project_id == task.project_id,
                models.Task.parent_task_id.is_(None),
            )
        highest = highest_query.scalar()  # None when there are no siblings
        task_data["sort_order"] = 0 if highest is None else highest + 1

    db_task = models.Task(**task_data)
    db.add(db_task)
    db.commit()
    db.refresh(db_task)
    return db_task


def get_task(db: Session, task_id: int) -> Optional[models.Task]:
    """Return the task with the given id, or ``None`` if it does not exist."""
    return db.query(models.Task).filter(models.Task.id == task_id).first()


def get_tasks_by_project(db: Session, project_id: int) -> List[models.Task]:
    """Return every task belonging to the given project."""
    return db.query(models.Task).filter(models.Task.project_id == project_id).all()


def get_root_tasks(db: Session, project_id: int) -> List[models.Task]:
    """Get all root-level tasks (no parent) for a project, ordered by sort_order."""
    return (
        db.query(models.Task)
        .filter(
            models.Task.project_id == project_id,
            models.Task.parent_task_id.is_(None),
        )
        .order_by(models.Task.sort_order)
        .all()
    )


def get_task_with_subtasks(db: Session, task_id: int) -> Optional[models.Task]:
    """Load a task with its direct subtasks eagerly joined.

    Only the first level of ``subtasks`` is covered by the ``joinedload``
    here; how deeper levels are loaded depends on the relationship's
    configuration in ``models``.
    """
    return (
        db.query(models.Task)
        .options(joinedload(models.Task.subtasks))
        .filter(models.Task.id == task_id)
        .first()
    )


def check_and_update_parent_status(db: Session, parent_id: int) -> None:
    """Mark a parent task as done once all of its children are done.

    Walks up the ancestor chain: each time a parent is newly marked done,
    its own parent is checked in turn. The walk stops at the first ancestor
    that has no children, has an unfinished child, does not exist, or is
    already done.
    """
    current_id = parent_id
    while True:
        # Get all children of this parent
        children = (
            db.query(models.Task)
            .filter(models.Task.parent_task_id == current_id)
            .all()
        )

        # No children, or at least one unfinished child: nothing to do.
        if not children:
            return
        if any(child.status != _DONE_STATUS for child in children):
            return

        parent = get_task(db, current_id)
        if not parent or parent.status == _DONE_STATUS:
            return

        # Mark parent as done
        parent.status = _DONE_STATUS
        db.commit()

        # Continue with the grandparent, if there is one.
        if not parent.parent_task_id:
            return
        current_id = parent.parent_task_id


def update_task(
    db: Session, task_id: int, task: schemas.TaskUpdate
) -> Optional[models.Task]:
    """Apply the explicitly-set fields of ``task`` to an existing task.

    If the payload includes a status, it is validated against the project's
    statuses (when the project exists). After saving, if the payload included
    a status, the task is now done and it has a parent, the parent is checked
    for auto-completion.

    Returns:
        The refreshed task, or ``None`` if ``task_id`` is unknown.

    Raises:
        ValueError: If the new status is not one of the project's statuses.
    """
    db_task = get_task(db, task_id)
    if not db_task:
        return None

    update_data = task.model_dump(exclude_unset=True)

    # "Changed" here means the payload carried a status at all, even if it
    # equals the stored one; re-running the parent check is harmless.
    status_changed = "status" in update_data

    # Validate status against project's statuses if status is being updated
    if status_changed:
        project = get_project(db, db_task.project_id)
        if project and update_data["status"] not in project.statuses:
            raise ValueError(
                f"Invalid status '{update_data['status']}'. Must be one of: {', '.join(project.statuses)}"
            )

    for key, value in update_data.items():
        setattr(db_task, key, value)

    db.commit()
    db.refresh(db_task)

    # If status changed to 'done' and this task has a parent, check if parent
    # should auto-complete
    if status_changed and db_task.status == _DONE_STATUS and db_task.parent_task_id:
        check_and_update_parent_status(db, db_task.parent_task_id)

    return db_task


def delete_task(db: Session, task_id: int) -> bool:
    """Delete a task. Returns ``False`` if it does not exist, else ``True``."""
    db_task = get_task(db, task_id)
    if not db_task:
        return False

    db.delete(db_task)
    db.commit()
    return True


def get_tasks_by_status(db: Session, project_id: int, status: str) -> List[models.Task]:
    """Get all tasks for a project with a specific status.

    Raises:
        ValueError: If the project exists and ``status`` is not one of its
            statuses.
    """
    # Validate status against project's statuses
    project = get_project(db, project_id)
    if project and status not in project.statuses:
        raise ValueError(
            f"Invalid status '{status}'. Must be one of: {', '.join(project.statuses)}"
        )

    return (
        db.query(models.Task)
        .filter(
            models.Task.project_id == project_id,
            models.Task.status == status,
        )
        .all()
    )


# ========== BLOCKER CRUD ==========
def _has_cycle(db: Session, start_id: int, target_id: int) -> bool:
    """BFS from start_id following its blockers.

    Returns True if target_id is reachable, which would mean adding
    target_id as a blocker of start_id creates a cycle.
    """
    visited = set()
    queue = deque([start_id])  # deque: O(1) pops from the left

    while queue:
        current = queue.popleft()
        if current == target_id:
            return True
        if current in visited:
            continue
        visited.add(current)

        task = get_task(db, current)
        if task:
            queue.extend(b.id for b in task.blockers if b.id not in visited)

    return False


def add_blocker(db: Session, task_id: int, blocker_id: int) -> models.Task:
    """Add blocker_id as a prerequisite of task_id.

    Adding a link that already exists is a no-op and returns the task.

    Raises:
        ValueError: On self-reference, when either task does not exist, or
            when the new link would create a circular dependency.
    """
    if task_id == blocker_id:
        raise ValueError("A task cannot block itself")

    task = get_task(db, task_id)
    blocker = get_task(db, blocker_id)
    if not task:
        raise ValueError("Task not found")
    if not blocker:
        raise ValueError("Blocker task not found")

    # Already linked — idempotent
    if any(b.id == blocker_id for b in task.blockers):
        return task

    # Cycle detection: would blocker_id eventually depend on task_id?
    if _has_cycle(db, blocker_id, task_id):
        raise ValueError("Adding this blocker would create a circular dependency")

    task.blockers.append(blocker)
    db.commit()
    db.refresh(task)
    return task


def remove_blocker(db: Session, task_id: int, blocker_id: int) -> bool:
    """Remove blocker_id as a prerequisite of task_id.

    Returns ``False`` if either task does not exist or the link is not
    present; ``True`` once the link has been removed.
    """
    task = get_task(db, task_id)
    blocker = get_task(db, blocker_id)
    if not task or not blocker:
        return False

    if not any(b.id == blocker_id for b in task.blockers):
        return False

    task.blockers.remove(blocker)
    db.commit()
    return True


def get_task_with_blockers(db: Session, task_id: int) -> Optional[models.Task]:
    """Get a task whose blocker relationships the caller intends to read.

    This runs the same query as ``get_task``; no eager-loading options are
    applied here, so how the blocker lists are loaded depends on the
    relationship configuration in ``models``.
    """
    return get_task(db, task_id)


def get_actionable_tasks(db: Session) -> List[dict]:
    """Return all non-done tasks that have no incomplete blockers, with project name.

    Each entry is a plain dict with the keys ``id``, ``title``,
    ``project_id``, ``project_name``, ``status``, ``estimated_minutes``,
    ``tags`` and ``flag_color``. ``project_name`` is ``None`` when the task
    has no associated project object.
    """
    tasks = db.query(models.Task).filter(models.Task.status != _DONE_STATUS).all()

    result = []
    for task in tasks:
        # A task is actionable only when every blocker is already done.
        if any(b.status != _DONE_STATUS for b in task.blockers):
            continue

        project = task.project
        result.append({
            "id": task.id,
            "title": task.title,
            "project_id": task.project_id,
            # Guard against a missing project instead of raising AttributeError.
            "project_name": project.name if project is not None else None,
            "status": task.status,
            "estimated_minutes": task.estimated_minutes,
            "tags": task.tags,
            "flag_color": task.flag_color,
        })

    return result