月度归档: 2022 年 1 月

一篇文章带你入门K8S二次开发

背景

我们经常会在网上看到K8S和周边工具的教程,例如HELM的使用,droneCI的使用,但是很少有文章写,如何基于K8S进行二次开发,本篇文章将使用python和vue进行K8S的二次开发,实现一个简单的查询k8s的pod和node信息的页面


效果图

通过前端页面,调用后端python接口,查询k8s当前的节点状态和应用状态


涉及到的知识点

知识点说明
python-sanic库为前台提供API接口
python-kubernetes库访问k8s,获取pod和node资源信息
nodejs-vue前端框架
nodejs-element-UI提供UI组件,用了图标和表格组件
k8s-helm程序最后是要运行在K8S里,所以要编写helm包,包括rbac,svc,deployment文件
docker前后端的docker镜像制作

用户故事


后端python代码解说

main.py 主函数入口

#main.py
from kubernetes import client, config
from sanic import Sanic
from sanic.response import json

from cors import add_cors_headers
from options import setup_options

# sanic程序必须有个名字
app = Sanic("backend")
# 在本地调试,把config文件拷贝到本机的~/.kube/config然后使用load_kube_config,在K8S集群里使用load_incluster_config
# config.load_kube_config()
config.load_incluster_config()


def check_node_status(receiver):
    '''
    检查节点的状态是否正确,正确的设为1,不正确的设为0
    '''
    # 期望结果
    expect = {"NetworkUnavailable": "False",
              "MemoryPressure": "False",
              "DiskPressure": "False",
              "PIDPressure": "False",
              "Ready": "True"
              }
    result_dict = {}
    for (key, value) in receiver.items():
        # 这个逻辑是判断k8s传过来的值与expect的值是否相同
        if expect[key] == value:
            result_dict[key] = 1
        else:
            result_dict[key] = 0
    return result_dict


@app.route("/api/node")
async def node(request):
    result = []
    v1 = client.CoreV1Api()
    node_rest = v1.list_node_with_http_info()
    for i in node_rest[0].items:
        computer_ip = i.status.addresses[0].address
        computer_name = i.status.addresses[1].address
        # 先获得节点的IP和名字
        info = {"computer_ip": computer_ip, "computer_name": computer_name}
        status_json = {}
        # 节点有多个状态,把所有状态查出来,存入json里
        # 这里有一个flannel插件的坑,及时节点关机了,NetworkUnavailable查出来还是False
        for node_condition in i.status.conditions:
            status_json[node_condition.type] = node_condition.status
        check_dict = check_node_status(status_json)
        # 把节点的状态加入节点信息json里
        info.update(check_dict)
        # 把每一个节点的查询结果加入list里,返回给前端
        result.append(info)
    return json(result)


@app.route("/api/pod")
async def pod(request):
    '''
    接口名是pod,其实是检查所有的deployment,statefulset,daemonset的副本状态
    通过这些状态判断当前的程序是否正常工作
    '''
    pod_list = []
    apis_api = client.AppsV1Api()
    # 检查deployment信息
    resp = apis_api.list_deployment_for_all_namespaces()
    for i in resp.items:
        pod_name = i.metadata.name
        pod_namespace = i.metadata.namespace
        pod_unavailable_replicas = i.status.unavailable_replicas
        # 不可用副本状态为None表示没有不可用的副本,程序正常
        if pod_unavailable_replicas == None:
            pod_status = 1
        else:
            pod_status = 0
        pod_json = {"pod_namespace": pod_namespace, "pod_name": pod_name, "pod_status": pod_status}
        pod_list.append(pod_json)
    # 检查stateful_set信息
    resp_stateful = apis_api.list_stateful_set_for_all_namespaces()
    for i in resp_stateful.items:
        pod_name = i.metadata.name
        pod_namespace = i.metadata.namespace
        # 正常工作的副本数量,等于期望的副本数量时,表明程序是可用的
        if i.status.ready_replicas == i.status.replicas:
            pod_status = 1
        else:
            pod_status = 0
        pod_json = {"pod_namespace": pod_namespace, "pod_name": pod_name, "pod_status": pod_status}
        pod_list.append(pod_json)
    # 检查daemonset信息
    resp_daemonset = apis_api.list_daemon_set_for_all_namespaces()
    for i in resp_daemonset.items:
        pod_name = i.metadata.name
        pod_namespace = i.metadata.namespace
        # 不可用副本状态为None表示没有不可用的副本,程序正常
        if i.status.number_unavailable == None:
            pod_status = 1
        else:
            pod_status = 0
        pod_json = {"pod_namespace": pod_namespace, "pod_name": pod_name, "pod_status": pod_status}
        pod_list.append(pod_json)
    return json(pod_list)


# Add OPTIONS handlers to any route that is missing it
app.register_listener(setup_options, "before_server_start")

# Fill in CORS headers
app.register_middleware(add_cors_headers, "response")
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

cors.py 解决跨域问题,主要是本地调试方便,放到我的helm包里部署到K8S上时,是不需要的,因为我会用nginx把他反向代理过去

#cors.py
from typing import Iterable


def _add_cors_headers(response, methods: Iterable[str]) -> None:
    '''
    为了在测试的时候偷懒,我把Access-Control-Allow-Origin设置成了*
    如果是做成镜像和我的helm包一起用,是不需要这样的,因为我会用nginx把后端和前端设置成同源
    '''
    allow_methods = list(set(methods))
    if "OPTIONS" not in allow_methods:
        allow_methods.append("OPTIONS")
    headers = {
        "Access-Control-Allow-Methods": ",".join(allow_methods),
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Credentials": "true",
        "Access-Control-Allow-Headers": (
            "origin, content-type, accept, "
            "authorization, x-xsrf-token, x-request-id"
        ),
    }
    response.headers.extend(headers)


def add_cors_headers(request, response):
    if request.method != "OPTIONS":
        methods = [method for method in request.route.methods]
        _add_cors_headers(response, methods)

options.py 搭配上面的cors.py使用

# options.py
from collections import defaultdict
from typing import Dict, FrozenSet

from sanic import Sanic, response
from sanic.router import Route

from cors import _add_cors_headers


def _compile_routes_needing_options(
        routes: Dict[str, Route]
) -> Dict[str, FrozenSet]:
    needs_options = defaultdict(list)
    # This is 21.12 and later. You will need to change this for older versions.
    for route in routes.values():
        if "OPTIONS" not in route.methods:
            needs_options[route.uri].extend(route.methods)

    return {
        uri: frozenset(methods) for uri, methods in dict(needs_options).items()
    }


def _options_wrapper(handler, methods):
    def wrapped_handler(request, *args, **kwargs):
        nonlocal methods
        return handler(request, methods)

    return wrapped_handler


async def options_handler(request, methods) -> response.HTTPResponse:
    resp = response.empty()
    _add_cors_headers(resp, methods)
    return resp


def setup_options(app: Sanic, _):
    app.router.reset()
    needs_options = _compile_routes_needing_options(app.router.routes_all)
    for uri, methods in needs_options.items():
        app.add_route(
            _options_wrapper(options_handler, methods),
            uri,
            methods=["OPTIONS"],
        )
    app.router.finalize()

requirements.txt 放置python需要用到的sdk

aiofiles==0.8.0
cachetools==4.2.4
certifi==2021.10.8
charset-normalizer==2.0.10
google-auth==2.3.3
httptools==0.3.0
idna==3.3
Jinja2==3.0.3
kubernetes==21.7.0
MarkupSafe==2.0.1
multidict==5.2.0
oauthlib==3.1.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
python-dateutil==2.8.2
PyYAML==6.0
requests==2.27.1
requests-oauthlib==1.3.0
rsa==4.8
sanic==21.12.1
sanic-ext==21.12.3
sanic-routing==0.7.2
six==1.16.0
urllib3==1.26.8
websocket-client==1.2.3
websockets==10.1

Dockerfile 打包后端代码成镜像使用 docker build -t k8s-backend .

FROM python:3.9
ADD . .
RUN pip install -r /requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
WORKDIR .
CMD ["python3","main.py"]

前端代码解说

主要就是App.vue和main.ts两个文件

这里省略nodejs和vue的安装过程,使用下面的命令创建一个vue3的项目

# 下载vue的过程省略,创建一个vue项目,创建的时候,选择typescript版本
vue create k8s-frontend
# 安装element-ui的vue3版本
npm install element-plus --save
# npm安装axios,用于向后台发起请求
npm i axios -S

main.ts 主入口

import { Component, createApp } from 'vue'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'
import App from './App.vue'
const app = createApp(App)

app.use(ElementPlus)

app.mount('#app')

App.vue 做demo图省事,我就用了这一个vue文件放了所有功能

<template>

  <h2>服务器信息</h2>
  <el-table 
    :data="tableData" style="width: 100%">
    <el-table-column prop="computer_ip" label="IP地址" width="180" />
    <el-table-column prop="computer_name" label="服务器名字" width="180" />
    <el-icon><check /></el-icon>
    <el-table-column label="网络插件" width="100">
      <template #default="scope">
        <el-icon :size="20">
          <check class="check" v-if="scope.row.NetworkUnavailable ==1" />
          <close class="close" v-else />
        </el-icon>
      </template>
    </el-table-column>
    <el-table-column label="内存压力" width="100">
      <template #default="scope">
        <el-icon :size="20">
          <check class="check" v-if="scope.row.MemoryPressure ==1" />
          <close class="close" v-else />
        </el-icon>
      </template>
    </el-table-column>
    <el-table-column label="硬盘压力" width="100">
      <template #default="scope">
        <el-icon :size="20">
          <check class="check" v-if="scope.row.DiskPressure ==1" />
          <close class="close" v-else />
        </el-icon>
      </template>
    </el-table-column>
    <el-table-column label="进程压力" width="100">
      <template #default="scope">
        <el-icon :size="20">
          <check class="check" v-if="scope.row.PIDPressure ==1" />
          <close class="close" v-else />
        </el-icon>
      </template>
    </el-table-column>
    <el-table-column label="K3S状态" width="100">
      <template #default="scope">
        <el-icon :size="20">
          <check class="check" v-if="scope.row.Ready ==1" />
          <close class="close" v-else />
        </el-icon>
      </template>
    </el-table-column>
  </el-table>
  <el-divider></el-divider>
  <h2>应用程序信息</h2>
  <el-table
    :data="podData"
    style="width: 100%"
    :default-sort="{ prop: 'pod_status', order: 'ascending' }"
  >
    <el-table-column prop="pod_namespace" sortable  label="命名空间" width="180" />
    <el-table-column prop="pod_name" sortable label="应用名字" width="180" />
    <el-icon><check /></el-icon>
    <el-table-column prop="pod_status" label="是否正常" sortable width="100">
      <template #default="scope">
        <el-icon :size="20">
          <check class="check" v-if="scope.row.pod_status ==1" />
          <close class="close" v-else />
        </el-icon>
      </template>
    </el-table-column>
  </el-table>
  <el-divider></el-divider>
</template>

<script lang="ts" >
import { Options, Vue } from 'vue-class-component';
import { Check, Close } from '@element-plus/icons-vue';
import axios from 'axios'

@Options({
    // 这里可以配置Vue组件支持的各种选项
    components: {
        Check,
        Close
    },
    data() {
        return {
          podData: [],
          tableData: [],
        }
    },
    mounted() {
      this.pod();
      this.show();
    },
    methods: {
        say(){
          console.log("say");
        },
        pod(){
          const path = "http://127.0.0.1:8000/api/pod";
          //本地调试使用,在服务器上还是用相对路径
          // const path = "http://127.0.0.1:8000/node";
          // 务必使用箭头函数的方法,这样this.id能直接对上,不然会报错提示id没找到
          axios.get(path).then((response) => {
            this.podData = response.data;
          });
        },
        show() {
        const path = "http://127.0.0.1:8000/api/node";
        //本地调试使用,在服务器上还是用相对路径
        // const path = "http://127.0.0.1:8000/node";
        // 务必使用箭头函数的方法,这样this.id能直接对上,不然会报错提示id没找到
        axios.get(path).then((response) => {
          this.tableData = response.data;
        });
      },
    }
})
export default class App extends Vue {
}
</script>

<style>
#app {
  font-family: Avenir, Helvetica, Arial, sans-serif;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
  text-align: left;
  color: #2c3e50;
  margin-top: 60px;
}
</style>

Dockerfile 用于制作前端镜像 docker build -t k8s-frontend .

FROM  node:14-alpine3.12 AS build

LABEL maintainer="sunj@sfere-elec.com"

ADD . /build/

RUN set -eux \
    && yarn config set registry https://mirrors.huaweicloud.com/repository/npm/ \
    && yarn config set sass_binary_site https://mirrors.huaweicloud.com/node-sass \
    && yarn config set python_mirror https://mirrors.huaweicloud.com/python \
    && yarn global add yrm \
    && yrm add sfere http://repo.sfere.local:8081/repository/npm-group/ \
    && yrm use sfere \
    && cd /build \
    && yarn install \
    && yarn build

FROM nginx:1.21.5-alpine
LABEL zhenwei.li "zhenwei.li@sfere-elec.com"
COPY --from=build /build/dist/ /usr/share/nginx/html
# 暴露端口映射
EXPOSE 80

HELM包解说

deployment.yaml 把两个docker镜像放在同一个deployment里

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: {{ .Release.Name }}
  name: {{ .Release.Name }}
spec:
  replicas: 1
  revisionHistoryLimit: 5
  selector:
    matchLabels:
      app: {{ .Release.Name }}
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: {{ .Release.Name }}
    spec:
      containers:
        - image: k8s-check-backend
          imagePullPolicy: Always
          name: server-check-backend
          resources: {}
        - image: k8s-check-frontend
          imagePullPolicy: Always
          name: server-check-frontend
          resources: {}
          volumeMounts:
          - name: nginx-conf
            mountPath: /etc/nginx/conf.d/default.conf
            subPath: default.conf
      restartPolicy: Always
      volumes:
        - name: nginx-conf
          configMap:
            name: {{ .Release.Name }}
            items:
            - key: default.conf
              path: default.conf
      serviceAccountName: {{ .Release.Name }}

service.yaml 把前端通过nodeport方式暴露出去,方便测试

apiVersion: v1
kind: Service
metadata:
  labels:
    app: {{ .Release.Name }}
  name: {{ .Release.Name }}
spec:
  type: NodePort
  ports:
    - name: web
      port: 80
      targetPort: 80
      nodePort: 32666
  selector:
    app: {{ .Release.Name }}

configmap.yaml nginx的配置文件,反向代理后端

apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Release.Name }}
data:
  default.conf: |
    # 当前项目nginx配置文件,lzw
    server {
        listen       80;
        server_name  _A;
        gzip on;
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   /usr/share/nginx/html;
        }

        location / {
            root /usr/share/nginx/html;
            index index.html index.htm;

            if (!-e $request_filename){
                    rewrite ^/.* /index.html last;
            }
        }
        location /api {
            proxy_pass          http://localhost:8000;
            proxy_http_version 1.1;
            proxy_set_header    X-Real-IP           $remote_addr;
            proxy_set_header    X-Forwarded-For     $proxy_add_x_forwarded_for;
        }

        error_page   500 502 503 504  /50x.html;
    }

rbac.yaml 我们的程序是需要访问k8s资源的,如果没有配置rbac,调用K8S的API会报403错误

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  labels:
    app.kubernetes.io/name: {{ .Release.Name }}
  name: {{ .Release.Name }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: {{ .Release.Name }}
subjects:
- kind: ServiceAccount
  name: {{ .Release.Name }}
  namespace: {{ .Release.Namespace }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  labels:
    app.kubernetes.io/name: {{ .Release.Name }}
  name: {{ .Release.Name }}
rules:
- apiGroups:
  - ""
  resources:
  - configmaps
  - secrets
  - nodes
  - pods
  - services
  - resourcequotas
  - replicationcontrollers
  - limitranges
  - persistentvolumeclaims
  - persistentvolumes
  - namespaces
  - endpoints
  verbs:
  - list
  - watch
- apiGroups:
  - extensions
  resources:
  - daemonsets
  - deployments
  - replicasets
  - ingresses
  verbs:
  - list
  - watch
- apiGroups:
  - apps
  resources:
  - statefulsets
  - daemonsets
  - deployments
  - replicasets
  verbs:
  - list
  - watch
---
apiVersion: v1
kind: ServiceAccount
metadata:
  labels:
    app.kubernetes.io/name: {{ .Release.Name }}
  name: {{ .Release.Name }}
  namespace: {{ .Release.Namespace }}

程序安装

helm install k8s-check helm/k8s-server-check

安装完成后,通过http://masterip:32666访问即可

python通过sdk从minio下载文件时添加进度条

背景

Minio是就地环境下比较好用的对象存储工具,适合在CI/CD流程中使用。主要是因为GIT里用LFS来放大文件不妥,把部署流程中需要的中间文件放minio上,通过SDK去存取文件非常方便。

Minio的上传文件fput_object有progress参数,但是下载文件fget_object默认没有 progress 参数,所以我们需要自己用get_object对代码稍加改造

涉及到的库

https://github.com/verigak/progress

用于提供进度条

pip install progress
pip install minio

代码

from minio import Minio
from progress.bar import Bar


def get_object_with_progress(client, bucket_name, object_name):
    try:
        data = client.get_object(bucket_name, object_name)
        total_length = int(data.headers.get('content-length'))
        bar = Bar(object_name, max=total_length / 1024 / 1024, fill='*', check_tty=False,
                  suffix='%(percent).1f%% - %(eta_td)s')
        with open('./' + object_name, 'wb') as file_data:
            for d in data.stream(1024 * 1024):
                bar.next(1)
                file_data.write(d)
        bar.finish()
    except Exception as err:
        print(err)


if __name__ == '__main__':
    client = Minio(
        "play.min.io",
        access_key="Q3AM3UQ867SPQQA43P2F",
        secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
    )

    bucket_name = "downlaod"
    object_name = "eiop-timescaledb-offline.zip"
    get_object_with_progress(client, bucket_name, object_name)

实现效果


苏ICP备18047533号-1