文章目录
- 前言背景
- 一、总体架构
- 二、edgestream 启动 创建websocket
- 三. 监听该 Websocket
- 四. cloud的tunnel 保存session
- 五. tunnel 监听上一步接收的wss connection
- 六. 创建stream server 时引用了tunnel 指针
- 七. API Server 发起`/containerLogs`
- 八. 从api server 的请求中提取 session并往session 中设置sessionid
- 九. 找到session后往session里的wss 写一个查询日志的message,message里必须要带着sessionid,用于查询apiserver 的连接
- 十. edgenode收到后转发给kebelet 的server
- 十一. kubelt 的日志返回写回wss
- 十二. cloud 那边的tunnel 收到message 后,写回apiserver
- 总结
前言背景
kubeedge 这个边缘计算平台有一个特点:它的边缘节点网络和云端节点的网络不是在一个平面的,边缘的计算节点通过keadm join
加入边缘集群的时候,并不要求自身的IP从云端可达,这是很符合真实场景的。
-
kubelet 上报的kubelet server 信息
它是kubelet 上报的信息,kubectl describe node raspberrypi
看到的重要信息如下:addresses: - address: 192.168.4.87 type: InternalIP - address: raspberrypi type: Hostname daemonEndpoints: kubeletEndpoint: Port: 10350
在边缘计算节点加入集群后,cloudcore 生成的 node 中包含了该edge node 的 KubeletEndpoint 端口和它的IP 以及主机名(也就是
kubectl get node
看到的node 名称)。 -
kubectl logs
的实现原理(how kubectl logs works)
发起一个日志查询请求,打开详细日志:root@172:~# kubectl logs -f nginx-edge-6785d8586b-g7j6p -v=7 I1021 15:21:43.922181 17648 loader.go:375] Config loaded from file: /root/.kube/config I1021 15:21:43.924765 17648 round_trippers.go:420] GET https://172.171.1.220:6443/api/v1/namespaces/default/pods/nginx-edge-6785d8586b-g7j6p I1021 15:21:43.924774 17648 round_trippers.go:427] Request Headers: I1021 15:21:43.924777 17648 round_trippers.go:431] Accept: application/json, */* I1021 15:21:43.924781 17648 round_trippers.go:431] User-Agent: kubectl/v1.18.3 (linux/amd64) kubernetes/2e7996e I1021 15:21:43.929433 17648 round_trippers.go:446] Response Status: 200 OK in 4 milliseconds I1021 15:21:43.932459 17648 round_trippers.go:420] GET https://172.171.1.220:6443/api/v1/namespaces/default/pods/nginx-edge-6785d8586b-g7j6p/log?follow=true I1021 15:21:43.932467 17648 round_trippers.go:427] Request Headers: I1021 15:21:43.932471 17648 round_trippers.go:431] Accept: application/json, */* I1021 15:21:43.932475 17648 round_trippers.go:431] User-Agent: kubectl/v1.18.3 (linux/amd64) kubernetes/2e7996e I1021 15:21:43.980607 17648 round_trippers.go:446] Response Status: 200 OK in 48 milliseconds /docker-entrypoint.sh: /docker-entrypoint.d/ is not empty, will attempt to perform configuration /docker-entrypoint.sh: Looking for shell scripts in /docker-entrypoint.d/ /docker-entrypoint.sh: Launching /docker-entrypoint.d/10-listen-on-ipv6-by-default.sh 10-listen-on-ipv6-by-default.sh: Getting the checksum of /etc/nginx/conf.d/default.conf 10-listen-on-ipv6-by-default.sh: Enabled listen on IPv6 in /etc/nginx/conf.d/default.conf /docker-entrypoint.sh: Launching /docker-entrypoint.d/20-envsubst-on-templates.sh /docker-entrypoint.sh: Configuration complete; ready for start up
可以看到,kubectl 其实是发起了2个请求:
- 查询该pod,主要是确认pod是否存在,以及是否存在多个容器
- 拼接出来
/logs
并向真正的该pod 所在的node kubelet server 发起请求
我们从kubeapiserver 的相关代码就可以看出来,具体的解释在下边代码中做了编辑:
func LogLocation( ctx context.Context, getter ResourceGetter, connInfo client.ConnectionInfoGetter, name string, opts *api.PodLogOptions, ) (*url.URL, http.RoundTripper, error) { //1. 根据name 查询pod详细信息 pod, err := getPod(ctx, getter, name) if err != nil { return nil, nil, err } // 2. 判断是否提供了容器名称,如果指定了,就获取指定的容器,如果没指定,判断能不能给默认的容器,如果既没有指定,也无法给出默认,那么就err。 // Try to figure out a container // If a container was provided, it must be valid container := opts.Container container, err = validateContainer(container, pod) if err != nil { return nil, nil, err } // 3. 根据pod name 查询它所在的node name nodeName := types.NodeName(pod.Spec.NodeName) if len(nodeName) == 0 { // If pod has not been assigned a host, return an empty location return nil, nil, nil } // 4. 根据node name 查询到node info。 nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
接下来我们看下
connInfo.GetConnectionInfo(ctx, nodeName)
// GetConnectionInfo retrieves connection info from the status of a Node API object. func (k *NodeConnectionInfoGetter) GetConnectionInfo(ctx context.Context, nodeName types.NodeName) (*ConnectionInfo, error) { node, err := k.nodes.Get(ctx, string(nodeName), metav1.GetOptions{}) if err != nil { return nil, err } // Find a kubelet-reported address, using preferred address type host, err := nodeutil.GetPreferredNodeAddress(node, k.preferredAddressTypes) if err != nil { return nil, err } // Use the kubelet-reported port, if present port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) if port <= 0 { port = k.defaultPort } return &ConnectionInfo{ Scheme: k.scheme, Hostname: host, Port: strconv.Itoa(port), Transport: k.transport, InsecureSkipTLSVerifyTransport: k.insecureSkipTLSVerifyTransport, }, nil }
这里会从Node Status的address 中选一个地址做url的host,一般是InternalIP,显然是一个内网IP,当这个IP 从云端可达的时候,我们去执行
kubectl logs
或者kubectl exec
的时候会用这个IP 和 kubeletEndpoint(默认是10250) 拼接出目的kubelet server 的地址。但是当Pod 运行在Edge Node 上时,这个IP是不可达的,那么kubeedge 是怎么做的呢?前戏很长,但是很有必要,让我们开启今天的正题。
一、总体架构
可以看到apiserver 通过 tunnelserver与的node 的ws通道发给node上的kubelet server,我们分了10步去解释整个过程,接下来一步步的从代码去深入分析。
二、edgestream 启动 创建websocket
1. 读取本地的证书配置
2. 连接cloud 端的 tunnel server
dial := websocket.Dialer{
TLSClientConfig: tlsConfig,
HandshakeTimeout: time.Duration(config.Config.HandshakeTimeout) * time.Second,
}
header := http.Header{}
// 在请求里带上自己的Ip与Hostname
header.Add(stream.SessionKeyHostNameOveride, e.hostnameOveride)
header.Add(stream.SessionKeyInternalIP, e.nodeIP)
con, _, err := dial.Dial(url.String(), header)
三. 监听该 Websocket
_, r, err := s.Tunnel.NextReader()
if err != nil {
klog.Errorf("Read Message error %v", err)
return err
}
mess, err := stream.ReadMessageFromTunnel(r)
if err != nil {
klog.Errorf("Get tunnel Message error %v", err)
return err
}
if mess.MessageType < stream.MessageTypeData {
go s.ServeConnection(mess)
}
s.WriteToLocalConnection(mess)
}
这里启动go s.ServeConnection(mess)
四. cloud的tunnel 保存session
// 1. con 这个对象非常重要,下一步会用来封装在session里
con, err := s.upgrader.Upgrade(w, r.Request, nil)
if err != nil {
return
}
klog.Infof("get a new tunnel agent hostname %v, internalIP %v", hostNameOverride, interalIP)
session := &Session{
tunnel: stream.NewDefaultTunnel(con),
apiServerConn: make(map[uint64]APIServerConnection),
apiConnlock: &sync.Mutex{},
sessionID: hostNameOverride,
}
// 2. 把session保存在tunnel的kv结构里
s.addSession(hostNameOverride, session)
s.addSession(interalIP, session)
它会把hostname 和 internalIP 做key,session 做value。
五. tunnel 监听上一步接收的wss connection
// Serve read tunnel message ,and write to specific apiserver connection
func (s *Session) Serve() {
defer s.Close()
for {
// 1. 监听这个session connection
t, r, err := s.tunnel.NextReader()
if err != nil {
klog.Errorf("get %v reader error %v", s, err)
return
}
if t != websocket.TextMessage {
klog.Errorf("Websocket message type must be %v type", websocket.TextMessage)
return
}
// 2. 从接收到的消息里,组装message
message, err := stream.ReadMessageFromTunnel(r)
if err != nil {
klog.Errorf("Read message from tunnel %v error %v", s.String(), err)
return
}
}
}
六. 创建stream server 时引用了tunnel 指针
func newStreamServer(t *TunnelServer) *StreamServer {
return &StreamServer{
container: restful.NewContainer(),
tunnel: t,
}
}
这样,我们就可以在stream server 的处理中,获取到根据request 的host获取到session,并且获取到该wss connection
七. API Server 发起/containerLogs
前言背景中,提到了API Server 会对host:10350 发起请求/containerLogs/{podNamespace}/{podID}/{containerName}
八. 从api server 的请求中提取 session并往session 中设置sessionid
// 1. 从request中分离Host,作为sessionkey
sessionKey := strings.Split(r.Request.Host, ":")[0]
// 2. 根据host 查询到session,查不到就凉了,报错
session, ok := s.tunnel.getSession(sessionKey)
if !ok {
err = fmt.Errorf("Can not find %v session ", sessionKey)
return
}
// 3. 往session里保存一个sessionid,作为当前请求的唯一标示,后续这个id,又叫messageid,会发给edgecore,edgecore 返回消息给wss 时,也会根据这个id 找到apiserver 的connection,用fw 写回apiserver,理解这一步很重要。
logConnection, err := session.AddAPIServerConnection(s, &ContainerLogsConnection{
r: r,
flush: fw,
session: session,
ctx: r.Request.Context(),
edgePeerStop: make(chan struct{}),
})
解释在code里。
九. 找到session后往session里的wss 写一个查询日志的message,message里必须要带着sessionid,用于查询apiserver 的连接
func (l *ContainerLogsConnection) Serve() error {
defer func() {
klog.Infof("%s end successful", l.String())
}()
// first send connect message
// 发送消息,给这个wss,这个消息里说明了,我要查日志,请求地址是127.0.0.1:10350
if _, err := l.SendConnection(); err != nil {
klog.Errorf("%s send %s info error %v", l.String(), stream.MessageTypeLogsConnect, err)
return err
}
十. edgenode收到后转发给kebelet 的server
func (l *EdgedLogsConnection) Serve(tunnel SafeWriteTunneler) error {
//connect edged
client := http.Client{}
req, err := http.NewRequest("GET", l.URL.String(), nil)
if err != nil {
klog.Errorf("create new logs request error %v", err)
return err
}
// header里有该请求的地址url,127.0.0.1:10350
req.Header = l.Header
resp, err := client.Do(req)
if err != nil {
klog.Errorf("request logs error %v", err)
return err
}
defer resp.Body.Close()
// 用scanner 查日志的返回
scan := bufio.NewScanner(resp.Body)
stop := make(chan struct{})
十一. kubelt 的日志返回写回wss
for scan.Scan() {
select {
case <-stop:
klog.Infof("receive stop single, so stop logs scan ...")
return nil
default:
}
// 10 = \n
msg := NewMessage(l.MessID, MessageTypeData, append(scan.Bytes(), 10))
err := msg.WriteTo(tunnel)
if err != nil {
klog.Errorf("write tunnel message %v error", msg)
return err
}
klog.Infof("%v write logs %v", l.String(), string(scan.Bytes()))
}
这一步,写回tunnel
十二. cloud 那边的tunnel 收到message 后,写回apiserver
// Serve read tunnel message ,and write to specific apiserver connection
func (s *Session) Serve() {
defer s.Close()
for {
....
// 发给了APISERVER,这一步是根据
if err := s.ProxyTunnelMessageToApiserver(message); err != nil {
klog.Errorf("Proxy tunnel message [%s] to kube-apiserver error %v", message.String(), err)
continue
}
}
}
func (s *Session) ProxyTunnelMessageToApiserver(message *stream.Message) error {
// 1. 根据message 的id,其实就是sessionid查出来apiserver connetcion
kubeCon, ok := s.apiServerConn[message.ConnectID]
if !ok {
return fmt.Errorf("Can not find apiServer connection id %v in %v",
message.ConnectID, s.String())
}
switch message.MessageType {
case stream.MessageTypeRemoveConnect:
kubeCon.SetEdgePeerDone()
case stream.MessageTypeData:
for i := 0; i < len(message.Data); {
// 2.把全部消息写回到apiserver connection里。其实就是前面的fw对象,这个对象封装了rest.response.
n, err := kubeCon.WriteToAPIServer(message.Data[i:])
if err != nil {
return err
}
i += n
}
default:
}
return nil
}
代码中做了解释。
总结
至此我们对每一步全部做了解释,除了一个小尾巴,apiserver 为什么会把10350 的流量全部写入了stream server呢?
这里社区,只能做一个dnat了。
iptables -t nat -A OUTPUT -p tcp --dport 10350 -j DNAT --to $CLOUDCOREIPS:10003
这样,在apiserver 发出请求时,所有流量都会发给了10003端口,这个端口就是stream server 的监听端口。
这个过程还是比较清晰的,不必要求edge node 的cloud node 网络互相可达。
更多推荐
所有评论(0)