k8s-eventbroadcaster
0x00 概述
Event是k8s一种重要的资源对象,k8s重要的组件基本都是基于event事件来作出相对应反映,驱动着整个k8s集群的运作。但是event 是如何产生的?如何上报给k8s APIserver?
0x01 EventBroadCaster事件管理器
k8s通过EventBroadCaster事件管理器来将客户端生成的event传递给上游(apiserver/log….),在这个管理器里面有三个重要的组件:
- EventRecorder: 事件产生者,k8s里面的组件都是通过EventRecorder 来记录他们所关心的事件
- EventBroadCaster: 事件广播器,事件消费者,主要是将事件分发给和他们建立连接的
broadcasterWatcher,目前主要有两种分发机制:Blocking和NonBlocking默认情况下是NonBlocking的 - broadcasterWatcher: 事件观察者,主要用来从
EventBroadCaster接收event,并做相应的处理(对event做聚合,过滤,上传APIserver/log等)
具体流程如下图所示:
1 | eventBroadcaster := record.NewBroadcaster() |
下面分别解析三个组件
0x01 EventBroadCaster组件
EventBroadCaster 事件广播器,它是一个接口定义如下
1 | // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. |
上面例子的第一行代码eventBroadcaster := record.NewBroadcaster()
1 | func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { |
这段代码其实就是New一个Broadcaster 事件广播器,并且初始化一个
watchers和incoming的缓冲channel, 然后在执行一个loop的goroutine,这个loop的goroutine主要负责从incoming管道里面接收event,再发送给注册到broadcaster的watchers,可以看下它具体的实现:
1 | func (m *Broadcaster) loop() { |
从上面代码可以看出上面对loop的描述,
loop主要从m.incoming的channel里面接收Event,在通过m.distribute函数将Event分发出去,但是具体分发给谁?看下distribute函数的实现:
1 | func (m *Broadcaster) distribute(event Event) { |
通过上面流程可以看出
EventBroadcaster主要做的事情: 创建一个Broadcaster,并且创建一个incoming的缓冲channel,和broadcasterWatcher, 然后开启一个goroutine,负责从incoming读取event,再写入所有注册到Broadcaster的eventwatcher的result写管道里面.
流程图如下:
0x02 broadcasterWatcher
broadcasterWatcher主要负责接收来自EventBroadcaster的event,然后放到eventHandler里面做相应的处理, broadcasterWatcher主要是通过EventBroadcaster里面StartRecordingToSink这个函数来创建,看下 StartRecordingToSink函数的具体实现:
1 | func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface { |
我们发现
StartRecordingToSink这个函数主要就是做了两件事情1: 创建了一个eventCorrator 2. 调用eventBroadcaster.StartEventWatcher函数,到这里我们其实还是不清楚eventBroadcastwatcher是如何工作的,以及这个broadcasterWatcher到底是个啥, 继续看下StartEventWatcher这个函数的具体实现:
1 | func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface { |
这段代码里面有两部分是重要的
eventBroadcaster.Watch()和eventHandler(event)这两个函数,在看具体函数实现之前,先说明下这两个函数的主要做了那些事情:eventBroadcaster.Watch()这个函数主要是创建一个broadcasterWatcher,并且把这个broadcasterWatcher和EventBroadcaster关联起来,eventHandler(event)这个函数主要负责具体针对event做一些处理(对event做一些过滤,聚合,发送到apiserver),下面看下具体函数的实现,先看下eventBroadcaster.Watch()这段代码的具体实现
1 | func (m *Broadcaster) Watch() Interface { |
ok,我们可以看到这段代码其实就创建了一个
broadcasterWatcher,blockQueue的作用就是清空incomingchannel,以确保在broadcasterWatcher在启动之前incoming里面没有任何event事件, 下面在看下eventHandler(event)这个具体实现,上面我们以及说过eventHandler在StartRecordingToSink里面就是recordToSink这个函数,因此我们需要看的函数代码是recordToSink函数的具体实现:
1 | func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) { |
看代码的具体实现,这段函数里面其实主要就做了两件事情1 通过
eventCorrelator.EventCorrelate对event做了一些聚合/过滤等处理操作,2.通过recordEvent将event发送给APIserver,。到了这个大概对eventwatcher有了一个大概的认知,总结下流程图如下:
0x03 Recorder
Recorder相当于是event的生产者,从EventBroadcaster接口可以看到通过NewRecorder函数来创建一个eventRecorder, 看下NewRecorder函数的具体实现:
1 | func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder { |
通过代码可以发现,
NewRecorder主要就是返回一个实现了EventRecorder接口的对象,EventRecorder的定义如下:
1 | type EventRecorder interface { |
这个接口主要定义了record如何生成一个event,主要有上面描述四种方法,上面我们daemon里面使用
Event这个方法来生成一个event, 我们看下event方法的具体实现:
1 | func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) { |
从函数的具体实现,我们可以发现
Event方法主要调用了recorder.generateEvent()这个方法来生成event的,具体看下recorder.generateEvent的实现:
1 | func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) { |
通过具体的函数实现,我们可以发现,
generateEvent这段函数主要就做了以下两件事情1 通过makeEvent函数来创建一个event对象,2 通过调用recorder.Action()函数来对生成的event做处理,具体Action是做啥看下它具体实现:
1 | func (m *Broadcaster) Action(action EventType, obj runtime.Object) { |
incomingok 到了这里我们就清楚了,recorder创建event的逻辑 创建event 对象,并且把这个对象写入到broadcaster的incoming channel里面
0x04 总结
通过上面对三个组建的分析,我们知道eventbroadcaster的具体工作逻辑
发送端/生成者: 创建event对象,将event对象写入到eventbroadcaster的incoming缓冲管道里面
消费者/eventbroadcaster, eventbroadcaster 从incoming管道里面接收event,并把这个event发送给和它关联的watcher(将event写入到每个watcher里面result channel 里面)
观察者: watcher 从result管道里面读取event,并对event做一些处理,发送出去
