KubeController Asynchronous Event Handling: How the Informer Is Implemented

2018-03-12 17:17 Source: 清屏网

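Two years ago, building on the Kube informer framework, I wrote a route-controller that forwarded application routes between gorouter and a Kubernetes cluster, and a stack-controller that managed AOS stacks. At the time I wrote up some notes against the 1.5 and 1.7 code. Since 1.9 the informer has moved completely into client-go, so this article walks through the framework's source again based on the 1.9 code. As an asynchronous event-handling framework, the informer covers two processes: watching for events, and dispatching them for handling.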

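  1. Watching is done by the Reflector inside the controller. The previous article in this series described how the reflector writes the events obtained from the listWatcher into a Store. The Store the informer uses here is a DeltaFIFO, which allows events to be dispatched strictly in the order they occurred.
  2. Events produced by the Reflector are ultimately consumed by the processor. Events are popped from the queue, used to update the informer's local indexer cache, and at the same time distributed to all listeners.
  3. The processor's listeners are registered from outside through AddEventHandler; each listener supplies AddFunc, UpdateFunc and DeleteFunc. Internally a listener adds a layer of buffering to hold pendingNotifications. The listener performs the final dispatch, and the event is ultimately handled by the registered handler.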

InformerFactory

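When the DeploymentController is initialized it uses the PodInformer's Lister() and Informer(), and registers how events should be dispatched by calling AddEventHandler on the Informer. Every Informer a controller uses is obtained from the SharedInformerFactory by GroupVersionResource, and the informers are also started from the factory.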

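func Run(c *config.CompletedConfig) error {
    // stopCh is never closed in this simplified excerpt.
    stopCh := make(chan struct{})
    sharedInformerFactory := informers.NewSharedInformerFactory(
        verClient, ResyncPeriod)
    // startController registers the informers it needs, for example:
    // podInformer := sharedInformerFactory.Core().V1().Pods()
    sharedInformerFactory.Start(stopCh)
    select {}
}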

Informer Register

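Informers are constructed through the interfaces of the client-go/tools/cache package, and each typed informer registers itself with the factory through its Informer() method. In practice, calling podInformer.Informer() is what performs the registration. This happens in startController, after which informerFactory.Start can start every registered informer.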

func NewFilteredPodInformer(client kubernetes.Interface, namespace string,
    resyncPeriod time.Duration, indexers cache.Indexers,
    /*tweakListOptions*/) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &core_v1.Pod{},
        resyncPeriod,
        indexers,
    )
}

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer
    }
    informer = newFunc(f.client, f.defaultResync)
    f.informers[informerType] = informer

    return informer
}
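
The registration logic in InformerFor can be tried out in isolation. The following is a minimal stdlib-only sketch, not the client-go code: the informer, factory, pod and deployment types are hypothetical stand-ins, and only the "look up by reflect.Type, create on first use" behaviour is reproduced.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// informer is a hypothetical stand-in for cache.SharedIndexInformer.
type informer struct{ kind string }

// factory mimics only the registration map of sharedInformerFactory.
type factory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*informer
}

func newFactory() *factory {
	return &factory{informers: make(map[reflect.Type]*informer)}
}

// informerFor returns the informer already registered for obj's type,
// or calls newFunc once and remembers the result.
func (f *factory) informerFor(obj interface{}, newFunc func() *informer) *informer {
	f.mu.Lock()
	defer f.mu.Unlock()

	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	return inf
}

// pod and deployment are placeholder resource types.
type pod struct{}
type deployment struct{}

func main() {
	f := newFactory()
	a := f.informerFor(&pod{}, func() *informer { return &informer{kind: "pod"} })
	b := f.informerFor(&pod{}, func() *informer { return &informer{kind: "pod"} })
	c := f.informerFor(&deployment{}, func() *informer { return &informer{kind: "deployment"} })
	// Two controllers asking for the same type share one informer.
	fmt.Println(a == b, a == c, len(f.informers)) // prints: true false 2
}
```

This sharing is why several controllers that all watch Pods end up with a single watch connection and a single local cache.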

Informer Run

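Finally the informerFactory starts every informer registered with it; once started, an informer's work is watching and dispatching events. cache.WaitForCacheSync polls the HasSynced function of every informer until all of them report that their initial list has been processed; the HasSynced result ultimately comes from the DeltaFIFO.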

type sharedInformerFactory struct {
    client        clientset.Interface
    lock          sync.Mutex
    defaultResync time.Duration

    informers map[reflect.Type]cache.SharedIndexInformer
    startedInformers map[reflect.Type]bool
}

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

SharedInformer

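As an asynchronous event-handling framework, the informer covers both event watching and dispatch. Its indexer member is a Store that caches the full set of objects. The Lister that the informer exposes is served from this Store, so a Lister read never goes to etcd.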

type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller

    processor             *sharedProcessor
    cacheMutationDetector CacheMutationDetector

    listerWatcher ListerWatcher
    objectType    runtime.Object

    resyncCheckPeriod time.Duration
    defaultEventHandlerResyncPeriod time.Duration
    clock clock.Clock

    started, stopped bool
    startedLock      sync.Mutex

    blockDeltas sync.Mutex
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
  
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
        Process:          s.HandleDeltas,
    }
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run)

    s.controller.Run(stopCh)
}

Controller

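The controller's Run is essentially a producer-consumer loop: the reflector is the producer, and the controller's Process function, s.HandleDeltas, is the consumer. processLoop keeps calling Pop on the reflector's store to consume events, which are ultimately handled by sharedIndexInformer's HandleDeltas.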

func (c *controller) Run(stopCh <-chan struct{}) {
    go func() {
        <-stopCh
        c.config.Queue.Close()
    }()
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    var wg wait.Group
    defer wg.Wait()
    wg.StartWithChannel(stopCh, r.Run)
    wait.Until(c.processLoop, time.Second, stopCh)
}

func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        // handle err (elided)
    }
}

DeltaFIFO

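Within the informer framework the DeltaFIFO serves as the Reflector's Store; the results of list and watch are applied to it as Add/Update/Delete operations. The most important fields are items and queue: items caches nearly every event added to the FIFO, stored per object as a Deltas slice ([]Delta), while queue holds the object ids and fixes the FIFO order in which they are processed.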

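Unlike UndeltaStore, which pushes the complete current state on every change, DeltaFIFO records the individual changes (Deltas) for each object.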

type DeltaFIFO struct {
    lock sync.RWMutex
    cond sync.Cond

    // We depend on the property that items in the set are in
    // the queue and vice versa, and that all Deltas in this
    // map have at least one Delta.
    items map[string]Deltas
    queue []string

    populated bool
    initialPopulationCount int

    keyFunc KeyFunc
    knownObjects KeyListerGetter

    closed     bool
    closedLock sync.Mutex
}

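Every add, update and delete on the Store goes through the function below. It enqueues the object's id and stores the event in the items cache; even delete events are placed in items to wait for processing. After new content is added, f.cond.Broadcast wakes every goroutine blocked in f.cond.Wait so that each can try to re-acquire the lock.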

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }

    newDeltas := append(f.items[id], Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)
    _, exists := f.items[id]
    if len(newDeltas) > 0 {
        if !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else if exists {
        delete(f.items, id)
    }
    return nil
}
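
To make the relationship between items and queue concrete, here is a minimal sketch under simplifying assumptions: no locking, no dedup of deltas, and string payloads instead of API objects. miniFIFO, enqueue and pop are hypothetical names, not part of client-go.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
)

// delta pairs an event type with the object state at that point.
type delta struct {
	Type   deltaType
	Object string
}

// miniFIFO keeps the same two structures as DeltaFIFO:
// items holds the accumulated deltas per key, queue holds the key order.
type miniFIFO struct {
	items map[string][]delta
	queue []string
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: map[string][]delta{}}
}

// enqueue appends a delta; the key enters the queue only the first time.
func (f *miniFIFO) enqueue(key string, t deltaType, obj string) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], delta{t, obj})
}

// pop removes the oldest key and returns every delta collected for it.
func (f *miniFIFO) pop() (string, []delta, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	ds := f.items[key]
	delete(f.items, key)
	return key, ds, true
}

func main() {
	f := newMiniFIFO()
	f.enqueue("default/a", added, "a-v1")
	f.enqueue("default/b", added, "b-v1")
	f.enqueue("default/a", updated, "a-v2") // joins the existing entry for default/a
	for {
		key, ds, ok := f.pop()
		if !ok {
			break
		}
		fmt.Println(key, len(ds))
	}
	// prints:
	// default/a 2
	// default/b 1
}
```

A key appears in the queue once no matter how many events arrive for it, and the consumer receives all of that key's events together, in arrival order.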

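The producer of the DeltaFIFO is the Reflector; the consumer is whoever calls DeltaFIFO.Pop(). Every caller of Pop blocks in cond.Wait while the queue is empty, and returns from cond.Wait only after it has actually re-acquired the lock. It then takes the first id from queue, fetches all Deltas stored for that id in items, and passes them to the PopProcessFunc. An item whose processing fails may be added back to the queue.

If processing fails but newer deltas for the same id have already arrived in the cache, the failed item is dropped instead of being requeued.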

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            if f.IsClosed() {
                return nil, FIFOClosedError
            }

            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        item, ok := f.items[id]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }

        return item, err
    }
}
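
The blocking behaviour of Pop rests on sync.Cond. The sketch below isolates that pattern with a plain queue of strings; blockingQueue and its methods are hypothetical names, and requeueing on error is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue shows only the Wait/Broadcast handshake used by Pop.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	keys   []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// add appends a key and wakes every goroutine blocked in pop.
func (q *blockingQueue) add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.keys = append(q.keys, key)
	q.cond.Broadcast()
}

// shutdown marks the queue closed and wakes waiters so they can exit.
func (q *blockingQueue) shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// pop blocks while the queue is empty. Wait releases the lock while
// sleeping and re-acquires it before returning, so the condition must
// be re-checked in a loop.
func (q *blockingQueue) pop() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.keys) == 0 {
		if q.closed {
			return "", false
		}
		q.cond.Wait()
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	return key, true
}

func main() {
	q := newBlockingQueue()
	done := make(chan []string)
	go func() {
		var got []string
		for {
			key, ok := q.pop()
			if !ok {
				done <- got
				return
			}
			got = append(got, key)
		}
	}()
	q.add("default/a")
	q.add("default/b")
	q.shutdown()
	fmt.Println(<-done) // prints: [default/a default/b]
}
```

Because the closed flag is only consulted when the queue is empty, keys added before shutdown are still delivered.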

ResourceEventHandler

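Handlers added to an informer normally satisfy the interface below. ResourceEventHandlerFuncs in the cache package already implements this set of methods, so it is enough to fill in AddFunc, UpdateFunc and DeleteFunc.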

type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}
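
The adapter idea behind ResourceEventHandlerFuncs, a struct of optional function fields that satisfies the interface, can be sketched as follows. eventHandler and handlerFuncs are local stand-ins that mirror the shape of the cache types; they are not the client-go definitions.

```go
package main

import "fmt"

// eventHandler mirrors the shape of ResourceEventHandler.
type eventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// handlerFuncs mirrors the shape of ResourceEventHandlerFuncs:
// any field may be left nil.
type handlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

// Each method forwards to the matching field and is a no-op when it is nil.
func (h handlerFuncs) OnAdd(obj interface{}) {
	if h.AddFunc != nil {
		h.AddFunc(obj)
	}
}

func (h handlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if h.UpdateFunc != nil {
		h.UpdateFunc(oldObj, newObj)
	}
}

func (h handlerFuncs) OnDelete(obj interface{}) {
	if h.DeleteFunc != nil {
		h.DeleteFunc(obj)
	}
}

// Compile-time check that the adapter satisfies the interface.
var _ eventHandler = handlerFuncs{}

func main() {
	var log []string
	var h eventHandler = handlerFuncs{
		AddFunc: func(obj interface{}) { log = append(log, fmt.Sprintf("add %v", obj)) },
	}
	h.OnAdd("pod-a")
	h.OnDelete("pod-a") // DeleteFunc is nil, so nothing happens
	fmt.Println(log)    // prints: [add pod-a]
}
```

A controller that only cares about additions can therefore register a handler with just AddFunc set.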

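Calling AddEventHandler on an informer actually adds a listener to the informer's processor. This processorListener provides event dispatch through three basic methods: add, run and pop. Below is a simplified, trimmed version of the registration method.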

// AddEventHandler calls this method with the informer's default resync period.
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler,
    resyncPeriod time.Duration) {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    listener := newProcessListener(handler, resyncPeriod, 
       determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), 
       s.clock.Now(), initialBufferSize)

    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    s.processor.addListener(listener)
}

sharedProcessor

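The processing function registered with the controller is HandleDeltas. Its main job is to dispatch events through the processor; besides dispatching, it also updates the informer's local store.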

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Added, Updated:
            isSync := d.Type == Sync
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                s.indexer.Update(d.Object)
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                s.indexer.Add(d.Object)
                s.processor.distribute(addNotification{newObj: d.Object}, isSync)
            }
        case Deleted:
            s.indexer.Delete(d.Object)
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}
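
One consequence of HandleDeltas is that the notification type depends on the local cache, not only on the delta type: an Added or Sync delta for an object already in the indexer is delivered as an update. A minimal sketch of that branch structure, with a map standing in for the indexer and strings for notifications (all names here are hypothetical):

```go
package main

import "fmt"

// delta is a simplified event: a type, the object key and its new value.
type delta struct {
	Type  string
	Key   string
	Value string
}

// store stands in for the informer's indexer.
type store map[string]string

// apply follows the same branches as HandleDeltas: it updates the store
// and returns the notifications that would be distributed to listeners.
func apply(s store, deltas []delta) []string {
	var out []string
	for _, d := range deltas {
		switch d.Type {
		case "Sync", "Added", "Updated":
			if old, exists := s[d.Key]; exists {
				s[d.Key] = d.Value
				out = append(out, fmt.Sprintf("update %s: %s -> %s", d.Key, old, d.Value))
			} else {
				s[d.Key] = d.Value
				out = append(out, fmt.Sprintf("add %s: %s", d.Key, d.Value))
			}
		case "Deleted":
			delete(s, d.Key)
			out = append(out, "delete "+d.Key)
		}
	}
	return out
}

func main() {
	s := store{}
	for _, n := range apply(s, []delta{
		{"Added", "default/a", "v1"},
		{"Added", "default/a", "v2"}, // already cached, so reported as an update
		{"Deleted", "default/a", ""},
	}) {
		fmt.Println(n)
	}
	// prints:
	// add default/a: v1
	// update default/a: v1 -> v2
	// delete default/a
}
```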

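When events are consumed they are distributed by the informer's processor. The handlers it distributes to are supplied from outside: AddEventHandler calls addListener on the processor. addListener simply adds one more listener to those the processor manages, and distribute iterates over the listeners and sends the event to each of them.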

type sharedProcessor struct {
    listenersStarted bool
    listenersLock    sync.RWMutex
    listeners        []*processorListener
    syncingListeners []*processorListener
    clock            clock.Clock
    wg               wait.Group
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        listener.add(obj)
    }
}

The processor's run makes sure every listener is started and, on exit, that every listener's channel is closed.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        // Tell .pop() to stop. .pop() will tell .run() to stop
        close(listener.addCh)
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}

processorListener

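pendingNotifications holds every event that has not yet been dispatched. When the processor starts to distribute an event it calls the listener's add method, which sends the event on addCh. The buffer itself is a growable ring created with buffer.NewRingGrowing.
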
type processorListener struct {
    nextCh chan interface{}
    addCh  chan interface{}

    handler ResourceEventHandler
    pendingNotifications buffer.RingGrowing

    requestedResyncPeriod time.Duration
    resyncPeriod time.Duration
  
    nextResync time.Time
    resyncLock sync.Mutex
}

func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}

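The listener's pop goroutine keeps reading events from addCh; each event is either staged to be sent on nextCh or, if another event is already waiting to be sent, written to the local pendingNotifications buffer. nextCh is therefore fed either directly from addCh or from pendingNotifications. The run method then consumes the events and calls the handler. run wraps this in an exponential-backoff retry, and if it exits abnormally it is started again.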

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop

    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { 
                nextCh = nil 
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil {
                notification = notificationToAdd
                nextCh = p.nextCh
            } else {
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}

func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
            for next := range p.nextCh {
                switch notification := next.(type) {
                case updateNotification:
                    p.handler.OnUpdate(notification.oldObj, notification.newObj)
                case addNotification:
                    p.handler.OnAdd(notification.newObj)
                case deleteNotification:
                    p.handler.OnDelete(notification.oldObj)
                default:
                }
            }
            return true, nil
        })
        if err == nil {
            close(stopCh)
        }
    }, 1*time.Minute, stopCh)
}
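
The select in pop relies on a Go idiom: a send on a nil channel never becomes ready, so setting the local nextCh to nil switches the send case off while nothing is pending. The sketch below shows the idiom with ints and a slice as the buffer. relay is a hypothetical function, and unlike pop above it deliberately drains buffered values after the input closes so that its output is deterministic.

```go
package main

import "fmt"

// relay forwards values from in to out in order. A slow reader on out
// never blocks the writer on in, because excess values go to pending.
func relay(in <-chan int, out chan<- int) {
	defer close(out)

	var pending []int
	var next chan<- int // nil while there is nothing to send
	var current int

	for in != nil || next != nil {
		select {
		case next <- current: // disabled while next is nil
			if len(pending) > 0 {
				current, pending = pending[0], pending[1:]
			} else {
				next = nil
			}
		case v, ok := <-in: // disabled once in is set to nil
			if !ok {
				in = nil
				continue
			}
			if next == nil {
				// Nothing staged yet: stage v and enable the send case.
				current, next = v, out
			} else {
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go relay(in, out)

	// The producer finishes before anyone reads from out.
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	for v := range out {
		fmt.Println(v)
	}
	// prints 1 through 5, one per line
}
```

The design choice is the same as in processorListener: the distributor must never be stalled by one slow handler, so each listener absorbs the backlog in its own buffer.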

NewController

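kube-controller-manager is a collection of controllers. It implements a general framework for asynchronous event notification, and every controller is built around it. The deploymentController is used here as an example to record how a controller is implemented.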

func startDeploymentController(ctx ControllerContext) (bool, error) {
    go deployment.NewDeploymentController(
        ctx.InformerFactory.Extensions().V1beta1().Deployments(),
        ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    ).Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
    return true, nil
}

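In the DeploymentController struct, a listerSynced function returning true means the corresponding informer's cache has completed at least its initial sync; a Lister is the channel for getting data out of the informer's cache; rsControl provides a set of operations on ReplicaSets. Only part of the code is excerpted.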

type DeploymentController struct {
    rsControl     controller.RSControlInterface
    client        clientset.Interface
    eventRecorder record.EventRecorder

    syncHandler       func(dKey string) error
    enqueueDeployment func(deployment *extensions.Deployment)

    dLister       extensionslisters.DeploymentLister
    dListerSynced cache.InformerSynced
    queue         workqueue.RateLimitingInterface
}

func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer,
    rsInformer extensionsinformers.ReplicaSetInformer,
    podInformer coreinformers.PodInformer,
    client clientset.Interface) *DeploymentController {
    dc := &DeploymentController{
        client:        client,
        queue:         workqueue.NewNamedRateLimitingQueue(
          workqueue.DefaultControllerRateLimiter(), "deployment"),
    }
    dc.rsControl = controller.RealRSControl{
        KubeClient: client,
        Recorder:   dc.eventRecorder,
    }
    dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    dc.addDeployment,
        UpdateFunc: dc.updateDeployment,
        DeleteFunc: dc.deleteDeployment,
    })

    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue
    dc.dLister = dInformer.Lister()
    dc.dListerSynced = dInformer.Informer().HasSynced
    return dc
}

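Controller.Run first waits until all informers have synced, and then starts N goroutines concurrently to process events. Each goroutine loops on a single task: take an item from the queue and hand it to syncHandler.

When does the queue exit? Run defers dc.queue.ShutDown(), so the queue is shut down when Run returns, that is, once stopCh is closed.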

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dc.queue.ShutDown()

    if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, 
        dc.rsListerSynced, dc.podListerSynced) {
        return
    }
    for i := 0; i < workers; i++ {
        go wait.Until(dc.worker, time.Second, stopCh)
    }
    <-stopCh
}

func (dc *DeploymentController) worker() {
    for dc.processNextWorkItem() {
    }
}

func (dc *DeploymentController) processNextWorkItem() bool {
    key, quit := dc.queue.Get()
    if quit {
        return false
    }
    defer dc.queue.Done(key)
    err := dc.syncHandler(key.(string))
    dc.handleErr(err, key)
    return true
}

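How do events get into the queue? Through the event handlers registered on the informer: for example, when a new deployment is created, the key of the deployment to be processed is eventually added to the queue.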

func (dc *DeploymentController) addDeployment(obj interface{}) {
    d := obj.(*extensions.Deployment)
    dc.enqueueDeployment(d)
}

func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
    key, err := controller.KeyFunc(deployment)
    // process error
    dc.queue.Add(key)
}
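
The handler-enqueues-key, worker-syncs-key pattern can be sketched without client-go. The keyQueue below is a hypothetical, much reduced stand-in for the workqueue package: it only deduplicates keys that are still waiting and has no rate limiting, retries or Done tracking.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// keyQueue is a simplified work queue of object keys.
type keyQueue struct {
	mu       sync.Mutex
	cond     *sync.Cond
	keys     []string
	pending  map[string]bool
	shutdown bool
}

func newKeyQueue() *keyQueue {
	q := &keyQueue{pending: map[string]bool{}}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add enqueues key unless it is already waiting or the queue is shut down.
func (q *keyQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.shutdown || q.pending[key] {
		return
	}
	q.pending[key] = true
	q.keys = append(q.keys, key)
	q.cond.Signal()
}

// Get blocks until a key is available. quit is true only once the queue
// has been shut down and fully drained.
func (q *keyQueue) Get() (key string, quit bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.keys) == 0 && !q.shutdown {
		q.cond.Wait()
	}
	if len(q.keys) == 0 {
		return "", true
	}
	key = q.keys[0]
	q.keys = q.keys[1:]
	delete(q.pending, key)
	return key, false
}

// ShutDown stops new additions and wakes all blocked workers.
func (q *keyQueue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.shutdown = true
	q.cond.Broadcast()
}

// runWorkers starts n goroutines that pass each key to syncHandler until
// the queue reports quit.
func runWorkers(q *keyQueue, n int, syncHandler func(key string)) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				key, quit := q.Get()
				if quit {
					return
				}
				syncHandler(key)
			}
		}()
	}
	return wg
}

func main() {
	q := newKeyQueue()
	q.Add("default/web")
	q.Add("default/web") // duplicate event for the same key is coalesced
	q.Add("default/db")

	var mu sync.Mutex
	var synced []string
	wg := runWorkers(q, 2, func(key string) {
		mu.Lock()
		defer mu.Unlock()
		synced = append(synced, key)
	})
	q.ShutDown()
	wg.Wait()

	sort.Strings(synced)
	fmt.Println(synced) // prints: [default/db default/web]
}
```

Queueing keys instead of objects means a worker always syncs against the latest state in the informer's cache, however many events arrived for that key in the meantime.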