Category Archive : python相关

python实现的一个简易注册机,用于离线工控机验证注册码

简易的设计流程

有一批工控机长期是断网离线的,而我们又想检查这批工控机的设备上的注册码是否已经过期,那该怎么办呢?工控机是交付出去了,需要用到非对称加密机制来设计,防止算法被破解之后,别人有能力能破解我们所有的工控机。下面废话不多说,直接上代码

服务端程序

1. 安装 cryptography

pip install cryptography

2. 生成密钥对(一次性操作)

在服务器端生成 RSA 公钥和私钥,这个过程只需要进行一次,并且私钥需要保密。

# 注册机密钥对生成代码
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

# 生成 RSA 密钥对
private_key = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
)

# 将私钥保存到文件
with open("private_key.pem", "wb") as private_file:
    private_file.write(
        private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
    )

# 将公钥保存到文件
public_key = private_key.public_key()
with open("public_key.pem", "wb") as public_file:
    public_file.write(
        public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
    )

private_key.pem 文件将用于服务器端签名注册码。(千万不要泄漏)

public_key.pem 文件可以安全地分发给客户端,用于验证签名。

3. 服务器端生成注册码并签名

服务器端生成注册码并用私钥签名。这里我们基于 IP、MAC 地址和注册时间生成注册信息。

# 注册码生成程序
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import serialization
import hashlib

# 加载私钥
with open("private_key.pem", "rb") as private_file:
    private_key = serialization.load_pem_private_key(
        private_file.read(),
        password=None,
    )


# 生成注册信息
def generate_registration_info(ip, mac, reg_time):
    return f"{ip}#{mac}#{reg_time}"


# 生成签名
def sign_registration_info(private_key, registration_info):
    # 先对注册信息进行哈希处理
    hash_value = hashlib.sha256(registration_info.encode()).digest()
    # 使用私钥对哈希值签名
    signature = private_key.sign(
        hash_value,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        ),
        hashes.SHA256(),
    )
    return signature


# 示例使用
ip = "192.168.1.1"
mac = "00:1A:2B:3C:4D:5E"
reg_time = "2023-09-03T12:00:00"
registration_info = generate_registration_info(ip, mac, reg_time)
signature = sign_registration_info(private_key, registration_info)

print(f"注册信息: {registration_info}")
print(f"签名: {signature.hex()}")

客户端程序(可参考修改)

1.安装在工控机上,用于验证注册码是否正确,是否过期,设置有效期365天

2.客户端使用公钥来验证签名,以确保注册码的有效性。

# 注册机尝试
import hashlib
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import serialization
from datetime import datetime

# 加载公钥
with open("public_key.pem", "rb") as public_file:
    public_key = serialization.load_pem_public_key(public_file.read())


# 验证签名
def verify_registration_info(public_key, registration_info, signature):
    # 对注册信息进行哈希处理
    hash_value = hashlib.sha256(registration_info.encode()).digest()
    # 使用公钥验证签名
    try:
        public_key.verify(
            signature,
            hash_value,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH,
            ),
            hashes.SHA256(),
        )
        return True
    except Exception as e:
        print(f"签名验证失败: {e}")
        return False


# 检查注册码是否过期
def is_registration_expired(registration_info):
    # 假设 registration_info 的格式为 "IP#MAC#DATE"
    try:
        _, _, date_str = registration_info.split("#")
        registration_date = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S")
        current_date = datetime.now()
        # 计算日期差
        difference = current_date - registration_date
        if difference.days > 365:
            print("注册码已过期。")
            return True
        else:
            print(f"注册码有效,距离到期还有 {365 - difference.days} 天。")
            return False
    except Exception as e:
        print(f"无法解析注册信息中的日期: {e}")
        return True


# 示例使用
registration_info = "192.168.1.1#00:1A:2B:3C:4D:5E#2023-09-03T12:00:00"
signature = "4747554cb3bc12f1b25f7079f435338f78f8b9a096f7a40fe6f4d22535d5d1038ae0cff10f1f7d648201e585b19b15d45a0b0903886abda9096f7ce3dbba78b3076f7df9cec9f19616512c7dd20a4448ab1eb544be0163e84cb811cf415986551f4be21878c4ca620843e4109d4e625756560f0746dbbccd2f57ee1ba6d8f751bde45839e75229160371c955bc8d19931d647d1281f4ae6baa08dd460dbf3de0d1ac76f2217ed8e81657cce00da6342ef5d453afb0c24da10197896f89347a3dc81a482eab2e41ffe311222e86d0e6a0901ae6cce38e69700d7d4a18c9e902ea802b05292c166d561b4877619283026542e319ca35708fa32e6f86e5615f6fb0"
is_valid = verify_registration_info(
    public_key, registration_info, bytes.fromhex(signature)
)

if is_valid:
    print("注册码验证成功。")
    # 检查注册码是否过期
    is_expired = is_registration_expired(registration_info)
    if not is_expired:
        print("激活成功!")
else:
    print("注册码验证失败。")

UPS软件关闭多台linux服务器的方法

背景

因为我们的施工现场往往是有一到多台服务器的,这些服务器有的是双路电源,有的是只有UPS供电。众所周知,服务器如果遇到突然断电,是有损坏硬盘的风险的,为了避免断电停机的风险,一般会设置UPS来给服务器供电,如果需求支撑长时间供电,甚至会加上储能设备,但是只要不来电,存储的电终究是有用完的时候的,所以我们还额外需要在其中一台服务器上安装一个agent程序。用于和UPS进行通信,实时检查UPS的剩余电量,如果剩余电量不足时,通过ssh关闭机房里的所有服务器

示意图

1.UPS的控制程序winpower下载

接下来的文章,我们将使用山特的linux版本的winpower程序来做配置

下载地址https://www.santak.com.cn/page/santak-downloads.html

我们可以在软件列表找到linux版本的winpower,如果第一页没有就在第二页

2.在linux服务器上安装winpower

在服务器上解压下载下载好的压缩文件Winpower_setup_LinuxAMD64.tar.gz

解压之后进入到该目录下的LinuxAMD64下,里面有一个install.bin文件,这里别急着安装,得先安装几个包,不然会报错

sudo apt install -y libxtst6 libxi6 x11-common libxrender1

安装完后再输入winpower的安装命令,过程中遇到提示一路按回车就可以了

./install.bin -i console

安装完成后,winpower程序会安装在/opt/MonitorSoftware/目录下,建议此时可以重启一次电脑,这样会自动启动agent程序,如果不重启的话,可以手动开启agent,命令如下

sudo systemctl start upsagent.service

3.配置winpower

linux版本的必须进入/opt/MonitorSoftware/目录下然后输入命令./monitor进行启动,注意如果是ssh到服务器上的话,需要用一个支持x11转发的ssh工具,像putty或者xshell,powershell肯定是不行了,可以去下载一个mobaxterm来用。

启动之后,先退出向导界面

然后成为系统管理员,默认密码是空

接下来是打开通讯口设定

我们是需要UPS上接一个RS232转USB的线到服务器上的,要检查linux服务器上的USB的串口信息可以用如下命令,把查出来的USB信息填到winpower的通讯口信息里

ls -l /dev/ttyUSB*

添加完成后,再从菜单栏里,点击自动搜索设备,一般1-2分钟就能搜索到UPS了

接下来是配置关机参数设定

建议配置

1.【允许放电时间】建议勾选。因为实际现场使用来看,这个串口通信并不总是很稳定,如果不勾选,可能agent一直获取不到电池百分比信息,默认是会在市电断开2小时后才关机,而30分钟之后可能UPS都没电了,这不是没用么。所以这个功能和【当电池容量百分比低于80%】可以相辅相成,那个先满足,那个先触发。

2.【低电位立即关机】,【剩余放电时间少于10分】,可选可不选,因为实际现场应该用不到

3.【系统】单选【关闭】。这个必须这样选,因为这个是和下面的【关机前执行档案】是联动的,如果选休眠就没用

4.【关机前执行档案】。我的建议是使用python去写一个程序,然后用pyinstaller打包出来放到这里执行。如果一定想用shell脚本也是可以的,这里如果存在程序调用程序或者配置文件,那需要非常注意路径,因为这个程序是会在/opt/MonitorSoftware/目录下执行。建议如果存在程序调用配置文件,配置文件的路径使用绝对路径。例如我的ups_shutdown程序是使用了paramiko库和os库去查找配置文件、读取需要远程关闭的服务器信息、ssh成功之后按需求去安全关闭服务器上的应用程序,数据库等再执行shutdown的关机命令。

生成oracle客户端docker镜像的两种玩法

背景

我们的oracle服务端是oracle12g版本,应用程序均由golang或者python编写,运行在k8s 容器里,那我们就需要build一些docker容器来,那如何build呢?可以参考以下我的方法。文章最后有我编写过程中的参考文章,也可以根据参考文章自己创新。

玩法1:参考oracle官方文档制作

1.下载代码https://github.com/oracle/docker-images.git 到本地

2.进入OracleInstantClient/oraclelinux8/21/目录,该目录下有一个原始的dockerfile文件,可以使用该文件build一个基础镜像,例如

docker build --pull -t oracle/instantclient:21 .

使用build出来的这个oracle/instantclient:21镜像可以二次进行dockerfile编辑加入golang或者python。

也可以用这个oracle/instantclient:21来测试一下oracle数据库是否能正常连接。测试连接命令如下:

docker run -ti --rm oracle/instantclient:21 sqlplus 用户名/密码@数据库IP:数据库端口/数据库名

玩法2:从debian开始制作一个镜像

除了上面的方法外,我们还可以从debian开始制作一个包含python的镜像

1.进入oracle客户端下载页https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html

2.下载https://download.oracle.com/otn_software/linux/instantclient/218000/instantclient-basic-linux.x64-21.8.0.0.0dbru.zip到本地,如下图

3.编写Dockerfile,以debian+oracle+python举例

FROM debian:11-slim

LABEL maintainer="zhenwei.li <zhenwei.li@sfere-elec.com>"
RUN set -eux \
    && sed -i "s@http://ftp.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && sed -i "s@http://security.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && apt-get update \
    && apt-get install -y -q libaio1 unzip python3 pip
    && pip install cx_Oracle

# 清理垃圾
RUN set -eux \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/* \
    && rm -rf /tmp/*
ENV TZ=Asia/Shanghai \
    DEBIAN_FRONTEND=noninteractive

RUN ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \
    && echo ${TZ} > /etc/timezone \
    && dpkg-reconfigure --frontend noninteractive tzdata \
    && rm -rf /var/lib/apt/lists/*

COPY instantclient-basic-linux.x64-21.8.0.0.0dbru.zip /opt/oracle/instantclient-basic-linux.x64-21.8.0.0.0dbru.zip

WORKDIR /opt/oracle/

RUN unzip instantclient-basic-linux.x64-21.8.0.0.0dbru.zip

RUN sh -c "echo /opt/oracle/instantclient_21_8 > /etc/ld.so.conf.d/oracle-instantclient.conf"

RUN ldconfig

RUN useradd sfere

4. 目录下放Dockerfile和oracle客户端zip包

5. 制作镜像

docker build -t debian-oracle .

6.运行镜像,测试python连接oracle服务端可行,依次输入如下命令

docker run -ti --rm debian-oracle python
import cx_Oracle as cx
con = cx.connect('用户名', '密码', '数据库IP:数据库端口/数据库名')

参考文章

https://github.com/oracle/docker-images/tree/main/OracleInstantClient

https://csiandal.medium.com/install-oracle-instant-client-on-ubuntu-4ffc8fdfda08

playwright拖拽元素和获取元素集合

背景

因为这两个功能经常用,网上又很少有直接可以抄的代码,所以我写个文章记录一下

元素向下拖拽100个px

拖拽演示

解说:先定位到要拖拽的元素,然后鼠标移动到元素的中心点,接着鼠标选中,鼠标移动,鼠标放下

src_elem = page.locator("xpath=元素定位")
box = src_elem.bounding_box(timeout=60000)
page.mouse.move(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2)
page.mouse.down()
page.mouse.move(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2 + 100)
page.mouse.up()

元素拖拽到另外一个元素上

这种方法也有用,比如把一个按钮丢到另外一个画板里

src_elem = page.locator("xpath=元素原来的")
src_elem.drag_to(page.locator("xpath=元素新的位置"))

获取一批元素集合,然后截图

我们的页面上有一批图片,以瀑布流的方式展示出来,他们的css都是相同的,我们需要给每一个元素截图,并且截图之后再鼠标向下滚动一下

elements = page.query_selector_all(".search-result__item")
number = 1
for item in elements:
    number = number + 1
    page.mouse.wheel(0, 100)
    item.screenshot(path="image/"+str(number) + ".png")
    time.sleep(1)

python通过sdk从minio下载文件时添加进度条

背景

Minio是就地环境下比较好用的对象存储工具,适合在CI/CD流程中使用。主要是因为GIT里用LFS来放大文件不妥,把部署流程中需要的中间文件放minio上,通过SDK去存取文件非常方便。

Minio的上传文件fput_object有progress参数,但是下载文件fget_object默认没有 progress 参数,所以我们需要自己用get_object对代码稍加改造

涉及到的库

https://github.com/verigak/progress

用于提供进度条

pip install progress
pip install minio

代码

from minio import Minio
from progress.bar import Bar


def get_object_with_progress(client, bucket_name, object_name):
    try:
        data = client.get_object(bucket_name, object_name)
        total_length = int(data.headers.get('content-length'))
        bar = Bar(object_name, max=total_length / 1024 / 1024, fill='*', check_tty=False,
                  suffix='%(percent).1f%% - %(eta_td)s')
        with open('./' + object_name, 'wb') as file_data:
            for d in data.stream(1024 * 1024):
                bar.next(1)
                file_data.write(d)
        bar.finish()
    except Exception as err:
        print(err)


if __name__ == '__main__':
    client = Minio(
        "play.min.io",
        access_key="Q3AM3UQ867SPQQA43P2F",
        secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
    )

    bucket_name = "downlaod"
    object_name = "eiop-timescaledb-offline.zip"
    get_object_with_progress(client, bucket_name, object_name)

实现效果

electron+droneCI+minio流水线

背景

因为我们的electron程序已经开发完成,期望要能开发人员每次上传代码,打了tag就自动build一份deb文件,自动上传到minio,方便运维人员去拿deb文件部署到ubuntu环境上。我们已有的技术栈包含droneCI,minio,python,于是边有了该方案。本文省略了vault,ldap,minio,harbor的安装与配置,这些程序的安装配置在本网站的其他文章里,就不一一贴出来了


架构图

解释:

1.前端开发上传electron代码到git服务端

2.git服务端通过webhook方式通知drone-server产生了。例如本文只测试的是发布tag触发webhook,还有很多种触发方式都可以设置

3.drone-server收到通知后,再在drone-runner所在的k8s集群里启动一个包含nodejs和python的任务容器

4.任务容器通过electron-forge make 命令打包一个deb文件

5.任务容器通过minio提供的python sdk上传deb文件到minio


drone插件编写

要完成上述目标,第一步就是得编写一个drone的插件

我编写该插件使用的是nodejs16版本的debian系统,然后通过提前安装好需要的如下表格里的工具。注意,因为我用的是华为源,2021年12月9日的时候,华为镜像上最新的electron只到16.0.2版本,所以注意指定版本号

介绍:该插件使用nodejs16版本的debian系统,然后通过提前安装好需要的如下工具。注意,因为我用的是华为源,2021年12月9日的时候,华为镜像上最新的electron只到16.0.2版本,所以注意指定版本号

工具名
rpm
python3-pip
python3
fakeroot
electron@v16.0.2 
electron-prebuilt-compile
electron-forge 
dpkg
minio的python sdk

代码有3个文件main.py Dockerfile ,requirements.txt,下面是详细介绍

main.py

代码功能是先获取环境变量,然后使用git的tag号替换掉package.json里的version字段。执行yarn install,yanr make,通过环境变量找到需要上传的文件,通过pythonde的sdk上传到minio里。详细代码如下

#main.py
import json
import os
import subprocess

from minio import Minio
from minio.error import S3Error

endpoint = "minio.sfere.local"
access_key = "bababa"
secret_key = "bababa"
bucket = "electronjs"
folder_path = "/drone/src/out/make/deb/x64"
suffix = "deb"
tag = "0.0.0"


def find_file_by_suffix(target_dir, target_suffix="deb"):
    find_res = []
    target_suffix_dot = "." + target_suffix
    walk_generator = os.walk(target_dir)
    for root_path, dirs, files in walk_generator:
        if len(files) < 1:
            continue
        for file in files:
            file_name, suffix_name = os.path.splitext(file)
            if suffix_name == target_suffix_dot:
                find_res.append(os.path.join(root_path, file))
    return find_res


def get_environment():
    global endpoint, access_key, secret_key, bucket, suffix, tag

    if "PLUGIN_ENDPOINT" in os.environ:
        endpoint = os.environ["PLUGIN_ENDPOINT"]
    if "PLUGIN_ACCESS_KEY" in os.environ:
        access_key = os.environ["PLUGIN_ACCESS_KEY"]
    if "PLUGIN_SECRET_KEY" in os.environ:
        secret_key = os.environ["PLUGIN_SECRET_KEY"]
    if "PLUGIN_BUCKET" in os.environ:
        bucket = os.environ["PLUGIN_BUCKET"]
    if "PLUGIN_SUFFIX" in os.environ:
        suffix = os.environ["PLUGIN_SUFFIX"]
    if "PLUGIN_TAG" in os.environ:
        tag = os.environ["PLUGIN_TAG"]


def yarn_make():
    with open('./package.json', 'r', encoding='utf8')as fp:
        json_data = json.load(fp)
    json_data['version'] = tag
    with open('./package.json', 'w', encoding='utf8')as fp:
        json.dump(json_data, fp, ensure_ascii=False, indent=2)
    print('package version replace to ' + tag)
    print(subprocess.run("yarn install", shell=True))
    print(subprocess.run("yarn make", shell=True))


def upload_file():
    file_list = find_file_by_suffix(folder_path, suffix)
    # 创建minio连接,这里因为我们是http的,所以secure=False
    client = Minio(
        endpoint=endpoint,
        access_key=access_key,
        secure=False,
        secret_key=secret_key,
    )

    # 检查bucket是否存在,不存在就创建bucket
    found = client.bucket_exists(bucket)
    if not found:
        client.make_bucket(bucket)
    else:
        print("Bucket 'electronjs' already exists")

    # 上传文件到bucket里
    for file in file_list:
        name = os.path.basename(file)
        client.fput_object(
            bucket, name, file,
        )
        print(
            "'" + file + "' is successfully uploaded as "
                         "object '" + name + "' to bucket '" + bucket + "'."
        )


if __name__ == "__main__":
    get_environment()
    yarn_make()
    try:
        upload_file()
    except S3Error as exc:
        print("error occurred.", exc)

Dockerfile

取一个node16版本的debian系统,使用国内源安装我们在之前列出来要用的工具,然后指定程序入口是我们的python程序。编写完后,使用docker build -t drone-electron-minio-plugin:0.1.0 . 做一个镜像上传到私仓里

FROM node:16-buster
RUN npm config set registry https://mirrors.huaweicloud.com/repository/npm/ \
    && npm config set disturl https://mirrors.huaweicloud.com/nodejs \
    && npm config set sass_binary_site https://mirrors.huaweicloud.com/node-sass \
    && npm config set phantomjs_cdnurl https://mirrors.huaweicloud.com/phantomjs \
    && npm config set chromedriver_cdnurl https://mirrors.huaweicloud.com/chromedriver \
    && npm config set operadriver_cdnurl https://mirrors.huaweicloud.com/operadriver \
    && npm config set electron_mirror https://mirrors.huaweicloud.com/electron/ \
    && npm config set python_mirror https://mirrors.huaweicloud.com/python \
    && npm config set canvas_binary_host_mirror https://npm.taobao.org/mirrors/node-canvas-prebuilt/ \
    && npm install -g npm@8.2.0 \
    && yarn config set registry https://mirrors.huaweicloud.com/repository/npm/ \
    && yarn config set disturl https://mirrors.huaweicloud.com/nodejs \
    && yarn config set sass_binary_site https://mirrors.huaweicloud.com/node-sass \
    && yarn config set phantomjs_cdnurl https://mirrors.huaweicloud.com/phantomjs \
    && yarn config set chromedriver_cdnurl https://mirrors.huaweicloud.com/chromedriver \
    && yarn config set operadriver_cdnurl https://mirrors.huaweicloud.com/operadriver \
    && yarn config set electron_mirror https://mirrors.huaweicloud.com/electron/ \
    && yarn config set python_mirror https://mirrors.huaweicloud.com/python \
    && yarn config set canvas_binary_host_mirror https://npm.taobao.org/mirrors/node-canvas-prebuilt/ \
    && yarn global add electron@v16.0.2 electron-forge electron-prebuilt-compile\
    && sed -i "s@http://ftp.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && sed -i "s@http://security.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && sed -i "s@http://deb.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && apt update \
    && apt install -y fakeroot dpkg rpm python3 python3-pip
ADD . .   
WORKDIR . 
RUN pip3 install -r ./requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
#CMD ["python3","/main.py"]
WORKDIR /drone/src
ENTRYPOINT ["python3", "/main.py"]

requirements.txt

minio==7.1.2

electron仓库代码

我们的electron仓库里要添加一个.drone.yml文件和对package.json稍微进行一些修改

package.json

.drone.yml

droneCI的流水线文件,使用了我们在上一节里build出来的drone插件镜像


流水线演示

需要人手动操作的

流水线自动操作的

用python实现helm template功能

背景

众所周知,helm template 包名 -f values.yaml >输出文件。这个方式能渲染go-template,自动填充{{ .Values.XXX }}参数到文件里。现在有一个需求,需要用python来实现类似的功能。那么就来看看我的最后实现吧


sapmle.tmpl 待填充文件

{{ .Values.Count }} items are made of {{ .Values.Material }}
{{ .Values.Material }} items are made of {{ .Values.Material }}
{{ .Values.Material }} items are made of {{ .Values.Count }}
{{ .Values.mqtt.server }} dadasdjsaijaid

values.yml 参数文件

Count: 14
Material: Wool
mqtt:
  server: 172.15.62.2

python 代码

import re

from ruamel import yaml


def traverse(dic, path=None):
    if not path:
        path = []
    if isinstance(dic, dict):
        for x in dic.keys():
            local_path = path[:]
            local_path.append(x)
            for b in traverse(dic[x], local_path):
                yield b
    else:
        yield path, dic


def template_render(source_file, values_file, dest_file):
    with open(source_file, 'r') as source:
        origin = source.read()

    with open(values_file, 'r', encoding='utf-8') as vaules:
        result = yaml.load_all(vaules.read(), Loader=yaml.Loader)
        yaml_dict = list(result)[0]
    for x in traverse(yaml_dict):
        match = "\{\{ \.Values." + '.'.join(x[0]) + " \}?\}"
        origin = re.sub(match, str(x[1]), origin)

    with open(dest_file, 'w+') as dest:
        dest.write(origin)


if __name__ == '__main__':
    template_render('sample.tmpl', "values.yml","result.yaml")

result.yaml 渲染结果

14 items are made of Wool
Wool items are made of Wool
Wool items are made of 14
172.15.62.2 dadasdjsaijaid

自定义一个kaniko镜像

背景

kaniko是一款方便我们从K8S内部构建docker容器的工具,以前我们在CI过程中,使用的是docker-in-docker技术,这种技术最主要的缺陷就是当一台机器上同时运行多个docker build流水线时,会出现阻塞的情况,因为这一批流水线用的是宿主机上的同一个docker进程。
基于这种情况,我们在droneCI流水线中换用了kaniko来进行docker镜像的创建。

遇到的难题

  1. kaniko是基于scratch构建的,里面没有shell,所以想在kaniko原生镜像里在调用python是很麻烦的
  2. kaniko创建docker镜像使用的是file system功能,如果想在一个kaniko容器里先创建ubuntu镜像,再创建alpine镜像, 是会有各种冲突问题的,需要使用–cleanup功能。此功能会清空文件系统,同时如果有自己装的shell,也会被清空,导致无法再次使用

解决方案

  1. kaniko的关键文件其实是/kaniko目录下的哪些 二进制文件,官方推荐是用gcr.io/kaniko-project/executor 镜像,其实我们可以拷贝这个/kaniko目录到我们自己的私有镜像
  2. shell没有的话,我们可以拷贝一个busybox进去,这样就有shell了
  3. 虽然–cleanup会清空file system,但是根据代码里的ignorepath设定,volume挂载目录和/kaniko目录会被忽略掉。所以我们可以有两种方式选择:一、通过volume的方式哦挂载busybox和自己的python代码到私有镜像里。二、把busybox和python代码加入/kaniko目录。

示例代码

Dockerfile如下:

FROM heiheidoc/kaniko-project-executor:v1.3.0 AS plugin

# 1.6.0的clean up有问题 https://github.com/GoogleContainerTools/kaniko/issues/1586

FROM heiheidoc/kaniko-project-executor:debug AS debug

FROM python:3.9.5-buster

COPY --from=背景plugin /kaniko /kaniko

COPY --from=debug /busybox /kaniko/busybox

ADD . /kaniko

ENV DOCKER_CONFIG /kaniko/.docker

CMD ["python3","/kaniko/main.py"]


部分python代码如下,功能是按照一定规则生成Docker镜像:

def run_shell(shell):
    print_green(shell)
    cmd = subprocess.Popen(shell, stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True,
                           stdout=sys.stdout, universal_newlines=True, shell=True,executable='/kaniko/busybox/sh', bufsize=1)
    cmd.communicate()
    return cmd.returncode
def run_executor():
    for folder_name, sub_dir, files in os.walk(os.getcwd()):
        if 'Dockerfile' in files:
            Dockefile_path = folder_name + "/Dockerfile"
            docker_info = folder_name.replace(os.getcwd(),'').split('/')
            harbor_image_name = REGISTRY_URL + "/" + owner_name + "/" + docker_info[1] + ":" + docker_info[2]
            cmd_build = "/kaniko/executor --cache=true --cache-dir=/cache --cleanup --skip-tls-verify --skip-tls-verify-pull --skip-tls-verify-registry" \
                        " --dockerfile=" + Dockefile_path + \
                        " --context=dir://" + folder_name + \
                        " --destination=" + harbor_image_name
            assert run_shell(cmd_build) == 0, "镜像build失败: " + harbor_image_name
if __name__ == "__main__":
    run_executor()

Docker安装redis环形集群

背景

因为有需求要3台机器来做一个redis高可用(o(╥﹏╥)哭o~~~),没办法,只能用一个奇怪的方式安装redis集群了。这里我们用的不是主从哨兵哦,阅读本文的作者不要误会啦。再就是,真不推荐用3个机器来做redis集群。

因为配置里的cluster-announce-ip不能用域名,只能用ip,而pod的IP在K8S里经常变,所以不能用K8S部署了,我们这里使用docker和docker-compose部署,总得来说还是很简单的,代码我上传至github上了
https://github.com/lizhenwei/redis-cluster-in-docker.git

拓扑图

假设我们用3台机器,IP地址192.168.0.94,192.168.0.95,192.168.0.96 ;分别是node1,node2,node3

-------    -------    -------
|node1| ---|node2|----|node3|
-------    -------    -------
master1    master2    master3
slaver2    slaver3    slaver1

安装完毕之后,需要我们手动通过`CLUSTER REPLICATE`命令调节master和slaver所处的容器,形成如上拓扑,这样假使node1挂掉的时候,我们的node2上能运行一个master2,node3上运行master3,master1(此处的master1是由slaver1变过来的)

安装方法

1.在每台机器上安装docker和docker-compose。国内可以用daocloud去下载。会比较快http://get.daocloud.io/
2. 在每台机器上下载该代码,进入代码目录,运行shell命令,会根据IP地址更改配置文件,并且创建redis的docker应用:

# 在每台机器上输入该命令,注意替换IP地址
bash docker-init.sh 192.168.0.94
3. 在任意一台机器上通过redis-cli创建集群
# 在每台机器上输入该命令,注意替换IP地址
bash redis-init.sh 192.168.0.94 192.168.0.95 192.168.0.96
# 弹出来的提示直接输入yes
4. 完成之后执行命令,检查集群是否运行成功
#进入redis容器
docker exec -it redis-cluster bash
#检查集群是否运行成功
redis-cli -a 92F1q99f9CnrkAuwJPItdj8brqeMtN3r -p 7000 cluster nodes

python代码访问redis-cluster

$ pip install redis-py-cluster
```
测试代码
```
>>> from rediscluster import RedisCluster
>>> # Requires at least one node for cluster discovery. Multiple nodes is recommended.
>>> startup_nodes = [{"host": "192.168.0.94", "port": "7000"}, {"host": "192.168.0.94", "port": "7001"},{"host": "192.168.0.95", "port": "7000"}, {"host": "192.168.0.95", "port": "7001"},{"host": "192.168.0.96", "port": "7000"}, {"host": "192.168.0.96", "port": "7001"}]
>>> rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True,password='password')
>>> rc.set("foo", "bar")
True
>>> print(rc.get("foo"))
'bar'

Sanic中添加基于Cron表达式的协程定时任务

需求

从页面上触发某一个接口之后,要在sanic里面添加一个计划任务。
例如:从页面上填写一个con表达式,后台根据此cron表达式定时执行任务

'*/1 * * * *'   # 这个是每1分钟执行一次的表达式

需要用到的python包有:Sanic,Apscheduler


示例代码

import time
from sanic import Sanic
from sanic.response import json
app = Sanic("App Name")
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
def cron_job():
    print('打印当前时间:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
@app.route('/')
async def index(request):
    # generate a URL for the endpoint `post_handler`
    print("浏览器里打开一下首页")
    scheduler = AsyncIOScheduler()
    scheduler.add_job(cron_job, CronTrigger.from_crontab('*/1 * * * *'))
    scheduler.start()
    return json({"hello": "thank you"})
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

实验效果


苏ICP备18047533号-1