需求
最近公司上了pulsar服务,然后我们需要学习pulsar相关的内容。最好的办法就是自己学习pulsar环境的搭建,然后搭建一个pulsar-server.并且自己建立pulsar-client的消费者和生产者,互相调用,测试连通
pulsar-server
使用docker搭建是最方便的。
输入如下命令就可以啦
docker run -it -p 28000:80 -p 28080:8080 -p 26650:6650 apachepulsar/pulsar-standalone
它会去本地建立一个标准的pulsar server,其中各个端口的意义分别是:
80: the port for pulsar dashboard
8080: the http service url for pulsar service
6650: the binary protocol service url for pulsar service
我这边映射到了28000,28080,26650三个端口。
pulsar-client测试之代码结构
如上图所示,有4个文件,
Client是连接的代码
MessageConsumer是单主题订阅(消费者)
MessageConsumerAll是订阅所有主题(消费者)
MessageProducer是发布指定主题(生产者)
pulsar-client测试之Client.java
配置连接信息。0.0.0.0是IP地址,如果你需要使用,请换成你自己的pulsar服务地址
package pulsar.client;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import com.sun.xml.internal.ws.Closeable;
public class Client {
private PulsarClient client;
public Client() throws PulsarClientException {
client = PulsarClient.builder()
.serviceUrl("pulsar://0.0.0.0:26650/")
.build();
}
public void Close() throws PulsarClientException {
client.close();
}
public PulsarClient getPulsarClient(){
return client;
}
}
pulsar-client测试之MessageConsumer.java
单主题订阅,这段代码是演示单主题订阅,打印收到的订阅内容,不关闭连接
package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
private Client client;
private Consumer consumer;
public MessageConsumer(String topic, String subscription) throws PulsarClientException {
client = new Client();
consumer = createConsumer(topic, subscription);
}
private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
}
public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
/***
* 用来异步获取,保持回话
*/
do {
// Wait for a message
CompletableFuture<Message> msg = consumer.receiveAsync();
System.out.printf("Message received: %s", new String(msg.get().getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg.get());
} while (true);
}
public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
/***
* 获取一次,就关闭会话
*/
// Wait for a message
System.out.printf("Start pulsar");
CompletableFuture<Message> msg = consumer.receiveAsync();
// System.out.printf("Message received: %s", new String(msg.get().getData()));
String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg.get());
consumer.close();
client.Close();
return result;
}
public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
consumer.receiveMessage();
// String reString = consumer.getMessage();
// System.err.println(reString);
// consumer.client.Close();
}
}
pulsar-client测试之MessageConsumerAll.java
下面这段代码是演示订阅服务器上的所有主题,收到一条消息之后,打印主题和内容,然后关闭连接
package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
private Client client;
private Consumer consumer;
public MessageConsumer(String topic, String subscription) throws PulsarClientException {
client = new Client();
consumer = createConsumer(topic, subscription);
}
private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
}
public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
/***
* 用来异步获取,保持回话
*/
do {
// Wait for a message
CompletableFuture<Message> msg = consumer.receiveAsync();
System.out.printf("Message received: %s", new String(msg.get().getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg.get());
} while (true);
}
public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
/***
* 获取一次,就关闭会话
*/
// Wait for a message
System.out.printf("Start pulsar");
CompletableFuture<Message> msg = consumer.receiveAsync();
// System.out.printf("Message received: %s", new String(msg.get().getData()));
String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg.get());
consumer.close();
client.Close();
return result;
}
public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
consumer.receiveMessage();
// String reString = consumer.getMessage();
// System.err.println(reString);
// consumer.client.Close();
}
}
pulsar-client测试之MessageProducer.java
下面这段代码是发布主题和内容到pulsar服务器,发布一次之后,关闭连接
package pulsar.client;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.TimeUnit;
public class MessageProducer {
private Client client;
private Producer<byte[]> producer;
public MessageProducer(String topic) throws PulsarClientException {
client = new Client();
producer = createProducer(topic);
}
private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
return client.getPulsarClient().newProducer()
.topic(topic)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();
}
public void sendMessage(String message) {
producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent", msgId);
});
}
public void sendOnce(String message) {
/**
* 发送一次就关闭
*/
try {
producer.send(message.getBytes());
System.out.printf("Message with content %s successfully sent", message);
producer.close();
client.Close();
} catch (PulsarClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// todo add exceptionally().
public void close(Producer<byte[]> producer){
producer.closeAsync()
.thenRun(() -> System.out.println("Producer closed"));
}
public static void main(String[] args) throws PulsarClientException {
MessageProducer producer = new MessageProducer("topic1");
// producer.sendMessage("Hello World ,lalla");
producer.sendOnce("Hello World ,lizhenwei");
}
}
运行效果
生产者console log:
Message with content Hello World ,lizhenwei successfully sent
消费者console log
Start pulsar receive:
topic is: persistent://public/default/topic1,data is: Hello World ,lizhenwei