作者: 李镇伟

基于istio的灰度发布实验

背景

灰度发布又叫A/B测试,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。
因为最近刚好有灰度发布的需求,我又学了一遍istio,记录了本次灰度发布的实施过程(只包括应用,不包括数据库升级)


实验过程

  1. 先确定目前的应用版本为V1
  2. 通过helm包部署应用版本为V2的pod到K8S集群中
  3. 确定V2版本灰度的用户,方法包括IP,或者特定用户
  4. 通过istio的virtualservice功能把特定用户的流量指向V2版本
  5. 检查特定用户使用一段时间后,是否出现问题
  6. 若无问题,通过istio将所有用户的流量都指向V2版本
  7. 若所有用户都使用V2无问题,删除掉V1版本的pod

示例介绍

前端应用frontend,后端应用mqtt-server,后端应用mqtt-server 通过mqtt协议与设备相连接。
前端部署3个版本,分别是V1,V2,V3,后端同样部署3个版本,也是V1,V2,V3。3个前端版本,按钮文字不一样。3个后端版本,连接的mqtt设备不一样

版本 前端页面 后端返回参数
V1 显示V11按钮
{"message":["wsytest010","wsytest002",
"wsytest003","wsytest007","wsytest006",
"wsytest001","wsytest005","wsytest009",
"wsytest008","wsytest004"]}
V2 显示V22按钮
{"message":["wsytest019","wsytest020",
"wsytest017","wsytest012","wsytest011",
"wsytest014","wsytest018","wsytest015",
"wsytest013","wsytest016"]}
V3 显示V33按钮
{"message":["wsytest024","wsytest028",
"wsytest022","wsytest026","wsytest027",
"wsytest021","wsytest025","wsytest030",
"wsytest023","wsytest029"]}


根据需求,版本不能串,比如前端V1->后端V1,不允许出现前端V1→后端V2这样的情况发生
这里我们在选择分配流量方式时,不能使用权重的方式进行分配,只能选择指定用户或者指定IP,如果选择权重的方式,可能会出现如下的问题:
前端会访问多个js,css等文件,如果使用权重的方式,会出现一部分js来源于v1版本,一部分css来源于v2版本。
后端也同理,如果一个页面打开时,触发多个后端请求,部分来源于V2,部分来源于V1,肯定会导致前端显示出现问题。
所以只有把前后端通过某种方式一一对应,才能正常使用


代码实现与注意事项

1.部署前端的3个应用程序,所有的pod都加上 labels:[app:frontend,version:#{对应版本}]

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: frontend
  labels:
    app: frontend
    version: v1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: frontend
      version: v1
  template:
    metadata:
      labels:
        app: frontend
        version: v1
    spec:
      containers:
      - name: frontend
        image: 前端镜像:v1
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "NET_RAW"]    # 按照istio的说明,最好把这个pod安全策略加上
        imagePullPolicy: Always
        ports:
        - containerPort: 80
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: frontend-v2
  labels:
    app: frontend
    version: v2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: frontend
      version: v2
  template:
    metadata:
      labels:
        app: frontend
        version: v2
    spec:
      containers:
      - name: frontend
        image: 前端镜像:v2
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "NET_RAW"]
        imagePullPolicy: Always
        ports:
        - containerPort: 80
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: frontend-v3
  labels:
    app: frontend
    version: v3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: frontend
      version: v3
  template:
    metadata:
      labels:
        app: frontend
        version: v3
    spec:
      containers:
      - name: frontend
        image: 前端镜像:v3
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "NET_RAW"]
        imagePullPolicy: Always
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: frontend
spec:
  selector:
    app: frontend
  type: ClusterIP   #这个不用NodePort,因为流量如果是从NodePort进来的,就控不住的
  ports:
    - port: 80
      targetPort: 80
      name: http-web

2.部署后端应用程序,与前端应用类似

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-server-v1
  labels:
    app: mqtt-server
    version: v1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mqtt-server
      version: v1
  template:
    metadata:
      labels:
        app: mqtt-server
        version: v1
    spec:
      serviceAccountName: mqtt-server
      containers:
      - name: mqtt-server
        image: 后端镜像:latest
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "NET_RAW"]    # 按照istio的说明,最好把这个pod安全策略加上
        imagePullPolicy: Always
        ports:
        - containerPort: 8000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-server-v2
  labels:
    app: mqtt-server
    version: v2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mqtt-server
      version: v2
  template:
    metadata:
      labels:
        app: mqtt-server
        version: v2
    spec:
      serviceAccountName: mqtt-server
      containers:
      - name: mqtt-server
        image: 后端镜像:latest
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "NET_RAW"]
        imagePullPolicy: Always
        ports:
        - containerPort: 8000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-server-v3
  labels:
    app: mqtt-server
    version: v3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mqtt-server
      version: v3
  template:
    metadata:
      labels:
        app: mqtt-server
        version: v3
    spec:
      serviceAccountName: mqtt-server
      containers:
      - name: mqtt-server
        image: 后端镜像:latest
        securityContext:
          capabilities:
            add: ["NET_ADMIN", "NET_RAW"]
        imagePullPolicy: Always
        ports:
        - containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
  name: mqtt-server
spec:
  selector:
    app: mqtt-server
  type: NodePort   #这个不用NodePort,因为流量如果是从NodePort进来的,就控不住的
  ports:
    - port: 8000
      targetPort: 8000
      name: http-web

3.区分外部流量和内部流量。我们将浏览器到前端的称为外部流量,K8S里的例如前端到后端的称为内部流量

4.外部流量出去,需要被istio的ingress gateway管控起来,所以需要配置一个gateway

apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: bookinfo
spec:
  selector:
    istio: ingressgateway # use istio default controller
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*"

5.配置后端的virtualservice和destination,确保后端程序能与前端程序产生一对一的关系,在无对应关系时,默认使用V1版本

---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: mqtt-server-internal
spec:
  hosts:
  - "mqtt-server"     #此处是关键,把匹配到该url的流量,全部走到这个特定的virtualservice里
  http:
  - match:
    - sourceLabels:
        version: v1
    route:
    - destination:
        host: mqtt-server
        subset: v1             # 将匹配到的流量,转向subset的v1版本,这个subset: v1在destination.yaml里定义
      headers:
        response:
          add:
            user: v1
  - match:
    - sourceLabels:
        version: v2
    route:
    - destination:
        host: mqtt-server
        subset: v2
      headers:
        response:
          add:
            user: v2
  - match:
    - sourceLabels:
        version: v3
    route:
    - destination:
        host: mqtt-server
        subset: v3
      headers:
        response:
          add:
            user: v3
  - route:
    - destination:
        host: mqtt-server
        subset: v1
      headers:
        response:
          add:
            user: v1
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: mqtt-server
spec:
  host: mqtt-server.default.svc.cluster.local
  subsets:
  - name: v1
    labels:
      version: v1    # 根据pod的 version: v1 的label来进行匹配
  - name: v2
    labels:
      version: v2
  - name: v3
    labels:
      version: v3

6.配置前端的virtualservice和destination,我们可以设置来源于192.168.0.58这个IP的走V2版本,其余IP走V1版本

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: frontend-server
spec:
  hosts:
  - "外网域名"     #此处是关键,把匹配到该url的流量,全部走到这个特定的virtualservice里
  gateways:
  - bookinfo-gateway              #此处必须对应上gateway的名字
  http:
  - match:
    - headers:
      X-Forwarded-For:
          exact: "192.168.0.58"            #此处表示匹配header里有{"user":"v1"}
    route:
    - destination:
        host: mqtt-server
        subset: v2             # 将匹配到的流量,转向subset的v1版本,这个subset: v1在destination.yaml里定义
      headers:
        response:
          add:
            user: v2
  - route:
    - destination:
        host: frontend
        subset: v1             # 将匹配到的流量,转向subset的v1版本,这个subset: v1在destination.yaml里定义
      headers:
        response:
          add:
            user: v1
---
apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: frontend
spec:
  host: frontend
  subsets:
  - name: v1
    labels:
      version: v1    # 根据pod的 version: v1 的label来进行匹配
  - name: v2
    labels:
      version: v2
  - name: v3
    labels:
      version: v3

7.因为我们的浏览器访问的时候,会经过istio,所以前端收到的IP并不是真是的IP,我们需要修改istio的ingress文件,把spec.externalTrafficPolicy设置成Local,如下图所示

8.最终情况


实验效果图

1.当本机IP地址不符合条件时,前端和后端都是V1版本的结果,第一张图是实际效果,第二张图是kiali显示的流量图
2.当本机IP符合条件时,前端和后端都是V2版本的结果,左图是实际效果,右图是kiali显示的流量图

3.当同时有满足IP和不满足IP条件的机器访问时,流量图效果如下

从零开始安装istio与skywalking

版本

istio 安装1.8.2版本
skywalking安装8.1.0版本
K8S集群使用rancher安装1.19版本


istio安装

1.下载istio包到k8s的任意一台master机器上

curl -L https://istio.io/downloadIstio | sh -

2. 进入istio目录,设置环境变量,后续我们的istio安装,都在该目录下进行操作

cd istio-1.8.2
export PATH=$PWD/bin:$PATH

3.安装istio,同时设置skywalking-oap地址

输入如下命令
istioctl install \
  --set profile=demo \
  --set meshConfig.enableEnvoyAccessLogService=true \
  --set meshConfig.defaultConfig.envoyAccessLogService.address=skywalking-oap.istio-system:11800 等待出现如下回显即可完成 ✔ Istio core installed
✔ Istiod installed
✔ Egress gateways installed
✔ Ingress gateways installed
✔ Installation complete

4.安装kiali。安装成功后,记得把kiali通过ingress暴露出来,我是使用的traefik来暴露的

kubectl apply -f samples/addons
kubectl rollout status deployment/kiali -n istio-system

skywalking安装

git clone https://github.com/apache/skywalking-kubernetes.git
cd skywalking-kubernetes/chart
helm repo add elastic https://helm.elastic.co
helm dep up skywalking
helm install 8.1.0 skywalking -n istio-system \
  --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \
  --set fullnameOverride=skywalking \
  --set oap.envoy.als.enabled=true \
  --set ui.image.tag=8.1.0 \
  --set oap.image.tag=8.1.0-es6 \
  --set oap.storageType=elasticsearch

上述命令会安装一个skywalking和一个elasticsearch 6.8.6版本
安装完后,暴露skywalking-ui到外部,让用户可以通过页面访问


安装测试程序

通过以下命令进行安装,安装完成后,可以自己设置一下,通过loadalance暴露到外网访问,loadbalance的IP可以通过metallb搞一个

kubectl apply -f samples/bookinfo/platform/kube/bookinfo.yaml
kubectl apply -f samples/bookinfo/networking/bookinfo-gateway.yaml
kubectl apply -f samples/bookinfo/networking/destination-rule-all.yaml

部署效果

浏览器进入bookinfo测试程序

进入kiali检查出现的流量分布情况

skywalking的界面

EMQX在K8S中的扩缩容测试与雪崩

背景

emqx使用statefulset的方式部署3-5个pod ,设备认证采用redis的方式

测试步骤

测试目的 测试步骤 测试结果
1 搭建emqx环境,pod数量为3
2 确认emqx的负载均衡功能 通过python编写mqtt连接代码,
连接99个client,检查99个client的分布情况
平均分布在3个pod上,每个pod上连接了33个client。
多个pod能实现负载均衡。
3 增加emqx的数量,检查连接是否发生变化 通过rancher,修改pod的数量为5,
检查新pod启动之后,99个client的分布情况
新增的2个pod,不会自动分担之前连接的client,原来的
99个client 还是连接到之前的3个pod上。扩容不会影响现有连接。
4 增加客户端的数量,检查服务端负载情况。
看是否能出现削峰平谷,让新连接的pod多负载
一些连接
通过python编写mqtt连接代码,
增加50个client,检查99+50个client的分布情况
每一个pod的连接数量,平摊了10个。两个新的pod变成了10个连接数
原来的3个pod从33个连接变成了43个连接。
新增的客户端也会平均分配到各个pod上,但是分配的方法也是完全平均的,不会考虑emqx当前已经连接的数量。
没有出现削峰平谷的现象。
5 全部重连150个客户端,检查服务端负载情况。
看是否会全部平均分配到5个pod上
断掉之前的连接。
通过python编写mqtt连接代码,
一次性连接150个client,检查他们的分布情况
每个pod分摊30个连接,扩容之后,重新连接的client能够负载均衡
6 缩小emqx的数量,检查连接情况 通过rancher,修改pod的数量为3 消失的2个pod会导致相连客户端断开连接,剩余的3个pod连接的客户端无影响
7 给客户端加上掉线重连功能,再次缩小emqx的数量 通过python编写mqtt连接代码,加上自动重连功能 ,缩小emqx的数量 所有的客户端都会连接到那个1个pod上,虽然客户端产生了掉线,但是不会影响后续使用

结论

1.我们自己实现的mqtt客户端要有重连机制,如果无重连机制,会在缩容或者重启之后与emqx失去连接
2.只要资源充裕,emqx或者redis重启,不会产生太大的影响,顶多是emqx重启的那几秒钟会丢数据
3.emqx的扩容要提前扩容,不能等出现即将出现故障的时候在扩容,因为扩容之后的数据虽然是均摊了,但是前面的2个emqx的连接数还是没有减少
假设每个emqx能容纳100个client,当达到80个client的时候,我们才进行扩容,虽然后面再连上的是负载均衡了,但是在极端情况下还是有可能出现雪崩,如下图

除非此时断开emqx与外部的连接,等待emqx3个实例全部启动之后,再允许emqx的外部连接。
这样每个节点都到82%,没有雪崩

为k8s里运行的容器配置时区

需求背景

我们经常会用一些诸如emqx,nats等第三方中间件,这些中间件往往默认时区就是UTC时区,这其实也没关系,但是打印出来的日志,就会与我们的上海时区差8小时,为了解决这个问题,我的简单解决办法,就是把服务器的时区通过可读的方式挂载进去。

操作步骤

1、把服务器的时区设置成上海时区。ubuntu18系统时区设置方法如下:

timedatectl set-timezone Asia/Shanghai

2、修改emqx的StatefulSet.yaml 。注意挂载服务器的/etc/localtime到容器中,一定要设置readOnly: true,避免被误修改服务器的时区

# Source: emqx/templates/statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: lzw-emqx
  name: lzw-emqx
spec:
  serviceName: lzw-emqx
  replicas: 1
  updateStrategy:
    type: RollingUpdate
  selector:
    matchLabels:
      app: lzw-emqx
  template:
    metadata:
      labels:
        app: lzw-emqx
    spec:
      containers:
        - image: emqx/emqx:4.2.4-alpine-amd64
          imagePullPolicy: IfNotPresent
          name: lzw-emqx
          envFrom:
            - configMapRef:
                name: lzw-emqx
          volumeMounts:
            - name: lzw-emqx-log
              mountPath: /opt/emqx/log
            - name: sfere-time-zone
              mountPath: /etc/localtime
              readOnly: true
          readinessProbe:
            httpGet:
              path: /status
              port: 8081
            initialDelaySeconds: 15
            periodSeconds: 2
      restartPolicy: Always
      volumes:
        - name: lzw-emqx-log
          emptyDir: {}
        - name: sfere-time-zone
          hostPath:
              path: /etc/localtime

traefik配置用户登录,限制K8S的web服务访问

背景

像Elastic-APM, Traefik-Dashboard等页面,是没有用户登录限制的,如果我们希望给他们加上用户登录限制,我们需要在traeifk里给对应的ingress添加登录用的Middleware,那么该如何添加呢?本文以给K8S部署的traefik dashboard为例进行添加

用户名密码加密

1.假设有如下3个用户名密码
lizhenwei 123
zhenwei.li 456
hello thankyou
2.我们通过htpasswd进行加密

htpasswd -nb lizhenwei 123
lizhenwei:$apr1$0wIJg4EG$RZ7wOIyIdg1R4gj4zAlzq1
htpasswd -nb zhenwei.li 456
zhenwei.li:$apr1$PX8cqECj$5zvC3eB1vhLioyjVjdkkE/
htpasswd -nb hello thankyou
hello:$apr1$4nlPGEqZ$nqz2ojkuxAY4FUEy0Tp3x1

3.将加密的信息放入一个叫policy的文件

vi policy
lizhenwei:$apr1$0wIJg4EG$RZ7wOIyIdg1R4gj4zAlzq1 zhenwei.li:$apr1$PX8cqECj$5zvC3eB1vhLioyjVjdkkE/ hello:$apr1$4nlPGEqZ$nqz2ojkuxAY4FUEy0Tp3x1

4.进行base64加密,获得加密后的字符

cat policy | openssl base64 

bGl6aGVud2VpOiRhcHIxJDB3SUpnNEVHJFJaN3dPSXlJZGcxUjRnajR6QWx6cTEK emhlbndlaS5saTokYXByMSRQWDhjcUVDaiQ1enZDM2VCMXZoTGlveWpWamRra0Uv CmhlbGxvOiRhcHIxJDRubFBHRXFaJG5xejJvamt1eEFZNEZVRXkwVHAzeDEK

创建middleware.yaml

将加密后的字符,复制到data.users下面

# Declaring the user list
apiVersion: traefik.containo.us/v1alpha1
kind: Middleware
metadata:
  name: test-auth
spec:
  basicAuth:
    secret: authsecret
---
# Note: in a kubernetes secret the string (e.g. generated by htpasswd) must be base64-encoded first.
# To create an encoded user:password pair, the following command can be used:
# htpasswd -nb user password | openssl base64
apiVersion: v1
kind: Secret
metadata:
  name: authsecret
  namespace: default
data:
  users: |2
    bGl6aGVud2VpOiRhcHIxJDB3SUpnNEVHJFJaN3dPSXlJZGcxUjRnajR6QWx6cTEK
    emhlbndlaS5saTokYXByMSRQWDhjcUVDaiQ1enZDM2VCMXZoTGlveWpWamRra0Uv
    CmhlbGxvOiRhcHIxJDRubFBHRXFaJG5xejJvamt1eEFZNEZVRXkwVHAzeDEK

创建ingress.yaml

定义访问路径,定义中间件

# dashboard.yaml
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: dashboard
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`traefik.test.local`) && (PathPrefix(`/dashboard`) || PathPrefix(`/api`))
      kind: Rule
      services:
        - name: api@internal
          kind: TraefikService
      middlewares:
        - name: test-auth

生效配置

kubectl apply -f middleware.yaml
kubectl apply -f ingress.yaml

检查打开网页时,是否弹出登录对话框

更新密码

如果我们要更新密码,可以重新使用htpasswd生成密码,然后放在policy文件中,使用命令行更新

kubectl create secret generic authsecret --from-file=users=./policy --dry-run=client -o yaml | kubectl apply -f -

 

traefik暴露kubernetes里的http服务和tcp服务

traeifk使用说明

traefik使用helm安装,搭配metalLB使用,由metalLB分配IP地址给traefik的loadbalancer

helm repo add traefik https://helm.traefik.io/traefik
helm upgrade -i traefik traefik/traefik --version 9.11.0 -f traefik/values.yaml

traefik暴露http服务的例子

配置如下http-ingress.yaml文件,暴露一个nginx,浏览器通过nginx.demo.test.local访问

#http-ingress.yaml
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: simpleingressroute
  namespace: default
spec:
  entryPoints:
    - web
  routes:
  - match: Host(`nginx.demo.test.local`)
    kind: Rule
    services:
    - name: test-demo-service
      port: 80

解释:

  1. metadata.name不能重复
  2. namespace是暴露的svc对应的namespace
  3. Host(nginx.demo.test.local)表示接受到浏览器访问nginx.aimp.sferetest.local的时候,转到test-demo-service服务的80端口
  4. entryPoints对应的是我们安装traefik时,values.yaml里的ports参数下面的名称,如web的8000是内部端口,80是对外提供访问的端口
#values.yaml
ports:
  # The name of this one can't be changed as it is used for the readiness and
  # liveness probes, but you can adjust its config to your liking
  web:
    port: 8000
    # hostPort: 8000
    expose: true
    exposedPort: 80
    # The port protocol (TCP/UDP)
    protocol: TCP

traefik暴露tcp服务的例子

配置如下tcp-ingress.yaml文件,暴露一个redis,通过loadbalancerIP:6851访问

# tcp-ingress.yaml
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRouteTCP
metadata:
  name: redisingleressroute
  namespace: test
spec:
  entryPoints:
    - redis
  routes:
  - match: HostSNI(`*`)
    kind: Rule
    services:
    - name: redis
      port: 6379

解释:

  1. kind得是IngressRouteTCP
  2. match必须是HostSNI(*)
  3. entryPoints对应的是我们安装traefik时,values.yaml里的ports参数下面的名称,如redis的6379是内部端口,6851是对外提供访问的端口
#values.yaml
  redis:
    port: 6379
    # hostPort: 8000
    expose: true
    exposedPort: 6851
    # The port protocol (TCP/UDP)
    protocol: TCP

timescaledb升级

背景

使用docker安装的timescaledb与postgresQL数据库。目前我们需要升级docker镜像以及目前正在使用的数据库。单纯的更换docker镜像是行不通的,请按照如下方式进行升级。
原版本是timescale/timescaledb-postgis:1.4.0-pg11
新版本是timescale/timescaledb-postgis:1.7.4-pg11

操作步骤

1.拉取最新的镜像

docker pull timescale/timescaledb:1.7.4-pg11

2.检查老容器挂载的数据目录

$ docker inspect timescaledb --format='{{range .Mounts }}{{.Source}}{{end}}'
/path/to/data

3.删除老容器

docker stop timescaledb
docker rm timescaledb

4.使用挂载的数据目录和新拉取的镜像,创建新容器

docker run -v /path/to/data:/var/lib/postgresql/data -d --name timescaledb -p 5432:5432 timescale/timescaledb

5.更新template1的timescaledb插件(重要)
如果不更新template1的话,后续创建的所有database还是老的1.4.0插件

docker exec -it timescaledb bash
su postgres
psql template1
ALTER EXTENSION timescaledb UPDATE;

6.更新已经存在的database的timescaledb插件(重要)
对所有“已经存在的数据库”进行插件更新,不然会导致无法连接,报错如下:
ERROR: could not access file “$libdir/timescaledb-1.4.2”: No such file or directory

docker exec -it timescaledb bash
su postgres
psql 已经存在的数据库
ALTER EXTENSION timescaledb UPDATE;

Elasticsearch重建索引,收集nginx日志,以request_time为指标分析接口响应时间

起因

filebeat采集nginx的日志,以json格式解析后传入elasticsearch,全部字段都是text格式,我们需要把request_time变成double格式才能使用聚合搜索request_time的最大值.
但是elasticsearch的index一旦建立好之后,字段只能新增,不能修改,所以要修改request_time的数据类型,只能重建索引。
我们的步骤是:1.获得老索引的mapping信息,2.用这个mapping信息新建一个索引 3.用reindex方法,把老索引的数据迁移到新索引 4.确认新索引数据迁移成功,5.删除老索引 6.获得出新索引的mapping,7.使用新索引的mapping创建老索引。8.把新索引的数据倒回老索引 9.删除老索引
假设老索引:V1
临时索引:V2
nginx统计接口路径:path字段
nginx统计响应时间:request_time字段


流程图与说明


python代码

根据path,聚合查询出响应最大时间和平均时间,保留最大响应时间前500个到csv文件里

#
#  created by zhenwei.Li at 2020/11/3 17:50
#
#  filename : example4.py
#  description :
import csv
import json
import requests
if __name__ == '__main__':
    send_json = {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": 1533556800000,
                                "lte": 1604470685934
                            }
                        }
                    }
                ]
            }
        },
        "size": 0,
        "aggs": {
            "Job_gender_stats": {
                "terms": {
                    "field": "path.keyword",
                    "size": 500,
                    "order": {
                        "max_request_time": "desc"
                    }
                },
                "aggs": {
                    "max_request_time": {
                        "max": {
                            "field": "request_time"
                        }
                    },
                    "avg_request_time": {
                        "avg": {
                            "field": "request_time"
                        }
                    }
                }
            }
        }
    }
    res = requests.post(url="http://192.168.0.174:32164/192.168.0.67-eiop-frontend/_search", json=send_json)
    print(json.dumps(res.json()['aggregations']['Job_gender_stats']['buckets'], sort_keys=True, indent=4))
    buckets = res.json()['aggregations']['Job_gender_stats']['buckets']
    file_handle = open('research.csv', 'w', encoding='utf-8', newline='' "")
    # 2. 基于文件对象构建 csv写入对象
    csv_writer = csv.writer(file_handle)
    # 3. 构建列表头
    csv_writer.writerow(["路径", "出现次数", "平均响应时间(秒)", "最大响应时间(秒)"])
    for item in buckets:
        csv_writer.writerow(
            [item['key'], item['doc_count'], item['avg_request_time']['value'], item['max_request_time']['value']])
    # 5. 关闭文件
    file_handle.close()

效果图

Golang使用协程并发多个mqtt的publish信息(读取csv,发送json格式报文)

需求

开发语言:golang
目的:并发10000个mqtt连接,循环发送publish信息,当时间戳小于某个值的时候,中止循环,退出连接
publish内容是json格式的,未设置时,有默认值,可以通过golang代码修改json内容
登录信息存取在csv文件中,csv文件有多少列,就并发多少个设备连接

话不多说,直接上代码

main.go

package main
import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"os"
	"strconv"
	"time"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)
var total int
// 读取csv文件里的第3列数据,存入一个string数组里
func readcsv(filename string) []string {
	var userNameList []string
	f, _ := os.Open(filename)
	defer f.Close()
	w := csv.NewReader(f)
	data, err := w.ReadAll()
	if err != nil {
		fmt.Println(err)
	}
	for i := range data {
		userNameList = append(userNameList, data[i][2])
	}
	return userNameList
}
func mqttDevice(username string, end_number chan int) {
	// mqtt设备连接,设置IP地址
	opts := mqtt.NewClientOptions().AddBroker("localhost:1883")
	// 设置连接的用户名密码
	opts.SetUsername(username)
	// 使用连接信息进行连接
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	time.Sleep(1 * time.Second)
	fmt.Println("connect success:" + username)
	// 读取json文件,json文件里的是默认参数
	fileReader, _ := os.Open("test.json")
	var eiopJsonMap map[string]interface{}
	json.NewDecoder(fileReader).Decode(&eiopJsonMap)
	// 设置一个开始时间戳和结束时间戳
	startTime := 1598167852000
	endTime := 1598167852000
	// 循环发送遥测,每次遥测间隔时间戳为15分钟
	for ; startTime < endTime; startTime = startTime + 900000 {
		// 定义一个ep的初始值为1,每循环一次就+1
		var ep int
		ep++
		eiopJsonMap["ts"] = startTime
		eiopJsonMap["values"].(map[string]interface{})["ep"] = ep
		// 把修改过的json内容从map转换为json格式
		eiopJsonText, _ := json.Marshal(eiopJsonMap)
		fmt.Println(string(eiopJsonText))
		// 发送遥测,发完之后休眠1秒
		result := client.Publish("topic", 0, true, eiopJsonText)
		result.Wait()
		time.Sleep(1 * time.Second)
	}
	// 发送完信息之后,退出连接
	fmt.Println("disconnect:" + username)
	client.Disconnect(250)
	total++
	end_number <- total
}
func main() {
	userNameList := readcsv("connect_info.csv")
	endNumber := make(chan int, len(userNameList))
	// 变量所有的username,通过go关键字并发多个设备
	for _, userName := range userNameList {
		go mqttDevice(userName, endNumber)
	}
	// 当所有的设备都发送完毕后,关闭程序
	for i := range endNumber {
		fmt.Println("已经有" + strconv.Itoa(i) + "个设备发送完毕")
		if i == len(userNameList) {
			return
		}
	}
}

test.json

{
	"ts": 1603088274000,
	"values": {
		"ep": 12
	}
}

connect_info.csv

localhost,1883,XecUwSmMGiYJp2BspMK2
localhost,1883,GqVqoPP2wblDjS2P9pQ9

在K8S里使用filebeat作为sidecar收集nginx日志

简介

通过sidecar方法进行接入,与提供日志的容器部署在同一个pod里,主要是配置statefulset里的containers和configmap里的filebeat.yaml
1.把nginx的日志文件挂载在access_log这个volume里,同时在filebeat这个pod里也挂载access_log这个volume
2.filebeat通过subpath的方法挂载单独一个filebeat.yml到/usr/share/filebeat/filebeat.yml。注意,如果不用subpath挂载单个文件的话,是会覆盖掉/usr/share/filebeat/目录的

3.configmap里设置elasticsearch的地址和index,指定日志文件

 

statefulset.yaml

containers:
  - image: nginx:latest
    name: nginx
    ports:
        - containerPort: 80
    volumeMounts:
        - name: access-log #日志同时挂载在nginx和filebeat中
          mountPath: /var/log/nginx/
  - image: docker.elastic.co/beats/filebeat:6.8.12
    imagePullPolicy: Always
    name: filebeat
    volumeMounts:
        - name: access-log #日志同时挂载在nginx和filebeat中
          mountPath: /log
        - name: filebeat-config
          mountPath: /usr/share/filebeat/filebeat.yml
          subPath: filebeat.yml
  volumes:
    - name: filebeat-config
      configMap:
        name: filebeat-config
        items:
        - key: filebeat.yml
          path: filebeat.yml

configmap.yaml

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
data:
  filebeat.yml: |
    filebeat.inputs:
    - type: log
      paths:
        - "/log/access.log"
    setup.template.name: "filebeat"
    setup.template.pattern: "filebeat-*"
    output.elasticsearch:
      hosts: ["{{ .Values.elastricsearch.addr }}"]
      index: "frontend-filebeat"

 


架构图


苏ICP备18047533号-1