Skip to content

Commit

Permalink
add redis client name
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Nov 5, 2024
1 parent 97e9741 commit a61a477
Show file tree
Hide file tree
Showing 16 changed files with 227 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ public RetryByteBufReadPolicy(int retry){

@Override
public void read(Channel channel, ByteBuf byteBuf, ByteBufReadAction byteBufReadAction) throws ByteBufReadPolicyException {

// logger.info("[RetryByteBufReadPolicy] start");
for(int i = 0; i < retry ; ){
try {
// logger.info("[RetryByteBufReadPolicy] {} times", i);
int before = byteBuf.readableBytes();
// logger.info("[RetryByteBufReadPolicy] before:{}", before);
byteBufReadAction.read(channel, byteBuf);
int after = byteBuf.readableBytes();
// logger.info("[RetryByteBufReadPolicy] after:{}", after);
if( after <= 0 ){
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,21 @@ protected void doSendRequest(final NettyClient nettyClient, ByteBuf byteBuf) {
}

if(getCommandTimeoutMilli() > 0 && scheduled != null){
int commandTimeoutMilli = getCommandTimeoutMilli();
if (nettyClient instanceof RedisAsyncNettyClient && !((RedisAsyncNettyClient) nettyClient).getDoAfterConnectedOver()) {
commandTimeoutMilli += ((RedisAsyncNettyClient) nettyClient).getAfterConnectCommandTimeoutMill();
}

getLogger().debug("[doSendRequest][schedule timeout]{}, {}", this, getCommandTimeoutMilli());
getLogger().debug("[doSendRequest][schedule timeout]{}, {}", this, commandTimeoutMilli);
int finalCommandTimeoutMilli = commandTimeoutMilli;
timeoutFuture = scheduled.schedule(new AbstractExceptionLogTask() {

@Override
public void doRun() {
getLogger().info("[{}][run][timeout]{}", AbstractNettyRequestResponseCommand.this, nettyClient);
future().setFailure(new CommandTimeoutException("timeout " + + getCommandTimeoutMilli()));
future().setFailure(new CommandTimeoutException("timeout " + finalCommandTimeoutMilli));
}
}, getCommandTimeoutMilli(), TimeUnit.MILLISECONDS);
}, commandTimeoutMilli, TimeUnit.MILLISECONDS);

future().addListener(new CommandFutureListener<V>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.proxy.ProxyEnabled;
import com.ctrip.xpipe.utils.ChannelUtil;
import com.sun.org.apache.xpath.internal.operations.Bool;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicReference;


/**
* @author chen.zhu
Expand All @@ -21,6 +24,8 @@ public class AsyncNettyClient extends DefaultNettyClient {

private ChannelFuture future;

protected final AtomicReference<Boolean> doAfterConnectedOver = new AtomicReference<>(false);

public AsyncNettyClient(ChannelFuture future, Endpoint endpoint) {
super(future.channel());
this.future = future;
Expand All @@ -35,6 +40,8 @@ public void operationComplete(ChannelFuture future) {
} else {
desc.set(ChannelUtil.getDesc(future.channel()));
}
doAfterConnected();
doAfterConnectedOver.set(true);
} else {
logger.info("[async][connect-fail] endpint: {}", endpoint, future.cause());
}
Expand Down Expand Up @@ -93,4 +100,16 @@ public String toString() {
return super.toString();
}

protected void doAfterConnected() {

}

protected boolean getDoAfterConnectedOver() {
return doAfterConnectedOver.get();
}

protected int getAfterConnectCommandTimeoutMill() {
return 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ public void handleResponse(Channel channel, ByteBuf byteBuf) {
ByteBufReceiver byteBufReceiver = receivers.peek();

if(byteBufReceiver != null){

logger.info("[handleResponse][before]{}, {}", byteBufReceiver, channel);
ByteBufReceiver.RECEIVER_RESULT result = byteBufReceiver.receive(channel, byteBuf);
logger.info("[handleResponse][after]");
switch (result){
case SUCCESS:
logger.debug("[handleResponse][remove receiver]");
logger.info("[handleResponse][remove receiver]");
receivers.poll();
break;
case CONTINUE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

@Override
public void read(Channel channel, ByteBuf byteBuf) throws ByteBufReadActionException {
// logger.info("[NettyClientHandler] before handleResponse");
byteBuf.touch("NettyClientHandler-49");
nettyClient.handleResponse(channel, byteBuf);
byteBuf.touch("NettyClientHandler-51");
// logger.info("[NettyClientHandler] after handleResponse");

}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.ctrip.xpipe.netty.commands;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import io.netty.channel.ChannelFuture;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class NettyRedisPoolClientFactory extends NettyKeyedPoolClientFactory{

private String clientName;

public NettyRedisPoolClientFactory(int eventLoopThreads, String clientName) {
super(eventLoopThreads);
this.clientName = clientName;
}

@Override
public PooledObject<NettyClient> makeObject(Endpoint key) throws Exception {
ChannelFuture f = b.connect(key.getHost(), key.getPort());
NettyClient nettyClient = new RedisAsyncNettyClient(f, key, clientName);
f.channel().attr(NettyClientHandler.KEY_CLIENT).set(nettyClient);
return new DefaultPooledObject<NettyClient>(nettyClient);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.ctrip.xpipe.netty.commands;

import com.ctrip.xpipe.api.codec.Codec;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;

public class RedisAsyncNettyClient extends AsyncNettyClient{

protected Logger logger = LoggerFactory.getLogger(RedisAsyncNettyClient.class);

private String clientName;

private static final String CLIENT_SET_NAME = "CLIENT SETNAME ";

private static final String EXPECT_RESP = "OK";

private static final int DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI = 660;

public RedisAsyncNettyClient(ChannelFuture future, Endpoint endpoint, String clientName) {
super(future, endpoint);
this.clientName = clientName;
}

@Override
protected void doAfterConnected() {
String command = CLIENT_SET_NAME + clientName + "\r\n";
ByteBuf byteBuf = Unpooled.wrappedBuffer(command.getBytes(Codec.defaultCharset));
byteBuf.touch("RedisAsyncNettyClient-35");
RedisAsyncNettyClient.super.sendRequest(byteBuf, new ByteBufReceiver() {
@Override
public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) {
logger.info("[redisAsync][clientSetName][start][{}]", desc);
String result = doReceiveResponse(byteBuf);
logger.info("[redisAsync][clientSetName][doReceiveOver][{}]result:{}", desc, result);
if (result == null) {
logger.warn("[redisAsync][clientSetName][wont-result][{}] {}", desc, result);
}
if (EXPECT_RESP.equalsIgnoreCase(result)) {
logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result);
}
return RECEIVER_RESULT.SUCCESS;
}

@Override
public void clientClosed(NettyClient nettyClient) {
logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel());
}

@Override
public void clientClosed(NettyClient nettyClient, Throwable th) {
logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel(), th);
}
});
byteBuf.touch("RedisAsyncNettyClient-59");
}

@Override
protected int getAfterConnectCommandTimeoutMill() {
return DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI;
}

public static String doReceiveResponse(ByteBuf byteBuf) {
ByteArrayOutputStream baous = new ByteArrayOutputStream();
CRLF_STATE crlfState = CRLF_STATE.CONTENT;
int readable = byteBuf.readableBytes();
for(int i=0; i < readable ;i++){

byte data = byteBuf.readByte();
baous.write(data);
switch(data){
case '\r':
crlfState = CRLF_STATE.CR;
break;
case '\n':
if(crlfState == CRLF_STATE.CR){
crlfState = CRLF_STATE.CRLF;
break;
}
default:
crlfState = CRLF_STATE.CONTENT;
break;
}

if(crlfState == CRLF_STATE.CRLF){
break;
}
}
if (baous.toByteArray().length != 0 && crlfState == CRLF_STATE.CRLF) {
String data = new String(baous.toByteArray(), Codec.defaultCharset);
int beginIndex = 0;
int endIndex = data.length();
if(data.charAt(0) == '+'){
beginIndex = 1;
}
if(data.charAt(endIndex - 2) == '\r' && data.charAt(endIndex - 1) == '\n'){
endIndex -= 2;
}
return data.substring(beginIndex, endIndex);
}
return null;
}

public enum CRLF_STATE{
CR,
CRLF,
CONTENT
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;

/**
Expand All @@ -49,7 +49,7 @@ public class MasterOverOneMonitor implements RedisMasterActionListener, OneWaySu
@Autowired
private AlertManager alertManager;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = REDIS_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

@Resource(name = SCHEDULED_EXECUTOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import java.util.stream.Collectors;

import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE;
import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.SENTINEL_KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.THREAD_POOL_TIME_OUT;

/**
Expand Down Expand Up @@ -77,7 +77,7 @@ public class DefaultSentinelHelloCollector implements SentinelHelloCollector {
@Autowired
private PersistenceCache persistenceCache;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = SENTINEL_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

private SentinelLeakyBucket leakyBucket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.PROXY_KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;

@Component
Expand All @@ -28,7 +28,7 @@ public class ProxyConnectedChecker implements ProxyChecker {
@Autowired
private CheckerConfig checkerConfig;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = PROXY_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

@Resource(name = SCHEDULED_EXECUTOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ public class Resource {

public static final String REDIS_COMMAND_EXECUTOR = "redisCommandExecutor";

public static final String KEYED_NETTY_CLIENT_POOL = "keyedClientPool";
public static final String REDIS_KEYED_NETTY_CLIENT_POOL = "redisKeyedClientPool";

public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool";
public static final String SENTINEL_KEYED_NETTY_CLIENT_POOL = "sentinelKeyedClientPool";

public static final String KEEPER_KEYED_NETTY_CLIENT_POOL = "keeperKeyedClientPool";

public static final String MIGRATE_KEEPER_CLIENT_POOL = "migrateKeeperClientPool";
public static final String PROXY_KEYED_NETTY_CLIENT_POOL = "proxyKeyedClientPool";

public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool";

public static final String PING_DELAY_INFO_EXECUTORS = "pingDelayInfoExecutors";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public ScheduledExecutorService getRedisCommandExecutor() {
);
}

@Bean(name = KEYED_NETTY_CLIENT_POOL)
@Bean(name = REDIS_KEYED_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(8));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.PROXY_KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;

@Component
Expand All @@ -49,7 +49,7 @@ public class DefaultProxyMonitorCollectorManager extends AbstractStartStoppable
@Resource(name = SCHEDULED_EXECUTOR)
private ScheduledExecutorService scheduled;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = PROXY_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_COMMAND_EXECUTOR;
import static com.ctrip.xpipe.redis.checker.resource.Resource.*;

/**
* @author chen.zhu
Expand All @@ -53,7 +52,7 @@ public class DefaultSentinelManager implements SentinelManager, ShardEventHandle

private static final int LONG_SENTINEL_COMMAND_TIMEOUT = 2000;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = SENTINEL_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedClientPool;

@Resource(name = REDIS_COMMAND_EXECUTOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class ShardModelServiceImpl implements ShardModelService{
@Resource(name = REDIS_COMMAND_EXECUTOR)
private ScheduledExecutorService scheduled;

@Resource(name = MIGRATE_KEEPER_CLIENT_POOL)
@Resource(name = KEEPER_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

private RetryCommandFactory<Object> retryCommandFactory;
Expand Down
Loading

0 comments on commit a61a477

Please sign in to comment.