diff --git a/src/documents/tests/test_workflows.py b/src/documents/tests/test_workflows.py index a7e470035..b3436ed33 100644 --- a/src/documents/tests/test_workflows.py +++ b/src/documents/tests/test_workflows.py @@ -26,7 +26,7 @@ from rest_framework.test import APITestCase from documents.file_handling import create_source_path_directory from documents.file_handling import generate_unique_filename from documents.signals.handlers import run_workflows -from documents.workflows.utils import send_webhook +from documents.workflows.webhooks import send_webhook if TYPE_CHECKING: from django.db.models import QuerySet @@ -3215,7 +3215,7 @@ class TestWorkflows( PAPERLESS_FORCE_SCRIPT_NAME="/paperless", BASE_URL="/paperless/", ) - @mock.patch("documents.workflows.utils.send_webhook.delay") + @mock.patch("documents.workflows.webhooks.send_webhook.delay") def test_workflow_webhook_action_body(self, mock_post): """ GIVEN: @@ -3274,7 +3274,7 @@ class TestWorkflows( @override_settings( PAPERLESS_URL="http://localhost:8000", ) - @mock.patch("documents.workflows.utils.send_webhook.delay") + @mock.patch("documents.workflows.webhooks.send_webhook.delay") def test_workflow_webhook_action_w_files(self, mock_post): """ GIVEN: @@ -3498,7 +3498,7 @@ class TestWorkflows( ) self.assertIn(expected_str, cm.output[0]) - @mock.patch("documents.workflows.utils.send_webhook.delay") + @mock.patch("documents.workflows.webhooks.send_webhook.delay") def test_workflow_webhook_action_consumption(self, mock_post): """ GIVEN: diff --git a/src/documents/workflows/actions.py b/src/documents/workflows/actions.py index 3f2c21c2d..ab66bd646 100644 --- a/src/documents/workflows/actions.py +++ b/src/documents/workflows/actions.py @@ -15,7 +15,7 @@ from documents.models import DocumentType from documents.models import WorkflowAction from documents.models import WorkflowTrigger from documents.templating.workflows import parse_w_workflow_placeholders -from documents.workflows.utils import send_webhook +from documents.workflows.webhooks import send_webhook logger = logging.getLogger("paperless.workflows.actions") diff --git a/src/documents/workflows/utils.py b/src/documents/workflows/utils.py index e0a6e55ab..e217184da 100644 --- a/src/documents/workflows/utils.py +++ b/src/documents/workflows/utils.py @@ -1,17 +1,6 @@ -import ipaddress -import logging -import socket -from urllib.parse import urlparse - -import httpx -from celery import shared_task -from django.conf import settings - from documents.models import Workflow from documents.models import WorkflowTrigger -logger = logging.getLogger("paperless.workflows") - def get_workflows_for_trigger( trigger_type: WorkflowTrigger.WorkflowTriggerType, @@ -45,89 +34,3 @@ def get_workflows_for_trigger( .order_by("order") .distinct() ) - - -def _is_public_ip(ip: str) -> bool: - try: - obj = ipaddress.ip_address(ip) - return not ( - obj.is_private - or obj.is_loopback - or obj.is_link_local - or obj.is_multicast - or obj.is_unspecified - ) - except ValueError: # pragma: no cover - return False - - -def _resolve_first_ip(host: str) -> str | None: - try: - info = socket.getaddrinfo(host, None) - return info[0][4][0] if info else None - except Exception: # pragma: no cover - return None - - -@shared_task( - retry_backoff=True, - autoretry_for=(httpx.HTTPStatusError,), - max_retries=3, - throws=(httpx.HTTPError,), -) -def send_webhook( - url: str, - data: str | dict, - headers: dict, - files: dict, - *, - as_json: bool = False, -): - p = urlparse(url) - if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname: - logger.warning("Webhook blocked: invalid scheme/hostname") - raise ValueError("Invalid URL scheme or hostname.") - - port = p.port or (443 if p.scheme == "https" else 80) - if ( - len(settings.WEBHOOKS_ALLOWED_PORTS) > 0 - and port not in settings.WEBHOOKS_ALLOWED_PORTS - ): - logger.warning("Webhook blocked: port not permitted") - raise ValueError("Destination port not permitted.") - - ip = _resolve_first_ip(p.hostname) - if not ip or ( - not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS - ): - logger.warning("Webhook blocked: destination not allowed") - raise ValueError("Destination host is not allowed.") - - try: - post_args = { - "url": url, - "headers": { - k: v for k, v in (headers or {}).items() if k.lower() != "host" - }, - "files": files or None, - "timeout": 5.0, - "follow_redirects": False, - } - if as_json: - post_args["json"] = data - elif isinstance(data, dict): - post_args["data"] = data - else: - post_args["content"] = data - - httpx.post( - **post_args, - ).raise_for_status() - logger.info( - f"Webhook sent to {url}", - ) - except Exception as e: - logger.error( - f"Failed attempt sending webhook to {url}: {e}", - ) - raise e diff --git a/src/documents/workflows/webhooks.py b/src/documents/workflows/webhooks.py new file mode 100644 index 000000000..c7bb9f7c2 --- /dev/null +++ b/src/documents/workflows/webhooks.py @@ -0,0 +1,96 @@ +import ipaddress +import logging +import socket +from urllib.parse import urlparse + +import httpx +from celery import shared_task +from django.conf import settings + +logger = logging.getLogger("paperless.workflows.webhooks") + + +def _is_public_ip(ip: str) -> bool: + try: + obj = ipaddress.ip_address(ip) + return not ( + obj.is_private + or obj.is_loopback + or obj.is_link_local + or obj.is_multicast + or obj.is_unspecified + ) + except ValueError: # pragma: no cover + return False + + +def _resolve_first_ip(host: str) -> str | None: + try: + info = socket.getaddrinfo(host, None) + return info[0][4][0] if info else None + except Exception: # pragma: no cover + return None + + +@shared_task( + retry_backoff=True, + autoretry_for=(httpx.HTTPStatusError,), + max_retries=3, + throws=(httpx.HTTPError,), +) +def send_webhook( + url: str, + data: str | dict, + headers: dict, + files: dict, + *, + as_json: bool = False, +): + p = urlparse(url) + if p.scheme.lower() not in settings.WEBHOOKS_ALLOWED_SCHEMES or not p.hostname: + logger.warning("Webhook blocked: invalid scheme/hostname") + raise ValueError("Invalid URL scheme or hostname.") + + port = p.port or (443 if p.scheme == "https" else 80) + if ( + len(settings.WEBHOOKS_ALLOWED_PORTS) > 0 + and port not in settings.WEBHOOKS_ALLOWED_PORTS + ): + logger.warning("Webhook blocked: port not permitted") + raise ValueError("Destination port not permitted.") + + ip = _resolve_first_ip(p.hostname) + if not ip or ( + not _is_public_ip(ip) and not settings.WEBHOOKS_ALLOW_INTERNAL_REQUESTS + ): + logger.warning("Webhook blocked: destination not allowed") + raise ValueError("Destination host is not allowed.") + + try: + post_args = { + "url": url, + "headers": { + k: v for k, v in (headers or {}).items() if k.lower() != "host" + }, + "files": files or None, + "timeout": 5.0, + "follow_redirects": False, + } + if as_json: + post_args["json"] = data + elif isinstance(data, dict): + post_args["data"] = data + else: + post_args["content"] = data + + httpx.post( + **post_args, + ).raise_for_status() + logger.info( + f"Webhook sent to {url}", + ) + except Exception as e: + logger.error( + f"Failed attempt sending webhook to {url}: {e}", + ) + raise e