From 17dffa812ea7c5c48b8a9c49104f573c2ea15fc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Tue, 18 Feb 2025 13:51:34 +0100 Subject: [PATCH 01/15] Hd5DataGenerator: Argument to throttle data size --- src/rdds/lib/hdf5/hd5_data_generator.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/rdds/lib/hdf5/hd5_data_generator.py b/src/rdds/lib/hdf5/hd5_data_generator.py index 0b6e4306..b39f1926 100644 --- a/src/rdds/lib/hdf5/hd5_data_generator.py +++ b/src/rdds/lib/hdf5/hd5_data_generator.py @@ -30,7 +30,8 @@ def __init__(self, label: str = None, replace_nan_floats_with: float = 0.0, load_data_into_ram: bool = True, - expand_1d_categorical_to_2d: bool = True): + expand_1d_categorical_to_2d: bool = True, + max_n_samples: int = None): """ :param hd5_file_path: HD5 file to generate data from :param group_name: Group in HD5 file to read data from (reads all datasets) @@ -40,6 +41,7 @@ def __init__(self, :param replace_nan_floats_with: Floating point value to replace NaN values :param load_data_into_ram: Load file content into RAM if True. Improves reading performance. :param expand_1d_categorical_to_2d: Unpack a 1D categorical label [0][TN] into 2D; [1, 0][TN, TP] + :param max_n_samples: Maximum samples to yield """ self._hd5_file_path: str = hd5_file_path self._hd5_file = h5py.File(name=self._hd5_file_path, mode='r') @@ -56,7 +58,10 @@ def __init__(self, _LOGGER.info(f'Loading data to RAM.') in_ram_group: Dict[str, np.ndarray] = {} # Mock the Group dictionary API by using a dict for dataset_name in self._group.keys(): - in_ram_group.update({dataset_name: self._group[dataset_name][:]}) + if max_n_samples is not None: + in_ram_group.update({dataset_name: self._group[dataset_name][0:max_n_samples]}) + else: + in_ram_group.update({dataset_name: self._group[dataset_name][:]}) self._group: Dict[str, np.ndarray] = in_ram_group _LOGGER.info('RAM loading complete.') @@ -82,6 +87,8 @@ def __init__(self, if dataset.shape != zeroeth_dataset.shape: raise ValueError(f'Not identical dataset shapes, got {dataset.shape}!={zeroeth_dataset.shape}') self._data_length: int = zeroeth_dataset.shape[0] + if max_n_samples is not None: + self._data_length = max_n_samples _LOGGER.info(f'{self._data_length} samples across {len(self._group.keys())} features') @property From f7923cf98412b9130a34bcb202f7befe66a7e2cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Tue, 18 Feb 2025 14:11:42 +0100 Subject: [PATCH 02/15] VRS: Refactor tensor input as separate tensors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split text and numerical input tensors into annotation level tensors. Signed-off-by: Tor Björgen --- src/rdds/variant_rank_score/model/model.py | 92 +++++++++++++--------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index bae9b69e..baeee315 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -132,13 +132,26 @@ def _build_model(self, """ :param hparams: Hyperparameters for the model """ - input_text: tf.keras.Input = tf.keras.Input(shape=len(self._features_text), - ragged=True, - dtype=tf.string, - name='input_text') - input_numerical: tf.keras.Input = tf.keras.Input(shape=len(self._features_numerical), - dtype=tf.float32, - name='input_numerical') + + text_inputs = [] + for text_feature_name in self._features_text: + text_inputs.append( + tf.keras.Input(shape=(1, ), + ragged=True, + dtype=tf.string, + name=text_feature_name) + ) + input_text: tf.RaggedTensor = tf.concat(text_inputs, axis=1, name='concat_input_text') + + numerical_inputs = [] + for numerical_feature_name in self._features_numerical: + numerical_inputs.append( + tf.keras.Input(shape=(1, ), + dtype=tf.float32, + name=numerical_feature_name) + ) + input_numerical: tf.Tensor = tf.concat(numerical_inputs, axis=1, name='concat_input_numerical') + # Text preprocessing split_regex = '\s|\n|_|&|/|\||:|,|-|0|1|2|3|4|5|6|7|8|9' text_preprocessing_layer = TextPreprocessingLayer(split_regex=split_regex) @@ -290,7 +303,18 @@ def _build_model(self, # Softmax layer confidences = tf.keras.layers.Softmax(name='Confidences')(logits) # -> [bdim, 2] - self._keras_model = tf.keras.Model(inputs=[input_text, input_numerical], + def _get_input_tensor_with_name(name: str) -> Union[tf.Tensor, tf.RaggedTensor]: + # Helper to assemble input tensor in order defined by self._features + all_input_tensors = text_inputs.copy() + all_input_tensors.extend(numerical_inputs) + for input_tensor in all_input_tensors: + if input_tensor.name == name: + return input_tensor + raise ValueError(f'Found no input tensor with name {name}') + model_inputs = [] # Flat list of model inputs [feature0, feature1, ... ] + for feature_name in self._features: + model_inputs.append(_get_input_tensor_with_name(name=feature_name)) + self._keras_model = tf.keras.Model(inputs=model_inputs, outputs=confidences) metrics = [tf.keras.metrics.TruePositives(), @@ -342,24 +366,6 @@ def loss_fn(y_true, y_pred) -> tf.Tensor: c = tf.keras.losses.categorical_crossentropy(y_true=y_true, y_pred=y_pred, from_logits=False) return c - @staticmethod - def count_feature_types(hd5_output_dtypes: Dict[str, Type]) -> Tuple[int, int]: - """ - Computes amount of numerical vs textbased features in dataset. - :param hd5_output_dtypes: The output types from hd5_data_generator.data_types attribute - :return: Tuple of count - """ - n_text_features = 0 - n_numerical_features = 0 - for feature_name, feature_dtype in hd5_output_dtypes.items(): - if feature_dtype == bytes: - n_text_features += 1 - elif feature_dtype == float: - n_numerical_features += 1 - else: - raise ValueError('Unknown feature dtype', feature_dtype) - return n_text_features, n_numerical_features - def save_model_fn(self, epoch: int, logs=Optional[dict]): """ Saves model to Keras saved model format @@ -386,15 +392,28 @@ def _init_datasets(self, # Training setup hd5_data_generator_train: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, group_name='train', - output_tensor_format=[self._features_text, - self._features_numerical], + output_tensor_format=self._features, label='label') - n_text_features, n_numerical_features = self.count_feature_types(hd5_output_dtypes=hd5_data_generator_train.data_types) - input_signature = ((tf.TensorSpec((n_text_features, ), dtype=tf.string), - tf.TensorSpec((n_numerical_features, ), dtype=tf.float32, name='input_numerical')), - (tf.TensorSpec((2, ), dtype=tf.float32, name='label'), )) + + def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedTensorSpec]: + # Helper to assemble input tensor in order defined by self._features + if name in self._features_text: + return tf.TensorSpec((), dtype=tf.string, name=name) # (, 1) + elif name in self._features_numerical: + return tf.TensorSpec((), dtype=tf.float32, name=name) # (, 1) + else: + raise ValueError(f'Found no input tensor with name {name}') + + input_tensor_signatures = () + for feature_name in self._features: + input_tensor_signatures += (_get_input_signature_from_name(name=feature_name), ) + hd5_output_signature = ( + input_tensor_signatures, + (tf.TensorSpec((2, ), dtype=tf.float32, name='label'), ) + ) + dataset_train: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_train, - output_signature=input_signature) + output_signature=hd5_output_signature) dataset_train = dataset_train.cache() dataset_train = dataset_train.repeat(-1) dataset_train = dataset_train.shuffle(buffer_size=int(5E5), @@ -433,11 +452,10 @@ def _init_datasets(self, # Testing setup hd5_data_generator_test: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, group_name='test', - output_tensor_format=[self._features_text, - self._features_numerical], + output_tensor_format=self._features, label='label') dataset_test: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_test, - output_signature=input_signature) + output_signature=hd5_output_signature) dataset_test = dataset_test.cache() dataset_test = dataset_test.repeat(-1) dataset_test = dataset_test.shuffle(buffer_size=int(5E5), @@ -447,7 +465,7 @@ def _init_datasets(self, # seed=1) dataset_test = dataset_test.batch(batch_size) dataset_test = dataset_test.prefetch(buffer_size=tf.data.AUTOTUNE) - _LOGGER.info(f'Model Input data mapping: {input_signature}') + _LOGGER.info(f'Model Input data mapping: {hd5_output_signature}') self._datasets = InitializedDatasets(dataset_train_numerical=dataset_numerical, dataset_train_vocabulary=dataset_vocabulary, From 1d144ace77ff74b56d39c44ee8f62ee2f7547a06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Tue, 18 Feb 2025 14:59:32 +0100 Subject: [PATCH 03/15] fixup! VRS: Refactor tensor input as separate tensors --- src/rdds/variant_rank_score/model/model.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index baeee315..cea57c2e 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -434,7 +434,7 @@ def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedT group_name='train', output_tensor_format=[self._features_text]) input_signature_vocabulary: Tuple[tf.TensorSpec] = \ - (tf.TensorSpec((n_text_features,), dtype=tf.string, name='input_text_vocabulary'),) + (tf.TensorSpec((len(self._features_text),), dtype=tf.string, name='input_text_vocabulary'),) dataset_vocabulary = get_tf_dataset_from_hd5_data_generator( hd5_data_generator=hd5_data_generator_vocabulary, output_signature=input_signature_vocabulary) @@ -444,7 +444,7 @@ def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedT group_name='train', output_tensor_format=self._features_numerical) input_signature_numerical_normalisation = \ - tf.TensorSpec((n_numerical_features,), dtype=tf.float32, name='input_numerical_normalisation') + tf.TensorSpec((len(self._features_numerical),), dtype=tf.float32, name='input_numerical_normalisation') dataset_numerical: tf.data.Dataset = \ get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_numerical, output_signature=input_signature_numerical_normalisation) From 5b0b01380d36c826cbcc31f593cbdba06bb9a5aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Tue, 18 Feb 2025 15:09:12 +0100 Subject: [PATCH 04/15] fixup! VRS: Refactor tensor input as separate tensors --- src/rdds/variant_rank_score/model/model.py | 55 +++++++++++----------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index cea57c2e..755f2cbb 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -378,6 +378,29 @@ def save_model_fn(self, epoch: int, logs=Optional[dict]): _LOGGER.info(f'Saving model to {filepath}') self._keras_model.save(filepath=filepath) + def _generate_dataset_tensor_signature(self) -> Tuple[Tuple[tf.TensorSpec, ...], ...]: + """ + Helper method to generatate HD5 -> TF data generator tensor signatures. + :return: A nested tuple of tf.TensorSpec instances + """ + def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedTensorSpec]: + # Helper to assemble input tensor in order defined by self._features + if name in self._features_text: + return tf.TensorSpec((), dtype=tf.string, name=name) # (, 1) + elif name in self._features_numerical: + return tf.TensorSpec((), dtype=tf.float32, name=name) # (, 1) + else: + raise ValueError(f'Found no input tensor with name {name}') + + input_tensor_signatures = () + for feature_name in self._features: + input_tensor_signatures += (_get_input_signature_from_name(name=feature_name), ) + signature = ( + input_tensor_signatures, + (tf.TensorSpec((2, ), dtype=tf.float32, name='label'), ) + ) + return signature + def _init_datasets(self, hd5_file_path: str, hparams: HyperParameters, @@ -395,25 +418,8 @@ def _init_datasets(self, output_tensor_format=self._features, label='label') - def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedTensorSpec]: - # Helper to assemble input tensor in order defined by self._features - if name in self._features_text: - return tf.TensorSpec((), dtype=tf.string, name=name) # (, 1) - elif name in self._features_numerical: - return tf.TensorSpec((), dtype=tf.float32, name=name) # (, 1) - else: - raise ValueError(f'Found no input tensor with name {name}') - - input_tensor_signatures = () - for feature_name in self._features: - input_tensor_signatures += (_get_input_signature_from_name(name=feature_name), ) - hd5_output_signature = ( - input_tensor_signatures, - (tf.TensorSpec((2, ), dtype=tf.float32, name='label'), ) - ) - dataset_train: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_train, - output_signature=hd5_output_signature) + output_signature=self._generate_dataset_tensor_signature()) dataset_train = dataset_train.cache() dataset_train = dataset_train.repeat(-1) dataset_train = dataset_train.shuffle(buffer_size=int(5E5), @@ -455,7 +461,7 @@ def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedT output_tensor_format=self._features, label='label') dataset_test: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_test, - output_signature=hd5_output_signature) + output_signature=self._generate_dataset_tensor_signature()) dataset_test = dataset_test.cache() dataset_test = dataset_test.repeat(-1) dataset_test = dataset_test.shuffle(buffer_size=int(5E5), @@ -465,7 +471,6 @@ def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedT # seed=1) dataset_test = dataset_test.batch(batch_size) dataset_test = dataset_test.prefetch(buffer_size=tf.data.AUTOTUNE) - _LOGGER.info(f'Model Input data mapping: {hd5_output_signature}') self._datasets = InitializedDatasets(dataset_train_numerical=dataset_numerical, dataset_train_vocabulary=dataset_vocabulary, @@ -688,19 +693,13 @@ def predict_on_hd5(self, for group_name in group_names: # TODO: Make sure config to Hd5DataGenerator is identical to train time setup - output_tensor_format = [self._features_text, self._features_numerical] datagen: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, group_name=group_name, - output_tensor_format=output_tensor_format, + output_tensor_format=self._features, label='label', expand_1d_categorical_to_2d=True) - n_text_features, n_numerical_features = \ - self.count_feature_types(hd5_output_dtypes=datagen.data_types) - input_signature = ((tf.TensorSpec((n_text_features, ), dtype=tf.string), - tf.TensorSpec((n_numerical_features, ), dtype=tf.float32, name='input_numerical')), - (tf.TensorSpec((2, ), dtype=tf.float32, name='label'), )) dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=datagen, - output_signature=input_signature) + output_signature=self._generate_dataset_tensor_signature()) dataset = dataset.batch(10000) dataset = dataset.prefetch(buffer_size=10) data_length = datagen.data_length From 2bea86187b7fb33eef403e394ec0a79ab244017c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Wed, 19 Feb 2025 09:59:36 +0100 Subject: [PATCH 05/15] fixup! VRS: Refactor tensor input as separate tensors --- src/rdds/variant_rank_score/model/model.py | 14 ++++---- .../model/model_explainer.py | 33 ++++++++----------- .../model/shap_compatible_model.py | 27 ++++++++------- 3 files changed, 34 insertions(+), 40 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index 755f2cbb..3093bfd1 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -588,10 +588,10 @@ def train_model_explainer(self): if self._datasets is None: raise ValueError(f'No datasets available') # NOTE: Changes to below configuration must be reflected in the self._load_saved_model_explainer() + model_input_spec = self._generate_dataset_tensor_signature() + data_tensor_spec, _ = model_input_spec # Drop labels spec self._model_explainer = ModelExplainer(model=self._infer_pathogenicity_scores, - features_text=self._features_text, - features_numerical=self._features_numerical, - input_feature_names=self._features) + input_tensor_spec=data_tensor_spec) dataset = self._datasets.dataset_train # The data used for fitting the explainer should be randomly selected from the complete set of training data dataset = dataset.shuffle(buffer_size=self._datasets.train_data_length, @@ -656,17 +656,15 @@ def load_saved_model(self, self._load_saved_model_explainer(model_explainer_path=model_explainer_path) def _infer_pathogenicity_scores(self, - tensor_text: tf.Tensor, - tensor_numerical: tf.Tensor) -> np.ndarray: + tensor_dict: Dict[str, tf.Tensor]) -> np.ndarray: """ Main method to compute inferences from input tensors. - :param tensor_text: Features containing text - :param tensor_numerical: Features containing numericals + :param tensor_dict: Input data tensors as dict, key is the tensor name :return: 1D scores same size as outer, batch dimension """ if self._keras_model is None: raise ValueError('No keras model available for inference computation!') - score_classes = self._keras_model([tensor_text, tensor_numerical]) # [class benign, class pathogenic] + score_classes = self._keras_model(tensor_dict) # [class benign, class pathogenic] score_classes = score_classes.numpy() prediction_class_pathogenic = score_classes[:, 1] return prediction_class_pathogenic diff --git a/src/rdds/variant_rank_score/model/model_explainer.py b/src/rdds/variant_rank_score/model/model_explainer.py index 7c7322f2..7125e3d8 100644 --- a/src/rdds/variant_rank_score/model/model_explainer.py +++ b/src/rdds/variant_rank_score/model/model_explainer.py @@ -1,7 +1,7 @@ import tensorflow as tf import pandas as pd import numpy as np -from typing import List, Callable +from typing import List, Callable, Tuple import gc from rdds.lib.logging import get_logger; _LOGGER = get_logger('ModelExplainer', 'debug') @@ -14,25 +14,22 @@ class ModelExplainer(ShapExplainer): def __init__(self, model: Callable, - features_text: List[str], - features_numerical: List[str], - input_feature_names: List[str]): + input_tensor_spec: Tuple[tf.TensorSpec, ...]): """ - Helper class to inferface VRS model and SHAP. + Helper class to interface VRS model and SHAP. :param model: The keras model used for inference, a callable to run inference - :param features_text: The text feature names (as input to keras model, order important) - :param features_numerical: The numerical feature names - :param input_feature_names: All feature names, as input to model, for SHAP visualisation + :param input_tensor_spec: The model input tensor spec (as input to keras model, order sensitive) """ shap_compatible_model = ShapCompatibleModel(keras_model=model, - features_numerical=features_numerical, - features_text=features_text) + input_tensor_spec=input_tensor_spec) + + self._input_feature_names: List[str] = [] + for tensor_spec in input_tensor_spec: + self._input_feature_names.append(tensor_spec.name) super().__init__(model=shap_compatible_model, - input_feature_names=input_feature_names) - self._features_text = features_text - self._features_numerical = features_numerical - _LOGGER.debug(f'Initialized with features: {self._features_numerical, self._features_text}') + input_feature_names=self._input_feature_names) + _LOGGER.debug(f'Initialized with features: {self._input_feature_names}') def adapt(self, dataset: tf.data.Dataset, @@ -53,12 +50,8 @@ def select_only_benign_samples(x, y): dataset = dataset.map(lambda x, y: x) # Drop labels dataset = dataset.take(n_reference_samples).as_numpy_iterator() data = list(dataset) # Load to RAM - # data : Tuple(text_features, numerical_features) - shap_data = pd.DataFrame([text_features for text_features, _ in data], - columns=self._features_text) - shap_data = pd.concat((shap_data, - pd.DataFrame([numerical_features for _, numerical_features in data], - columns=self._features_numerical)), axis=1) + # data : Tuple[Union[tf.string, tf.float32 with shape (n_reference_samples, n_features) ]]) + shap_data = pd.DataFrame(data=data, columns=self._input_feature_names) del data reference_data: np.ndarray = shap_data.values gc.collect() diff --git a/src/rdds/variant_rank_score/model/shap_compatible_model.py b/src/rdds/variant_rank_score/model/shap_compatible_model.py index 4f159606..7612a89c 100644 --- a/src/rdds/variant_rank_score/model/shap_compatible_model.py +++ b/src/rdds/variant_rank_score/model/shap_compatible_model.py @@ -1,5 +1,5 @@ import numpy as np -from typing import List, Tuple, Callable +from typing import Tuple, Callable, Dict import tensorflow as tf from rdds.lib.model_explanation.shap import ShapCompatibleSerializableModel @@ -18,11 +18,13 @@ class ShapCompatibleModel(ShapCompatibleSerializableModel): def __init__(self, keras_model: Callable, - features_text: List[str], - features_numerical: List[str]): + input_tensor_spec: Tuple[tf.TensorSpec, ...]): + """ + :param keras_model: A callable that accepts input data and generates inferences + :param input_tensor_spec: Model input specification, order sensitive + """ self._keras_model: Callable = keras_model - self._features_text = features_text - self._features_numerical = features_numerical + self._input_tensor_spec = input_tensor_spec def save(self, shap_model, file_pointer): # Nothing to save, rely on load_from_preloaded_keras_model() @@ -40,16 +42,17 @@ def load_from_prior_keras_model(*args, **kwargs): """ return ShapCompatibleModel(*args, **kwargs) - def _to_tensors(self, array: np.ndarray) -> Tuple[tf.Tensor, tf.Tensor]: + def _to_tensors(self, array: np.ndarray) -> Dict[str, tf.Tensor]: """ Convert array of mixed-type data to separate Tensors with defined dtypes. """ - txt_batch = array[:, 0:len(self._features_text)] - numerical_batch = array[:, len(self._features_text):] - txt_tensor = tf.constant(txt_batch, dtype=tf.string) - num_tensor = tf.constant(numerical_batch, dtype=tf.float32) - return txt_tensor, num_tensor + tensors: Dict[str, tf.Tensor] = dict() + for col_idx, tensor_spec in enumerate(self._input_tensor_spec): + tensors.update({ + tensor_spec.name: tf.constant(array[:, col_idx], dtype=tensor_spec.dtype) + }) + return tensors def __call__(self, array: np.ndarray) -> np.ndarray: """ @@ -57,5 +60,5 @@ def __call__(self, array: np.ndarray) -> np.ndarray: and run the model. Return inferences as np.ndarray shaped [batch_dim] """ tensors = self._to_tensors(array) - inferences: np.ndarray = self._keras_model(*tensors) + inferences: np.ndarray = self._keras_model(tensors) return np.array(inferences) From 9f54bad987e9ce92ea2f8ae06ba3f99ac3e21c73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Wed, 19 Feb 2025 11:49:00 +0100 Subject: [PATCH 06/15] fixup! VRS: Refactor tensor input as separate tensors --- src/rdds/variant_rank_score/model/model.py | 30 +++++++++++++--------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index 3093bfd1..b221ba50 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -639,10 +639,11 @@ def _load_saved_model_explainer(self, model_explainer_path: str): if self._keras_model is None: raise ValueError(f'Loading ModelExplainer requires a pre-loaded keras model, none currently loaded') # NOTE: Changes to below configuration must be reflected in the self.train_model_explainer() + model_input_spec = self._generate_dataset_tensor_signature() + model_input_data_spec, _ = model_input_spec # Drop labels self._model_explainer = ModelExplainer.from_saved_file(file_path=model_explainer_path, keras_model=self._infer_pathogenicity_scores, - features_text=self._features_text, - features_numerical=self._features_numerical) + input_tensor_spec=model_input_data_spec) def load_saved_model(self, keras_model_path: str = DEFAULT_MODEL_SPEC.keras_model, @@ -671,12 +672,14 @@ def _infer_pathogenicity_scores(self, def predict_on_hd5(self, hd5_file_path: str, - group_names: Set[str] = {'train', 'test'}) -> str: + group_names: Set[str] = {'train', 'test'}, + batch_size: int = 1000) -> str: """ Creates a .hd5 file containing inpute feature data, ground truth and inferences side-by-side. :param hd5_file_path: The file to the input data file for creating inferences :param group_names: The group names in the hd5 to load data and to compute inferences for + :param batch_size: Batch size, a large batch size improves speed :returns: The path to the .hd5 file containing data and inferences """ @@ -698,7 +701,7 @@ def predict_on_hd5(self, expand_1d_categorical_to_2d=True) dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=datagen, output_signature=self._generate_dataset_tensor_signature()) - dataset = dataset.batch(10000) + dataset = dataset.batch(batch_size) dataset = dataset.prefetch(buffer_size=10) data_length = datagen.data_length @@ -724,17 +727,20 @@ def predict_on_hd5(self, shape=(data_length, )) processed_sample_idx = 0 + model_input_spec = self._generate_dataset_tensor_signature() + model_input_data_spec, _ = model_input_spec # Drop labels for data, labels in dataset.as_numpy_iterator(): + data: Tuple[tf.Tensor] label, = labels label_class_pathogenic = label[:, 1] - tensor_str, tensor_numerical = data - prediction_class_pathogenic = self._infer_pathogenicity_scores(tensor_text=tensor_str, - tensor_numerical=tensor_numerical) - batch_size = tensor_str.shape[0] - for feature_names, features_data in [(self._features_text, tensor_str), - (self._features_numerical, tensor_numerical)]: - for feature_idx, feature_name in enumerate(feature_names): - output_group[feature_name][processed_sample_idx:processed_sample_idx+batch_size] = features_data[:, feature_idx] + input_tensor_dict: Dict[str, tf.Tensor] = {} + for input_feature_idx, tensor_spec in enumerate(model_input_data_spec): + input_tensor_dict.update({ + tensor_spec.name: tf.constant(data[input_feature_idx], dtype=tensor_spec.dtype, name=tensor_spec.name) + }) + prediction_class_pathogenic = self._infer_pathogenicity_scores(tensor_dict=input_tensor_dict) + for feature_name, tensor in input_tensor_dict.items(): + output_group[feature_name][processed_sample_idx:processed_sample_idx + batch_size] = tensor.numpy() output_group['ground-truth'][processed_sample_idx:processed_sample_idx+batch_size] = label_class_pathogenic output_group['prediction'][processed_sample_idx:processed_sample_idx + batch_size] = prediction_class_pathogenic processed_sample_idx += batch_size From 40da6710e7bea9f315639867d424851fc2fdf132 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Wed, 19 Feb 2025 15:43:12 +0100 Subject: [PATCH 07/15] fixup! VRS: Refactor tensor input as separate tensors --- src/rdds/variant_rank_score/model/model.py | 41 +++++++++++++--------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index b221ba50..809b6efb 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -779,27 +779,36 @@ def get_num_feature(variant: ParsableVariant, return value return 0.0 - text_features_batch: List[List[bytes]] = [] - numerical_features_batch: List[List[float]] = [] - for variant in variants: - text_features = [get_str_feature(variant, feature_name) for feature_name in self._features_text] - text_features_batch.append(text_features) - numerical_features = [get_num_feature(variant, feature_name) for feature_name in self._features_numerical] - numerical_features_batch.append(numerical_features) - tensor_text = tf.constant(text_features_batch, dtype=tf.string) - tensor_numerical = tf.constant(numerical_features_batch, dtype=tf.float32) - pathogenicity_scores = self._infer_pathogenicity_scores(tensor_text=tensor_text, - tensor_numerical=tensor_numerical) - + model_input_spec = self._generate_dataset_tensor_signature() + model_input_data_spec, _ = model_input_spec # Drop labels + input_dict: Dict[str, Union[List, tf.Tensor]] = {} + for tensor_spec in model_input_data_spec: + input_dict.update({tensor_spec.name: []}) + for variant in variants: + if tensor_spec.dtype == tf.string: + input_dict[tensor_spec.name].append(get_str_feature(variant=variant, name=tensor_spec.name)) + elif tensor_spec.dtype == tf.float32: + input_dict[tensor_spec.name].append(get_num_feature(variant=variant, name=tensor_spec.name)) + else: + raise ValueError(f'Unmapped input data dtype spec: {tensor_spec}') + # Convert to Tensor + tensor_data = input_dict[tensor_spec.name].copy() + input_dict[tensor_spec.name] = tf.constant(value=tensor_data, + dtype=tensor_spec.dtype, + name=tensor_spec.name) + pathogenicity_scores = self._infer_pathogenicity_scores(tensor_dict=input_dict) if len(pathogenicity_scores) != len(variants): raise ValueError(f'Expected same amount of predictions as input data') idx_scores_above_threshold = np.flatnonzero(pathogenicity_scores >= explain_variant_score_threshold) - arr = np.concatenate((tensor_text.numpy(), tensor_numerical.numpy()), axis=1) - explanations_full = np.empty_like(arr) + df_dict = {} + for tensor_name, tensor in input_dict.items(): + df_dict.update({tensor_name: tensor.numpy()[idx_scores_above_threshold]}) + df_selected_variants_for_explanation = pd.DataFrame.from_dict(df_dict) + explanations_full = np.empty(shape=(len(variants), len(df_selected_variants_for_explanation.columns))) explanations_full.fill(np.nan) if len(idx_scores_above_threshold) > 0: - explanations_full[idx_scores_above_threshold, :] = self._model_explainer.shap_values(X=arr[idx_scores_above_threshold, :], - gc_collect=True) # FIXME: Method call + explanations_full[idx_scores_above_threshold, :] = self._model_explainer.shap_values(X=df_selected_variants_for_explanation.values, + gc_collect=True) # FIXME: Method call not supposed to be erroneous by typechecker explanations_df = pd.DataFrame(data=explanations_full, columns=self._features) result_df = pd.concat(objs=(pd.Series(pathogenicity_scores, name='pathogenicity_score'), explanations_df), From 11fe8f0f68955694697e9e73597f611d79648b92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Thu, 20 Feb 2025 09:15:12 +0100 Subject: [PATCH 08/15] VRS: Remove TODO on ModelScore in model.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tor Björgen --- src/rdds/variant_rank_score/model/model.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index 809b6efb..a0a125f2 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -53,9 +53,6 @@ class InitializedDatasets: If this is not the case, the layer might be removed in the graph! """ -# TODO: Determine whether to use GeneticModels_family_id? -# TODO: Determine whether to use ModelScore_family_id? - # See comment in the text_vectorization_layer.py about keras and tensorboard. _LOGGER = get_logger('vrs-model') _LOGGER.setLevel(logging.INFO) From dd9f7c13888d401db19073c7f44161275c2e6e76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Mon, 3 Mar 2025 15:55:34 +0100 Subject: [PATCH 09/15] VRS: Train using BinaryCrossentrophy loss MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tor Björgen --- src/rdds/variant_rank_score/model/model.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index a0a125f2..92201d4c 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -295,10 +295,10 @@ def _build_model(self, # Specify network out shape - logits = tf.keras.layers.Dense(units=2, name='Logits', activation='linear')(x) # -> [bdim, 2] + logits = tf.keras.layers.Dense(units=1, name='Logits', activation='linear')(x) # -> [bdim, 1] # Softmax layer - confidences = tf.keras.layers.Softmax(name='Confidences')(logits) # -> [bdim, 2] + confidences = tf.keras.layers.Softmax(name='Confidences')(logits) # -> [bdim, 1] def _get_input_tensor_with_name(name: str) -> Union[tf.Tensor, tf.RaggedTensor]: # Helper to assemble input tensor in order defined by self._features @@ -318,7 +318,7 @@ def _get_input_tensor_with_name(name: str) -> Union[tf.Tensor, tf.RaggedTensor]: tf.keras.metrics.TrueNegatives(), tf.keras.metrics.FalsePositives(), tf.keras.metrics.FalseNegatives(), - tf.keras.metrics.CategoricalAccuracy(), + tf.keras.metrics.BinaryAccuracy(), tf.keras.metrics.AUC(), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()] @@ -360,7 +360,7 @@ def loss_wrapper(x, y, y_pred, sample_weight): @staticmethod @tf.keras.saving.register_keras_serializable() def loss_fn(y_true, y_pred) -> tf.Tensor: - c = tf.keras.losses.categorical_crossentropy(y_true=y_true, y_pred=y_pred, from_logits=False) + c = tf.keras.losses.binary_crossentropy(y_true=y_true, y_pred=y_pred, from_logits=False) return c def save_model_fn(self, epoch: int, logs=Optional[dict]): @@ -394,7 +394,7 @@ def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedT input_tensor_signatures += (_get_input_signature_from_name(name=feature_name), ) signature = ( input_tensor_signatures, - (tf.TensorSpec((2, ), dtype=tf.float32, name='label'), ) + (tf.TensorSpec((1, ), dtype=tf.float32, name='label'), ) ) return signature @@ -413,7 +413,8 @@ def _init_datasets(self, hd5_data_generator_train: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, group_name='train', output_tensor_format=self._features, - label='label') + label='label', + expand_1d_categorical_to_2d=False) dataset_train: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_train, output_signature=self._generate_dataset_tensor_signature()) @@ -456,7 +457,8 @@ def _init_datasets(self, hd5_data_generator_test: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, group_name='test', output_tensor_format=self._features, - label='label') + label='label', + expand_1d_categorical_to_2d=False) dataset_test: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_test, output_signature=self._generate_dataset_tensor_signature()) dataset_test = dataset_test.cache() From fbb502ebaed4902e1c37c88713f1bed2ffb1cbc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Tue, 4 Mar 2025 13:01:56 +0100 Subject: [PATCH 10/15] hd5_data_generator: Helper fn to count TN, TPs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tor Björgen --- src/rdds/lib/hdf5/hd5_data_generator.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/rdds/lib/hdf5/hd5_data_generator.py b/src/rdds/lib/hdf5/hd5_data_generator.py index b39f1926..7f2ce469 100644 --- a/src/rdds/lib/hdf5/hd5_data_generator.py +++ b/src/rdds/lib/hdf5/hd5_data_generator.py @@ -184,6 +184,18 @@ def _expand_categorical_label_1d_to_2d(label: float) -> np.array: Hd5DataGenerator._check_is_valid_categorical_label(label=label) return(1.0 - label, label) + def count_positive_negative_categorical_labels(self) -> Tuple[int, int]: + """ + Count occurrence of TN, TN labels in dataset according to 'label'. + Assumes categorical labels, 1D. + :return Tuple count of TP, TN label count + """ + positives = self._group[self._label].sum() + totals = self._data_length + negatives = totals - positives + return positives, negatives + + def __call__(self) -> Iterator[Tuple[Union[str, float], ...]]: """ Yields data from HD5 data set. From 2cad92b05de4565c8f8e2bbab7d5f5b4ccddc93b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Mon, 3 Mar 2025 16:33:09 +0100 Subject: [PATCH 11/15] VRS: Bias output layer, training weights and dataset sampling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add different methods to skew the model to fit real world patient case scenario. Signed-off-by: Tor Björgen --- src/rdds/variant_rank_score/model/model.py | 97 +++++++++++++++++++--- 1 file changed, 86 insertions(+), 11 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index 92201d4c..6619b663 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -31,6 +31,7 @@ from rdds.lib.vcf import ParsableVariant from .model_explainer import ModelExplainer from .default_model import DEFAULT_MODEL_SPEC +from ..dataset.class_labels import LABEL_PATHOGENIC_VARIANT, LABEL_BENIGN_VARIANT @@ -41,6 +42,7 @@ class InitializedDatasets: train_data_length: int test_data_length: int batch_size: int + model_bias_estimate: float # Estimate bias for model output layer, computed from training data class ratio dataset_train_numerical: tf.data.Dataset = None dataset_train_vocabulary: tf.data.Dataset = None @@ -295,10 +297,12 @@ def _build_model(self, # Specify network out shape - logits = tf.keras.layers.Dense(units=1, name='Logits', activation='linear')(x) # -> [bdim, 1] - - # Softmax layer - confidences = tf.keras.layers.Softmax(name='Confidences')(logits) # -> [bdim, 1] + # Set initial bias in the model output layer to bias the model to expected TP/TN skew + bias_initializer = tf.keras.initializers.Constant(self._datasets.model_bias_estimate) + confidences = tf.keras.layers.Dense(units=1, + bias_initializer=bias_initializer, + name='Confidences', + activation='softmax')(x) # -> [bdim, 1] def _get_input_tensor_with_name(name: str) -> Union[tf.Tensor, tf.RaggedTensor]: # Helper to assemble input tensor in order defined by self._features @@ -419,12 +423,78 @@ def _init_datasets(self, dataset_train: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_train, output_signature=self._generate_dataset_tensor_signature()) dataset_train = dataset_train.cache() + + # Training weights + n_pathogenic, n_benign = hd5_data_generator_train.count_positive_negative_categorical_labels() + _LOGGER.info(f'nTP:{n_pathogenic} ({100*n_pathogenic/hd5_data_generator_train.data_length:.4f}%) \ +, nTN:{n_benign}, n_samples:{hd5_data_generator_train.data_length}') + + # Compute expected model bias based on training data skew, bias = TPs / TNs + model_bias_estimate: float = np.log(float(n_pathogenic) / float(n_benign)) + + @tf.function + def add_weights(data, labels, **kwargs): + """ + Helper function to compute weights for balanced loss + """ + weight_pathogenic = kwargs.get('weight_pathogenic') + weight_benign = kwargs.get('weight_benign') + weights = tf.where(condition=tf.equal(labels, tf.constant(LABEL_PATHOGENIC_VARIANT)), + x=tf.ones_like(labels) * tf.constant(weight_pathogenic), # cond == True + y=tf.ones_like(labels) * tf.constant(weight_benign)) # cond == False + return data, labels, weights + + training_weights = hparams.Choice('training_weights', + values=[False, True], + default=False) + if training_weights: + # Setup class weights so that dataset is perfectly balanced w.r.t class-ratio-loss imbalance + weight_pathogenic = (1.0 / float(n_pathogenic)) * (float(hd5_data_generator_train.data_length) / 2.0) + weight_benign = (1.0 / float(n_benign)) * (float(hd5_data_generator_train.data_length) / 2.0) + _LOGGER.info(f'class weights: benign:{weight_benign}, pathogenic:{weight_pathogenic}') + assert weight_pathogenic >= weight_benign + dataset_train = dataset_train.map(map_func=lambda *args: add_weights(*args, + weight_pathogenic=weight_pathogenic, + weight_benign=weight_benign), + num_parallel_calls=tf.data.AUTOTUNE) + dataset_train = dataset_train.cache() dataset_train = dataset_train.repeat(-1) + + # Training occurrence sampling + expected_amount_of_variants_in_case = float(3.5E6) + likelihood_pathogenic = 1.0 / expected_amount_of_variants_in_case + training_occurrence_frq_sampling = hparams.Choice('training_occurrence_frq_sampling', + values=[True, False], + default=False) + + @tf.function + def filt_fn(data, labels, weights, **kwargs): + """ + Helper function to filter data samples on label + """ + del data + del weights + target_label = kwargs.get('target_label') + predicate = tf.equal(labels, target_label)[0, 0] + return predicate + + if training_occurrence_frq_sampling: + _LOGGER.info(f'Sampling pathogenic variants during training with likelihood of {likelihood_pathogenic}') + sampling_weights = (1.0 - likelihood_pathogenic, likelihood_pathogenic) + _LOGGER.info(f'Sampling weights (benign, pathogenic): {sampling_weights}') + train_pathogenic_variants = \ + dataset_train.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_PATHOGENIC_VARIANT)) + train_benign_variants = \ + dataset_train.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_BENIGN_VARIANT)) + dataset_train = tf.data.Dataset.sample_from_datasets(datasets=(train_benign_variants, train_pathogenic_variants), + weights=sampling_weights, + seed=1) + # Setup new bias estimate, since training data is now skewed + model_bias_estimate = np.log(likelihood_pathogenic / expected_amount_of_variants_in_case) + dataset_train = dataset_train.shuffle(buffer_size=int(5E5), seed=1) # FIXME: Seed - #dataset_train = rejection_resample(dataset=dataset_train, - # desired_class_ratio=[0.5, 0.5], - # seed=1) + dataset_train = dataset_train.batch(batch_size) dataset_train = dataset_train.prefetch(buffer_size=tf.data.AUTOTUNE) @@ -463,11 +533,15 @@ def _init_datasets(self, output_signature=self._generate_dataset_tensor_signature()) dataset_test = dataset_test.cache() dataset_test = dataset_test.repeat(-1) + + if training_occurrence_frq_sampling: + dataset_test_pathogenic_variants = dataset_test.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_PATHOGENIC_VARIANT)) + dataset_test_benign_variants = dataset_test.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_BENIGN_VARIANT)) + dataset_test = tf.data.Dataset.sample_from_datasets(datasets=(dataset_test_benign_variants, dataset_test_pathogenic_variants), + weights=(1.0 - likelihood_pathogenic, likelihood_pathogenic), + seed=1) dataset_test = dataset_test.shuffle(buffer_size=int(5E5), seed=1) # FIXME: Seed - #dataset_test = rejection_resample(dataset=dataset_test, - # desired_class_ratio=[0.5, 0.5], - # seed=1) dataset_test = dataset_test.batch(batch_size) dataset_test = dataset_test.prefetch(buffer_size=tf.data.AUTOTUNE) @@ -477,7 +551,8 @@ def _init_datasets(self, dataset_test=dataset_test, train_data_length=hd5_data_generator_train.data_length, test_data_length=hd5_data_generator_test.data_length, - batch_size=batch_size) + batch_size=batch_size, + model_bias_estimate=model_bias_estimate) _LOGGER.info(f'Datasets init complete: {self._datasets}') @staticmethod From 5b41809c4b7a1d5ee2d65488b466bb1776d383c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Wed, 5 Mar 2025 10:06:52 +0100 Subject: [PATCH 12/15] VRS: Cmdline option to profile data pipeline --- src/rdds/variant_rank_score/__main__.py | 44 +++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/rdds/variant_rank_score/__main__.py b/src/rdds/variant_rank_score/__main__.py index 248c7ee6..149d481f 100644 --- a/src/rdds/variant_rank_score/__main__.py +++ b/src/rdds/variant_rank_score/__main__.py @@ -155,5 +155,49 @@ def view_ranked_vcf(args): case_names=case_names) subparser.set_defaults(func=view_ranked_vcf) + +subparser = subparsers.add_parser('pipeline-performance-test', help='Profile data pipeline and view results in Tensorboard') +subparser.add_argument('hd5', help='Path to .hd5 file to be used as training, validation data') +subparser.add_argument('--batches', help='Number of batches to profile', default=int(10)) +subparser.add_argument('--include_dataset_init', help='Include dataset bootstrapping (biasing result)', default=False) +subparser.add_argument('--start_on_first_epoch_end', help='Start profiling pipeline once all data is cached', default=True) +def profile_data_pipeline(args): + from math import ceil + from progressbar import ProgressBar + from .model import VariantRankScoreModel + from rdds.lib.tf.profiler import TfProfiler, Trace + batches = int(args.batches) + model = VariantRankScoreModel() + hparams = model.get_uninitialized_hyperparameters() + hparams.Int('batch_size', min_value=32, max_value=128, default=64) + model._init_datasets(hd5_file_path=args.hd5, + hparams=hparams, + compile_vocabulary_normalisation_factors=False, + init_test_data=False) + profile_from_batch = 0 + start_batch = 0 + stop_batch = start_batch + batches + if args.start_on_first_epoch_end: + batches_per_epoch = int(ceil(model._datasets.train_data_length / hparams.get('batch_size'))) + profile_from_batch = batches_per_epoch + stop_batch = batches_per_epoch + batches + pbar = ProgressBar(max_value=stop_batch) + pbar.start() + profiler = TfProfiler(logdir=os.path.join(model._workdir, 'profiler')) + dataset = model._datasets.dataset_train.__iter__() + for batch in range(start_batch, stop_batch): + if args.include_dataset_init or batch > profile_from_batch: + print(f'Profiling batch {batch}') + profiler.start_if_not_running() + with Trace('batch', step_num=batch, _r=1): + _ = next(dataset) + else: + _ = next(dataset) # Just discard data, profiling will happen some other time + pbar.update(batch) + profiler.stop() + pbar.finish() + +subparser.set_defaults(func=profile_data_pipeline) + args = parser.parse_args() args.func(args) # Callback to trigger func with args From 7d2a83a364622b621d6e4fc30bfdedbe191ae848 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Wed, 5 Mar 2025 11:07:10 +0100 Subject: [PATCH 13/15] Add TF profiler module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tor Björgen --- src/rdds/lib/tf/profiler/README.md | 32 ++++++++++++++ src/rdds/lib/tf/profiler/__init__.py | 1 + src/rdds/lib/tf/profiler/tf_profiler.py | 59 +++++++++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 src/rdds/lib/tf/profiler/README.md create mode 100644 src/rdds/lib/tf/profiler/__init__.py create mode 100644 src/rdds/lib/tf/profiler/tf_profiler.py diff --git a/src/rdds/lib/tf/profiler/README.md b/src/rdds/lib/tf/profiler/README.md new file mode 100644 index 00000000..d1906676 --- /dev/null +++ b/src/rdds/lib/tf/profiler/README.md @@ -0,0 +1,32 @@ +# Tensorflow Profiler + +This tool is used to view performance of TF graphs and pipelines. + +## Tensorboard Dependencies +Pyenv must have the `tensorboard-plugin-profile==2.13.1` installed +in order to view profiling results. + +## GPU Limitations +Profiling on GPUs requires the Nvidia CUPTI library, if this is not +present, profiling will fail. + +If CUPTI lib is not available, one can disable GPUs temporarily +by setting environment variable `CUDA_VISIBLE_DEVICES=""`. + +## Tracing + +One must use the `Trace` context manager to record profiling +traces for actions. + +```python +dataset: tf.data.Dataset +profiler = TfProfiler(logdir=logdir) +profiler.start() +dataset = dataset.__iter__() +for step in range(10): + with Trace('batch', step_num=step, _r=1): + _ = next(dataset) +profiler.stop() +``` + +> `_r` keyword argument makes this trace event get processed as a step event by the Profiler. diff --git a/src/rdds/lib/tf/profiler/__init__.py b/src/rdds/lib/tf/profiler/__init__.py new file mode 100644 index 00000000..4ddbeb63 --- /dev/null +++ b/src/rdds/lib/tf/profiler/__init__.py @@ -0,0 +1 @@ +from .tf_profiler import TfProfiler, Trace \ No newline at end of file diff --git a/src/rdds/lib/tf/profiler/tf_profiler.py b/src/rdds/lib/tf/profiler/tf_profiler.py new file mode 100644 index 00000000..b947190f --- /dev/null +++ b/src/rdds/lib/tf/profiler/tf_profiler.py @@ -0,0 +1,59 @@ +from tensorflow.python.profiler import profiler_v2 as profiler +from tensorflow.python.eager.profiler import maybe_create_event_file as legacy_tensorboard_event_file_creator +from tensorflow.python.profiler.trace import Trace + +from rdds.lib.logging import get_logger +_LOGGER = get_logger(name='tf-profiler', log_level='info') + +DEFAULT_PROFILER_OPTIONS = profiler.ProfilerOptions(host_tracer_level=3, + python_tracer_level=1, + device_tracer_level=1) + +class TfProfiler: + + """ + Helper class to run profiling of TF graps. + + Use this class in conjunction with Trace(). + + NOTE: It's undefined to run both keras.tensorboard callback profiling and TfProfiler simultaneously. + + NOTE: The Keras TensorBoard callback will automatically perform sampled + profiling. Before enabling customized profiling, set the callback flag + "profile_batches=[]" to disable automatic sampled profiling. + + NOTE: on GPU prerequisites: https://www.tensorflow.org/guide/profiler#install_the_profiler_and_gpu_prerequisites + + https://www.tensorflow.org/guide/profiler#profiling_custom_training_loops + """ + + def __init__(self, logdir: str): + self._logdir = logdir + self._started = False + + def start_if_not_running(self, *args, **kwargs): + if self._started: + return + self.start(*args, **kwargs) + + def start(self, default_profiler_options: profiler.ProfilerOptions = DEFAULT_PROFILER_OPTIONS): + """ + Immediately start to profile once this is called, regardless whether a Trace is created or not. + Make sure to call this at the appropriate time to get the intended profiling results + (profiler can only support a limited amount of traces (further limited by tracing depth). + :param default_profiler_options: + :return: + """ + _LOGGER.info(f'Profiling logs saved in {self._logdir}') + # If event file is not created, Tensorboard won't show profiling results + legacy_tensorboard_event_file_creator(self._logdir) + profiler.start(self._logdir, options=default_profiler_options) + self._started = True + + def stop(self): + profiler.stop() + msg = (f'\ +Profiling complete. View results by: \ +PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python python3 -m tensorboard.main --port 4000 --logdir {self._logdir}\n \ +firefox http://localhost:4000#profile') + _LOGGER.info(msg) From 91f1721433759b5f7491a378d2f7c228b79db839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Wed, 5 Mar 2025 13:14:24 +0100 Subject: [PATCH 14/15] VRS: Option to not init test data set MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tor Björgen --- src/rdds/variant_rank_score/model/model.py | 50 ++++++++++++---------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index 6619b663..6110a827 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -405,7 +405,8 @@ def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedT def _init_datasets(self, hd5_file_path: str, hparams: HyperParameters, - compile_vocabulary_normalisation_factors: bool = True) -> InitializedDatasets: + compile_vocabulary_normalisation_factors: bool = True, + init_test_data: bool = True) -> InitializedDatasets: batch_size: int = hparams.Int('batch_size', min_value=64, @@ -524,33 +525,38 @@ def filt_fn(data, labels, weights, **kwargs): output_signature=input_signature_numerical_normalisation) # Testing setup - hd5_data_generator_test: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, - group_name='test', - output_tensor_format=self._features, - label='label', - expand_1d_categorical_to_2d=False) - dataset_test: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_test, - output_signature=self._generate_dataset_tensor_signature()) - dataset_test = dataset_test.cache() - dataset_test = dataset_test.repeat(-1) - - if training_occurrence_frq_sampling: - dataset_test_pathogenic_variants = dataset_test.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_PATHOGENIC_VARIANT)) - dataset_test_benign_variants = dataset_test.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_BENIGN_VARIANT)) - dataset_test = tf.data.Dataset.sample_from_datasets(datasets=(dataset_test_benign_variants, dataset_test_pathogenic_variants), - weights=(1.0 - likelihood_pathogenic, likelihood_pathogenic), - seed=1) - dataset_test = dataset_test.shuffle(buffer_size=int(5E5), - seed=1) # FIXME: Seed - dataset_test = dataset_test.batch(batch_size) - dataset_test = dataset_test.prefetch(buffer_size=tf.data.AUTOTUNE) + if init_test_data: + hd5_data_generator_test: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, + group_name='test', + output_tensor_format=self._features, + label='label', + expand_1d_categorical_to_2d=False) + dataset_test_length = hd5_data_generator_test.data_length + dataset_test: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_test, + output_signature=self._generate_dataset_tensor_signature()) + dataset_test = dataset_test.cache() + dataset_test = dataset_test.repeat(-1) + + if training_occurrence_frq_sampling: + dataset_test_pathogenic_variants = dataset_test.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_PATHOGENIC_VARIANT)) + dataset_test_benign_variants = dataset_test.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_BENIGN_VARIANT)) + dataset_test = tf.data.Dataset.sample_from_datasets(datasets=(dataset_test_benign_variants, dataset_test_pathogenic_variants), + weights=(1.0 - likelihood_pathogenic, likelihood_pathogenic), + seed=1) + dataset_test = dataset_test.shuffle(buffer_size=int(5E5), + seed=1) # FIXME: Seed + dataset_test = dataset_test.batch(batch_size) + dataset_test = dataset_test.prefetch(buffer_size=tf.data.AUTOTUNE) + else: + dataset_test = None + dataset_test_length = None self._datasets = InitializedDatasets(dataset_train_numerical=dataset_numerical, dataset_train_vocabulary=dataset_vocabulary, dataset_train=dataset_train, dataset_test=dataset_test, train_data_length=hd5_data_generator_train.data_length, - test_data_length=hd5_data_generator_test.data_length, + test_data_length=dataset_test_length, batch_size=batch_size, model_bias_estimate=model_bias_estimate) _LOGGER.info(f'Datasets init complete: {self._datasets}') From 4d23825981332890ea2dce827c3b431e8fa593b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tor=20Bj=C3=B6rgen?= Date: Wed, 5 Mar 2025 15:59:28 +0100 Subject: [PATCH 15/15] HD5 data loader: Discard data on epoch complete MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tor Björgen --- src/rdds/lib/hdf5/hd5_data_generator.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/rdds/lib/hdf5/hd5_data_generator.py b/src/rdds/lib/hdf5/hd5_data_generator.py index 7f2ce469..1f5ba24c 100644 --- a/src/rdds/lib/hdf5/hd5_data_generator.py +++ b/src/rdds/lib/hdf5/hd5_data_generator.py @@ -31,6 +31,7 @@ def __init__(self, replace_nan_floats_with: float = 0.0, load_data_into_ram: bool = True, expand_1d_categorical_to_2d: bool = True, + discard_loaded_data_on_epoch_complete: bool = True, max_n_samples: int = None): """ :param hd5_file_path: HD5 file to generate data from @@ -41,6 +42,7 @@ def __init__(self, :param replace_nan_floats_with: Floating point value to replace NaN values :param load_data_into_ram: Load file content into RAM if True. Improves reading performance. :param expand_1d_categorical_to_2d: Unpack a 1D categorical label [0][TN] into 2D; [1, 0][TN, TP] + :param discard_loaded_data_on_epoch_complete: Drop references to data once data has been read once by caller (reduces RAM footprint) :param max_n_samples: Maximum samples to yield """ self._hd5_file_path: str = hd5_file_path @@ -48,6 +50,7 @@ def __init__(self, self._data_length = int self._replace_nan_floats_with = replace_nan_floats_with self._expand_1d_categorical_to_2d: bool = expand_1d_categorical_to_2d + self._discard_loaded_data_on_epoch_complete = discard_loaded_data_on_epoch_complete self._group_name = group_name if group_name else list(self._hd5_file.keys())[0] _LOGGER.info(f'Generating data from group \'{group_name}\'') @@ -218,6 +221,10 @@ def __call__(self) -> Iterator[Tuple[Union[str, float], ...]]: idx += 1 yield output_vector _LOGGER.debug('Restart epoch') + if self._discard_loaded_data_on_epoch_complete: + _LOGGER.info('Closing and discarding data in RAM') + del self._group # Drop references + self._hd5_file.close() # Close file def __del__(self): try: