物联网市场的快速增长极大地提高了MQTT协议的普及程度。MQTT由最流行的物联网平台支持,用于数据收集、推送通知、实时消息传递等。
在本文中,我们将分享我们对MQTT服务器运行性能测试的经验。我们将集中讨论用于在服务器上生成负载的工具。我们平台的性能测试结果将作为单独的帖子发布。
我们已经选择了Gatling作为运行我们测试的框架,原因有很多:
不幸的是,来自的Gatling.io框架不支持现成的MQTT协议。同时,加特林是一个开源框架,我们发现了一个非官方的MQTT插件。
这Gatling-MQTT插件是由Muneyuki Noguchi目前,它是以Apache 2.0许可证的形式托管在GitHub上的。我们已经开始使用加特林和加特林-MQTT插件实现一个Thingsboard性能测试项目。过了一段时间,我们意识到插件不支持我们想要验证的场景,默认场景的行为也不是我们所期望的。
非官方的Gatling-MQTT插件的默认场景的问题是,每次发布一些数据时,客户端都会等待服务器的回复,并发送MQTT断开连接。因此,消息序列如下所示:
这种方法非常消耗资源,与超文本传输协议或其他协议相比,它的好处微乎其微。这种行为对于一个超文本传输协议请求-响应模型来说是正常的,但是对于MQTT来说不是。典型的MQTT会话保持一定时间,并且在客户端和MQTT代理之间发送和接收多个MQTT发布消息。当然,还有其他类型的MQTT消息,但是它们超出了我们的测试范围。在我们的场景中,物联网平台的负载测试必须按照以下方式进行:
一旦我们将设备连接到充当MQTT代理的物联网平台,我们将重用该会话,并使用相同的会话发布MQTT消息。当然,会话可以在某个时候重新创建,但不是每次我们想向服务器发布消息的时候。
为了支持这个场景,我们决定不从头开始实现新的东西,而是使用Gatling-MQTT插件作为基础,考虑到这是开源的事实,我们可以根据自己的需要自由修改软件。
我们已经完成了Gatling-MQTT插件,花了一些时间研究插件是如何构建的,修改了它,并添加了连接、发布和断开操作作为单独的步骤。
然后,我们能够支持预期的场景。加特林-MQTT插件的扩展版本位于此处Extended Gatling-MQTT。
最后,我们能够实现适合我们需求的场景。下面的加特林模拟将为10个模拟设备创建单独的MQTT会话,并将为每个会话发送100条发布消息。
class MqttSimulation extends Simulation {
val mqttConfiguration = mqtt
// MQTT broker
.host("tcp://localhost:1883")
val connect = exec(mqtt("connect")
.connect())
// send 100 publish MQTT messages
val publish = repeat(100) {
exec(mqtt("publish")
// topic: "foo"
// payload: "Hello"
// QoS: AT_LEAST_ONCE (1)
// retain: false
.publish("foo", "Hello", QoS.AT_LEAST_ONCE, retain = false))
// 1 seconds pause between sending messages
.pause(1000 milliseconds)
}
val disconnect = exec(mqtt("disconnect")
.disconnect())
val scn = scenario("MQTT Test")
.exec(connect, publish, disconnect)
setUp(scn
// linearly connect 10 devices over 1 second
// and send 100 publish messages
.inject(rampUsers(10) over (1 seconds))
).protocols(mqttConfiguration)
}
我们的性能测试项目是hosted on GitHub。它主要用Java编写,并使用Maven作为构建工具。加特林和加特林-MQTT插件是用Scala编写的,并使用SBT工具来构建源代码和运行测试。然而,在Thingsboard,我们更多的是Java人,而不是Scala。这就是为什么我们实现了定制的Java代码,它连接到平台,创建一个设备,对它进行预热,并返回凭据标识字符串。
MqttSimulation.scala
:
// get the device credential ids of the created devices
val deviceCredentialsIds: Array[String] = MqttStressTestTool.createDevices(testParams).asScala.toArray
MqttStressTestTool.java
:
RestClient restClient = new RestClient(params.getRestApiUrl());
// login to the Thingsboard server
restClient.login(params.getUsername(), params.getPassword());
for (int i = 0; i < params.getDeviceCount(); i++) {
// create device using REST API
Device device = restClient.createDevice("Device " + UUID.randomUUID());
// get credentials from the created device
DeviceCredentials credentials = restClient.getCredentials(device.getId());
// store in the array that eventually will be used by Simulation class
deviceCredentialsIds.add(credentials.getCredentialsId());
String[] mqttUrls = params.getMqttUrls();
String mqttURL = mqttUrls[i % mqttUrls.length];
MqttStressTestClient client = new MqttStressTestClient(results, mqttURL, credentials.getCredentialsId());
// connect to the server and do the warm up
client.connect().waitForCompletion();
client.warmUp(data);
client.disconnect();
}
Thread.sleep(1000);
凭据标识列表用于MqttSimulation.scala
文件进行压力测试:
// get the device credential ids of the created devices
val deviceCredentialsIds: Array[String] = MqttStressTestTool.createDevices(testParams).asScala.toArray
// provide device credential id as username during the connection phase to the Thingsboard server
val mqttConf = mqtt
.host(testParams.getMqttUrls.head)
.userName("${deviceCredentialsId}")
val connect = exec(mqtt("connect").connect())
val publish = repeat(testParams.getPublishTelemetryCount.toInt) {
exec(mqtt("publish")
// publish single message and verify that it was delivered at least once
.publish("v1/devices/me/telemetry", "{\"temp\":73.2}", QoS.AT_LEAST_ONCE, retain = false))
.pause(testParams.getPublishTelemetryPause milliseconds)
}
val disconnect = exec(mqtt("disconnect").disconnect())
// create map of device credential ids of the connected devices
// and use it as a feeder in the scenario
val deviceCredentialsIdsFeeder = deviceCredentialsIds.map( x => {Map("deviceCredentialsId" -> x)})
val scn = scenario("Scenario Name")
// go over the map and take column deviceCredentialsId as username
.feed(deviceCredentialsIdsFeeder)
.exec(connect, publish, disconnect)
setUp(scn
.inject(rampUsers(deviceCredentialsIds.length) over (1 seconds))
).protocols(mqttConf)
对于喜欢Java和Maven的人来说,有一个Maven插件gatling-maven-plugin
:
<plugin>
<groupId>io.gatling</groupId>
<artifactId>gatling-maven-plugin</artifactId>
</plugin>
这些插件发现Simulation
并编译和运行它们。结果将以一种非常好的格式存储在目标文件夹中,您可以在运行后对其进行检查:
要运行测试,您只需键入:
mvn clean install gatling:execute
总的来说,我们已经描述了我们的方法如何在Thingsboard IoT platform并且验证了它提供了良好的结果。我们将在下一篇文章中分享平台性能测试的结果。此外,我们还将描述代码更改、改进和实例调优,这些都是我们为实现每分钟处理100多万条MQTT消息所做的工作。这些基本上是我们在平台性能测试方向上的第一步,任何关于这种方法的反馈都非常受欢迎。
我们希望所提供的经验将帮助您根据您期望在生产中看到的负载规格来验证您的解决方案。敬请关注star our project在GitHub上。