|
import os, xml.etree.ElementTree as ET, torch, torch.nn as nn, numpy as np, logging, requests |
|
from collections import defaultdict |
|
from torch.utils.data import DataLoader, Dataset |
|
from transformers import AutoTokenizer, AutoModel |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
from accelerate import Accelerator |
|
from tqdm import tqdm |
|
|
|
# Configure the root logger once at import time: INFO and above, with a
# timestamp and level name prefixed to each message.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
|
|
class Config:
    """Namespace of class-level numeric constants.

    The single-letter names are kept exactly as they were. This file does not
    show what each one controls, so no meaning is asserted here.
    """

    E = 512
    H = 32
    N = 1024
    C = 256
    B = 128
    M = 20000
    S = 2048
    V = 1e5
    W = 4000
    L = 2e-4
    D = .15
|
|
|
class MyDataset(Dataset):
    """Map-style dataset that pairs each sample with the label at the same index."""

    def __init__(self, data, labels):
        """Keep references to the indexable `data` and `labels` containers."""
        self.data = data
        self.labels = labels

    def __len__(self):
        """Return the number of samples; only `data` is consulted."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(sample, label)`` tuple stored at `index`."""
        sample = self.data[index]
        target = self.labels[index]
        return sample, target
|
|
|
class MyModel(nn.Module):
    """Per-step linear projection with ReLU, a single-layer LSTM, and a linear head.

    Args:
        input_size: Number of features per time step of the input.
        hidden_size: Width of the projection and of the LSTM state.
        output_size: Number of values produced for each sequence.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Construction order is kept (hidden, output, lstm, fc) so seeded
        # initialisation draws random numbers in the same sequence as before.
        self.hidden = nn.Linear(input_size, hidden_size)
        # `output` is not used by forward(); it stays registered so the set of
        # parameters / state_dict keys exposed by the module does not shrink.
        self.output = nn.Linear(hidden_size, output_size)
        # The LSTM consumes the output of `self.hidden`, so its input width
        # must be hidden_size (it was input_size, which only worked when the
        # two sizes happened to be equal).
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a batch of sequences to one output vector per sequence.

        Args:
            x: Tensor indexed as (batch, time, input_size); the LSTM is
                batch-first and the last time step is selected below.

        Returns:
            Tensor of shape (batch, output_size).
        """
        x = torch.relu(self.hidden(x))
        batch = x.size(0)
        width = self.lstm.hidden_size
        # new_zeros keeps the initial state on the same device and dtype as x.
        h0 = x.new_zeros(1, batch, width)
        c0 = x.new_zeros(1, batch, width)
        out, _ = self.lstm(x, (h0, c0))
        # Only the final time step feeds the output head.
        return self.fc(out[:, -1, :])
|
|
|
class MemoryNetwork:
    """Fixed-size slot memory with a usage counter per slot.

    `memory` is a (memory_size, embedding_size) array of zeros at creation and
    `usage` is a length-memory_size array of zeros.
    """

    def __init__(self, memory_size, embedding_size):
        self.memory = np.zeros((memory_size, embedding_size))
        self.usage = np.zeros(memory_size)

    def store(self, data):
        """Overwrite the slot with the lowest usage and set its usage to 1.0."""
        slot = self.usage.argmin()
        self.memory[slot] = data
        self.usage[slot] = 1.0

    def retrieve(self, query):
        """Return the slot with the largest dot product against `query`.

        The chosen slot's usage is increased by 1.0. The returned row is
        whatever indexing `self.memory` yields (not an explicit copy).
        """
        scores = self.memory.dot(query)
        best = scores.argmax()
        self.usage[best] = self.usage[best] + 1.0
        return self.memory[best]

    def update_usage(self):
        """Decay every usage counter in place by a factor of 0.99."""
        np.multiply(self.usage, 0.99, out=self.usage)
|
|
|
class DM(nn.Module):
    """Model assembled from per-section lists of layer specifications.

    `s` maps a section name to a list of layer-spec dicts. Each spec is turned
    into one nn.Sequential by `cl`, each section becomes an nn.ModuleList, and
    all sections are stored in the nn.ModuleDict `self.s`.
    """

    def __init__(self, s):
        super().__init__()
        sections = {}
        for section_name, layer_specs in s.items():
            sections[section_name] = nn.ModuleList([self.cl(spec) for spec in layer_specs])
        self.s = nn.ModuleDict(sections)

    def cl(self, lp):
        """Build one nn.Sequential from the layer spec `lp`.

        Keys read from `lp`: 'input_size' and 'output_size' (required),
        'batch_norm' (default True), 'activation' (default 'relu') and
        'dropout' (default 0.0). An activation name that is not listed below
        adds no activation module; a falsy dropout adds no dropout module.
        """
        parts = [nn.Linear(lp['input_size'], lp['output_size'])]

        if lp.get('batch_norm', True):
            parts.append(nn.BatchNorm1d(lp['output_size']))

        # Name -> factory pairs, checked in order; factories are used so that
        # every call creates a fresh module instance.
        activation_factories = (
            ('relu', lambda: nn.ReLU(inplace=True)),
            ('tanh', lambda: nn.Tanh()),
            ('sigmoid', lambda: nn.Sigmoid()),
            ('leaky_relu', lambda: nn.LeakyReLU(negative_slope=0.01, inplace=True)),
            ('elu', lambda: nn.ELU(alpha=1.0, inplace=True)),
        )
        wanted = lp.get('activation', 'relu')
        for known_name, make_activation in activation_factories:
            if wanted == known_name:
                parts.append(make_activation())
                break

        dropout_p = lp.get('dropout', 0.0)
        if dropout_p:
            parts.append(nn.Dropout(p=dropout_p))

        return nn.Sequential(*parts)

    def forward(self, x, sn=None):
        """Apply one section, or every section in stored order, to `x`.

        A truthy `sn` selects that single section; any falsy `sn` (None or an
        empty string) runs all sections one after another.
        """
        if sn:
            selected = [self.s[sn]]
        else:
            selected = list(self.s.values())

        for blocks in selected:
            for block in blocks:
                x = block(x)
        return x
|
|
|
def parse_xml(file_path):
    """Read layer specifications from an XML file.

    Every ``<layer>`` element below the document root yields one dict, in
    document order.

    Args:
        file_path: Path of the XML file to read.

    Returns:
        A list of dicts with the keys 'input_size' (int, default 128),
        'output_size' (int, default 256) and 'activation' (lower-cased string,
        default 'relu'), taken from the attributes of the same names.

    Raises:
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
        ValueError: If a size attribute is not an integer literal.
        OSError: If the file cannot be opened.
    """
    # Parse once; the file used to be parsed twice with the first tree unused.
    root = ET.parse(file_path).getroot()
    return [
        {
            'input_size': int(layer.get('input_size', 128)),
            'output_size': int(layer.get('output_size', 256)),
            'activation': layer.get('activation', 'relu').lower(),
        }
        for layer in root.findall('.//layer')
    ]
|
|
|
def create_model_from_folder(folder_path):
    """Build a DM model from every ``.xml`` file found under `folder_path`.

    Each directory that contains XML files becomes one section, named after
    the directory's base name with '.' replaced by '_'. Layer specs from all
    XML files in that directory are concatenated; directories sharing a base
    name are merged into the same section.

    Args:
        folder_path: Root directory to walk.

    Returns:
        A DM instance built from the collected sections.
    """
    sections = defaultdict(list)
    for current_dir, sub_dirs, file_names in os.walk(folder_path):
        # os.walk yields entries in arbitrary listing order. Sorting the
        # sub-directories in place (which os.walk honours when walking
        # top-down) and the file names makes the section and layer order
        # reproducible across runs and filesystems.
        sub_dirs.sort()
        # normpath drops a trailing separator so that e.g. 'Xml_Data/' does not
        # produce an empty section name for the top-level directory.
        section = os.path.basename(os.path.normpath(current_dir)).replace('.', '_')
        for file_name in sorted(file_names):
            if file_name.endswith('.xml'):
                sections[section].extend(parse_xml(os.path.join(current_dir, file_name)))
    return DM(dict(sections))
|
|
|
def create_embeddings_and_sentences(folder_path, model_name="pile-of-law/legalbert-large-1.7M-1", max_length=512):
    """Embed the text of every element in every ``.xml`` file under `folder_path`.

    Each non-blank element text is tokenised (truncated to `max_length`
    tokens), passed through the model, and mean-pooled over dim 1 of
    ``last_hidden_state``.

    Args:
        folder_path: Root directory to walk for ``.xml`` files.
        model_name: Identifier passed to ``AutoTokenizer`` / ``AutoModel``.
        max_length: Maximum token length handed to the tokenizer.

    Returns:
        A tuple ``(embeddings, ds)``: the per-text arrays stacked with
        ``np.vstack`` (one entry per text), and the list of stripped texts in
        the same order.

    Raises:
        ValueError: If no non-blank element text was found.
        xml.etree.ElementTree.ParseError: If a file is not well-formed XML.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    encoder = AutoModel.from_pretrained(model_name)
    embeddings, ds = [], []

    for current_dir, _sub_dirs, file_names in os.walk(folder_path):
        for file_name in file_names:
            if not file_name.endswith('.xml'):
                continue
            # Parse once; each file used to be parsed twice.
            xml_root = ET.parse(os.path.join(current_dir, file_name)).getroot()
            for element in xml_root.iter():
                # Strip before testing: container elements usually hold only
                # indentation whitespace, which previously got embedded as ''.
                text = (element.text or '').strip()
                if not text:
                    continue
                inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=max_length)
                with torch.no_grad():
                    pooled = encoder(**inputs).last_hidden_state.mean(dim=1)
                embeddings.append(pooled.numpy())
                ds.append(text)

    if not embeddings:
        # np.vstack([]) raises ValueError too, but with an opaque message.
        raise ValueError(f"No element text found in any .xml file under {folder_path!r}")
    return np.vstack(embeddings), ds
|
|
|
def query_vector_similarity(query, embeddings, ds, model_name="pile-of-law/legalbert-large-1.7M-2", max_length=512, top_k=5):
    """Return the texts in `ds` whose embeddings are most similar to `query`.

    The query is tokenised (truncated to `max_length` tokens), passed through
    the model, mean-pooled over dim 1 of ``last_hidden_state`` and compared
    with `embeddings` by cosine similarity.

    Args:
        query: Query text.
        embeddings: 2-D array of corpus embeddings, row i belonging to ds[i].
        ds: Sequence of corpus texts aligned with `embeddings`.
        model_name: Identifier passed to ``AutoTokenizer`` / ``AutoModel``.
        max_length: Maximum token length handed to the tokenizer.
        top_k: Maximum number of texts to return (default 5, the previously
            hard-coded value). Values <= 0 return an empty list.

    Returns:
        Up to `top_k` entries of `ds`, most similar first.
    """
    # NOTE(review): this default checkpoint ends in "-2" while
    # create_embeddings_and_sentences defaults to "...-1". Embeddings produced
    # by different checkpoints are presumably not comparable - confirm and pass
    # the same model_name to both functions.
    if top_k <= 0:
        return []

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    encoder = AutoModel.from_pretrained(model_name)
    inputs = tokenizer(query, return_tensors="pt", truncation=True, padding=True, max_length=max_length)
    with torch.no_grad():
        query_embedding = encoder(**inputs).last_hidden_state.mean(dim=1).numpy()

    scores = cosine_similarity(query_embedding, embeddings)[0]
    # Ascending argsort reversed gives best-first order; slicing after the
    # reversal selects the same items as the former `[-5:][::-1]` when top_k=5.
    best_first = scores.argsort()[::-1][:top_k]
    return [ds[idx] for idx in best_first]
|
|
|
def fetch_courtlistener_data(query):
    """Search NZLII for `query` and return normalised result records.

    Despite the function name, the request goes to the nzlii.org search
    endpoint. The response is expected to be a JSON object with a "results"
    list; each result dict is reduced to the keys title, citation, date,
    court, summary and url, with missing keys filled with "".

    Args:
        query: Search string sent as the ``query`` parameter.

    Returns:
        A list of dicts, or an empty list if the request fails, the body is
        not valid JSON, or the payload does not have the expected shape.
    """
    fields = ("title", "citation", "date", "court", "summary", "url")
    try:
        response = requests.get(
            "https://nzlii.org/cgi-bin/sinosrch.cgi",
            params={"method": "auto", "query": query, "meta": "/nz", "results": "50", "format": "json"},
            headers={"Accept": "application/json"},
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a JSON decode failure on requests versions where
        # that error is not a RequestException subclass.
        logging.error(f"Failed to fetch data from NZLII API: {str(e)}")
        return []

    results = payload.get("results", []) if isinstance(payload, dict) else None
    if not isinstance(results, list):
        logging.error("Unexpected response shape from NZLII API: %s", type(payload).__name__)
        return []
    # Entries that are not JSON objects cannot be normalised and are skipped.
    return [{field: r.get(field, "") for field in fields} for r in results if isinstance(r, dict)]
|
|
|
def _train_on_synthetic_data(model, num_epochs=10, num_samples=1000, batch_size=32):
    """Run a smoke-test training loop on random data sized to fit `model`."""
    linears = [m for m in model.modules() if isinstance(m, nn.Linear)]
    if not linears:
        # Without parameters the optimizer constructor would raise.
        logging.warning("Model has no layers; skipping training")
        return

    # Modules are visited in registration order, which is the order forward()
    # applies them when no section is given, so the first Linear fixes the
    # input width and the last one the number of output classes. The former
    # hard-coded 10 features / 5 classes only fit one specific layer layout.
    in_features = linears[0].in_features
    num_classes = linears[-1].out_features

    # Build the random dataset once; previously a second, different random
    # dataset was handed to the DataLoader and the first was never used.
    dataset = MyDataset(
        torch.randn(num_samples, in_features),
        torch.randint(0, num_classes, (num_samples,)),
    )
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    accelerator = Accelerator()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

    for epoch in range(num_epochs):
        model.train()
        for batch_data, batch_labels in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_data)
            loss = criterion(outputs, batch_labels)
            accelerator.backward(loss)
            optimizer.step()
        # Reports the loss of the last batch of the epoch.
        logging.info(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")


def main():
    """Build the model from 'Xml_Data', embed 'data', train, then run a sample query.

    Steps, in order: create the DM model from the XML layer specs, compute
    text embeddings for the XML files under 'data', train the model on random
    data, and log the similarity-search and remote-search results for a fixed
    example query.
    """
    folder_path = 'data'
    model = create_model_from_folder('Xml_Data')
    logging.info(f"Created dynamic PyTorch model with sections: {list(model.s.keys())}")

    embeddings, ds = create_embeddings_and_sentences(folder_path)

    _train_on_synthetic_data(model, num_epochs=10)

    query = "example query text"
    logging.info(f"Query results: {query_vector_similarity(query, embeddings, ds)}")
    logging.info(f"CourtListener API results: {fetch_courtlistener_data(query)}")
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()