0x00 概述

Event是k8s一种重要的资源对象,k8s重要的组件基本都是基于event事件来作出相对应反映,驱动着整个k8s集群的运作。但是event 是如何产生的?如何上报给k8s APIserver?

0x01 EventBroadCaster事件管理器

  k8s通过EventBroadCaster事件管理器来将客户端生成的event传递给上游(apiserver/log….),在这个管理器里面有三个重要的组件:

  • EventRecorder: 事件产生者,k8s里面的组件都是通过EventRecorder 来记录他们所关心的事件
  • EventBroadCaster: 事件广播器,事件消费者,主要是将事件分发给和他们建立连接的broadcasterWatcher,目前主要有两种分发机制:BlockingNonBlocking默认情况下是NonBlocking
  • broadcasterWatcher: 事件观察者,主要用来从EventBroadCaster接收event,并做相应的处理(对event做聚合,过滤,上传APIserver/log等)

具体流程如下图所示:
event-flow

1
2
3
4
5
6
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logrus.Infof)
eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{Interface: clientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "ExampleEvent"})
pod,_ := clientset.CoreV1().Pods(metav1.NamespaceAll).Get("xxxx", metav1.GetOptions{})
recorder.Event(pod, corev1.EventTypeNormal, "event reason", "event message")

下面分别解析三个组件

0x01 EventBroadCaster组件

EventBroadCaster 事件广播器,它是一个接口定义如下

1
2
3
4
5
6
7
8
9
10
11
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// 主要是初始化一个watcher并和EventBroadcaster做绑定
StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
// 对event做处理,StartRecordingToSink会调用StartEventWatcher
StartRecordingToSink(sink EventSink) watch.Interface
// 将event上报给日志
StartLogging(logf func(format string, args ...interface{})) watch.Interface
// 创建一个recorder
NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
}

上面例子的第一行代码eventBroadcaster := record.NewBroadcaster()

1
2
3
4
5
6
7
8
9
10
11
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, incomingQueueLength),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
}
m.distributing.Add(1)
go m.loop()
return m
}

这段代码其实就是New一个Broadcaster 事件广播器,并且初始化一个watchersincoming的缓冲channel, 然后在执行一个loop的goroutine,这个loop的goroutine主要负责从incoming管道里面接收event,再发送给注册到broadcaster的watchers,可以看下它具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (m *Broadcaster) loop() {
// 从incoming 接收到event
for event := range m.incoming {
if event.Type == internalRunFunctionMarker {
// 如果event类型是内部的类型,那么则跳过
event.Object.(functionFakeRuntimeObject)()
continue
}
// 将event分发给watchers
m.distribute(event)
}
m.closeAll()
m.distributing.Done()
}

从上面代码可以看出上面对loop的描述,loop主要从m.incoming 的channel里面接收Event,在通过m.distribute函数将Event分发出去,但是具体分发给谁?看下distribute函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (m *Broadcaster) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
if m.fullChannelBehavior == DropIfChannelFull {
// 遍历所有的watcher,将event写入watcher的result写管道
for _, w := range m.watchers {
select {
// 将event发送给watcher
case w.result <- event:
case <-w.stopped:
default: // Don't block if the event can't be queued.
}
}
} else {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
}
}
}
}

通过上面流程可以看出EventBroadcaster主要做的事情: 创建一个Broadcaster,并且创建一个incoming的缓冲channel,和broadcasterWatcher, 然后开启一个goroutine,负责从incoming读取event,再写入所有注册到Broadcaster的eventwatcher的result写管道里面.

流程图如下:
broadcaster-flow

0x02 broadcasterWatcher

broadcasterWatcher主要负责接收来自EventBroadcaster的event,然后放到eventHandler里面做相应的处理, broadcasterWatcher主要是通过EventBroadcaster里面StartRecordingToSink这个函数来创建,看下 StartRecordingToSink函数的具体实现:

1
2
3
4
5
6
7
8
9
10
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
// 创建一个eventCorrlator
eventCorrelator := NewEventCorrelatorWithOptions(eventBroadcaster.options)
// 调用eventBroadcaster.StartEventWatcher 函数,函数参数是一个eventhandler函数
return eventBroadcaster.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
})
}

我们发现StartRecordingToSink这个函数主要就是做了两件事情1: 创建了一个eventCorrator 2. 调用eventBroadcaster.StartEventWatcher函数,到这里我们其实还是不清楚eventBroadcastwatcher是如何工作的,以及这个broadcasterWatcher 到底是个啥, 继续看下StartEventWatcher这个函数的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
// 调用eventBroadcaster.Watch()函数
watcher := eventBroadcaster.Watch()
go func() {
defer utilruntime.HandleCrash()
// 从watcher.ResultChan()管道读取event object
for watchEvent := range watcher.ResultChan() {
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
// 将读取到的event传递给eventHandler来做进一步处理,我们可以在之前StartRecordSink函数里面可以发现,这个eventhandler 就是recordEventSink函数
eventHandler(event)
}
}()
return watcher
}

这段代码里面有两部分是重要的eventBroadcaster.Watch()eventHandler(event)这两个函数,在看具体函数实现之前,先说明下这两个函数的主要做了那些事情:eventBroadcaster.Watch()这个函数主要是创建一个broadcasterWatcher,并且把这个broadcasterWatcherEventBroadcaster关联起来,eventHandler(event)这个函数主要负责具体针对event做一些处理(对event做一些过滤,聚合,发送到apiserver),下面看下具体函数的实现,先看下eventBroadcaster.Watch()这段代码的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (m *Broadcaster) Watch() Interface {
// 创建一个新的broadcasterWatcher, 并且返回这个broadcaster
var w *broadcasterWatcher
m.blockQueue(func() {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
w = &broadcasterWatcher{
result: make(chan Event, m.watchQueueLength),
stopped: make(chan struct{}),
id: id,
m: m,
}
m.watchers[id] = w
})
return w
}

ok,我们可以看到这段代码其实就创建了一个broadcasterWatcherblockQueue的作用就是清空incomingchannel,以确保在broadcasterWatcher在启动之前incoming里面没有任何event事件, 下面在看下eventHandler(event)这个具体实现,上面我们以及说过eventHandlerStartRecordingToSink里面就是recordToSink这个函数,因此我们需要看的函数代码是recordToSink函数的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {
eventCopy := *event
event = &eventCopy
// 通过生成的eventCorrelator.EventCorrelator对event做进一步的处理
result, err := eventCorrelator.EventCorrelate(event)
if err != nil {
utilruntime.HandleError(err)
}
if result.Skip {
return
}
tries := 0
for {
// 通过recordEvent 函数将Event发送给APIserver
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}

看代码的具体实现,这段函数里面其实主要就做了两件事情1 通过eventCorrelator.EventCorrelate对event做了一些聚合/过滤等处理操作,2.通过recordEvent将event发送给APIserver,。到了这个大概对eventwatcher有了一个大概的认知,总结下流程图如下:

0x03 Recorder

Recorder相当于是event的生产者,从EventBroadcaster接口可以看到通过NewRecorder函数来创建一个eventRecorder, 看下NewRecorder函数的具体实现:

1
2
3
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
}

通过代码可以发现,NewRecorder主要就是返回一个实现了EventRecorder接口的对象,EventRecorder的定义如下:

1
2
3
4
5
6
7
8
9
type EventRecorder interface {
Event(object runtime.Object, eventtype, reason, message string)

Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

这个接口主要定义了record如何生成一个event,主要有上面描述四种方法,上面我们daemon里面使用Event这个方法来生成一个event, 我们看下event方法的具体实现:

1
2
3
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}

从函数的具体实现,我们可以发现Event方法主要调用了recorder.generateEvent()这个方法来生成event的,具体看下recorder.generateEvent的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
return
}

if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
return
}
//创建一个event对象
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source

go func() {
defer utilruntime.HandleCrash()
// 执行recorder.Action()函数
recorder.Action(watch.Added, event)
}()
}

通过具体的函数实现,我们可以发现,generateEvent这段函数主要就做了以下两件事情1 通过makeEvent函数来创建一个event对象,2 通过调用recorder.Action()函数来对生成的event做处理,具体Action是做啥看下它具体实现:

1
2
3
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj}
}

incomingok 到了这里我们就清楚了,recorder创建event的逻辑 创建event 对象,并且把这个对象写入到broadcaster的incoming channel里面

0x04 总结

 通过上面对三个组建的分析,我们知道eventbroadcaster的具体工作逻辑
发送端/生成者: 创建event对象,将event对象写入到eventbroadcaster的incoming缓冲管道里面
消费者/eventbroadcaster, eventbroadcaster 从incoming管道里面接收event,并把这个event发送给和它关联的watcher(将event写入到每个watcher里面result channel 里面)
观察者: watcher 从result管道里面读取event,并对event做一些处理,发送出去