diff --git a/src/rdds/lib/hdf5/hd5_data_generator.py b/src/rdds/lib/hdf5/hd5_data_generator.py index 0b6e4306..1f5ba24c 100644 --- a/src/rdds/lib/hdf5/hd5_data_generator.py +++ b/src/rdds/lib/hdf5/hd5_data_generator.py @@ -30,7 +30,9 @@ def __init__(self, label: str = None, replace_nan_floats_with: float = 0.0, load_data_into_ram: bool = True, - expand_1d_categorical_to_2d: bool = True): + expand_1d_categorical_to_2d: bool = True, + discard_loaded_data_on_epoch_complete: bool = True, + max_n_samples: int = None): """ :param hd5_file_path: HD5 file to generate data from :param group_name: Group in HD5 file to read data from (reads all datasets) @@ -40,12 +42,15 @@ def __init__(self, :param replace_nan_floats_with: Floating point value to replace NaN values :param load_data_into_ram: Load file content into RAM if True. Improves reading performance. :param expand_1d_categorical_to_2d: Unpack a 1D categorical label [0][TN] into 2D; [1, 0][TN, TP] + :param discard_loaded_data_on_epoch_complete: Drop references to data once data has been read once by caller (reduces RAM footprint) + :param max_n_samples: Maximum samples to yield """ self._hd5_file_path: str = hd5_file_path self._hd5_file = h5py.File(name=self._hd5_file_path, mode='r') self._data_length = int self._replace_nan_floats_with = replace_nan_floats_with self._expand_1d_categorical_to_2d: bool = expand_1d_categorical_to_2d + self._discard_loaded_data_on_epoch_complete = discard_loaded_data_on_epoch_complete self._group_name = group_name if group_name else list(self._hd5_file.keys())[0] _LOGGER.info(f'Generating data from group \'{group_name}\'') @@ -56,7 +61,10 @@ def __init__(self, _LOGGER.info(f'Loading data to RAM.') in_ram_group: Dict[str, np.ndarray] = {} # Mock the Group dictionary API by using a dict for dataset_name in self._group.keys(): - in_ram_group.update({dataset_name: self._group[dataset_name][:]}) + if max_n_samples is not None: + in_ram_group.update({dataset_name: self._group[dataset_name][0:max_n_samples]}) + else: + in_ram_group.update({dataset_name: self._group[dataset_name][:]}) self._group: Dict[str, np.ndarray] = in_ram_group _LOGGER.info('RAM loading complete.') @@ -82,6 +90,8 @@ def __init__(self, if dataset.shape != zeroeth_dataset.shape: raise ValueError(f'Not identical dataset shapes, got {dataset.shape}!={zeroeth_dataset.shape}') self._data_length: int = zeroeth_dataset.shape[0] + if max_n_samples is not None: + self._data_length = max_n_samples _LOGGER.info(f'{self._data_length} samples across {len(self._group.keys())} features') @property @@ -177,6 +187,18 @@ def _expand_categorical_label_1d_to_2d(label: float) -> np.array: Hd5DataGenerator._check_is_valid_categorical_label(label=label) return(1.0 - label, label) + def count_positive_negative_categorical_labels(self) -> Tuple[int, int]: + """ + Count occurrence of TN, TN labels in dataset according to 'label'. + Assumes categorical labels, 1D. + :return Tuple count of TP, TN label count + """ + positives = self._group[self._label].sum() + totals = self._data_length + negatives = totals - positives + return positives, negatives + + def __call__(self) -> Iterator[Tuple[Union[str, float], ...]]: """ Yields data from HD5 data set. @@ -199,6 +221,10 @@ def __call__(self) -> Iterator[Tuple[Union[str, float], ...]]: idx += 1 yield output_vector _LOGGER.debug('Restart epoch') + if self._discard_loaded_data_on_epoch_complete: + _LOGGER.info('Closing and discarding data in RAM') + del self._group # Drop references + self._hd5_file.close() # Close file def __del__(self): try: diff --git a/src/rdds/lib/tf/profiler/README.md b/src/rdds/lib/tf/profiler/README.md new file mode 100644 index 00000000..d1906676 --- /dev/null +++ b/src/rdds/lib/tf/profiler/README.md @@ -0,0 +1,32 @@ +# Tensorflow Profiler + +This tool is used to view performance of TF graphs and pipelines. + +## Tensorboard Dependencies +Pyenv must have the `tensorboard-plugin-profile==2.13.1` installed +in order to view profiling results. + +## GPU Limitations +Profiling on GPUs requires the Nvidia CUPTI library, if this is not +present, profiling will fail. + +If CUPTI lib is not available, one can disable GPUs temporarily +by setting environment variable `CUDA_VISIBLE_DEVICES=""`. + +## Tracing + +One must use the `Trace` context manager to record profiling +traces for actions. + +```python +dataset: tf.data.Dataset +profiler = TfProfiler(logdir=logdir) +profiler.start() +dataset = dataset.__iter__() +for step in range(10): + with Trace('batch', step_num=step, _r=1): + _ = next(dataset) +profiler.stop() +``` + +> `_r` keyword argument makes this trace event get processed as a step event by the Profiler. diff --git a/src/rdds/lib/tf/profiler/__init__.py b/src/rdds/lib/tf/profiler/__init__.py new file mode 100644 index 00000000..4ddbeb63 --- /dev/null +++ b/src/rdds/lib/tf/profiler/__init__.py @@ -0,0 +1 @@ +from .tf_profiler import TfProfiler, Trace \ No newline at end of file diff --git a/src/rdds/lib/tf/profiler/tf_profiler.py b/src/rdds/lib/tf/profiler/tf_profiler.py new file mode 100644 index 00000000..b947190f --- /dev/null +++ b/src/rdds/lib/tf/profiler/tf_profiler.py @@ -0,0 +1,59 @@ +from tensorflow.python.profiler import profiler_v2 as profiler +from tensorflow.python.eager.profiler import maybe_create_event_file as legacy_tensorboard_event_file_creator +from tensorflow.python.profiler.trace import Trace + +from rdds.lib.logging import get_logger +_LOGGER = get_logger(name='tf-profiler', log_level='info') + +DEFAULT_PROFILER_OPTIONS = profiler.ProfilerOptions(host_tracer_level=3, + python_tracer_level=1, + device_tracer_level=1) + +class TfProfiler: + + """ + Helper class to run profiling of TF graps. + + Use this class in conjunction with Trace(). + + NOTE: It's undefined to run both keras.tensorboard callback profiling and TfProfiler simultaneously. + + NOTE: The Keras TensorBoard callback will automatically perform sampled + profiling. Before enabling customized profiling, set the callback flag + "profile_batches=[]" to disable automatic sampled profiling. + + NOTE: on GPU prerequisites: https://www.tensorflow.org/guide/profiler#install_the_profiler_and_gpu_prerequisites + + https://www.tensorflow.org/guide/profiler#profiling_custom_training_loops + """ + + def __init__(self, logdir: str): + self._logdir = logdir + self._started = False + + def start_if_not_running(self, *args, **kwargs): + if self._started: + return + self.start(*args, **kwargs) + + def start(self, default_profiler_options: profiler.ProfilerOptions = DEFAULT_PROFILER_OPTIONS): + """ + Immediately start to profile once this is called, regardless whether a Trace is created or not. + Make sure to call this at the appropriate time to get the intended profiling results + (profiler can only support a limited amount of traces (further limited by tracing depth). + :param default_profiler_options: + :return: + """ + _LOGGER.info(f'Profiling logs saved in {self._logdir}') + # If event file is not created, Tensorboard won't show profiling results + legacy_tensorboard_event_file_creator(self._logdir) + profiler.start(self._logdir, options=default_profiler_options) + self._started = True + + def stop(self): + profiler.stop() + msg = (f'\ +Profiling complete. View results by: \ +PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python python3 -m tensorboard.main --port 4000 --logdir {self._logdir}\n \ +firefox http://localhost:4000#profile') + _LOGGER.info(msg) diff --git a/src/rdds/variant_rank_score/__main__.py b/src/rdds/variant_rank_score/__main__.py index 248c7ee6..149d481f 100644 --- a/src/rdds/variant_rank_score/__main__.py +++ b/src/rdds/variant_rank_score/__main__.py @@ -155,5 +155,49 @@ def view_ranked_vcf(args): case_names=case_names) subparser.set_defaults(func=view_ranked_vcf) + +subparser = subparsers.add_parser('pipeline-performance-test', help='Profile data pipeline and view results in Tensorboard') +subparser.add_argument('hd5', help='Path to .hd5 file to be used as training, validation data') +subparser.add_argument('--batches', help='Number of batches to profile', default=int(10)) +subparser.add_argument('--include_dataset_init', help='Include dataset bootstrapping (biasing result)', default=False) +subparser.add_argument('--start_on_first_epoch_end', help='Start profiling pipeline once all data is cached', default=True) +def profile_data_pipeline(args): + from math import ceil + from progressbar import ProgressBar + from .model import VariantRankScoreModel + from rdds.lib.tf.profiler import TfProfiler, Trace + batches = int(args.batches) + model = VariantRankScoreModel() + hparams = model.get_uninitialized_hyperparameters() + hparams.Int('batch_size', min_value=32, max_value=128, default=64) + model._init_datasets(hd5_file_path=args.hd5, + hparams=hparams, + compile_vocabulary_normalisation_factors=False, + init_test_data=False) + profile_from_batch = 0 + start_batch = 0 + stop_batch = start_batch + batches + if args.start_on_first_epoch_end: + batches_per_epoch = int(ceil(model._datasets.train_data_length / hparams.get('batch_size'))) + profile_from_batch = batches_per_epoch + stop_batch = batches_per_epoch + batches + pbar = ProgressBar(max_value=stop_batch) + pbar.start() + profiler = TfProfiler(logdir=os.path.join(model._workdir, 'profiler')) + dataset = model._datasets.dataset_train.__iter__() + for batch in range(start_batch, stop_batch): + if args.include_dataset_init or batch > profile_from_batch: + print(f'Profiling batch {batch}') + profiler.start_if_not_running() + with Trace('batch', step_num=batch, _r=1): + _ = next(dataset) + else: + _ = next(dataset) # Just discard data, profiling will happen some other time + pbar.update(batch) + profiler.stop() + pbar.finish() + +subparser.set_defaults(func=profile_data_pipeline) + args = parser.parse_args() args.func(args) # Callback to trigger func with args diff --git a/src/rdds/variant_rank_score/model/model.py b/src/rdds/variant_rank_score/model/model.py index bae9b69e..6110a827 100644 --- a/src/rdds/variant_rank_score/model/model.py +++ b/src/rdds/variant_rank_score/model/model.py @@ -31,6 +31,7 @@ from rdds.lib.vcf import ParsableVariant from .model_explainer import ModelExplainer from .default_model import DEFAULT_MODEL_SPEC +from ..dataset.class_labels import LABEL_PATHOGENIC_VARIANT, LABEL_BENIGN_VARIANT @@ -41,6 +42,7 @@ class InitializedDatasets: train_data_length: int test_data_length: int batch_size: int + model_bias_estimate: float # Estimate bias for model output layer, computed from training data class ratio dataset_train_numerical: tf.data.Dataset = None dataset_train_vocabulary: tf.data.Dataset = None @@ -53,9 +55,6 @@ class InitializedDatasets: If this is not the case, the layer might be removed in the graph! """ -# TODO: Determine whether to use GeneticModels_family_id? -# TODO: Determine whether to use ModelScore_family_id? - # See comment in the text_vectorization_layer.py about keras and tensorboard. _LOGGER = get_logger('vrs-model') _LOGGER.setLevel(logging.INFO) @@ -132,13 +131,26 @@ def _build_model(self, """ :param hparams: Hyperparameters for the model """ - input_text: tf.keras.Input = tf.keras.Input(shape=len(self._features_text), - ragged=True, - dtype=tf.string, - name='input_text') - input_numerical: tf.keras.Input = tf.keras.Input(shape=len(self._features_numerical), - dtype=tf.float32, - name='input_numerical') + + text_inputs = [] + for text_feature_name in self._features_text: + text_inputs.append( + tf.keras.Input(shape=(1, ), + ragged=True, + dtype=tf.string, + name=text_feature_name) + ) + input_text: tf.RaggedTensor = tf.concat(text_inputs, axis=1, name='concat_input_text') + + numerical_inputs = [] + for numerical_feature_name in self._features_numerical: + numerical_inputs.append( + tf.keras.Input(shape=(1, ), + dtype=tf.float32, + name=numerical_feature_name) + ) + input_numerical: tf.Tensor = tf.concat(numerical_inputs, axis=1, name='concat_input_numerical') + # Text preprocessing split_regex = '\s|\n|_|&|/|\||:|,|-|0|1|2|3|4|5|6|7|8|9' text_preprocessing_layer = TextPreprocessingLayer(split_regex=split_regex) @@ -285,19 +297,32 @@ def _build_model(self, # Specify network out shape - logits = tf.keras.layers.Dense(units=2, name='Logits', activation='linear')(x) # -> [bdim, 2] - - # Softmax layer - confidences = tf.keras.layers.Softmax(name='Confidences')(logits) # -> [bdim, 2] - - self._keras_model = tf.keras.Model(inputs=[input_text, input_numerical], + # Set initial bias in the model output layer to bias the model to expected TP/TN skew + bias_initializer = tf.keras.initializers.Constant(self._datasets.model_bias_estimate) + confidences = tf.keras.layers.Dense(units=1, + bias_initializer=bias_initializer, + name='Confidences', + activation='softmax')(x) # -> [bdim, 1] + + def _get_input_tensor_with_name(name: str) -> Union[tf.Tensor, tf.RaggedTensor]: + # Helper to assemble input tensor in order defined by self._features + all_input_tensors = text_inputs.copy() + all_input_tensors.extend(numerical_inputs) + for input_tensor in all_input_tensors: + if input_tensor.name == name: + return input_tensor + raise ValueError(f'Found no input tensor with name {name}') + model_inputs = [] # Flat list of model inputs [feature0, feature1, ... ] + for feature_name in self._features: + model_inputs.append(_get_input_tensor_with_name(name=feature_name)) + self._keras_model = tf.keras.Model(inputs=model_inputs, outputs=confidences) metrics = [tf.keras.metrics.TruePositives(), tf.keras.metrics.TrueNegatives(), tf.keras.metrics.FalsePositives(), tf.keras.metrics.FalseNegatives(), - tf.keras.metrics.CategoricalAccuracy(), + tf.keras.metrics.BinaryAccuracy(), tf.keras.metrics.AUC(), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()] @@ -339,27 +364,9 @@ def loss_wrapper(x, y, y_pred, sample_weight): @staticmethod @tf.keras.saving.register_keras_serializable() def loss_fn(y_true, y_pred) -> tf.Tensor: - c = tf.keras.losses.categorical_crossentropy(y_true=y_true, y_pred=y_pred, from_logits=False) + c = tf.keras.losses.binary_crossentropy(y_true=y_true, y_pred=y_pred, from_logits=False) return c - @staticmethod - def count_feature_types(hd5_output_dtypes: Dict[str, Type]) -> Tuple[int, int]: - """ - Computes amount of numerical vs textbased features in dataset. - :param hd5_output_dtypes: The output types from hd5_data_generator.data_types attribute - :return: Tuple of count - """ - n_text_features = 0 - n_numerical_features = 0 - for feature_name, feature_dtype in hd5_output_dtypes.items(): - if feature_dtype == bytes: - n_text_features += 1 - elif feature_dtype == float: - n_numerical_features += 1 - else: - raise ValueError('Unknown feature dtype', feature_dtype) - return n_text_features, n_numerical_features - def save_model_fn(self, epoch: int, logs=Optional[dict]): """ Saves model to Keras saved model format @@ -372,10 +379,34 @@ def save_model_fn(self, epoch: int, logs=Optional[dict]): _LOGGER.info(f'Saving model to {filepath}') self._keras_model.save(filepath=filepath) + def _generate_dataset_tensor_signature(self) -> Tuple[Tuple[tf.TensorSpec, ...], ...]: + """ + Helper method to generatate HD5 -> TF data generator tensor signatures. + :return: A nested tuple of tf.TensorSpec instances + """ + def _get_input_signature_from_name(name: str) -> Union[tf.TensorSpec, tf.RaggedTensorSpec]: + # Helper to assemble input tensor in order defined by self._features + if name in self._features_text: + return tf.TensorSpec((), dtype=tf.string, name=name) # (, 1) + elif name in self._features_numerical: + return tf.TensorSpec((), dtype=tf.float32, name=name) # (, 1) + else: + raise ValueError(f'Found no input tensor with name {name}') + + input_tensor_signatures = () + for feature_name in self._features: + input_tensor_signatures += (_get_input_signature_from_name(name=feature_name), ) + signature = ( + input_tensor_signatures, + (tf.TensorSpec((1, ), dtype=tf.float32, name='label'), ) + ) + return signature + def _init_datasets(self, hd5_file_path: str, hparams: HyperParameters, - compile_vocabulary_normalisation_factors: bool = True) -> InitializedDatasets: + compile_vocabulary_normalisation_factors: bool = True, + init_test_data: bool = True) -> InitializedDatasets: batch_size: int = hparams.Int('batch_size', min_value=64, @@ -386,22 +417,85 @@ def _init_datasets(self, # Training setup hd5_data_generator_train: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, group_name='train', - output_tensor_format=[self._features_text, - self._features_numerical], - label='label') - n_text_features, n_numerical_features = self.count_feature_types(hd5_output_dtypes=hd5_data_generator_train.data_types) - input_signature = ((tf.TensorSpec((n_text_features, ), dtype=tf.string), - tf.TensorSpec((n_numerical_features, ), dtype=tf.float32, name='input_numerical')), - (tf.TensorSpec((2, ), dtype=tf.float32, name='label'), )) + output_tensor_format=self._features, + label='label', + expand_1d_categorical_to_2d=False) + dataset_train: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_train, - output_signature=input_signature) + output_signature=self._generate_dataset_tensor_signature()) + dataset_train = dataset_train.cache() + + # Training weights + n_pathogenic, n_benign = hd5_data_generator_train.count_positive_negative_categorical_labels() + _LOGGER.info(f'nTP:{n_pathogenic} ({100*n_pathogenic/hd5_data_generator_train.data_length:.4f}%) \ +, nTN:{n_benign}, n_samples:{hd5_data_generator_train.data_length}') + + # Compute expected model bias based on training data skew, bias = TPs / TNs + model_bias_estimate: float = np.log(float(n_pathogenic) / float(n_benign)) + + @tf.function + def add_weights(data, labels, **kwargs): + """ + Helper function to compute weights for balanced loss + """ + weight_pathogenic = kwargs.get('weight_pathogenic') + weight_benign = kwargs.get('weight_benign') + weights = tf.where(condition=tf.equal(labels, tf.constant(LABEL_PATHOGENIC_VARIANT)), + x=tf.ones_like(labels) * tf.constant(weight_pathogenic), # cond == True + y=tf.ones_like(labels) * tf.constant(weight_benign)) # cond == False + return data, labels, weights + + training_weights = hparams.Choice('training_weights', + values=[False, True], + default=False) + if training_weights: + # Setup class weights so that dataset is perfectly balanced w.r.t class-ratio-loss imbalance + weight_pathogenic = (1.0 / float(n_pathogenic)) * (float(hd5_data_generator_train.data_length) / 2.0) + weight_benign = (1.0 / float(n_benign)) * (float(hd5_data_generator_train.data_length) / 2.0) + _LOGGER.info(f'class weights: benign:{weight_benign}, pathogenic:{weight_pathogenic}') + assert weight_pathogenic >= weight_benign + dataset_train = dataset_train.map(map_func=lambda *args: add_weights(*args, + weight_pathogenic=weight_pathogenic, + weight_benign=weight_benign), + num_parallel_calls=tf.data.AUTOTUNE) dataset_train = dataset_train.cache() dataset_train = dataset_train.repeat(-1) + + # Training occurrence sampling + expected_amount_of_variants_in_case = float(3.5E6) + likelihood_pathogenic = 1.0 / expected_amount_of_variants_in_case + training_occurrence_frq_sampling = hparams.Choice('training_occurrence_frq_sampling', + values=[True, False], + default=False) + + @tf.function + def filt_fn(data, labels, weights, **kwargs): + """ + Helper function to filter data samples on label + """ + del data + del weights + target_label = kwargs.get('target_label') + predicate = tf.equal(labels, target_label)[0, 0] + return predicate + + if training_occurrence_frq_sampling: + _LOGGER.info(f'Sampling pathogenic variants during training with likelihood of {likelihood_pathogenic}') + sampling_weights = (1.0 - likelihood_pathogenic, likelihood_pathogenic) + _LOGGER.info(f'Sampling weights (benign, pathogenic): {sampling_weights}') + train_pathogenic_variants = \ + dataset_train.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_PATHOGENIC_VARIANT)) + train_benign_variants = \ + dataset_train.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_BENIGN_VARIANT)) + dataset_train = tf.data.Dataset.sample_from_datasets(datasets=(train_benign_variants, train_pathogenic_variants), + weights=sampling_weights, + seed=1) + # Setup new bias estimate, since training data is now skewed + model_bias_estimate = np.log(likelihood_pathogenic / expected_amount_of_variants_in_case) + dataset_train = dataset_train.shuffle(buffer_size=int(5E5), seed=1) # FIXME: Seed - #dataset_train = rejection_resample(dataset=dataset_train, - # desired_class_ratio=[0.5, 0.5], - # seed=1) + dataset_train = dataset_train.batch(batch_size) dataset_train = dataset_train.prefetch(buffer_size=tf.data.AUTOTUNE) @@ -415,7 +509,7 @@ def _init_datasets(self, group_name='train', output_tensor_format=[self._features_text]) input_signature_vocabulary: Tuple[tf.TensorSpec] = \ - (tf.TensorSpec((n_text_features,), dtype=tf.string, name='input_text_vocabulary'),) + (tf.TensorSpec((len(self._features_text),), dtype=tf.string, name='input_text_vocabulary'),) dataset_vocabulary = get_tf_dataset_from_hd5_data_generator( hd5_data_generator=hd5_data_generator_vocabulary, output_signature=input_signature_vocabulary) @@ -425,37 +519,46 @@ def _init_datasets(self, group_name='train', output_tensor_format=self._features_numerical) input_signature_numerical_normalisation = \ - tf.TensorSpec((n_numerical_features,), dtype=tf.float32, name='input_numerical_normalisation') + tf.TensorSpec((len(self._features_numerical),), dtype=tf.float32, name='input_numerical_normalisation') dataset_numerical: tf.data.Dataset = \ get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_numerical, output_signature=input_signature_numerical_normalisation) # Testing setup - hd5_data_generator_test: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, - group_name='test', - output_tensor_format=[self._features_text, - self._features_numerical], - label='label') - dataset_test: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_test, - output_signature=input_signature) - dataset_test = dataset_test.cache() - dataset_test = dataset_test.repeat(-1) - dataset_test = dataset_test.shuffle(buffer_size=int(5E5), - seed=1) # FIXME: Seed - #dataset_test = rejection_resample(dataset=dataset_test, - # desired_class_ratio=[0.5, 0.5], - # seed=1) - dataset_test = dataset_test.batch(batch_size) - dataset_test = dataset_test.prefetch(buffer_size=tf.data.AUTOTUNE) - _LOGGER.info(f'Model Input data mapping: {input_signature}') + if init_test_data: + hd5_data_generator_test: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, + group_name='test', + output_tensor_format=self._features, + label='label', + expand_1d_categorical_to_2d=False) + dataset_test_length = hd5_data_generator_test.data_length + dataset_test: tf.data.Dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=hd5_data_generator_test, + output_signature=self._generate_dataset_tensor_signature()) + dataset_test = dataset_test.cache() + dataset_test = dataset_test.repeat(-1) + + if training_occurrence_frq_sampling: + dataset_test_pathogenic_variants = dataset_test.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_PATHOGENIC_VARIANT)) + dataset_test_benign_variants = dataset_test.filter(predicate=lambda *args: filt_fn(*args, target_label=LABEL_BENIGN_VARIANT)) + dataset_test = tf.data.Dataset.sample_from_datasets(datasets=(dataset_test_benign_variants, dataset_test_pathogenic_variants), + weights=(1.0 - likelihood_pathogenic, likelihood_pathogenic), + seed=1) + dataset_test = dataset_test.shuffle(buffer_size=int(5E5), + seed=1) # FIXME: Seed + dataset_test = dataset_test.batch(batch_size) + dataset_test = dataset_test.prefetch(buffer_size=tf.data.AUTOTUNE) + else: + dataset_test = None + dataset_test_length = None self._datasets = InitializedDatasets(dataset_train_numerical=dataset_numerical, dataset_train_vocabulary=dataset_vocabulary, dataset_train=dataset_train, dataset_test=dataset_test, train_data_length=hd5_data_generator_train.data_length, - test_data_length=hd5_data_generator_test.data_length, - batch_size=batch_size) + test_data_length=dataset_test_length, + batch_size=batch_size, + model_bias_estimate=model_bias_estimate) _LOGGER.info(f'Datasets init complete: {self._datasets}') @staticmethod @@ -565,10 +668,10 @@ def train_model_explainer(self): if self._datasets is None: raise ValueError(f'No datasets available') # NOTE: Changes to below configuration must be reflected in the self._load_saved_model_explainer() + model_input_spec = self._generate_dataset_tensor_signature() + data_tensor_spec, _ = model_input_spec # Drop labels spec self._model_explainer = ModelExplainer(model=self._infer_pathogenicity_scores, - features_text=self._features_text, - features_numerical=self._features_numerical, - input_feature_names=self._features) + input_tensor_spec=data_tensor_spec) dataset = self._datasets.dataset_train # The data used for fitting the explainer should be randomly selected from the complete set of training data dataset = dataset.shuffle(buffer_size=self._datasets.train_data_length, @@ -616,10 +719,11 @@ def _load_saved_model_explainer(self, model_explainer_path: str): if self._keras_model is None: raise ValueError(f'Loading ModelExplainer requires a pre-loaded keras model, none currently loaded') # NOTE: Changes to below configuration must be reflected in the self.train_model_explainer() + model_input_spec = self._generate_dataset_tensor_signature() + model_input_data_spec, _ = model_input_spec # Drop labels self._model_explainer = ModelExplainer.from_saved_file(file_path=model_explainer_path, keras_model=self._infer_pathogenicity_scores, - features_text=self._features_text, - features_numerical=self._features_numerical) + input_tensor_spec=model_input_data_spec) def load_saved_model(self, keras_model_path: str = DEFAULT_MODEL_SPEC.keras_model, @@ -633,29 +737,29 @@ def load_saved_model(self, self._load_saved_model_explainer(model_explainer_path=model_explainer_path) def _infer_pathogenicity_scores(self, - tensor_text: tf.Tensor, - tensor_numerical: tf.Tensor) -> np.ndarray: + tensor_dict: Dict[str, tf.Tensor]) -> np.ndarray: """ Main method to compute inferences from input tensors. - :param tensor_text: Features containing text - :param tensor_numerical: Features containing numericals + :param tensor_dict: Input data tensors as dict, key is the tensor name :return: 1D scores same size as outer, batch dimension """ if self._keras_model is None: raise ValueError('No keras model available for inference computation!') - score_classes = self._keras_model([tensor_text, tensor_numerical]) # [class benign, class pathogenic] + score_classes = self._keras_model(tensor_dict) # [class benign, class pathogenic] score_classes = score_classes.numpy() prediction_class_pathogenic = score_classes[:, 1] return prediction_class_pathogenic def predict_on_hd5(self, hd5_file_path: str, - group_names: Set[str] = {'train', 'test'}) -> str: + group_names: Set[str] = {'train', 'test'}, + batch_size: int = 1000) -> str: """ Creates a .hd5 file containing inpute feature data, ground truth and inferences side-by-side. :param hd5_file_path: The file to the input data file for creating inferences :param group_names: The group names in the hd5 to load data and to compute inferences for + :param batch_size: Batch size, a large batch size improves speed :returns: The path to the .hd5 file containing data and inferences """ @@ -670,20 +774,14 @@ def predict_on_hd5(self, for group_name in group_names: # TODO: Make sure config to Hd5DataGenerator is identical to train time setup - output_tensor_format = [self._features_text, self._features_numerical] datagen: Hd5DataGenerator = Hd5DataGenerator(hd5_file_path=hd5_file_path, group_name=group_name, - output_tensor_format=output_tensor_format, + output_tensor_format=self._features, label='label', expand_1d_categorical_to_2d=True) - n_text_features, n_numerical_features = \ - self.count_feature_types(hd5_output_dtypes=datagen.data_types) - input_signature = ((tf.TensorSpec((n_text_features, ), dtype=tf.string), - tf.TensorSpec((n_numerical_features, ), dtype=tf.float32, name='input_numerical')), - (tf.TensorSpec((2, ), dtype=tf.float32, name='label'), )) dataset = get_tf_dataset_from_hd5_data_generator(hd5_data_generator=datagen, - output_signature=input_signature) - dataset = dataset.batch(10000) + output_signature=self._generate_dataset_tensor_signature()) + dataset = dataset.batch(batch_size) dataset = dataset.prefetch(buffer_size=10) data_length = datagen.data_length @@ -709,17 +807,20 @@ def predict_on_hd5(self, shape=(data_length, )) processed_sample_idx = 0 + model_input_spec = self._generate_dataset_tensor_signature() + model_input_data_spec, _ = model_input_spec # Drop labels for data, labels in dataset.as_numpy_iterator(): + data: Tuple[tf.Tensor] label, = labels label_class_pathogenic = label[:, 1] - tensor_str, tensor_numerical = data - prediction_class_pathogenic = self._infer_pathogenicity_scores(tensor_text=tensor_str, - tensor_numerical=tensor_numerical) - batch_size = tensor_str.shape[0] - for feature_names, features_data in [(self._features_text, tensor_str), - (self._features_numerical, tensor_numerical)]: - for feature_idx, feature_name in enumerate(feature_names): - output_group[feature_name][processed_sample_idx:processed_sample_idx+batch_size] = features_data[:, feature_idx] + input_tensor_dict: Dict[str, tf.Tensor] = {} + for input_feature_idx, tensor_spec in enumerate(model_input_data_spec): + input_tensor_dict.update({ + tensor_spec.name: tf.constant(data[input_feature_idx], dtype=tensor_spec.dtype, name=tensor_spec.name) + }) + prediction_class_pathogenic = self._infer_pathogenicity_scores(tensor_dict=input_tensor_dict) + for feature_name, tensor in input_tensor_dict.items(): + output_group[feature_name][processed_sample_idx:processed_sample_idx + batch_size] = tensor.numpy() output_group['ground-truth'][processed_sample_idx:processed_sample_idx+batch_size] = label_class_pathogenic output_group['prediction'][processed_sample_idx:processed_sample_idx + batch_size] = prediction_class_pathogenic processed_sample_idx += batch_size @@ -758,27 +859,36 @@ def get_num_feature(variant: ParsableVariant, return value return 0.0 - text_features_batch: List[List[bytes]] = [] - numerical_features_batch: List[List[float]] = [] - for variant in variants: - text_features = [get_str_feature(variant, feature_name) for feature_name in self._features_text] - text_features_batch.append(text_features) - numerical_features = [get_num_feature(variant, feature_name) for feature_name in self._features_numerical] - numerical_features_batch.append(numerical_features) - tensor_text = tf.constant(text_features_batch, dtype=tf.string) - tensor_numerical = tf.constant(numerical_features_batch, dtype=tf.float32) - pathogenicity_scores = self._infer_pathogenicity_scores(tensor_text=tensor_text, - tensor_numerical=tensor_numerical) - + model_input_spec = self._generate_dataset_tensor_signature() + model_input_data_spec, _ = model_input_spec # Drop labels + input_dict: Dict[str, Union[List, tf.Tensor]] = {} + for tensor_spec in model_input_data_spec: + input_dict.update({tensor_spec.name: []}) + for variant in variants: + if tensor_spec.dtype == tf.string: + input_dict[tensor_spec.name].append(get_str_feature(variant=variant, name=tensor_spec.name)) + elif tensor_spec.dtype == tf.float32: + input_dict[tensor_spec.name].append(get_num_feature(variant=variant, name=tensor_spec.name)) + else: + raise ValueError(f'Unmapped input data dtype spec: {tensor_spec}') + # Convert to Tensor + tensor_data = input_dict[tensor_spec.name].copy() + input_dict[tensor_spec.name] = tf.constant(value=tensor_data, + dtype=tensor_spec.dtype, + name=tensor_spec.name) + pathogenicity_scores = self._infer_pathogenicity_scores(tensor_dict=input_dict) if len(pathogenicity_scores) != len(variants): raise ValueError(f'Expected same amount of predictions as input data') idx_scores_above_threshold = np.flatnonzero(pathogenicity_scores >= explain_variant_score_threshold) - arr = np.concatenate((tensor_text.numpy(), tensor_numerical.numpy()), axis=1) - explanations_full = np.empty_like(arr) + df_dict = {} + for tensor_name, tensor in input_dict.items(): + df_dict.update({tensor_name: tensor.numpy()[idx_scores_above_threshold]}) + df_selected_variants_for_explanation = pd.DataFrame.from_dict(df_dict) + explanations_full = np.empty(shape=(len(variants), len(df_selected_variants_for_explanation.columns))) explanations_full.fill(np.nan) if len(idx_scores_above_threshold) > 0: - explanations_full[idx_scores_above_threshold, :] = self._model_explainer.shap_values(X=arr[idx_scores_above_threshold, :], - gc_collect=True) # FIXME: Method call + explanations_full[idx_scores_above_threshold, :] = self._model_explainer.shap_values(X=df_selected_variants_for_explanation.values, + gc_collect=True) # FIXME: Method call not supposed to be erroneous by typechecker explanations_df = pd.DataFrame(data=explanations_full, columns=self._features) result_df = pd.concat(objs=(pd.Series(pathogenicity_scores, name='pathogenicity_score'), explanations_df), diff --git a/src/rdds/variant_rank_score/model/model_explainer.py b/src/rdds/variant_rank_score/model/model_explainer.py index 7c7322f2..7125e3d8 100644 --- a/src/rdds/variant_rank_score/model/model_explainer.py +++ b/src/rdds/variant_rank_score/model/model_explainer.py @@ -1,7 +1,7 @@ import tensorflow as tf import pandas as pd import numpy as np -from typing import List, Callable +from typing import List, Callable, Tuple import gc from rdds.lib.logging import get_logger; _LOGGER = get_logger('ModelExplainer', 'debug') @@ -14,25 +14,22 @@ class ModelExplainer(ShapExplainer): def __init__(self, model: Callable, - features_text: List[str], - features_numerical: List[str], - input_feature_names: List[str]): + input_tensor_spec: Tuple[tf.TensorSpec, ...]): """ - Helper class to inferface VRS model and SHAP. + Helper class to interface VRS model and SHAP. :param model: The keras model used for inference, a callable to run inference - :param features_text: The text feature names (as input to keras model, order important) - :param features_numerical: The numerical feature names - :param input_feature_names: All feature names, as input to model, for SHAP visualisation + :param input_tensor_spec: The model input tensor spec (as input to keras model, order sensitive) """ shap_compatible_model = ShapCompatibleModel(keras_model=model, - features_numerical=features_numerical, - features_text=features_text) + input_tensor_spec=input_tensor_spec) + + self._input_feature_names: List[str] = [] + for tensor_spec in input_tensor_spec: + self._input_feature_names.append(tensor_spec.name) super().__init__(model=shap_compatible_model, - input_feature_names=input_feature_names) - self._features_text = features_text - self._features_numerical = features_numerical - _LOGGER.debug(f'Initialized with features: {self._features_numerical, self._features_text}') + input_feature_names=self._input_feature_names) + _LOGGER.debug(f'Initialized with features: {self._input_feature_names}') def adapt(self, dataset: tf.data.Dataset, @@ -53,12 +50,8 @@ def select_only_benign_samples(x, y): dataset = dataset.map(lambda x, y: x) # Drop labels dataset = dataset.take(n_reference_samples).as_numpy_iterator() data = list(dataset) # Load to RAM - # data : Tuple(text_features, numerical_features) - shap_data = pd.DataFrame([text_features for text_features, _ in data], - columns=self._features_text) - shap_data = pd.concat((shap_data, - pd.DataFrame([numerical_features for _, numerical_features in data], - columns=self._features_numerical)), axis=1) + # data : Tuple[Union[tf.string, tf.float32 with shape (n_reference_samples, n_features) ]]) + shap_data = pd.DataFrame(data=data, columns=self._input_feature_names) del data reference_data: np.ndarray = shap_data.values gc.collect() diff --git a/src/rdds/variant_rank_score/model/shap_compatible_model.py b/src/rdds/variant_rank_score/model/shap_compatible_model.py index 4f159606..7612a89c 100644 --- a/src/rdds/variant_rank_score/model/shap_compatible_model.py +++ b/src/rdds/variant_rank_score/model/shap_compatible_model.py @@ -1,5 +1,5 @@ import numpy as np -from typing import List, Tuple, Callable +from typing import Tuple, Callable, Dict import tensorflow as tf from rdds.lib.model_explanation.shap import ShapCompatibleSerializableModel @@ -18,11 +18,13 @@ class ShapCompatibleModel(ShapCompatibleSerializableModel): def __init__(self, keras_model: Callable, - features_text: List[str], - features_numerical: List[str]): + input_tensor_spec: Tuple[tf.TensorSpec, ...]): + """ + :param keras_model: A callable that accepts input data and generates inferences + :param input_tensor_spec: Model input specification, order sensitive + """ self._keras_model: Callable = keras_model - self._features_text = features_text - self._features_numerical = features_numerical + self._input_tensor_spec = input_tensor_spec def save(self, shap_model, file_pointer): # Nothing to save, rely on load_from_preloaded_keras_model() @@ -40,16 +42,17 @@ def load_from_prior_keras_model(*args, **kwargs): """ return ShapCompatibleModel(*args, **kwargs) - def _to_tensors(self, array: np.ndarray) -> Tuple[tf.Tensor, tf.Tensor]: + def _to_tensors(self, array: np.ndarray) -> Dict[str, tf.Tensor]: """ Convert array of mixed-type data to separate Tensors with defined dtypes. """ - txt_batch = array[:, 0:len(self._features_text)] - numerical_batch = array[:, len(self._features_text):] - txt_tensor = tf.constant(txt_batch, dtype=tf.string) - num_tensor = tf.constant(numerical_batch, dtype=tf.float32) - return txt_tensor, num_tensor + tensors: Dict[str, tf.Tensor] = dict() + for col_idx, tensor_spec in enumerate(self._input_tensor_spec): + tensors.update({ + tensor_spec.name: tf.constant(array[:, col_idx], dtype=tensor_spec.dtype) + }) + return tensors def __call__(self, array: np.ndarray) -> np.ndarray: """ @@ -57,5 +60,5 @@ def __call__(self, array: np.ndarray) -> np.ndarray: and run the model. Return inferences as np.ndarray shaped [batch_dim] """ tensors = self._to_tensors(array) - inferences: np.ndarray = self._keras_model(*tensors) + inferences: np.ndarray = self._keras_model(tensors) return np.array(inferences)