Introduction
This guide presents a Node-RED flow that monitors whether messages are successfully transmitted via MQTT from the Enterprise IIoT Gateway. If a message fails to send, whether because of lost internet connectivity or a disconnection from the MQTT broker, the sensor data is saved locally, in either the default or a user-defined location. Once the connection to the broker is restored, the stored data is transmitted and removed from local storage.
Requirements
In order to follow along with this guide you will need these components:
- An Industrial IoT Wireless Enterprise Sensor.
- An Enterprise Gateway or an Enterprise Gateway Lite.
- A computer with WiFi/Ethernet to connect to the Enterprise IIoT Gateway and access to Node-RED.
The Enterprise IIoT Gateway is designed to simplify, streamline, and customize your NCD IIoT installation. It features built-in Node-RED capabilities for no-code integration into new or existing cloud/SCADA systems, web-based security and access management, SSH access to the Debian OS for full customization, as well as VPN and remote management options. This edge computer ensures a seamless deployment experience.
We offer two versions of the Enterprise IIoT Gateway: a standard version with all features, including integrated Wi-Fi, Ethernet, and cellular connectivity, and a Lite version with reduced CPU and RAM capabilities, and no cellular connectivity.
Before you begin using this flow, you’ll need to have the following software installed:
- Node-RED (already installed on the Enterprise IIoT Gateway)
- NCD Library (already installed on the Enterprise IIoT Gateway)
Overview
This solution is particularly useful for sending sensor data from NCD devices to a cloud service or database via MQTT. If the MQTT broker is disconnected or the internet connection is lost, the flow detects the disruption and temporarily stores the sensor data locally on the Enterprise IIoT Gateway, using either a predefined or user-defined path. Once the MQTT connection is restored, the flow automatically retrieves the data from the local storage and sends it to the target service (cloud), one entry at a time. After all data has been successfully transmitted, the local storage is cleared to free up space on the Enterprise IIoT Gateway.
Node-RED
Node-RED is a visual programming tool. It provides a browser-based editor for building applications by wiring together predefined black-box functions called nodes. Despite its technical nature, Node-RED offers a user-friendly interface. It is an open-source project under the OpenJS Foundation, so it is free to use, with no monthly subscription fees. You can install and run Node-RED locally on your PC, or, in the case of our Enterprise IIoT Gateway, it comes pre-installed and runs as a service.
NCD Enterprise Library
This library facilitates communication with and configuration of the NCD Wireless Sensor Line. It can be employed in conjunction with Node-RED to communicate and manage a Wireless Sensor Network using the Node-RED flow-based development tool on any platform. Additionally, as demonstrated, the library acts as an interface for uploading Firmware Updates to NCD Sensors. As mentioned earlier, this library is pre-installed on the Enterprise IIoT Gateway and is ready for immediate deployment.
Enterprise IIoT Gateway - Setup
Here you can find a complete Getting Started guide for setting up and accessing Node-RED from the Enterprise IIoT Gateway:
Importing and Setting up
This example flow consists of a series of nodes. To use it within your Node-RED NCD application, you need to copy or download the flow’s JSON code and import it into Node-RED. Follow the step-by-step procedure below to complete this process:
Repository
1. Click the following link to open the source code of the NCD Node-RED online monitor flow:
NCD Internet Monitor Node-RED Flow Repository:
https://github.com/ncdcommunity/node-red-flow-gateway-online-monitor/blob/main/nodered/flows.json
2. Once the repository is open, locate the button that allows you to Copy raw file of the NCD node-red online monitor flow. Refer to the following image for a visual reference:
Import
Once you have copied or downloaded the flow’s JSON code from the GitHub repository, return to Node-RED to import it. Below are the steps to import the flow into your current project:
1. Open the main menu (the icon in the upper-right corner), then click the “Import” option, as shown in the picture:
2. A text box opens. Right-click and paste the code you just copied from GitHub, as shown in the picture:
3. The JSON code should now appear in the text box. Press the red “Import” button at the bottom right (the “current flow” option is selected by default):
4. At the top of the Node-RED editor, you will see information about the NCD Monitor for MQTT flow you just imported. The flow is now available in the editor, and you can place it in the workspace by left-clicking:
Main Nodes
This flow consists of a series of nodes, with the main ones described below to help clarify the functionality and logic of each part of the flow.
Set LogFile
This node specifies the path where data will be stored locally on the Enterprise IIoT Gateway. By default, this property is set to the Node-RED path. The node runs automatically each time the flow is deployed.
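As a rough illustration, a function node playing the role of Set LogFile might look like the sketch below. The context key `log` is the one read by the function nodes later in this guide. Everything else is an assumption added so the snippet runs in plain Node.js: the `makeContext` stand-in for Node-RED's flow context, the `setLogFile` wrapper, and the example path (which is not the gateway's actual default).

```javascript
// Minimal stand-in for Node-RED's flow context so the sketch runs in plain Node.js.
// Inside a real function node, `flow` is provided by Node-RED.
function makeContext() {
    const store = new Map();
    return {
        get: (key) => store.get(key),
        set: (key, value) => { store.set(key, value); }
    };
}

// Hypothetical body of a "Set LogFile" function node.
// msg.payload may carry a user-defined path; otherwise the default is used.
function setLogFile(flow, msg, defaultPath) {
    const userPath = typeof msg.payload === 'string' ? msg.payload.trim() : '';
    const path = userPath !== '' ? userPath : defaultPath;
    flow.set('log', path); // read later with flow.get("log")
    return msg;
}

const flowCtx = makeContext();
setLogFile(flowCtx, { payload: '' }, '/tmp/backlog.log'); // example path only
console.log(flowCtx.get('log'));
```

Keeping the path in flow context means every function node in the flow reads the same location, so changing it in one place is enough.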
Complete node
This node triggers a flow when another node has completed its message handling. It can be associated with one or more nodes; in this case, it must be linked to the MQTT Out node. The Complete node emits a message once the MQTT Out node has successfully sent data via MQTT. (It is important that the MQTT node is set to QoS level 1 or 2.)
Prepare Data Function node
This node maintains an array of message identifiers, ‘msg._msgid’ (a unique identifier automatically assigned to each message by Node-RED, useful for tracking and debugging messages within the flow). An ID is added when a message first arrives and removed when the Complete node passes the same message back, which makes it possible to determine whether a message was successfully sent via MQTT. The node also copies the configured timeout (‘mqtt_timeout_ms’) into ‘msg.delay’.
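// Pass the configured timeout along as msg.delay, if one is set.
let delay = flow.get('mqtt_timeout_ms');
if (delay != null) {
    msg.delay = delay;
}

let pending = flow.get('messages_pending');
if (pending != null) {
    if (pending.includes(msg._msgid)) {
        // Second sighting (from the Complete node): the send succeeded, so clear the ID.
        pending = pending.filter(id => id !== msg._msgid);
        flow.set('messages_pending', pending);
        return;
    } else {
        // First sighting: mark the message as pending and let it continue.
        pending.push(msg._msgid);
        flow.set('messages_pending', pending);
        return msg;
    }
}
return msg;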
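To see how this function behaves, the sketch below wraps the same logic in a plain function and calls it twice with the same message ID: once as the outgoing sensor message and once as the message passed back by the Complete node. The `makeContext` helper, the `prepareData` wrapper, the 5000 ms timeout and the sample payload are assumptions for running outside Node-RED.

```javascript
// Stand-in for Node-RED's flow context (assumption for running in plain Node.js).
function makeContext(initial) {
    const store = new Map(Object.entries(initial));
    return { get: (k) => store.get(k), set: (k, v) => { store.set(k, v); } };
}

// Same logic as the Prepare Data function node, with `flow` passed in explicitly.
function prepareData(flow, msg) {
    const delay = flow.get('mqtt_timeout_ms');
    if (delay != null) {
        msg.delay = delay;
    }
    const pending = flow.get('messages_pending');
    if (pending != null) {
        if (pending.includes(msg._msgid)) {
            // Second sighting: the Complete node confirmed the send.
            flow.set('messages_pending', pending.filter((id) => id !== msg._msgid));
            return undefined;
        }
        // First sighting: remember the ID until the send is confirmed.
        pending.push(msg._msgid);
        flow.set('messages_pending', pending);
    }
    return msg;
}

const ctx = makeContext({ mqtt_timeout_ms: 5000, messages_pending: [] });

const first = prepareData(ctx, { _msgid: 'abc123', payload: { temperature: 21.5 } });
const pendingAfterFirst = [...ctx.get('messages_pending')];

const second = prepareData(ctx, { _msgid: 'abc123' });
const pendingAfterSecond = [...ctx.get('messages_pending')];

console.log(first.delay, pendingAfterFirst, second, pendingAfterSecond);
```

After the first call the ID is listed as pending and the message continues; after the second call the ID is gone and nothing is returned, so the confirmation message goes no further.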
If Message is still pending send to backlog
This node works in conjunction with the Complete and Prepare Data nodes. It evaluates whether a message sent via MQTT was successfully delivered or failed to complete within the defined timeout period.
If a message is not successfully completed, the node sends it through output one, allowing the flow to add the unsent message to the backlog. If the message was successfully sent but originated from the backlog (msg.beFrom is set to "local"), it is routed to output two so that it can be removed from the backlog.
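let beFrom = msg.beFrom || false;
let filename = flow.get("log");
// Fall back to an empty list if the pending array has not been initialized yet.
let pending = flow.get('messages_pending') || [];

if (pending.includes(msg._msgid)) {
    // No confirmation arrived before the timeout.
    if (!beFrom) {
        // Live sensor message: stop tracking it and send it to the backlog (output 1).
        pending = pending.filter(id => id !== msg._msgid);
        flow.set('messages_pending', pending);
        msg.filename = filename;
        return [msg, null];
    }
} else {
    // The send was confirmed.
    if (beFrom == "local") {
        // Message came from the backlog: delete its line from the file (output 2).
        msg.payload = "sed '1d' " + filename + " -i";
        return [null, msg];
    }
}
// All other cases: nothing to do, so no message is emitted.
return null;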
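The routing decision can be exercised outside Node-RED with a sketch like the following, which applies the same checks and reports which output a message would take. The `route` wrapper, its return labels and the sample IDs are illustrative assumptions, not part of the flow.

```javascript
// Returns 'backlog' (output 1), 'remove' (output 2) or 'none' (no message emitted).
function route(pending, msg) {
    const beFrom = msg.beFrom || false;
    if (pending.includes(msg._msgid)) {
        // Still pending after the timeout: the send was never confirmed.
        return beFrom ? 'none' : 'backlog';
    }
    // Confirmed send: only backlog-originated messages need clean-up.
    return beFrom === 'local' ? 'remove' : 'none';
}

// IDs still waiting for confirmation when the timeout expires.
const pendingIds = ['id-unsent', 'id-unsent-local'];

const results = {
    liveUnsent: route(pendingIds, { _msgid: 'id-unsent' }),
    liveSent: route(pendingIds, { _msgid: 'id-sent' }),
    backlogUnsent: route(pendingIds, { _msgid: 'id-unsent-local', beFrom: 'local' }),
    backlogSent: route(pendingIds, { _msgid: 'id-sent-local', beFrom: 'local' })
};
console.log(results);
```

Only two of the four cases produce output: an unsent live message goes to the backlog, and a sent backlog message triggers removal of its stored line. An unsent backlog message is simply left in the file for the next attempt.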
Get path & Read first line from backlogs
These nodes are executed automatically as part of a flow that periodically checks for data in the backlog file (or the path specified by the user). When data is found in the backlog file, it is passed to the main flow, which attempts to send the data via MQTT.
let path = flow.get("log");
msg.payload = "head -1 " + path;
return msg;
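The backlog file behaves as a first-in, first-out queue: `head -1` peeks at the oldest entry, and the `sed '1d'` command built by the previous node removes that entry after a confirmed send. The sketch below mirrors those two operations with Node.js file calls on a temporary file, purely to illustrate the semantics. The helper names and sample lines are assumptions; the flow itself uses the shell commands.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Equivalent of `head -1 <file>`: return the first line without modifying the file.
function peekFirstLine(file) {
    return fs.readFileSync(file, 'utf8').split('\n')[0];
}

// Equivalent of `sed '1d' <file> -i`: delete the first line in place.
function dropFirstLine(file) {
    const lines = fs.readFileSync(file, 'utf8').split('\n');
    fs.writeFileSync(file, lines.slice(1).join('\n'));
}

// Work on a throwaway file so nothing on the gateway is touched.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'backlog-'));
const backlogFile = path.join(dir, 'backlog.log');
fs.writeFileSync(backlogFile, '{"t":1}\n{"t":2}\n');

const oldest = peekFirstLine(backlogFile); // oldest stored entry
dropFirstLine(backlogFile);
const next = peekFirstLine(backlogFile);   // second entry moves to the front
dropFirstLine(backlogFile);
const empty = peekFirstLine(backlogFile);  // empty string once the backlog is drained

console.log(oldest, next, JSON.stringify(empty));
```

Because entries are only removed from the front after a confirmed send, stored readings are retransmitted in the order they were recorded.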
MQTT out
An outgoing MQTT node, preconfigured with a Quality of Service (QoS) level of 1. You must select or configure the MQTT server you’re using through the ‘Server’ property. The Complete node in this flow is associated with this MQTT node.
Connection Flow and Practical Example
The following section explains how to establish the connection between the Wireless Gateway node and the flow. It describes how incoming sensor messages are processed in different scenarios: normal operation, MQTT broker disconnection, or internet outage.
Importing this flow into your workspace assumes that you have a Wireless Gateway node configured and ready to receive messages from the NCD sensors. In this case, we will add a filter so that only messages of type ‘sensor_data’ are allowed to pass through. To do this, we will use a switch node with the ‘msg.topic’ property configured. The rule we define will allow messages to flow if the ‘msg.topic’ is equal to ‘sensor_data’; otherwise, the messages will be blocked from passing through the output terminal.
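For readers who prefer a function node over a switch node, the same filter could be sketched as follows. The `filterSensorData` name, the sample messages and the `other_topic` value are illustrative; inside a function node only the comparison and the return would be needed.

```javascript
// Pass a message through only when its topic is exactly 'sensor_data'.
// Returning null from a Node-RED function node stops the message.
function filterSensorData(msg) {
    return msg.topic === 'sensor_data' ? msg : null;
}

const passed = filterSensorData({ topic: 'sensor_data', payload: { temperature: 21.5 } });
const blocked = filterSensorData({ topic: 'other_topic', payload: {} });

console.log(passed !== null, blocked === null);
```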
The output of the ‘sensor_data’ node should be connected to the flow input, as shown in the image:
Once you have connected the flow, click the Deploy button to save and apply the changes to your Node-RED flow. After the deployment is complete and the sensor is configured, the following image illustrates the sequence of message flow when the MQTT broker is correctly configured, and the MQTT Out node has successfully established a connection with the broker.
(Red Circles)
The Wireless Gateway node processes incoming NCD sensor messages and sends them through its output terminal.
The Prepare Data node extracts the unique identifier of the current incoming message (stored in the msg._msgid property) and saves it in an array. This allows us to determine whether the message was successfully sent via the MQTT node.
The message then flows to the MQTT Out node, which, with an active connection to the MQTT broker, successfully sends the message.
We can verify that the message was sent correctly using an MQTT In node (though this node is not required in your flow; it is included here solely to demonstrate how the flow works in this example).
The message is displayed in the debug window, as we have connected a debug node to the output of the MQTT In node subscribed to the same topic.
The Complete node verifies whether the message was successfully sent via the MQTT protocol. It does this by sending the same message, with the identical ‘msg._msgid’ property, through its output terminal. This message then flows from the Complete node to the ‘Prepare Data’ node.
The Prepare Data node checks whether the message ID is already in the list. If it is found, the ID is removed, which records that the message with that ID has been successfully sent by the MQTT Out node.
In parallel, the original message passes through a delay node, which holds it between its input and output terminals for the defined timeout period.
The code within the ‘If Message is Still Pending, Send to Backlog’ node then checks whether the message ID still exists in the array. If the message was successfully sent via the MQTT node, the ID will no longer be present, as it was removed by the ‘Prepare Data’ node. In that case no message leaves the output terminals, which ends the execution of the main flow.
(Black Circle)
The inject nodes are automatically activated after deployment. These nodes set the Timeout values, initialize the array to store message IDs, and establish the path where data not sent through MQTT will be stored locally on the Enterprise IIoT Gateway.
Once the flow is activated, it first retrieves the defined path, then constructs a command to access the local storage and begins checking if any data is stored locally.
In this initial scenario, with a stable connection to the MQTT broker, no data is stored, and the data flow halts at the ‘if backlogged packet’ node.
As shown in the flow above, when a sensor message is received by the Wireless Gateway node and the MQTT node has an active connection to the broker, the messages are successfully transmitted to the MQTT broker or database without any issues. However, if the connection to the broker is lost—due to factors like a loss of internet connectivity or MQTT broker disconnection—the following process occurs: the messages flow from the Wireless Gateway node to the local backlog, where all incoming sensor messages are temporarily stored. Once the connection is re-established, the messages are sent again via MQTT.
(Red Circles)
The Wireless Gateway node receives and processes incoming NCD sensor messages, forwarding them through its output terminal into the flow.
The Prepare Data node processes the incoming sensor data, identifies the unique message ID, and adds it to an array. This array will later help determine whether the message was successfully sent via MQTT. The message then flows to the MQTT Out node and the delay node.
The MQTT Out node attempts to publish the message. However, because the connection to the broker has been lost, the message is not sent.
Once the timeout is reached, the message flows to the ‘If Message is Still Pending, Send to Backlog’ node. This node checks if the current message exists within the pending messages array.
If the message ID is found, it flows through output terminal one, which writes the message to a local storage location on the Enterprise IIoT Gateway, as it has not been successfully transmitted via MQTT.
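This guide does not spell out the on-disk format of the backlog. A minimal sketch, assuming each unsent payload is stored as one JSON line (which fits the line-by-line reads and deletes used elsewhere in the flow), could look like this. The `appendToBacklog` helper, the temporary path and the sample readings are illustrative only.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Append an unsent payload to the backlog as a single JSON line,
// so that each line can later be read back and resent on its own.
function appendToBacklog(file, payload) {
    fs.appendFileSync(file, JSON.stringify(payload) + '\n');
}

// Use a throwaway file so the sketch does not touch a real backlog.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'backlog-'));
const backlogFile = path.join(dir, 'backlog.log');

appendToBacklog(backlogFile, { sensor: 'A', temperature: 21.5 });
appendToBacklog(backlogFile, { sensor: 'A', temperature: 21.7 });

const storedLines = fs.readFileSync(backlogFile, 'utf8').trim().split('\n');
console.log(storedLines.length, JSON.parse(storedLines[0]).temperature);
```

Appending rather than rewriting keeps older unsent readings at the top of the file, where the periodic backlog check looks first.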
(Black Circles)
The inject nodes are automatically activated after deployment. These nodes set the Timeout values, initialize the array to store message IDs, and establish the path where data not sent through MQTT will be stored locally on the Enterprise IIoT Gateway.
Once the flow is activated, it first retrieves the defined path, then constructs a command to access the local storage and begins checking if any data is stored locally.
In this initial scenario, with a stable connection to the MQTT broker, no data is stored, and the data flow halts at the ‘if backlogged packet’ node.
While the incoming messages from the NCD sensors are being processed by the Wireless Gateway node, the flow periodically executes a process that checks for any data stored in the local backlog. If data is found, it is loaded into the flow and an attempt is made to resend it via MQTT. Below is the flow for this scenario:
The flow periodically triggers the inject node, which activates the ‘Get Path’ node. This node retrieves the path defined in the ‘Set LogFile’ node and constructs a command to read the first line of data (if any) from the backlog.log file.
The ‘If Backlogged Packet’ node checks if there is data in the file. If data is found, the message loaded from the backlog continues into the main flow.
The ‘Tag Data from Local’ node tags the message to indicate it originated from the backlog (rather than the Wireless Gateway node). This property helps identify the message, and if it is successfully sent via MQTT, the flow will remove the corresponding entry from the backlog.log file to free up storage space.
The message then flows to the ‘Prepare Data’ node, where the message ID is retrieved and added to the array.
If the connection with the broker has not yet been established, there will be no response from the ‘Complete’ node after the defined timeout.
The message then flows to the ‘If Message is Still Pending, Send to Backlog’ node, where it is evaluated. If the message was not successfully sent via MQTT, it will not flow to the output terminal, meaning the same message stored in the backlog.log will be retried in the next execution, as it has not been deleted.
The MQTT node automatically attempts to re-establish the connection with the broker. During this process, the flow continues trying to send messages via MQTT, whether they come from the NCD sensors or from the local backlog.log storage. Once the connection to the MQTT broker is re-established, successfully sent messages from the backlog are removed from the backlog file. This ensures that storage on the Enterprise IIoT Gateway does not fill up and that the backlog file only contains messages that have not yet been sent. The message flow for this scenario is described below.
The flow periodically triggers the inject node, which activates the ‘Get Path’ node. This node retrieves the path defined in the ‘Set LogFile’ node and constructs a command to read the first line of data (if any) from the backlog.log file.
The ‘If Backlogged Packet’ node checks if there is data in the file. If data is found, the message loaded from the backlog continues into the main flow.
The ‘Tag Data from Local’ node tags the message to indicate it originated from the backlog (rather than the Wireless Gateway node). This property helps identify the message, and if it is successfully sent via MQTT, the flow will remove the corresponding entry from the backlog.log file to free up storage space.
The message then flows to the ‘Prepare Data’ node, where the message ID is retrieved and added to the array.
The message flows to the MQTT out node. Since it has an active connection to the broker, the message is successfully sent via MQTT.
This triggers the ‘Complete’ node to confirm that the message has been sent successfully, and a message then flows to the ‘Prepare Data’ node. Here, the message ID is retrieved and removed from the array.
Next, the message flows to the ‘If Message is Still Pending, Send to Backlog’ node. The node checks the array and confirms that the message was sent successfully via MQTT. It also identifies that the message originated from the backlog, causing it to flow through output terminal two of the node.
The node then issues the command to remove the successfully sent message from the backlog, completing the cycle.
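The retry cycle described in the last two scenarios can be summarised with a small simulation. It is only a sketch: the in-memory array stands in for backlog.log, `brokerOnline` stands in for the MQTT connection state, and `tick` stands in for one run of the periodic inject node. None of these names come from the flow.

```javascript
// One periodic check: try to send the oldest backlog entry, remove it only on success.
function tick(backlog, publish) {
    if (backlog.length === 0) {
        return 'empty';        // nothing stored, so the check stops here
    }
    const entry = backlog[0];  // like `head -1`
    if (publish(entry)) {
        backlog.shift();       // like `sed '1d' -i`
        return 'sent';
    }
    return 'retry';            // entry stays in place for the next run
}

let brokerOnline = false;
const delivered = [];

// Hypothetical publish function: succeeds only while the broker is reachable.
const publish = (entry) => {
    if (!brokerOnline) return false;
    delivered.push(entry);
    return true;
};

const backlog = ['reading-1', 'reading-2'];
const outcomes = [];

outcomes.push(tick(backlog, publish)); // broker offline: entry is kept
brokerOnline = true;                   // connection restored
outcomes.push(tick(backlog, publish));
outcomes.push(tick(backlog, publish));
outcomes.push(tick(backlog, publish)); // backlog drained

console.log(outcomes, delivered, backlog);
```

An entry leaves the backlog only after a confirmed send, so a failed attempt never loses data; the cost is that entries are drained one per check, as described in the Overview.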