Golang使用协程并发多个mqtt的publish信息(读取csv,发送json格式报文)

Golang使用协程并发多个mqtt的publish信息(读取csv,发送json格式报文)

需求

开发语言:golang
目的:并发10000个mqtt连接,循环发送publish信息,当时间戳小于某个值的时候,中止循环,退出连接
publish内容是json格式的,未设置时,有默认值,可以通过golang代码修改json内容
登录信息存取在csv文件中,csv文件有多少列,就并发多少个设备连接

话不多说,直接上代码

main.go

package main
import (
	"encoding/csv"
	"encoding/json"
	"fmt"
	"os"
	"strconv"
	"time"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)
var total int
// 读取csv文件里的第3列数据,存入一个string数组里
func readcsv(filename string) []string {
	var userNameList []string
	f, _ := os.Open(filename)
	defer f.Close()
	w := csv.NewReader(f)
	data, err := w.ReadAll()
	if err != nil {
		fmt.Println(err)
	}
	for i := range data {
		userNameList = append(userNameList, data[i][2])
	}
	return userNameList
}
func mqttDevice(username string, end_number chan int) {
	// mqtt设备连接,设置IP地址
	opts := mqtt.NewClientOptions().AddBroker("localhost:1883")
	// 设置连接的用户名密码
	opts.SetUsername(username)
	// 使用连接信息进行连接
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	time.Sleep(1 * time.Second)
	fmt.Println("connect success:" + username)
	// 读取json文件,json文件里的是默认参数
	fileReader, _ := os.Open("test.json")
	var eiopJsonMap map[string]interface{}
	json.NewDecoder(fileReader).Decode(&eiopJsonMap)
	// 设置一个开始时间戳和结束时间戳
	startTime := 1598167852000
	endTime := 1598167852000
	// 循环发送遥测,每次遥测间隔时间戳为15分钟
	for ; startTime < endTime; startTime = startTime + 900000 {
		// 定义一个ep的初始值为1,每循环一次就+1
		var ep int
		ep++
		eiopJsonMap["ts"] = startTime
		eiopJsonMap["values"].(map[string]interface{})["ep"] = ep
		// 把修改过的json内容从map转换为json格式
		eiopJsonText, _ := json.Marshal(eiopJsonMap)
		fmt.Println(string(eiopJsonText))
		// 发送遥测,发完之后休眠1秒
		result := client.Publish("topic", 0, true, eiopJsonText)
		result.Wait()
		time.Sleep(1 * time.Second)
	}
	// 发送完信息之后,退出连接
	fmt.Println("disconnect:" + username)
	client.Disconnect(250)
	total++
	end_number <- total
}
func main() {
	userNameList := readcsv("connect_info.csv")
	endNumber := make(chan int, len(userNameList))
	// 变量所有的username,通过go关键字并发多个设备
	for _, userName := range userNameList {
		go mqttDevice(userName, endNumber)
	}
	// 当所有的设备都发送完毕后,关闭程序
	for i := range endNumber {
		fmt.Println("已经有" + strconv.Itoa(i) + "个设备发送完毕")
		if i == len(userNameList) {
			return
		}
	}
}

test.json

{
	"ts": 1603088274000,
	"values": {
		"ep": 12
	}
}

connect_info.csv

localhost,1883,XecUwSmMGiYJp2BspMK2
localhost,1883,GqVqoPP2wblDjS2P9pQ9

苏ICP备18047533号-1