作者: 李镇伟

Eclipse下PlantUML 的正确安装以及使用

最近做项目管理,总是需要画图。发现Eclipse下安装PlantUML最方便。但是网上的教程大多数都不完整,各种缺斤少两,我决定自己来一次

1.通过eclipse install new software 下载plantUML插件.下载地址

http://hallvard.github.io/plantuml/ - http://hallvard.github.io/plantuml/

截图:

2.下载安装Graphviz

windows版本下载地址:http://www.graphviz.org/Download_windows.php

下载上图红框所示文件完成后,双击msi文件,然后一直next(记住安装路径,后面配置环境变量会用到路径信息),安装完成之后,会在windows开始菜单创建快捷信息,默认快捷方式不放在桌面,如图所示。
配置Graphviz的环境变量
将graphviz安装目录下的bin文件夹添加到Path环境变量中,比如D:\Program Files (x86)\Graphviz2.38\bin(注意Path路径下最后有没有“;”,没有的话应该添加;D:\Program Files (x86)\Graphviz2.38\bin)

3.Eclipse下的PlantUML需要指定Graphviz的路径

在 window–preferences–PlantUML–指定Graphviz安装目录bin文件夹下的dot.exe

4.如何在eclipse里使用

1.打开windows -> show view -> other -> PlantUML

会在界面下方生成一个view.
2.在代码里。或者new一个任意的txt文件。输入测试代码

@startuml
Alice -> Bob: Authentication Request
Bob --> Alice: Authentication Response
Alice -> Bob: Another authentication Request
Alice <-- Bob: another authentication Response
@enduml

检查界面下方的view

python常用的几种文件操作,复制,移动,打包

需求

经常会有用到python的文件复制,移动,打包的功能, 我这边用的方法是shuil库来进行这些操作。偶尔穿插了一些图片大小重置pil库和汉字转拼音的方法pypinyin库

代码

# -*- coding: UTF-8 -*-
import os
import shutil
import traceback
import pypinyin
from PIL import Image
from globalLog import ta_log
def copy_file(srcfile, dstfile):
    if not os.path.isfile(srcfile):
        # 一般来说,是因为复制的不是文件所导致的,不影响
        ta_log.info("%s not file!" % (srcfile))
    else:
        fpath, fname = os.path.split(dstfile)  # 分离文件名和路径
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # 创建路径
        shutil.copy2(srcfile, dstfile)  # 移动文件
        ta_log.info("copy %s -> %s" % (srcfile, dstfile))
def movefile(srcfile, dstfile):
    if not os.path.isfile(srcfile):
        ta_log.error("%s not exist!" % (srcfile))
    else:
        fpath, fname = os.path.split(dstfile)  # 分离文件名和路径
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # 创建路径
        shutil.move(srcfile, dstfile)  # 移动文件
        ta_log.info("move %s -> %s" % (srcfile, dstfile))
def copy_file_to_capture(srcfile):
    '''
    复制到capture目录,并且把汉字改成拼音
    :param srcfile:
    :return:
    '''
    fpath, fname = os.path.split(srcfile)
    filename_pre_ = pypinyin.slug(fname)
    dstfile = os.path.abspath(os.getcwd() + "/filepath/attachement/capture/" + filename_pre_)
    copy_file(srcfile, dstfile)
def resize_mage(filein):
    img = Image.open(filein)
    out = img.resize((129, 149), Image.ANTIALIAS)  # resize image with high-quality
    out.save(filein, 'png')
# 只能用于删除本地文件
def delete_many_file(file_list):
    for path in file_list:
        if os.path.exists(path):
            # 删除文件,可使用以下两种方法。
            try:
                os.remove(path)
            except Exception:
                ta_log.error(traceback.format_exc())
            # os.unlink(my_file)
def copy_and_zip(file_list, dst_folder_name):
    '''
    批量复制文件到指定文件夹,然后把指定文件夹的内容压缩成ZIP并且删掉该文件夹
    :param file_list: 文件或文件夹
    :param dst_folder_name: 目标压缩文件的名称
    :return:
    '''
    for item in file_list:
        copy_files_to_attachment(item, dst_folder_name)
    source = os.getcwd() + "/filepath/attachement/" + dst_folder_name
    shutil.make_archive(source, "zip", source)
    shutil.rmtree(source)
def copy_files_to_attachment(srcfile, filename):
    '''
    把文件或文件夹复制到指定目录中
    :param srcfile: 文件或者文件夹的绝对路径
    :param filename: 指定目录
    :return:
    '''
    dstfile = os.path.abspath(os.getcwd() + "/filepath/attachement/")
    folder_name = dstfile + "\\" + filename + "\\"
    if not os.path.isfile(srcfile):
        last_name = os.path.basename(srcfile)
        destination_name = folder_name + last_name
        shutil.copytree(srcfile, destination_name, symlinks=True)
    else:
        fpath, fname = os.path.split(folder_name)  # 分离文件名和路径
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # 创建路径
        shutil.copy2(srcfile, folder_name)  # 移动文件
        print("copy %s -> %s" % (srcfile, folder_name))
def copy_file_to_folder(srcfile, folder_name):
    '''
       把文件或文件夹复制到指定目录中
       :param srcfile: 文件或者文件夹的绝对路径
       :param filename: 指定目录
       :return:
       '''
    if not os.path.isfile(srcfile):
        last_name = os.path.basename(srcfile)
        destination_name = folder_name + last_name
        shutil.copytree(srcfile, destination_name, symlinks=True)
    else:
        fpath, fname = os.path.split(folder_name)  # 分离文件名和路径
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # 创建路径
        shutil.copy2(srcfile, folder_name)  # 移动文件
        print("copy %s -> %s" % (srcfile, folder_name))

python生成bat脚本,并且执行bat脚本

需求

关键词:python,sqllite,bat,进程
最近有一个需求,需要用python读取一个txt文件里的sql语句,用sqllite去执行,执行之前先备份一次sqllite数据库到临时目录。然后sql执行成功则删除临时文件,sql执行失败则使用备份的sqllite去覆盖掉当前错误的sqllite数据库。

关键点

  • 生成临时目录。使用python的tempfile库
  • 复制文件,使用python的shutil库
  • 从文本中读取sql,使用open方法:
  • f = open(file_name, 'r')
    sql = f.read()
  • 生成bat文件,也使用open方法。模式选择w
  • bat命令中的暂停使用@ping -n 5 127.1 >nul 2>nul
  • bat命令中的删除使用“del 路径”
  • bat命令中的复制使用“copy 源文件 目标文件”
  • bat命令执行完后,等待用户手动关闭使用pause,自动关闭使用exit

部分代码

# -*- coding: UTF-8 -*-
# 命名方式为表名_操作_字段
import os
import tempfile
import connectDB
from controller import fileController
curosrdiction = connectDB.curosr_diction
database_back_folder = ""
def exec_sql(sqltext):
    '''
    执行sql,成功返回true
    :param sqltext: sql语句
    :return:
    '''
    result = curosrdiction.executescript(sqltext)
    curosrdiction.commit()
    return result
def backup_database():
    '''
    备份数据库
    :return:
    '''
    global database_back_folder
    database_back_folder = tempfile.mkdtemp()
    fileController.copy_file_to_folder('resource/sqllite.db', database_back_folder)
    write_bat()
def restore_database():
    '''
    恢复数据库
    :return:
    '''
    curosrdiction.close()
    fileController.copy_file(srcfile=database_back_folder + "\\sqllite.db", dstfile='resource/sqllite.db')
def read_sql_from_text(file_name):
    '''
    从文本文件中读取sql并且执行,执行失败,就把原来的数据库覆盖回来
    :param file_name:
    :return:
    '''
    f = open(file_name, 'r')
    sql = f.read()
    if not exec_sql(sql):
        run_bat()
def write_bat():
    sql_database_path = os.getcwd() + "\\resource\\sqllite.db"
    sql_database_path_shm = os.getcwd() + "\\resource\\sqllite.db-shm"
    sql_database_path_wal = os.getcwd() + "\\resource\\sqllite.db-wal"
    bat_name = 'copy.bat'
    s1 = '''@echo off
@ping -n 5 127.1 >nul 2>nul
echo delete database...
del ''' + sql_database_path + '''
del ''' + sql_database_path_shm + '''
del ''' + sql_database_path_wal + '''
echo restore database...
@ping -n 5 127.1 >nul 2>nul
copy ''' + database_back_folder + "\\sqllite.db " + sql_database_path + '''
echo restore finish
pause'''
    f = open(bat_name, 'w')
    f.write(s1)
    f.close()
def run_bat():
    '''
    运行bat
    :return:
    '''
    os.system('start copy.bat')
def update_sql():
    '''
    开始更新数据库的主入口
    :return:
    '''
    backup_database()
    read_sql_from_text("resource/download/1.2.3_sql.txt")

使用jira的api获得project的board参数和sprint参数

需求

每个jira项目都有sprint参数和board参数。关系为一对多的关系。
project:board=1:n
board:sprint=1:n
如果要查询出一个项目有哪些正在进行的sprint,还需要费一番功夫。因为目前jira -api的python库里并没有给出方法,不过我们可以通过下面的方法获得:

通过get请求,根据项目的key或者ID获得board信息,地址和参数如下

url+"/rest/agile/1.0/board?projectKeyOrId=" + projectKeyOrId

通过get请求,根据board的ID查询到sprint的信息,根据state进行筛选

url+"/rest/agile/1.0/board/" + str(item['id']) + "/sprint?state=future,active"

代码

# -- coding: UTF-8 --
import requests
from jira import JIRA
url = 'https://jira.atlassian.com'
jira = JIRA(server=url, basic_auth=('username', 'password'))
cookies = jira._session.cookies
projectKeyOrId = "project_key"
board_url = url+"/rest/agile/1.0/board?projectKeyOrId=" + projectKeyOrId
response = requests.get(url, cookies=cookies,
                        headers={"Accept": "application/json"})
# print(json.dumps(json.loads(response.text), sort_keys=True, indent=4, separators=(",", ": ")))
qq = response.json()
sprint_result_list = []
for item in qq['values']:
    sprint_url = url+"/rest/agile/1.0/board/" + str(item['id']) + "/sprint?state=future,active"
    response = requests.get(url, cookies=cookies,
                            headers={"Accept": "application/json"})
    sprint_json = response.json()
    if 'values' in sprint_json:
        sprint_list = sprint_json['values']
        for sprint in sprint_list:
            element = {'sprint_id': sprint['id'], 'sprint_name': sprint['name']}
            if element not in sprint_result_list:
                sprint_result_list.append(element)
print(sprint_result_list)

VUE+FLASK前后端分离(回答面试题之“你来说说restful前后端代码怎么写”)

动机

前段时间出去面试,遇到了好几次对方面试官问这样的问题,”restful风格的代码,前后端怎么写?”,“从浏览器前端到后台经过了哪些?说的越详细越好。”这样的问题,我听到的第一时刻是懵逼的,啥情况?我要从vue的双向数据绑定开始说吗?axios的用法要说吗?falsk的restful是如何用‘’/api/‘’对应前台的url的吗?还是去说spring框架的mvc? 产生这样的疑惑,主要原因是,我不明白面试官为什么要问这样的问题?实现起来很简单,但是说起来又太宽泛,不知道说的是不是面试官想要的答案,容易偏题。 在我回头仔细想想了之后,决定以后再遇到这样的问题,就使用vue+falsk做例子来讲解这个。

回答策略

按照下面的几步,顺序回答

  1. 以vue+flask 前后端分离为基础,以用户登录,输入用户名密码为场景。
  2. vue前端框架通过v-model获得输入框输入的用户名以及密码。通过引入axios向后台发起http请求,axios是一个http库,可以在nodejs中使用,使用方式有一点类似ajax。通过axios.post(“/api”,{param:”param”})的方式向后台发起http请求。
  3. 后台的flask运行起来之后,通过装饰圈route.配置路由@app.route(‘/api’,methods=[“GET”,”POST”]) 来对应前台http请求的url,如果没有对应的url会返回404。如果找到对应的路由,则会进入相应的方法,进行运算,完成运算之后,可以用json.dumps把数据作为json返回。
  4. axios前台的response收到后,通过response.data获得返回的json,然后可以把相应的值进行变更

代码实现

  • 前端vue关键代码之 index.html
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width,initial-scale=1.0">
  <title>vueapp01</title>
  <script src="https://cdn.staticfile.org/vue/2.4.2/vue.min.js"></script>
</head>
<style>
  .class1{
    background: #444;
    color: #eee;
  }
</style>
<body>
  <div id="app3"></div>
</body>
</html>
  • 前端vue关键代码之main.js
// The Vue build version to load with the `import` command
// (runtime-only or standalone) has been set in webpack.base.conf with an alias.
import Vue from 'vue'
import App from './App'
import Lzw from './components/Lzw'
import router from './router'
import axios from 'axios'
import BootstrapVue from 'bootstrap-vue'
import 'bootstrap/dist/css/bootstrap.css'
import 'bootstrap-vue/dist/bootstrap-vue.css'
Vue.use(BootstrapVue)
Vue.config.productionTip = false
/* eslint-disable no-new */
new Vue({
  el: '#app3',
  router,
  axios,
  components: { Lzw },
  template: '<Lzw/>'
})
  • 前端vue关键代码之Lzw.vue

使用axios需要先安装axios库和HTTP2库

npm install –save axios
npm install –save http2

<template>
  <div class="hello">
    <h1>{{ msg }}</h1>
    <div id="example-3">
      <input type="checkbox" id="jack" value="白金会员" v-model="checkedNames">
      <label for="jack">白金会员</label>
      <input type="checkbox" id="john" value="黄金会员" v-model="checkedNames">
      <label for="john">黄金会员</label>
      <input type="checkbox" id="mike" value="王者会员" v-model="checkedNames">
      <label for="mike">王者会员</label>
      <br>
      <strong>选择会员种类: {{ checkedNames }}</strong>
    </div>
    <div id="app2">
      <input v-model="username" placeholder="用户名">
      <p>用户名是: {{ username }}</p>
      <input type="password" v-model="password" placeholder="密码">
      <p>密码是: {{ password }}</p>
      <button class="btn btn-large btn-primary" v-on:click="login">向后台发送post请求,传递用户名和密码,变更用户ID</button>
      <p>用户ID是: {{ id }}</p>
      <button class="btn btn-large btn-primary" v-on:click="getmsg">向后台发送get请求,把用户ID变成0</button>
    </div>
  </div>
</template>
<script>
import axios from "axios";
export default {
  name: "hello",
  data() {
    return {
      msg: "欢迎来到测试开发笔记!",
      checkedNames: [],
      username: "",
      password: "",
      id: "密码反转+用户名反转"
    };
  },
  methods: {
    login() {
      var that = this;
      // 对应 Python 提供的接口,这里的地址填写下面服务器运行的地址,本地则为127.0.0.1,外网则为 your_ip_address
      const path = "http://127.0.0.1:5000/getMsg";
      axios
        .post(path, { username: this.username, password: this.password })
        .then(response => {
          this.id = response.data.userid;
        });
    },
    getmsg() {
      var that = this;
      // 对应 Python 提供的接口,这里的地址填写下面服务器运行的地址,本地则为127.0.0.1,外网则为 your_ip_address
      const path = "http://127.0.0.1:5000/getMsg";
      // 务必使用箭头函数的方法,这样this.id能直接对上,不然会报错提示id没找到
      axios
        .get(path, { username: this.username, password: this.password })
        .then(response => {
          this.id = response.data.userid;
        });
    }
  }
};
</script>
  • 后端flask关键代码main.py

flask要避免跨域问题。需要安装Flask库和Falsk-Cors库

from flask import Flask, url_for,request
from flask_cors import *
import json
app = Flask(__name__)
# 这句话解决跨域问题
CORS(app, supports_credentials=True)
@app.route('/getMsg',methods=["GET","POST"])
def getMsg():
    if request.method == 'POST':
        username = request.json['username']
        password= request.json['password']
        # 假定用户id是密码反转+用户名反转得出来的
        datat = {
            "userid": username[::-1]+password[::-1],
        }
        return json.dumps(datat)
    elif request.method == 'GET':
        datat = {
            "userid": 0,
        }
        return json.dumps(datat)
if __name__ == '__main__':
    app.debug = True
    app.run()

wxpython 从剪贴板读取文件,读取文字,读取图像

需求

前段时间有这样一个需求,要读取用户的剪贴板的内容,然后把剪贴板的信息复制到另一个地方。例如:

  • 当用户复制的是图片时,把图片复制到一个指定位置。
  • 当用户复制的是txt中的一段文字时,获得复制的文字内容。
  • 当用户复制的是一个文件时,获得复制的文件名和路径,然后复制到一个指定位置。

设计

1.通过wx自带的检查剪贴板功能。
wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_BITMAP)) 这样的方式,判断是不是图片。
文字和文件的方法类似,可以在下面的代码里看到。
2.通过wx.TheClipboard.GetData(file_obj)的方法获得剪贴板的内容
3.如果是图片,通过wx.BitmapDataObject()的GetBitmap()方法获得图片信息。再通过SaveFile(name=filename, type=wx.BITMAP_TYPE_BMP)方法保存图片
4.如果是文字。通过wx.TextDataObject()的GetText()方法获得文字内容。
5.如果是文件。通过wx.FileDataObject()的GetFilenames()获得复制的文件列表。然后可以通过shutil库的copy2方法复制文件到指定位置

wxpython窗体的部分代码

#!/usr/bin/env python
import wx
class MyFrame(wx.Frame):
    def __init__(self, parent):
        wx.Frame.__init__(self, parent, title="Paste Button Demo")
        self.text = wx.TextCtrl(self, style=wx.TE_MULTILINE | wx.HSCROLL)
        self.count = 4  # gets incremented
        menu = wx.MenuBar()
        edit = wx.Menu()
        paste = edit.Append(wx.ID_PASTE, "&Paste", "Paste from the clipboard")
        menu.Append(edit, "&Edit")
        self.SetMenuBar(menu)
        self.toolbar = self.CreateToolBar()
        bmp = wx.ArtProvider.GetBitmap(wx.ART_PASTE, wx.ART_TOOLBAR)
        self.toolbar.AddTool(wx.ID_PASTE,"1234",bmp)
        self.toolbar.Realize()
        self.Bind(wx.EVT_IDLE, self.update_ui)
        self.Bind(wx.EVT_UPDATE_UI, self.update_ui, id=wx.ID_PASTE)
        wx.UpdateUIEvent.SetUpdateInterval(75)
        self.UpdateWindowUI()
    def update_ui(self, event):
        if event.GetId() == wx.ID_PASTE:  # check this less frequently, possibly expensive
            self.count += 1
            if self.count < 5:
                return
            if not wx.TheClipboard.IsOpened():
                self.count = 0
                wx.TheClipboard.Open()
                success = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_BITMAP))
                success2 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_TEXT))
                success3 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_ENHMETAFILE))
                success4 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_FILENAME))
                success5 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_LOCALE))
                print("success"+str(success))
                print("success2"+str(success2))
                print("success3" + str(success3))
                print("success4" + str(success4))
                print("success5" + str(success5))
                if success2:
                    text_obj = wx.TextDataObject()
                    if wx.TheClipboard.IsOpened() or wx.TheClipboard.Open():
                        if wx.TheClipboard.GetData(text_obj):
                            value = text_obj.GetText()
                        wx.TheClipboard.Close()
                    self.text.SetValue(value)
                elif success4:
                    file_obj = wx.FileDataObject()
                    if wx.TheClipboard.IsOpened() or wx.TheClipboard.Open():
                        if wx.TheClipboard.GetData(file_obj):
                            value = file_obj.GetFilenames()
                            print(value[0])
                        wx.TheClipboard.Close()
                    self.text.SetValue(value[0])
                else:
                    event.Enable(False)
                    self.text.SetValue("You can't paste. :(")
app = wx.App(False)
f = MyFrame(None)
f.Show()
app.MainLoop()

奇葩需求之-不使用python的第三方库做一个简单的网页

需求

最近接了一个特别奇葩的需求,要求在最基础的python2.7的docker环境上运行一个小型网站,不能使用额外的第三方库。因为之前都用flask,忽然不用库,有一点懵逼。然后找了一圈,发现用python自带的httpserver似乎可以完成这样的任务。代码如下:

代码

index.html

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>Insert title here</title>
</head>
<body>
	<form action="/signin" method="post">
		phone:<input type="text" name="phone"><br>
		vscode:<input type="password" name="vscode"><br>
		<input type="submit" value="login">
	</form>
</body>
</html>

test.py

# -*- coding:utf-8 -*-
# author: lichmama
# email: nextgodhand@163.com
# filename: httpd.py
import io
import os
import sys
import urllib
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
import urllib2
import json
class MyRequestHandler(SimpleHTTPRequestHandler):
    protocol_version = "HTTP/1.1"
    server_version = "PSHS/0.1"
    sys_version = "Python/2.7.x"
    def get_evns(self):
        evns = os.environ
        return ' \\n '.join([k+"="+v for k,v in evns.items()])
    def do_GET(self):
        if self.path == "/" or self.path == "/index":
            content = open("index.html", "rb").read()
            self.send_head(content)
        elif self.path == "/evns":
            content = self.get_evns()
            self.send_head(content)
        else:
            path = self.translate_path(self.path)
            system=sys.platform
            osv=os.environ
            if osv.has_key('HOMEPATH'):
                content = os.environ['HOMEPATH']
            else:
                content = os.environ['HOME']
            self.send_head(content)
            if os.path.exists(path):
                extn = os.path.splitext(path)[1].lower()
                content = open(path, "rb").read()
                self.send_head(content, type=self.extensions_map[extn])
            else:
                content = open("404.html", "rb").read()
                self.send_head(content, code=404)
        self.send_content(content)
    def do_POST(self):
        if self.path == "/signin":
            data = self.rfile.read(int(self.headers["content-length"]))
            data = urllib.unquote(data)
            data = self.parse_data(data)
            test_data = {"username": "zhangyong_new","userpasswd": "123456"}
            requrl = "https://dev.honcloud.honeywell.com.cn/dashboard/usercentre/login"
            header = {"Content-type": "application/json"}
            test_data_urlencode=json.dumps(test_data)
            req = urllib2.Request(url=requrl, data=test_data_urlencode,headers=header)
            res_data = urllib2.urlopen(req)
            res = res_data.read()
            print res
            self.send_head(res)
            self.send_content(res)
    def parse_data(self, data):
        ranges = {}
        for item in data.split("&"):
            k, v = item.split("=")
            ranges[k] = v
        return ranges
    def send_head(self, content, code=200, type="text/html"):
        self.send_response(code)
        self.send_header("Content-Type", type)
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
    def send_content(self, content):
        f = io.BytesIO()
        f.write(content)
        f.seek(0)
        self.copyfile(f, self.wfile)
        f.close() if __name__ == "__main__":
    if len(sys.argv) == 2:
        # set the target where to mkdir, and default "D:/web"
        MyRequestHandler.target = sys.argv[1]
    try:
        server = HTTPServer(("0.0.0.0", 8282), MyRequestHandler)
        print "pythonic-simple-http-server started, serving at http://%s:%d"%(server.server_address[0],server.server_address[1])
        print server.server_address
        server.serve_forever()
    except KeyboardInterrupt:
        server.socket.close()

java测试pulsar实例

需求

最近公司上了pulsar服务,然后我们需要学习pulsar相关的内容。最好的办法就是自己学习pulsar环境的搭建,然后搭建一个pulsar-server.并且自己建立pulsar-client的消费者和生产者,互相调用,测试连通

pulsar-server

使用docker搭建是最方便的。
输入如下命令就可以啦

docker run -it -p 28000:80 -p 28080:8080 -p 26650:6650 apachepulsar/pulsar-standalone

它会去本地建立一个标准的pulsar server,其中各个端口的意义分别是:

80: the port for pulsar dashboard
8080: the http service url for pulsar service
6650: the binary protocol service url for pulsar service

我这边映射到了28000,28080,26650三个端口。

pulsar-client测试之代码结构


如上图所示,有4个文件,
Client是连接的代码
MessageConsumer是单主题订阅(消费者
MessageConsumerAll是订阅所有主题(消费者
MessageProducer是发布指定主题(生产者

pulsar-client测试之Client.java

配置连接信息。0.0.0.0是IP地址,如果你需要使用,请换成你自己的pulsar服务地址

package pulsar.client;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import com.sun.xml.internal.ws.Closeable;
public class Client {
    private PulsarClient client;
    public Client() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://0.0.0.0:26650/")
                .build();
    }
    public void Close() throws PulsarClientException {
    	client.close();
    }
    public PulsarClient getPulsarClient(){
        return client;
    }
}

pulsar-client测试之MessageConsumer.java

单主题订阅,这段代码是演示单主题订阅,打印收到的订阅内容,不关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
	private Client client;
	private Consumer consumer;
	public MessageConsumer(String topic, String subscription) throws PulsarClientException {
		client = new Client();
		consumer = createConsumer(topic, subscription);
	}
	private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
		return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
				.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
	}
	public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 用来异步获取,保持回话
		 */
		do {
			// Wait for a message
			CompletableFuture<Message> msg = consumer.receiveAsync();
			System.out.printf("Message received: %s", new String(msg.get().getData()));
			// Acknowledge the message so that it can be deleted by the message broker
			consumer.acknowledge(msg.get());
		} while (true);
	}
	public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 获取一次,就关闭会话
		 */
		// Wait for a message
		System.out.printf("Start pulsar");
		CompletableFuture<Message> msg = consumer.receiveAsync();
		// System.out.printf("Message received: %s", new String(msg.get().getData()));
		String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
		// Acknowledge the message so that it can be deleted by the message broker
		consumer.acknowledge(msg.get());
		consumer.close();
		client.Close();
		return result;
	}
	public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
		MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
		 consumer.receiveMessage();
//		String reString = consumer.getMessage();
//		System.err.println(reString);
		// consumer.client.Close();
	}
}

pulsar-client测试之MessageConsumerAll.java

下面这段代码是演示订阅服务器上的所有主题,收到一条消息之后,打印主题和内容,然后关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
	private Client client;
	private Consumer consumer;
	public MessageConsumer(String topic, String subscription) throws PulsarClientException {
		client = new Client();
		consumer = createConsumer(topic, subscription);
	}
	private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
		return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
				.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
	}
	public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 用来异步获取,保持回话
		 */
		do {
			// Wait for a message
			CompletableFuture<Message> msg = consumer.receiveAsync();
			System.out.printf("Message received: %s", new String(msg.get().getData()));
			// Acknowledge the message so that it can be deleted by the message broker
			consumer.acknowledge(msg.get());
		} while (true);
	}
	public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 获取一次,就关闭会话
		 */
		// Wait for a message
		System.out.printf("Start pulsar");
		CompletableFuture<Message> msg = consumer.receiveAsync();
		// System.out.printf("Message received: %s", new String(msg.get().getData()));
		String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
		// Acknowledge the message so that it can be deleted by the message broker
		consumer.acknowledge(msg.get());
		consumer.close();
		client.Close();
		return result;
	}
	public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
		MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
		 consumer.receiveMessage();
//		String reString = consumer.getMessage();
//		System.err.println(reString);
		// consumer.client.Close();
	}
}

pulsar-client测试之MessageProducer.java

下面这段代码是发布主题和内容到pulsar服务器,发布一次之后,关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.TimeUnit;
public class MessageProducer {
    private Client client;
    private Producer<byte[]> producer;
    public MessageProducer(String topic) throws PulsarClientException {
        client = new Client();
        producer = createProducer(topic);
    }
    private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
        return client.getPulsarClient().newProducer()
                .topic(topic)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
    }
    public void sendMessage(String message) {
        producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
            System.out.printf("Message with ID %s successfully sent", msgId);
        });
    }
    public void sendOnce(String message) {
    	/**
    	 * 发送一次就关闭
    	 */
    	try {
			producer.send(message.getBytes());
			System.out.printf("Message with content %s successfully sent", message);
			producer.close();
			client.Close();
		} catch (PulsarClientException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }
    // todo add exceptionally().
    public void close(Producer<byte[]> producer){
        producer.closeAsync()
                .thenRun(() -> System.out.println("Producer closed"));
    }
    public static void main(String[] args) throws PulsarClientException {
        MessageProducer producer = new MessageProducer("topic1");
//        producer.sendMessage("Hello World ,lalla");
        producer.sendOnce("Hello World ,lizhenwei");
    }
}

运行效果

生产者console log:

Message with content Hello World ,lizhenwei successfully sent

消费者console log

Start pulsar receive:
topic is: persistent://public/default/topic1,data is: Hello World ,lizhenwei

java使用nats-client进行pub/sub发送json的实例

需求介绍:

NATS是一个开源且高性能的消息系统,它常常被认为是”一个为云服务的中央神经系统”.它每秒钟可以传送百万条消息,所以非常适合用来连接微服务和IOT设备。
NATS是一个发布订阅方式的消息系统。在这类系统中,一个或多个消息发布者将特定的主题发送给一个消息中介者,然后消息中介者再将消息分发给任意客户端(或者这个主题的订阅者)。消息发布者不知道也不关心消息订阅者是谁,反之依然。由于我们可以在不影响系统其它部分的情况下增加新的消息发布者和订阅者,这样的架构使得系统的伸缩性变得很好并且可以比较容易地增加系统的容量。这种类型的系统非常适合用来监测服务器和终端设备;终端设备可以发送消息,我们可以订阅这些消息,然后通过邮件或者其他的方式发送消息通知。

下面我们会部署一个nats-server在windows上,然后使用java创建两个nats客户端,一个发送pub请求,一个发送sub请求,检查sub的这个客户端上能否收到信息。

服务端部署

1.进入网站https://www.nats.io/download/nats-io/gnatsd/
2.下载windows版本,也可以用docker-image版,都很方便。
3.windows版本,下载之后,解压,双击gnatsd.exe即可运行
4.获得服务器地址:nats://localhost:4222

nats代码

1.先maven安装相关依赖,主要是nats的和json的,因为我们要pub一个json过去

<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>

 

nats客户端(sub订阅)

package nats.lzwtest;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
public class SubscribeAsync {
    public static void main(String[] args) {
        try {
            // [begin subscribe_async]
        	Connection nc = Nats.connect("nats://localhost:4222");
            // Use a latch to wait for a message to arrive
            CountDownLatch latch = new CountDownLatch(10);
            // Create a dispatcher and inline message handler
            Dispatcher d = nc.createDispatcher((msg) -> {
                String str = new String(msg.getData(), StandardCharsets.UTF_8);
                System.out.println(str);
                latch.countDown();
            });
            // Subscribe
            d.subscribe("lizhenwei");
            // Wait for a message to come in
            latch.await();
            // Close the connection
            nc.close();
            // [end subscribe_async]
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

nats客户端(pub发布)

package nats.lzwtest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nats.client.Connection;
import io.nats.client.Nats;
// [begin publish_json]
class StockForJsonPub {
    public String symbol;
    public float price;
}
public class PublishJSON {
    public static void main(String[] args) {
        try {
        	Connection nc = Nats.connect("nats://localhost:4222");
            // Create the data object
            StockForJsonPub stk = new StockForJsonPub();
            stk.symbol="GOOG";
            stk.price=1200;
            // use Gson to encode the object to JSON
            GsonBuilder builder = new GsonBuilder();
            Gson gson = builder.create();
            String json = gson.toJson(stk);
            // Publish the message
            nc.publish("lizhenwei", json.getBytes(StandardCharsets.UTF_8));
            // Make sure the message goes through before we close
            nc.flush(Duration.ZERO);
            nc.close();
            System.out.println("pub success");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// [end publish_json]

运行结果

sub端收到如下:

{"symbol":"GOOG","price":1200.0}

 

通过Queue解决sqllite多线程报错的问题(实现多线程增删改查,以字典形式查询结果)

需求:

小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错

ProgrammingError: Recursive use of cursors not allowed.

就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。
刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。
话不多说,下面上代码

代码

# -*- coding: UTF-8 -*-
import sqlite3
import time
from Queue import Queue
from threading import Thread
def sqllite_escape(key_word):
    key_word = key_word.encode("utf-8")
    key_word = key_word.replace("'", "''")
    return key_word
class SelectConnect(object):
    '''
    只能用来查询
    '''
    def __init__(self):
        # isolation_level=None为智能提交模式,不需要commit
        self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None)
        self.conn.execute('PRAGMA journal_mode = WAL')
        cursor = self.conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        self.conn.text_factory = str
        # 把结果用元祖的形式取出来
        self.curosr = self.conn.cursor()
        self.conn.row_factory = self.dict_factory
        # 把结果用字典的形式取出来
        self.curosr_diction = self.conn.cursor()
    def commit(self):
        self.conn.commit()
    def dict_factory(self, cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def close_db(self):
        # self.curosr.close()
        self.conn.close()
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.
    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).
    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.setDaemon(True)  # python2.5-compatible
        self.running = True
        self.start()
    def dict_factory(self, cursor, row):
        # field = [i[0] for i in cursor.description]
        # value = [dict(zip(field, i)) for i in records]
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        conn.row_factory = self.dict_factory
        curosr_diction = conn.cursor()
        curosr_diction.execute('PRAGMA synchronous=OFF')
        # 把结果用字典的形式取出来
        while self.running:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                # print(arg)
                curosr_diction.execute(req, arg)
                # if res:
                #     for rec in cursor:
                #         res.put(rec)
                #     res.put('--no more--')
                if res:
                    res.put(curosr_diction.fetchall())
                if self.autocommit:
                    conn.commit()
        conn.close()
    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.
        """
        self.reqs.put((req, arg or tuple(), res))
    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)
    def select_all_dict(self, req, arg=None):
        '''
        直接返回一个list
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        return rec
    def select_one_dict(self, req, arg=None):
        '''
        直接返回list里的第一个元素,并且以字典展示
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        if len(rec) != 0:
            rec = rec[0]
        else:
            rec = None
        return rec
    def commit(self):
        self.execute('--commit--')
    def close(self):
        self.execute('--close--')
class Cursor(object):
    '''
    以元祖的形式查询出数据
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosr = old_con.curosr
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosr.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        try:
            self.curosr.executescript(string)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.executescript(string)
    def fetchall(self):
        return self.curosr.fetchall()
    def fetchone(self):
        return self.curosr.fetchone()
    def rowcount(self):
        return self.curosr.rowcount
    def close(self):
        self.curosr2.running = False
        self.curosr.close()
        self.conn.close()
class Curosrdiction(object):
    '''
    以字典的形式查询出数据,建议全部用这种。
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosrdiction = old_con.curosr_diction
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosrdiction.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        result = True
        try:
            self.curosrdiction.executescript(string)
        except Exception:
            print("失败一次")
            # print(string)
            time.sleep(0.1)
            # self.executescript(string)
            result = False
        return result
    def fetchall(self):
        return self.curosrdiction.fetchall()
    def fetchone(self):
        return self.curosrdiction.fetchone()
    def rowcount(self):
        return self.curosrdiction.rowcount
    def select_all_dict(self, string, *args):
        return self.curosr2.select_all_dict(string, *args)
    def select_one_dict(self, string, *args):
        return self.curosr2.select_one_dict(string, *args)
    def close(self):
        self.curosr2.running = False
        self.curosrdiction.close()
        self.conn.close()
    def commit(self):
        self.conn.commit()
        self.curosr2.commit()
# curosr = Cursor()
curosr_diction = Curosrdiction()
def commit():
    curosr_diction.commit()
def close_db():
    # curosr.close()
    curosr_diction.close()

苏ICP备18047533号-1