Category Archive : K8S相关

打造K8S与Jenkins的持续集成系统-CI部分

说明

持续集成系统包括CI和CD。在这篇文章里,我们只讨论CI的实现。关于K8S,Jenkins,私有仓库的安装这里不进行说明,只讲解如何将这些工具组合在一起。

1.拥有一个K8S集群

我的环境是6台虚拟机,3台Master,3台Slaver。如下图所示

运行结果如下图:

2.Jenkins中安装好了kubernetes插件


安装好之后,在系统设置中,能够添加kubernetes信息

3.根据Jenkins的配置要求,去自己的K8S平台上找到对应的信息。填入Jenkins中


我们根据图中的要求,可以在K8S里进行一个个的查找我们想要的信息

字段
查找方法
Kubernetes 地址
IP地址是在keepalived里配置的虚拟IP

在搭建k8S平台时,用到了haproxy,当时指向了**443端口
kubernetes名称 随意填写
kubernetes命名空间 需要在K8S里创建一个命名空间,然后在这个地方使用
凭据
1.进入K8S机器上,输入命令kubectl config view –minify –flatten查看凭据

2.把凭据信息复制成一个文件

3.在Jenkins里上传这个文件

4.这个凭据信息即可在Jenkins配置里使用
JENKINS地址 配置本次使用的Jenkins的url即可
Pod Retention 选择Never,Jenkins任务完成后就删除pod

4.添加私有密钥用于拉取我们的docker私有镜像

在K8S里输入如下命令
kubectl create secret docker-registry repo-k8s \
-n k8s-jx \
--docker-server='repo.local:5000' \
--docker-username='
zhenwei.li
' \
--docker-password='*******' \
--docker-email='357244849@qq
.com
'

5.在Jenkins里编写一个最简单的流水线任务。来测试Jenkins是否可以拉起K8S的节点,产生pod

5.1.从github上抄一段代码示例下来https://github.com/jenkinsci/kubernetes-plugin/blob/master/src/test/resources/org/csanchez/jenkins/plugins/kubernetes/pipeline/jnlpWorkingDir.groovy

podTemplate(label:'jnlp-slave',cloud: 'kubernetes',containers: [
    containerTemplate(name: 'busybox', image: 'nginx:latest', ttyEnabled: true)
],imagePullSecrets:['secret/repok8s']) {
    node ('jnlp-slave') {
        stage('Run') {
            sh 'env | sort'
            container('busybox') {
                sh 'echo "hello"'
                sh 'pwd'
                sh 'ls'
            }
        }
    }
}

5.2创建一个流水线任务

5.3运行这个流水线任务,查看结果

5.4在K8S上输入查找pod的命令,应该会发现有一个pod被运行起来了,名字中含有jnlp-slave的字样,2/2运行两个容器,一个是jnlp容器,一个是busybox容器

kubectl get pods --all-namespaces -o wide

6.在代码仓库中添加CI所需要的Jenkinsfile和BuildPod.yaml等流水线文件

文件名
用处
sonar-project.properties 定义代码需要执行什么语言的扫描,哪些目录需要剔除
JenkinsfileTag 定义打tag操作对应的流水线行为
Jenkinsfile 定义代码提交与合并的流水线行为
BuildPod.yaml K8S产生的pod中,包含哪些容器,容器的环境变量等参数设置

7.BuildPod.yaml中需要包含哪些内容(以nodejs代码举例)

8.Jenkinsfile中需要包含哪些内容

最后在Jenkins里查看运行效果


 

使用dd命令在linux系统里制作U盘装系统

需求

我们需要在一台全新的机器上安装一台新的debian机器。

准备工作:

1.一台已经有linux系统的机器(一般来说,linux机器都会有dd命令)
2.一个U盘
3.一个待安装的全新机器

实施步骤

1.在这台已经有linux系统的机器上下载debian的ISO镜像。放在指定目录下,如:

/home/sfere/debian-10.1.0-amd64-DVD-1.iso

2.把U盘插入这台linux电脑里,输入命令

fdisk -l

检查硬盘挂载情况

3.使用dd命令,把U盘做成启动盘。此步需要注意,上面看到我们的28.9G的U盘是/dev/sdb4。但是我们下面的命令里要用/dev/sdb而不要加最后的4。如果加了4会导致下一步安装失败

dd if=/home/sfere/debian-10.1.0-amd64-DVD-1.iso of=/dev/sdb bs=8M;sync

完成之后,效果如下:

root@debian:~# dd if=/home/sfere/debian-10.1.0-amd64-DVD-1.iso of=/dev/sdb bs=4M;sync
930+1 records in
930+1 records out
3901456384 bytes (3.9 GB, 3.6 GiB) copied, 563.589 s, 6.9 MB/s

4.把U盘拔下来,插入另外一台全新的电脑中,bios启动顺序中,选择USB为第一位启动。即可开始安装

Jenkins根据pipeline的运行结果,给gitea的合并请求pull-request进行评论

需求

Jenkins我们已经集成了sonarqube,并且通过sonarqube的gate能获取到结果,并且告知Jenkins是成功还是失败。但是这样还不够,我们需要限制队友向主分支提交代码。我们锁住了master分支,队友只能通过在gitea中向master分支提交pull-request并且通过了sonarqube的代码质量检查,我们才允许合并进主分支。我们急需要一个评论功能,告知我们,这次的合并请求是否允许合并

最终效果图

关键点

1.在gitea中创建一个用户,用这个用户专门进行评论,例如我这里的“guardian”账户
2.把该账户加入Jenkins的凭据管理里。如下图:

3.在gitea的官方文档里找到添加评论的API,链接如下:https://try.gitea.io/api/swagger#/issue/issueCreateComment
4.在Jenkins的pipeline中定义添加评论的方法,并且分别放在success和unsuccessful结果里。如下:

//这一段是添加评论的方法
def gitComment(String comment) {
        script {
                if (IS_PULL_REQUEST==true){
                     numb=env.BRANCH_NAME.split('-')[-1];
                    def check_result = '{"body": "'+comment+'"}';
                    def response = httpRequest httpMode: 'POST',
                    url: 'https://{服务器地址}/git/api/v1/repos/{所属人}/{仓库名}/issues/'+numb+'/comments',
                    authentication : '74849b03-79ba-43f5-b4b6-920',
                    acceptType: 'APPLICATION_JSON_UTF8', contentType: 'APPLICATION_JSON_UTF8',
                    requestBody : check_result
                }
        }
}
//根据执行结果,进行不同的评论
post {
        success {
            gitComment("通过CI检查,允许合并")
        }
        unsuccessful {
            gitComment("没有通过CI检查,建议不要合并"+env.RUN_DISPLAY_URL)
        }
    }

							

解决Sonarqube quality gate获取不到Sonarqube正确扫描结果的问题

问题

在Jenkins pipeline中,一般都会用到Sonar-scanner来扫描代码,扫描完之后,把结果上传到SonarQube中,SonarQube把结果与质量阀进行对比,然后通过Sonarqube quality gate来判断这次扫描结果是成功还是失败。
不少同学都遇到过Sonarqube quality gate 获得的最后结果不正确,明明SonarQube中的结果是success,而Sonarqube quality gate判断的结果是pending。
这是怎么一回事呢?
原因在于,SonarQube如果没有配置webhook的情况下,Sonarqube quality gate只会第一次主动去请求结果,如果这个时候SonarQube还没有分析完毕,那么就会返回一个in_progress.接下来,Sonarqube quality gate不会再去主动请求,而是被动等待webhook。如果没有配置webhook,那么就会进入一直等待的状态。
常用的解决方法有二:
1.在Sonarqube quality gate添加等待10秒的时间(治标不治本)

2.在SonarQube里添加Jenkins的webhook。(这种方法最好,不需要加等待时间)

添加之后。一切正常,可以正确获得Sonarqube的最新结果

nginx做LDAP端口转发

在nginx的配置文件/etc/nginx/nginx.conf中配置

stream {
    server {
        proxy_timeout 300s;
        proxy_connect_timeout 300s;
        listen       1234;
        proxy_pass 127.0.0.1:389;
        proxy_busy_buffers_size 64k;
    }
}

389是ldap原来的端口,1234是用nginx转发的新端口

SonarQube的LDAP配置

1.先登陆openldap的配置页面

添加一个组叫sonar-administrators,里面添加上自己的账户

在uniqueMember下假如有一个cn=lizhenwei,dc=lzwsoft,dc=com

2.在SonarQube里配置增加一个sonar-administrators的组,把这个组的权限设置成管理员

3.进入SonarQube的配置文件sonar.properties里修改LDAP相关的配置文件。参考如下

# LDAP configuration
# General Configuration
sonar.security.realm=LDAP
ldap.url=ldap://192.168.114.170:389
#这个bindDn一般用的管理员的用户名和密码
ldap.bindDn=cn=admin,dc=lzwsoft,dc=com
ldap.bindPassword=xx
# User Configuration
# 这个一般用的搜索路径
ldap.user.baseDn=dc=lzwsoft,dc=com
ldap.user.request=(&(objectClass=inetorgperson)(uid=${login}))
ldap.user.realNameAttribute=cn
ldap.user.emailAttribute=mail
# Group Configuration
ldap.group.baseDn=dc=lzwsoft,dc=com
# 这个地方很重要,grou的idAttribute用cn。cn是判断用户属于那个组的关键参数
ldap.group.idAttribute=cn

注意ldap.group.baseDn是基础搜索。

搜索出来之后,根据dn进行二次搜索,获得cn

然后根据cn与我们的权限系统相对应。所以后续我们可以用jira来管理sonar和其他的权限。以后jira上的前端和后端需要分开建立权限了

4.重启SonarQube,登录检查是否成功

一般来说,用lizhenwei对应的密码是可以成功的。而且登录进去之后,权限就是管理员。
后续如果想要添加组,重复上面1和2的操作,必须先在LDAP里面添加组,然后在sonarqube里添加组,然后把员工绑入这个组,才能生效,如果没有这样做,会导致这个员工登录之后,并没有对应组的权限。

GitOps时序图

gitops是现在一个比较火的话题,最近刚好我也在调研,根据我自己的理解,画了如下时序图。

git push


git merge

python-检查一个json是否包含另外一个json

需求:

有两个json文件。两个都是复杂嵌套格式。需要比对A.json里是否包含b.json。
例如A.json是

{
  "role": "admin",
  "routes": [
    "/Home",
    "/DeviceManagement",
    "/UserManagement"
  ]
}

B.json是

{
  "role": "admin",
  "routes": [
    "/Home",
    "/TemplateManagement",
    "/DataDictionary",
    "/ClassifyAndSubEntry",
    "/ProjectManagement",
    "/DeviceManagement",
    "/UserManagement"
  ]
}

要检查B文件是否包含A文件。这个json文件还好嵌套不多,当多个dict和list魂用,即[]和{}太多时,则会出现很难比对的问题。我考虑使用jsonpath来解决这个问题

代码实现

1.把json文件变成一个新的dict[jsonpath,value] ,例如’infos/0/item’: ‘direction’  表示jsonpath为”infos.0.item”对应的值是direction。具体的可以打印一下JsonPathValue这个类的final_dict就能明白了

#  filename : test1.py
#  description :
#
#  created by zhenwei.li at 2019/5/27 10:59
import json
class JsonPathValue(object):
    def __init__(self, datadict):
        self.stack = []
        self.final_dict = {}
        self.do_walk(datadict)
    def get_dict(self):
        return self.final_dict
    def do_walk(self, datadict):
        if isinstance(datadict, dict):
            for key, value in datadict.items():
                self.stack.append(key)
                # print("/".join(self.stack))
                if isinstance(value, dict) and len(value.keys()) == 0:
                    self.final_dict["/".join(self.stack)] = "EMPTY_DICT"
                if isinstance(value, list) and len(value) == 0:
                    self.final_dict["/".join(self.stack)] = 'EMPTY_LIST'
                if isinstance(value, dict):
                    self.do_walk(value)
                if isinstance(value, list):
                    self.do_walk(value)
                else:
                    self.final_dict["/".join(self.stack)] = value
                self.stack.pop()
        if isinstance(datadict, list):
            n = 0
            for key in datadict:
                self.stack.append(str(n))
                n = n + 1
                if isinstance(key, dict):
                    self.do_walk(key)
                if isinstance(key, list):
                    self.do_walk(key)
                if isinstance(key, str):
                    self.final_dict["/".join(self.stack)] = key
                self.stack.pop()
def json_contain(checkpoint, actual, assert_flag):
    """
    检查实际结果(json)中是否包含检查点(json)。两个必须是同种格式,比如同时是{}或者[]
    :param checkpoint: 检查点(期望结果)
    :param actual:  实际结果
    :param assert_flag: 是否启用assert检查
    :return: 匹配成功或失败
    """
    result = False
    if isinstance(checkpoint, list):
        '''如果期望的检查点是list[]格式,使用此方法检查'''
        find_count = 0
        check_lenth = len(checkpoint)
        for item in checkpoint:
            checkpoint_dict = JsonPathValue(item).get_dict()
            if isinstance(actual, list):
                find_flag = False
                for actual_item in actual:
                    actual_dict = JsonPathValue(actual_item).get_dict()
                    find_flag = list_contain(checkpoint_dict, actual_dict, False)
                    if find_flag:
                        find_count += 1
                        break
                print(find_flag)
            else:
                assert False, "返回的实际结果格式不正确"
            if assert_flag:
                assert find_flag, "期望结果中的\n%s\n匹配失败,实际结果是:\n%s" % (item, actual)
        if find_count == check_lenth:
            result = True
    elif isinstance(checkpoint, dict):
        '''
        如果期望的检查点是dict{}格式
        '''
        checkpoint_dict = JsonPathValue(checkpoint).get_dict()
        actual_dict = JsonPathValue(actual).get_dict()
        if list_contain(checkpoint_dict, actual_dict, True):
            result = True
    return result
def list_contain(checkpoint_dict, actual_dict, assert_flag):
    """
     检查实际结果(list)中是否包含期望结果(list)
    :param checkpoint_dict: 实际结果
    :param actual_dict: 期望结果
    :param assert_flag: 是否启用assert检查
    """
    result = set(checkpoint_dict.items()).issubset(set(actual_dict.items()))
    if assert_flag is True:
        different = set(checkpoint_dict.items()) - (set(actual_dict.items()))
        assert result, \
            "期望结果中的%s匹配失败,实际结果是:\n%s" % (different, actual_dict)
    return result
json_data = open('A.json', 'rb').read()
json_dict = json.loads(json_data)
json_data2 = open('B.json', 'rb').read()
json_dict2 = json.loads(json_data2)
res1 = json_contain(json_dict, json_dict2, True)
print(res1)

jenkins进阶之docker运行pytest并且出allure报告

背景

最近想做一个简单的pytest 测试,用allure出报告,结果发现网上的方法都是在windows上装jenkins,然后用jenkins跑一个本地的运行环境。这种做法明显很不2019年。于是我决定做一个在jenkins上使用docker运行pytest,然后再出allure报告的文章。

思路

  1. 在一台电脑上安装jenkins,可以参考我的文章https://www.yinyubo.cn/?p=268
  2. 准备python代码,放到git上,jenkins通过git拉取代码。代码中包含Dockerfile,测试用例,requirements.txt
  3. 拉取代码之后,通过Dockerfile产生一个python的镜像,并且在镜像中通过requirements.txt去安装pytest,allure的相关依赖
  4. 通过这个镜像,产生一个容器,容器命令运行pytest,产生测试报告,并且挂载到jenkins服务器上.
  5. (注意,这里为什么要在容器里运行pytest呢?因为如果在Dockerfile里运行pytest,一旦产生测试失败,是会导致jenkins退出shell执行的)
  6. Jenkins通过allure插件读取第4步产生的测试报告。生成allure报告

具体步骤

  1. 第一步忽略。请参考文章https://www.yinyubo.cn/?p=268
  2. 准备python代码。产生Dockerfile,测试用例,requirements.txt,如下图


Dockerfile内容如下:

FROM python:3.7.3
WORKDIR .
ADD . .
RUN pip install -r requirements.txt
CMD ["pytest", "-q","TestCase/AIMP/Connect/AIMP_Connect_01.py","--alluredir","allure-results"]

requirements.txt内容如下:

allure-pytest==2.6.2
allure-python-commons==2.6.2
atomicwrites==1.3.0
attrs==19.1.0
colorama==0.4.1
more-itertools==7.0.0
pluggy==0.9.0
py==1.8.0
pytest==4.4.1
six==1.12.0

测试用例可以如下

import pytest
def test_success():
    """this test succeeds"""
    print(1)
    assert True
def test_failure():
    print(2)
    """this test fails"""
    assert False
def test_skip():
    print(3)
    """this test is skipped"""
    pytest.skip('for a reason!')
def test_broken():
    raise Exception('oops')
def test_success2():
    print(4)
    assert True

     3.jenkins配置,这里如何拉取git代码就不写了。这个很简单,主要是构建命令和构建后操作,这里我复制了我的构建命令如下:

name="apitest"
docker build -t $name .
docker run -d --name $name -v $PWD/allure-results:/allure-results $name

效果如图:

    4.运行job,查看allure结果

java测试pulsar实例

需求

最近公司上了pulsar服务,然后我们需要学习pulsar相关的内容。最好的办法就是自己学习pulsar环境的搭建,然后搭建一个pulsar-server.并且自己建立pulsar-client的消费者和生产者,互相调用,测试连通

pulsar-server

使用docker搭建是最方便的。
输入如下命令就可以啦

docker run -it -p 28000:80 -p 28080:8080 -p 26650:6650 apachepulsar/pulsar-standalone

它会去本地建立一个标准的pulsar server,其中各个端口的意义分别是:

80: the port for pulsar dashboard
8080: the http service url for pulsar service
6650: the binary protocol service url for pulsar service

我这边映射到了28000,28080,26650三个端口。

pulsar-client测试之代码结构


如上图所示,有4个文件,
Client是连接的代码
MessageConsumer是单主题订阅(消费者
MessageConsumerAll是订阅所有主题(消费者
MessageProducer是发布指定主题(生产者

pulsar-client测试之Client.java

配置连接信息。0.0.0.0是IP地址,如果你需要使用,请换成你自己的pulsar服务地址

package pulsar.client;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import com.sun.xml.internal.ws.Closeable;
public class Client {
    private PulsarClient client;
    public Client() throws PulsarClientException {
        client = PulsarClient.builder()
                .serviceUrl("pulsar://0.0.0.0:26650/")
                .build();
    }
    public void Close() throws PulsarClientException {
    	client.close();
    }
    public PulsarClient getPulsarClient(){
        return client;
    }
}

pulsar-client测试之MessageConsumer.java

单主题订阅,这段代码是演示单主题订阅,打印收到的订阅内容,不关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
	private Client client;
	private Consumer consumer;
	public MessageConsumer(String topic, String subscription) throws PulsarClientException {
		client = new Client();
		consumer = createConsumer(topic, subscription);
	}
	private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
		return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
				.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
	}
	public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 用来异步获取,保持回话
		 */
		do {
			// Wait for a message
			CompletableFuture<Message> msg = consumer.receiveAsync();
			System.out.printf("Message received: %s", new String(msg.get().getData()));
			// Acknowledge the message so that it can be deleted by the message broker
			consumer.acknowledge(msg.get());
		} while (true);
	}
	public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 获取一次,就关闭会话
		 */
		// Wait for a message
		System.out.printf("Start pulsar");
		CompletableFuture<Message> msg = consumer.receiveAsync();
		// System.out.printf("Message received: %s", new String(msg.get().getData()));
		String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
		// Acknowledge the message so that it can be deleted by the message broker
		consumer.acknowledge(msg.get());
		consumer.close();
		client.Close();
		return result;
	}
	public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
		MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
		 consumer.receiveMessage();
//		String reString = consumer.getMessage();
//		System.err.println(reString);
		// consumer.client.Close();
	}
}

pulsar-client测试之MessageConsumerAll.java

下面这段代码是演示订阅服务器上的所有主题,收到一条消息之后,打印主题和内容,然后关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
	private Client client;
	private Consumer consumer;
	public MessageConsumer(String topic, String subscription) throws PulsarClientException {
		client = new Client();
		consumer = createConsumer(topic, subscription);
	}
	private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {
		return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
				.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();
	}
	public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 用来异步获取,保持回话
		 */
		do {
			// Wait for a message
			CompletableFuture<Message> msg = consumer.receiveAsync();
			System.out.printf("Message received: %s", new String(msg.get().getData()));
			// Acknowledge the message so that it can be deleted by the message broker
			consumer.acknowledge(msg.get());
		} while (true);
	}
	public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
		/***
		 * 获取一次,就关闭会话
		 */
		// Wait for a message
		System.out.printf("Start pulsar");
		CompletableFuture<Message> msg = consumer.receiveAsync();
		// System.out.printf("Message received: %s", new String(msg.get().getData()));
		String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData());
		// Acknowledge the message so that it can be deleted by the message broker
		consumer.acknowledge(msg.get());
		consumer.close();
		client.Close();
		return result;
	}
	public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
		MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
		 consumer.receiveMessage();
//		String reString = consumer.getMessage();
//		System.err.println(reString);
		// consumer.client.Close();
	}
}

pulsar-client测试之MessageProducer.java

下面这段代码是发布主题和内容到pulsar服务器,发布一次之后,关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.TimeUnit;
public class MessageProducer {
    private Client client;
    private Producer<byte[]> producer;
    public MessageProducer(String topic) throws PulsarClientException {
        client = new Client();
        producer = createProducer(topic);
    }
    private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
        return client.getPulsarClient().newProducer()
                .topic(topic)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
    }
    public void sendMessage(String message) {
        producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
            System.out.printf("Message with ID %s successfully sent", msgId);
        });
    }
    public void sendOnce(String message) {
    	/**
    	 * 发送一次就关闭
    	 */
    	try {
			producer.send(message.getBytes());
			System.out.printf("Message with content %s successfully sent", message);
			producer.close();
			client.Close();
		} catch (PulsarClientException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }
    // todo add exceptionally().
    public void close(Producer<byte[]> producer){
        producer.closeAsync()
                .thenRun(() -> System.out.println("Producer closed"));
    }
    public static void main(String[] args) throws PulsarClientException {
        MessageProducer producer = new MessageProducer("topic1");
//        producer.sendMessage("Hello World ,lalla");
        producer.sendOnce("Hello World ,lizhenwei");
    }
}

运行效果

生产者console log:

Message with content Hello World ,lizhenwei successfully sent

消费者console log

Start pulsar receive:
topic is: persistent://public/default/topic1,data is: Hello World ,lizhenwei

苏ICP备18047533号-1