Coverage for src / auth / services / auth_service.py: 57%
103 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 14:29 -0300
1"""Regras de negocio do modulo auth."""
3from fastapi import HTTPException, status
5from src.auth.models import AuthMode, AuthProvider
6from src.auth.schemas import AuthLoginRequestSchema
7from src.auth.schemas import AuthLoginResponseSchema
8from src.auth.schemas import AuthProviderConfigSchema
9from src.auth.schemas import AuthProviderPublicConfigSchema
10from src.auth.schemas import normalize_provider_name
11from src.core.config import get_auth_config
# Built-in settings for each provider, keyed by provider name.
# _build_provider_config copies the matching entry and lets configured values
# replace these defaults key by key.
DEFAULT_AUTH_CONFIG: dict[str, dict] = {
    # The only provider enabled by default. "users" is looked up by username
    # and compared with the submitted password in _authenticate_local.
    "local": {"enabled": True, "mode": "local", "users": {}},
    "gmail": {
        "enabled": False,
        "mode": "oauth2",
        "authorization_url": "https://accounts.google.com/o/oauth2/v2/auth",
        "token_url": "https://oauth2.googleapis.com/token",
        "scopes": ["openid", "email", "profile"],
    },
    "yahoo": {
        "enabled": False,
        "mode": "oauth2",
        "authorization_url": "https://api.login.yahoo.com/oauth2/request_auth",
        "token_url": "https://api.login.yahoo.com/oauth2/get_token",
        "scopes": ["openid", "profile", "email"],
    },
    "microsoft": {
        "enabled": False,
        "mode": "oauth2",
        "authorization_url": "https://login.microsoftonline.com/common/oauth2/v2.0/authorize",
        "token_url": "https://login.microsoftonline.com/common/oauth2/v2.0/token",
        "scopes": ["openid", "profile", "email"],
    },
    "keycloak": {
        "enabled": False,
        "mode": "openid_connect",
        "server_url": "http://localhost:8080",
        "realm": "master",
        "scopes": ["openid", "profile", "email"],
    },
    "ldap": {
        "enabled": False,
        "mode": "ldap",
        "ldap_uri": "ldap://localhost:389",
        "base_dn": "dc=example,dc=org",
    },
}

# String values of every AuthProvider member, in enum definition order.
SUPPORTED_PROVIDERS = tuple(member.value for member in AuthProvider)
def _build_provider_config(
    provider: AuthProvider, provider_settings: dict | None
) -> AuthProviderConfigSchema:
    """Merge a provider's built-in defaults with its configured overrides.

    Args:
        provider: Provider whose effective configuration is being built.
        provider_settings: Configured values for this provider; ``None`` or an
            empty dict means the defaults are used unchanged.

    Returns:
        The schema instance created from the merged settings.

    Raises:
        KeyError: If ``DEFAULT_AUTH_CONFIG`` has no entry for ``provider``.
    """
    overrides = provider_settings if provider_settings else {}
    # Later keys win: a configured value replaces the default with the same
    # top-level key. Nested values are replaced whole, not merged.
    merged = {**DEFAULT_AUTH_CONFIG[provider.value], **overrides}
    return AuthProviderConfigSchema(provider=provider, **merged)
def list_provider_configs() -> list[AuthProviderConfigSchema]:
    """Return the effective configuration of every ``AuthProvider`` member.

    The "providers" section of the auth config is re-keyed through
    ``normalize_provider_name`` so it can be matched against enum values.
    Providers without a configured entry get their defaults only.

    Returns:
        One configuration per provider, in enum definition order.
    """
    configured = {
        normalize_provider_name(str(name)): settings
        for name, settings in get_auth_config().get("providers", {}).items()
    }
    return [
        _build_provider_config(provider, configured.get(provider.value))
        for provider in AuthProvider
    ]
def get_provider_config(provider_name: str) -> AuthProviderConfigSchema:
    """Return the effective configuration of a single provider.

    Args:
        provider_name: Provider name as supplied by the caller; it is passed
            through ``normalize_provider_name`` before the enum lookup.

    Returns:
        The configuration whose ``provider`` equals the requested one.

    Raises:
        HTTPException: 404 when the name is not a valid ``AuthProvider`` value,
            or when no configuration is listed for it.
    """
    normalized_name = normalize_provider_name(provider_name)
    try:
        wanted = AuthProvider(normalized_name)
    except ValueError as error:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Provedor de autenticacao nao suportado.",
        ) from error

    found = next(
        (cfg for cfg in list_provider_configs() if cfg.provider == wanted),
        None,
    )
    if found is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Provedor de autenticacao nao encontrado.",
        )
    return found
def to_public_provider_config(
    provider_config: AuthProviderConfigSchema,
) -> AuthProviderPublicConfigSchema:
    """Build the public view of a provider configuration.

    Only the fields listed below are copied. ``bind_password`` is left out,
    and the local users mapping is reduced to a sorted list of usernames, so
    its values (the passwords) are not exposed.

    Args:
        provider_config: Full provider configuration.

    Returns:
        The public schema populated from ``provider_config``.
    """
    copied_fields = (
        "provider",
        "enabled",
        "mode",
        "client_id",
        "authorization_url",
        "token_url",
        "redirect_uri",
        "scopes",
        "tenant",
        "server_url",
        "realm",
        "ldap_uri",
        "base_dn",
        "bind_dn",
    )
    public_values = {name: getattr(provider_config, name) for name in copied_fields}
    # Iterating a dict yields its keys, so this sorts the usernames only.
    public_values["local_usernames"] = sorted(provider_config.users)
    return AuthProviderPublicConfigSchema(**public_values)
def login_with_provider(payload: AuthLoginRequestSchema) -> AuthLoginResponseSchema:
    """Authenticate a login request with the strategy of its provider.

    Args:
        payload: Login request; ``payload.provider`` selects the configuration.

    Returns:
        The response produced by the local, LDAP or external-provider handler.

    Raises:
        HTTPException: 403 when the provider is disabled, plus whatever
            ``get_provider_config`` or the selected handler raises.
    """
    config = get_provider_config(payload.provider.value)
    if not config.enabled:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Provedor de autenticacao desabilitado.",
        )

    mode = config.mode
    if mode == AuthMode.LOCAL:
        response = _authenticate_local(payload, config)
    elif mode == AuthMode.LDAP:
        response = _authenticate_ldap(payload, config)
    else:
        # Every other mode goes through the authorization-code handler.
        response = _authenticate_oauth(payload, config.provider)
    return response
def _authenticate_local(
    payload: AuthLoginRequestSchema, provider_config: AuthProviderConfigSchema
) -> AuthLoginResponseSchema:
    """Authenticate against the username/password pairs in the configuration.

    Args:
        payload: Login request; ``username`` and ``password`` are required.
        provider_config: Local provider configuration; ``users`` is looked up
            by username and the stored value is compared with the password.

    Returns:
        A successful login response for ``AuthProvider.LOCAL``.

    Raises:
        HTTPException: 422 when username or password is missing/empty,
            401 when the user is unknown or the password does not match.
    """
    # Function-scope import so this block brings its own dependency; the file
    # already imports ldap3 this way.
    import hmac

    if not payload.username or not payload.password:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Username e password sao obrigatorios para login local.",
        )

    registered_password = provider_config.users.get(payload.username)
    # Unknown users (and non-string stored values) are compared against an
    # empty secret. The guard above guarantees the submitted password is
    # non-empty, so that comparison always fails, as the `is None` check did.
    # assumes payload.password is a plain str — TODO confirm against the schema
    expected = registered_password if isinstance(registered_password, str) else ""
    # compare_digest avoids the early exit of `!=`, which leaks how many
    # leading characters matched. Bytes are used because the str form only
    # accepts ASCII; "surrogatepass" keeps encoding from raising.
    password_matches = hmac.compare_digest(
        expected.encode("utf-8", "surrogatepass"),
        payload.password.encode("utf-8", "surrogatepass"),
    )
    if not password_matches:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Credenciais invalidas.",
        )

    # NOTE(review): the token is derived only from the username, so it is
    # predictable; this looks like a placeholder — confirm before production.
    token = f"local-token-{payload.username}"
    return AuthLoginResponseSchema(
        authenticated=True,
        provider=AuthProvider.LOCAL,
        access_token=token,
        detail="Autenticacao local concluida.",
    )
def _authenticate_ldap(
    payload: AuthLoginRequestSchema, provider_config: AuthProviderConfigSchema
) -> AuthLoginResponseSchema:
    """Authenticate a username/password pair through ``_ldap_authenticate``.

    Args:
        payload: Login request; ``username`` and ``password`` are required.
        provider_config: LDAP provider configuration.

    Returns:
        A successful login response for ``AuthProvider.LDAP``.

    Raises:
        HTTPException: 422 when username or password is missing/empty, plus
            whatever ``_ldap_authenticate`` raises.
    """
    has_credentials = payload.username and payload.password
    if not has_credentials:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Username e password sao obrigatorios para login LDAP.",
        )

    # Returns None on success and raises on any failure.
    _ldap_authenticate(provider_config, payload.username, payload.password)

    return AuthLoginResponseSchema(
        authenticated=True,
        provider=AuthProvider.LDAP,
        access_token=f"ldap-token-{payload.username}",
        detail="Autenticacao LDAP concluida.",
    )
def _ldap_authenticate(
    provider_config: AuthProviderConfigSchema, username: str, password: str
) -> None:
    """Verify a user's credentials by binding to the LDAP server.

    When both ``bind_dn`` and ``bind_password`` are configured, a service
    connection first searches ``base_dn`` for ``(uid=<username>)`` and the DN
    of the first entry is used. Otherwise the DN is assumed to be
    ``uid=<username>,<base_dn>``. The function then binds as that DN with the
    supplied password.

    Args:
        provider_config: LDAP settings (``ldap_uri``, ``base_dn`` and the
            optional ``bind_dn``/``bind_password`` pair).
        username: Login name supplied by the user (untrusted input).
        password: Password supplied by the user.

    Returns:
        None when the user bind succeeds.

    Raises:
        HTTPException: 500 for incomplete configuration, a missing or
            incompatible ldap3 installation, a failed service bind or any
            other LDAP error; 401 when the user is not found or the user bind
            is rejected; 503 when the server cannot be reached.
    """
    ldap_uri = provider_config.ldap_uri
    base_dn = provider_config.base_dn
    if not ldap_uri or not base_dn:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Configuracao LDAP incompleta.",
        )

    # ldap3 is imported lazily and reported as an HTTP 500 when it is missing.
    try:
        from ldap3 import Connection, Server
        from ldap3.core.exceptions import LDAPBindError
        from ldap3.core.exceptions import LDAPException
        from ldap3.core.exceptions import LDAPSocketOpenError
        from ldap3.utils.conv import escape_filter_chars
        from ldap3.utils.dn import escape_rdn
    except ImportError as error:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Dependencia ldap3 nao instalada no fastapi_tpl.",
        ) from error

    server = Server(host=ldap_uri)
    # The username is untrusted: escape DN metacharacters (",", "+", "=", ...)
    # so it cannot add RDN components and redirect the bind to another entry.
    user_dn = f"uid={escape_rdn(username)},{base_dn}"

    bind_dn = provider_config.bind_dn
    bind_password = provider_config.bind_password
    if bind_dn and bind_password:
        # Resolve the real DN with the service account instead of guessing it.
        try:
            with Connection(
                server,
                user=bind_dn,
                password=bind_password,
                auto_bind=True,
            ) as service_conn:
                # NOTE(review): "dn" is requested as an attribute, but the DN
                # is read from entry_dn below — confirm the server accepts it.
                service_conn.search(
                    search_base=base_dn,
                    search_filter=f"(uid={escape_filter_chars(username)})",
                    attributes=["dn"],
                )
                if not service_conn.entries:
                    # Same message as a wrong password, so the response does
                    # not reveal whether the account exists.
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail="Credenciais invalidas.",
                    )
                user_dn = str(service_conn.entries[0].entry_dn)
        except LDAPBindError as error:
            # A rejected service bind is a configuration problem, not the
            # user's fault, hence 500 rather than 401.
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Falha ao autenticar bind tecnico do LDAP.",
            ) from error
        except LDAPSocketOpenError as error:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Nao foi possivel conectar ao servidor LDAP.",
            ) from error
        except LDAPException as error:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Falha ao consultar usuario no LDAP.",
            ) from error

    # The bind with the user's own DN and password is the credential check;
    # the connection is closed as soon as the bind succeeds.
    try:
        with Connection(
            server,
            user=user_dn,
            password=password,
            auto_bind=True,
        ):
            return
    except LDAPBindError as error:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Credenciais invalidas.",
        ) from error
    except LDAPSocketOpenError as error:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Nao foi possivel conectar ao servidor LDAP.",
        ) from error
    except LDAPException as error:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Erro inesperado na autenticacao LDAP.",
        ) from error
def _authenticate_oauth(
    payload: AuthLoginRequestSchema, provider: AuthProvider
) -> AuthLoginResponseSchema:
    """Build the login response for an external (authorization-code) provider.

    Args:
        payload: Login request; ``authorization_code`` is required.
        provider: External provider the login is attributed to.

    Returns:
        A successful login response whose token embeds the provider value and
        the authorization code.

    Raises:
        HTTPException: 422 when ``authorization_code`` is missing/empty.
    """
    code = payload.authorization_code
    if not code:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="authorization_code e obrigatorio para login externo.",
        )

    provider_name = provider.value
    # NOTE(review): the code is not exchanged or validated with the provider
    # here; any non-empty value yields a token — confirm this is a stub.
    return AuthLoginResponseSchema(
        authenticated=True,
        provider=provider,
        access_token=f"{provider_name}-token-{code}",
        detail=f"Autenticacao com {provider_name} concluida.",
    )