EndpointSliceCache
This struct stores simplified information about EndpointSlices. It is keyed first by service name and then by EndpointSlice name, and the stored values are the endpoint structs that kube-proxy defines internally.
Its key component is the internal endpointSliceTracker, which holds both the applied and the pending EndpointSlices in memory and is used to work out what has changed. Pending changes are committed by checkoutChanges, shown further below.
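Roughly, the layout can be pictured as in the sketch below. This is a simplified illustration based on the description above, not the exact kube-proxy source: the real cache carries extra fields (the makeEndpointInfo hook, hostname, IP family, event recorder) and the concrete type names vary between Kubernetes versions.
package proxy

import (
    "sync"

    "k8s.io/apimachinery/pkg/types"
)

// endpointSliceData stands in for the simplified per-slice data kube-proxy keeps
// (the slice's ports plus its endpoints converted into kube-proxy's own endpoint structs).
type endpointSliceData struct{}

// Second-level key: the EndpointSlice name.
type endpointSliceDataByName map[string]*endpointSliceData

// endpointSliceTracker keeps the slices already applied by the proxier alongside
// the slices that were updated in the cache but not yet applied.
type endpointSliceTracker struct {
    applied endpointSliceDataByName
    pending endpointSliceDataByName
}

// EndpointSliceCache groups trackers by service; the first-level key is the
// namespaced service name.
type EndpointSliceCache struct {
    lock                sync.Mutex
    trackerByServiceMap map[types.NamespacedName]*endpointSliceTracker
}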
EndpointsChangeTracker
EndpointsChangeTracker is the main component for tracking EndpointSlice changes. It uses the cache above to record add/update/delete events, updates the related metrics, and calls the cache's checkoutChanges to commit the pending changes it holds.
EndpointSliceUpdate is the main handler. It is responsible for checking whether an incoming EndpointSlice actually requires a sync and for updating the internal cache above; a usage sketch follows the two functions below.
// EndpointSliceUpdate updates the EndpointsChangeTracker by adding/updating or removing
// endpointSlice (depending on removeSlice). It returns true if this update contained a
// change that needs to be synced; note that this is different from the return value of
// ServiceChangeTracker.Update().
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
    if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
        klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
        return false
    }
    // This should never happen
    if endpointSlice == nil {
        klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
        return false
    }

    namespacedName, _, err := endpointSliceCacheKeys(endpointSlice)
    if err != nil {
        klog.InfoS("Error getting endpoint slice cache keys", "err", err)
        return false
    }

    metrics.EndpointChangesTotal.Inc()

    ect.lock.Lock()
    defer ect.lock.Unlock()

    changeNeeded := ect.endpointSliceCache.updatePending(endpointSlice, removeSlice)
    if changeNeeded {
        metrics.EndpointChangesPending.Inc()
        // In case of Endpoints deletion, the LastChangeTriggerTime annotation is
        // by-definition coming from the time of last update, which is not what
        // we want to measure. So we simply ignore it in this cases.
        // TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
        // when other EndpointSlice for that service still exist.
        if removeSlice {
            delete(ect.lastChangeTriggerTimes, namespacedName)
        } else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) {
            ect.lastChangeTriggerTimes[namespacedName] =
                append(ect.lastChangeTriggerTimes[namespacedName], t)
        }
    }

    return changeNeeded
}
// checkoutChanges returns a map of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointsChangeTracker) checkoutChanges() map[types.NamespacedName]*endpointsChange {
    metrics.EndpointChangesPending.Set(0)
    return ect.endpointSliceCache.checkoutChanges()
}
ServiceChangeTracker
As the comment says, this serves the same purpose as the EndpointsChangeTracker but focuses on Services: it tracks all changes made to Services. The architecture is essentially the same.
// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
// Services, keyed by their namespace and name.
type ServiceChangeTracker struct {
    // lock protects items.
    lock sync.Mutex
    // items maps a service to its serviceChange.
    items map[types.NamespacedName]*serviceChange

    // makeServiceInfo allows the proxier to inject customized information when
    // processing services.
    makeServiceInfo makeServicePortFunc
    // processServiceMapChange is invoked by the apply function on every change. This
    // function should not modify the ServicePortMaps, but just use the changes for
    // any Proxier-specific cleanup.
    processServiceMapChange processServiceMapChangeFunc

    ipFamily v1.IPFamily
    recorder events.EventRecorder
}

type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort

type processServiceMapChangeFunc func(previous, current ServicePortMap)

// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
    previous ServicePortMap
    current  ServicePortMap
}
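Before anything can be committed, the tracker has to record what changed. Based on the accumulation semantics in the comment above, its update path can be sketched roughly as follows. This is a simplified, assumption-labelled version: sketchUpdate and serviceToPortMap are illustrative names (the real ServiceChangeTracker.Update additionally handles metrics and the makeServiceInfo hook), and the snippet assumes the same package context and imports as the quoted code.
// Simplified sketch (not the real method) of how pending service changes accumulate:
// previous is captured once, current is overwritten on every event, and a change that
// ends up back where it started is dropped.
func (sct *ServiceChangeTracker) sketchUpdate(previous, current *v1.Service) bool {
    svc := current
    if svc == nil {
        svc = previous
    }
    if svc == nil {
        return false
    }
    name := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}

    sct.lock.Lock()
    defer sct.lock.Unlock()

    change, exists := sct.items[name]
    if !exists {
        // First change since the last sync: capture the pre-change state exactly once.
        change = &serviceChange{previous: serviceToPortMap(previous)}
        sct.items[name] = change
    }
    // Always overwrite current, so a burst of intermediate updates collapses into one diff.
    change.current = serviceToPortMap(current)

    // If the service ended up identical to where it started, drop the no-op change.
    if reflect.DeepEqual(change.previous, change.current) {
        delete(sct.items, name)
    }
    return len(sct.items) > 0
}

// serviceToPortMap is a hypothetical stand-in for kube-proxy's conversion of a
// *v1.Service into a ServicePortMap; it exists only to make the sketch read cleanly.
func serviceToPortMap(svc *v1.Service) ServicePortMap { return nil }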
The accumulated changes are then committed by ServicePortMap.Update, which applies them to the map the proxier actually consumes; a sketch of how a sync pass uses the result follows the code. Note that there is no dedicated cache struct like EndpointSliceCache here; the pending changes simply live in the items field of ServiceChangeTracker.
// Update updates ServicePortMap base on the given changes, returns information about the
// diff since the last Update, triggers processServiceMapChange on every change, and
// clears the changes map.
func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult {
    sct.lock.Lock()
    defer sct.lock.Unlock()

    result := UpdateServiceMapResult{
        UpdatedServices:      sets.New[types.NamespacedName](),
        DeletedUDPClusterIPs: sets.New[string](),
    }

    for nn, change := range sct.items {
        if sct.processServiceMapChange != nil {
            sct.processServiceMapChange(change.previous, change.current)
        }
        result.UpdatedServices.Insert(nn)
        sm.merge(change.current)
        // filter out the Update event of current changes from previous changes
        // before calling unmerge() so that can skip deleting the Update events.
        change.previous.filter(change.current)
        sm.unmerge(change.previous, result.DeletedUDPClusterIPs)
    }
    // clear changes after applying them to ServicePortMap.
    sct.items = make(map[types.NamespacedName]*serviceChange)
    metrics.ServiceChangesPending.Set(0)
    return result
}
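Here is a hedged sketch of how a sync pass might consume this result: UpdatedServices tells the backend which services need their rules regenerated, and DeletedUDPClusterIPs is used to flush stale UDP conntrack entries, since UDP has no connection teardown that would clean them up naturally. The function and flushConntrackForIP are illustrative assumptions, not the exact kube-proxy code, and the snippet assumes the same package context as the quoted code.
// Illustration only: consuming UpdateServiceMapResult during a sync pass.
func syncServicesSketch(svcPortMap ServicePortMap, serviceChanges *ServiceChangeTracker) {
    result := svcPortMap.Update(serviceChanges)

    // Services whose ports changed since the last sync; the backend regenerates
    // rules for at least these.
    for nn := range result.UpdatedServices {
        klog.V(4).InfoS("Service changed since last sync", "service", nn)
    }

    // Cluster IPs of deleted UDP services: without an explicit conntrack flush,
    // established UDP "connections" would keep being forwarded to dead backends.
    for ip := range result.DeletedUDPClusterIPs {
        flushConntrackForIP(ip)
    }
}

// flushConntrackForIP is a hypothetical helper, not a kube-proxy function
// (e.g. it could shell out to `conntrack -D`).
func flushConntrackForIP(ip string) {}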
There is also a NodePodCIDRHandler struct that tracks node add, update, and delete events and reacts to changes in the node's PodCIDRs. It was added in response to https://issues.k8s.io/111321.
// NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned
// Implements the config.NodeHandler interface
// https://issues.k8s.io/111321
type NodePodCIDRHandler struct {
    mu       sync.Mutex
    podCIDRs []string
}

func NewNodePodCIDRHandler(podCIDRs []string) *NodePodCIDRHandler {
    return &NodePodCIDRHandler{
        podCIDRs: podCIDRs,
    }
}

var _ config.NodeHandler = &NodePodCIDRHandler{}

// OnNodeAdd is a handler for Node creates.
func (n *NodePodCIDRHandler) OnNodeAdd(node *v1.Node) {
    n.mu.Lock()
    defer n.mu.Unlock()

    podCIDRs := node.Spec.PodCIDRs
    // initialize podCIDRs
    if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 {
        klog.InfoS("Setting current PodCIDRs", "podCIDRs", podCIDRs)
        n.podCIDRs = podCIDRs
        return
    }
    if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
        klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
            "node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPodCIDRs", n.podCIDRs)
        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    }
}

// OnNodeUpdate is a handler for Node updates.
func (n *NodePodCIDRHandler) OnNodeUpdate(_, node *v1.Node) {
    n.mu.Lock()
    defer n.mu.Unlock()
    podCIDRs := node.Spec.PodCIDRs
    // initialize podCIDRs
    if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 {
        klog.InfoS("Setting current PodCIDRs", "podCIDRs", podCIDRs)
        n.podCIDRs = podCIDRs
        return
    }
    if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
        klog.ErrorS(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
            "node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPODCIDRs", n.podCIDRs)
        klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    }
}

// OnNodeDelete is a handler for Node deletes.
func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) {
    klog.ErrorS(nil, "Current Node is being deleted", "node", klog.KObj(node))
}
By its definition, this looks more like an informer handler reacting to node events such as add and update. It keeps the node's PodCIDRs internally, and every time a node event arrives it compares the stored CIDRs with the ones on the incoming node object. On the first event that carries PodCIDRs it records them; if a later event carries different PodCIDRs, kube-proxy logs an error and restarts, because its local-traffic detection would otherwise be based on stale CIDRs. On node deletion it only logs an error, since the node a kube-proxy instance runs on should not normally be deleted out from under it.
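For completeness, here is an approximate sketch of how this handler gets wired up. It mirrors what the kube-proxy server setup does when --detect-local-mode=NodeCIDR is used, but the exact constructor signatures differ between Kubernetes versions, so treat the snippet as an assumption-labelled outline rather than the literal code; it assumes imports of k8s.io/client-go/informers/core/v1 (as coreinformers) and kube-proxy's pkg/proxy/config package.
// Approximate wiring (signatures vary across Kubernetes versions): the kube-proxy
// server builds a NodeConfig on top of a Node informer and registers the handler,
// but only when local traffic detection is derived from the node's PodCIDRs.
func wireNodePodCIDRHandler(nodeInformer coreinformers.NodeInformer, podCIDRs []string, stopCh <-chan struct{}) {
    nodeConfig := config.NewNodeConfig(nodeInformer, 15*time.Minute)
    nodeConfig.RegisterEventHandler(NewNodePodCIDRHandler(podCIDRs))
    go nodeConfig.Run(stopCh)
}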
Conclusion
But kube-proxy ultimately manages the rules used by iptables or IPVS, so why don't these tracked changes directly trigger an iptables update here? kube-proxy has a separate library that takes care of that part; I will explain it in the next article.
https://github.com/kubernetes/kubernetes/tree/master/pkg/proxy