Category Archive: Python-related

Reindexing Elasticsearch: collecting nginx logs and analyzing API response times using request_time

Background

filebeat collects nginx's logs, parses them as JSON, and ships them into Elasticsearch. Every field arrives as text, but we need request_time as a double before we can run aggregations such as the maximum request_time.
However, once an Elasticsearch index has been created, fields can only be added, never modified, so the only way to change request_time's data type is to rebuild the index.
Our steps are:

  1. Get the mapping of the old index.
  2. Create a new index from that mapping (with request_time changed to double).
  3. Use the reindex API to migrate the data from the old index to the new one.
  4. Confirm the new index received the data.
  5. Delete the old index.
  6. Get the mapping of the new index.
  7. Recreate the old index from the new index's mapping.
  8. Reindex the data back into the old index.
  9. Delete the temporary index.

Assume the old index is V1
the temporary index is V2
the nginx API path is the path field
the nginx response time is the request_time field
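
To make steps 1-3 concrete, here is a minimal sketch using the requests library. It assumes Elasticsearch 7.x (typeless mappings); the host address reuses the one from the query example below, and the exact mapping tweak depends on what your real mapping looks like.

import requests

ES = "http://192.168.0.174:32164"  # the Elasticsearch address used elsewhere in this article

# step 1: fetch the old index's mapping
mapping = requests.get(ES + "/V1/_mapping").json()["V1"]["mappings"]

# step 2: switch request_time to double and create the temporary index
mapping["properties"]["request_time"] = {"type": "double"}
requests.put(ES + "/V2", json={"mappings": mapping})

# step 3: copy the data across with the _reindex API
requests.post(ES + "/_reindex",
              json={"source": {"index": "V1"}, "dest": {"index": "V2"}})

Steps 7-8 are the same two calls with V1 and V2 swapped.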


Flow chart and notes


Python code

Aggregate by path to get the maximum and average response times, and save the top 500 paths by maximum response time to a CSV file

#
#  created by zhenwei.Li at 2020/11/3 17:50
#
#  filename : example4.py
#  description :
import csv
import json
import requests
if __name__ == '__main__':
    send_json = {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": 1533556800000,
                                "lte": 1604470685934
                            }
                        }
                    }
                ]
            }
        },
        "size": 0,
        "aggs": {
            "Job_gender_stats": {
                "terms": {
                    "field": "path.keyword",
                    "size": 500,
                    "order": {
                        "max_request_time": "desc"
                    }
                },
                "aggs": {
                    "max_request_time": {
                        "max": {
                            "field": "request_time"
                        }
                    },
                    "avg_request_time": {
                        "avg": {
                            "field": "request_time"
                        }
                    }
                }
            }
        }
    }
    res = requests.post(url="http://192.168.0.174:32164/192.168.0.67-eiop-frontend/_search", json=send_json)
    print(json.dumps(res.json()['aggregations']['Job_gender_stats']['buckets'], sort_keys=True, indent=4))
    buckets = res.json()['aggregations']['Job_gender_stats']['buckets']
    file_handle = open('research.csv', 'w', encoding='utf-8', newline='')
    # build a csv writer on top of the file object
    csv_writer = csv.writer(file_handle)
    # write the header row
    csv_writer.writerow(["path", "count", "avg response time (s)", "max response time (s)"])
    for item in buckets:
        csv_writer.writerow(
            [item['key'], item['doc_count'], item['avg_request_time']['value'], item['max_request_time']['value']])
    # close the file
    file_handle.close()

Result

[docker] Logging with rsyslog and forwarding nginx logs to a Python program

Notes on how I packaged rsyslog into a Docker image, picked up nginx's access log, and forwarded it to Python

Key point 1: nginx log configuration

The nginx log must be output in JSON format; nginx.conf is shown below. In a Docker image you can mount this nginx.conf in via a volume, and mount /var/log/nginx/access.log out to the host

user  root;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;
events {
    worker_connections  1024;
}
http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';
    log_format main '{"time_local": "$time_local", '
   '"path": "$request_uri", '
   '"ip": "$remote_addr", '
   '"time": "$time_iso8601", '
   '"user_agent": "$http_user_agent", '
   '"user_id_got": "$uid_got", '
   '"user_id_set": "$uid_set", '
   '"remote_user": "$remote_user", '
   '"request": "$request", '
   '"status": "$status", '
   '"body_bytes_sent": "$body_bytes_sent", '
   '"request_time": "$request_time", '
   '"http_referrer": "$http_referer" }';
    access_log  /var/log/nginx/access.log  main;
    sendfile        on;
    #tcp_nopush     on;
    keepalive_timeout  65;
    #gzip  on;
    include /etc/nginx/conf.d/*.conf;
}

Key point 2: rsyslog configuration and Dockerfile

Write a 51-nginx-forward.conf file and drop it into /etc/rsyslog.d/:

module(load="imfile")
input(type="imfile"
      File="/var/log/nginx/access.log"
      Tag="mywebsite:")
# omfwd module for forwarding the logs to another tcp server
if( $syslogtag == 'mywebsite:')  then {
  action(type="omfwd" target="python服务器IP地址" port="6000" protocol="tcp"
            action.resumeRetryCount="100"
            queue.type="linkedList" queue.size="10000")
}

We can run rsyslog from a Dockerfile; when you docker run it, remember to mount the log files

FROM ubuntu:16.04
RUN apt-get update && apt-get install -y rsyslog; \
    rm -rf /var/lib/apt/lists/*
ADD 51-nginx-forward.conf /etc/rsyslog.d/.
# RUN cat /dev/null> /var/log/mail.log
CMD service rsyslog start && tail -f /var/log/syslog

Key point 3: the Python program reads from rsyslog over TCP

The Python program holds a TCP connection to rsyslog and can run database inserts in real time

import asyncio
import json
import time
import database_init
class LogAnalyser:
    def __init__(self):
        pass
    def process(self, str_input):
        # print(str_input)
        str_input = str_input.decode("utf-8", errors="ignore")
        # Add your processing steps here
        # ...
        try:
            # Extract created_at from the log string
            str_splits = str_input.split("{", 1)
            json_text = "{" + str_splits[1]
            data = json.loads(json_text)
            created_at = data["time"]
            request_all = data["request"].split(" /", 1)
            http_type = request_all[0]
            path = data["path"]
            request_time = data["request_time"]
            if PREFIX in data["path"]:
                path = data["path"]
                return http_type, path, created_at, request_time  # The order is relevant for INSERT query params
        except Exception as e:
            print("error in read_rsylog.py, class LogAnalyser, function process")
            print(e)
        return None
@asyncio.coroutine
def handle_echo(reader, writer):
    log_filter = LogAnalyser()
    while True:
        line = yield from reader.readline()
        if not line:
            break
        params = log_filter.process(line)
        if params:
            # do your processing here, e.g. insert into the database
            # execute_sql(params=params)
            pass
if __name__ == '__main__':
    CURSOR = database_init.DBConnect().CURSOR
    CONN = database_init.DBConnect().CONN
    PREFIX = database_init.DBConnect().CONFIG["TEST_SWAGGER"]["PREFIX"]
    database_init.DBConnect().create_table()
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_echo, None, 6000, loop=loop)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    # Close the server
    print("Closing the server.")
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
    CURSOR.close()
    CONN.close()

Jenkins level-up: running pytest in Docker and producing an Allure report

Background

Recently I wanted to set up a simple pytest suite with Allure reports, but every guide I found installs Jenkins on Windows and has Jenkins run the tests in a local environment. That approach feels distinctly un-2019, so I decided to write up how to run pytest in Docker from Jenkins and then produce the Allure report.

Approach

  1. Install Jenkins on a machine; see my article https://www.yinyubo.cn/?p=268
  2. Prepare the Python code and push it to git; Jenkins pulls it from there. The repo contains the Dockerfile, the test cases, and requirements.txt
  3. After pulling the code, build a Python image from the Dockerfile; inside the image, install the pytest and Allure dependencies from requirements.txt
  4. Run a container from that image; the container's command runs pytest and produces the test results, which are mounted back onto the Jenkins server.
  5. (Why run pytest in the container rather than in the Dockerfile? Because if pytest runs during the image build, any test failure makes Jenkins abort the shell step)
  6. Jenkins reads the results from step 4 through the Allure plugin and generates the Allure report

Steps

  1. Step one is skipped; see https://www.yinyubo.cn/?p=268
  2. Prepare the Python code: the Dockerfile, test cases, and requirements.txt, as shown below


The Dockerfile:

FROM python:3.7.3
WORKDIR .
ADD . .
RUN pip install -r requirements.txt
CMD ["pytest", "-q","TestCase/AIMP/Connect/AIMP_Connect_01.py","--alluredir","allure-results"]

requirements.txt:

allure-pytest==2.6.2
allure-python-commons==2.6.2
atomicwrites==1.3.0
attrs==19.1.0
colorama==0.4.1
more-itertools==7.0.0
pluggy==0.9.0
py==1.8.0
pytest==4.4.1
six==1.12.0

The test cases can look like this:

import pytest
def test_success():
    """this test succeeds"""
    print(1)
    assert True
def test_failure():
    """this test fails"""
    print(2)
    assert False
def test_skip():
    """this test is skipped"""
    print(3)
    pytest.skip('for a reason!')
def test_broken():
    raise Exception('oops')
def test_success2():
    print(4)
    assert True

  3. Jenkins configuration. I won't cover pulling the code from git here since it's simple; the main parts are the build command and the post-build action. My build command is copied below:

name="apitest"
docker build -t $name .
docker run -d --name $name -v $PWD/allure-results:/allure-results $name

The result:

  4. Run the job and view the Allure results
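
Incidentally, if you want to inspect the same results locally without Jenkins, the standalone Allure command line can render the mounted directory (this assumes the allure CLI is installed; it is not part of the setup above):

allure generate allure-results -o allure-report --clean
allure open allure-report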

Common Python file operations: copy, move, and archive

Requirements

I regularly need Python's file copy, move, and archive operations. I use the shutil library for these, with the PIL library occasionally mixed in for resizing images and the pypinyin library for converting Chinese characters to pinyin.

Code

# -*- coding: UTF-8 -*-
import os
import shutil
import traceback
import pypinyin
from PIL import Image
from globalLog import ta_log
def copy_file(srcfile, dstfile):
    if not os.path.isfile(srcfile):
        # usually this just means the source is not a regular file; harmless
        ta_log.info("%s not file!" % (srcfile))
    else:
        fpath, fname = os.path.split(dstfile)  # split into path and file name
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # create the path
        shutil.copy2(srcfile, dstfile)  # copy the file, keeping metadata
        ta_log.info("copy %s -> %s" % (srcfile, dstfile))
def movefile(srcfile, dstfile):
    if not os.path.isfile(srcfile):
        ta_log.error("%s not exist!" % (srcfile))
    else:
        fpath, fname = os.path.split(dstfile)  # split into path and file name
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # create the path
        shutil.move(srcfile, dstfile)  # move the file
        ta_log.info("move %s -> %s" % (srcfile, dstfile))
def copy_file_to_capture(srcfile):
    '''
    copy to the capture directory, converting Chinese characters in the file name to pinyin
    :param srcfile:
    :return:
    '''
    fpath, fname = os.path.split(srcfile)
    filename_pre_ = pypinyin.slug(fname)
    dstfile = os.path.abspath(os.getcwd() + "/filepath/attachement/capture/" + filename_pre_)
    copy_file(srcfile, dstfile)
def resize_image(filein):
    img = Image.open(filein)
    out = img.resize((129, 149), Image.ANTIALIAS)  # resize image with high quality
    out.save(filein, 'png')
# only for deleting local files
def delete_many_file(file_list):
    for path in file_list:
        if os.path.exists(path):
            # either os.remove or os.unlink can delete the file
            try:
                os.remove(path)
            except Exception:
                ta_log.error(traceback.format_exc())
            # os.unlink(my_file)
def copy_and_zip(file_list, dst_folder_name):
    '''
    copy a batch of files into a target folder, zip that folder's contents, then delete the folder
    :param file_list: files or folders
    :param dst_folder_name: name of the target archive
    :return:
    '''
    for item in file_list:
        copy_files_to_attachment(item, dst_folder_name)
    source = os.getcwd() + "/filepath/attachement/" + dst_folder_name
    shutil.make_archive(source, "zip", source)
    shutil.rmtree(source)
def copy_files_to_attachment(srcfile, filename):
    '''
    copy a file or folder into the given directory
    :param srcfile: absolute path of the file or folder
    :param filename: target directory
    :return:
    '''
    dstfile = os.path.abspath(os.getcwd() + "/filepath/attachement/")
    folder_name = dstfile + "\\" + filename + "\\"
    if not os.path.isfile(srcfile):
        last_name = os.path.basename(srcfile)
        destination_name = folder_name + last_name
        shutil.copytree(srcfile, destination_name, symlinks=True)
    else:
        fpath, fname = os.path.split(folder_name)  # split into path and file name
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # create the path
        shutil.copy2(srcfile, folder_name)  # copy the file
        print("copy %s -> %s" % (srcfile, folder_name))
def copy_file_to_folder(srcfile, folder_name):
    '''
       copy a file or folder into the given directory
       :param srcfile: absolute path of the file or folder
       :param folder_name: target directory
       :return:
       '''
    if not os.path.isfile(srcfile):
        last_name = os.path.basename(srcfile)
        destination_name = folder_name + last_name
        shutil.copytree(srcfile, destination_name, symlinks=True)
    else:
        fpath, fname = os.path.split(folder_name)  # split into path and file name
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # create the path
        shutil.copy2(srcfile, folder_name)  # copy the file
        print("copy %s -> %s" % (srcfile, folder_name))

Generating a bat script with Python and running it

Requirements

Keywords: python, sqlite, bat, processes
Recently I had a requirement: read SQL statements from a txt file with Python and run them against SQLite, first backing the SQLite database up to a temporary directory. If the SQL succeeds, delete the temporary files; if it fails, overwrite the now-broken SQLite database with the backup.

Key points

  • Create the temporary directory with Python's tempfile library
  • Copy files with Python's shutil library
  • Read the SQL from the text file with open:
  • f = open(file_name, 'r')
    sql = f.read()
  • Create the bat file with open as well, in 'w' mode
  • Pause inside the bat with @ping -n 5 127.1 >nul 2>nul
  • Delete inside the bat with "del <path>"
  • Copy inside the bat with "copy <source> <destination>"
  • When the bat finishes, pause waits for the user to close the window; exit closes it automatically

Partial code

# -*- coding: UTF-8 -*-
# naming convention: table_operation_field
import os
import tempfile
import connectDB
from controller import fileController
curosrdiction = connectDB.curosr_diction
database_back_folder = ""
def exec_sql(sqltext):
    '''
    execute the SQL; returns True on success
    :param sqltext: the SQL statements
    :return:
    '''
    result = curosrdiction.executescript(sqltext)
    curosrdiction.commit()
    return result
def backup_database():
    '''
    back up the database
    :return:
    '''
    global database_back_folder
    database_back_folder = tempfile.mkdtemp()
    fileController.copy_file_to_folder('resource/sqllite.db', database_back_folder)
    write_bat()
def restore_database():
    '''
    restore the database
    :return:
    '''
    curosrdiction.close()
    fileController.copy_file(srcfile=database_back_folder + "\\sqllite.db", dstfile='resource/sqllite.db')
def read_sql_from_text(file_name):
    '''
    read SQL from a text file and execute it; if execution fails, restore the original database
    :param file_name:
    :return:
    '''
    f = open(file_name, 'r')
    sql = f.read()
    if not exec_sql(sql):
        run_bat()
def write_bat():
    sql_database_path = os.getcwd() + "\\resource\\sqllite.db"
    sql_database_path_shm = os.getcwd() + "\\resource\\sqllite.db-shm"
    sql_database_path_wal = os.getcwd() + "\\resource\\sqllite.db-wal"
    bat_name = 'copy.bat'
    s1 = '''@echo off
@ping -n 5 127.1 >nul 2>nul
echo delete database...
del ''' + sql_database_path + '''
del ''' + sql_database_path_shm + '''
del ''' + sql_database_path_wal + '''
echo restore database...
@ping -n 5 127.1 >nul 2>nul
copy ''' + database_back_folder + "\\sqllite.db " + sql_database_path + '''
echo restore finish
pause'''
    f = open(bat_name, 'w')
    f.write(s1)
    f.close()
def run_bat():
    '''
    run the bat
    :return:
    '''
    os.system('start copy.bat')
def update_sql():
    '''
    main entry point for updating the database
    :return:
    '''
    backup_database()
    read_sql_from_text("resource/download/1.2.3_sql.txt")

wxPython: reading files, text, and images from the clipboard

Requirements

A while ago I had a requirement to read the contents of the user's clipboard and copy them somewhere else. For example:

  • When the user copies an image, copy the image to a given location.
  • When the user copies a passage of text from a txt file, get the copied text.
  • When the user copies a file, get the copied file's name and path, then copy it to a given location.

Design

1. Use wx's built-in clipboard checks, e.g.
wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_BITMAP)) to decide whether the clipboard holds an image.
Text and files work the same way, as the code below shows.
2. Get the clipboard contents via wx.TheClipboard.GetData(file_obj).
3. For an image, get the bitmap through wx.BitmapDataObject()'s GetBitmap() method, then save it with SaveFile(name=filename, type=wx.BITMAP_TYPE_BMP); see the sketch after the code below.
4. For text, get the content through wx.TextDataObject()'s GetText() method.
5. For a file, get the list of copied files through wx.FileDataObject()'s GetFilenames(), then copy them to the target location with shutil's copy2 method.

Partial code for the wxPython frame

#!/usr/bin/env python
import wx
class MyFrame(wx.Frame):
    def __init__(self, parent):
        wx.Frame.__init__(self, parent, title="Paste Button Demo")
        self.text = wx.TextCtrl(self, style=wx.TE_MULTILINE | wx.HSCROLL)
        self.count = 4  # gets incremented
        menu = wx.MenuBar()
        edit = wx.Menu()
        paste = edit.Append(wx.ID_PASTE, "&Paste", "Paste from the clipboard")
        menu.Append(edit, "&Edit")
        self.SetMenuBar(menu)
        self.toolbar = self.CreateToolBar()
        bmp = wx.ArtProvider.GetBitmap(wx.ART_PASTE, wx.ART_TOOLBAR)
        self.toolbar.AddTool(wx.ID_PASTE,"1234",bmp)
        self.toolbar.Realize()
        self.Bind(wx.EVT_IDLE, self.update_ui)
        self.Bind(wx.EVT_UPDATE_UI, self.update_ui, id=wx.ID_PASTE)
        wx.UpdateUIEvent.SetUpdateInterval(75)
        self.UpdateWindowUI()
    def update_ui(self, event):
        if event.GetId() == wx.ID_PASTE:  # check this less frequently, possibly expensive
            self.count += 1
            if self.count < 5:
                return
            if not wx.TheClipboard.IsOpened():
                self.count = 0
                wx.TheClipboard.Open()
                success = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_BITMAP))
                success2 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_TEXT))
                success3 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_ENHMETAFILE))
                success4 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_FILENAME))
                success5 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_LOCALE))
                print("success"+str(success))
                print("success2"+str(success2))
                print("success3" + str(success3))
                print("success4" + str(success4))
                print("success5" + str(success5))
                if success2:
                    text_obj = wx.TextDataObject()
                    if wx.TheClipboard.IsOpened() or wx.TheClipboard.Open():
                        if wx.TheClipboard.GetData(text_obj):
                            value = text_obj.GetText()
                        wx.TheClipboard.Close()
                    self.text.SetValue(value)
                elif success4:
                    file_obj = wx.FileDataObject()
                    if wx.TheClipboard.IsOpened() or wx.TheClipboard.Open():
                        if wx.TheClipboard.GetData(file_obj):
                            value = file_obj.GetFilenames()
                            print(value[0])
                        wx.TheClipboard.Close()
                    self.text.SetValue(value[0])
                else:
                    event.Enable(False)
                    self.text.SetValue("You can't paste. :(")
app = wx.App(False)
f = MyFrame(None)
f.Show()
app.MainLoop()
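
The frame above only exercises the text and file branches. A minimal sketch of the image branch from design step 3 might look like this (the target file name is hypothetical):

import wx
def save_clipboard_image(filename="clipboard.bmp"):
    """if the clipboard holds a bitmap, save it to disk and return True"""
    bitmap_obj = wx.BitmapDataObject()
    if wx.TheClipboard.IsOpened() or wx.TheClipboard.Open():
        success = (wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_BITMAP))
                   and wx.TheClipboard.GetData(bitmap_obj))
        wx.TheClipboard.Close()
        if success:
            bitmap = bitmap_obj.GetBitmap()
            return bitmap.SaveFile(name=filename, type=wx.BITMAP_TYPE_BMP)
    return False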

An odd requirement: building a simple web page without third-party Python libraries

Requirements

Recently I took on a particularly odd requirement: run a small website on a bare python2.7 Docker image, without any extra third-party libraries. I had always used flask before, so suddenly doing without any libraries left me a bit lost. After searching around, I found that Python's built-in HTTP server seems up to the task. The code is as follows:

Code

index.html

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>Insert title here</title>
</head>
<body>
	<form action="/signin" method="post">
		phone:<input type="text" name="phone"><br>
		vscode:<input type="password" name="vscode"><br>
		<input type="submit" value="login">
	</form>
</body>
</html>

test.py

# -*- coding:utf-8 -*-
# author: lichmama
# email: nextgodhand@163.com
# filename: httpd.py
import io
import os
import sys
import urllib
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
import urllib2
import json
class MyRequestHandler(SimpleHTTPRequestHandler):
    protocol_version = "HTTP/1.1"
    server_version = "PSHS/0.1"
    sys_version = "Python/2.7.x"
    def get_evns(self):
        evns = os.environ
        return ' \\n '.join([k+"="+v for k,v in evns.items()])
    def do_GET(self):
        if self.path == "/" or self.path == "/index":
            content = open("index.html", "rb").read()
            self.send_head(content)
        elif self.path == "/evns":
            content = self.get_evns()
            self.send_head(content)
        else:
            # drop the leftover HOME/HOMEPATH debug code; sending headers twice corrupted every response
            path = self.translate_path(self.path)
            if os.path.exists(path):
                extn = os.path.splitext(path)[1].lower()
                content = open(path, "rb").read()
                self.send_head(content, type=self.extensions_map[extn])
            else:
                content = open("404.html", "rb").read()
                self.send_head(content, code=404)
        self.send_content(content)
    def do_POST(self):
        if self.path == "/signin":
            data = self.rfile.read(int(self.headers["content-length"]))
            data = urllib.unquote(data)
            data = self.parse_data(data)
            test_data = {"username": "zhangyong_new","userpasswd": "123456"}
            requrl = "https://dev.honcloud.honeywell.com.cn/dashboard/usercentre/login"
            header = {"Content-type": "application/json"}
            test_data_urlencode=json.dumps(test_data)
            req = urllib2.Request(url=requrl, data=test_data_urlencode,headers=header)
            res_data = urllib2.urlopen(req)
            res = res_data.read()
            print res
            self.send_head(res)
            self.send_content(res)
    def parse_data(self, data):
        ranges = {}
        for item in data.split("&"):
            k, v = item.split("=")
            ranges[k] = v
        return ranges
    def send_head(self, content, code=200, type="text/html"):
        self.send_response(code)
        self.send_header("Content-Type", type)
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
    def send_content(self, content):
        f = io.BytesIO()
        f.write(content)
        f.seek(0)
        self.copyfile(f, self.wfile)
        f.close()
if __name__ == "__main__":
    if len(sys.argv) == 2:
        # set the target where to mkdir, and default "D:/web"
        MyRequestHandler.target = sys.argv[1]
    try:
        server = HTTPServer(("0.0.0.0", 8282), MyRequestHandler)
        print "pythonic-simple-http-server started, serving at http://%s:%d"%(server.server_address[0],server.server_address[1])
        print server.server_address
        server.serve_forever()
    except KeyboardInterrupt:
        server.socket.close()

Using a Queue to fix SQLite multithreading errors (multithreaded CRUD, with query results as dictionaries)

Requirements

The mini-program backend uses a SQLite database. When I first built it I didn't consider multithreading, and with little data, multithreaded queries never failed. Now that the data volume has grown, multithreaded queries keep failing with:

ProgrammingError: Recursive use of cursors not allowed.

That's the headache. I dug through a lot of material online: the advice is either to add a lock=threading.Lock() or to add a sleep, and neither actually solves the problem.
Recently I saw someone online use a Queue to solve it, and I improved on that approach. It now supports multithreaded insert, delete, update, and select, with query results returned as dictionaries.
Without further ado, the code:

Code

# -*- coding: UTF-8 -*-
import sqlite3
import time
from Queue import Queue
from threading import Thread
def sqllite_escape(key_word):
    key_word = key_word.encode("utf-8")
    key_word = key_word.replace("'", "''")
    return key_word
class SelectConnect(object):
    '''
    for queries only
    '''
    def __init__(self):
        # isolation_level=None enables autocommit mode, so no explicit commit is needed
        self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None)
        self.conn.execute('PRAGMA journal_mode = WAL')
        cursor = self.conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        self.conn.text_factory = str
        # this cursor returns rows as tuples
        self.curosr = self.conn.cursor()
        self.conn.row_factory = self.dict_factory
        # this cursor returns rows as dictionaries
        self.curosr_diction = self.conn.cursor()
    def commit(self):
        self.conn.commit()
    def dict_factory(self, cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def close_db(self):
        # self.curosr.close()
        self.conn.close()
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.
    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).
    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.setDaemon(True)  # python2.5-compatible
        self.running = True
        self.start()
    def dict_factory(self, cursor, row):
        # field = [i[0] for i in cursor.description]
        # value = [dict(zip(field, i)) for i in records]
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        conn.row_factory = self.dict_factory
        curosr_diction = conn.cursor()
        curosr_diction.execute('PRAGMA synchronous=OFF')
        # rows come back as dictionaries
        while self.running:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                # print(arg)
                curosr_diction.execute(req, arg)
                # if res:
                #     for rec in cursor:
                #         res.put(rec)
                #     res.put('--no more--')
                if res:
                    res.put(curosr_diction.fetchall())
                if self.autocommit:
                    conn.commit()
        conn.close()
    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.
        """
        self.reqs.put((req, arg or tuple(), res))
    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)
    def select_all_dict(self, req, arg=None):
        '''
        return the whole result set as a list
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        return rec
    def select_one_dict(self, req, arg=None):
        '''
        return the first element of the result list, as a dictionary
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        if len(rec) != 0:
            rec = rec[0]
        else:
            rec = None
        return rec
    def commit(self):
        self.execute('--commit--')
    def close(self):
        self.execute('--close--')
class Cursor(object):
    '''
    queries return rows as tuples
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosr = old_con.curosr
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosr.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        try:
            self.curosr.executescript(string)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.executescript(string)
    def fetchall(self):
        return self.curosr.fetchall()
    def fetchone(self):
        return self.curosr.fetchone()
    def rowcount(self):
        return self.curosr.rowcount
    def close(self):
        self.curosr2.running = False
        self.curosr.close()
        self.conn.close()
class Curosrdiction(object):
    '''
    queries return rows as dictionaries; recommended for everything
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosrdiction = old_con.curosr_diction
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosrdiction.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        result = True
        try:
            self.curosrdiction.executescript(string)
        except Exception:
            print("失败一次")
            # print(string)
            time.sleep(0.1)
            # self.executescript(string)
            result = False
        return result
    def fetchall(self):
        return self.curosrdiction.fetchall()
    def fetchone(self):
        return self.curosrdiction.fetchone()
    def rowcount(self):
        return self.curosrdiction.rowcount
    def select_all_dict(self, string, *args):
        return self.curosr2.select_all_dict(string, *args)
    def select_one_dict(self, string, *args):
        return self.curosr2.select_one_dict(string, *args)
    def close(self):
        self.curosr2.running = False
        self.curosrdiction.close()
        self.conn.close()
    def commit(self):
        self.conn.commit()
        self.curosr2.commit()
# curosr = Cursor()
curosr_diction = Curosrdiction()
def commit():
    curosr_diction.commit()
def close_db():
    # curosr.close()
    curosr_diction.close()
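
A minimal usage sketch, assuming this module is saved as connectDB.py, the resource directory exists, and the user table is hypothetical:

import connectDB
# non-select statements go through the queued worker thread; selects use the dict cursor
connectDB.curosr_diction.execute("create table if not exists user (id integer, name text)")
connectDB.curosr_diction.execute("insert into user values (?, ?)", (1, "alice"))
connectDB.commit()
print(connectDB.curosr_diction.select_all_dict("select * from user"))  # [{'id': 1, 'name': 'alice'}]
connectDB.close_db()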

Fast bulk updates through the JIRA API with gevent

Requirements

Our local database regularly produces a lot of records that need to be synced to JIRA, overwriting the old data there.
The JIRA server is deployed abroad and the connection is painfully slow; every single update takes about 10 seconds.
The jira API provides no bulk or asynchronous update method, so I needed an asynchronous way to send all the requests out at once.
The JIRA server sometimes can't keep up and closes some of my connections, so I also need to record the updates that failed to sync and retry them.

Libraries used

The gevent library

gevent is a concurrency library based on libev. It provides a clean API for a variety of concurrent and network-related tasks.

The jira library,
which provides methods for logging in to JIRA and for creating and updating issues.

Implementation

# -*- coding: UTF-8 -*-
import datetime
import random
import gevent
from gevent import monkey
monkey.patch_all()  # patch the stdlib before importing jira so its sockets become cooperative
from jira import JIRA
from model import addDefectDAO
issue_list = []
def update(issue_key):
    # introduce a tiny stagger so 100+ requests don't all fire at the same instant
    gevent.sleep(random.randint(1, 5) * 0.001)
    global issue_list
    starttime = datetime.datetime.now()
    print(starttime)
    print("start:" + issue_key)
    issue = jira.issue(issue_key)
    issue_dict = {
        'summary': issue_key
    }
    try:
        issue.update(fields=issue_dict)
        issue_list.remove(issue_key)
    except Exception:
        pass
    print("end:" + issue_key)
# pull the issue info from a local database; 602 is the local project id
all = addDefectDAO.defect_select_not_draft_by_project_id(602)
for item in all:
    issue_list.append(item["issue_key"])
url = "https://www.jira.com"
username = "username"
password = "password"
jira = JIRA(server=url, basic_auth=(username, password), max_retries=0, timeout=400, async_=True, async_workers=5)
# jira2 = Jira(url=url, username=username, password=password)
print("登录成功")
starttime = datetime.datetime.now()
while issue_list != []:
    # successful updates remove themselves from issue_list; keep retrying the rest
    api_list = []
    for i in issue_list:
        api_list.append(gevent.spawn(update, i))
    gevent.joinall(api_list)
endtime = datetime.datetime.now()
print('total time (seconds)')
print((endtime - starttime).seconds)
print('*******')

Notes

1. You must call monkey.patch_all().
2. After calling monkey.patch_all() you will see this warning:

UserWarning: libuv only supports millisecond timer resolution; all times less will be set to 1 ms

This is a known bug; it was reported on GitHub on November 29 and the author will fix it in a later release. It doesn't affect usage for now, so ignore it.
3. In my testing, updating this way is more than 5x faster.

Adding a table at a given position in a Word document with python-docx

Requirements

1. Read an existing Word document in docx format.
2. In that document, locate a given piece of text and add a table directly below it, e.g. insert a table below "BUG情况表" in the figure.

3. The table contents are as follows. After the table has been added, if its contents change, the program must be able to run again and update the table's data.

Design

Read the Word document with python-docx, and locate the target text via document.paragraphs.
Read the Excel contents with xlwings and store them as a list[list[]].
Add a table with docx's add_table, then set the header row color, merge cells, and so on.
Detect whether the table already exists by checking the first header row, and if so delete the old table first.

Code

# -*- coding: UTF-8 -*-
import sys
from copy import deepcopy
import xlwings
from docx import Document
from docx.oxml.ns import nsdecls
from docx.oxml import parse_xml
def copy_table_after(table, paragraph):
    tbl, p = table._tbl, paragraph._p
    new_tbl = deepcopy(tbl)
    p.addnext(new_tbl)
def move_table_after(table, paragraph):
    tbl, p = table._tbl, paragraph._p
    p.addnext(tbl)
def get_excel_date(filename):
    '''
    read all the contents of the excel file and return them as a list
    :param filename: path to the excel file
    :return: list[list[]]
    '''
    app = xlwings.App(visible=False, add_book=True)
    app.display_alerts = False
    app.screen_updating = False
    wb = app.books.open(filename)
    sht = wb.sheets[0]
    rng = sht.range('A1')
    # read excel dates in year-month-day hour:minute:second format
    my_date_handler = lambda year, month, day, hour, minute, second, **kwargs: "%04i-%02i-%02i %02i:%02i:%02i" % (
        year, month, day, hour, minute, second)
    # grab the whole region; the variable is named ig to celebrate I.G winning the LOL S8 world championship
    ig = rng.current_region.options(index=False, numbers=int, empty='N/A', dates=my_date_handler)
    result = ig.value
    wb.close()
    app.quit()
    return result
def delete_table_with_title(document,expect_text):
    allTables = document.tables
    for activeTable in allTables:
        if activeTable.cell(0, 0).paragraphs[0].text == expect_text:
            print('old table deleted')
            activeTable._element.getparent().remove(activeTable._element)
def insert_table_after_text(file_name,excel_name,expect_text):
    document = Document(file_name)
    # docx reads text back as unicode, so search with a unicode string
    expect_text=expect_text.decode('utf-8')
    delete_table_with_title(document,expect_text)
    target = None
    for paragraph in document.paragraphs:
        paragraph_text = paragraph.text
        if paragraph_text.endswith(expect_text):
            target = paragraph
            break
    if target is not None:
        records = get_excel_date(excel_name)
        # get the column count from the excel data and initialize an empty table
        col = len(records[0])
        table = document.add_table(rows=1, cols=col)
        table.style = 'Table Grid'
        # add a header row to the table and merge the first row's cells
        shading_elm_1 = parse_xml(r'<w:shd {} w:fill="D9E2F3"/>'.format(nsdecls('w')))
        table.rows[0].cells[0]._tc.get_or_add_tcPr().append(shading_elm_1)
        table.rows[0].cells[0].text=expect_text
        table_row=table.rows[0]
        first=table_row.cells[0]
        end=table_row.cells[-1]
        first.merge(end)
        # merging done; now fill the table with the excel contents
        for tr_list in records:
            row_cells = table.add_row().cells
            index = 0
            for td_list in tr_list:
                row_cells[index].text = td_list
                index = index + 1
        # move the new table to the target position
        move_table_after(table, target)
        # save
    document.save(file_name)
if __name__ == '__main__':
    insert_table_after_text('demo2.docx', 'demo.xlsx',"BUG情况表")

Final result

