Skip to content

Commit

Permalink
deploy: support for read affinity options per cluster
Browse files Browse the repository at this point in the history
Implemented the capability to include read affinity options
for individual clusters within the ceph-csi-config ConfigMap.
This allows users to configure the crush location for each
cluster separately. The read affinity options specified in
the ConfigMap will supersede those provided via command line arguments.

Signed-off-by: Praveen M <[email protected]>
  • Loading branch information
iPraveenParihar committed Oct 20, 2023
1 parent cba5402 commit 416e1a0
Show file tree
Hide file tree
Showing 16 changed files with 279 additions and 65 deletions.
3 changes: 1 addition & 2 deletions charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ spec:
{{- if .Values.nodeplugin.profiling.enabled }}
- "--enableprofiling={{ .Values.nodeplugin.profiling.enabled }}"
{{- end }}
- "--enable-read-affinity={{ and .Values.readAffinity .Values.readAffinity.enabled }}"
{{- if and .Values.readAffinity .Values.readAffinity.enabled }}
{{- if and .Values.readAffinity .Values.readAffinity.crushLocationLabels }}
- "--crush-location-labels={{ .Values.readAffinity.crushLocationLabels | join "," }}"
{{- end }}
env:
Expand Down
6 changes: 5 additions & 1 deletion charts/ceph-csi-rbd/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ serviceAccounts:
# - "<MONValue2>"
# rbd:
# netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net"
# readAffinity:
# enabled: true
# crushLocationLabels:
# - topology.kubernetes.io/region
# - topology.kubernetes.io/zone
csiConfig: []

# Configuration details of clusterID,PoolID and FscID mapping
Expand Down Expand Up @@ -282,7 +287,6 @@ topology:
# readAffinity:
# Enable read affinity for RBD volumes. Recommended to
# set to true if running kernel 5.8 or newer.
# enabled: false
# Define which node labels to use as CRUSH location.
# This should correspond to the values set in the CRUSH map.
# NOTE: the value here serves as an example
Expand Down
2 changes: 1 addition & 1 deletion cmd/cephcsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ func init() {
"",
"list of Kubernetes node labels, that determines the topology"+
" domain the node belongs to, separated by ','")
flag.BoolVar(&conf.EnableReadAffinity, "enable-read-affinity", false, "enable read affinity")
flag.StringVar(
&conf.CrushLocationLabels,
"crush-location-labels",
"",
"list of Kubernetes node labels, that determines the"+
" CRUSH location the node belongs to, separated by ','")
flag.StringVar(&conf.ContainerOrchestrator, "container-orchestrator", "kubernetes", "container orchestrator")

// cephfs related flags
flag.BoolVar(
Expand Down
9 changes: 9 additions & 0 deletions deploy/csi-config-map-sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ data:
}
"nfs": {
"netNamespaceFilePath": "<kubeletRootPath>/plugins/nfs.csi.ceph.com/net",
},
"readAffinity": {
"enabled": "false",
"crushLocationLabels": [
"<Label1>",
"<Label2>"
...
"<Label3>"
]
}
}
]
Expand Down
11 changes: 8 additions & 3 deletions docs/deploy-rbd.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ make image-cephcsi
| `--skipforceflatten` | `false` | skip image flattening on kernel < 5.2 which support mapping of rbd images which has the deep-flatten feature |
| `--maxsnapshotsonimage` | `450` | Maximum number of snapshots allowed on rbd image without flattening |
| `--setmetadata` | `false` | Set metadata on volume |
| `--enable-read-affinity` | `false` | enable read affinity |
| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ',' |
| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ','.<br>`Note: These labels will be replaced if crush location labels are defined in the ceph-csi-config ConfigMap for the specific cluster.` |

**Available volume parameters:**

Expand Down Expand Up @@ -215,7 +214,7 @@ for more details.

This can be enabled by adding labels to Kubernetes nodes like
`"topology.io/region=east"` and `"topology.io/zone=east-zone1"` and
passing command line arguments `"--enable-read-affinity=true"` and
passing command line argument
`"--crush-location-labels=topology.io/zone,topology.io/region"` to Ceph CSI
RBD daemonset pod "csi-rbdplugin" container, resulting in Ceph CSI adding
`"--options read_from_replica=localize,crush_location=zone:east-zone1|region:east"`
Expand All @@ -224,6 +223,12 @@ If enabled, this option will be added to all RBD volumes mapped by Ceph CSI.
Well known labels can be found
[here](https://kubernetes.io/docs/reference/labels-annotations-taints/).

Read affinity can be configured for individual clusters within the
`ceph-csi-config` ConfigMap. This allows configuring the crush location labels
for each cluster separately. The crush location labels specified in the
ConfigMap will supersede those provided via command line argument
`--crush-location-labels`.

>Note: Label values will have all its dots `"."` normalized with dashes `"-"`
in order for it to work with ceph CRUSH map.

Expand Down
26 changes: 16 additions & 10 deletions internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer {
func NewNodeServer(
d *csicommon.CSIDriver,
t string,
topology map[string]string,
crushLocationMap map[string]string,
nodeLabels, topology, crushLocationMap map[string]string,
) (*rbd.NodeServer, error) {
ns := rbd.NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
VolumeLocks: util.NewVolumeLocks(),
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
VolumeLocks: util.NewVolumeLocks(),
NodeLabels: nodeLabels,
CLIReadAffinityMapOptions: rbd.ConstructReadAffinityMapOption(crushLocationMap),
}
ns.SetReadAffinityMapOptions(crushLocationMap)

return &ns, nil
}
Expand All @@ -87,8 +88,8 @@ func NewNodeServer(
// setupCSIAddonsServer().
func (r *Driver) Run(conf *util.Config) {
var (
err error
topology, crushLocationMap map[string]string
err error
nodeLabels, topology, crushLocationMap map[string]string
)
// update clone soft and hard limit
rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth)
Expand Down Expand Up @@ -125,8 +126,13 @@ func (r *Driver) Run(conf *util.Config) {
})
}

if conf.EnableReadAffinity {
crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID)
nodeLabels, err = k8s.GetNodeLabels(conf.NodeID)
if err != nil {
log.FatalLogMsg(err.Error())
}

if conf.CrushLocationLabels != "" {
crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels)
if err != nil {
log.FatalLogMsg(err.Error())
}
Expand All @@ -140,7 +146,7 @@ func (r *Driver) Run(conf *util.Config) {
if err != nil {
log.FatalLogMsg(err.Error())
}
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap)
r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap)
if err != nil {
log.FatalLogMsg("failed to start node server, err %v\n", err)
}
Expand Down
60 changes: 49 additions & 11 deletions internal/rbd/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ type NodeServer struct {
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
VolumeLocks *util.VolumeLocks
// readAffinityMapOptions contains map options to enable read affinity.
readAffinityMapOptions string
// NodeLabels stores the node labels
NodeLabels map[string]string
// CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity.
CLIReadAffinityMapOptions string
}

// stageTransaction struct represents the state a transaction was when it either completed
Expand Down Expand Up @@ -258,11 +260,10 @@ func (ns *NodeServer) populateRbdVol(
rv.Mounter = rbdNbdMounter
}

err = getMapOptions(req, rv)
err = ns.getMapOptions(req, rv)
if err != nil {
return nil, err
}
ns.appendReadAffinityMapOptions(rv)

rv.VolID = volID

Expand All @@ -280,14 +281,14 @@ func (ns *NodeServer) populateRbdVol(

// appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions
// if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty.
func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) {
func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) {
switch {
case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter:
case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter:
return
case rv.MapOptions != "":
rv.MapOptions += "," + ns.readAffinityMapOptions
rv.MapOptions += "," + readAffinityMapOptions
default:
rv.MapOptions = ns.readAffinityMapOptions
rv.MapOptions = readAffinityMapOptions
}
}

Expand Down Expand Up @@ -1396,9 +1397,12 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) {
return size, nil
}

func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) {
// ConstructReadAffinityMapOption constructs a read affinity map option based on the provided crushLocationMap.
// It appends crush location labels in the format
// "read_from_replica=localize,crush_location=label1:value1|label2:value2|...".
func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string {
if len(crushLocationMap) == 0 {
return
return ""
}

var b strings.Builder
Expand All @@ -1412,5 +1416,39 @@ func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]stri
b.WriteString(fmt.Sprintf("|%s:%s", key, val))
}
}
ns.readAffinityMapOptions = b.String()

return b.String()
}

// getReadAffinityMapOptions retrieves the readAffinityMapOptions from the CSI config file if it exists.
// If not, it falls back to returning the `CLIReadAffinityMapOptions` from the command line.
// If neither of these options is available, it returns an empty string.
func (ns *NodeServer) getReadAffinityMapOptions(clusterID string) (string, error) {
var (
err error
configReadAffinityEnabled bool
configCrushLocationLabels string
)

configReadAffinityEnabled, configCrushLocationLabels, err = util.GetCrushLocationLabels(util.CsiConfigFile, clusterID)
if err != nil {
return "", err
}

if !configReadAffinityEnabled {
return "", nil
}

if configCrushLocationLabels == "" {
return ns.CLIReadAffinityMapOptions, nil
}

crushLocationMap, err := util.GetCrushLocationMap(configCrushLocationLabels, ns.NodeLabels)
if err != nil {
log.FatalLogMsg(err.Error())
}

readAffinityMapOptions := ConstructReadAffinityMapOption(crushLocationMap)

return readAffinityMapOptions, nil
}
14 changes: 6 additions & 8 deletions internal/rbd/nodeserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestParseBoolOption(t *testing.T) {
}
}

func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) {
func TestNodeServer_ConstructReadAffinityMapOption(t *testing.T) {
t.Parallel()
tests := []struct {
name string
Expand Down Expand Up @@ -147,9 +147,10 @@ func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) {
currentTT := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ns := &NodeServer{}
ns.SetReadAffinityMapOptions(currentTT.crushLocationmap)
assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions)
ns := &NodeServer{
CLIReadAffinityMapOptions: ConstructReadAffinityMapOption(currentTT.crushLocationmap),
}
assert.Contains(t, currentTT.wantAny, ns.CLIReadAffinityMapOptions)
})
}
}
Expand Down Expand Up @@ -236,10 +237,7 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) {
MapOptions: currentTT.args.mapOptions,
Mounter: currentTT.args.mounter,
}
ns := &NodeServer{
readAffinityMapOptions: currentTT.args.readAffinityMapOptions,
}
ns.appendReadAffinityMapOptions(rv)
rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions)
assert.Equal(t, currentTT.want, rv.MapOptions)
})
}
Expand Down
8 changes: 7 additions & 1 deletion internal/rbd/rbd_attach.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func parseMapOptions(mapOptions string) (string, string, error) {

// getMapOptions is a wrapper func, calls parse map/unmap funcs and feeds the
// rbdVolume object.
func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
krbdMapOptions, nbdMapOptions, err := parseMapOptions(req.GetVolumeContext()["mapOptions"])
if err != nil {
return err
Expand All @@ -312,6 +312,12 @@ func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
rv.UnmapOptions = nbdUnmapOptions
}

readAffinityMapOptions, err := ns.getReadAffinityMapOptions(rv.ClusterID)
if err != nil {
return err
}
rv.appendReadAffinityMapOptions(readAffinityMapOptions)

return nil
}

Expand Down
9 changes: 2 additions & 7 deletions internal/util/crushlocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,14 @@ import (
)

// GetCrushLocationMap returns the crush location map, determined from
// the crush location labels and their values from the CO system.
// the crush location labels and their values from the node labels passed in arg.
// Expects crushLocationLabels in arg to be in the format "[prefix/]<name>,[prefix/]<name>,...",.
// Returns map of crush location types with its array of associated values.
func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) {
func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) (map[string]string, error) {
if crushLocationLabels == "" {
return nil, nil
}

nodeLabels, err := k8sGetNodeLabels(nodeName)
if err != nil {
return nil, err
}

return getCrushLocationMap(crushLocationLabels, nodeLabels), nil
}

Expand Down
22 changes: 22 additions & 0 deletions internal/util/csiconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type ClusterInfo struct {
// symlink filepath for the network namespace where we need to execute commands.
NetNamespaceFilePath string `json:"netNamespaceFilePath"`
} `json:"nfs"`
// Read affinity map options
ReadAffinity struct {
Enabled bool `json:"enabled"`
CrushLocationLabels []string `json:"crushLocationLabels"`
} `json:"readAffinity"`
}

// Expected JSON structure in the passed in config file is,
Expand Down Expand Up @@ -209,3 +214,20 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error)

return cluster.NFS.NetNamespaceFilePath, nil
}

// GetCrushLocationLabels returns the crushLocationLabels from csi config for the given clusterID
// If `readAffinity.enabled` is set to true, else returns an empty string.
func GetCrushLocationLabels(pathToConfig, clusterID string) (bool, string, error) {
cluster, err := readClusterInfo(pathToConfig, clusterID)
if err != nil {
return false, "", err
}

if !cluster.ReadAffinity.Enabled {
return false, "", nil
}

crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",")

return true, crushLocationLabels, nil
}
Loading

0 comments on commit 416e1a0

Please sign in to comment.