|
import pandas as pd
|
|
import streamlit as st
|
|
import torch
|
|
import os
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
from transformers import BertTokenizer
|
|
import appbuilder
|
|
from transformers import BertModel
|
|
|
|
pretrained = BertModel.from_pretrained('hfl/chinese-macbert-base')
|
|
|
|
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
|
pretrained.to(device)
|
|
|
|
for param in pretrained.parameters():
|
|
param.requires_grad_(False)
|
|
|
|
|
|
class MultiHeadAttention(nn.Module):
|
|
def __init__(self, hidden_size, num_heads):
|
|
super(MultiHeadAttention, self).__init__()
|
|
|
|
assert hidden_size % num_heads == 0
|
|
self.hidden_size = hidden_size
|
|
self.num_heads = num_heads
|
|
self.head_dim = hidden_size // num_heads
|
|
|
|
self.linear_q = nn.Linear(hidden_size, hidden_size)
|
|
self.linear_k = nn.Linear(hidden_size, hidden_size)
|
|
self.linear_v = nn.Linear(hidden_size, hidden_size)
|
|
self.linear_out = nn.Linear(hidden_size, hidden_size)
|
|
|
|
def forward(self, x):
|
|
batch_size, seq_len, _ = x.size()
|
|
|
|
q = self.linear_q(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
|
|
k = self.linear_k(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
|
|
v = self.linear_v(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
|
|
|
|
scores = torch.matmul(q, k.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float))
|
|
attn_weights = F.softmax(scores, dim=-1)
|
|
|
|
context = torch.matmul(attn_weights, v).transpose(1, 2).contiguous().view(batch_size, seq_len, self.hidden_size)
|
|
out = self.linear_out(context)
|
|
return out
|
|
|
|
class Model(nn.Module):
|
|
def __init__(self):
|
|
super(Model, self).__init__()
|
|
self.fc1 = nn.Linear(768, 512)
|
|
self.fc2 = nn.Linear(512, 256)
|
|
self.fc3 = nn.Linear(256, 2)
|
|
self.dropout = nn.Dropout(p=0.5)
|
|
self.bn1 = nn.BatchNorm1d(512)
|
|
self.bn2 = nn.BatchNorm1d(256)
|
|
self.activation = nn.ReLU()
|
|
self.multihead_attention = MultiHeadAttention(hidden_size=768, num_heads=8)
|
|
|
|
def forward(self, input_ids, attention_mask, token_type_ids):
|
|
out = pretrained(input_ids=input_ids,
|
|
attention_mask=attention_mask,
|
|
token_type_ids=token_type_ids).last_hidden_state
|
|
|
|
|
|
out = self.multihead_attention(out)
|
|
out = out[:, 0]
|
|
|
|
out = self.activation(self.bn1(self.fc1(out)))
|
|
out = self.dropout(out)
|
|
out = self.activation(self.bn2(self.fc2(out)))
|
|
out = self.dropout(out)
|
|
out = self.fc3(out)
|
|
out = out.softmax(dim=1)
|
|
return out
|
|
|
|
|
|
def load_models_and_predict(text, device):
|
|
|
|
MacBERT_base_CDialBias = torch.load('models\MacBERT-base-CDialBias.pth')
|
|
MacBERT_base_CDialBias.to(device)
|
|
MacBERT_base_COLD = torch.load('models\MacBERT-base-CDialBias.pth')
|
|
MacBERT_base_COLD.to(device)
|
|
|
|
|
|
os.environ['APPBUILDER_TOKEN'] = "bce-v3/ALTAK-n2XgeA6FS3Q5E7Jab6UwE/850b44ebec64c4cad705986ab0b5e3df4b05d407"
|
|
app_id = "df881861-9fa6-40b6-b3bd-26df5f5d4b9a"
|
|
|
|
|
|
your_agent = appbuilder.AppBuilderClient(app_id)
|
|
|
|
|
|
conversation_id = your_agent.create_conversation()
|
|
|
|
|
|
tokenizer = BertTokenizer.from_pretrained('hfl/chinese-macbert-base')
|
|
|
|
|
|
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
|
|
|
|
|
|
inputs = {k: v.to(device) for k, v in inputs.items()}
|
|
|
|
|
|
MacBERT_base_CDialBias.eval()
|
|
MacBERT_base_COLD.eval()
|
|
|
|
|
|
msg = your_agent.run(conversation_id, text)
|
|
answer = msg.content.answer
|
|
|
|
|
|
with torch.no_grad():
|
|
out1 = MacBERT_base_CDialBias(**inputs)
|
|
with torch.no_grad():
|
|
out2 = MacBERT_base_COLD(**inputs)
|
|
|
|
out1 = torch.argmax(out1, dim=1).item()
|
|
out2 = torch.argmax(out2, dim=1).item()
|
|
out3 = answer[0]
|
|
|
|
|
|
if out3 == "1":
|
|
if out1 == out2 == out3 == 1:
|
|
result = "这句话具有攻击性和社会偏见"
|
|
elif out1 == 0 and out2 == 1:
|
|
result = "这句话具有攻击性,但无社会偏见"
|
|
elif out1 == 1 and out2 == 0:
|
|
result = "这句话不具有攻击性,但具有社会偏见"
|
|
else:
|
|
result = "这句话具有攻击性"
|
|
elif out3 == "0":
|
|
if out1 == out2 == out3 == 0:
|
|
result = "这句话不具有攻击性和社会偏见"
|
|
elif out1 == 0 and out2 == 1:
|
|
result = "这句话具有攻击性,但无社会偏见"
|
|
elif out1 == 1 and out2 == 0:
|
|
result = "这句话不具有攻击性,但具有社会偏见"
|
|
else:
|
|
result = "这句话不具有攻击性"
|
|
return result
|
|
|
|
|
|
st.set_page_config(page_title="文件式文本检测工具")
|
|
st.title("批量检测攻击性和偏见")
|
|
|
|
with st.sidebar:
|
|
|
|
if 'logged_in' not in st.session_state:
|
|
st.session_state.logged_in = False
|
|
|
|
|
|
username = st.sidebar.text_input('用户名')
|
|
password = st.sidebar.text_input('密码', type='password')
|
|
|
|
|
|
if st.sidebar.button('登录'):
|
|
|
|
if username == 'admin' and password == '12345':
|
|
st.session_state.logged_in = True
|
|
st.sidebar.success('登录成功!')
|
|
|
|
|
|
else:
|
|
st.error('用户名或密码错误,请重试。')
|
|
st.stop()
|
|
st.divider()
|
|
|
|
|
|
file = st.file_uploader("上传你的CSV文件", type=["csv"])
|
|
|
|
if file is not None:
|
|
|
|
df = pd.read_csv(file)
|
|
st.dataframe(df)
|
|
|
|
|
|
column = st.text_input("请输入需要判断的内容的列名")
|
|
|
|
|
|
save_results = st.checkbox("保存结果为CSV文件")
|
|
|
|
if st.button("开始检测") and st.session_state.logged_in == True:
|
|
if column not in df.columns:
|
|
st.error(f"列名 '{column}' 不存在于数据集中,请检查并重新输入。")
|
|
else:
|
|
|
|
results_df = pd.DataFrame(columns=['检测文本', '检测结果'])
|
|
|
|
|
|
progress_bar = st.progress(0)
|
|
|
|
|
|
stop_flag = False
|
|
|
|
|
|
stop_button = st.button("停止检测")
|
|
|
|
for i, (index, row) in enumerate(df.iterrows()):
|
|
|
|
|
|
if stop_button:
|
|
stop_flag = True
|
|
break
|
|
|
|
text = row[column]
|
|
|
|
|
|
with st.spinner("AI正在思考中,请稍等..."):
|
|
result = load_models_and_predict(text, device)
|
|
|
|
|
|
results_df.loc[i] = [text, result]
|
|
r = results_df.loc[i]
|
|
|
|
|
|
st.dataframe(r)
|
|
|
|
st.divider()
|
|
|
|
|
|
progress_bar.progress((i + 1) / len(df))
|
|
|
|
|
|
progress_bar.empty()
|
|
|
|
|
|
if stop_flag:
|
|
st.warning("检测已停止。")
|
|
else:
|
|
st.success("所有文本已检测完成!")
|
|
|
|
|
|
if (save_results and not stop_flag) or st.button("保存结果为CSV文件"):
|
|
|
|
csv_result = results_df.to_csv(index=False)
|
|
st.download_button(
|
|
label="下载结果",
|
|
data=csv_result,
|
|
file_name='results.csv',
|
|
mime='text/csv'
|
|
)
|
|
elif st.button("开始检测") and st.session_state.logged_in == False:
|
|
st.error("请先登录!")
|
|
st.stop()
|
|
|