Workflow-Engine / api /core /moderation /output_moderation.py
Severian's picture
initial commit
a8b3f00
import logging
import threading
import time
from typing import Any, Optional
from flask import Flask, current_app
from pydantic import BaseModel, ConfigDict
from configs import dify_config
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.entities.queue_entities import QueueMessageReplaceEvent
from core.moderation.base import ModerationAction, ModerationOutputsResult
from core.moderation.factory import ModerationFactory
logger = logging.getLogger(__name__)
class ModerationRule(BaseModel):
type: str
config: dict[str, Any]
class OutputModeration(BaseModel):
tenant_id: str
app_id: str
rule: ModerationRule
queue_manager: AppQueueManager
thread: Optional[threading.Thread] = None
thread_running: bool = True
buffer: str = ""
is_final_chunk: bool = False
final_output: Optional[str] = None
model_config = ConfigDict(arbitrary_types_allowed=True)
def should_direct_output(self) -> bool:
return self.final_output is not None
def get_final_output(self) -> str:
return self.final_output or ""
def append_new_token(self, token: str) -> None:
self.buffer += token
if not self.thread:
self.thread = self.start_thread()
def moderation_completion(self, completion: str, public_event: bool = False) -> str:
self.buffer = completion
self.is_final_chunk = True
result = self.moderation(tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=completion)
if not result or not result.flagged:
return completion
if result.action == ModerationAction.DIRECT_OUTPUT:
final_output = result.preset_response
else:
final_output = result.text
if public_event:
self.queue_manager.publish(QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE)
return final_output
def start_thread(self) -> threading.Thread:
buffer_size = dify_config.MODERATION_BUFFER_SIZE
thread = threading.Thread(
target=self.worker,
kwargs={
"flask_app": current_app._get_current_object(),
"buffer_size": buffer_size if buffer_size > 0 else dify_config.MODERATION_BUFFER_SIZE,
},
)
thread.start()
return thread
def stop_thread(self):
if self.thread and self.thread.is_alive():
self.thread_running = False
def worker(self, flask_app: Flask, buffer_size: int):
with flask_app.app_context():
current_length = 0
while self.thread_running:
moderation_buffer = self.buffer
buffer_length = len(moderation_buffer)
if not self.is_final_chunk:
chunk_length = buffer_length - current_length
if 0 <= chunk_length < buffer_size:
time.sleep(1)
continue
current_length = buffer_length
result = self.moderation(
tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=moderation_buffer
)
if not result or not result.flagged:
continue
if result.action == ModerationAction.DIRECT_OUTPUT:
final_output = result.preset_response
self.final_output = final_output
else:
final_output = result.text + self.buffer[len(moderation_buffer) :]
# trigger replace event
if self.thread_running:
self.queue_manager.publish(QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE)
if result.action == ModerationAction.DIRECT_OUTPUT:
break
def moderation(self, tenant_id: str, app_id: str, moderation_buffer: str) -> Optional[ModerationOutputsResult]:
try:
moderation_factory = ModerationFactory(
name=self.rule.type, app_id=app_id, tenant_id=tenant_id, config=self.rule.config
)
result: ModerationOutputsResult = moderation_factory.moderation_for_outputs(moderation_buffer)
return result
except Exception as e:
logger.error("Moderation Output error: %s", e)
return None