Skip to content

Commit

Permalink
Merge pull request #911 from ctripcorp/bugfix/hailu_abort_xsync
Browse files Browse the repository at this point in the history
repair memory leak for applierserver
  • Loading branch information
LanternLee authored Dec 16, 2024
2 parents 10d170b + 5c8aa5d commit 0170e35
Showing 1 changed file with 5 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class DefaultApplierServer extends AbstractInstanceNode implements Applie
public ApplierLwmManager lwmManager;*/

@InstanceDependency
public AtomicReference<ApplierSyncReplication> replication;
public ApplierSyncReplication replication;

@InstanceDependency
public ApplierCommandDispatcher dispatcher;
Expand Down Expand Up @@ -153,7 +153,7 @@ public DefaultApplierServer(String clusterName, ClusterId clusterId, ShardId sha
Long qpsThreshold, Long bytesPerSecondThreshold, Long memoryThreshold, Long concurrencyThreshold, String subenv) throws Exception {
this.sequenceController = new DefaultSequenceController(qpsThreshold, bytesPerSecondThreshold, memoryThreshold, concurrencyThreshold);
this.dispatcher = new DefaultCommandDispatcher();
this.replication = new AtomicReference<>();
this.replication = new DefaultPsyncReplication(this);
this.offsetRecorder = new AtomicLong(-1);
this.replId = new AtomicReference<>("?");

Expand Down Expand Up @@ -247,44 +247,15 @@ public ApplierInstanceMeta getApplierInstanceMeta() {

@Override
public void setStateActive(Endpoint endpoint, GtidSet gtidSet, boolean useXsync) {
createReplication(useXsync);
this.state = STATE.ACTIVE;
replication.get().connect(endpoint, gtidSet);
}

private void createReplication(boolean useXsync) {
if (Objects.nonNull(replication.get())) {
if ((replication.get() instanceof DefaultXsyncReplication) != useXsync) {
throw new IllegalStateException("can't change protocol");
}
return;
}
logger.info("[setStateActive] useXsync:{}", useXsync);
try {
AbstractSyncReplication syncReplication;
if (useXsync) {
syncReplication = new DefaultXsyncReplication(this);
} else {
syncReplication = new DefaultPsyncReplication(this);
}
syncReplication.inject(dependencies);
ComponentRegistryHolder.getComponentRegistry().add(syncReplication);
this.replication.set(syncReplication);
} catch (Exception e) {
logger.error("[setStateActive] inject syncReplication error", e);
throw new RuntimeException("create replication error");
}

replication.connect(endpoint, gtidSet);
}


@Override
public void setStateBackup() {
if (Objects.isNull(replication)) {
createReplication(true);
}
this.state = STATE.BACKUP;
replication.get().connect(null);
replication.connect(null);
}

@Override
Expand All @@ -309,7 +280,7 @@ public STATE getState() {

@Override
public Endpoint getUpstreamEndpoint() {
return replication.get().endpoint();
return replication.endpoint();
}

@Override
Expand Down

0 comments on commit 0170e35

Please sign in to comment.