diff --git a/src/documents/signals/handlers.py b/src/documents/signals/handlers.py index a5d32ccd5..e495631f8 100644 --- a/src/documents/signals/handlers.py +++ b/src/documents/signals/handlers.py @@ -42,13 +42,13 @@ from documents.models import WorkflowAction from documents.models import WorkflowRun from documents.models import WorkflowTrigger from documents.permissions import get_objects_for_user_owner_aware -from documents.workflows.utils import apply_assignment_to_document -from documents.workflows.utils import apply_assignment_to_overrides -from documents.workflows.utils import apply_removal_to_document -from documents.workflows.utils import apply_removal_to_overrides -from documents.workflows.utils import build_workflow_action_context -from documents.workflows.utils import execute_email_action -from documents.workflows.utils import execute_webhook_action +from documents.workflows.actions import build_workflow_action_context +from documents.workflows.actions import execute_email_action +from documents.workflows.actions import execute_webhook_action +from documents.workflows.mutations import apply_assignment_to_document +from documents.workflows.mutations import apply_assignment_to_overrides +from documents.workflows.mutations import apply_removal_to_document +from documents.workflows.mutations import apply_removal_to_overrides from documents.workflows.utils import get_workflows_for_trigger if TYPE_CHECKING: diff --git a/src/documents/workflows/actions.py b/src/documents/workflows/actions.py new file mode 100644 index 000000000..6bfec9d26 --- /dev/null +++ b/src/documents/workflows/actions.py @@ -0,0 +1,254 @@ +import logging +from pathlib import Path + +from django.conf import settings +from django.contrib.auth.models import User +from django.utils import timezone + +from documents.data_models import ConsumableDocument +from documents.data_models import DocumentMetadataOverrides +from documents.mail import EmailAttachment +from documents.mail import send_email +from documents.models import Correspondent +from documents.models import Document +from documents.models import DocumentType +from documents.models import WorkflowAction +from documents.models import WorkflowTrigger +from documents.templating.workflows import parse_w_workflow_placeholders +from documents.workflows.utils import send_webhook + +logger = logging.getLogger("paperless.workflows.actions") + + +def build_workflow_action_context( + document: Document | ConsumableDocument, + overrides: DocumentMetadataOverrides | None, + *, + use_overrides: bool = False, +) -> dict: + """ + Build context dictionary for workflow action placeholder parsing. + """ + if not use_overrides: + return { + "title": document.title, + "doc_url": f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/", + "correspondent": document.correspondent.name + if document.correspondent + else "", + "document_type": document.document_type.name + if document.document_type + else "", + "owner_username": document.owner.username if document.owner else "", + "filename": document.original_filename or "", + "current_filename": document.filename or "", + "added": timezone.localtime(document.added), + "created": document.created, + } + + correspondent_obj = ( + Correspondent.objects.filter(pk=overrides.correspondent_id).first() + if overrides and overrides.correspondent_id + else None + ) + document_type_obj = ( + DocumentType.objects.filter(pk=overrides.document_type_id).first() + if overrides and overrides.document_type_id + else None + ) + owner_obj = ( + User.objects.filter(pk=overrides.owner_id).first() + if overrides and overrides.owner_id + else None + ) + + filename = document.original_file if document.original_file else "" + return { + "title": overrides.title + if overrides and overrides.title + else str(document.original_file), + "doc_url": "", + "correspondent": correspondent_obj.name if correspondent_obj else "", + "document_type": document_type_obj.name if document_type_obj else "", + "owner_username": owner_obj.username if owner_obj else "", + "filename": filename, + "current_filename": filename, + "added": timezone.localtime(timezone.now()), + "created": overrides.created if overrides else None, + } + + +def execute_email_action( + action: WorkflowAction, + document: Document | ConsumableDocument, + context: dict, + logging_group, + original_file: Path, + trigger_type: WorkflowTrigger.WorkflowTriggerType, +) -> None: + """ + Execute an email action for a workflow. + """ + + subject = ( + parse_w_workflow_placeholders( + action.email.subject, + context["correspondent"], + context["document_type"], + context["owner_username"], + context["added"], + context["filename"], + context["current_filename"], + context["created"], + context["title"], + context["doc_url"], + ) + if action.email.subject + else "" + ) + body = ( + parse_w_workflow_placeholders( + action.email.body, + context["correspondent"], + context["document_type"], + context["owner_username"], + context["added"], + context["filename"], + context["current_filename"], + context["created"], + context["title"], + context["doc_url"], + ) + if action.email.body + else "" + ) + + try: + attachments: list[EmailAttachment] = [] + if action.email.include_document: + attachment: EmailAttachment | None = None + if trigger_type in [ + WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, + WorkflowTrigger.WorkflowTriggerType.SCHEDULED, + ] and isinstance(document, Document): + friendly_name = ( + Path(context["current_filename"]).name + if context["current_filename"] + else document.source_path.name + ) + attachment = EmailAttachment( + path=document.source_path, + mime_type=document.mime_type, + friendly_name=friendly_name, + ) + elif original_file: + friendly_name = ( + Path(context["current_filename"]).name + if context["current_filename"] + else original_file.name + ) + attachment = EmailAttachment( + path=original_file, + mime_type=document.mime_type, + friendly_name=friendly_name, + ) + if attachment: + attachments = [attachment] + + n_messages = send_email( + subject=subject, + body=body, + to=action.email.to.split(","), + attachments=attachments, + ) + logger.debug( + f"Sent {n_messages} notification email(s) to {action.email.to}", + extra={"group": logging_group}, + ) + except Exception as e: + logger.exception( + f"Error occurred sending notification email: {e}", + extra={"group": logging_group}, + ) + + +def execute_webhook_action( + action: WorkflowAction, + document: Document | ConsumableDocument, + context: dict, + logging_group, + original_file: Path, +): + try: + data = {} + if action.webhook.use_params: + if action.webhook.params: + try: + for key, value in action.webhook.params.items(): + data[key] = parse_w_workflow_placeholders( + value, + context["correspondent"], + context["document_type"], + context["owner_username"], + context["added"], + context["filename"], + context["current_filename"], + context["created"], + context["title"], + context["doc_url"], + ) + except Exception as e: + logger.error( + f"Error occurred parsing webhook params: {e}", + extra={"group": logging_group}, + ) + elif action.webhook.body: + data = parse_w_workflow_placeholders( + action.webhook.body, + context["correspondent"], + context["document_type"], + context["owner_username"], + context["added"], + context["filename"], + context["current_filename"], + context["created"], + context["title"], + context["doc_url"], + ) + headers = {} + if action.webhook.headers: + try: + headers = {str(k): str(v) for k, v in action.webhook.headers.items()} + except Exception as e: + logger.error( + f"Error occurred parsing webhook headers: {e}", + extra={"group": logging_group}, + ) + files = None + if action.webhook.include_document: + with original_file.open("rb") as f: + files = { + "file": ( + str(context["filename"]) + if context["filename"] + else original_file.name, + f.read(), + document.mime_type, + ), + } + send_webhook.delay( + url=action.webhook.url, + data=data, + headers=headers, + files=files, + as_json=action.webhook.as_json, + ) + logger.debug( + f"Webhook to {action.webhook.url} queued", + extra={"group": logging_group}, + ) + except Exception as e: + logger.exception( + f"Error occurred sending webhook: {e}", + extra={"group": logging_group}, + ) diff --git a/src/documents/workflows/mutations.py b/src/documents/workflows/mutations.py new file mode 100644 index 000000000..4655e1172 --- /dev/null +++ b/src/documents/workflows/mutations.py @@ -0,0 +1,348 @@ +import logging + +from django.utils import timezone +from guardian.shortcuts import remove_perm + +from documents.data_models import DocumentMetadataOverrides +from documents.models import CustomFieldInstance +from documents.models import Document +from documents.models import WorkflowAction +from documents.permissions import set_permissions_for_object +from documents.templating.workflows import parse_w_workflow_placeholders + +logger = logging.getLogger("paperless.workflows.mutations") + + +def apply_assignment_to_document( + action: WorkflowAction, + document: Document, + doc_tag_ids: list[int], + logging_group, +): + """ + Apply assignment actions to a Document instance. + """ + if action.assign_tags.exists(): + tag_ids_to_add: set[int] = set() + for tag in action.assign_tags.all(): + tag_ids_to_add.add(tag.pk) + tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks()) + + doc_tag_ids[:] = list(set(doc_tag_ids) | tag_ids_to_add) + + if action.assign_correspondent: + document.correspondent = action.assign_correspondent + + if action.assign_document_type: + document.document_type = action.assign_document_type + + if action.assign_storage_path: + document.storage_path = action.assign_storage_path + + if action.assign_owner: + document.owner = action.assign_owner + + if action.assign_title: + try: + document.title = parse_w_workflow_placeholders( + action.assign_title, + document.correspondent.name if document.correspondent else "", + document.document_type.name if document.document_type else "", + document.owner.username if document.owner else "", + timezone.localtime(document.added), + document.original_filename or "", + document.filename or "", + document.created, + ) + except Exception: # pragma: no cover + logger.exception( + f"Error occurred parsing title assignment '{action.assign_title}', falling back to original", + extra={"group": logging_group}, + ) + + if any( + [ + action.assign_view_users.exists(), + action.assign_view_groups.exists(), + action.assign_change_users.exists(), + action.assign_change_groups.exists(), + ], + ): + permissions = { + "view": { + "users": action.assign_view_users.values_list("id", flat=True), + "groups": action.assign_view_groups.values_list("id", flat=True), + }, + "change": { + "users": action.assign_change_users.values_list("id", flat=True), + "groups": action.assign_change_groups.values_list("id", flat=True), + }, + } + set_permissions_for_object( + permissions=permissions, + object=document, + merge=True, + ) + + if action.assign_custom_fields.exists(): + for field in action.assign_custom_fields.all(): + value_field_name = CustomFieldInstance.get_value_field_name( + data_type=field.data_type, + ) + args = { + value_field_name: action.assign_custom_fields_values.get( + str(field.pk), + None, + ), + } + # for some reason update_or_create doesn't work here + instance = CustomFieldInstance.objects.filter( + field=field, + document=document, + ).first() + if instance and args[value_field_name] is not None: + setattr(instance, value_field_name, args[value_field_name]) + instance.save() + elif not instance: + CustomFieldInstance.objects.create( + **args, + field=field, + document=document, + ) + + +def apply_assignment_to_overrides( + action: WorkflowAction, + overrides: DocumentMetadataOverrides, +): + """ + Apply assignment actions to DocumentMetadataOverrides. + """ + if action.assign_tags.exists(): + if overrides.tag_ids is None: + overrides.tag_ids = [] + tag_ids_to_add: set[int] = set() + for tag in action.assign_tags.all(): + tag_ids_to_add.add(tag.pk) + tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks()) + + overrides.tag_ids = list(set(overrides.tag_ids) | tag_ids_to_add) + + if action.assign_correspondent: + overrides.correspondent_id = action.assign_correspondent.pk + + if action.assign_document_type: + overrides.document_type_id = action.assign_document_type.pk + + if action.assign_storage_path: + overrides.storage_path_id = action.assign_storage_path.pk + + if action.assign_owner: + overrides.owner_id = action.assign_owner.pk + + if action.assign_title: + overrides.title = action.assign_title + + if any( + [ + action.assign_view_users.exists(), + action.assign_view_groups.exists(), + action.assign_change_users.exists(), + action.assign_change_groups.exists(), + ], + ): + overrides.view_users = list( + set( + (overrides.view_users or []) + + list(action.assign_view_users.values_list("id", flat=True)), + ), + ) + overrides.view_groups = list( + set( + (overrides.view_groups or []) + + list(action.assign_view_groups.values_list("id", flat=True)), + ), + ) + overrides.change_users = list( + set( + (overrides.change_users or []) + + list(action.assign_change_users.values_list("id", flat=True)), + ), + ) + overrides.change_groups = list( + set( + (overrides.change_groups or []) + + list(action.assign_change_groups.values_list("id", flat=True)), + ), + ) + + if action.assign_custom_fields.exists(): + if overrides.custom_fields is None: + overrides.custom_fields = {} + overrides.custom_fields.update( + { + field.pk: action.assign_custom_fields_values.get( + str(field.pk), + None, + ) + for field in action.assign_custom_fields.all() + }, + ) + + +def apply_removal_to_document( + action: WorkflowAction, + document: Document, + doc_tag_ids: list[int], +): + """ + Apply removal actions to a Document instance. + """ + + if action.remove_all_tags: + doc_tag_ids.clear() + else: + tag_ids_to_remove: set[int] = set() + for tag in action.remove_tags.all(): + tag_ids_to_remove.add(tag.pk) + tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks()) + + doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove] + + if action.remove_all_correspondents or ( + document.correspondent + and action.remove_correspondents.filter(pk=document.correspondent.pk).exists() + ): + document.correspondent = None + + if action.remove_all_document_types or ( + document.document_type + and action.remove_document_types.filter(pk=document.document_type.pk).exists() + ): + document.document_type = None + + if action.remove_all_storage_paths or ( + document.storage_path + and action.remove_storage_paths.filter(pk=document.storage_path.pk).exists() + ): + document.storage_path = None + + if action.remove_all_owners or ( + document.owner and action.remove_owners.filter(pk=document.owner.pk).exists() + ): + document.owner = None + + if action.remove_all_permissions: + permissions = { + "view": {"users": [], "groups": []}, + "change": {"users": [], "groups": []}, + } + set_permissions_for_object( + permissions=permissions, + object=document, + merge=False, + ) + elif any( + [ + action.remove_view_users.exists(), + action.remove_view_groups.exists(), + action.remove_change_users.exists(), + action.remove_change_groups.exists(), + ], + ): + for user in action.remove_view_users.all(): + remove_perm("view_document", user, document) + for user in action.remove_change_users.all(): + remove_perm("change_document", user, document) + for group in action.remove_view_groups.all(): + remove_perm("view_document", group, document) + for group in action.remove_change_groups.all(): + remove_perm("change_document", group, document) + + if action.remove_all_custom_fields: + CustomFieldInstance.objects.filter(document=document).hard_delete() + elif action.remove_custom_fields.exists(): + CustomFieldInstance.objects.filter( + field__in=action.remove_custom_fields.all(), + document=document, + ).hard_delete() + + +def apply_removal_to_overrides( + action: WorkflowAction, + overrides: DocumentMetadataOverrides, +): + """ + Apply removal actions to DocumentMetadataOverrides. + """ + if action.remove_all_tags: + overrides.tag_ids = None + elif overrides.tag_ids: + tag_ids_to_remove: set[int] = set() + for tag in action.remove_tags.all(): + tag_ids_to_remove.add(tag.pk) + tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks()) + + overrides.tag_ids = [t for t in overrides.tag_ids if t not in tag_ids_to_remove] + + if action.remove_all_correspondents or ( + overrides.correspondent_id + and action.remove_correspondents.filter(pk=overrides.correspondent_id).exists() + ): + overrides.correspondent_id = None + + if action.remove_all_document_types or ( + overrides.document_type_id + and action.remove_document_types.filter(pk=overrides.document_type_id).exists() + ): + overrides.document_type_id = None + + if action.remove_all_storage_paths or ( + overrides.storage_path_id + and action.remove_storage_paths.filter(pk=overrides.storage_path_id).exists() + ): + overrides.storage_path_id = None + + if action.remove_all_owners or ( + overrides.owner_id + and action.remove_owners.filter(pk=overrides.owner_id).exists() + ): + overrides.owner_id = None + + if action.remove_all_permissions: + overrides.view_users = None + overrides.view_groups = None + overrides.change_users = None + overrides.change_groups = None + elif any( + [ + action.remove_view_users.exists(), + action.remove_view_groups.exists(), + action.remove_change_users.exists(), + action.remove_change_groups.exists(), + ], + ): + if overrides.view_users: + for user in action.remove_view_users.filter(pk__in=overrides.view_users): + overrides.view_users.remove(user.pk) + if overrides.change_users: + for user in action.remove_change_users.filter( + pk__in=overrides.change_users, + ): + overrides.change_users.remove(user.pk) + if overrides.view_groups: + for group in action.remove_view_groups.filter(pk__in=overrides.view_groups): + overrides.view_groups.remove(group.pk) + if overrides.change_groups: + for group in action.remove_change_groups.filter( + pk__in=overrides.change_groups, + ): + overrides.change_groups.remove(group.pk) + + if action.remove_all_custom_fields: + overrides.custom_fields = None + elif action.remove_custom_fields.exists() and overrides.custom_fields: + for field in action.remove_custom_fields.filter( + pk__in=overrides.custom_fields.keys(), + ): + overrides.custom_fields.pop(field.pk, None) diff --git a/src/documents/workflows/utils.py b/src/documents/workflows/utils.py index 1159f8b3d..e0a6e55ab 100644 --- a/src/documents/workflows/utils.py +++ b/src/documents/workflows/utils.py @@ -1,29 +1,14 @@ import ipaddress import logging import socket -from pathlib import Path from urllib.parse import urlparse import httpx from celery import shared_task from django.conf import settings -from django.contrib.auth.models import User -from django.utils import timezone -from guardian.shortcuts import remove_perm -from documents.data_models import ConsumableDocument -from documents.data_models import DocumentMetadataOverrides -from documents.mail import EmailAttachment -from documents.mail import send_email -from documents.models import Correspondent -from documents.models import CustomFieldInstance -from documents.models import Document -from documents.models import DocumentType from documents.models import Workflow -from documents.models import WorkflowAction from documents.models import WorkflowTrigger -from documents.permissions import set_permissions_for_object -from documents.templating.workflows import parse_w_workflow_placeholders logger = logging.getLogger("paperless.workflows") @@ -62,575 +47,6 @@ def get_workflows_for_trigger( ) -def apply_assignment_to_document( - action: WorkflowAction, - document: Document, - doc_tag_ids: list[int], - logging_group, -): - """ - Apply assignment actions to a Document instance. - """ - if action.assign_tags.exists(): - tag_ids_to_add: set[int] = set() - for tag in action.assign_tags.all(): - tag_ids_to_add.add(tag.pk) - tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks()) - - doc_tag_ids[:] = list(set(doc_tag_ids) | tag_ids_to_add) - - if action.assign_correspondent: - document.correspondent = action.assign_correspondent - - if action.assign_document_type: - document.document_type = action.assign_document_type - - if action.assign_storage_path: - document.storage_path = action.assign_storage_path - - if action.assign_owner: - document.owner = action.assign_owner - - if action.assign_title: - try: - document.title = parse_w_workflow_placeholders( - action.assign_title, - document.correspondent.name if document.correspondent else "", - document.document_type.name if document.document_type else "", - document.owner.username if document.owner else "", - timezone.localtime(document.added), - document.original_filename or "", - document.filename or "", - document.created, - ) - except Exception: - logger.exception( - f"Error occurred parsing title assignment '{action.assign_title}', falling back to original", - extra={"group": logging_group}, - ) - - if any( - [ - action.assign_view_users.exists(), - action.assign_view_groups.exists(), - action.assign_change_users.exists(), - action.assign_change_groups.exists(), - ], - ): - permissions = { - "view": { - "users": action.assign_view_users.values_list("id", flat=True), - "groups": action.assign_view_groups.values_list("id", flat=True), - }, - "change": { - "users": action.assign_change_users.values_list("id", flat=True), - "groups": action.assign_change_groups.values_list("id", flat=True), - }, - } - set_permissions_for_object( - permissions=permissions, - object=document, - merge=True, - ) - - if action.assign_custom_fields.exists(): - for field in action.assign_custom_fields.all(): - value_field_name = CustomFieldInstance.get_value_field_name( - data_type=field.data_type, - ) - args = { - value_field_name: action.assign_custom_fields_values.get( - str(field.pk), - None, - ), - } - # for some reason update_or_create doesn't work here - instance = CustomFieldInstance.objects.filter( - field=field, - document=document, - ).first() - if instance and args[value_field_name] is not None: - setattr(instance, value_field_name, args[value_field_name]) - instance.save() - elif not instance: - CustomFieldInstance.objects.create( - **args, - field=field, - document=document, - ) - - -def apply_assignment_to_overrides( - action: WorkflowAction, - overrides: DocumentMetadataOverrides, -): - """ - Apply assignment actions to DocumentMetadataOverrides. - """ - if action.assign_tags.exists(): - if overrides.tag_ids is None: - overrides.tag_ids = [] - tag_ids_to_add: set[int] = set() - for tag in action.assign_tags.all(): - tag_ids_to_add.add(tag.pk) - tag_ids_to_add.update(int(pk) for pk in tag.get_ancestors_pks()) - - overrides.tag_ids = list(set(overrides.tag_ids) | tag_ids_to_add) - - if action.assign_correspondent: - overrides.correspondent_id = action.assign_correspondent.pk - - if action.assign_document_type: - overrides.document_type_id = action.assign_document_type.pk - - if action.assign_storage_path: - overrides.storage_path_id = action.assign_storage_path.pk - - if action.assign_owner: - overrides.owner_id = action.assign_owner.pk - - if action.assign_title: - overrides.title = action.assign_title - - if any( - [ - action.assign_view_users.exists(), - action.assign_view_groups.exists(), - action.assign_change_users.exists(), - action.assign_change_groups.exists(), - ], - ): - overrides.view_users = list( - set( - (overrides.view_users or []) - + list(action.assign_view_users.values_list("id", flat=True)), - ), - ) - overrides.view_groups = list( - set( - (overrides.view_groups or []) - + list(action.assign_view_groups.values_list("id", flat=True)), - ), - ) - overrides.change_users = list( - set( - (overrides.change_users or []) - + list(action.assign_change_users.values_list("id", flat=True)), - ), - ) - overrides.change_groups = list( - set( - (overrides.change_groups or []) - + list(action.assign_change_groups.values_list("id", flat=True)), - ), - ) - - if action.assign_custom_fields.exists(): - if overrides.custom_fields is None: - overrides.custom_fields = {} - overrides.custom_fields.update( - { - field.pk: action.assign_custom_fields_values.get( - str(field.pk), - None, - ) - for field in action.assign_custom_fields.all() - }, - ) - - -def apply_removal_to_document( - action: WorkflowAction, - document: Document, - doc_tag_ids: list[int], -): - """ - Apply removal actions to a Document instance. - """ - - if action.remove_all_tags: - doc_tag_ids.clear() - else: - tag_ids_to_remove: set[int] = set() - for tag in action.remove_tags.all(): - tag_ids_to_remove.add(tag.pk) - tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks()) - - doc_tag_ids[:] = [t for t in doc_tag_ids if t not in tag_ids_to_remove] - - if action.remove_all_correspondents or ( - document.correspondent - and action.remove_correspondents.filter(pk=document.correspondent.pk).exists() - ): - document.correspondent = None - - if action.remove_all_document_types or ( - document.document_type - and action.remove_document_types.filter(pk=document.document_type.pk).exists() - ): - document.document_type = None - - if action.remove_all_storage_paths or ( - document.storage_path - and action.remove_storage_paths.filter(pk=document.storage_path.pk).exists() - ): - document.storage_path = None - - if action.remove_all_owners or ( - document.owner and action.remove_owners.filter(pk=document.owner.pk).exists() - ): - document.owner = None - - if action.remove_all_permissions: - permissions = { - "view": {"users": [], "groups": []}, - "change": {"users": [], "groups": []}, - } - set_permissions_for_object( - permissions=permissions, - object=document, - merge=False, - ) - elif any( - [ - action.remove_view_users.exists(), - action.remove_view_groups.exists(), - action.remove_change_users.exists(), - action.remove_change_groups.exists(), - ], - ): - for user in action.remove_view_users.all(): - remove_perm("view_document", user, document) - for user in action.remove_change_users.all(): - remove_perm("change_document", user, document) - for group in action.remove_view_groups.all(): - remove_perm("view_document", group, document) - for group in action.remove_change_groups.all(): - remove_perm("change_document", group, document) - - if action.remove_all_custom_fields: - CustomFieldInstance.objects.filter(document=document).hard_delete() - elif action.remove_custom_fields.exists(): - CustomFieldInstance.objects.filter( - field__in=action.remove_custom_fields.all(), - document=document, - ).hard_delete() - - -def apply_removal_to_overrides( - action: WorkflowAction, - overrides: DocumentMetadataOverrides, -): - """ - Apply removal actions to DocumentMetadataOverrides. - """ - if action.remove_all_tags: - overrides.tag_ids = None - elif overrides.tag_ids: - tag_ids_to_remove: set[int] = set() - for tag in action.remove_tags.all(): - tag_ids_to_remove.add(tag.pk) - tag_ids_to_remove.update(int(pk) for pk in tag.get_descendants_pks()) - - overrides.tag_ids = [t for t in overrides.tag_ids if t not in tag_ids_to_remove] - - if action.remove_all_correspondents or ( - overrides.correspondent_id - and action.remove_correspondents.filter(pk=overrides.correspondent_id).exists() - ): - overrides.correspondent_id = None - - if action.remove_all_document_types or ( - overrides.document_type_id - and action.remove_document_types.filter(pk=overrides.document_type_id).exists() - ): - overrides.document_type_id = None - - if action.remove_all_storage_paths or ( - overrides.storage_path_id - and action.remove_storage_paths.filter(pk=overrides.storage_path_id).exists() - ): - overrides.storage_path_id = None - - if action.remove_all_owners or ( - overrides.owner_id - and action.remove_owners.filter(pk=overrides.owner_id).exists() - ): - overrides.owner_id = None - - if action.remove_all_permissions: - overrides.view_users = None - overrides.view_groups = None - overrides.change_users = None - overrides.change_groups = None - elif any( - [ - action.remove_view_users.exists(), - action.remove_view_groups.exists(), - action.remove_change_users.exists(), - action.remove_change_groups.exists(), - ], - ): - if overrides.view_users: - for user in action.remove_view_users.filter(pk__in=overrides.view_users): - overrides.view_users.remove(user.pk) - if overrides.change_users: - for user in action.remove_change_users.filter( - pk__in=overrides.change_users, - ): - overrides.change_users.remove(user.pk) - if overrides.view_groups: - for group in action.remove_view_groups.filter(pk__in=overrides.view_groups): - overrides.view_groups.remove(group.pk) - if overrides.change_groups: - for group in action.remove_change_groups.filter( - pk__in=overrides.change_groups, - ): - overrides.change_groups.remove(group.pk) - - if action.remove_all_custom_fields: - overrides.custom_fields = None - elif action.remove_custom_fields.exists() and overrides.custom_fields: - for field in action.remove_custom_fields.filter( - pk__in=overrides.custom_fields.keys(), - ): - overrides.custom_fields.pop(field.pk, None) - - -def build_workflow_action_context( - document: Document | ConsumableDocument, - overrides: DocumentMetadataOverrides | None, - *, - use_overrides: bool = False, -) -> dict: - """ - Build context dictionary for workflow action placeholder parsing. - """ - if not use_overrides: - return { - "title": document.title, - "doc_url": f"{settings.PAPERLESS_URL}{settings.BASE_URL}documents/{document.pk}/", - "correspondent": document.correspondent.name - if document.correspondent - else "", - "document_type": document.document_type.name - if document.document_type - else "", - "owner_username": document.owner.username if document.owner else "", - "filename": document.original_filename or "", - "current_filename": document.filename or "", - "added": timezone.localtime(document.added), - "created": document.created, - } - - correspondent_obj = ( - Correspondent.objects.filter(pk=overrides.correspondent_id).first() - if overrides and overrides.correspondent_id - else None - ) - document_type_obj = ( - DocumentType.objects.filter(pk=overrides.document_type_id).first() - if overrides and overrides.document_type_id - else None - ) - owner_obj = ( - User.objects.filter(pk=overrides.owner_id).first() - if overrides and overrides.owner_id - else None - ) - - filename = document.original_file if document.original_file else "" - return { - "title": overrides.title - if overrides and overrides.title - else str(document.original_file), - "doc_url": "", - "correspondent": correspondent_obj.name if correspondent_obj else "", - "document_type": document_type_obj.name if document_type_obj else "", - "owner_username": owner_obj.username if owner_obj else "", - "filename": filename, - "current_filename": filename, - "added": timezone.localtime(timezone.now()), - "created": overrides.created if overrides else None, - } - - -def execute_email_action( - action: WorkflowAction, - document: Document | ConsumableDocument, - context: dict, - logging_group, - original_file: Path, - trigger_type: WorkflowTrigger.WorkflowTriggerType, -) -> None: - """ - Execute an email action for a workflow. - """ - - subject = ( - parse_w_workflow_placeholders( - action.email.subject, - context["correspondent"], - context["document_type"], - context["owner_username"], - context["added"], - context["filename"], - context["current_filename"], - context["created"], - context["title"], - context["doc_url"], - ) - if action.email.subject - else "" - ) - body = ( - parse_w_workflow_placeholders( - action.email.body, - context["correspondent"], - context["document_type"], - context["owner_username"], - context["added"], - context["filename"], - context["current_filename"], - context["created"], - context["title"], - context["doc_url"], - ) - if action.email.body - else "" - ) - - try: - attachments: list[EmailAttachment] = [] - if action.email.include_document: - attachment: EmailAttachment | None = None - if trigger_type in [ - WorkflowTrigger.WorkflowTriggerType.DOCUMENT_UPDATED, - WorkflowTrigger.WorkflowTriggerType.SCHEDULED, - ] and isinstance(document, Document): - friendly_name = ( - Path(context["current_filename"]).name - if context["current_filename"] - else document.source_path.name - ) - attachment = EmailAttachment( - path=document.source_path, - mime_type=document.mime_type, - friendly_name=friendly_name, - ) - elif original_file: - friendly_name = ( - Path(context["current_filename"]).name - if context["current_filename"] - else original_file.name - ) - attachment = EmailAttachment( - path=original_file, - mime_type=document.mime_type, - friendly_name=friendly_name, - ) - if attachment: - attachments = [attachment] - - n_messages = send_email( - subject=subject, - body=body, - to=action.email.to.split(","), - attachments=attachments, - ) - logger.debug( - f"Sent {n_messages} notification email(s) to {action.email.to}", - extra={"group": logging_group}, - ) - except Exception as e: - logger.exception( - f"Error occurred sending notification email: {e}", - extra={"group": logging_group}, - ) - - -def execute_webhook_action( - action: WorkflowAction, - document: Document | ConsumableDocument, - context: dict, - logging_group, - original_file: Path, -): - try: - data = {} - if action.webhook.use_params: - if action.webhook.params: - try: - for key, value in action.webhook.params.items(): - data[key] = parse_w_workflow_placeholders( - value, - context["correspondent"], - context["document_type"], - context["owner_username"], - context["added"], - context["filename"], - context["current_filename"], - context["created"], - context["title"], - context["doc_url"], - ) - except Exception as e: - logger.error( - f"Error occurred parsing webhook params: {e}", - extra={"group": logging_group}, - ) - elif action.webhook.body: - data = parse_w_workflow_placeholders( - action.webhook.body, - context["correspondent"], - context["document_type"], - context["owner_username"], - context["added"], - context["filename"], - context["current_filename"], - context["created"], - context["title"], - context["doc_url"], - ) - headers = {} - if action.webhook.headers: - try: - headers = {str(k): str(v) for k, v in action.webhook.headers.items()} - except Exception as e: - logger.error( - f"Error occurred parsing webhook headers: {e}", - extra={"group": logging_group}, - ) - files = None - if action.webhook.include_document: - with original_file.open("rb") as f: - files = { - "file": ( - str(context["filename"]) - if context["filename"] - else original_file.name, - f.read(), - document.mime_type, - ), - } - send_webhook.delay( - url=action.webhook.url, - data=data, - headers=headers, - files=files, - as_json=action.webhook.as_json, - ) - logger.debug( - f"Webhook to {action.webhook.url} queued", - extra={"group": logging_group}, - ) - except Exception as e: - logger.exception( - f"Error occurred sending webhook: {e}", - extra={"group": logging_group}, - ) - - def _is_public_ip(ip: str) -> bool: try: obj = ipaddress.ip_address(ip)