Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -368,6 +369,16 @@ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return future;
}

private boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles namespaceBundles,
NamespaceBundle bundleRange) {
try {
namespaceBundles.validateBundle(bundleRange);
return true;
} catch (IllegalArgumentException e) {
return false;
}
}

// Attempt to local the data for the given bundle in metadata store
// If it cannot be found, return the default bundle data.
@Override
Expand Down Expand Up @@ -762,8 +773,14 @@ public void checkNamespaceBundleSplit() {
try {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
if (!namespaceBundleFactory
.canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
NamespaceBundle bundle = namespaceBundleFactory.getBundle(namespaceName, bundleRange);
if (!namespaceBundleFactory.canSplitBundle(bundle)) {
continue;
}

NamespaceBundles bundles = namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName));
if (!checkBundleDataExistInNamespaceBundles(bundles, bundle)) {
log.warn("Bundle {} has been removed, skip split this bundle ", bundleName);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public int size() {
return bundles.size();
}

public void validateBundle(NamespaceBundle nsBundle) throws Exception {
public void validateBundle(NamespaceBundle nsBundle) throws IllegalArgumentException {
int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint());
checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle);
NamespaceBundle foundBundle = bundles.get(idx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -70,6 +71,7 @@
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -1130,4 +1132,56 @@ public void testRemoveNonExistBundleData()
assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange()));
}

@Test
public void testRepeatSplitBundle() throws Exception {
final String cluster = "use";
final String tenant = "my-tenant";
final String namespace = "repeat-split-bundle";
final String topicName = tenant + "/" + namespace + "/" + "topic";
int bundleNumbers = 8;

admin1.clusters().createCluster(cluster, ClusterData.builder()
.serviceUrl(pulsar1.getWebServiceAddress()).build());
admin1.tenants().createTenant(tenant,
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers);

LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData");
LocalBrokerData localData = (LocalBrokerData) getField(primaryLoadManager, "localData");

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build();

// create a lot of topic to fully distributed among bundles.
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String topicNameI = topicName + i;
admin1.topics().createPartitionedTopic(topicNameI, 20);
// trigger bundle assignment

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicNameI)
.subscriptionName("my-subscriber-name2").subscribe();
consumers.add(consumer);
}

String topicToFindBundle = topicName + 0;
NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
String bundleKey = realBundle.toString();
log.info("Before bundle={}", bundleKey);

NamespaceBundleStats stats = new NamespaceBundleStats();
stats.msgRateIn = 100000.0;
localData.getLastStats().put(bundleKey, stats);
pulsar1.getBrokerService().updateRates();

primaryLoadManager.updateAll();

primaryLoadManager.updateAll();
Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey));
Comment thread
lhotari marked this conversation as resolved.

for (Consumer consumer : consumers) {
consumer.close();
}
}

}
Loading