# CAD-Recode demo — Hugging Face Space (runs on ZeroGPU hardware).
import os | |
import spaces | |
import trimesh | |
import traceback | |
import numpy as np | |
import gradio as gr | |
from functools import partial | |
from multiprocessing import Process, Queue | |
import torch | |
from torch import nn | |
from transformers import ( | |
AutoTokenizer, Qwen2ForCausalLM, Qwen2Model, PreTrainedModel) | |
from transformers.modeling_outputs import CausalLMOutputWithPast | |
class FourierPointEncoder(nn.Module):
    """Project 6-D points (xyz + normals) into the LM hidden space.

    The three coordinates are expanded with sin/cos Fourier features at 8
    power-of-two frequencies; the normals are passed through unchanged.
    """

    def __init__(self, hidden_size):
        super().__init__()
        freqs = 2.0 ** torch.arange(8, dtype=torch.float32)
        self.register_buffer('frequencies', freqs, persistent=False)
        # 3 raw coords + 3*8 sines + 3*8 cosines + 3 extra channels = 54.
        self.projection = nn.Linear(54, hidden_size)

    def forward(self, points):
        coords = points[..., :3]
        scaled = (coords.unsqueeze(-1) * self.frequencies).view(*coords.shape[:-1], -1)
        features = torch.cat(
            (coords, scaled.sin(), scaled.cos(), points[..., 3:]), dim=-1)
        return self.projection(features)
class CADRecode(Qwen2ForCausalLM):
    """Qwen2 causal LM conditioned on a point cloud.

    Point embeddings from FourierPointEncoder are spliced into the token
    embedding sequence at positions flagged with -1 in the attention mask
    (see forward()).
    """

    def __init__(self, config):
        # Call PreTrainedModel.__init__ directly (not the Qwen2 initializer)
        # so the submodules below are created in a controlled dtype context.
        PreTrainedModel.__init__(self, config)
        self.model = Qwen2Model(config)
        self.vocab_size = config.vocab_size
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        # Build the point encoder with float32 parameters while everything
        # else defaults to bfloat16; its output is cast to bf16 in forward().
        torch.set_default_dtype(torch.float32)
        self.point_encoder = FourierPointEncoder(config.hidden_size)
        torch.set_default_dtype(torch.bfloat16)

    def forward(self,
                input_ids=None,
                attention_mask=None,
                point_cloud=None,
                position_ids=None,
                past_key_values=None,
                inputs_embeds=None,
                labels=None,
                use_cache=None,
                output_attentions=None,
                output_hidden_states=None,
                return_dict=None,
                cache_position=None):
        """Causal-LM forward pass with point-cloud conditioning.

        On the prefill step (empty/absent KV cache) the token embeddings at
        positions where ``attention_mask == -1`` are replaced by the encoded
        point cloud, and those mask entries are reset to 1. Subsequent decode
        steps reuse the cache and run as a plain Qwen2 forward.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # concatenate point and text embeddings
        if past_key_values is None or past_key_values.get_seq_length() == 0:
            assert inputs_embeds is None
            inputs_embeds = self.model.embed_tokens(input_ids)
            point_embeds = self.point_encoder(point_cloud).bfloat16()
            # NOTE(review): this mutates the caller's attention_mask tensor
            # in place — confirm that is intended.
            inputs_embeds[attention_mask == -1] = point_embeds.reshape(-1, point_embeds.shape[2])
            attention_mask[attention_mask == -1] = 1
            input_ids = None
            position_ids = None

        # decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn)
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            cache_position=cache_position)

        hidden_states = outputs[0]
        logits = self.lm_head(hidden_states)
        # Compute logits (and the loss below) in float32 for stability.
        logits = logits.float()

        loss = None
        if labels is not None:
            # Shift so that tokens < n predict n
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            # Flatten the tokens
            loss_fct = nn.CrossEntropyLoss()
            shift_logits = shift_logits.view(-1, self.config.vocab_size)
            shift_labels = shift_labels.view(-1)
            # Enable model parallelism
            shift_labels = shift_labels.to(shift_logits.device)
            loss = loss_fct(shift_logits, shift_labels)

        if not return_dict:
            output = (logits,) + outputs[1:]
            return (loss,) + output if loss is not None else output

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions)

    def prepare_inputs_for_generation(self, *args, **kwargs):
        # Thread the point cloud through generate()'s input plumbing so
        # forward() receives it on every decoding step.
        model_inputs = super().prepare_inputs_for_generation(*args, **kwargs)
        model_inputs['point_cloud'] = kwargs['point_cloud']
        return model_inputs
def mesh_to_point_cloud(mesh, n_points=256):
    """Sample an oriented point cloud from a mesh surface.

    Returns an (n_points, 6) array of xyz coordinates concatenated with the
    normal of the face each point was sampled from, sorted lexicographically
    by z, then y, then x for a deterministic ordering.
    """
    samples, face_ids = trimesh.sample.sample_surface(mesh, n_points)
    cloud = np.hstack((np.asarray(samples), mesh.face_normals[face_ids]))
    order = np.lexsort((cloud[:, 0], cloud[:, 1], cloud[:, 2]))
    return cloud[order]
def py_string_to_mesh_file(py_string, mesh_path, queue):
    """Execute CadQuery code and export the resulting solid to *mesh_path*.

    *py_string* is expected to define a CadQuery workplane named ``r``; the
    resulting compound is tessellated and written out via trimesh. On any
    failure the formatted traceback is put on *queue* instead of raising.

    SECURITY: exec() runs arbitrary LLM-generated code. Only call this from
    the sandboxed subprocess in py_string_to_mesh_file_safe().
    """
    try:
        exec(py_string, globals())
        compound = globals()['r'].val()
        vertices, faces = compound.tessellate(0.001, 0.1)
        mesh = trimesh.Trimesh([(v.x, v.y, v.z) for v in vertices], faces)
        mesh.export(mesh_path)
    except Exception:
        # Narrowed from a bare ``except`` so SystemExit/KeyboardInterrupt can
        # still terminate the worker; report anything else to the parent.
        queue.put(traceback.format_exc())
def py_string_to_mesh_file_safe(py_string, mesh_path, timeout=5):
    """Run py_string_to_mesh_file in a subprocess with a timeout.

    CadQuery code predicted by the LLM may be unsafe and cause memory leaks,
    so it is executed in a separate process that is killed after *timeout*
    seconds. Raises gr.Error on timeout or on any error reported by the
    worker.
    """
    queue = Queue()
    process = Process(
        target=py_string_to_mesh_file,
        args=(py_string, mesh_path, queue))
    process.start()
    process.join(timeout)
    if process.is_alive():
        process.terminate()
        process.join()
        # Bug fix: the message previously hard-coded '3 seconds' while the
        # join timeout was 5; derive it from the actual timeout instead.
        raise gr.Error(f'Process is alive after {timeout} seconds')
    if not queue.empty():
        raise gr.Error(queue.get())
def run_point_cloud(in_mesh_path, seed):
    """Load a mesh, normalize it and sample a point cloud for CAD-Recode.

    Returns (point_cloud, pcd_path): the (256, 6) numpy array stored in the
    Gradio state, and an OBJ file path for the point-cloud viewer. Raises
    gr.Error with the traceback on any failure.
    """
    try:
        mesh = trimesh.load(in_mesh_path)
        # Center at the origin and scale the longest extent to 2, i.e. fit
        # the mesh into the [-1, 1] cube the model was trained on.
        mesh.apply_translation(-(mesh.bounds[0] + mesh.bounds[1]) / 2.0)
        mesh.apply_scale(2.0 / max(mesh.extents))
        # Seed numpy so surface sampling is reproducible per slider value.
        np.random.seed(seed)
        point_cloud = mesh_to_point_cloud(mesh)
        pcd_path = '/tmp/pcd.obj'
        trimesh.points.PointCloud(point_cloud[:, :3]).export(pcd_path)
        return point_cloud, pcd_path
    except Exception:
        # Narrowed from a bare ``except`` so Ctrl-C / SystemExit are not
        # swallowed; surface the traceback in the Gradio UI.
        raise gr.Error(traceback.format_exc())
def run_cad_recode(point_cloud):
    """Generate CadQuery Python code from a point cloud with CAD-Recode.

    Returns the extracted code string twice: once for the pipeline state
    (consumed by run_mesh) and once for the code display component. Raises
    gr.Error with the traceback on any failure.
    """
    try:
        # One pad-token placeholder per point — flagged with -1 in the
        # attention mask so CADRecode.forward swaps in the point embeddings —
        # followed by <|im_start|> to begin the generated code.
        n_points = len(point_cloud)
        input_ids = [tokenizer.pad_token_id] * n_points + [tokenizer('<|im_start|>')['input_ids'][0]]
        attention_mask = [-1] * n_points + [1]
        model = cad_recode.cuda()
        with torch.no_grad():
            # Consistency fix: use the `model` handle throughout instead of
            # mixing it with the global `cad_recode` (same object).
            batch_ids = model.generate(
                input_ids=torch.tensor(input_ids).unsqueeze(0).to(model.device),
                attention_mask=torch.tensor(attention_mask).unsqueeze(0).to(model.device),
                point_cloud=torch.tensor(point_cloud.astype(np.float32)).unsqueeze(0).to(model.device),
                max_new_tokens=768,
                pad_token_id=tokenizer.pad_token_id).cpu()
        py_string = tokenizer.batch_decode(batch_ids)[0]
        # Keep only the text between <|im_start|> and <|endoftext|>.
        begin = py_string.find('<|im_start|>') + len('<|im_start|>')
        end = py_string.find('<|endoftext|>')
        py_string = py_string[begin:end]
        return py_string, py_string
    except Exception:
        # Narrowed from a bare ``except``; surface the traceback in the UI.
        raise gr.Error(traceback.format_exc())
def run_mesh(py_string):
    """Execute the generated CadQuery code (sandboxed) and export an STL.

    Returns the path of the exported mesh for the result viewer. Raises
    gr.Error with the traceback on any failure.
    """
    try:
        out_mesh_path = '/tmp/mesh.stl'
        py_string_to_mesh_file_safe(py_string, out_mesh_path)
        return out_mesh_path
    except Exception:
        # Narrowed from a bare ``except``; surface the traceback in the UI.
        raise gr.Error(traceback.format_exc())
def run():
    """Build and launch the Gradio demo UI.

    Pipeline: input mesh -> sampled point cloud -> generated Python code
    -> reconstructed CAD model, wired as three chained click callbacks that
    pass intermediate results through a shared gr.State.
    """
    with gr.Blocks() as demo:
        with gr.Row():
            gr.Markdown('## CAD-Recode Demo\n'
                        'Upload mesh or select from examples and press Run! Mesh ⇾ 256 points ⇾ Python code by CAD-Recode ⇾ CAD model.')
        with gr.Row(equal_height=True):
            in_model = gr.Model3D(label='1. Input Mesh', interactive=True)
            point_model = gr.Model3D(label='2. Sampled Point Cloud', display_mode='point_cloud', interactive=False)
            out_model = gr.Model3D(
                label='4. Result CAD Model', interactive=False
            )
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    seed_slider = gr.Slider(label='Random Seed', value=42, interactive=True)
                with gr.Row():
                    # Example meshes from the Fusion360 / DeepCAD / CC3D sets.
                    gr.Examples(
                        examples=[
                            ['./data/49215_5368e45e_0000.stl', 42],
                            ['./data/00882236.stl', 6],
                            ['./data/User Library-engrenage.stl', 18],
                            ['./data/00010900.stl', 42],
                            ['./data/21492_8bd34fc1_0008.stl', 42],
                            ['./data/00375556.stl', 96],
                            ['./data/49121_adb01620_0000.stl', 42],
                            ['./data/41473_c2137170_0023.stl', 42]],
                        example_labels=[
                            'fusion360_table1', 'deepcad_star', 'cc3d_gear', 'deepcad_barrels',
                            'fusion360_gear', 'deepcad_house', 'fusion360_table2', 'fusion360_omega'],
                        inputs=[in_model, seed_slider],
                        cache_examples=False)
                with gr.Row():
                    run_button = gr.Button('Run')
            with gr.Column():
                out_code = gr.Code(language='python', label='3. Generated Python Code', wrap_lines=True, interactive=False)
            with gr.Column():
                pass
        # Shared state: holds the point cloud after step 1, then is
        # overwritten with the generated code string after step 2.
        state = gr.State()
        run_button.click(
            run_point_cloud,
            inputs=[in_model, seed_slider],
            outputs=[state, point_model]
        ).success(
            run_cad_recode,
            inputs=[state],
            outputs=[state, out_code]
        ).success(
            run_mesh,
            inputs=[state],
            outputs=[out_model]
        )
    demo.launch(show_error=True)
# Disable tokenizer-thread parallelism BEFORE the tokenizer is created — the
# original set it afterwards, too late to influence the loaded tokenizer.
os.environ['TOKENIZERS_PARALLELISM'] = 'False'

# Qwen2 tokenizer configured for CAD-Recode: <|im_end|> doubles as the pad
# token and padding goes on the left for generation.
tokenizer = AutoTokenizer.from_pretrained(
    'Qwen/Qwen2-1.5B',
    pad_token='<|im_end|>',
    padding_side='left')
cad_recode = CADRecode.from_pretrained(
    'filapro/cad-recode',
    torch_dtype='auto',
    attn_implementation='flash_attention_2').eval()

if __name__ == '__main__':
    # Guarded entry point: keeps multiprocessing spawn and plain imports of
    # this module from launching a second Gradio server.
    run()