"""Helpers that build the placeholder context for workflow actions and run
the email and webhook actions."""

import logging
from pathlib import Path

from django.conf import settings
from django.contrib.auth.models import User
from django.utils import timezone

from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
from documents.mail import EmailAttachment
from documents.mail import send_email
from documents.models import Correspondent
from documents.models import Document
from documents.models import DocumentType
from documents.models import WorkflowAction
from documents.models import WorkflowTrigger
from documents.templating.workflows import parse_w_workflow_placeholders
from documents.workflows.utils import send_webhook

logger = logging.getLogger("paperless.workflows.actions")


def _render_placeholders(text, context: dict):
    """
    Run ``parse_w_workflow_placeholders`` on ``text`` using the values stored
    in ``context`` (a dict shaped like the one returned by
    ``build_workflow_action_context``).

    Centralises the positional argument order so every call site passes the
    context values identically. Raises ``KeyError`` if ``context`` lacks one
    of the expected keys; any exception from the parser propagates unchanged.
    """
    return parse_w_workflow_placeholders(
        text,
        context["correspondent"],
        context["document_type"],
        context["owner_username"],
        context["added"],
        context["filename"],
        context["current_filename"],
        context["created"],
        context["title"],
        context["doc_url"],
    )


def build_workflow_action_context(
    document: Document | ConsumableDocument,
    overrides: DocumentMetadataOverrides | None,
) -> dict:
    """
    Build context dictionary for workflow action placeholder parsing.

    When ``overrides`` is ``None`` the values are read straight from
    ``document`` (``title``, ``pk``, ``correspondent``, ``document_type``,
    ``owner``, ``original_filename``, ``filename``, ``added``, ``created``),
    so a stored ``Document`` is expected in that case.

    When ``overrides`` is given, correspondent, document type and owner are
    looked up by the ids carried on ``overrides``; ``document.original_file``
    supplies the filename and the fallback title, ``doc_url`` is empty and
    ``added`` is the current local time.

    Returns a dict with the keys ``title``, ``doc_url``, ``correspondent``,
    ``document_type``, ``owner_username``, ``filename``, ``current_filename``,
    ``added`` and ``created``.
    """
    if overrides is None:
        return {
            "title": document.title,
            "doc_url": f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/",
            "correspondent": document.correspondent.name
            if document.correspondent
            else "",
            "document_type": document.document_type.name
            if document.document_type
            else "",
            "owner_username": document.owner.username if document.owner else "",
            "filename": document.original_filename or "",
            "current_filename": document.filename or "",
            "added": timezone.localtime(document.added),
            "created": document.created,
        }

    # From here on ``overrides`` is known to be set, so only the individual
    # ids need checking before hitting the database.
    correspondent_obj = (
        Correspondent.objects.filter(pk=overrides.correspondent_id).first()
        if overrides.correspondent_id
        else None
    )
    document_type_obj = (
        DocumentType.objects.filter(pk=overrides.document_type_id).first()
        if overrides.document_type_id
        else None
    )
    owner_obj = (
        User.objects.filter(pk=overrides.owner_id).first()
        if overrides.owner_id
        else None
    )

    # NOTE(review): this is ``document.original_file`` itself (not a str) when
    # set, and "" otherwise; consumers below wrap it in str()/Path().
    filename = document.original_file if document.original_file else ""

    return {
        "title": overrides.title
        if overrides.title
        else str(document.original_file),
        "doc_url": "",
        "correspondent": correspondent_obj.name if correspondent_obj else "",
        "document_type": document_type_obj.name if document_type_obj else "",
        "owner_username": owner_obj.username if owner_obj else "",
        "filename": filename,
        "current_filename": filename,
        "added": timezone.localtime(timezone.now()),
        "created": overrides.created,
    }


def _build_email_attachment(
    document: Document | ConsumableDocument,
    context: dict,
    original_file: Path,
    trigger_type: WorkflowTrigger.WorkflowTriggerType,
) -> EmailAttachment | None:
    """
    Pick the file to attach to a workflow email, or ``None`` if there is none.

    For DOCUMENT_UPDATED / SCHEDULED triggers on a stored ``Document`` the
    document's ``source_path`` is attached; otherwise ``original_file`` is
    attached when it is truthy. The attachment's friendly name is the base
    name of ``context["current_filename"]`` when set, else the attached
    path's own name.
    """
    stored_document_triggers = (
        WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED,
        WorkflowTrigger.WorkflowTriggerType.SCHEDULED,
    )
    if trigger_type in stored_document_triggers and isinstance(document, Document):
        attachment_path = document.source_path
    elif original_file:
        attachment_path = original_file
    else:
        return None

    friendly_name = (
        Path(context["current_filename"]).name
        if context["current_filename"]
        else attachment_path.name
    )
    return EmailAttachment(
        path=attachment_path,
        mime_type=document.mime_type,
        friendly_name=friendly_name,
    )


def execute_email_action(
    action: WorkflowAction,
    document: Document | ConsumableDocument,
    context: dict,
    logging_group,
    original_file: Path,
    trigger_type: WorkflowTrigger.WorkflowTriggerType,
) -> None:
    """
    Execute an email action for a workflow.

    Subject and body are rendered from ``action.email`` with the workflow
    placeholders (errors while rendering propagate to the caller). The
    comma-separated ``action.email.to`` is split into recipients, with
    surrounding whitespace stripped and empty entries dropped. Any error
    raised while building the attachment or sending is logged with
    ``logging_group`` and swallowed.
    """
    subject = (
        _render_placeholders(action.email.subject, context)
        if action.email.subject
        else ""
    )
    body = (
        _render_placeholders(action.email.body, context)
        if action.email.body
        else ""
    )

    try:
        # "a@x.org, b@x.org," must not yield " b@x.org" or "" as addresses.
        recipients = [
            address.strip()
            for address in action.email.to.split(",")
            if address.strip()
        ]
        if not recipients:
            logger.error(
                "Error occurred sending notification email: no recipients configured",
                extra={"group": logging_group},
            )
            return

        attachments: list[EmailAttachment] = []
        if action.email.include_document:
            attachment = _build_email_attachment(
                document,
                context,
                original_file,
                trigger_type,
            )
            if attachment:
                attachments = [attachment]

        n_messages = send_email(
            subject=subject,
            body=body,
            to=recipients,
            attachments=attachments,
        )
        logger.debug(
            f"Sent {n_messages} notification email(s) to {action.email.to}",
            extra={"group": logging_group},
        )
    except Exception as e:
        logger.exception(
            f"Error occurred sending notification email: {e}",
            extra={"group": logging_group},
        )


def execute_webhook_action(
    action: WorkflowAction,
    document: Document | ConsumableDocument,
    context: dict,
    logging_group,
    original_file: Path,
) -> None:
    """
    Execute a webhook action for a workflow.

    The payload is either the rendered ``action.webhook.params`` mapping
    (when ``use_params`` is set) or the rendered ``action.webhook.body``.
    Headers are coerced to ``str`` keys and values. When ``include_document``
    is set, ``original_file`` is read fully into memory and passed along as
    the ``file`` entry. The request is handed to ``send_webhook.delay``. Any
    error is logged with ``logging_group`` and swallowed.
    """
    try:
        data = {}
        if action.webhook.use_params:
            if action.webhook.params:
                try:
                    # Kept as a loop on purpose: if one value fails to render,
                    # the keys rendered before it stay in ``data``.
                    for key, value in action.webhook.params.items():
                        data[key] = _render_placeholders(value, context)
                except Exception as e:
                    logger.error(
                        f"Error occurred parsing webhook params: {e}",
                        extra={"group": logging_group},
                    )
        elif action.webhook.body:
            data = _render_placeholders(action.webhook.body, context)

        headers = {}
        if action.webhook.headers:
            try:
                headers = {str(k): str(v) for k, v in action.webhook.headers.items()}
            except Exception as e:
                logger.error(
                    f"Error occurred parsing webhook headers: {e}",
                    extra={"group": logging_group},
                )

        files = None
        if action.webhook.include_document:
            with original_file.open("rb") as f:
                files = {
                    "file": (
                        str(context["filename"])
                        if context["filename"]
                        else original_file.name,
                        f.read(),
                        document.mime_type,
                    ),
                }

        send_webhook.delay(
            url=action.webhook.url,
            data=data,
            headers=headers,
            files=files,
            as_json=action.webhook.as_json,
        )
        logger.debug(
            f"Webhook to {action.webhook.url} queued",
            extra={"group": logging_group},
        )
    except Exception as e:
        logger.exception(
            f"Error occurred sending webhook: {e}",
            extra={"group": logging_group},
        )