diff --git a/examples/link_prediction/graph_store/heterogeneous_inference.py b/examples/link_prediction/graph_store/heterogeneous_inference.py index cb4288a59..7ea077786 100644 --- a/examples/link_prediction/graph_store/heterogeneous_inference.py +++ b/examples/link_prediction/graph_store/heterogeneous_inference.py @@ -233,7 +233,7 @@ def _inference_process( ) # Get the node ids on the current machine for the current node type - input_nodes = dataset.get_node_ids(node_type=args.inference_node_type) + input_nodes = dataset.fetch_node_ids(node_type=args.inference_node_type) logger.info( f"Rank {rank} got input nodes of shapes: {[f'{rank}: {node.shape}' for rank, node in input_nodes.items()]}" ) diff --git a/examples/link_prediction/graph_store/homogeneous_inference.py b/examples/link_prediction/graph_store/homogeneous_inference.py index f2f515ae2..cdd2df289 100644 --- a/examples/link_prediction/graph_store/homogeneous_inference.py +++ b/examples/link_prediction/graph_store/homogeneous_inference.py @@ -223,7 +223,7 @@ def _inference_process( # We expect that each compute machine has the same input nodes. # As such, we shard across the compute machine cluster. # If this is not done, then all nodes will receive the same input nodes, which is not what we want. - input_nodes = dataset.get_node_ids( + input_nodes = dataset.fetch_node_ids( rank=args.cluster_info.compute_node_rank, world_size=args.cluster_info.num_compute_nodes, ) diff --git a/gigl/distributed/dist_ablp_neighborloader.py b/gigl/distributed/dist_ablp_neighborloader.py index 1ddaf0fc7..268bc4b5a 100644 --- a/gigl/distributed/dist_ablp_neighborloader.py +++ b/gigl/distributed/dist_ablp_neighborloader.py @@ -151,7 +151,7 @@ def __init__( For Graph Store mode: `dict[int, ABLPInputNodes]` Maps server_rank to an ABLPInputNodes dataclass containing anchor nodes, positive labels, and negative labels with explicit node type and edge type info. - This is the return type of `RemoteDistDataset.get_ablp_input()`. + This is the return type of `RemoteDistDataset.fetch_ablp_input()`. supervision_edge_type (Optional[Union[EdgeType, list[EdgeType]]]): The edge type(s) to use for supervision. For Colocated mode: Must be None iff the dataset is labeled homogeneous. @@ -600,7 +600,7 @@ def _setup_for_graph_store( Setup method for Graph Store mode. Args: - input_nodes: ABLP input from RemoteDistDataset.get_ablp_input(). + input_nodes: ABLP input from RemoteDistDataset.fetch_ablp_input(). Maps server_rank to ABLPInputNodes containing anchor nodes, positive/negative labels with explicit node type and edge type information. dataset: The RemoteDistDataset to sample from. @@ -612,13 +612,13 @@ def _setup_for_graph_store( Returns: Tuple of (list[ABLPNodeSamplerInput], RemoteDistSamplingWorkerOptions, DatasetSchema). """ - node_feature_info = dataset.get_node_feature_info() - edge_feature_info = dataset.get_edge_feature_info() - edge_types = dataset.get_edge_types() + node_feature_info = dataset.fetch_node_feature_info() + edge_feature_info = dataset.fetch_edge_feature_info() + edge_types = dataset.fetch_edge_types() node_rank = dataset.cluster_info.compute_node_rank # Get sampling ports for compute-storage connections. - sampling_ports = dataset.get_free_ports_on_storage_cluster( + sampling_ports = dataset.fetch_free_ports_on_storage_cluster( num_ports=dataset.cluster_info.num_compute_nodes ) sampling_port = sampling_ports[node_rank] @@ -745,7 +745,7 @@ def _setup_for_graph_store( edge_types=edge_types, node_feature_info=node_feature_info, edge_feature_info=edge_feature_info, - edge_dir=dataset.get_edge_dir(), + edge_dir=dataset.fetch_edge_dir(), ), ) diff --git a/gigl/distributed/distributed_neighborloader.py b/gigl/distributed/distributed_neighborloader.py index 5b96d9da2..6d385ef39 100644 --- a/gigl/distributed/distributed_neighborloader.py +++ b/gigl/distributed/distributed_neighborloader.py @@ -293,13 +293,13 @@ def _setup_for_graph_store( f"When using Graph Store mode, input nodes must be of type (dict[int, torch.Tensor] | (NodeType, dict[int, torch.Tensor])), received {type(input_nodes)} ({type(input_nodes[0])}, {type(input_nodes[1])})" ) - node_feature_info = dataset.get_node_feature_info() - edge_feature_info = dataset.get_edge_feature_info() - edge_types = dataset.get_edge_types() + node_feature_info = dataset.fetch_node_feature_info() + edge_feature_info = dataset.fetch_edge_feature_info() + edge_types = dataset.fetch_edge_types() node_rank = dataset.cluster_info.compute_node_rank # Get sampling ports for compute-storage connections. - sampling_ports = dataset.get_free_ports_on_storage_cluster( + sampling_ports = dataset.fetch_free_ports_on_storage_cluster( num_ports=dataset.cluster_info.num_compute_nodes ) sampling_port = sampling_ports[node_rank] @@ -379,7 +379,7 @@ def _setup_for_graph_store( edge_types=edge_types, node_feature_info=node_feature_info, edge_feature_info=edge_feature_info, - edge_dir=dataset.get_edge_dir(), + edge_dir=dataset.fetch_edge_dir(), ), ) diff --git a/gigl/distributed/graph_store/remote_dist_dataset.py b/gigl/distributed/graph_store/remote_dist_dataset.py index 14874559b..2106d051a 100644 --- a/gigl/distributed/graph_store/remote_dist_dataset.py +++ b/gigl/distributed/graph_store/remote_dist_dataset.py @@ -40,7 +40,7 @@ def __init__( local_rank (int): The local rank of the process on the compute node. mp_sharing_dict (Optional[MutableMapping[str, torch.Tensor]]): (Optional) If provided, will be used to share tensors across the local machine. - e.g. for `get_node_ids`. + e.g. for `fetch_node_ids`. If provided, *must* be a `DictProxy` e.g. the return value of a mp.Manager. ex. torch.multiprocessing.Manager().dict(). """ @@ -65,10 +65,10 @@ def __init__( def cluster_info(self) -> GraphStoreInfo: return self._cluster_info - def get_node_feature_info( + def fetch_node_feature_info( self, ) -> Union[FeatureInfo, dict[NodeType, FeatureInfo], None]: - """Get node feature information from the registered dataset. + """Fetch node feature information from the registered dataset. Returns: Node feature information, which can be: @@ -81,10 +81,10 @@ def get_node_feature_info( DistServer.get_node_feature_info, ) - def get_edge_feature_info( + def fetch_edge_feature_info( self, ) -> Union[FeatureInfo, dict[EdgeType, FeatureInfo], None]: - """Get edge feature information from the registered dataset. + """Fetch edge feature information from the registered dataset. Returns: Edge feature information, which can be: @@ -97,8 +97,8 @@ def get_edge_feature_info( DistServer.get_edge_feature_info, ) - def get_edge_dir(self) -> Union[str, Literal["in", "out"]]: - """Get the edge direction from the registered dataset. + def fetch_edge_dir(self) -> Union[str, Literal["in", "out"]]: + """Fetch the edge direction from the registered dataset. Returns: The edge direction. @@ -108,11 +108,11 @@ def get_edge_dir(self) -> Union[str, Literal["in", "out"]]: DistServer.get_edge_dir, ) - def get_node_partition_book( + def fetch_node_partition_book( self, node_type: Optional[NodeType] = None ) -> Optional[PartitionBook]: """ - Gets the partition book for the specified node type. + Fetches the partition book for the specified node type. Args: node_type: The node type to look up. Must be ``None`` for @@ -129,11 +129,11 @@ def get_node_partition_book( node_type=node_type, ) - def get_edge_partition_book( + def fetch_edge_partition_book( self, edge_type: Optional[EdgeType] = None ) -> Optional[PartitionBook]: """ - Gets the partition book for the specified edge type. + Fetches the partition book for the specified edge type. Args: edge_type: The edge type to look up. Must be ``None`` for @@ -157,7 +157,7 @@ def _infer_node_type_if_homogeneous_with_label_edges( Auto-infers the default homogeneous node type for homogeneous datasets with label edges. """ if node_type is None: - node_types = self.get_node_types() + node_types = self.fetch_node_types() if node_types is not None and DEFAULT_HOMOGENEOUS_NODE_TYPE in node_types: node_type = DEFAULT_HOMOGENEOUS_NODE_TYPE logger.info( @@ -173,7 +173,7 @@ def _infer_edge_type_if_homogeneous_with_label_edges( Auto-infers the default homogeneous edge type for homogeneous datasets with label edges. """ if edge_type is None: - edge_types = self.get_edge_types() + edge_types = self.fetch_edge_types() if edge_types is not None and DEFAULT_HOMOGENEOUS_EDGE_TYPE in edge_types: edge_type = DEFAULT_HOMOGENEOUS_EDGE_TYPE logger.info( @@ -182,7 +182,7 @@ def _infer_edge_type_if_homogeneous_with_label_edges( ) return edge_type - def _get_node_ids( + def _fetch_node_ids( self, rank: Optional[int] = None, world_size: Optional[int] = None, @@ -211,7 +211,7 @@ def _get_node_ids( node_ids = torch.futures.wait_all(futures) return {server_rank: node_ids for server_rank, node_ids in enumerate(node_ids)} - def get_node_ids( + def fetch_node_ids( self, rank: Optional[int] = None, world_size: Optional[int] = None, @@ -247,7 +247,7 @@ def get_node_ids( Get all nodes (no split filtering, no sharding): - >>> dataset.get_node_ids() + >>> dataset.fetch_node_ids() { 0: tensor([0, 1, 2, 3, 4, 5, 6, 7]), # All 8 nodes from storage rank 0 1: tensor([8, 9, 10, 11, 12, 13, 14, 15]) # All 8 nodes from storage rank 1 @@ -255,7 +255,7 @@ def get_node_ids( Shard all nodes across 2 compute nodes (compute rank 0 gets first half from each storage): - >>> dataset.get_node_ids(rank=0, world_size=2) + >>> dataset.fetch_node_ids(rank=0, world_size=2) { 0: tensor([0, 1, 2, 3]), # First 4 of all 8 nodes from storage rank 0 1: tensor([8, 9, 10, 11]) # First 4 of all 8 nodes from storage rank 1 @@ -263,7 +263,7 @@ def get_node_ids( Get only training nodes (no sharding): - >>> dataset.get_node_ids(split="train") + >>> dataset.fetch_node_ids(split="train") { 0: tensor([0, 1, 2, 3]), # 4 training nodes from storage rank 0 1: tensor([8, 9, 10, 11]) # 4 training nodes from storage rank 1 @@ -271,7 +271,7 @@ def get_node_ids( Combine split and sharding (training nodes, sharded for compute rank 0): - >>> dataset.get_node_ids(rank=0, world_size=2, split="train") + >>> dataset.fetch_node_ids(rank=0, world_size=2, split="train") { 0: tensor([0, 1]), # First 2 of 4 training nodes from storage rank 0 1: tensor([8, 9]) # First 2 of 4 training nodes from storage rank 1 @@ -297,7 +297,7 @@ def server_key(server_rank: int) -> str: logger.info( f"Compute rank {torch.distributed.get_rank()} is getting node ids from storage nodes" ) - node_ids = self._get_node_ids(rank, world_size, node_type, split) + node_ids = self._fetch_node_ids(rank, world_size, node_type, split) for server_rank, node_id in node_ids.items(): node_id.share_memory_() self._mp_sharing_dict[server_key(server_rank)] = node_id @@ -311,9 +311,9 @@ def server_key(server_rank: int) -> str: } return node_ids else: - return self._get_node_ids(rank, world_size, node_type, split) + return self._fetch_node_ids(rank, world_size, node_type, split) - def get_free_ports_on_storage_cluster(self, num_ports: int) -> list[int]: + def fetch_free_ports_on_storage_cluster(self, num_ports: int) -> list[int]: """ Get free ports from the storage master node. @@ -351,7 +351,7 @@ def get_free_ports_on_storage_cluster(self, num_ports: int) -> list[int]: logger.info(f"Compute rank {compute_cluster_rank} received free ports: {ports}") return cast(list[int], ports) - def _get_ablp_input( + def _fetch_ablp_input( self, split: Literal["train", "val", "test"], rank: Optional[int] = None, @@ -389,7 +389,7 @@ def _get_ablp_input( } # TODO(#488) - support multiple supervision edge types - def get_ablp_input( + def fetch_ablp_input( self, split: Literal["train", "val", "test"], rank: Optional[int] = None, @@ -439,7 +439,7 @@ def get_ablp_input( Get training ABLP input (heterogeneous): - >>> dataset.get_ablp_input(split="train", node_type=USER, supervision_edge_type=USER_TO_ITEM) + >>> dataset.fetch_ablp_input(split="train", node_type=USER, supervision_edge_type=USER_TO_ITEM) { 0: ABLPInputNodes( anchor_nodes=tensor([0, 1, 2]), @@ -503,7 +503,7 @@ def wrap_ablp_input( logger.info( f"Compute rank {torch.distributed.get_rank()} is getting ABLP input from storage nodes" ) - raw_ablp_inputs = self._get_ablp_input( + raw_ablp_inputs = self._fetch_ablp_input( split=split, rank=rank, world_size=world_size, @@ -551,7 +551,7 @@ def wrap_ablp_input( ) return returned_ablp_inputs else: - raw_inputs = self._get_ablp_input( + raw_inputs = self._fetch_ablp_input( split=split, rank=rank, world_size=world_size, @@ -572,8 +572,8 @@ def wrap_ablp_input( ) in raw_inputs.items() } - def get_edge_types(self) -> Optional[list[EdgeType]]: - """Get the edge types from the registered dataset. + def fetch_edge_types(self) -> Optional[list[EdgeType]]: + """Fetch the edge types from the registered dataset. Returns: The edge types in the dataset, None if the dataset is homogeneous. @@ -583,8 +583,8 @@ def get_edge_types(self) -> Optional[list[EdgeType]]: DistServer.get_edge_types, ) - def get_node_types(self) -> Optional[list[NodeType]]: - """Get the node types from the registered dataset. + def fetch_node_types(self) -> Optional[list[NodeType]]: + """Fetch the node types from the registered dataset. Returns: The node types in the dataset, None if the dataset is homogeneous. diff --git a/tests/integration/distributed/graph_store/graph_store_integration_test.py b/tests/integration/distributed/graph_store/graph_store_integration_test.py index 5a414f718..f29a1fd97 100644 --- a/tests/integration/distributed/graph_store/graph_store_integration_test.py +++ b/tests/integration/distributed/graph_store/graph_store_integration_test.py @@ -209,8 +209,8 @@ def _run_compute_train_tests( mp_sharing_dict=mp_sharing_dict, ) - # Test get_ablp_input for train split - ablp_result = remote_dist_dataset.get_ablp_input( + # Test fetch_ablp_input for train split + ablp_result = remote_dist_dataset.fetch_ablp_input( split="train", rank=cluster_info.compute_node_rank, world_size=cluster_info.num_compute_nodes, @@ -227,7 +227,7 @@ def _run_compute_train_tests( worker_concurrency=2, ) - random_negative_input = remote_dist_dataset.get_node_ids( + random_negative_input = remote_dist_dataset.fetch_node_ids( split="train", rank=cluster_info.compute_node_rank, world_size=cluster_info.num_compute_nodes, @@ -307,13 +307,13 @@ def _run_compute_multiple_loaders_test( mp_sharing_dict=mp_sharing_dict, ) - ablp_result = remote_dist_dataset.get_ablp_input( + ablp_result = remote_dist_dataset.fetch_ablp_input( split="train", rank=cluster_info.compute_node_rank, world_size=cluster_info.num_compute_nodes, ) - random_negative_input = remote_dist_dataset.get_node_ids( + random_negative_input = remote_dist_dataset.fetch_node_ids( split="train", rank=cluster_info.compute_node_rank, world_size=cluster_info.num_compute_nodes, @@ -582,15 +582,15 @@ def _run_compute_tests( rank = torch.distributed.get_rank() world_size = torch.distributed.get_world_size() assert ( - remote_dist_dataset.get_edge_dir() == "in" - ), f"Edge direction must be 'in' for the test dataset. Got {remote_dist_dataset.get_edge_dir()}" + remote_dist_dataset.fetch_edge_dir() == "in" + ), f"Edge direction must be 'in' for the test dataset. Got {remote_dist_dataset.fetch_edge_dir()}" assert ( - remote_dist_dataset.get_edge_feature_info() is not None + remote_dist_dataset.fetch_edge_feature_info() is not None ), "Edge feature info must not be None for the test dataset" assert ( - remote_dist_dataset.get_node_feature_info() is not None + remote_dist_dataset.fetch_node_feature_info() is not None ), "Node feature info must not be None for the test dataset" - ports = remote_dist_dataset.get_free_ports_on_storage_cluster(num_ports=2) + ports = remote_dist_dataset.fetch_free_ports_on_storage_cluster(num_ports=2) assert len(ports) == 2, "Expected 2 free ports" if rank == 0: all_ports = [None] * torch.distributed.get_world_size() @@ -609,7 +609,7 @@ def _run_compute_tests( torch.distributed.barrier() logger.info("Verified that all ranks received the same free ports") - sampler_input = remote_dist_dataset.get_node_ids( + sampler_input = remote_dist_dataset.fetch_node_ids( node_type=node_type, rank=cluster_info.compute_node_rank, world_size=cluster_info.num_compute_nodes, @@ -621,7 +621,7 @@ def _run_compute_tests( cluster_info=cluster_info, local_rank=client_rank, mp_sharing_dict=None, - ).get_node_ids( + ).fetch_node_ids( node_type=node_type, rank=cluster_info.compute_node_rank, world_size=cluster_info.num_compute_nodes, @@ -629,8 +629,8 @@ def _run_compute_tests( _assert_sampler_input(cluster_info, simple_sampler_input, expected_sampler_input) assert ( - remote_dist_dataset.get_edge_types() == expected_edge_types - ), f"Expected edge types {expected_edge_types}, got {remote_dist_dataset.get_edge_types()}" + remote_dist_dataset.fetch_edge_types() == expected_edge_types + ), f"Expected edge types {expected_edge_types}, got {remote_dist_dataset.fetch_edge_types()}" torch.distributed.barrier() if node_type is not None: diff --git a/tests/test_assets/distributed/utils.py b/tests/test_assets/distributed/utils.py index b60403271..133a02036 100644 --- a/tests/test_assets/distributed/utils.py +++ b/tests/test_assets/distributed/utils.py @@ -137,7 +137,7 @@ class MockRemoteDistDataset(RemoteDistDataset): ... num_storage_nodes=2, ... edge_types=[EdgeType("user", "knows", "user")], ... ) - >>> mock_dataset.get_edge_types() # Returns the configured edge_types + >>> mock_dataset.fetch_edge_types() # Returns the configured edge_types >>> mock_dataset.cluster_info.num_storage_nodes # Returns 2 """ @@ -174,22 +174,26 @@ def cluster_info(self) -> GraphStoreInfo: """Returns a MockGraphStoreInfo with the configured compute_node_rank.""" return MockGraphStoreInfo(self._mock_cluster_info, self._mock_compute_node_rank) - def get_node_feature_info(self): + def fetch_node_feature_info(self): """Returns None (no node features configured).""" return None - def get_edge_feature_info(self): + def fetch_edge_feature_info(self): """Returns None (no edge features configured).""" return None - def get_edge_dir(self) -> str: + def fetch_edge_dir(self) -> str: """Returns the configured edge direction.""" return self._mock_edge_dir - def get_edge_types(self) -> Optional[list[EdgeType]]: + def fetch_edge_types(self) -> Optional[list[EdgeType]]: """Returns the configured edge types.""" return self._mock_edge_types - def get_free_ports_on_storage_cluster(self, num_ports: int) -> list[int]: - """Returns a list of mock port numbers starting at 20000.""" + def fetch_node_types(self) -> None: + """Returns None (no node types configured).""" + return None + + def fetch_free_ports_on_storage_cluster(self, num_ports: int) -> list[int]: + """Returns a list of mock free port numbers.""" return get_free_ports(num_ports=num_ports) diff --git a/tests/unit/distributed/graph_store/remote_dist_dataset_test.py b/tests/unit/distributed/graph_store/remote_dist_dataset_test.py index 7eb945a2b..39ecc9e96 100644 --- a/tests/unit/distributed/graph_store/remote_dist_dataset_test.py +++ b/tests/unit/distributed/graph_store/remote_dist_dataset_test.py @@ -97,18 +97,18 @@ def tearDown(self) -> None: side_effect=_mock_request_server, ) def test_graph_metadata_getters_homogeneous(self, mock_request): - """Test get_node_feature_info, get_edge_feature_info, get_edge_dir, get_edge_types, get_node_types for homogeneous graphs.""" + """Test fetch_node_feature_info, fetch_edge_feature_info, fetch_edge_dir, fetch_edge_types, fetch_node_types for homogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) self.assertEqual( - remote_dataset.get_node_feature_info(), + remote_dataset.fetch_node_feature_info(), FeatureInfo(dim=3, dtype=torch.float32), ) - self.assertIsNone(remote_dataset.get_edge_feature_info()) - self.assertEqual(remote_dataset.get_edge_dir(), "out") - self.assertIsNone(remote_dataset.get_edge_types()) - self.assertIsNone(remote_dataset.get_node_types()) + self.assertIsNone(remote_dataset.fetch_edge_feature_info()) + self.assertEqual(remote_dataset.fetch_edge_dir(), "out") + self.assertIsNone(remote_dataset.fetch_edge_types()) + self.assertIsNone(remote_dataset.fetch_node_types()) def test_init_rejects_non_dict_proxy_for_mp_sharing_dict(self): cluster_info = _create_mock_graph_store_info() @@ -137,34 +137,34 @@ def test_cluster_info_property(self): "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_node_ids(self, mock_request, mock_async_request): - """Test get_node_ids returns node ids, with optional sharding via rank/world_size.""" + def test_fetch_node_ids(self, mock_request, mock_async_request): + """Test fetch_node_ids returns node ids, with optional sharding via rank/world_size.""" cluster_info = _create_mock_graph_store_info(num_storage_nodes=1) remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) # Basic: all nodes - result = remote_dataset.get_node_ids() + result = remote_dataset.fetch_node_ids() self.assertIn(0, result) self.assert_tensor_equality(result[0], torch.arange(10)) # With sharding: first half (rank 0 of 2) - result = remote_dataset.get_node_ids(rank=0, world_size=2) + result = remote_dataset.fetch_node_ids(rank=0, world_size=2) self.assert_tensor_equality(result[0], torch.arange(5)) # With sharding: second half (rank 1 of 2) - result = remote_dataset.get_node_ids(rank=1, world_size=2) + result = remote_dataset.fetch_node_ids(rank=1, world_size=2) self.assert_tensor_equality(result[0], torch.arange(5, 10)) @patch( "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_node_partition_book_homogeneous(self, mock_request): - """Test get_node_partition_book returns the tensor partition book for homogeneous graphs.""" + def test_fetch_node_partition_book_homogeneous(self, mock_request): + """Test fetch_node_partition_book returns the tensor partition book for homogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) - result = remote_dataset.get_node_partition_book() + result = remote_dataset.fetch_node_partition_book() self.assertIsInstance(result, torch.Tensor) assert isinstance(result, torch.Tensor) # for type narrowing self.assertEqual(result.shape[0], 10) @@ -174,12 +174,12 @@ def test_get_node_partition_book_homogeneous(self, mock_request): "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_edge_partition_book_homogeneous(self, mock_request): - """Test get_edge_partition_book returns the tensor partition book for homogeneous graphs.""" + def test_fetch_edge_partition_book_homogeneous(self, mock_request): + """Test fetch_edge_partition_book returns the tensor partition book for homogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) - result = remote_dataset.get_edge_partition_book() + result = remote_dataset.fetch_edge_partition_book() self.assertIsInstance(result, torch.Tensor) assert isinstance(result, torch.Tensor) # for type narrowing self.assertEqual(result.shape[0], 10) @@ -189,13 +189,15 @@ def test_get_edge_partition_book_homogeneous(self, mock_request): "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_node_partition_book_homogeneous_rejects_node_type(self, mock_request): - """Test get_node_partition_book raises ValueError when node_type is given for homogeneous graphs.""" + def test_fetch_node_partition_book_homogeneous_rejects_node_type( + self, mock_request + ): + """Test fetch_node_partition_book raises ValueError when node_type is given for homogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) with self.assertRaises(ValueError): - remote_dataset.get_node_partition_book(node_type=USER) + remote_dataset.fetch_node_partition_book(node_type=USER) class TestRemoteDistDatasetHeterogeneous(TestCase): @@ -225,22 +227,22 @@ def tearDown(self) -> None: side_effect=_mock_request_server, ) def test_graph_metadata_getters_heterogeneous(self, mock_request): - """Test get_node_feature_info, get_edge_dir, get_edge_types, get_node_types for heterogeneous graphs.""" + """Test fetch_node_feature_info, fetch_edge_dir, fetch_edge_types, fetch_node_types for heterogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) self.assertEqual( - remote_dataset.get_node_feature_info(), + remote_dataset.fetch_node_feature_info(), { USER: FeatureInfo(dim=2, dtype=torch.float32), STORY: FeatureInfo(dim=2, dtype=torch.float32), }, ) - self.assertEqual(remote_dataset.get_edge_dir(), "out") + self.assertEqual(remote_dataset.fetch_edge_dir(), "out") self.assertEqual( - remote_dataset.get_edge_types(), [USER_TO_STORY, STORY_TO_USER] + remote_dataset.fetch_edge_types(), [USER_TO_STORY, STORY_TO_USER] ) - node_types = remote_dataset.get_node_types() + node_types = remote_dataset.fetch_node_types() self.assertIsNotNone(node_types) assert node_types is not None # for type narrowing self.assertEqual(set(node_types), {USER, STORY}) @@ -249,43 +251,43 @@ def test_graph_metadata_getters_heterogeneous(self, mock_request): "gigl.distributed.graph_store.remote_dist_dataset.async_request_server", side_effect=_mock_async_request_server, ) - def test_get_node_ids_with_node_type(self, mock_async_request): - """Test get_node_ids with node_type for heterogeneous graphs, with optional sharding.""" + def test_fetch_node_ids_with_node_type(self, mock_async_request): + """Test fetch_node_ids with node_type for heterogeneous graphs, with optional sharding.""" cluster_info = _create_mock_graph_store_info(num_storage_nodes=1) remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) # Get user nodes - result = remote_dataset.get_node_ids(node_type=USER) + result = remote_dataset.fetch_node_ids(node_type=USER) self.assert_tensor_equality(result[0], torch.arange(5)) # Get story nodes - result = remote_dataset.get_node_ids(node_type=STORY) + result = remote_dataset.fetch_node_ids(node_type=STORY) self.assert_tensor_equality(result[0], torch.arange(5)) # With sharding: first half of user nodes (rank 0 of 2) - result = remote_dataset.get_node_ids(rank=0, world_size=2, node_type=USER) + result = remote_dataset.fetch_node_ids(rank=0, world_size=2, node_type=USER) self.assert_tensor_equality(result[0], torch.arange(2)) # With sharding: second half of user nodes (rank 1 of 2) - result = remote_dataset.get_node_ids(rank=1, world_size=2, node_type=USER) + result = remote_dataset.fetch_node_ids(rank=1, world_size=2, node_type=USER) self.assert_tensor_equality(result[0], torch.arange(2, 5)) @patch( "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_node_partition_book_heterogeneous(self, mock_request): - """Test get_node_partition_book returns per-type partition books for heterogeneous graphs.""" + def test_fetch_node_partition_book_heterogeneous(self, mock_request): + """Test fetch_node_partition_book returns per-type partition books for heterogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) - user_pb = remote_dataset.get_node_partition_book(node_type=USER) + user_pb = remote_dataset.fetch_node_partition_book(node_type=USER) self.assertIsInstance(user_pb, torch.Tensor) assert isinstance(user_pb, torch.Tensor) self.assertEqual(user_pb.shape[0], 5) self.assert_tensor_equality(user_pb, torch.zeros(5, dtype=torch.int64)) - story_pb = remote_dataset.get_node_partition_book(node_type=STORY) + story_pb = remote_dataset.fetch_node_partition_book(node_type=STORY) self.assertIsInstance(story_pb, torch.Tensor) assert isinstance(story_pb, torch.Tensor) self.assertEqual(story_pb.shape[0], 5) @@ -295,12 +297,12 @@ def test_get_node_partition_book_heterogeneous(self, mock_request): "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_edge_partition_book_heterogeneous(self, mock_request): - """Test get_edge_partition_book returns per-type partition books for heterogeneous graphs.""" + def test_fetch_edge_partition_book_heterogeneous(self, mock_request): + """Test fetch_edge_partition_book returns per-type partition books for heterogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) - user_to_story_pb = remote_dataset.get_edge_partition_book( + user_to_story_pb = remote_dataset.fetch_edge_partition_book( edge_type=USER_TO_STORY ) self.assertIsInstance(user_to_story_pb, torch.Tensor) @@ -317,33 +319,33 @@ def test_get_edge_partition_book_heterogeneous(self, mock_request): "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_node_partition_book_heterogeneous_requires_node_type( + def test_fetch_node_partition_book_heterogeneous_requires_node_type( self, mock_request ): - """Test get_node_partition_book raises ValueError when no node_type for heterogeneous graphs.""" + """Test fetch_node_partition_book raises ValueError when no node_type for heterogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) with self.assertRaises(ValueError): - remote_dataset.get_node_partition_book() + remote_dataset.fetch_node_partition_book() @patch( "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_edge_partition_book_heterogeneous_requires_edge_type( + def test_fetch_edge_partition_book_heterogeneous_requires_edge_type( self, mock_request ): - """Test get_edge_partition_book raises ValueError when no edge_type for heterogeneous graphs.""" + """Test fetch_edge_partition_book raises ValueError when no edge_type for heterogeneous graphs.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) with self.assertRaises(ValueError): - remote_dataset.get_edge_partition_book() + remote_dataset.fetch_edge_partition_book() class TestRemoteDistDatasetWithSplits(TestCase): - """Tests for get_node_ids with train/val/test splits.""" + """Tests for fetch_node_ids with train/val/test splits.""" def tearDown(self) -> None: global _test_server @@ -387,8 +389,8 @@ def _create_server_with_splits(self) -> None: "gigl.distributed.graph_store.remote_dist_dataset.async_request_server", side_effect=_mock_async_request_server, ) - def test_get_node_ids_with_splits(self, mock_async_request): - """Test get_node_ids with train/val/test splits and optional sharding.""" + def test_fetch_node_ids_with_splits(self, mock_async_request): + """Test fetch_node_ids with train/val/test splits and optional sharding.""" self._create_server_with_splits() cluster_info = _create_mock_graph_store_info(num_storage_nodes=1) @@ -396,33 +398,33 @@ def test_get_node_ids_with_splits(self, mock_async_request): # Test each split returns correct nodes self.assert_tensor_equality( - remote_dataset.get_node_ids(node_type=USER, split="train")[0], + remote_dataset.fetch_node_ids(node_type=USER, split="train")[0], torch.tensor([0, 1, 2]), ) self.assert_tensor_equality( - remote_dataset.get_node_ids(node_type=USER, split="val")[0], + remote_dataset.fetch_node_ids(node_type=USER, split="val")[0], torch.tensor([3]), ) self.assert_tensor_equality( - remote_dataset.get_node_ids(node_type=USER, split="test")[0], + remote_dataset.fetch_node_ids(node_type=USER, split="test")[0], torch.tensor([4]), ) # No split returns all nodes self.assert_tensor_equality( - remote_dataset.get_node_ids(node_type=USER, split=None)[0], + remote_dataset.fetch_node_ids(node_type=USER, split=None)[0], torch.arange(5), ) # With sharding: train split [0, 1, 2] across 2 ranks self.assert_tensor_equality( - remote_dataset.get_node_ids( + remote_dataset.fetch_node_ids( rank=0, world_size=2, node_type=USER, split="train" )[0], torch.tensor([0]), ) self.assert_tensor_equality( - remote_dataset.get_node_ids( + remote_dataset.fetch_node_ids( rank=1, world_size=2, node_type=USER, split="train" )[0], torch.tensor([1, 2]), @@ -432,15 +434,15 @@ def test_get_node_ids_with_splits(self, mock_async_request): "gigl.distributed.graph_store.remote_dist_dataset.async_request_server", side_effect=_mock_async_request_server, ) - def test_get_ablp_input(self, mock_async_request): - """Test get_ablp_input with train/val/test splits.""" + def test_fetch_ablp_input(self, mock_async_request): + """Test fetch_ablp_input with train/val/test splits.""" self._create_server_with_splits() cluster_info = _create_mock_graph_store_info(num_storage_nodes=1) remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) # Train split: nodes [0, 1, 2] - result = remote_dataset.get_ablp_input( + result = remote_dataset.fetch_ablp_input( split="train", anchor_node_type=USER, supervision_edge_type=USER_TO_STORY ) self.assertIn(0, result) @@ -461,7 +463,7 @@ def test_get_ablp_input(self, mock_async_request): ) # Val split: node [3] - result = remote_dataset.get_ablp_input( + result = remote_dataset.fetch_ablp_input( split="val", anchor_node_type=USER, supervision_edge_type=USER_TO_STORY ) ablp_input = result[0] @@ -480,7 +482,7 @@ def test_get_ablp_input(self, mock_async_request): # Test split: node [4] # Note: Labels are stored in CSR format which sorts by destination indices, # so [4, 0] from the input becomes [0, 4] in the stored format. - result = remote_dataset.get_ablp_input( + result = remote_dataset.fetch_ablp_input( split="test", anchor_node_type=USER, supervision_edge_type=USER_TO_STORY ) ablp_input = result[0] @@ -500,15 +502,15 @@ def test_get_ablp_input(self, mock_async_request): "gigl.distributed.graph_store.remote_dist_dataset.async_request_server", side_effect=_mock_async_request_server, ) - def test_get_ablp_input_with_sharding(self, mock_async_request): - """Test get_ablp_input with sharding across compute nodes.""" + def test_fetch_ablp_input_with_sharding(self, mock_async_request): + """Test fetch_ablp_input with sharding across compute nodes.""" self._create_server_with_splits() cluster_info = _create_mock_graph_store_info(num_storage_nodes=1) remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) # With sharding: train split [0, 1, 2] across 2 ranks - result_rank0 = remote_dataset.get_ablp_input( + result_rank0 = remote_dataset.fetch_ablp_input( split="train", rank=0, world_size=2, @@ -532,7 +534,7 @@ def test_get_ablp_input_with_sharding(self, mock_async_request): torch.tensor([[2]]), ) - result_rank1 = remote_dataset.get_ablp_input( + result_rank1 = remote_dataset.fetch_ablp_input( split="train", rank=1, world_size=2, @@ -614,13 +616,13 @@ def _create_server_with_labeled_homogeneous_splits(self) -> None: "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_node_types_labeled_homogeneous(self, mock_request): - """Test get_node_types returns DEFAULT_HOMOGENEOUS_NODE_TYPE for labeled homogeneous datasets.""" + def test_fetch_node_types_labeled_homogeneous(self, mock_request): + """Test fetch_node_types returns DEFAULT_HOMOGENEOUS_NODE_TYPE for labeled homogeneous datasets.""" self._create_server_with_labeled_homogeneous_splits() cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) - node_types = remote_dataset.get_node_types() + node_types = remote_dataset.fetch_node_types() self.assertIsNotNone(node_types) self.assertIn(DEFAULT_HOMOGENEOUS_NODE_TYPE, node_types) @@ -632,25 +634,25 @@ def test_get_node_types_labeled_homogeneous(self, mock_request): "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_node_ids_auto_detects_default_node_type( + def test_fetch_node_ids_auto_detects_default_node_type( self, mock_request, mock_async_request ): - """Test get_node_ids without node_type auto-detects DEFAULT_HOMOGENEOUS_NODE_TYPE.""" + """Test fetch_node_ids without node_type auto-detects DEFAULT_HOMOGENEOUS_NODE_TYPE.""" self._create_server_with_labeled_homogeneous_splits() cluster_info = _create_mock_graph_store_info(num_storage_nodes=1) remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) - # No node_type provided: _get_node_ids should auto-detect DEFAULT_HOMOGENEOUS_NODE_TYPE + # No node_type provided: _fetch_node_ids should auto-detect DEFAULT_HOMOGENEOUS_NODE_TYPE self.assert_tensor_equality( - remote_dataset.get_node_ids(split="train")[0], + remote_dataset.fetch_node_ids(split="train")[0], torch.tensor([0, 1, 2]), ) self.assert_tensor_equality( - remote_dataset.get_node_ids(split="val")[0], + remote_dataset.fetch_node_ids(split="val")[0], torch.tensor([3]), ) self.assert_tensor_equality( - remote_dataset.get_node_ids(split="test")[0], + remote_dataset.fetch_node_ids(split="test")[0], torch.tensor([4]), ) @@ -658,14 +660,14 @@ def test_get_node_ids_auto_detects_default_node_type( "gigl.distributed.graph_store.remote_dist_dataset.async_request_server", side_effect=_mock_async_request_server, ) - def test_get_ablp_input_defaults_to_homogeneous_types(self, mock_async_request): - """Test get_ablp_input without anchor_node_type/supervision_edge_type uses homogeneous defaults.""" + def test_fetch_ablp_input_defaults_to_homogeneous_types(self, mock_async_request): + """Test fetch_ablp_input without anchor_node_type/supervision_edge_type uses homogeneous defaults.""" self._create_server_with_labeled_homogeneous_splits() cluster_info = _create_mock_graph_store_info(num_storage_nodes=1) remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) # Train split: nodes [0, 1, 2] — no type params provided - result = remote_dataset.get_ablp_input(split="train") + result = remote_dataset.fetch_ablp_input(split="train") self.assertIn(0, result) ablp_input = result[0] self.assertIsInstance(ablp_input, ABLPInputNodes) @@ -687,14 +689,16 @@ def test_get_ablp_input_defaults_to_homogeneous_types(self, mock_async_request): "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_node_partition_book_auto_infers_default_node_type(self, mock_request): - """Test get_node_partition_book auto-infers DEFAULT_HOMOGENEOUS_NODE_TYPE when None.""" + def test_fetch_node_partition_book_auto_infers_default_node_type( + self, mock_request + ): + """Test fetch_node_partition_book auto-infers DEFAULT_HOMOGENEOUS_NODE_TYPE when None.""" self._create_server_with_labeled_homogeneous_splits() cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) # No node_type: should auto-infer DEFAULT_HOMOGENEOUS_NODE_TYPE - result = remote_dataset.get_node_partition_book() + result = remote_dataset.fetch_node_partition_book() self.assertIsInstance(result, torch.Tensor) assert isinstance(result, torch.Tensor) self.assertEqual(result.shape[0], 5) @@ -704,40 +708,42 @@ def test_get_node_partition_book_auto_infers_default_node_type(self, mock_reques "gigl.distributed.graph_store.remote_dist_dataset.request_server", side_effect=_mock_request_server, ) - def test_get_edge_partition_book_auto_infers_default_edge_type(self, mock_request): - """Test get_edge_partition_book auto-infers DEFAULT_HOMOGENEOUS_EDGE_TYPE when None.""" + def test_fetch_edge_partition_book_auto_infers_default_edge_type( + self, mock_request + ): + """Test fetch_edge_partition_book auto-infers DEFAULT_HOMOGENEOUS_EDGE_TYPE when None.""" self._create_server_with_labeled_homogeneous_splits() cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) # No edge_type: should auto-infer DEFAULT_HOMOGENEOUS_EDGE_TYPE - result = remote_dataset.get_edge_partition_book() + result = remote_dataset.fetch_edge_partition_book() self.assertIsInstance(result, torch.Tensor) assert isinstance(result, torch.Tensor) self.assertEqual(result.shape[0], 5) self.assert_tensor_equality(result, torch.zeros(5, dtype=torch.int64)) - def test_get_ablp_input_mismatched_params_raises(self): - """Test get_ablp_input raises ValueError when exactly one type param is None.""" + def test_fetch_ablp_input_mismatched_params_raises(self): + """Test fetch_ablp_input raises ValueError when exactly one type param is None.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) with self.assertRaises(ValueError): - remote_dataset.get_ablp_input( + remote_dataset.fetch_ablp_input( split="train", anchor_node_type=DEFAULT_HOMOGENEOUS_NODE_TYPE, supervision_edge_type=None, ) with self.assertRaises(ValueError): - remote_dataset.get_ablp_input( + remote_dataset.fetch_ablp_input( split="train", anchor_node_type=None, supervision_edge_type=DEFAULT_HOMOGENEOUS_EDGE_TYPE, ) -def _test_get_free_ports_on_storage_cluster( +def _test_fetch_free_ports_on_storage_cluster( rank: int, world_size: int, init_process_group_init_method: str, @@ -763,7 +769,7 @@ def _test_get_free_ports_on_storage_cluster( "gigl.distributed.graph_store.remote_dist_dataset.request_server", return_value=mock_ports, ): - ports = remote_dataset.get_free_ports_on_storage_cluster(num_ports) + ports = remote_dataset.fetch_free_ports_on_storage_cluster(num_ports) assert len(ports) == num_ports, f"Expected {num_ports} ports, got {len(ports)}" @@ -793,7 +799,7 @@ def tearDown(self) -> None: if dist.is_initialized(): dist.destroy_process_group() - def test_get_free_ports_on_storage_cluster_distributed(self): + def test_fetch_free_ports_on_storage_cluster_distributed(self): """Test that free ports are correctly broadcast across all ranks.""" init_method = get_process_group_init_method() world_size = 2 @@ -801,18 +807,18 @@ def test_get_free_ports_on_storage_cluster_distributed(self): mock_ports = [10000, 10001, 10002] mp.spawn( - fn=_test_get_free_ports_on_storage_cluster, + fn=_test_fetch_free_ports_on_storage_cluster, args=(world_size, init_method, num_ports, mock_ports), nprocs=world_size, ) - def test_get_free_ports_fails_without_process_group(self): - """Test that get_free_ports_on_storage_cluster raises when dist not initialized.""" + def test_fetch_free_ports_fails_without_process_group(self): + """Test that fetch_free_ports_on_storage_cluster raises when dist not initialized.""" cluster_info = _create_mock_graph_store_info() remote_dataset = RemoteDistDataset(cluster_info=cluster_info, local_rank=0) with self.assertRaises(ValueError): - remote_dataset.get_free_ports_on_storage_cluster(num_ports=1) + remote_dataset.fetch_free_ports_on_storage_cluster(num_ports=1) class TestCallFuncOnServer(TestCase):