forked from LiveCarta/CommentAutomation
Added webhooks subapp for instagram webhooks and celery app and task to pass comment text to an LLM for further processing
This commit is contained in:
52
tests/test_webhook_tasks.py
Normal file
52
tests/test_webhook_tasks.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import pytest
|
||||
|
||||
from comment_automation.webhooks.tasks import (
|
||||
LLMEndpointConfigurationError,
|
||||
LLMEndpointTemporaryError,
|
||||
_forward_payload_to_llm,
|
||||
)
|
||||
|
||||
|
||||
def test_forward_payload_raises_when_endpoint_missing(monkeypatch) -> None:
|
||||
monkeypatch.delenv("LLM_ENDPOINT_URL", raising=False)
|
||||
|
||||
with pytest.raises(LLMEndpointConfigurationError):
|
||||
_forward_payload_to_llm({"hello": "world"})
|
||||
|
||||
|
||||
def test_forward_payload_retries_on_retryable_status(monkeypatch) -> None:
|
||||
class FakeResponse:
|
||||
status_code = 503
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
return None
|
||||
|
||||
def fake_post(*args, **kwargs):
|
||||
return FakeResponse()
|
||||
|
||||
monkeypatch.setenv("LLM_ENDPOINT_URL", "https://example.org/llm")
|
||||
monkeypatch.setattr("comment_automation.webhooks.tasks.httpx.post", fake_post)
|
||||
|
||||
with pytest.raises(LLMEndpointTemporaryError):
|
||||
_forward_payload_to_llm({"hello": "world"})
|
||||
|
||||
|
||||
def test_forward_payload_success(monkeypatch) -> None:
|
||||
called = {"value": False}
|
||||
|
||||
class FakeResponse:
|
||||
status_code = 200
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
return None
|
||||
|
||||
def fake_post(*args, **kwargs):
|
||||
called["value"] = True
|
||||
return FakeResponse()
|
||||
|
||||
monkeypatch.setenv("LLM_ENDPOINT_URL", "https://example.org/llm")
|
||||
monkeypatch.setattr("comment_automation.webhooks.tasks.httpx.post", fake_post)
|
||||
|
||||
_forward_payload_to_llm({"hello": "world"})
|
||||
|
||||
assert called["value"] is True
|
||||
Reference in New Issue
Block a user