Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions src/rdds/lib/hdf5/hd5_data_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ def __init__(self,
label: str = None,
replace_nan_floats_with: float = 0.0,
load_data_into_ram: bool = True,
expand_1d_categorical_to_2d: bool = True):
expand_1d_categorical_to_2d: bool = True,
discard_loaded_data_on_epoch_complete: bool = True,
max_n_samples: int = None):
"""
:param hd5_file_path: HD5 file to generate data from
:param group_name: Group in HD5 file to read data from (reads all datasets)
Expand All @@ -40,12 +42,15 @@ def __init__(self,
:param replace_nan_floats_with: Floating point value to replace NaN values
:param load_data_into_ram: Load file content into RAM if True. Improves reading performance.
:param expand_1d_categorical_to_2d: Unpack a 1D categorical label [0][TN] into 2D; [1, 0][TN, TP]
:param discard_loaded_data_on_epoch_complete: Drop references to data once data has been read once by caller (reduces RAM footprint)
:param max_n_samples: Maximum samples to yield
"""
self._hd5_file_path: str = hd5_file_path
self._hd5_file = h5py.File(name=self._hd5_file_path, mode='r')
self._data_length = int
self._replace_nan_floats_with = replace_nan_floats_with
self._expand_1d_categorical_to_2d: bool = expand_1d_categorical_to_2d
self._discard_loaded_data_on_epoch_complete = discard_loaded_data_on_epoch_complete

self._group_name = group_name if group_name else list(self._hd5_file.keys())[0]
_LOGGER.info(f'Generating data from group \'{group_name}\'')
Expand All @@ -56,7 +61,10 @@ def __init__(self,
_LOGGER.info(f'Loading data to RAM.')
in_ram_group: Dict[str, np.ndarray] = {} # Mock the Group dictionary API by using a dict
for dataset_name in self._group.keys():
in_ram_group.update({dataset_name: self._group[dataset_name][:]})
if max_n_samples is not None:
in_ram_group.update({dataset_name: self._group[dataset_name][0:max_n_samples]})
else:
in_ram_group.update({dataset_name: self._group[dataset_name][:]})
self._group: Dict[str, np.ndarray] = in_ram_group
_LOGGER.info('RAM loading complete.')

Expand All @@ -82,6 +90,8 @@ def __init__(self,
if dataset.shape != zeroeth_dataset.shape:
raise ValueError(f'Not identical dataset shapes, got {dataset.shape}!={zeroeth_dataset.shape}')
self._data_length: int = zeroeth_dataset.shape[0]
if max_n_samples is not None:
self._data_length = max_n_samples
_LOGGER.info(f'{self._data_length} samples across {len(self._group.keys())} features')

@property
Expand Down Expand Up @@ -177,6 +187,18 @@ def _expand_categorical_label_1d_to_2d(label: float) -> np.array:
Hd5DataGenerator._check_is_valid_categorical_label(label=label)
return(1.0 - label, label)

def count_positive_negative_categorical_labels(self) -> Tuple[int, int]:
    """
    Count positive and negative labels in the dataset named by 'label'.
    Assumes 1D categorical labels where every positive sample contributes 1 to the sum
    and every negative sample contributes 0.
    Only the first `_data_length` samples are counted, so a sample limit is honoured.
    :return: Tuple of (positive label count, negative label count) as plain ints
    """
    # Slice before summing: slicing yields an in-memory array for both the in-RAM dict
    # and a file-backed dataset, and it caps the count at the configured data length.
    labels = np.asarray(self._group[self._label][:self._data_length])
    positives = int(labels.sum())
    # Derive the total from the labels actually read, so it can never exceed what exists.
    negatives = int(labels.shape[0]) - positives
    return positives, negatives


def __call__(self) -> Iterator[Tuple[Union[str, float], ...]]:
"""
Yields data from HD5 data set.
Expand All @@ -199,6 +221,10 @@ def __call__(self) -> Iterator[Tuple[Union[str, float], ...]]:
idx += 1
yield output_vector
_LOGGER.debug('Restart epoch')
if self._discard_loaded_data_on_epoch_complete:
_LOGGER.info('Closing and discarding data in RAM')
del self._group # Drop references
self._hd5_file.close() # Close file

def __del__(self):
try:
Expand Down
32 changes: 32 additions & 0 deletions src/rdds/lib/tf/profiler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# TensorFlow Profiler

This tool is used to view performance of TF graphs and pipelines.

## Tensorboard Dependencies
The active Python environment (pyenv) must have `tensorboard-plugin-profile==2.13.1`
installed in order to view profiling results.

## GPU Limitations
Profiling on GPUs requires the NVIDIA CUPTI library; if it is not
present, profiling will fail.

If CUPTI lib is not available, one can disable GPUs temporarily
by setting environment variable `CUDA_VISIBLE_DEVICES=""`.

## Tracing

One must use the `Trace` context manager to record profiling
traces for actions.

```python
dataset: tf.data.Dataset
profiler = TfProfiler(logdir=logdir)
profiler.start()
dataset = dataset.__iter__()
for step in range(10):
with Trace('batch', step_num=step, _r=1):
_ = next(dataset)
profiler.stop()
```

> `_r` keyword argument makes this trace event get processed as a step event by the Profiler.
1 change: 1 addition & 0 deletions src/rdds/lib/tf/profiler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .tf_profiler import TfProfiler, Trace
59 changes: 59 additions & 0 deletions src/rdds/lib/tf/profiler/tf_profiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from tensorflow.python.profiler import profiler_v2 as profiler
from tensorflow.python.eager.profiler import maybe_create_event_file as legacy_tensorboard_event_file_creator
from tensorflow.python.profiler.trace import Trace

from rdds.lib.logging import get_logger
_LOGGER = get_logger(name='tf-profiler', log_level='info')

# Options used by TfProfiler.start() when the caller does not supply any.
# NOTE(review): the meaning of each tracer level is defined by TensorFlow's
# ProfilerOptions - confirm against its documentation before changing these values.
DEFAULT_PROFILER_OPTIONS = profiler.ProfilerOptions(host_tracer_level=3,
python_tracer_level=1,
device_tracer_level=1)

class TfProfiler:

    """
    Helper class to run profiling of TF graphs.

    Use this class in conjunction with Trace().

    NOTE: It's undefined to run both keras.tensorboard callback profiling and TfProfiler simultaneously.

    NOTE: The Keras TensorBoard callback will automatically perform sampled
    profiling. Before enabling customized profiling, set the callback flag
    "profile_batches=[]" to disable automatic sampled profiling.

    NOTE: on GPU prerequisites: https://www.tensorflow.org/guide/profiler#install_the_profiler_and_gpu_prerequisites

    https://www.tensorflow.org/guide/profiler#profiling_custom_training_loops
    """

    def __init__(self, logdir: str):
        """
        :param logdir: Directory where the event file and profiling results are written
        """
        self._logdir = logdir
        self._started = False  # True while a profiling session started by this instance is active

    def start_if_not_running(self, *args, **kwargs):
        """
        Start profiling unless this instance has already started a session.
        All arguments are forwarded to start().
        """
        if self._started:
            return
        self.start(*args, **kwargs)

    def start(self, default_profiler_options: profiler.ProfilerOptions = DEFAULT_PROFILER_OPTIONS):
        """
        Immediately start to profile once this is called, regardless whether a Trace is created or not.
        Make sure to call this at the appropriate time to get the intended profiling results
        (profiler can only support a limited amount of traces, further limited by tracing depth).
        :param default_profiler_options: Profiler options handed to the TF profiler
        :return: None
        """
        _LOGGER.info(f'Profiling logs saved in {self._logdir}')
        # If event file is not created, Tensorboard won't show profiling results
        legacy_tensorboard_event_file_creator(self._logdir)
        profiler.start(self._logdir, options=default_profiler_options)
        self._started = True

    def stop(self):
        """
        Stop the running profiling session and log how to view the results.
        Does nothing (besides logging) when this instance has no session running.
        :return: None
        """
        if not self._started:
            _LOGGER.warning('Profiler is not running, nothing to stop.')
            return
        try:
            profiler.stop()
        finally:
            # Reset even on failure so start_if_not_running() can start a new session later
            self._started = False
        msg = ('Profiling complete. View results by: '
               'PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python python3 -m tensorboard.main --port 4000 '
               f'--logdir {self._logdir}\n '
               'firefox http://localhost:4000#profile')
        _LOGGER.info(msg)
44 changes: 44 additions & 0 deletions src/rdds/variant_rank_score/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,49 @@ def view_ranked_vcf(args):
case_names=case_names)
subparser.set_defaults(func=view_ranked_vcf)


def _parse_bool_flag(value):
    """
    Convert a command line value such as 'true' or 'False' into a bool.
    Command line values arrive as strings, and any non-empty string (including 'False')
    is truthy, so the option value must be interpreted explicitly.
    :param value: Raw option value
    :return: The boolean the value spells out
    :raises ValueError: If value is not a recognised boolean spelling (argparse reports this as a usage error)
    """
    if isinstance(value, bool):
        return value
    normalised = str(value).strip().lower()
    if normalised in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if normalised in ('0', 'false', 'f', 'no', 'n', 'off', ''):
        return False
    raise ValueError(f'Not a boolean value: {value!r}')


subparser = subparsers.add_parser('pipeline-performance-test', help='Profile data pipeline and view results in Tensorboard')
subparser.add_argument('hd5', help='Path to .hd5 file to be used as training, validation data')
subparser.add_argument('--batches', help='Number of batches to profile', type=int, default=10)
subparser.add_argument('--include_dataset_init', help='Include dataset bootstrapping (biasing result)',
                       type=_parse_bool_flag, default=False)
subparser.add_argument('--start_on_first_epoch_end', help='Start profiling pipeline once all data is cached',
                       type=_parse_bool_flag, default=True)
def profile_data_pipeline(args):
    """
    Pull batches from the training dataset while recording a TF profiler trace per batch.
    Profiling results are written to the 'profiler' directory inside the model work directory.
    :param args: Parsed command line arguments; reads hd5, batches, include_dataset_init
                 and start_on_first_epoch_end
    """
    from math import ceil
    from progressbar import ProgressBar
    from .model import VariantRankScoreModel
    from rdds.lib.tf.profiler import TfProfiler, Trace
    batches = int(args.batches)
    model = VariantRankScoreModel()
    hparams = model.get_uninitialized_hyperparameters()
    hparams.Int('batch_size', min_value=32, max_value=128, default=64)
    model._init_datasets(hd5_file_path=args.hd5,
                         hparams=hparams,
                         compile_vocabulary_normalisation_factors=False,
                         init_test_data=False)
    profile_from_batch = 0
    start_batch = 0
    stop_batch = start_batch + batches
    if args.start_on_first_epoch_end:
        # Consume one full epoch first, then profile the batches that follow it
        batches_per_epoch = int(ceil(model._datasets.train_data_length / hparams.get('batch_size')))
        profile_from_batch = batches_per_epoch
        stop_batch = batches_per_epoch + batches
    pbar = ProgressBar(max_value=stop_batch)
    pbar.start()
    profiler = TfProfiler(logdir=os.path.join(model._workdir, 'profiler'))
    dataset = iter(model._datasets.dataset_train)
    profiling_started = False
    try:
        for batch in range(start_batch, stop_batch):
            # NOTE(review): the strict '>' skips batch `profile_from_batch` itself, so at most
            # `batches - 1` batches are profiled unless include_dataset_init is set - confirm intended.
            if args.include_dataset_init or batch > profile_from_batch:
                print(f'Profiling batch {batch}')
                profiler.start_if_not_running()
                profiling_started = True
                with Trace('batch', step_num=batch, _r=1):
                    _ = next(dataset)
            else:
                _ = next(dataset)  # Just discard data, profiling will happen some other time
            pbar.update(batch)
    finally:
        # Only stop a profiler that was actually started, and stop it even if iteration failed
        if profiling_started:
            profiler.stop()
        else:
            print('No batch was profiled, increase --batches to record a profile')
        pbar.finish()

# Register profile_data_pipeline as the handler of the 'pipeline-performance-test' sub-command
subparser.set_defaults(func=profile_data_pipeline)

# Parse the command line and run the handler registered by the selected sub-command
args = parser.parse_args()
args.func(args) # Callback to trigger func with args
Loading