diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/DefaultApplierServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/DefaultApplierServer.java index 52beacfd4f..565f7e7820 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/DefaultApplierServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/DefaultApplierServer.java @@ -67,7 +67,7 @@ public class DefaultApplierServer extends AbstractInstanceNode implements Applie public ApplierLwmManager lwmManager;*/ @InstanceDependency - public AtomicReference replication; + public ApplierSyncReplication replication; @InstanceDependency public ApplierCommandDispatcher dispatcher; @@ -153,7 +153,7 @@ public DefaultApplierServer(String clusterName, ClusterId clusterId, ShardId sha Long qpsThreshold, Long bytesPerSecondThreshold, Long memoryThreshold, Long concurrencyThreshold, String subenv) throws Exception { this.sequenceController = new DefaultSequenceController(qpsThreshold, bytesPerSecondThreshold, memoryThreshold, concurrencyThreshold); this.dispatcher = new DefaultCommandDispatcher(); - this.replication = new AtomicReference<>(); + this.replication = new DefaultPsyncReplication(this); this.offsetRecorder = new AtomicLong(-1); this.replId = new AtomicReference<>("?"); @@ -247,44 +247,15 @@ public ApplierInstanceMeta getApplierInstanceMeta() { @Override public void setStateActive(Endpoint endpoint, GtidSet gtidSet, boolean useXsync) { - createReplication(useXsync); this.state = STATE.ACTIVE; - replication.get().connect(endpoint, gtidSet); - } - - private void createReplication(boolean useXsync) { - if (Objects.nonNull(replication.get())) { - if ((replication.get() instanceof DefaultXsyncReplication) != useXsync) { - throw new IllegalStateException("can't change protocol"); - } - return; - } - logger.info("[setStateActive] useXsync:{}", useXsync); - try { - AbstractSyncReplication syncReplication; - if (useXsync) { - syncReplication = new DefaultXsyncReplication(this); - } else { - syncReplication = new DefaultPsyncReplication(this); - } - syncReplication.inject(dependencies); - ComponentRegistryHolder.getComponentRegistry().add(syncReplication); - this.replication.set(syncReplication); - } catch (Exception e) { - logger.error("[setStateActive] inject syncReplication error", e); - throw new RuntimeException("create replication error"); - } - + replication.connect(endpoint, gtidSet); } @Override public void setStateBackup() { - if (Objects.isNull(replication)) { - createReplication(true); - } this.state = STATE.BACKUP; - replication.get().connect(null); + replication.connect(null); } @Override @@ -309,7 +280,7 @@ public STATE getState() { @Override public Endpoint getUpstreamEndpoint() { - return replication.get().endpoint(); + return replication.endpoint(); } @Override