Skip to content

Commit

Permalink
Merge pull request #337 from mucahitkurt/use-node-informer
Browse files Browse the repository at this point in the history
Use Node informer to prevent hitting the API server for all time
  • Loading branch information
k8s-ci-robot authored Oct 28, 2019
2 parents 4160887 + 99a83ac commit 16a2da3
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 30 deletions.
8 changes: 6 additions & 2 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
utilflag "k8s.io/component-base/cli/flag"
csitrans "k8s.io/csi-translation-lib"
Expand Down Expand Up @@ -184,11 +185,13 @@ func main() {
}

var csiNodeLister storagelisters.CSINodeLister
var nodeLister v1.NodeLister
var factory informers.SharedInformerFactory
if ctrl.SupportsTopology(pluginCapabilities) {
// Create informer to prevent hit the API server for all resource request
factory = informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
nodeLister = factory.Core().V1().Nodes().Lister()
}

// Create the provisioner: it implements the Provisioner interface expected by
Expand All @@ -207,7 +210,8 @@ func main() {
supportsMigrationFromInTreePluginName,
*strictTopology,
translator,
csiNodeLister)
csiNodeLister,
nodeLister)

provisionController = controller.NewProvisionController(
clientset,
Expand All @@ -224,7 +228,7 @@ func main() {
cacheSyncResult := factory.WaitForCacheSync(stopCh)
for _, v := range cacheSyncResult {
if !v {
klog.Fatalf("Failed to sync CsiNodeInformer!")
klog.Fatalf("Failed to sync Informers!")
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"k8s.io/klog"

"google.golang.org/grpc"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
)

Expand Down Expand Up @@ -197,6 +198,7 @@ type csiProvisioner struct {
strictTopology bool
translator ProvisionerCSITranslator
csiNodeLister storagelisters.CSINodeLister
nodeLister corelisters.NodeLister
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -257,7 +259,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
supportsMigrationFromInTreePluginName string,
strictTopology bool,
translator ProvisionerCSITranslator,
csiNodeLister storagelisters.CSINodeLister) controller.Provisioner {
csiNodeLister storagelisters.CSINodeLister,
nodeLister corelisters.NodeLister) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
Expand All @@ -276,6 +279,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
strictTopology: strictTopology,
translator: translator,
csiNodeLister: csiNodeLister,
nodeLister: nodeLister,
}
return provisioner
}
Expand Down Expand Up @@ -504,7 +508,8 @@ func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.
options.StorageClass.AllowedTopologies,
options.SelectedNode,
p.strictTopology,
p.csiNodeLister)
p.csiNodeLister,
p.nodeLister)
if err != nil {
return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test",
5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)

// Requested PVC with requestedBytes storage
deletePolicy := v1.PersistentVolumeReclaimDelete
Expand Down Expand Up @@ -1464,7 +1464,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil)
nil, provisionDriverName, pluginCaps, controllerCaps, supportsMigrationFromInTreePluginName, false, csitrans.New(), nil, nil)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2031,7 +2031,7 @@ func TestProvisionFromSnapshot(t *testing.T) {

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
client, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2201,11 +2201,11 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset(nodes, csiNodes)

csiNodeLister, stopChan := csiNodeLister(clientSet, t)
csiNodeLister, nodeLister, stopChan := listers(clientSet)
defer close(stopChan)

csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister)
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), csiNodeLister, nodeLister)

pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
StorageClass: &storagev1.StorageClass{},
Expand Down Expand Up @@ -2260,7 +2260,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {
clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2440,7 +2440,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5,
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)

err = csiProvisioner.Delete(tc.persistentVolume)
if tc.expectErr && err == nil {
Expand Down Expand Up @@ -3096,7 +3096,7 @@ func TestProvisionFromPVC(t *testing.T) {
}

csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn,
nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil)
nil, driverName, pluginCaps, controllerCaps, "", false, csitrans.New(), nil, nil)

pv, err := csiProvisioner.Provision(tc.volOpts)
if tc.expectErr && err == nil {
Expand Down Expand Up @@ -3175,7 +3175,7 @@ func TestProvisionWithMigration(t *testing.T) {
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner",
"test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps,
inTreePluginName, false, mockTranslator, nil)
inTreePluginName, false, mockTranslator, nil, nil)

// Set up return values (AnyTimes to avoid overfitting on implementation)

Expand Down Expand Up @@ -3335,7 +3335,7 @@ func TestDeleteMigration(t *testing.T) {
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner",
"test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "",
false, mockTranslator, nil)
false, mockTranslator, nil, nil)

// Set mock return values (AnyTimes to avoid overfitting on implementation details)
mockTranslator.EXPECT().IsPVMigratable(gomock.Any()).Return(tc.expectTranslation).AnyTimes()
Expand Down
22 changes: 12 additions & 10 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/version"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/klog"
)
Expand Down Expand Up @@ -152,7 +153,8 @@ func GenerateAccessibilityRequirements(
allowedTopologies []v1.TopologySelectorTerm,
selectedNode *v1.Node,
strictTopology bool,
csiNodeLister storagelisters.CSINodeLister) (*csi.TopologyRequirement, error) {
csiNodeLister storagelisters.CSINodeLister,
nodeLister corelisters.NodeLister) (*csi.TopologyRequirement, error) {
requirement := &csi.TopologyRequirement{}

var (
Expand Down Expand Up @@ -219,7 +221,7 @@ func GenerateAccessibilityRequirements(
requisiteTerms = flatten(allowedTopologies)
} else {
// Aggregate existing topologies in nodes across the entire cluster.
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister)
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode, csiNodeLister, nodeLister)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -309,7 +311,8 @@ func aggregateTopologies(
kubeClient kubernetes.Interface,
driverName string,
selectedCSINode *storage.CSINode,
csiNodeLister storagelisters.CSINodeLister) ([]topologyTerm, error) {
csiNodeLister storagelisters.CSINodeLister,
nodeLister corelisters.NodeLister) ([]topologyTerm, error) {

// 1. Determine topologyKeys to use for aggregation
var topologyKeys []string
Expand Down Expand Up @@ -371,15 +374,14 @@ func aggregateTopologies(
if err != nil {
return nil, err
}
// TODO (#144): use informers
nodes, err := kubeClient.CoreV1().Nodes().List(metav1.ListOptions{LabelSelector: selector})
nodes, err := nodeLister.List(selector)
if err != nil {
return nil, fmt.Errorf("error listing nodes: %v", err)
}

var terms []topologyTerm
for _, node := range nodes.Items {
term, _ := getTopologyFromNode(&node, topologyKeys)
for _, node := range nodes {
term, _ := getTopologyFromNode(node, topologyKeys)
terms = append(terms, term)
}
if len(terms) == 0 {
Expand Down Expand Up @@ -508,7 +510,7 @@ func getTopologyFromNode(node *v1.Node, topologyKeys []string) (term topologyTer
return term, false
}

func buildTopologyKeySelector(topologyKeys []string) (string, error) {
func buildTopologyKeySelector(topologyKeys []string) (labels.Selector, error) {
var expr []metav1.LabelSelectorRequirement
for _, key := range topologyKeys {
expr = append(expr, metav1.LabelSelectorRequirement{
Expand All @@ -523,10 +525,10 @@ func buildTopologyKeySelector(topologyKeys []string) (string, error) {

selector, err := metav1.LabelSelectorAsSelector(&labelSelector)
if err != nil {
return "", fmt.Errorf("error parsing topology keys selector: %v", err)
return nil, fmt.Errorf("error parsing topology keys selector: %v", err)
}

return selector.String(), nil
return selector, nil
}

func (t topologyTerm) clone() topologyTerm {
Expand Down
16 changes: 11 additions & 5 deletions pkg/controller/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
fakeclientset "k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/kubernetes/pkg/apis/core/helper"
)
Expand Down Expand Up @@ -392,7 +393,7 @@ func TestStatefulSetSpreading(t *testing.T) {

kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes)

csiNodeLister, stopChan := csiNodeLister(kubeClient, t)
csiNodeLister, nodeLister, stopChan := listers(kubeClient)
defer close(stopChan)

for name, tc := range testcases {
Expand All @@ -410,6 +411,7 @@ func TestStatefulSetSpreading(t *testing.T) {
nil,
strictTopology,
csiNodeLister,
nodeLister,
)

if err != nil {
Expand Down Expand Up @@ -804,6 +806,7 @@ func TestAllowedTopologies(t *testing.T) {
nil, /* selectedNode */
strictTopology,
nil,
nil,
)

if err != nil {
Expand Down Expand Up @@ -1080,7 +1083,7 @@ func TestTopologyAggregation(t *testing.T) {

kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes)

csiNodeLister, stopChan := csiNodeLister(kubeClient, t)
csiNodeLister, nodeLister, stopChan := listers(kubeClient)
defer close(stopChan)

var selectedNode *v1.Node
Expand All @@ -1095,6 +1098,7 @@ func TestTopologyAggregation(t *testing.T) {
selectedNode,
strictTopology,
csiNodeLister,
nodeLister,
)

if tc.expectError {
Expand Down Expand Up @@ -1332,7 +1336,7 @@ func TestPreferredTopologies(t *testing.T) {
kubeClient := fakeclientset.NewSimpleClientset(nodes, csiNodes)
selectedNode := &nodes.Items[0]

csiNodeLister, stopChan := csiNodeLister(kubeClient, t)
csiNodeLister, nodeLister, stopChan := listers(kubeClient)
defer close(stopChan)

requirements, err := GenerateAccessibilityRequirements(
Expand All @@ -1343,6 +1347,7 @@ func TestPreferredTopologies(t *testing.T) {
selectedNode,
strictTopology,
csiNodeLister,
nodeLister,
)

if tc.expectError {
Expand Down Expand Up @@ -1545,11 +1550,12 @@ func requisiteEqual(t1, t2 []*csi.Topology) bool {
return unchecked.Len() == 0
}

func csiNodeLister(kubeClient *fakeclientset.Clientset, t *testing.T) (v1beta1.CSINodeLister, chan struct{}) {
func listers(kubeClient *fakeclientset.Clientset) (v1beta1.CSINodeLister, corelisters.NodeLister, chan struct{}) {
factory := informers.NewSharedInformerFactory(kubeClient, ResyncPeriodOfCsiNodeInformer)
stopChan := make(chan struct{})
csiNodeLister := factory.Storage().V1beta1().CSINodes().Lister()
nodeLister := factory.Core().V1().Nodes().Lister()
factory.Start(stopChan)
factory.WaitForCacheSync(stopChan)
return csiNodeLister, stopChan
return csiNodeLister, nodeLister, stopChan
}
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ k8s.io/apiserver/pkg/util/feature
# k8s.io/client-go v0.0.0-20190918200256-06eb1244587a => k8s.io/client-go v0.0.0-20190918200256-06eb1244587a
k8s.io/client-go/informers
k8s.io/client-go/kubernetes
k8s.io/client-go/listers/core/v1
k8s.io/client-go/listers/storage/v1beta1
k8s.io/client-go/rest
k8s.io/client-go/tools/clientcmd
Expand Down Expand Up @@ -416,7 +417,6 @@ k8s.io/client-go/listers/batch/v2alpha1
k8s.io/client-go/listers/certificates/v1beta1
k8s.io/client-go/listers/coordination/v1
k8s.io/client-go/listers/coordination/v1beta1
k8s.io/client-go/listers/core/v1
k8s.io/client-go/listers/events/v1beta1
k8s.io/client-go/listers/extensions/v1beta1
k8s.io/client-go/listers/networking/v1
Expand Down

0 comments on commit 16a2da3

Please sign in to comment.