File size: 11,202 Bytes
89cbc4d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#####################################################
### DOCUMENT PROCESSOR [Metadata Adders]
#####################################################
### Jonathan Wang

# ABOUT:
# This creates an app to chat with PDFs.

# This is the Metadata Adders
# Which are classes that add metadata fields to documents.
# This often is used for summaries or keywords.
#####################################################
### TODO Board:
# Seems like this overlaps well with the `metadata extractors` interface from llama_index.
# These are TransformComponents which take a Sequence of Nodes as input and return a list of Dicts as output (with the dicts storing metadata for each node).
# We should add a wrapper which adds this metadata to nodes.
# We should also add a wrapper

# https://github.com/run-llama/llama_index/blob/be3bd619ec114d26cf328d12117c033762695b3f/llama-index-core/llama_index/core/extractors/interface.py#L21
# https://github.com/run-llama/llama_index/blob/be3bd619ec114d26cf328d12117c033762695b3f/llama-index-core/llama_index/core/extractors/metadata_extractors.py#L332

#####################################################
### PROGRAM SETTINGS


#####################################################
### PROGRAM IMPORTS
from __future__ import annotations

import logging
import re
from abc import abstractmethod
from typing import Any, List, Optional, TypeVar, Sequence

from llama_index.core.bridge.pydantic import Field, PrivateAttr
from llama_index.core.schema import BaseNode, TransformComponent

# Own modules


#####################################################
### CONSTANTS
# ah how beautiful the regex
# handy visualizer and checker: https://www.debuggex.com/, https://www.regexpr.com/

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type variable for "any BaseNode subclass", so methods can return the same node type they were given.
GenericNode = TypeVar("GenericNode", bound=BaseNode)

# Dates (case-insensitive). Three alternatives:
#   1. day (optional st/nd/rd/th suffix, optional "of") followed by a month name or abbreviation,
#   2. month name or abbreviation followed by a day (optional ordinal suffix),
#      both optionally followed by a comma and a 4-digit year,
#   3. purely numeric dates with `-`, `.` or `/` separators and a 2-4 digit year.
# The `(?<!\:)(?<!\:\d)` lookbehinds reject day digits that directly follow a colon (or a colon plus one digit)
# -- presumably so the minutes of a clock time are not read as a day; TODO confirm.
DATE_REGEX = re.compile(r"(?:(?<!\:)(?<!\:\d)[0-3]?\d(?:st|nd|rd|th)?\s+(?:of\s+)?(?:jan\.?|january|feb\.?|february|mar\.?|march|apr\.?|april|may|jun\.?|june|jul\.?|july|aug\.?|august|sep\.?|september|oct\.?|october|nov\.?|november|dec\.?|december)|(?:jan\.?|january|feb\.?|february|mar\.?|march|apr\.?|april|may|jun\.?|june|jul\.?|july|aug\.?|august|sep\.?|september|oct\.?|october|nov\.?|november|dec\.?|december)\s+(?<!\:)(?<!\:\d)[0-3]?\d(?:st|nd|rd|th)?)(?:\,)?\s*(?:\d{4})?|[0-3]?\d[-\./][0-3]?\d[-\./]\d{2,4}", re.IGNORECASE)
# Times (case-insensitive): `H:MM` / `HH:MM` with an optional am/pm marker, or a single digit directly followed by am/pm.
TIME_REGEX = re.compile(r"\d{1,2}:\d{2} ?(?:[ap]\.?m\.?)?|\d[ap]\.?m\.?", re.IGNORECASE)
# Email addresses: local part, `@`, then a dotted domain. The whole address is one capture group.
EMAIL_REGEX = re.compile(r"([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)")
# Phone numbers, wrapped in one capture group. Two alternatives, neither of which may touch a digit or hyphen on either side:
#   1. optional 1-3 digit country code, optional (parenthesised) 3-digit area code, then 3 + 4 digits,
#      with optional `-`, `.`, whitespace or `*` between the parts,
#   2. a 2-digit country code (optionally with `+` and/or parentheses) followed by 2 + 3 + 4 digits, separated by optional whitespace.
PHONE_REGEX = re.compile(r"((?:(?<![\d-])(?:\+?\d{1,3}[-.\s*]?)?(?:\(?\d{3}\)?[-.\s*]?)?\d{3}[-.\s*]?\d{4}(?![\d-]))|(?:(?<![\d-])(?:(?:\(\+?\d{2}\))|(?:\+?\d{2}))\s*\d{2}\s*\d{3}\s*\d{4}(?![\d-])))")
# Street addresses (case-insensitive): a 1-4 digit number, up to ~30 characters of name text, whitespace,
# then a street-type word (street/st, avenue/ave, road/rd, ...), which must be followed by whitespace or end of input.
MAIL_ADDR_REGEX = re.compile(r"\d{1,4}.{1,10}[\w\s]{1,20}[\s]+(?:street|st|avenue|ave|road|rd|highway|hwy|square|sq|trail|trl|drive|dr|court|ct|parkway|pkwy|circle|cir|boulevard|blvd)\W?(?=\s|$)", re.IGNORECASE)

# DEFAULT_NUM_WORKERS = os.cpu_count() - 1 if os.cpu_count() else 1  # type: ignore


#####################################################
### SCRIPT

class MetadataAdder(TransformComponent):
    """Base transform that stores one computed value in each node's metadata.

    Subclasses implement `get_node_metadata`. Whatever it returns is written to
    `node.metadata[metadata_name]`; a `None` result leaves the node untouched.

    Args:
        metadata_name: The key under which the value is stored on the node. Defaults to 'metadata'.
    """

    metadata_name: str = Field(
        default="metadata",
        description="The name of the metadata field to add to the document. Defaults to 'metadata'.",
    )
    # NOTE: a `num_workers` option for parallel processing is still WIP and deliberately not declared yet.

    def __init__(self, metadata_name: str = "metadata", **kwargs: Any) -> None:
        """Initialise the component and record which metadata key to write to."""
        super().__init__(**kwargs)
        self.metadata_name = metadata_name

    @classmethod
    def class_name(cls) -> str:
        """Return the identifier of this component class."""
        return "MetadataAdder"

    @abstractmethod
    def get_node_metadata(self, node: BaseNode) -> str | None:
        """Compute the metadata value for a single node.

        Args:
            node (BaseNode): The node to inspect.

        Returns:
            Optional[str]: The value to store, or None if nothing should be stored.
        """

    def add_node_metadata(self, node: GenericNode, metadata_value: Any | None) -> GenericNode:
        """Store `metadata_value` on the node under `metadata_name`.

        Args:
            node (GenericNode): The node to update in place.
            metadata_value (Any | None): The value to store. None means "nothing to add".

        Returns:
            GenericNode: The same node object that was passed in.
        """
        if metadata_value is not None:
            node.metadata[self.metadata_name] = metadata_value
        return node

    def process_nodes(self, nodes: list[GenericNode]) -> list[GenericNode]:
        """Compute and attach the metadata for every node, in order. Called by `__call__`.

        Args:
            nodes (List[GenericNode]): The nodes to process.

        Returns:
            List[GenericNode]: A new list holding the same node objects, each updated in place.
        """
        return [
            self.add_node_metadata(current, self.get_node_metadata(current))
            for current in nodes
        ]

    def __call__(self, nodes: Sequence[BaseNode], **kwargs: Any) -> list[BaseNode]:
        """Run this adder over the nodes; extra keyword arguments are accepted but ignored."""
        return self.process_nodes(nodes)


class RegexMetadataAdder(MetadataAdder):
    """Adds regex metadata to a document.

    Depending on `boolean_mode`, the stored value is either whether the pattern
    occurs in the node text, or the list of all matches. Both are saved as strings.

    Args:
        regex_pattern: The regex pattern to search for. A plain string is compiled with default flags.
            Defaults to DATE_REGEX.
        metadata_name: The name of the metadata to add to the document. Defaults to 'regex_metadata'.
        boolean_mode: If True, store "True"/"False" for whether the pattern occurs anywhere in the text.
            If False (default), store the string form of `findall`'s result.
    """

    _regex_pattern: re.Pattern = PrivateAttr()
    _boolean_mode: bool = PrivateAttr()
    # NOTE: a `num_workers` option for parallel processing is still WIP and deliberately not declared yet.

    def __init__(
        self,
        regex_pattern: re.Pattern | str = DATE_REGEX,
        metadata_name: str = "regex_metadata",
        boolean_mode: bool = False,
        # num_workers: int = DEFAULT_NUM_WORKERS,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        if isinstance(regex_pattern, str):
            regex_pattern = re.compile(regex_pattern)
        super().__init__(metadata_name=metadata_name, **kwargs)
        # Private attributes are assigned after the parent initialiser has run.
        self._regex_pattern = regex_pattern
        self._boolean_mode = boolean_mode

    @classmethod
    def class_name(cls) -> str:
        """Return the identifier of this component class."""
        return "RegexMetadataAdder"

    def get_node_metadata(self, node: BaseNode) -> str | None:
        """Given a node with text, return the regex result for that text.

        Args:
            node (BaseNode): The base node to extract from.

        Returns:
            Optional[str]: None if the node has no `text` (or it is None). Otherwise
                "True"/"False" in boolean mode, or the stringified list of matches.
        """
        text = getattr(node, "text", None)
        if text is None:
            return None

        if self._boolean_mode:
            # `search`, not `match`: the pattern may occur anywhere in the text,
            # whereas `match` only succeeds at the very start of the string.
            return str(self._regex_pattern.search(text) is not None)

        # NOTE: we are saving these as a string'd list since this is easier
        return str(self._regex_pattern.findall(text))


class ModelMetadataAdder(MetadataAdder):
    """Adds metadata to nodes based on a language model.

    Abstract base: subclasses implement `get_node_metadata`, typically by
    prompting a model with `prompt_template`.

    Args:
        metadata_name: The name of the metadata to add to the node.
        prompt_template: The prompt used to generate the metadata. When omitted (None),
            the field's own default is used if a subclass declares one.
    """

    # NOTE(review): the description mentions DEFAULT_SUMMARY_TEMPLATE, but no default is
    # declared on this field here -- presumably subclasses supply one; confirm.
    prompt_template: str = Field(
        description="The prompt to use to generate the metadata. Defaults to DEFAULT_SUMMARY_TEMPLATE.",
    )

    def __init__(
        self,
        metadata_name: str,
        prompt_template: str | None = None,
        **kwargs: Any
    ) -> None:
        """Init params."""
        # Forward the template only when one was given. Passing an explicit None
        # would override any field default and fail validation of the `str` field.
        if prompt_template is not None:
            kwargs["prompt_template"] = prompt_template
        super().__init__(metadata_name=metadata_name, **kwargs)

    @classmethod
    def class_name(cls) -> str:
        """Return the identifier of this component class."""
        return "ModelMetadataAdder"

    @abstractmethod
    def get_node_metadata(self, node: BaseNode) -> str | None:
        """Given a node, get the metadata for the node.

        Args:
            node (BaseNode): The node to add metadata to.

        Returns:
            Optional[str]: The metadata if it exists. If not, return None.
        """


class UnstructuredPDFPostProcessor(TransformComponent):
    """Post-processing step for PDF nodes that were read in using UnstructuredIO.

    Runs every configured metadata adder over the nodes, one adder after another.
    """

    ### NOTE: okay technically we could have done this in the IngestionPipeline abstraction. Maybe we integrate in the future?
    # This component doesn't play nice with multi-processing due to having non-async LLMs.
    # An optional embedding step (embed model applied after the metadata adders) is sketched but disabled for now.

    _metadata_adders: list[MetadataAdder] = PrivateAttr()

    def __init__(
        self,
        metadata_adders: list[MetadataAdder] | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialise the component.

        Args:
            metadata_adders: The adders to apply, in order. None or an empty list means no adders.
        """
        super().__init__(**kwargs)
        self._metadata_adders = metadata_adders if metadata_adders else []

    @classmethod
    def class_name(cls) -> str:
        """Return the identifier of this component class."""
        return "UnstructuredPDFPostProcessor"

    def _apply_metadata_adders(self, nodes: list[GenericNode]) -> list[GenericNode]:
        """Feed the nodes through each metadata adder in turn and return the final result."""
        processed = nodes
        for adder in self._metadata_adders:
            processed = adder(processed)
        return processed

    def __call__(self, nodes: list[GenericNode], **kwargs: Any) -> Sequence[BaseNode]:
        """Apply all metadata adders to the nodes; extra keyword arguments are accepted but ignored."""
        return self._apply_metadata_adders(nodes)

# def has_email(input_text: str) -> bool:
#     """
#     Given a chunk of text, determine whether it has an email address or not.

#     We're using the long complex email regex from https://emailregex.com/index.html
#     """
#     return (EMAIL_REGEX.search(input_text) is not None)


# def has_phone(input_text: str) -> bool:
#     """
#     Given a chunk of text, determine whether it has a phone number or not.
#     """
#     has_phone = PHONE_REGEX.search(input_text)
#     return (has_phone is not None)


# def has_mail_addr(input_text: str) -> bool:
#     """
#     Given a chunk of text, determine whether it has a mailing address or not.

#     NOTE: This is difficult to do with regex.
#         ... We could use spacy's English language NER model instead / as well:
#         Assume that addresses will have a GSP (geospatial political) or GPE (geopolitical entity).
#         DOCS SEE: https://www.nltk.org/book/ch07.html | https://spacy.io/usage/linguistic-features
#     """
#     has_addr = MAIL_ADDR_REGEX.search(input_text)
#     return (has_addr is not None)


# def has_date(input_text: str) -> bool:
#     """
#     Given a chunk of text, determine whether it has a date or not.
#     NOTE: relative dates are stuff like "within 30 days"
#     """
#     has_date = DATE_REGEX.search(input_text)
#     return (has_date is not None)