
Merge pull request #130050 from andrewbaptist/backport24.1-126150
release-24.1: rpc: add locality to dialing
andrewbaptist authored Sep 6, 2024
2 parents 15f7bf6 + 1b7f96b commit b76b7cb
Showing 40 changed files with 558 additions and 268 deletions.
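In outline, the change threads the remote peer's locality through every dial path: the nodedialer address resolver now returns the peer's locality alongside its address, and the rpc.Context dial entry points accept it. A minimal caller-side sketch, inferred from the call sites in this diff (the helper function and its parameter names are illustrative, not part of the patch):

    package example

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/rpc"
    )

    // dialWithLocality dials a known node, passing that node's locality.
    // Callers that do not know the peer's locality pass roachpb.Locality{},
    // as most call sites in this diff do.
    func dialWithLocality(
    	ctx context.Context,
    	rpcCtx *rpc.Context,
    	addr string,
    	nodeID roachpb.NodeID,
    	loc roachpb.Locality,
    ) error {
    	conn, err := rpcCtx.GRPCDialNode(addr, nodeID, loc, rpc.DefaultClass).Connect(ctx)
    	if err != nil {
    		return err
    	}
    	_ = conn // a *grpc.ClientConn, ready to hand to generated clients
    	return nil
    }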
3 changes: 3 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -1337,7 +1337,10 @@
<tr><td>APPLICATION</td><td>physical_replication.sst_bytes</td><td>SST bytes (compressed) sent to KV by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>requests.slow.distsender</td><td>Number of range-bound RPCs currently stuck or retrying for a long time.<br/><br/>Note that this is not a good signal for KV health. The remote side of the<br/>RPCs tracked here may experience contention, so an end user can easily<br/>cause values for this metric to be emitted by leaving a transaction open<br/>for a long time and contending with it using a second transaction.</td><td>Requests</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>round-trip-latency</td><td>Distribution of round-trip latencies with other nodes.<br/><br/>This only reflects successful heartbeats and measures gRPC overhead as well as<br/>possible head-of-line blocking. Elevated values in this metric may hint at<br/>network issues and/or saturation, but they are no proof of them. CPU overload<br/>can similarly elevate this metric. The operator should look towards OS-level<br/>metrics such as packet loss, retransmits, etc, to conclusively diagnose network<br/>issues. Heartbeats are not very frequent (~seconds), so they may not capture<br/>rare or short-lived degradations.<br/></td><td>Round-trip time</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
+ <tr><td>APPLICATION</td><td>rpc.client.bytes.egress</td><td>Counter of TCP bytes sent via gRPC on connections we initiated.</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+ <tr><td>APPLICATION</td><td>rpc.client.bytes.ingress</td><td>Counter of TCP bytes received via gRPC on connections we initiated.</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.avg_round_trip_latency</td><td>Sum of exponentially weighted moving average of round-trip latencies, as measured through a gRPC RPC.<br/><br/>Dividing this Gauge by rpc.connection.healthy gives an approximation of average<br/>latency, but the top-level round-trip-latency histogram is more useful. Instead,<br/>users should consult the label families of this metric if they are available<br/>(which requires prometheus and the cluster setting &#39;server.child_metrics.enabled&#39;);<br/>these provide per-peer moving averages.<br/><br/>This metric does not track failed connection. A failed connection&#39;s contribution<br/>is reset to zero.<br/></td><td>Latency</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
+ <tr><td>APPLICATION</td><td>rpc.connection.connected</td><td>Counter of TCP level connected connections.<br/><br/>This metric is the number of gRPC connections from the TCP level. Unlike rpc.connection.healthy<br/>this metric does not take into account whether the application has been able to heartbeat<br/>over this connection.<br/></td><td>Connections</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.failures</td><td>Counter of failed connections.<br/><br/>This includes both the event in which a healthy connection terminates as well as<br/>unsuccessful reconnection attempts.<br/><br/>Connections that are terminated as part of local node shutdown are excluded.<br/>Decommissioned peers are excluded.<br/></td><td>Connections</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.healthy</td><td>Gauge of current connections in a healthy state (i.e. bidirectionally connected and heartbeating)</td><td>Connections</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.healthy_nanos</td><td>Gauge of nanoseconds of healthy connection time<br/><br/>On the prometheus endpoint scraped with the cluster setting &#39;server.child_metrics.enabled&#39; set,<br/>the constituent parts of this metric are available on a per-peer basis and one can read off<br/>for how long a given peer has been connected</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
2 changes: 1 addition & 1 deletion pkg/acceptance/localcluster/cluster.go
@@ -481,7 +481,7 @@ func (n *Node) StatusClient(ctx context.Context) serverpb.StatusClient {
return existingClient
}

- conn, err := n.rpcCtx.GRPCUnvalidatedDial(n.RPCAddr()).Connect(ctx)
+ conn, err := n.rpcCtx.GRPCUnvalidatedDial(n.RPCAddr(), roachpb.Locality{}).Connect(ctx)
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
8 changes: 4 additions & 4 deletions pkg/blobs/client_test.go
@@ -76,13 +76,13 @@ func setUpService(
require.NoError(t, err)

localDialer := nodedialer.New(rpcContext,
- func(nodeID roachpb.NodeID) (net.Addr, error) {
+ func(nodeID roachpb.NodeID) (net.Addr, roachpb.Locality, error) {
if nodeID == remoteNodeID {
- return ln.Addr(), nil
+ return ln.Addr(), roachpb.Locality{}, nil
} else if nodeID == localNodeID {
- return ln2.Addr(), nil
+ return ln2.Addr(), roachpb.Locality{}, nil
}
- return nil, errors.Errorf("node %d not found", nodeID)
+ return nil, roachpb.Locality{}, errors.Errorf("node %d not found", nodeID)
},
)
localNodeIDContainer := &base.NodeIDContainer{}
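The three-value resolver above is the general pattern for anything that feeds a nodedialer. A hedged sketch of a standalone constructor over a static table (the helper and its names are illustrative):

    package example

    import (
    	"net"

    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/rpc"
    	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
    	"github.com/cockroachdb/errors"
    )

    // newStaticDialer wires a fixed address/locality table into a nodedialer,
    // mirroring the test resolver above. Nodes with no locality entry fall
    // back to the empty roachpb.Locality{}.
    func newStaticDialer(
    	rpcContext *rpc.Context,
    	addrs map[roachpb.NodeID]net.Addr,
    	locs map[roachpb.NodeID]roachpb.Locality,
    ) *nodedialer.Dialer {
    	return nodedialer.New(rpcContext, func(
    		nodeID roachpb.NodeID,
    	) (net.Addr, roachpb.Locality, error) {
    		if a, ok := addrs[nodeID]; ok {
    			return a, locs[nodeID], nil
    		}
    		return nil, roachpb.Locality{}, errors.Errorf("node %d not found", nodeID)
    	})
    }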
7 changes: 5 additions & 2 deletions pkg/ccl/serverccl/statusccl/tenant_grpc_test.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -33,6 +34,8 @@ func TestTenantGRPCServices(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

+ skip.UnderStress(t, "test can time out under stress")
+
ctx := context.Background()

testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
@@ -121,7 +124,7 @@ func TestTenantGRPCServices(t *testing.T) {
rpcCtx := tenant2.RPCContext()

nodeID := roachpb.NodeID(tenant.SQLInstanceID())
- conn, err := rpcCtx.GRPCDialNode(grpcAddr, nodeID, rpc.DefaultClass).Connect(ctx)
+ conn, err := rpcCtx.GRPCDialNode(grpcAddr, nodeID, roachpb.Locality{}, rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)

client := serverpb.NewStatusClient(conn)
@@ -135,7 +138,7 @@ func TestTenantGRPCServices(t *testing.T) {
grpcAddr := server.RPCAddr()
rpcCtx := tenant.RPCContext()

- conn, err := rpcCtx.GRPCDialNode(grpcAddr, server.NodeID(), rpc.DefaultClass).Connect(ctx)
+ conn, err := rpcCtx.GRPCDialNode(grpcAddr, server.NodeID(), roachpb.Locality{}, rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)

client := serverpb.NewStatusClient(conn)
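For reference, the end-to-end shape these tests exercise, with the new locality argument in place (a sketch with error handling trimmed; the RPC is chosen for illustration):

    conn, err := rpcCtx.GRPCDialNode(grpcAddr, nodeID, roachpb.Locality{}, rpc.DefaultClass).Connect(ctx)
    require.NoError(t, err)
    client := serverpb.NewStatusClient(conn)
    resp, err := client.Statements(ctx, &serverpb.StatementsRequest{})
    require.NoError(t, err)
    _ = resp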
17 changes: 15 additions & 2 deletions pkg/gossip/client.go
@@ -38,6 +38,7 @@ type client struct {
peerID roachpb.NodeID
resolvedPlaceholder bool // Whether we've resolved the nodeSet's placeholder for this client
addr net.Addr // Peer node network address
+ locality roachpb.Locality // Peer node locality (if known)
forwardAddr *util.UnresolvedAddr // Set if disconnected with an alternate addr
remoteHighWaterStamps map[roachpb.NodeID]int64 // Remote server's high water timestamps
closer chan struct{} // Client shutdown channel
@@ -55,11 +56,14 @@ func extractKeys(delta map[string]*Info) string {
}

// newClient creates and returns a client struct.
- func newClient(ambient log.AmbientContext, addr net.Addr, nodeMetrics Metrics) *client {
+ func newClient(
+   ambient log.AmbientContext, addr net.Addr, locality roachpb.Locality, nodeMetrics Metrics,
+ ) *client {
return &client{
AmbientContext: ambient,
createdAt: timeutil.Now(),
addr: addr,
+ locality: locality,
remoteHighWaterStamps: map[roachpb.NodeID]int64{},
closer: make(chan struct{}),
clientMetrics: makeMetrics(),
@@ -103,7 +107,16 @@ func (c *client) startLocked(
// asynchronous from the caller's perspective, so the only effect of
// `WithBlock` here is blocking shutdown - at the time of this writing,
// that ends ups up making `kv` tests take twice as long.
- conn, err := rpcCtx.GRPCUnvalidatedDial(c.addr.String()).Connect(ctx)
+ var connection *rpc.Connection
+ if c.peerID != 0 {
+   connection = rpcCtx.GRPCDialNode(c.addr.String(), c.peerID, c.locality, rpc.SystemClass)
+ } else {
+   // TODO(baptist): Use this as a temporary connection for getting
+   // onto gossip and then replace with a validated connection.
+   log.Infof(ctx, "unvalidated bootstrap gossip dial to %s", c.addr)
+   connection = rpcCtx.GRPCUnvalidatedDial(c.addr.String(), c.locality)
+ }
+ conn, err := connection.Connect(ctx)
if err != nil {
return nil, err
}
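Pulled out of the diff, the new dial decision is: use a validated, node-ID-checked dial on the system class whenever the peer's ID is already known, and fall back to an unvalidated dial only while bootstrapping. The same logic as a free function, for illustration only (the function and parameter names are assumptions):

    package example

    import (
    	"context"

    	"github.com/cockroachdb/cockroach/pkg/roachpb"
    	"github.com/cockroachdb/cockroach/pkg/rpc"
    	"google.golang.org/grpc"
    )

    // dialPeer prefers a validated dial when the peer's node ID is known.
    func dialPeer(
    	ctx context.Context,
    	rpcCtx *rpc.Context,
    	addr string,
    	peerID roachpb.NodeID,
    	loc roachpb.Locality,
    ) (*grpc.ClientConn, error) {
    	var connection *rpc.Connection
    	if peerID != 0 {
    		// Known peer: validated dial. Gossip rides the system class so
    		// it is not starved by user traffic.
    		connection = rpcCtx.GRPCDialNode(addr, peerID, loc, rpc.SystemClass)
    	} else {
    		// Bootstrap: the peer's ID is unknown, so the dial cannot be
    		// validated against it yet.
    		connection = rpcCtx.GRPCUnvalidatedDial(addr, loc)
    	}
    	return connection.Connect(ctx)
    }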
18 changes: 9 additions & 9 deletions pkg/gossip/client_test.go
@@ -184,7 +184,7 @@ func TestClientGossip(t *testing.T) {
local, _ := startGossip(clusterID, 1, stopper, t, metric.NewRegistry())
remote, _ := startGossip(clusterID, 2, stopper, t, metric.NewRegistry())
disconnected := make(chan *client, 1)
- c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), remote.GetNodeAddr(), makeMetrics())
+ c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), remote.GetNodeAddr(), roachpb.Locality{}, makeMetrics())

defer func() {
stopper.Stop(ctx)
@@ -236,7 +236,7 @@ func TestClientGossipMetrics(t *testing.T) {
gossipSucceedsSoon(
t, stopper, clusterID, make(chan *client, 2),
map[*client]*Gossip{
- newClient(log.MakeTestingAmbientCtxWithNewTracer(), local.GetNodeAddr(), remote.nodeMetrics): remote,
+ newClient(log.MakeTestingAmbientCtxWithNewTracer(), local.GetNodeAddr(), roachpb.Locality{}, remote.nodeMetrics): remote,
},
func() error {
// Infos/Bytes Sent/Received should not be zero.
@@ -294,7 +294,7 @@ func TestClientNodeID(t *testing.T) {
// Use an insecure context. We're talking to tcp socket which are not in the certs.
rpcContext := rpc.NewInsecureTestingContextWithClusterID(ctx, clock, stopper, clusterID)

- c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), &remote.nodeAddr, makeMetrics())
+ c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), &remote.nodeAddr, roachpb.Locality{}, makeMetrics())
disconnected <- c

defer func() {
@@ -341,7 +341,7 @@ func TestClientDisconnectLoopback(t *testing.T) {
local, localCtx := startGossip(uuid.Nil, 1, stopper, t, metric.NewRegistry())
local.mu.Lock()
lAddr := local.mu.is.NodeAddr
- local.startClientLocked(lAddr, localCtx)
+ local.startClientLocked(lAddr, roachpb.Locality{}, localCtx)
local.mu.Unlock()
local.manage(localCtx)
testutils.SucceedsSoon(t, func() error {
@@ -387,7 +387,7 @@ func TestClientDisconnectRedundant(t *testing.T) {
// Restart the client connection in the loop. It might have failed due to
// a heartbeat time.
local.mu.Lock()
- local.startClientLocked(rAddr, localCtx)
+ local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
local.mu.Unlock()
return fmt.Errorf("unable to find local to remote client")
}
@@ -398,7 +398,7 @@
// Start a remote to local client. This client will get removed as being
// redundant as there is already a connection between the two nodes.
remote.mu.Lock()
- remote.startClientLocked(lAddr, remoteCtx)
+ remote.startClientLocked(lAddr, roachpb.Locality{}, remoteCtx)
remote.mu.Unlock()

testutils.SucceedsSoon(t, func() error {
@@ -433,8 +433,8 @@ func TestClientDisallowMultipleConns(t *testing.T) {
// Start two clients from local to remote. RPC client cache is
// disabled via the context, so we'll start two different outgoing
// connections.
- local.startClientLocked(rAddr, localCtx)
- local.startClientLocked(rAddr, localCtx)
+ local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
+ local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
local.mu.Unlock()
remote.mu.Unlock()
local.manage(localCtx)
@@ -516,7 +516,7 @@ func TestClientForwardUnresolved(t *testing.T) {
local, _ := startGossip(uuid.Nil, nodeID, stopper, t, metric.NewRegistry())
addr := local.GetNodeAddr()

- client := newClient(log.MakeTestingAmbientCtxWithNewTracer(), addr, makeMetrics()) // never started
+ client := newClient(log.MakeTestingAmbientCtxWithNewTracer(), addr, roachpb.Locality{}, makeMetrics()) // never started

newAddr := util.UnresolvedAddr{
NetworkField: "tcp",
66 changes: 48 additions & 18 deletions pkg/gossip/gossip.go
@@ -215,7 +215,7 @@ func NewKeyNotPresentError(key string) error {
// AddressResolver is a thin wrapper around gossip's GetNodeIDAddress
// that allows it to be used as a nodedialer.AddressResolver.
func AddressResolver(gossip *Gossip) nodedialer.AddressResolver {
- return func(nodeID roachpb.NodeID) (net.Addr, error) {
+ return func(nodeID roachpb.NodeID) (net.Addr, roachpb.Locality, error) {
return gossip.GetNodeIDAddress(nodeID)
}
}
@@ -264,6 +264,8 @@ type Gossip struct {
bootstrapInterval time.Duration
cullInterval time.Duration

+ // TODO(baptist): Remember the localities for each remote address. Then pass
+ // it into the Dial.
// addresses is a list of bootstrap host addresses for
// connecting to the gossip network.
addressIdx int
@@ -488,26 +490,32 @@ func (g *Gossip) GetAddresses() []util.UnresolvedAddr {
}

// GetNodeIDAddress looks up the RPC address of the node by ID.
- func (g *Gossip) GetNodeIDAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) {
+ func (g *Gossip) GetNodeIDAddress(
+   nodeID roachpb.NodeID,
+ ) (*util.UnresolvedAddr, roachpb.Locality, error) {
return g.getNodeIDAddress(nodeID, false /* locked */)
}

// GetNodeIDSQLAddress looks up the SQL address of the node by ID.
- func (g *Gossip) GetNodeIDSQLAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) {
+ func (g *Gossip) GetNodeIDSQLAddress(
+   nodeID roachpb.NodeID,
+ ) (*util.UnresolvedAddr, roachpb.Locality, error) {
nd, err := g.getNodeDescriptor(nodeID, false /* locked */)
if err != nil {
- return nil, err
+ return nil, roachpb.Locality{}, err
}
- return &nd.SQLAddress, nil
+ return &nd.SQLAddress, nd.Locality, nil
}

// GetNodeIDHTTPAddress looks up the HTTP address of the node by ID.
- func (g *Gossip) GetNodeIDHTTPAddress(nodeID roachpb.NodeID) (*util.UnresolvedAddr, error) {
+ func (g *Gossip) GetNodeIDHTTPAddress(
+   nodeID roachpb.NodeID,
+ ) (*util.UnresolvedAddr, roachpb.Locality, error) {
nd, err := g.getNodeDescriptor(nodeID, false /* locked */)
if err != nil {
- return nil, err
+ return nil, roachpb.Locality{}, err
}
- return &nd.HTTPAddress, nil
+ return &nd.HTTPAddress, nd.Locality, nil
}

// GetNodeDescriptor looks up the descriptor of the node by ID.
@@ -525,6 +533,8 @@ func (g *Gossip) GetNodeDescriptorCount() int {
return count
}

+ // TODO(baptist): StoreDescriptors don't belong in the Gossip package at all.
+ // This method should be moved out of gossip.
// GetStoreDescriptor looks up the descriptor of the node by ID.
func (g *Gossip) GetStoreDescriptor(storeID roachpb.StoreID) (*roachpb.StoreDescriptor, error) {
if value, ok := g.storeDescs.Load(int64(storeID)); ok {
@@ -877,12 +887,12 @@ func (g *Gossip) getNodeDescriptor(
// "distant" node address to connect directly to.
func (g *Gossip) getNodeIDAddress(
nodeID roachpb.NodeID, locked bool,
- ) (*util.UnresolvedAddr, error) {
+ ) (*util.UnresolvedAddr, roachpb.Locality, error) {
nd, err := g.getNodeDescriptor(nodeID, locked)
if err != nil {
- return nil, err
+ return nil, roachpb.Locality{}, err
}
- return nd.AddressForLocality(g.locality), nil
+ return nd.AddressForLocality(g.locality), nd.Locality, nil
}

// AddInfo adds or updates an info object. Returns an error if info
@@ -1147,7 +1157,7 @@ func (g *Gossip) hasOutgoingLocked(nodeID roachpb.NodeID) bool {
// outgoing nodeSet due to the way that outgoing clients' node IDs are only
// resolved once the connection has been established (rather than as soon as
// we've created it).
- nodeAddr, err := g.getNodeIDAddress(nodeID, true /* locked */)
+ nodeAddr, _, err := g.getNodeIDAddress(nodeID, true /* locked */)
if err != nil {
// If we don't have the address, fall back to using the outgoing nodeSet
// since at least it's better than nothing.
@@ -1200,8 +1210,21 @@ func (g *Gossip) bootstrap(rpcContext *rpc.Context) {
log.Eventf(ctx, "have clients: %t, have sentinel: %t", haveClients, haveSentinel)
if !haveClients || !haveSentinel {
// Try to get another bootstrap address.
+ //
+ // TODO(baptist): The bootstrap address from the
+ // configuration does not have locality information. We
+ // could use "our" locality or leave it blank like it
+ // currently does. Alternatively we could break and
+ // reconnect once we determine the remote locality from
+ // gossip.
if addr := g.getNextBootstrapAddressLocked(); !addr.IsEmpty() {
- g.startClientLocked(addr, rpcContext)
+ locality := roachpb.Locality{}
+ nodeID := g.bootstrapAddrs[addr]
+ // We may not have a node descriptor for this node yet, in which case we dial without one.
+ if nd, err := g.getNodeDescriptor(nodeID, true); err == nil {
+   locality = nd.Locality
+ }
+ g.startClientLocked(addr, locality, rpcContext)
} else {
bootstrapAddrs := make([]string, 0, len(g.bootstrapping))
for addr := range g.bootstrapping {
@@ -1339,13 +1362,13 @@ func (g *Gossip) tightenNetwork(ctx context.Context, rpcContext *rpc.Context) {
// If tightening is needed, then reset lastTighten to avoid restricting how
// soon we try again.
g.mu.lastTighten = time.Time{}
- if nodeAddr, err := g.getNodeIDAddress(distantNodeID, true /* locked */); err != nil || nodeAddr == nil {
+ if nodeAddr, locality, err := g.getNodeIDAddress(distantNodeID, true /* locked */); err != nil || nodeAddr == nil {
log.Health.Errorf(ctx, "unable to get address for n%d: %s", distantNodeID, err)
} else {
log.Health.Infof(ctx, "starting client to n%d (%d > %d) to tighten network graph",
distantNodeID, distantHops, maxHops)
log.Eventf(ctx, "tightening network with new client to %s", nodeAddr)
- g.startClientLocked(*nodeAddr, rpcContext)
+ g.startClientLocked(*nodeAddr, locality, rpcContext)
}
}
}
@@ -1357,7 +1380,12 @@ func (g *Gossip) doDisconnected(c *client, rpcContext *rpc.Context) {

// If the client was disconnected with a forwarding address, connect now.
if c.forwardAddr != nil {
- g.startClientLocked(*c.forwardAddr, rpcContext)
+ locality := roachpb.Locality{}
+ // If we have a node descriptor for this node use it when dialing.
+ if nd, err := g.getNodeDescriptor(c.peerID, true); err == nil {
+   locality = nd.Locality
+ }
+ g.startClientLocked(*c.forwardAddr, locality, rpcContext)
}
g.maybeSignalStatusChangeLocked()
}
@@ -1429,12 +1457,14 @@ func (g *Gossip) signalConnectedLocked() {
// startClientLocked launches a new client connected to remote address.
// The client is added to the outgoing address set and launched in
// a goroutine.
- func (g *Gossip) startClientLocked(addr util.UnresolvedAddr, rpcContext *rpc.Context) {
+ func (g *Gossip) startClientLocked(
+   addr util.UnresolvedAddr, locality roachpb.Locality, rpcContext *rpc.Context,
+ ) {
g.clientsMu.Lock()
defer g.clientsMu.Unlock()
ctx := g.AnnotateCtx(context.TODO())
log.VEventf(ctx, 1, "starting new client to %s", addr)
- c := newClient(g.server.AmbientContext, &addr, g.serverMetrics)
+ c := newClient(g.server.AmbientContext, &addr, locality, g.serverMetrics)
g.clientsMu.clients = append(g.clientsMu.clients, c)
c.startLocked(g, g.disconnected, rpcContext, g.server.stopper)
}
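Both bootstrap and doDisconnected above use the same lookup-with-fallback: take the locality from the node descriptor if gossip already holds one, otherwise dial with the empty locality. As a sketch of a shared in-package helper (the helper is not part of the patch; it assumes the caller holds the gossip mutex, matching the locked getNodeDescriptor variant used here):

    // localityForLocked returns the peer's locality if gossip holds a node
    // descriptor for it, and the empty locality otherwise.
    func (g *Gossip) localityForLocked(nodeID roachpb.NodeID) roachpb.Locality {
    	if nd, err := g.getNodeDescriptor(nodeID, true /* locked */); err == nil {
    		return nd.Locality
    	}
    	return roachpb.Locality{}
    }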
Diff truncated: the remaining changed files in this commit are not rendered above.