File size: 3,642 Bytes
a8b3f00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from typing import Optional, Union

from core.app.entities.app_invoke_entities import InvokeFrom
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.account import Account
from models.model import App, EndUser
from models.web import PinnedConversation
from services.conversation_service import ConversationService


class WebConversationService:
    """Conversation listing plus pin/unpin bookkeeping for a single app.

    A pin is a ``PinnedConversation`` row keyed by app id, conversation id,
    the owner's role (``"account"`` or ``"end_user"``) and the owner's id.
    Every public method accepts ``user=None`` (the signatures declare it
    ``Optional``); a missing user is treated as owning no pins.
    """

    @staticmethod
    def _creator_role(user: Union[Account, EndUser]) -> str:
        """Return the ``created_by_role`` value recorded for ``user``."""
        return "account" if isinstance(user, Account) else "end_user"

    @classmethod
    def _pinned_query(cls, app_model: App, user: Union[Account, EndUser]):
        """Build the query selecting every pin ``user`` owns in ``app_model``."""
        return db.session.query(PinnedConversation).filter(
            PinnedConversation.app_id == app_model.id,
            PinnedConversation.created_by_role == cls._creator_role(user),
            PinnedConversation.created_by == user.id,
        )

    @classmethod
    def pagination_by_last_id(
        cls,
        app_model: App,
        user: Optional[Union[Account, EndUser]],
        last_id: Optional[str],
        limit: int,
        invoke_from: InvokeFrom,
        pinned: Optional[bool] = None,
        sort_by="-updated_at",
    ) -> InfiniteScrollPagination:
        """Page through conversations, optionally filtered by pin state.

        Args:
            app_model: App whose conversations are listed.
            user: Owner of the pins; may be ``None``.
            last_id: Forwarded unchanged to ``ConversationService``.
            limit: Forwarded unchanged to ``ConversationService``.
            invoke_from: Forwarded unchanged to ``ConversationService``.
            pinned: ``True`` restricts the result to the user's pinned
                conversations, ``False`` excludes them, ``None`` applies no
                pin filter (and issues no pin query).
            sort_by: Forwarded unchanged to ``ConversationService``.

        Returns:
            Whatever ``ConversationService.pagination_by_last_id`` returns.
        """
        include_ids = None
        exclude_ids = None

        if pinned is not None:
            if user is None:
                # No user means no pins. Previously this path crashed on
                # ``user.id`` before the delegate was ever reached.
                pinned_conversation_ids = []
            else:
                pinned_rows = (
                    cls._pinned_query(app_model, user)
                    .order_by(PinnedConversation.created_at.desc())
                    .all()
                )
                pinned_conversation_ids = [row.conversation_id for row in pinned_rows]

            if pinned:
                include_ids = pinned_conversation_ids
            else:
                exclude_ids = pinned_conversation_ids

        return ConversationService.pagination_by_last_id(
            app_model=app_model,
            user=user,
            last_id=last_id,
            limit=limit,
            invoke_from=invoke_from,
            include_ids=include_ids,
            exclude_ids=exclude_ids,
            sort_by=sort_by,
        )

    @classmethod
    def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        """Pin ``conversation_id`` for ``user``; do nothing if already pinned.

        Returns ``None`` in every case. With ``user=None`` nothing is queried
        or written, since there is no owner to record the pin under.
        """
        if user is None:
            return

        already_pinned = (
            cls._pinned_query(app_model, user)
            .filter(PinnedConversation.conversation_id == conversation_id)
            .first()
        )
        if already_pinned:
            return

        # Resolve the conversation through the service before writing the pin;
        # presumably this raises when the conversation is missing or not
        # visible to ``user`` -- confirm against ConversationService.
        conversation = ConversationService.get_conversation(
            app_model=app_model, conversation_id=conversation_id, user=user
        )

        db.session.add(
            PinnedConversation(
                app_id=app_model.id,
                conversation_id=conversation.id,
                created_by_role=cls._creator_role(user),
                created_by=user.id,
            )
        )
        db.session.commit()

    @classmethod
    def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        """Remove ``user``'s pin on ``conversation_id``; do nothing if absent.

        Returns ``None`` in every case. With ``user=None`` nothing is queried
        or deleted.
        """
        if user is None:
            return

        pinned_conversation = (
            cls._pinned_query(app_model, user)
            .filter(PinnedConversation.conversation_id == conversation_id)
            .first()
        )
        if not pinned_conversation:
            return

        db.session.delete(pinned_conversation)
        db.session.commit()