nicoboou commited on
Commit
a0a61c5
·
verified ·
1 Parent(s): 83abec4

Upload model

Browse files
Files changed (3) hide show
  1. config.json +5 -1
  2. config_chada_vit.py +34 -0
  3. model.py +423 -0
config.json CHANGED
@@ -2,6 +2,10 @@
2
  "architectures": [
3
  "ChAdaViTModel"
4
  ],
 
 
 
 
5
  "depth": 12,
6
  "drop_path_rate": 0.0,
7
  "drop_rate": 0.0,
@@ -17,5 +21,5 @@
17
  "patch_size": 16,
18
  "return_all_tokens": false,
19
  "torch_dtype": "float32",
20
- "transformers_version": "4.40.0.dev0"
21
  }
 
2
  "architectures": [
3
  "ChAdaViTModel"
4
  ],
5
+ "auto_map": {
6
+ "AutoConfig": "config_chada_vit.ChAdaViTConfig",
7
+ "AutoModel": "model.ChAdaViTModel"
8
+ },
9
  "depth": 12,
10
  "drop_path_rate": 0.0,
11
  "drop_rate": 0.0,
 
21
  "patch_size": 16,
22
  "return_all_tokens": false,
23
  "torch_dtype": "float32",
24
+ "transformers_version": "4.43.0"
25
  }
config_chada_vit.py ADDED
@@ -0,0 +1,34 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from transformers import PretrainedConfig
2
+ from typing import List
3
+
4
+
5
+ class ChAdaViTConfig(PretrainedConfig):
6
+ model_type = "chadavit"
7
+
8
+ def __init__(
9
+ self,
10
+ img_size: List[int] = [224],
11
+ in_chans: int = 1,
12
+ embed_dim: int = 192,
13
+ patch_size: int = 16,
14
+ num_classes: int = 0,
15
+ depth: int = 12,
16
+ num_heads: int = 12,
17
+ drop_rate: float = 0.0,
18
+ drop_path_rate: float = 0.0,
19
+ return_all_tokens: bool = True,
20
+ max_number_channels: int = 10,
21
+ **kwargs,
22
+ ):
23
+ self.img_size = img_size
24
+ self.in_chans = in_chans
25
+ self.embed_dim = embed_dim
26
+ self.patch_size = patch_size
27
+ self.num_classes = num_classes
28
+ self.depth = depth
29
+ self.num_heads = num_heads
30
+ self.drop_rate = drop_rate
31
+ self.drop_path_rate = drop_path_rate
32
+ self.return_all_tokens = return_all_tokens
33
+ self.max_number_channels = max_number_channels
34
+ super().__init__(**kwargs)
model.py ADDED
@@ -0,0 +1,423 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ ChAda-ViT (i.e Channel Adaptive ViT) is a variant of ViT that can handle multi-channel images.
3
+ """
4
+
5
+ import math
6
+ from typing import Optional, Union, Callable
7
+
8
+ import torch
9
+ import torch.nn as nn
10
+ from transformers import PreTrainedModel
11
+
12
+ from torch import Tensor
13
+ import torch.nn.functional as F
14
+ from torch.nn.modules.module import Module
15
+ from torch.nn.modules.activation import MultiheadAttention
16
+ from torch.nn.modules.dropout import Dropout
17
+ from torch.nn.modules.linear import Linear
18
+ from torch.nn.modules.normalization import LayerNorm
19
+
20
+ from utils import trunc_normal_
21
+ from config_chada_vit import ChAdaViTConfig
22
+
23
+
24
+ def _get_activation_fn(activation: str) -> Callable[[Tensor], Tensor]:
25
+ if activation == "relu":
26
+ return F.relu
27
+ elif activation == "gelu":
28
+ return F.gelu
29
+
30
+ raise RuntimeError("activation should be relu/gelu, not {}".format(activation))
31
+
32
+
33
+ class TransformerEncoderLayer(Module):
34
+ r"""
35
+ Mostly copied from torch.nn.TransformerEncoderLayer, but with the following changes:
36
+ - Added the possibility to retrieve the attention weights
37
+ """
38
+
39
+ __constants__ = ["batch_first", "norm_first"]
40
+
41
+ def __init__(
42
+ self,
43
+ d_model: int,
44
+ nhead: int,
45
+ dim_feedforward: int = 2048,
46
+ dropout: float = 0.1,
47
+ activation: Union[str, Callable[[Tensor], Tensor]] = F.relu,
48
+ layer_norm_eps: float = 1e-5,
49
+ batch_first: bool = False,
50
+ norm_first: bool = False,
51
+ device=None,
52
+ dtype=None,
53
+ ) -> None:
54
+ factory_kwargs = {"device": device, "dtype": dtype}
55
+ super(TransformerEncoderLayer, self).__init__()
56
+ self.self_attn = MultiheadAttention(
57
+ embed_dim=d_model,
58
+ num_heads=nhead,
59
+ dropout=dropout,
60
+ batch_first=batch_first,
61
+ **factory_kwargs,
62
+ )
63
+ # Implementation of Feedforward model
64
+ self.linear1 = Linear(d_model, dim_feedforward, **factory_kwargs)
65
+ self.dropout = Dropout(dropout)
66
+ self.linear2 = Linear(dim_feedforward, d_model, **factory_kwargs)
67
+
68
+ self.norm_first = norm_first
69
+ self.norm1 = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
70
+ self.norm2 = LayerNorm(d_model, eps=layer_norm_eps, **factory_kwargs)
71
+ self.dropout1 = Dropout(dropout)
72
+ self.dropout2 = Dropout(dropout)
73
+
74
+ # Legacy string support for activation function.
75
+ if isinstance(activation, str):
76
+ activation = _get_activation_fn(activation)
77
+
78
+ # We can't test self.activation in forward() in TorchScript,
79
+ # so stash some information about it instead.
80
+ if activation is F.relu:
81
+ self.activation_relu_or_gelu = 1
82
+ elif activation is F.gelu:
83
+ self.activation_relu_or_gelu = 2
84
+ else:
85
+ self.activation_relu_or_gelu = 0
86
+ self.activation = activation
87
+
88
+ def __setstate__(self, state):
89
+ super(TransformerEncoderLayer, self).__setstate__(state)
90
+ if not hasattr(self, "activation"):
91
+ self.activation = F.relu
92
+
93
+ def forward(
94
+ self,
95
+ src: Tensor,
96
+ src_mask: Optional[Tensor] = None,
97
+ src_key_padding_mask: Optional[Tensor] = None,
98
+ return_attention=False,
99
+ ) -> Tensor:
100
+ r"""Pass the input through the encoder layer.
101
+
102
+ Args:
103
+ src: the sequence to the encoder layer (required).
104
+ src_mask: the mask for the src sequence (optional).
105
+ src_key_padding_mask: the mask for the src keys per batch (optional).
106
+
107
+ Shape:
108
+ see the docs in Transformer class.
109
+ """
110
+
111
+ x = src
112
+ if self.norm_first:
113
+ attn, attn_weights = self._sa_block(
114
+ x=self.norm1(x),
115
+ attn_mask=src_mask,
116
+ key_padding_mask=src_key_padding_mask,
117
+ return_attention=return_attention,
118
+ )
119
+ if return_attention:
120
+ return attn_weights
121
+ x = x + attn
122
+ x = x + self._ff_block(self.norm2(x))
123
+ else:
124
+ attn, attn_weights = self._sa_block(
125
+ x=self.norm1(x),
126
+ attn_mask=src_mask,
127
+ key_padding_mask=src_key_padding_mask,
128
+ return_attention=return_attention,
129
+ )
130
+ if return_attention:
131
+ return attn_weights
132
+ x = self.norm1(x + attn)
133
+ x = self.norm2(x + self._ff_block(x))
134
+
135
+ return x
136
+
137
+ # self-attention block
138
+ def _sa_block(
139
+ self,
140
+ x: Tensor,
141
+ attn_mask: Optional[Tensor],
142
+ key_padding_mask: Optional[Tensor],
143
+ return_attention: bool = False,
144
+ ) -> Tensor:
145
+ x, attn_weights = self.self_attn(
146
+ x,
147
+ x,
148
+ x,
149
+ attn_mask=attn_mask,
150
+ key_padding_mask=key_padding_mask,
151
+ need_weights=return_attention,
152
+ average_attn_weights=False,
153
+ )
154
+ return self.dropout1(x), attn_weights
155
+
156
+ # feed forward block
157
+ def _ff_block(self, x: Tensor) -> Tensor:
158
+ x = self.linear2(self.dropout(self.activation(self.linear1(x))))
159
+ return self.dropout2(x)
160
+
161
+
162
+ class TokenLearner(nn.Module):
163
+ """Image to Patch Embedding"""
164
+
165
+ def __init__(self, img_size=224, patch_size=16, in_chans=1, embed_dim=768):
166
+ super().__init__()
167
+ num_patches = (img_size // patch_size) * (img_size // patch_size)
168
+ self.img_size = img_size
169
+ self.patch_size = patch_size
170
+ self.num_patches = num_patches
171
+
172
+ self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size)
173
+
174
+ def forward(self, x):
175
+ x = self.proj(x)
176
+ x = x.flatten(2)
177
+ x = x.transpose(1, 2)
178
+ return x
179
+
180
+
181
+ class ChAdaViTModel(PreTrainedModel):
182
+ """Channel Adaptive Vision Transformer"""
183
+
184
+ config_class = ChAdaViTConfig
185
+
186
+ def __init__(self, config):
187
+ super().__init__(config)
188
+
189
+ # Embeddings dimension
190
+ self.num_features = self.embed_dim = config.embed_dim
191
+
192
+ # Num of maximum channels in the batch
193
+ self.max_channels = config.max_number_channels
194
+
195
+ # Tokenization module
196
+ self.token_learner = TokenLearner(
197
+ img_size=config.img_size[0],
198
+ patch_size=config.patch_size,
199
+ in_chans=config.in_chans,
200
+ embed_dim=self.embed_dim,
201
+ )
202
+ num_patches = self.token_learner.num_patches
203
+
204
+ self.cls_token = nn.Parameter(
205
+ torch.zeros(1, 1, self.embed_dim)
206
+ ) # (B, max_channels * num_tokens, embed_dim)
207
+ self.channel_token = nn.Parameter(
208
+ torch.zeros(1, self.max_channels, 1, self.embed_dim)
209
+ ) # (B, max_channels, 1, embed_dim)
210
+ self.pos_embed = nn.Parameter(
211
+ torch.zeros(1, 1, num_patches + 1, self.embed_dim)
212
+ ) # (B, max_channels, num_tokens, embed_dim)
213
+ self.pos_drop = nn.Dropout(p=config.drop_rate)
214
+
215
+ # TransformerEncoder block
216
+ dpr = [
217
+ x.item() for x in torch.linspace(0, config.drop_path_rate, config.depth)
218
+ ] # stochastic depth decay rule
219
+ self.blocks = nn.ModuleList(
220
+ [
221
+ TransformerEncoderLayer(
222
+ d_model=self.embed_dim,
223
+ nhead=config.num_heads,
224
+ dim_feedforward=2048,
225
+ dropout=dpr[i],
226
+ batch_first=True,
227
+ )
228
+ for i in range(config.depth)
229
+ ]
230
+ )
231
+ self.norm = nn.LayerNorm(self.embed_dim)
232
+
233
+ # Classifier head
234
+ self.head = nn.Linear(self.embed_dim, config.num_classes) if config.num_classes > 0 else nn.Identity()
235
+
236
+ # Return only the [CLS] token or all tokens
237
+ self.return_all_tokens = config.return_all_tokens
238
+
239
+ trunc_normal_(self.pos_embed, std=0.02)
240
+ trunc_normal_(self.cls_token, std=0.02)
241
+ trunc_normal_(self.channel_token, std=0.02)
242
+ self.apply(self._init_weights)
243
+
244
+ def _init_weights(self, m):
245
+ if isinstance(m, nn.Linear):
246
+ trunc_normal_(m.weight, std=0.02)
247
+ if isinstance(m, nn.Linear) and m.bias is not None:
248
+ nn.init.constant_(m.bias, 0)
249
+ elif isinstance(m, nn.LayerNorm):
250
+ nn.init.constant_(m.bias, 0)
251
+ nn.init.constant_(m.weight, 1.0)
252
+
253
+ def add_pos_encoding_per_channel(self, x, w, h, class_pos_embed: bool = False):
254
+ """
255
+ Adds num_patches positional embeddings to EACH of the channels.
256
+ """
257
+ npatch = x.shape[2]
258
+ N = self.pos_embed.shape[2] - 1
259
+
260
+ # --------------------- [CLS] positional encoding --------------------- #
261
+ if class_pos_embed:
262
+ return self.pos_embed[:, :, 0]
263
+
264
+ # --------------------- Patches positional encoding --------------------- #
265
+ # If the input size is the same as the training size, return the positional embeddings for the desired type
266
+ if npatch == N and w == h:
267
+ return self.pos_embed[:, :, 1:]
268
+
269
+ # Otherwise, interpolate the positional encoding for the input tokens
270
+ class_pos_embed = self.pos_embed[:, :, 0]
271
+ patch_pos_embed = self.pos_embed[:, :, 1:]
272
+ dim = x.shape[-1]
273
+ w0 = w // self.token_learner.patch_size
274
+ h0 = h // self.token_learner.patch_size
275
+ # a small number is added by DINO team to avoid floating point error in the interpolation
276
+ # see discussion at https://github.com/facebookresearch/dino/issues/8
277
+ w0, h0 = w0 + 0.1, h0 + 0.1
278
+ patch_pos_embed = nn.functional.interpolate(
279
+ patch_pos_embed.reshape(1, int(math.sqrt(N)), int(math.sqrt(N)), dim).permute(0, 3, 1, 2),
280
+ scale_factor=(w0 / math.sqrt(N), h0 / math.sqrt(N)),
281
+ mode="bicubic",
282
+ )
283
+ assert int(w0) == patch_pos_embed.shape[-2] and int(h0) == patch_pos_embed.shape[-1]
284
+ patch_pos_embed = patch_pos_embed.permute(0, 2, 3, 1).view(1, -1, dim)
285
+ return patch_pos_embed.unsqueeze(0)
286
+
287
+ def channel_aware_tokenization(self, x, index, list_num_channels, max_channels=10):
288
+ B, nc, w, h = x.shape # (B*num_channels, 1, w, h)
289
+
290
+ # Tokenize through linear embedding
291
+ tokens_per_channel = self.token_learner(x)
292
+
293
+ # Concatenate tokens per channel in each image
294
+ chunks = torch.split(tokens_per_channel, list_num_channels[index], dim=0)
295
+
296
+ # Pad the tokens tensor with zeros for each image separately in the chunks list
297
+ padded_tokens = [
298
+ torch.cat(
299
+ [
300
+ chunk,
301
+ torch.zeros(
302
+ (max_channels - chunk.size(0), chunk.size(1), chunk.size(2)),
303
+ device=chunk.device,
304
+ ),
305
+ ],
306
+ dim=0,
307
+ )
308
+ if chunk.size(0) < max_channels
309
+ else chunk
310
+ for chunk in chunks
311
+ ]
312
+
313
+ # Stack along the batch dimension
314
+ padded_tokens = torch.stack(padded_tokens, dim=0)
315
+ num_tokens = padded_tokens.size(2)
316
+
317
+ # Reshape the patches embeddings on the channel dimension
318
+ padded_tokens = padded_tokens.reshape(padded_tokens.size(0), -1, padded_tokens.size(3))
319
+
320
+ # Compute the masking for avoiding self-attention on empty padded channels
321
+ channel_mask = torch.all(padded_tokens == 0.0, dim=-1)
322
+
323
+ # Destack to obtain the original number of channels
324
+ padded_tokens = padded_tokens.reshape(-1, max_channels, num_tokens, padded_tokens.size(-1))
325
+
326
+ # Add the [POS] token to the embed patch tokens
327
+ padded_tokens = padded_tokens + self.add_pos_encoding_per_channel(
328
+ padded_tokens, w, h, class_pos_embed=False
329
+ )
330
+
331
+ # Add the [CHANNEL] token to the embed patch tokens
332
+ if max_channels == self.max_channels:
333
+ channel_tokens = self.channel_token.expand(padded_tokens.shape[0], -1, padded_tokens.shape[2], -1)
334
+ padded_tokens = padded_tokens + channel_tokens
335
+
336
+ # Restack the patches embeddings on the channel dimension
337
+ embeddings = padded_tokens.reshape(padded_tokens.size(0), -1, padded_tokens.size(3))
338
+
339
+ # Expand the [CLS] token to the batch dimension
340
+ cls_tokens = self.cls_token.expand(embeddings.shape[0], -1, -1)
341
+
342
+ # Add [POS] positional encoding to the [CLS] token
343
+ cls_tokens = cls_tokens + self.add_pos_encoding_per_channel(embeddings, w, h, class_pos_embed=True)
344
+
345
+ # Concatenate the [CLS] token to the embed patch tokens
346
+ embeddings = torch.cat([cls_tokens, embeddings], dim=1)
347
+
348
+ # Adding a False value to the beginning of each channel_mask to account for the [CLS] token
349
+ channel_mask = torch.cat(
350
+ [
351
+ torch.tensor([False], device=channel_mask.device).expand(channel_mask.size(0), 1),
352
+ channel_mask,
353
+ ],
354
+ dim=1,
355
+ )
356
+
357
+ return self.pos_drop(embeddings), channel_mask
358
+
359
+ def forward(self, x, index, list_num_channels):
360
+ # Apply the TokenLearner module to obtain learnable tokens
361
+ x, channel_mask = self.channel_aware_tokenization(
362
+ x, index, list_num_channels
363
+ ) # (B*num_channels, embed_dim)
364
+
365
+ # Apply the self-attention layers with masked self-attention
366
+ for blk in self.blocks:
367
+ x = blk(
368
+ x, src_key_padding_mask=channel_mask
369
+ ) # Use src_key_padding_mask to mask out padded tokens
370
+
371
+ # Normalize
372
+ x = self.norm(x)
373
+
374
+ if self.return_all_tokens:
375
+ # Create a mask to select non-masked tokens (excluding CLS token)
376
+ non_masked_tokens_mask = ~channel_mask[:, 1:]
377
+ non_masked_tokens = x[:, 1:][non_masked_tokens_mask]
378
+ return non_masked_tokens # return non-masked tokens (excluding CLS token)
379
+ else:
380
+ return x[:, 0] # return only the [CLS] token
381
+
382
+ def channel_token_sanity_check(self, x):
383
+ """
384
+ Helper function to check consistency of channel tokens.
385
+ """
386
+ # 1. Compare Patches Across Different Channels
387
+ print("Values for the first patch across different channels:")
388
+ for ch in range(10): # Assuming 10 channels
389
+ print(f"Channel {ch + 1}:", x[0, ch, 0, :5]) # Print first 5 values of the embedding for brevity
390
+
391
+ print("\n")
392
+
393
+ # 2. Compare Patches Within the Same Channel
394
+ for ch in range(10):
395
+ is_same = torch.all(x[0, ch, 0] == x[0, ch, 1])
396
+ print(f"First and second patch embeddings are the same for Channel {ch + 1}: {is_same.item()}")
397
+
398
+ # 3. Check Consistency Across Batch
399
+ print("Checking consistency of channel tokens across the batch:")
400
+ for ch in range(10):
401
+ is_consistent = torch.all(x[0, ch, 0] == x[1, ch, 0])
402
+ print(
403
+ f"Channel token for first patch is consistent between first and second image for Channel {ch + 1}: {is_consistent.item()}"
404
+ )
405
+
406
+ def get_last_selfattention(self, x):
407
+ x, channel_mask = self.channel_aware_tokenization(x, index=0, list_num_channels=[1], max_channels=1)
408
+ for i, blk in enumerate(self.blocks):
409
+ if i < len(self.blocks) - 1:
410
+ x = blk(x, src_key_padding_mask=channel_mask)
411
+ else:
412
+ # return attention of the last block
413
+ return blk(x, src_key_padding_mask=channel_mask, return_attention=True)
414
+
415
+ def get_intermediate_layers(self, x, n=1):
416
+ x, channel_mask = self.channel_aware_tokenization(x)
417
+ # return the output tokens from the `n` last blocks
418
+ output = []
419
+ for i, blk in enumerate(self.blocks):
420
+ x = blk(x, src_key_padding_mask=channel_mask)
421
+ if len(self.blocks) - i <= n:
422
+ output.append(self.norm(x))
423
+ return output