月度归档: 2018 年 12 月

java测试pulsar实例

需求

最近公司上了pulsar服务,然后我们需要学习pulsar相关的内容。最好的办法就是自己学习pulsar环境的搭建,然后搭建一个pulsar-server.并且自己建立pulsar-client的消费者和生产者,互相调用,测试连通

pulsar-server

使用docker搭建是最方便的。
输入如下命令就可以啦

docker run -it -p 28000:80 -p 28080:8080 -p 26650:6650 apachepulsar/pulsar-standalone

它会去本地建立一个标准的pulsar server,其中各个端口的意义分别是:

80: the port for pulsar dashboard
8080: the http service url for pulsar service
6650: the binary protocol service url for pulsar service

我这边映射到了28000,28080,26650三个端口。

pulsar-client测试之代码结构


如上图所示,有4个文件,
Client是连接的代码
MessageConsumer是单主题订阅(消费者
MessageConsumerAll是订阅所有主题(消费者
MessageProducer是发布指定主题(生产者

pulsar-client测试之Client.java

配置连接信息。0.0.0.0是IP地址,如果你需要使用,请换成你自己的pulsar服务地址

package pulsar.client;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import com.sun.xml.internal.ws.Closeable;
public class Client {
    private PulsarClient client;
    public Client() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://0.0.0.0:26650/")
                .build();
    }
    public void Close() throws PulsarClientException {
    	client.close();
    }
    public PulsarClient getPulsarClient(){
        return client;
    }
}

pulsar-client测试之MessageConsumer.java

单主题订阅,这段代码是演示单主题订阅,打印收到的订阅内容,不关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
	private Client client;
	private Consumer consumer;
	public MessageConsumer(String topic, String subscription) throws PulsarClientException {
		client = new Client();
		consumer = createConsumer(topic, subscription);
	}
	private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
		return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
				.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
	}
	public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 用来异步获取,保持回话
		 */
		do {
			// Wait for a message
			CompletableFuture<Message> msg = consumer.receiveAsync();
			System.out.printf("Message received: %s", new String(msg.get().getData()));
			// Acknowledge the message so that it can be deleted by the message broker
			consumer.acknowledge(msg.get());
		} while (true);
	}
	public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 获取一次,就关闭会话
		 */
		// Wait for a message
		System.out.printf("Start pulsar");
		CompletableFuture<Message> msg = consumer.receiveAsync();
		// System.out.printf("Message received: %s", new String(msg.get().getData()));
		String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
		// Acknowledge the message so that it can be deleted by the message broker
		consumer.acknowledge(msg.get());
		consumer.close();
		client.Close();
		return result;
	}
	public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
		MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
		 consumer.receiveMessage();
//		String reString = consumer.getMessage();
//		System.err.println(reString);
		// consumer.client.Close();
	}
}

pulsar-client测试之MessageConsumerAll.java

下面这段代码是演示订阅服务器上的所有主题,收到一条消息之后,打印主题和内容,然后关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
	private Client client;
	private Consumer consumer;
	public MessageConsumer(String topic, String subscription) throws PulsarClientException {
		client = new Client();
		consumer = createConsumer(topic, subscription);
	}
	private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
		return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
				.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
	}
	public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 用来异步获取,保持回话
		 */
		do {
			// Wait for a message
			CompletableFuture<Message> msg = consumer.receiveAsync();
			System.out.printf("Message received: %s", new String(msg.get().getData()));
			// Acknowledge the message so that it can be deleted by the message broker
			consumer.acknowledge(msg.get());
		} while (true);
	}
	public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 获取一次,就关闭会话
		 */
		// Wait for a message
		System.out.printf("Start pulsar");
		CompletableFuture<Message> msg = consumer.receiveAsync();
		// System.out.printf("Message received: %s", new String(msg.get().getData()));
		String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
		// Acknowledge the message so that it can be deleted by the message broker
		consumer.acknowledge(msg.get());
		consumer.close();
		client.Close();
		return result;
	}
	public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
		MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
		 consumer.receiveMessage();
//		String reString = consumer.getMessage();
//		System.err.println(reString);
		// consumer.client.Close();
	}
}

pulsar-client测试之MessageProducer.java

下面这段代码是发布主题和内容到pulsar服务器,发布一次之后,关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.TimeUnit;
public class MessageProducer {
    private Client client;
    private Producer<byte[]> producer;
    public MessageProducer(String topic) throws PulsarClientException {
        client = new Client();
        producer = createProducer(topic);
    }
    private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
        return client.getPulsarClient().newProducer()
                .topic(topic)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
    }
    public void sendMessage(String message) {
        producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
            System.out.printf("Message with ID %s successfully sent", msgId);
        });
    }
    public void sendOnce(String message) {
    	/**
    	 * 发送一次就关闭
    	 */
    	try {
			producer.send(message.getBytes());
			System.out.printf("Message with content %s successfully sent", message);
			producer.close();
			client.Close();
		} catch (PulsarClientException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }
    // todo add exceptionally().
    public void close(Producer<byte[]> producer){
        producer.closeAsync()
                .thenRun(() -> System.out.println("Producer closed"));
    }
    public static void main(String[] args) throws PulsarClientException {
        MessageProducer producer = new MessageProducer("topic1");
//        producer.sendMessage("Hello World ,lalla");
        producer.sendOnce("Hello World ,lizhenwei");
    }
}

运行效果

生产者console log:

Message with content Hello World ,lizhenwei successfully sent

消费者console log

Start pulsar receive:
topic is: persistent://public/default/topic1,data is: Hello World ,lizhenwei

java使用nats-client进行pub/sub发送json的实例

需求介绍:

NATS是一个开源且高性能的消息系统,它常常被认为是”一个为云服务的中央神经系统”.它每秒钟可以传送百万条消息,所以非常适合用来连接微服务和IOT设备。
NATS是一个发布订阅方式的消息系统。在这类系统中,一个或多个消息发布者将特定的主题发送给一个消息中介者,然后消息中介者再将消息分发给任意客户端(或者这个主题的订阅者)。消息发布者不知道也不关心消息订阅者是谁,反之依然。由于我们可以在不影响系统其它部分的情况下增加新的消息发布者和订阅者,这样的架构使得系统的伸缩性变得很好并且可以比较容易地增加系统的容量。这种类型的系统非常适合用来监测服务器和终端设备;终端设备可以发送消息,我们可以订阅这些消息,然后通过邮件或者其他的方式发送消息通知。

下面我们会部署一个nats-server在windows上,然后使用java创建两个nats客户端,一个发送pub请求,一个发送sub请求,检查sub的这个客户端上能否收到信息。

服务端部署

1.进入网站https://www.nats.io/download/nats-io/gnatsd/
2.下载windows版本,也可以用docker-image版,都很方便。
3.windows版本,下载之后,解压,双击gnatsd.exe即可运行
4.获得服务器地址:nats://localhost:4222

nats代码

1.先maven安装相关依赖,主要是nats的和json的,因为我们要pub一个json过去

<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>

 

nats客户端(sub订阅)

package nats.lzwtest;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
public class SubscribeAsync {
    public static void main(String[] args) {
        try {
            // [begin subscribe_async]
        	Connection nc = Nats.connect("nats://localhost:4222");
            // Use a latch to wait for a message to arrive
            CountDownLatch latch = new CountDownLatch(10);
            // Create a dispatcher and inline message handler
            Dispatcher d = nc.createDispatcher((msg) -> {
                String str = new String(msg.getData(), StandardCharsets.UTF_8);
                System.out.println(str);
                latch.countDown();
            });
            // Subscribe
            d.subscribe("lizhenwei");
            // Wait for a message to come in
            latch.await();
            // Close the connection
            nc.close();
            // [end subscribe_async]
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

nats客户端(pub发布)

package nats.lzwtest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nats.client.Connection;
import io.nats.client.Nats;
// [begin publish_json]
class StockForJsonPub {
    public String symbol;
    public float price;
}
public class PublishJSON {
    public static void main(String[] args) {
        try {
        	Connection nc = Nats.connect("nats://localhost:4222");
            // Create the data object
            StockForJsonPub stk = new StockForJsonPub();
            stk.symbol="GOOG";
            stk.price=1200;
            // use Gson to encode the object to JSON
            GsonBuilder builder = new GsonBuilder();
            Gson gson = builder.create();
            String json = gson.toJson(stk);
            // Publish the message
            nc.publish("lizhenwei", json.getBytes(StandardCharsets.UTF_8));
            // Make sure the message goes through before we close
            nc.flush(Duration.ZERO);
            nc.close();
            System.out.println("pub success");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// [end publish_json]

运行结果

sub端收到如下:

{"symbol":"GOOG","price":1200.0}

 

通过Queue解决sqllite多线程报错的问题(实现多线程增删改查,以字典形式查询结果)

需求:

小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错

ProgrammingError: Recursive use of cursors not allowed.

就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。
刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。
话不多说,下面上代码

代码

# -*- coding: UTF-8 -*-
import sqlite3
import time
from Queue import Queue
from threading import Thread
def sqllite_escape(key_word):
    key_word = key_word.encode("utf-8")
    key_word = key_word.replace("'", "''")
    return key_word
class SelectConnect(object):
    '''
    只能用来查询
    '''
    def __init__(self):
        # isolation_level=None为智能提交模式,不需要commit
        self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None)
        self.conn.execute('PRAGMA journal_mode = WAL')
        cursor = self.conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        self.conn.text_factory = str
        # 把结果用元祖的形式取出来
        self.curosr = self.conn.cursor()
        self.conn.row_factory = self.dict_factory
        # 把结果用字典的形式取出来
        self.curosr_diction = self.conn.cursor()
    def commit(self):
        self.conn.commit()
    def dict_factory(self, cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def close_db(self):
        # self.curosr.close()
        self.conn.close()
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.
    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).
    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.setDaemon(True)  # python2.5-compatible
        self.running = True
        self.start()
    def dict_factory(self, cursor, row):
        # field = [i[0] for i in cursor.description]
        # value = [dict(zip(field, i)) for i in records]
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        conn.row_factory = self.dict_factory
        curosr_diction = conn.cursor()
        curosr_diction.execute('PRAGMA synchronous=OFF')
        # 把结果用字典的形式取出来
        while self.running:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                # print(arg)
                curosr_diction.execute(req, arg)
                # if res:
                #     for rec in cursor:
                #         res.put(rec)
                #     res.put('--no more--')
                if res:
                    res.put(curosr_diction.fetchall())
                if self.autocommit:
                    conn.commit()
        conn.close()
    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.
        """
        self.reqs.put((req, arg or tuple(), res))
    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)
    def select_all_dict(self, req, arg=None):
        '''
        直接返回一个list
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        return rec
    def select_one_dict(self, req, arg=None):
        '''
        直接返回list里的第一个元素,并且以字典展示
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        if len(rec) != 0:
            rec = rec[0]
        else:
            rec = None
        return rec
    def commit(self):
        self.execute('--commit--')
    def close(self):
        self.execute('--close--')
class Cursor(object):
    '''
    以元祖的形式查询出数据
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosr = old_con.curosr
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosr.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        try:
            self.curosr.executescript(string)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.executescript(string)
    def fetchall(self):
        return self.curosr.fetchall()
    def fetchone(self):
        return self.curosr.fetchone()
    def rowcount(self):
        return self.curosr.rowcount
    def close(self):
        self.curosr2.running = False
        self.curosr.close()
        self.conn.close()
class Curosrdiction(object):
    '''
    以字典的形式查询出数据,建议全部用这种。
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosrdiction = old_con.curosr_diction
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosrdiction.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        result = True
        try:
            self.curosrdiction.executescript(string)
        except Exception:
            print("失败一次")
            # print(string)
            time.sleep(0.1)
            # self.executescript(string)
            result = False
        return result
    def fetchall(self):
        return self.curosrdiction.fetchall()
    def fetchone(self):
        return self.curosrdiction.fetchone()
    def rowcount(self):
        return self.curosrdiction.rowcount
    def select_all_dict(self, string, *args):
        return self.curosr2.select_all_dict(string, *args)
    def select_one_dict(self, string, *args):
        return self.curosr2.select_one_dict(string, *args)
    def close(self):
        self.curosr2.running = False
        self.curosrdiction.close()
        self.conn.close()
    def commit(self):
        self.conn.commit()
        self.curosr2.commit()
# curosr = Cursor()
curosr_diction = Curosrdiction()
def commit():
    curosr_diction.commit()
def close_db():
    # curosr.close()
    curosr_diction.close()

jira-api使用gevent,快速批量修改数据

需求

因为我们本地数据库中经常会产生大量的数据,这些数据需要同步至jira.覆盖掉jira上的老数据。
因为jira服务器部署在国外,连接巨慢。每次修改一个,至少需要花费10秒左右的时间。
而jira-api没有提供批量或者异步同步的方法,所以需要自己使用一个异步的方式来实现一次性把所有的请求发送出去。
jira服务端有时候吃不消,会关闭掉我的一些连接,所以还需要在被关闭之后,记录下这些同步至jira失败的,重新继续修改操作

用到的库

gevent  库

gevent是一个基于libev的并发库。它为各种并发和网络相关的任务提供了整洁的API。

jira库。
里面有jira登录,修改,新增issue的方法

代码实现

# -*- coding: UTF-8 -*-
import datetime
import random
import gevent
from gevent import monkey
from jira import JIRA
from model import addDefectDAO
monkey.patch_all()
issue_list = []
def update(issue_key):
    # 产生一点小小的间隔,避免一瞬间发出去100多个请求
    gevent.sleep(random.randint(1, 5) * 0.001)
    global issue_list
    starttime = datetime.datetime.now()
    print(starttime)
    print("start:" + issue_key)
    print("start:" + issue_key)
    issue = jira.issue(issue_key)
    issue_dict = {
        'summary': issue_key
    }
    try:
        issue.update(fields=issue_dict)
        issue_list.remove(issue_key)
    except Exception:
        pass
    print("end:" + issue_key)
# 从一个本地数据库取出一堆issue的信息,602是本地的project信息编号
all = addDefectDAO.defect_select_not_draft_by_project_id(602)
for item in all:
    issue_list.append(item["issue_key"])
url = "https://www.jira.com"
username = "username"
password = "password"
jira = JIRA(server=url, basic_auth=(username, password), max_retries=0, timeout=400, async_=True, async_workers=5)
# jira2 = Jira(url=url, username=username, password=password)
print("登录成功")
starttime = datetime.datetime.now()
api_list = []
while issue_list != []:
    for i in issue_list:
        api_list.append(gevent.spawn(update, i))
    gevent.joinall(api_list)
endtime = datetime.datetime.now()
print('总耗时')
print((endtime - starttime).seconds)
print('*******')

注意事项

1.需要引用monkey.patch_all()
2.引用monkey.patch_all()之后会有错误信息:

UserWarning: libuv only supports millisecond timer resolution; all times less will be set to 1 ms

这个是已知bug,在11月29日有人在github上已经提过了,作者会在之后的版本修改。目前不影响使用。请忽略
3.通过这样的方法修改之后,经过我的测试,速度大概快了5倍以上


苏ICP备18047533号-1