import os import spaces import trimesh import traceback import numpy as np import gradio as gr from functools import partial from multiprocessing import Process, Queue import torch from torch import nn from transformers import ( AutoTokenizer, Qwen2ForCausalLM, Qwen2Model, PreTrainedModel) from transformers.modeling_outputs import CausalLMOutputWithPast class FourierPointEncoder(nn.Module): def __init__(self, hidden_size): super().__init__() frequencies = 2.0 ** torch.arange(8, dtype=torch.float32) self.register_buffer('frequencies', frequencies, persistent=False) self.projection = nn.Linear(54, hidden_size) def forward(self, points): x = points[..., :3] x = (x.unsqueeze(-1) * self.frequencies).view(*x.shape[:-1], -1) x = torch.cat((points[..., :3], x.sin(), x.cos()), dim=-1) x = self.projection(torch.cat((x, points[..., 3:]), dim=-1)) return x class CADRecode(Qwen2ForCausalLM): def __init__(self, config): PreTrainedModel.__init__(self, config) self.model = Qwen2Model(config) self.vocab_size = config.vocab_size self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) torch.set_default_dtype(torch.float32) self.point_encoder = FourierPointEncoder(config.hidden_size) torch.set_default_dtype(torch.bfloat16) def forward(self, input_ids=None, attention_mask=None, point_cloud=None, position_ids=None, past_key_values=None, inputs_embeds=None, labels=None, use_cache=None, output_attentions=None, output_hidden_states=None, return_dict=None, cache_position=None): output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions output_hidden_states = output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states return_dict = return_dict if return_dict is not None else self.config.use_return_dict # concatenate point and text embeddings if past_key_values is None or past_key_values.get_seq_length() == 0: assert inputs_embeds is None inputs_embeds = self.model.embed_tokens(input_ids) point_embeds = self.point_encoder(point_cloud).bfloat16() inputs_embeds[attention_mask == -1] = point_embeds.reshape(-1, point_embeds.shape[2]) attention_mask[attention_mask == -1] = 1 input_ids = None position_ids = None # decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn) outputs = self.model( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, past_key_values=past_key_values, inputs_embeds=inputs_embeds, use_cache=use_cache, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, cache_position=cache_position) hidden_states = outputs[0] logits = self.lm_head(hidden_states) logits = logits.float() loss = None if labels is not None: # Shift so that tokens < n predict n shift_logits = logits[..., :-1, :].contiguous() shift_labels = labels[..., 1:].contiguous() # Flatten the tokens loss_fct = nn.CrossEntropyLoss() shift_logits = shift_logits.view(-1, self.config.vocab_size) shift_labels = shift_labels.view(-1) # Enable model parallelism shift_labels = shift_labels.to(shift_logits.device) loss = loss_fct(shift_logits, shift_labels) if not return_dict: output = (logits,) + outputs[1:] return (loss,) + output if loss is not None else output return CausalLMOutputWithPast( loss=loss, logits=logits, past_key_values=outputs.past_key_values, hidden_states=outputs.hidden_states, attentions=outputs.attentions) def prepare_inputs_for_generation(self, *args, **kwargs): model_inputs = super().prepare_inputs_for_generation(*args, **kwargs) model_inputs['point_cloud'] = kwargs['point_cloud'] return model_inputs def mesh_to_point_cloud(mesh, n_points=256): vertices, faces = trimesh.sample.sample_surface(mesh, n_points) point_cloud = np.concatenate(( np.asarray(vertices), mesh.face_normals[faces] ), axis=1) ids = np.lexsort((point_cloud[:, 0], point_cloud[:, 1], point_cloud[:, 2])) point_cloud = point_cloud[ids] return point_cloud def py_string_to_mesh_file(py_string, mesh_path, queue): try: exec(py_string, globals()) compound = globals()['r'].val() vertices, faces = compound.tessellate(0.001, 0.1) mesh = trimesh.Trimesh([(v.x, v.y, v.z) for v in vertices], faces) mesh.export(mesh_path) except: queue.put(traceback.format_exc()) def py_string_to_mesh_file_safe(py_string, mesh_path): # CadQuery code predicted by LLM may be unsafe and cause memory leaks. # That's why we execute it in a separace Process with timeout. queue = Queue() process = Process( target=py_string_to_mesh_file, args=(py_string, mesh_path, queue)) process.start() process.join(5) if process.is_alive(): process.terminate() process.join() raise gr.Error('Process is alive after 3 seconds') if not queue.empty(): raise gr.Error(queue.get()) def run_point_cloud(in_mesh_path, seed): try: mesh = trimesh.load(in_mesh_path) mesh.apply_translation(-(mesh.bounds[0] + mesh.bounds[1]) / 2.0) mesh.apply_scale(2.0 / max(mesh.extents)) np.random.seed(seed) point_cloud = mesh_to_point_cloud(mesh) pcd_path = '/tmp/pcd.obj' trimesh.points.PointCloud(point_cloud[:, :3]).export(pcd_path) return point_cloud, pcd_path except: raise gr.Error(traceback.format_exc()) @spaces.GPU(duration=20) def run_cad_recode(point_cloud): try: input_ids = [tokenizer.pad_token_id] * len(point_cloud) + [tokenizer('<|im_start|>')['input_ids'][0]] attention_mask = [-1] * len(point_cloud) + [1] model = cad_recode.cuda() with torch.no_grad(): batch_ids = cad_recode.generate( input_ids=torch.tensor(input_ids).unsqueeze(0).to(model.device), attention_mask=torch.tensor(attention_mask).unsqueeze(0).to(model.device), point_cloud=torch.tensor(point_cloud.astype(np.float32)).unsqueeze(0).to(model.device), max_new_tokens=768, pad_token_id=tokenizer.pad_token_id).cpu() py_string = tokenizer.batch_decode(batch_ids)[0] begin = py_string.find('<|im_start|>') + 12 end = py_string.find('<|endoftext|>') py_string = py_string[begin: end] return py_string, py_string except: raise gr.Error(traceback.format_exc()) def run_mesh(py_string): try: out_mesh_path = '/tmp/mesh.stl' py_string_to_mesh_file_safe(py_string, out_mesh_path) return out_mesh_path except: raise gr.Error(traceback.format_exc()) def run(): with gr.Blocks() as demo: with gr.Row(): gr.Markdown('## CAD-Recode Demo\n' 'Upload mesh or select from examples and press Run! Mesh ⇾ 256 points ⇾ Python code by CAD-Recode ⇾ CAD model.') with gr.Row(equal_height=True): in_model = gr.Model3D(label='1. Input Mesh', interactive=True) point_model = gr.Model3D(label='2. Sampled Point Cloud', display_mode='point_cloud', interactive=False) out_model = gr.Model3D( label='4. Result CAD Model', interactive=False ) with gr.Row(): with gr.Column(): with gr.Row(): seed_slider = gr.Slider(label='Random Seed', value=42, interactive=True) with gr.Row(): gr.Examples( examples=[ ['./data/49215_5368e45e_0000.stl', 42], ['./data/00882236.stl', 6], ['./data/User Library-engrenage.stl', 18], ['./data/00010900.stl', 42], ['./data/21492_8bd34fc1_0008.stl', 42], ['./data/00375556.stl', 96], ['./data/49121_adb01620_0000.stl', 42], ['./data/41473_c2137170_0023.stl', 42]], example_labels=[ 'fusion360_table1', 'deepcad_star', 'cc3d_gear', 'deepcad_barrels', 'fusion360_gear', 'deepcad_house', 'fusion360_table2', 'fusion360_omega'], inputs=[in_model, seed_slider], cache_examples=False) with gr.Row(): run_button = gr.Button('Run') with gr.Column(): out_code = gr.Code(language='python', label='3. Generated Python Code', wrap_lines=True, interactive=False) with gr.Column(): pass state = gr.State() run_button.click( run_point_cloud, inputs=[in_model, seed_slider], outputs=[state, point_model] ).success( run_cad_recode, inputs=[state], outputs=[state, out_code] ).success( run_mesh, inputs=[state], outputs=[out_model] ) demo.launch(show_error=True) tokenizer = AutoTokenizer.from_pretrained( 'Qwen/Qwen2-1.5B', pad_token='<|im_end|>', padding_side='left') cad_recode = CADRecode.from_pretrained( 'filapro/cad-recode', torch_dtype='auto', attn_implementation='flash_attention_2').eval() os.environ['TOKENIZERS_PARALLELISM'] = 'False' run()