Skip to content

Commit c35f769

Browse files
authored
RATIS-1912. Fix infinity election when perform membership change. (#954)
1 parent e2ea51b commit c35f769

File tree

11 files changed

+267
-18
lines changed

11 files changed

+267
-18
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ enum Phase {
104104
ELECTION
105105
}
106106

107-
enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF}
107+
enum Result {PASSED, SINGLE_MODE_PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF}
108108

109109
private static class ResultAndTerm {
110110
private final Result result;
@@ -331,6 +331,7 @@ private boolean askForVotes(Phase phase, int round) throws InterruptedException,
331331

332332
switch (r.getResult()) {
333333
case PASSED:
334+
case SINGLE_MODE_PASSED:
334335
return true;
335336
case NOT_IN_CONF:
336337
case SHUTDOWN:
@@ -379,6 +380,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
379380
Collection<RaftPeerId> votedPeers = new ArrayList<>();
380381
Collection<RaftPeerId> rejectedPeers = new ArrayList<>();
381382
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
383+
final boolean singleMode = conf.isSingleMode(server.getId());
382384

383385
while (waitForNum > 0 && shouldRun(electionTerm)) {
384386
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
@@ -387,6 +389,9 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
387389
// if some higher priority peer did not response when timeout, but candidate get majority,
388390
// candidate pass vote
389391
return logAndReturn(phase, Result.PASSED, responses, exceptions);
392+
} else if (singleMode) {
393+
// if candidate is in single mode, candidate pass vote.
394+
return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions);
390395
} else {
391396
return logAndReturn(phase, Result.TIMEOUT, responses, exceptions);
392397
}
@@ -418,7 +423,7 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
418423
}
419424

420425
// If any peer with higher priority rejects vote, candidate can not pass vote
421-
if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId)) {
426+
if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId) && !singleMode) {
422427
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
423428
}
424429

@@ -447,6 +452,8 @@ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitt
447452
// received all the responses
448453
if (conf.hasMajority(votedPeers, server.getId())) {
449454
return logAndReturn(phase, Result.PASSED, responses, exceptions);
455+
} else if (singleMode) {
456+
return logAndReturn(phase, Result.SINGLE_MODE_PASSED, responses, exceptions);
450457
} else {
451458
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
452459
}

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,34 @@ Collection<RaftPeer> getOtherPeers(RaftPeerId selfId) {
232232
return others;
233233
}
234234

235+
/**
236+
* @return true if the new peers number reaches half of new conf peers number or the group is
237+
* changing from single mode to HA mode.
238+
*/
239+
boolean changeMajority(Collection<RaftPeer> newMembers) {
240+
Preconditions.assertNull(oldConf, "oldConf");
241+
final long newPeersCount = newMembers.stream().map(RaftPeer::getId).filter(id -> conf.getPeer(id) == null).count();
242+
243+
if (conf.size() == 1 && newMembers.size() == 2 && newPeersCount == 1) {
244+
// Change from single peer to HA mode. This is a special case, skip majority verification.
245+
return false;
246+
}
247+
248+
// If newPeersCount reaches majority number of new conf size, the cluster may end with infinity
249+
// election. See https://issues.apache.org/jira/browse/RATIS-1912 for more details.
250+
final long oldPeersCount = newMembers.size() - newPeersCount;
251+
return newPeersCount >= oldPeersCount;
252+
}
253+
254+
/** @return True if the selfId is in single mode. */
255+
boolean isSingleMode(RaftPeerId selfId) {
256+
if (isStable()) {
257+
return conf.size() == 1;
258+
} else {
259+
return oldConf.size() == 1 && oldConf.contains(selfId) && conf.size() == 2 && conf.contains(selfId);
260+
}
261+
}
262+
235263
/** @return true if the self id together with the others are in the majority. */
236264
boolean hasMajority(Collection<RaftPeerId> others, RaftPeerId selfId) {
237265
Preconditions.assertTrue(!others.contains(selfId));

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,10 @@ public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfiguration
13131313
pending.setReply(newSuccessReply(request));
13141314
return pending.getFuture();
13151315
}
1316+
if (current.changeMajority(serversInNewConf)) {
1317+
throw new SetConfigurationException("Failed to set configuration: request " + request
1318+
+ " changes a majority set of the current configuration " + current);
1319+
}
13161320

13171321
getRaftServer().addRaftPeers(serversInNewConf);
13181322
getRaftServer().addRaftPeers(listenersInNewConf);

ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.ratis.conf.RaftProperties;
2222
import org.apache.ratis.protocol.RaftClientReply;
2323
import org.apache.ratis.protocol.RaftGroupId;
24+
import org.apache.ratis.protocol.RaftPeer;
2425
import org.apache.ratis.protocol.RaftPeerId;
2526
import org.apache.ratis.server.RaftServer;
2627
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -111,7 +112,8 @@ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
111112
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
112113
true);
113114
// trigger setConfiguration
114-
cluster.setConfiguration(change.allPeersInNewConf);
115+
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
116+
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
115117

116118
RaftServerTestUtil
117119
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);

ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.io.IOException;
5050
import java.nio.file.Files;
5151
import java.nio.file.Path;
52+
import java.util.Arrays;
5253
import java.util.Collections;
5354
import java.util.List;
5455
import java.util.concurrent.CompletableFuture;
@@ -242,7 +243,8 @@ private void testAddNewFollowers(CLUSTER cluster, int numRequests) throws Except
242243
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
243244
true);
244245
// trigger setConfiguration
245-
cluster.setConfiguration(change.allPeersInNewConf);
246+
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
247+
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
246248

247249
RaftServerTestUtil
248250
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
@@ -391,7 +393,8 @@ private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Except
391393
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
392394
true);
393395
// trigger setConfiguration
394-
cluster.setConfiguration(change.allPeersInNewConf);
396+
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
397+
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
395398
RaftServerTestUtil
396399
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
397400

@@ -478,7 +481,8 @@ private void testInstallSnapshotInstalledEvent(CLUSTER cluster) throws Exception
478481
// add one new peer
479482
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(1, true, true);
480483
// trigger setConfiguration
481-
cluster.setConfiguration(change.allPeersInNewConf);
484+
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
485+
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
482486

483487
RaftServerTestUtil
484488
.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
@@ -556,7 +560,8 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio
556560
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
557561
true);
558562
// trigger setConfiguration
559-
cluster.setConfiguration(change.allPeersInNewConf);
563+
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
564+
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
560565

561566
RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
562567

@@ -567,8 +572,8 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio
567572
RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
568573
}
569574

570-
// Make sure each new peer got one snapshot notification.
571-
Assert.assertEquals(2, numSnapshotRequests.get());
575+
// Make sure each new peer got at least one snapshot notification.
576+
Assert.assertTrue(2 <= numSnapshotRequests.get());
572577
} finally {
573578
cluster.shutdown();
574579
}

ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.ratis.protocol.RaftPeer;
2929
import org.apache.ratis.protocol.RaftPeerId;
3030
import org.apache.ratis.server.RaftServer;
31+
import org.apache.ratis.server.impl.RaftServerTestUtil;
3132
import org.apache.ratis.server.impl.RetryCacheTestUtil;
3233
import org.apache.ratis.server.raftlog.RaftLog;
3334
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -139,7 +140,8 @@ void runTestRetryOnNewLeader(CLUSTER cluster) throws Exception {
139140
RaftPeer[] allPeers = cluster.removePeers(2, true,
140141
asList(change.newPeers)).allPeersInNewConf;
141142
// trigger setConfiguration
142-
cluster.setConfiguration(allPeers);
143+
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(allPeers),
144+
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
143145

144146
final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> {
145147
final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();

ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,8 @@ public static <T extends Throwable> void runMultiGroupTest(
329329
}
330330
LOG.info(chosen + ") setConfiguration: " + cluster.printServers(groups[chosen].getGroupId()));
331331
try (final RaftClient client = cluster.createClient(groups[chosen])) {
332-
client.admin().setConfiguration(allPeers.toArray(RaftPeer.emptyArray()));
332+
RaftServerTestUtil.runWithMinorityPeers(cluster, allPeers,
333+
peers -> client.admin().setConfiguration(peers.toArray(RaftPeer.emptyArray())));
333334
}
334335

335336
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));

ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161

6262
import static java.util.Arrays.asList;
6363
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
64+
import static org.junit.Assert.assertThrows;
6465

6566
public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluster>
6667
extends BaseTest
@@ -145,6 +146,58 @@ public void testAddPeers() throws Exception {
145146
});
146147
}
147148

149+
/**
150+
* Test leader election when changing cluster from single mode to HA mode.
151+
*/
152+
@Test
153+
public void testLeaderElectionWhenChangeFromSingleToHA() throws Exception {
154+
runWithNewCluster(1, cluster -> {
155+
RaftTestUtil.waitForLeader(cluster);
156+
157+
RaftGroupId groupId = cluster.getGroup().getGroupId();
158+
RaftPeer curPeer = cluster.getGroup().getPeers().iterator().next();
159+
RaftPeer newPeer = cluster.addNewPeers(1, true, true).newPeers[0];
160+
161+
RaftServerProxy leaderServer = cluster.getServer(curPeer.getId());
162+
163+
// Update leader conf to transitional single mode.
164+
RaftConfigurationImpl oldNewConf = RaftConfigurationImpl.newBuilder()
165+
.setOldConf(new PeerConfiguration(Arrays.asList(curPeer)))
166+
.setConf(new PeerConfiguration(Arrays.asList(curPeer, newPeer)))
167+
.setLogEntryIndex(Long.MAX_VALUE / 2)
168+
.build();
169+
Assert.assertTrue(oldNewConf.isSingleMode(curPeer.getId()));
170+
RaftServerTestUtil.setRaftConf(leaderServer, groupId, oldNewConf);
171+
try(RaftClient client = cluster.createClient()) {
172+
client.admin().transferLeadership(null, leaderServer.getId(), 1000);
173+
}
174+
175+
final RaftServer.Division newLeader = RaftTestUtil.waitForLeader(cluster);
176+
Assert.assertEquals(leaderServer.getId(), newLeader.getId());
177+
Assert.assertEquals(oldNewConf, newLeader.getRaftConf());
178+
});
179+
}
180+
181+
@Test
182+
public void testChangeMajority() throws Exception {
183+
runWithNewCluster(1, cluster -> {
184+
RaftTestUtil.waitForLeader(cluster);
185+
final RaftPeerId leaderId = cluster.getLeader().getId();
186+
187+
try (final RaftClient client = cluster.createClient(leaderId)) {
188+
final PeerChanges c1 = cluster.addNewPeers(2, true);
189+
190+
SetConfigurationRequest.Arguments arguments = SetConfigurationRequest.Arguments.newBuilder()
191+
.setServersInCurrentConf(cluster.getPeers())
192+
.setServersInNewConf(c1.allPeersInNewConf)
193+
.setMode(SetConfigurationRequest.Mode.COMPARE_AND_SET)
194+
.build();
195+
assertThrows("Expect change majority error.", SetConfigurationException.class,
196+
() -> client.admin().setConfiguration(arguments));
197+
}
198+
});
199+
}
200+
148201
/**
149202
* remove 2 peers (5 peers -> 3 peers), no leader change
150203
*/
@@ -380,7 +433,17 @@ public void testBootstrapReconfWithSingleNodeAddOne() throws Exception {
380433
@Test
381434
public void testBootstrapReconfWithSingleNodeAddTwo() throws Exception {
382435
// originally 1 peer, add 2 more
383-
runWithNewCluster(1, cluster -> runTestBootstrapReconf(2, true, cluster));
436+
runWithNewCluster(1, cluster -> {
437+
RaftTestUtil.waitForLeader(cluster);
438+
final RaftPeerId leaderId = cluster.getLeader().getId();
439+
440+
try (final RaftClient client = cluster.createClient(leaderId)) {
441+
final PeerChanges c1 = cluster.addNewPeers(2, true);
442+
443+
assertThrows("Expect change majority error.", SetConfigurationException.class,
444+
() -> client.admin().setConfiguration(c1.allPeersInNewConf));
445+
}
446+
});
384447
}
385448

386449
@Test

ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.ratis.RaftTestUtil;
2121
import org.apache.ratis.conf.RaftProperties;
22+
import org.apache.ratis.proto.RaftProtos;
2223
import org.apache.ratis.protocol.RaftGroupId;
2324
import org.apache.ratis.protocol.RaftGroupMemberId;
2425
import org.apache.ratis.protocol.RaftPeer;
@@ -35,16 +36,22 @@
3536
import org.apache.ratis.util.JavaUtils;
3637
import org.apache.ratis.util.Slf4jUtils;
3738
import org.apache.ratis.util.TimeDuration;
39+
import org.apache.ratis.util.function.CheckedConsumer;
3840
import org.junit.Assert;
3941
import org.mockito.Mockito;
4042
import org.slf4j.Logger;
4143
import org.slf4j.LoggerFactory;
4244
import org.slf4j.event.Level;
4345

46+
import java.io.IOException;
4447
import java.util.Arrays;
4548
import java.util.Collection;
4649
import java.util.Collections;
50+
import java.util.HashSet;
51+
import java.util.List;
4752
import java.util.Optional;
53+
import java.util.Set;
54+
import java.util.stream.Collectors;
4855
import java.util.stream.Stream;
4956

5057
public class RaftServerTestUtil {
@@ -135,6 +142,10 @@ public static RaftConfiguration newRaftConfiguration(Collection<RaftPeer> peers)
135142
return RaftConfigurationImpl.newBuilder().setConf(peers).build();
136143
}
137144

145+
public static void setRaftConf(RaftServer proxy, RaftGroupId groupId, RaftConfiguration conf) {
146+
((RaftServerImpl)getDivision(proxy, groupId)).getState().setRaftConf(conf);
147+
}
148+
138149
public static RaftServerRpc getServerRpc(RaftServer.Division server) {
139150
return ((RaftServerImpl)server).getRaftServer().getServerRpc();
140151
}
@@ -196,4 +207,40 @@ public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, D
196207
public static boolean isHighestPriority(RaftConfiguration config, RaftPeerId peerId) {
197208
return ((RaftConfigurationImpl)config).isHighestPriority(peerId);
198209
}
210+
211+
public static void runWithMinorityPeers(MiniRaftCluster cluster, Collection<RaftPeer> peersInNewConf,
212+
CheckedConsumer<Collection<RaftPeer>, IOException> consumer) throws IOException {
213+
Collection<RaftPeer> peers = parseMinorityPeers(cluster, peersInNewConf);
214+
while (peers != null) {
215+
consumer.accept(peers);
216+
peers = parseMinorityPeers(cluster, peersInNewConf);
217+
}
218+
}
219+
220+
private static Collection<RaftPeer> parseMinorityPeers(MiniRaftCluster cluster, Collection<RaftPeer> peersInNewConf) {
221+
RaftConfigurationImpl conf = (RaftConfigurationImpl) cluster.getLeader().getRaftConf();
222+
Set<RaftPeer> peers = new HashSet<>(conf.getCurrentPeers());
223+
224+
// Add new peers to construct minority conf peers.
225+
List<RaftPeer> peersToAdd = peersInNewConf.stream().filter(
226+
peer -> !conf.containsInConf(peer.getId(), RaftProtos.RaftPeerRole.FOLLOWER)).collect(Collectors.toList());
227+
if (!peersToAdd.isEmpty()) {
228+
for (RaftPeer peer : peersToAdd) {
229+
if (peers.add(peer) && conf.changeMajority(peers)) {
230+
peers.remove(peer);
231+
break;
232+
}
233+
}
234+
return peers;
235+
}
236+
237+
// All new peers has been added. Handle the removed peers.
238+
List<RaftPeer> peersToRemove = peers.stream().filter(peer -> !peersInNewConf.contains(peer)).collect(Collectors.toList());
239+
if (!peersToRemove.isEmpty()) {
240+
return peersInNewConf;
241+
}
242+
243+
// The peers in new conf are the same as current conf, return null.
244+
return null;
245+
}
199246
}

0 commit comments

Comments
 (0)