import os
import nltk
import yaml
import pandas as pd
import streamlit as st
from txtai.embeddings import Documents, Embeddings
from txtai.pipeline import Segmentation, Summary, Tabular, Textractor, Translation
from txtai.workflow import ServiceTask, Task, UrlTask, Workflow
class Process:
@staticmethod
@st.cache(ttl=60 * 60, max_entries=3, allow_output_mutation=True, show_spinner=False)
def get(components, data):
"""
Lookup or creates a new workflow process instance
"""
process = Process(data)
with st.spinner("Building workflow...."):
process.build(components)
return process
def __init__(self, data):
"""
Create new Process
"""
self.components = {}
self.pipelines = {}
self. workflow = []
self.embeddings = None
self.documents = None
self.data = data
def build(self, components):
"""
Builds a workflow using components
"""
tasks = []
for component in components:
component = dict(component)
wtype = component.pop(type)
self.components[wtype] = component
if wtype == "embeddings":
self.embeddings = Embeddings({**component})
self.documents = Documents()
tasks.append(Task(self.documents.add, unpack=False))
elif wtype == "segmentation":
self.pipelines[wtype] = Segmentation(**self.components[wtype])
tasks.append(Task(self.pipelines[wtype]))
elif wtype == "service":
tasks.append(ServiceTask(**self.components[wtype]))
elif wtype == "summary":
self.pipelines[wtype] = Summary(component.pop("path"))
tasks.append(Task(lambda x: self.pipelines["summary"](x, **self.components["summary"])))
elif wtype == "tabular":
self.pipelines[wtype] = Tabular(**self.components[wtype])
tasks.append(Task(self.pipelines[wtype]))
elif wtype == "textractor":
self.pipelines[wtype] = Textractor(**self.components[wtype])
tasks.append(UrlTask(self.pipelines[wtype]))
elif wtype == "translation":
self.pipelines[wtype] = Translation()
tasks.append(Task(lambda x: self.pipelines["translation"](x, **self.components["translation"])))
self.workflow = Workflow(tasks)
def run(self, data):
"""
Runs a workflow using data as input
"""
if data and self.workflow:
# Builds tuples for embedding index
if self.documents:
data = [(x, element, None) for x, element in enumerate(data)]
# Process workflow
for result in self.workflow(data):
if not self.documents:
st.write(result)
# Build embedding index
if self.documents:
# Cache data
self.data = list(self.documents)
with st.spinner("Building embedding index...."):
self.embeddings.index(self.documents)
self.documents.close()
# Clear workflow
self.documents, self.pipelines, self.workflow = None, None, None
def search(self, query):
"""
Runs a search for query
"""
if self.embeddings and query:
st.markdown(
"""
""",
unsafe_allow_html=True,
)
limit = min(5, len(self.data))
results = []
for result in self.embeddings.search(query, limit):
# Tuples are returned when an index doesn't have stored content
if isinstance(result, tuple):
uid, score = result
results.append({"text": self.find(uid), "score": f"{score:.2}"})
else:
if "id" in result and "text" in result:
result["text"] = self.content(result.pop("id"), result["text"])
if "score" in result and result["score"]:
result["score"] = f'{result["score"]:.2}'
results.append(result)
df = pd.DataFrame(results)
st.write(df.to_html(escape=False), unsafe_allow_html=True)
def find(self, key):
"""
Lookup record from cached data by uid key
"""
# Lookup text by id
text = [text for uid, text, _ in self.data if uid == key][0]
return self.content(key, text)
def content(self, uid, text):
"""
Builds a content reference for uid and text
"""
if uid and uid.lower().startswith("http"):
return f"{text}"
return text
class Application:
"""
Main application
"""
def __init__(self, directory):
"""
Creates a new application
"""
# Workflow configuration directory
self.directory = directory
def default(self, names):
"""
Gets default workflow index
"""
# Gets names as lowercase to match case sensitive
lnames = [name.lower() for name in names]
# Get default workflow param
params = st.experimental_get_query_params()
index = params.get("default")
index = index[0].lower() if index else 0
# Lookup index of workflow name, add 1 to account for "--"
if index and index in lnames:
return lnames.index(index) + 1
# Workflow not found, default to index 0
return 0
def load(self, components):
"""
Load an existing workflow file
"""
with open(os.path.join(self.directory, "config.yml"), encoding="utf-8") as f:
config = yaml.safe_load(f)
names = [row["name"] for row in config]
files = [row["file"] for row in config]
selected = st.selectbox("Load workflow", ["--"] + names, self.default(names))
if selected != "--":
index = [x for x, name in enumerate(names) if name == selected][0]
with open(os.path.join(self.directory, files[index]), encoding="utf-8") as f:
workflow = yaml.safe_load(f)
st.markdown("---")
# Get tasks for first workflow
tasks = list(workflow["workflow"].values())[0]["tasks"]
selected = []
for task in tasks:
name = task.get("action", task.get("task"))
if name in components:
selected.append(name)
elif name in ["index", "upsert"]:
selected.append("embeddings")
return (selected, workflow)
return (None, None)
def state(self, key):
"""
Lookup a session state variable
"""
if key in st.session_state:
return st.session_state[key]
return None
def appsetting(self, workflow, name):
"""
Looks up an application configuration setting
"""
if workflow:
config = workflow.get("app")
if config:
return config.get(name)
return None
def setting(self, config, name, default=None):
"""
Looks up a component configuration settings
"""
return config.get(name, default) if config else default
def text(self, label, component, config, name, default=None):
"""
Create a new text input field
"""
default = self.setting(config, name, default)
if not default:
default = ""
elif isinstance(default, list):
default = ",".join(default)
elif isinstance(default, dict):
default = ",".join(default.keys())
st.caption(label)
st.code(default, language="yaml")
return default
def number(self, label, component, config, name, default=None):
"""
Creates a new numeric input field
"""
value = self.text(label, component, config, name, default)
return int(value) if value else None
def boolean(self, label, component, config, name, default=None):
"""
Creates a new checkbox field
"""
default = self.setting(config, name, default)
st.caption(label)
st.markdown(":white_check_mark:" if default else ":white_large_square:")
return default
def select(self, label, component, config, name, options, default=0):
"""
Creates a new select box field
"""
index = self.setting(config, name)
index = [x for x, option in enumerate(options) if option == default]
# Derive default index
default = index[0] if index else default
st.caption(label)
st.code(options[default], language="yaml")
return options[default]
def split(self, text):
"""
Splits text on commas and returns a list
"""
return [x.strip() for x in text.split(",")]
def options(self, component, workflow, index):
"""
Extracts component settings into a component configuration dict
"""
options = {"type": component}
config = None
if workflow:
if component in ["service", "translation"]:
tasks = list(workflow["workflow"].values())[0]["tasks"]
tasks = [task for task in tasks if task.get("task") == component or task.get("action") == component]
if tasks:
config = tasks[0]
else:
config = workflow.get(component)
if component == "embeddings":
st.markdown(f"** {index + 1}.) Embeddings Index** \n*Index workflow output*")
options["path"] = self.text("Embeddings model path", component, config, "path", "sentence-transformers/nli-mpnet-base-v2")
options["upsert"] = self.boolean("Upsert", component, config, "upsert")
options["content"] = self.boolean("Content", component, config, "content")
elif component in ("segmentation", "textractor"):
if component == "segmentation":
st.markdown(f"** {index + 1}.) Segment** \n*Split text into semantic units*")
else:
st.markdown(f"** {index + 1}.) Textract** \n*Extract text from documents")
options["sentences"] = self.boolean("Split sentences", component, config, "sentences")
options["lines"] = self.boolean("Split lines", component, config, "lines")
options["paragraphs"] = self.boolean("Split paragraphs", component, config, "paragraphs")
options["joint"] = self.boolean("Join tokenized", component, config, "join")
options["minlength"] = self.number("Min section length", component, config, "minlength")
elif component == "service":
st.markdown(f"** {index + 1}.) Service** \n*Extract data from an API*")
options["url"] = self.text("URL", component, config, "url")
options["method"] = self.select("Method", component, config, "method", ["get", "post"], 0)
options["params"] = self.text("URL parameters", component, config, "params")
options["batch"] = self.boolean("Run as batch", component, config, "batch", True)
options["extract"] = self.text("Subsection(s) to extract", component, config, "extract")
if options["params"]:
options["params"] = {key: None for key in self.split(options["params"])}
if options["extract"]:
options["extract"] = self.split(options["extract"])
elif component == "summary":
st.markdown(f"** {index + 1}.) Summary** \n*Abstractive text summarization*")
options["path"] = self.text("Model", component, config, "path", "sshleifer/distilbart-cnn-12-6")
options["minlength"] = self.number("Min length", component, config, "minlength")
options["maxlength"] = self.number("Max length", component, config, "maxlength")
elif component == "tabular":
st.markdown(f"** {index + 1}.) Tabular** \n*Split tabular data into rows and columns*")
options["idcolumn"] = self.text("Id columns", component, config, "idcolumn")
options["textcolumns"] = self.text("Text columns", component, config, "textcolumns")
options["content"] = self.text("Content", component, config, "content")
if options["textcolumns"]:
options["textcolumns"] = self.split(options["textcolumns"])
if options["content"]:
options["content"] = self.split(options["content"])
if len(options["content"]) == 1 and options["content"][0] == "1":
options["content"] = options["content"][0]
elif component == "translation":
st.markdown(f"** {index + 1}.) Translate** \n*Machine translation*")
options["target"] = self.text("Target language code", component, config, "args", "en")
st.markdown("---")
return options
def yaml(self, components):
"""
Builds yaml string for components
"""
data = {"app": {"data": self.state("data"), "query": self.state("query")}}
tasks = []
name = None
for component in components:
component = dict(component)
name = wtype = component.pop("type")
if wtype == "embeddings":
upsert = component.pop("upsert")
data[wtype] = component
data["writable"] = True
name = "index"
tasks.append({"action": "upsert" if upsert else "index"})
elif wtype == "segmentation":
data[wtype] = component
tasks.append({"action": wtype})
elif wtype == "service":
config = dict(**component)
config["task"] = wtype
tasks.append(config)
elif wtype == "summary":
data[wtype] = {"path": component.pop("path")}
tasks.append({"action": wtype})
elif wtype == "tabular":
data[wtype] = component
tasks.append({"action": wtype})
elif wtype == "textractor":
data[wtype] = component
tasks.append({"action": wtype, "tasks": "url"})
elif wtype == "translation":
data[wtype] = component
tasks.append({"action": wtype, "args": list(component.values())})
# Add in workflow
data["workflow"] = {name: {"tasks": tasks}}
return (name, yaml.dump(data))
def data(self, workflow):
"""
Gets input data
"""
# Get default data setting
data = self.appsetting(workflow, "data")
if not self.appsetting(workflow, "query"):
data = st.text_input("Input", value=data)
# Save data state
st.session_state["data"] = data
# Wrap data as list for workflow processing
return [data]
def query(self, workflow, index):
"""
Gets input query
"""
default = self.appsetting(workflow, "query")
default = default if default else ""
# Get query if this is an indexing workflow
query = st.text_input("Query", value=default) if index else None
# Save query state
st.session_state["query"] = query
return query
def process(self, workflow, components, index):
"""
Processes the current application action
"""
# Get input data and initialize query
data = self.data(workflow)
query = self.query(workflow, index)
# Get workflow process
process = Process.get(components, data if index else None)
# Run workflow process
process.run(data)
# Run search
if index:
process.search(query)
def run(self):
"""
Runs Streamlit application
"""
with st.sidebar:
st.markdown("# Workflow builder for Station \n*Build and apply workflows to data about articles* ")
st.markdown("This is a demo for Station and the data used is from [Hugging Face](https://huggingface.co/datasets/ag_news/viewer/default/train).")
st.markdown("---")
# Component configuration
components = ["embeddings", "segmentation", "service", "summary", "tabular", "textractor", "translation"]
selected, workflow = self.load(components)
if selected:
# Get selected options
components = [self.options(component, workflow, x) for x, component in enumerate(selected)]
if selected:
# Process current action
self.process(workflow, components, "embeddings" in selected)
with st.sidebar:
# Generate export button after workflow is complete
_, config = self.yaml(components)
st.download_button("Export", config, file_name="workflow.yaml", help="Export the API workflow as YAML")
else:
st.info("Selected a workflow from the sidebar")
if __name__ == "__main__":
os.environ["TOKENIZERS_PARALLELISM"] = "false"
try:
nltk.sent_tokenize("This is a test. Split")
except:
nltk.download("punkt")
# Create and run application
app = Application("workflows")
app.run()