Merged
Changes from 3 commits
@@ -256,7 +256,8 @@ def reset(self):

def release(self):
self._release_model()
self.launcher.release()
if self.launcher:
self.launcher.release()
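# The guard matters because the Jina PyTorch path builds the evaluator with
# launcher=None (see OpenVinoJinaClipEvaluator.from_configs below).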

def _release_model(self):
if self.model:
@@ -1,5 +1,5 @@
"""
Copyright (c) 2024 Intel Corporation
Copyright (c) 2024-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
"""
import os
import numpy as np

from PIL import Image
from .base_custom_evaluator import BaseCustomEvaluator
from .base_models import BaseCascadeModel
from ...config import ConfigError
@@ -30,9 +30,20 @@

try:
import open_clip
except ImportError as error:
open_clip = UnsupportedPackage('open_clip', error.msg)
except ImportError as clip_error:
open_clip = UnsupportedPackage('open_clip', clip_error.msg)

try:
from transformers import AutoModel, AutoTokenizer
except ImportError as transformers_error:
AutoModel = UnsupportedPackage('AutoModel', transformers_error.msg)
AutoTokenizer = UnsupportedPackage('AutoTokenizer', transformers_error.msg)

try:
import torch
import torch.nn.functional as F
except ImportError as torch_error:
torch = UnsupportedPackage("torch", torch_error.msg)

class OpenVinoClipEvaluator(BaseCustomEvaluator):
def __init__(self, dataset_config, launcher, model, orig_config):
@@ -42,8 +53,7 @@ def __init__(self, dataset_config, launcher, model, orig_config):
@classmethod
def from_configs(cls, config, delayed_model_loading=False, orig_config=None):
dataset_config, launcher, _ = cls.get_dataset_and_launcher_info(config)

model = OpenVinoClipModel(
model = OpenVinoClipVitModel(
config.get('network_info', {}), launcher, config.get('_models', []),
config.get('_model_is_blob'),
delayed_model_loading, config
@@ -69,43 +79,52 @@ def _process(self, output_callback, calculate_metrics, progress_reporter, metric
self._update_progress(progress_reporter, metric_config, batch_id, len(batch_prediction), csv_file)


class OpenVinoClipModel(BaseCascadeModel):
class OpenVinoJinaClipEvaluator(OpenVinoClipEvaluator):
@classmethod
def from_configs(cls, config, delayed_model_loading=False, orig_config=None):
if config['launchers'][0]['framework'] == 'pytorch':
dataset_config, launcher = config["datasets"], None
delayed_model_loading = False
else:
dataset_config, launcher, _ = cls.get_dataset_and_launcher_info(config)

model = OpenVinoJinaClipModel(
config.get('network_info', {}), launcher, config.get('_models', []),
config.get('_model_is_blob'),
delayed_model_loading, config
)
return cls(dataset_config, launcher, model, orig_config)
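# With a pytorch launcher config, no OpenVINO launcher is created; the model
# then falls back to running the original PyTorch modules (see
# OpenVinoJinaClipModel.create_pipeline below).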


class BaseOpenVinoClipModel(BaseCascadeModel):
def __init__(self, network_info, launcher, models_args, is_blob, delayed_model_loading=False, config=None):
super().__init__(network_info, launcher, delayed_model_loading)
self.network_info = network_info
self.launcher = launcher
self.config = config or {}
parts = ['text_encoder', 'image_encoder']
network_info = self.fill_part_with_model(network_info, parts, models_args, False, delayed_model_loading)
if not contains_all(network_info, parts) and not delayed_model_loading:
raise ConfigError('configuration for text_encoder/image_encoder does not exist')
self.templates_file = None
self.parameters_file = None
self.templates = ["a photo of a {classname}"]
self.parts = network_info.keys()
if launcher:
network_info = self.fill_part_with_model(network_info, self.parts,
models_args, False, delayed_model_loading)
if not contains_all(network_info, self.parts) and not delayed_model_loading:
raise ConfigError('configuration for text_encoder/image_encoder does not exist')
if not delayed_model_loading:
self.create_pipeline(launcher, network_info)

def create_pipeline(self, launcher, network_info):
orig_model_name = self.config.get("orig_model_name", "ViT-B-16-plus-240")
self.load_models(network_info, launcher, True)

self.text_encoder = launcher.ie_core.compile_model(self.text_encoder_model, launcher.device)
self.image_encoder = launcher.ie_core.compile_model(self.image_encoder_model, launcher.device)

unet_shapes = [inp.get_partial_shape() for inp in self.text_encoder_model.inputs]
if unet_shapes[0][0].is_dynamic:
self.templates_file = self.config.get("templates", "zeroshot_classification_templates.json")
else:
self.templates_file = None
raise NotImplementedError("Subclasses should implement this method")

self.classnames_file = self.config.get("classnames", "classnames.json")
self.parameters_file = self.config.get("pretrained_model_params", None)
self.tokenizer = open_clip.get_tokenizer(orig_model_name)
def get_logits(self, image_features, zeroshot_weights):
raise NotImplementedError("Subclasses should implement this method")

def predict(self, identifiers, input_data, zeroshot_weights):
preds = []
for idx, image_data in zip(identifiers, input_data):
image = np.expand_dims(image_data, axis=0)
image_features = self.encode_image(image)
image_features = self.normalize(image_features, axis=-1)
logits = 100. * image_features @ zeroshot_weights
image_features = self.encode_image(image_data)
logits = self.get_logits(image_features, zeroshot_weights)
preds.append(ClassificationPrediction(idx, np.squeeze(logits, axis=0)))
return None, preds

@@ -116,23 +135,11 @@ def get_network(self):
model_list.append({"name": model_part_name, "model": model})
return model_list

def encode_image(self, image):
features = self.image_encoder(image)
return features[self.image_encoder.output()]
def encode_image(self, image_data):
raise NotImplementedError("Subclasses should implement this method")

def encode_text(self, texts, params):
text = self.tokenizer(texts).to('cpu')
indices = text.detach().cpu().numpy()

x = params['token_embedding'][indices]
x = x + params['positional_embedding']
x = x.transpose(1, 0, 2)
x = self.text_encoder((x, params['attn_mask']))
x = x[self.text_encoder.output()]
x = x.transpose(1, 0, 2)
x = self.layer_norm(x, params['gamma'], params['beta'])
x = x[np.arange(x.shape[0]), np.argmax(indices, axis=-1)] @ params['text_projection']
return x
raise NotImplementedError("Subclasses should implement this method")

@staticmethod
def get_pretrained_model_params(path):
@@ -147,14 +154,20 @@ def get_pretrained_model_params(path):
params['beta'] = open_clip_params['beta']
return params

def get_class_embeddings(self, texts, params):
raise NotImplementedError("Subclasses should implement this method")

def zero_shot_classifier(self, data_source):
classnames = read_json(os.path.join(data_source, self.classnames_file))
if self.templates_file:
templates = read_json(os.path.join(data_source, self.templates_file))
else:
templates = ["a photo of a {c}"]
templates = self.templates

params = None
if self.parameters_file:
params = self.get_pretrained_model_params(os.path.join(data_source, self.parameters_file))

params = self.get_pretrained_model_params(os.path.join(data_source, self.parameters_file))
print_info('Encoding zeroshot weights for {} imagenet classes'.format(len(classnames)))

zeroshot_weights = []
@@ -163,12 +176,9 @@ def zero_shot_classifier(self, data_source):
iterator = tqdm(classnames, mininterval=2)

for classname in iterator:
texts = [template.format(c=classname) for template in templates]
class_embeddings = self.encode_text(texts, params)
class_embedding = self.normalize(class_embeddings, axis=-1)
class_embedding = np.mean(class_embedding, axis=0)
class_embedding /= np.linalg.norm(class_embedding, ord=2)
zeroshot_weights.append(class_embedding)
texts = [template.format(classname=classname) for template in templates]
class_embeddings = self.get_class_embeddings(texts, params)
zeroshot_weights.append(class_embeddings)
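# Stacking on axis=1 yields (embedding_dim, num_classes) for the ViT path,
# which predict() matmuls against each normalized image embedding.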
return np.stack(zeroshot_weights, axis=1)

def load_models(self, network_info, launcher, log=False):
@@ -192,7 +202,7 @@ def load_model(self, network_list, launcher):
setattr(self, "{}_model".format(network_list["name"]), network)

def print_input_output_info(self):
model_parts = ("text_encoder", "image_encoder")
model_parts = self.parts
for part in model_parts:
part_model_id = "{}_model".format(part)
model = getattr(self, part_model_id, None)
@@ -218,3 +228,114 @@ def normalize(input_array, p=2, axis=-1, epsilon=1e-12):
norm = np.maximum(norm, epsilon)
normalized = input_array / norm
return normalized


class OpenVinoClipVitModel(BaseOpenVinoClipModel):
def create_pipeline(self, launcher, network_info):
orig_model_name = self.config.get("orig_model_name", "ViT-B-16-plus-240")
self.load_models(network_info, launcher, True)
self.text_encoder = launcher.ie_core.compile_model(self.text_encoder_model, launcher.device)
self.image_encoder = launcher.ie_core.compile_model(self.image_encoder_model, launcher.device)
unet_shapes = [inp.get_partial_shape() for inp in self.text_encoder_model.inputs]
if unet_shapes[0][0].is_dynamic:
self.templates_file = self.config.get("templates", "zeroshot_classification_templates.json")

self.classnames_file = self.config.get("classnames", "classnames.json")
self.parameters_file = self.config.get("pretrained_model_params", None)
self.tokenizer = open_clip.get_tokenizer(orig_model_name)

def get_logits(self, image_features, zeroshot_weights):
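# CLIP-style scoring: L2-normalized image features against the zero-shot
# weight matrix, scaled by the usual CLIP logit factor of 100.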
image_features = self.normalize(image_features, axis=-1)
logits = 100. * image_features @ zeroshot_weights
return logits

def encode_image(self, image_data):
image = np.expand_dims(image_data, axis=0)
features = self.image_encoder(image)
return features[self.image_encoder.output()]

def encode_text(self, texts, params):
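# Manual text-transformer forward: embed tokens, add positional encoding,
# transpose to (seq, batch, dim) for the OpenVINO encoder, then layer-norm
# and project the features at the EOT position (argmax of the token ids,
# since EOT has the highest id in the open_clip vocabulary).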
text = self.tokenizer(texts).to('cpu')
indices = text.detach().cpu().numpy()

x = params['token_embedding'][indices]
x = x + params['positional_embedding']
x = x.transpose(1, 0, 2)
x = self.text_encoder((x, params['attn_mask']))
x = x[self.text_encoder.output()]
x = x.transpose(1, 0, 2)
x = self.layer_norm(x, params['gamma'], params['beta'])
x = x[np.arange(x.shape[0]), np.argmax(indices, axis=-1)] @ params['text_projection']
return x

def get_class_embeddings(self, texts, params):
class_embeddings = self.encode_text(texts, params)
class_embedding = self.normalize(class_embeddings, axis=-1)
class_embedding = np.mean(class_embedding, axis=0)
class_embedding /= np.linalg.norm(class_embedding, ord=2)
return class_embedding


class OpenVinoJinaClipModel(BaseOpenVinoClipModel):
def create_pipeline(self, launcher, network_info):
if isinstance(AutoTokenizer, UnsupportedPackage):
AutoTokenizer.raise_error(self.__class__.__name__)
if isinstance(AutoModel, UnsupportedPackage):
AutoModel.raise_error(self.__class__.__name__)
if isinstance(torch, UnsupportedPackage):
torch.raise_error(self.__class__.__name__)

orig_model_name = self.config.get("orig_model_name", "jinaai/jina-clip-v1")

model = AutoModel.from_pretrained(orig_model_name, trust_remote_code=True)
if launcher:
self.load_models(network_info, launcher, True)
self.text_encoder = launcher.ie_core.compile_model(self.text_model, launcher.device)
self.vision_encoder = launcher.ie_core.compile_model(self.vision_model, launcher.device)
else:
self.text_encoder = model.text_model
self.vision_encoder = model.vision_model

self.templates = ["{classname}"]
self.classnames_file = self.config.get("classnames", "classnames.json")
self.tokenizer = AutoTokenizer.from_pretrained(orig_model_name, trust_remote_code=True)
self.processor = model.get_preprocess()

def encode_image(self, image_data):
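# Preprocess with the model's own processor, then run either the compiled
# OpenVINO vision encoder or the original PyTorch module; both branches
# below end up returning a numpy array of image embeddings.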
image = Image.fromarray(image_data)
vision_input = self.processor(images=[image], return_tensors="pt")
image_embeddings = self.vision_encoder(vision_input["pixel_values"])

if isinstance(image_embeddings, torch.Tensor):
image_embeddings = image_embeddings.detach().numpy()
else:
image_embeddings = image_embeddings[0]

return image_embeddings

def encode_text(self, text_input):
text_embeddings = self.text_encoder(text_input["input_ids"])

if isinstance(text_embeddings, torch.Tensor):
text_embeddings = text_embeddings.detach().numpy()
else:
text_embeddings = text_embeddings[0]
return text_embeddings

def get_logits(self, image_features, zeroshot_weights):
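# Pairwise dot products between each image embedding and every class text
# embedding, followed by a softmax over classes scaled by 100.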
text_embeddings = np.squeeze(zeroshot_weights)
simularity = []
Contributor: typo in simularity, update to similarity

Contributor Author: Fixed
for emb1 in image_features:
temp_simularity = []
for emb2 in text_embeddings:
temp_simularity.append(emb1 @ emb2)
simularity.append(temp_simularity)

simularity_tensor = torch.tensor(simularity)
logits = 100. * F.softmax(simularity_tensor, dim=-1).numpy()
return logits

def get_class_embeddings(self, texts, params):
text_input = self.tokenizer(texts, return_tensors="pt", padding="max_length",
max_length=512, truncation=True).to("cpu")
return self.encode_text(text_input)