Skip to content

Commit

Permalink
fix-npe-when-pulsar-ZkBookieRackAffinityMapping-getBookieAddressResolver
Browse files Browse the repository at this point in the history
  • Loading branch information
gavingaozhangmin committed Sep 16, 2021
1 parent 97818f5 commit 9f2bddb
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
try {
dnsResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
dnsResolver.setBookieAddressResolver(bookieAddressResolver);
if (dnsResolver instanceof Configurable) {
((Configurable) dnsResolver).setConf(conf);
}
Expand All @@ -269,9 +270,10 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
}
} catch (RuntimeException re) {
if (!conf.getEnforceMinNumRacksPerWriteQuorum()) {
LOG.error("Failed to initialize DNS Resolver {}, used default subnet resolver : {} {}",
dnsResolverName, re, re.getMessage());
LOG.error("Failed to initialize DNS Resolver {}, used default subnet resolver ",
dnsResolverName, re);
dnsResolver = new DefaultResolver(this::getDefaultRack);
dnsResolver.setBookieAddressResolver(bookieAddressResolver);
} else {
/*
* if minNumRacksPerWriteQuorum is enforced, then it
Expand All @@ -282,9 +284,6 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
}
}
}
if (dnsResolver != null) {
dnsResolver.setBookieAddressResolver(bookieAddressResolver);
}
slowBookies = CacheBuilder.newBuilder()
.expireAfterWrite(conf.getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
.build(new CacheLoader<BookieId, Long>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public Integer getSample() {
} else {
String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
actualDNSResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
actualDNSResolver.setBookieAddressResolver(bookieAddressResolver);
if (actualDNSResolver instanceof Configurable) {
((Configurable) actualDNSResolver).setConf(conf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.net.ScriptBasedMapping;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
Expand Down Expand Up @@ -146,6 +149,17 @@ static void updateMyRack(String rack) throws Exception {
StaticDNSResolver.addNodeToRack("localhost", rack);
}

@Test
public void testInitalize() throws Exception{
String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
DNSToSwitchMapping dnsResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
AbstractDNSToSwitchMapping tmp = (AbstractDNSToSwitchMapping) dnsResolver;
assertNull(tmp.getBookieAddressResolver());

dnsResolver.setBookieAddressResolver(repp.bookieAddressResolver);
assertNotNull(tmp.getBookieAddressResolver());
}

@Test
public void testNodeDown() throws Exception {
repp.uninitalize();
Expand Down

0 comments on commit 9f2bddb

Please sign in to comment.