Spaces:
Build error
Build error
import logging | |
import threading | |
import time | |
from typing import Any, Optional | |
from flask import Flask, current_app | |
from pydantic import BaseModel, ConfigDict | |
from configs import dify_config | |
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom | |
from core.app.entities.queue_entities import QueueMessageReplaceEvent | |
from core.moderation.base import ModerationAction, ModerationOutputsResult | |
from core.moderation.factory import ModerationFactory | |
logger = logging.getLogger(__name__) | |
class ModerationRule(BaseModel): | |
type: str | |
config: dict[str, Any] | |
class OutputModeration(BaseModel): | |
tenant_id: str | |
app_id: str | |
rule: ModerationRule | |
queue_manager: AppQueueManager | |
thread: Optional[threading.Thread] = None | |
thread_running: bool = True | |
buffer: str = "" | |
is_final_chunk: bool = False | |
final_output: Optional[str] = None | |
model_config = ConfigDict(arbitrary_types_allowed=True) | |
def should_direct_output(self) -> bool: | |
return self.final_output is not None | |
def get_final_output(self) -> str: | |
return self.final_output or "" | |
def append_new_token(self, token: str) -> None: | |
self.buffer += token | |
if not self.thread: | |
self.thread = self.start_thread() | |
def moderation_completion(self, completion: str, public_event: bool = False) -> str: | |
self.buffer = completion | |
self.is_final_chunk = True | |
result = self.moderation(tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=completion) | |
if not result or not result.flagged: | |
return completion | |
if result.action == ModerationAction.DIRECT_OUTPUT: | |
final_output = result.preset_response | |
else: | |
final_output = result.text | |
if public_event: | |
self.queue_manager.publish(QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE) | |
return final_output | |
def start_thread(self) -> threading.Thread: | |
buffer_size = dify_config.MODERATION_BUFFER_SIZE | |
thread = threading.Thread( | |
target=self.worker, | |
kwargs={ | |
"flask_app": current_app._get_current_object(), | |
"buffer_size": buffer_size if buffer_size > 0 else dify_config.MODERATION_BUFFER_SIZE, | |
}, | |
) | |
thread.start() | |
return thread | |
def stop_thread(self): | |
if self.thread and self.thread.is_alive(): | |
self.thread_running = False | |
def worker(self, flask_app: Flask, buffer_size: int): | |
with flask_app.app_context(): | |
current_length = 0 | |
while self.thread_running: | |
moderation_buffer = self.buffer | |
buffer_length = len(moderation_buffer) | |
if not self.is_final_chunk: | |
chunk_length = buffer_length - current_length | |
if 0 <= chunk_length < buffer_size: | |
time.sleep(1) | |
continue | |
current_length = buffer_length | |
result = self.moderation( | |
tenant_id=self.tenant_id, app_id=self.app_id, moderation_buffer=moderation_buffer | |
) | |
if not result or not result.flagged: | |
continue | |
if result.action == ModerationAction.DIRECT_OUTPUT: | |
final_output = result.preset_response | |
self.final_output = final_output | |
else: | |
final_output = result.text + self.buffer[len(moderation_buffer) :] | |
# trigger replace event | |
if self.thread_running: | |
self.queue_manager.publish(QueueMessageReplaceEvent(text=final_output), PublishFrom.TASK_PIPELINE) | |
if result.action == ModerationAction.DIRECT_OUTPUT: | |
break | |
def moderation(self, tenant_id: str, app_id: str, moderation_buffer: str) -> Optional[ModerationOutputsResult]: | |
try: | |
moderation_factory = ModerationFactory( | |
name=self.rule.type, app_id=app_id, tenant_id=tenant_id, config=self.rule.config | |
) | |
result: ModerationOutputsResult = moderation_factory.moderation_for_outputs(moderation_buffer) | |
return result | |
except Exception as e: | |
logger.error("Moderation Output error: %s", e) | |
return None | |