Coverage for src / tarefas / services / tarefa_service.py: 93%
73 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 14:29 -0300
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 14:29 -0300
1"""Regras de negocio do modulo tarefas."""
3from datetime import datetime, timezone
5from sqlalchemy import case, select
6from sqlalchemy.orm import Session
8from src.tarefas.models import EscopoPrivacidadeTarefa
9from src.tarefas.models import PrioridadeTarefa
10from src.tarefas.models import Tarefa, VisibilidadeTarefa
11from src.tarefas.schemas import TarefaCreateSchema, TarefaUpdateSchema
14def _prioridade_order_case():
15 return case(
16 (Tarefa.prioridade == PrioridadeTarefa.CRITICA, 0),
17 (Tarefa.prioridade == PrioridadeTarefa.ALTA, 1),
18 (Tarefa.prioridade == PrioridadeTarefa.MEDIA, 2),
19 else_=3,
20 )
23def _can_read_tarefa(
24 tarefa: Tarefa, solicitante_id: str | None, grupo_nome: str | None
25) -> bool:
26 if tarefa.visibilidade == VisibilidadeTarefa.PUBLICA:
27 return True
29 if tarefa.escopo_privacidade == EscopoPrivacidadeTarefa.CRIADOR:
30 return solicitante_id is not None and tarefa.criador_id == solicitante_id
32 if tarefa.escopo_privacidade == EscopoPrivacidadeTarefa.GRUPO:
33 return grupo_nome is not None and tarefa.grupo_nome == grupo_nome
35 return False
38def _validate_visibilidade_fields(
39 visibilidade: VisibilidadeTarefa,
40 escopo_privacidade: EscopoPrivacidadeTarefa | None,
41 grupo_nome: str | None,
42) -> None:
43 if visibilidade == VisibilidadeTarefa.PUBLICA:
44 if escopo_privacidade is not None or grupo_nome is not None:
45 raise ValueError(
46 "Tarefa publica nao aceita escopo_privacidade nem grupo_nome."
47 )
48 return
50 if escopo_privacidade is None:
51 raise ValueError("Tarefa privada exige escopo_privacidade.")
53 if escopo_privacidade == EscopoPrivacidadeTarefa.CRIADOR:
54 if grupo_nome is not None:
55 raise ValueError(
56 "Tarefa privada de criador nao aceita grupo_nome."
57 )
58 return
60 if grupo_nome is None:
61 raise ValueError("Tarefa privada de grupo exige grupo_nome.")
64def list_tarefas(
65 db: Session, solicitante_id: str | None, grupo_nome: str | None
66) -> list[Tarefa]:
67 """Lista tarefas visiveis para o solicitante."""
68 tarefas = list(
69 db.scalars(
70 select(Tarefa).order_by(_prioridade_order_case(), Tarefa.id)
71 )
72 )
73 return [
74 tarefa
75 for tarefa in tarefas
76 if _can_read_tarefa(tarefa, solicitante_id, grupo_nome)
77 ]
80def get_tarefa_by_id(db: Session, tarefa_id: int) -> Tarefa | None:
81 """Busca tarefa por id."""
82 return db.get(Tarefa, tarefa_id)
85def can_read_tarefa(
86 tarefa: Tarefa, solicitante_id: str | None, grupo_nome: str | None
87) -> bool:
88 """Valida se solicitante pode ler a tarefa."""
89 return _can_read_tarefa(tarefa, solicitante_id, grupo_nome)
92def create_tarefa(db: Session, payload: TarefaCreateSchema) -> Tarefa:
93 """Cria tarefa e persiste no banco."""
94 _validate_visibilidade_fields(
95 payload.visibilidade,
96 payload.escopo_privacidade,
97 payload.grupo_nome,
98 )
99 tarefa = Tarefa(**payload.model_dump())
100 db.add(tarefa)
101 db.commit()
102 db.refresh(tarefa)
103 return tarefa
106def update_tarefa(
107 db: Session, tarefa: Tarefa, payload: TarefaCreateSchema
108) -> Tarefa:
109 """Substitui os dados da tarefa existente (PUT)."""
110 _validate_visibilidade_fields(
111 payload.visibilidade,
112 payload.escopo_privacidade,
113 payload.grupo_nome,
114 )
115 for key, value in payload.model_dump().items():
116 setattr(tarefa, key, value)
117 tarefa.atualizado_em = datetime.now(timezone.utc)
119 db.add(tarefa)
120 db.commit()
121 db.refresh(tarefa)
122 return tarefa
125def patch_tarefa(
126 db: Session, tarefa: Tarefa, payload: TarefaUpdateSchema
127) -> Tarefa:
128 """Atualiza parcialmente os dados da tarefa (PATCH)."""
129 update_data = payload.model_dump(exclude_unset=True)
130 for key, value in update_data.items():
131 setattr(tarefa, key, value)
133 _validate_visibilidade_fields(
134 tarefa.visibilidade,
135 tarefa.escopo_privacidade,
136 tarefa.grupo_nome,
137 )
138 tarefa.atualizado_em = datetime.now(timezone.utc)
140 db.add(tarefa)
141 db.commit()
142 db.refresh(tarefa)
143 return tarefa
146def conclude_tarefa(db: Session, tarefa: Tarefa, concluida: bool = True) -> Tarefa:
147 """Marca tarefa como concluida."""
148 tarefa.concluida = concluida
149 tarefa.atualizado_em = datetime.now(timezone.utc)
150 db.add(tarefa)
151 db.commit()
152 db.refresh(tarefa)
153 return tarefa
156def delete_tarefa(db: Session, tarefa: Tarefa) -> None:
157 """Remove tarefa do banco."""
158 db.delete(tarefa)
159 db.commit()