Skip to content

blueprint.provider.mqtt

Blueprint MQTT client

Configuration

The MQTT client uses the following configuration:

{
  "mqtt": {
    "brokers": ["127.0.0.1:1883"],
    "protocol": "tcp",
    "username": "",
    "password": "",
    "timeout": 5,
    "connectionTimeout": 30,
    "qos": 0,
    "clientId": "",
    "retain": false,
    "keepAlive": 0,
    "autoReconnect": true,
    "persistentSession": false,
    "tlsEnable": false,
    "tlsCa": "",
    "tlsCert": "",
    "tlsKey": "",
    "tlsKeyPassword": "",
    "tlsInsecureVerify": true
  }
}

Using the MQTT client

package main

import (
 paho "github.com/eclipse/paho.mqtt.golang"
 "github.com/oddbit-project/blueprint/log/zerolog/writer"
 "github.com/oddbit-project/blueprint/provider/mqtt"
 "github.com/rs/zerolog/log"
 "time"
)

func main() {
    // use zerolog as logger with console writer
    writer.UseDefaultWriter()

    cfg := mqtt.NewConfig()
    cfg.Username = "testUser"
    cfg.Password = "someTestPassword"
    cfg.Protocol = "tcp"
    cfg.Retain = false
    cfg.TLSEnable = true
    cfg.TLSInsecureSkipVerify = true
    cfg.TLSCA = "../../infra/mosquitto/config/certs/ca.crt"
    cfg.TLSCert = "../../infra/mosquitto/config/certs/client.crt"
    cfg.TLSKey = "../../infra/mosquitto/config/certs/client.key"
    cfg.QoS = 0
    cfg.PersistentSession = false
    cfg.Brokers = []string{"localhost:18883"}

    client, err := mqtt.NewClient(cfg)
    if err != nil {
        log.Fatal().Msgf("cannot initialize mqtt: %v", err)
    }
    _, err = client.Connect()
    if err != nil {
        log.Fatal().Msgf("cannot connect to mqtt: %v", err)
    }
    defer client.Close()

    topicName := "blueprint/test"
    message := []byte("the quick brown fox jumps over the lazy dog")

    var received []byte = nil
    // subscribe topic
    client.Subscribe(topicName, 2, func(c paho.Client, msg paho.Message) {
        log.Info().Msgf("Received message: %s", msg.Payload())
        received = msg.Payload()
    })

    // write to topic
    log.Info().Msgf("Writing message: %v", string(message))
    client.Write(topicName, message)

    for received == nil {
        // sleep 10ms each time
        time.Sleep(10 * time.Millisecond)
    }
}

Additional Methods

Publishing

// Write publishes a message to a topic
func (c *Client) Write(topic string, value []byte) error

// WriteJson publishes a JSON-encoded message
func (c *Client) WriteJson(topic string, value interface{}) error

Example:

// Publish raw bytes
client.Write("sensors/temperature", []byte("25.5"))

// Publish JSON
type SensorData struct {
    DeviceID    string  `json:"device_id"`
    Temperature float64 `json:"temperature"`
    Timestamp   int64   `json:"timestamp"`
}
data := SensorData{DeviceID: "sensor-1", Temperature: 25.5, Timestamp: time.Now().Unix()}
client.WriteJson("sensors/data", data)

Subscribing

// Subscribe subscribes to a topic with a message handler
func (c *Client) Subscribe(topic string, qos byte, handler paho.MessageHandler) error

// SubscribeMultiple subscribes to multiple topics with different QoS levels
func (c *Client) SubscribeMultiple(filters map[string]byte, handler paho.MessageHandler) error

// ChannelSubscribe subscribes to a topic and sends messages to a channel
func (c *Client) ChannelSubscribe(topic string, qos byte, ch chan paho.Message) error

// BufferedChannelSubscribe subscribes with a buffered channel
func (c *Client) BufferedChannelSubscribe(topic string, qos byte, ch chan paho.Message, bufferSize int) error

Example using channel-based subscription:

msgChan := make(chan paho.Message, 100)

// Subscribe using channel
if err := client.ChannelSubscribe("sensors/#", 1, msgChan); err != nil {
    log.Fatal(err)
}

// Process messages from channel
go func() {
    for msg := range msgChan {
        log.Printf("Topic: %s, Payload: %s", msg.Topic(), string(msg.Payload()))
    }
}()

Routing

// AddRoute adds a topic-specific message handler
func (c *Client) AddRoute(topic string, handler paho.MessageHandler)

Example:

// Add routes for different topics
client.AddRoute("sensors/temperature", func(c paho.Client, msg paho.Message) {
    log.Printf("Temperature: %s", string(msg.Payload()))
})

client.AddRoute("sensors/humidity", func(c paho.Client, msg paho.Message) {
    log.Printf("Humidity: %s", string(msg.Payload()))
})

Connection Management

func (c *Client) Connect() (paho.Token, error)  // Connect to broker
func (c *Client) Close()                         // Disconnect from broker
func (c *Client) IsConnected() bool             // Check connection status

Configuration Options

The full configuration structure:

type Config struct {
    Brokers              []string `json:"brokers"`              // Broker addresses
    Protocol             string   `json:"protocol"`             // Protocol (tcp, ssl, ws, wss)
    Username             string   `json:"username"`             // MQTT username
    Password             string   `json:"password"`             // MQTT password
    Timeout              int64    `json:"timeout"`              // Operation timeout in seconds
    ConnectionTimeout    int64    `json:"connectionTimeout"`    // Connection timeout in seconds
    QoS                  byte     `json:"qos"`                  // Quality of Service (0, 1, 2)
    ClientId             string   `json:"clientId"`             // Client identifier
    Retain               bool     `json:"retain"`               // Retain messages
    KeepAlive            int64    `json:"keepAlive"`            // Keep-alive interval in seconds
    AutoReconnect        bool     `json:"autoReconnect"`        // Auto-reconnect on disconnect
    PersistentSession    bool     `json:"persistentSession"`    // Maintain session across reconnects
    TLSEnable            bool     `json:"tlsEnable"`            // Enable TLS
    TLSCA                string   `json:"tlsCa"`                // CA certificate path
    TLSCert              string   `json:"tlsCert"`              // Client certificate path
    TLSKey               string   `json:"tlsKey"`               // Client key path
    TLSKeyPassword       string   `json:"tlsKeyPassword"`       // Key password
    TLSInsecureSkipVerify bool    `json:"tlsInsecureVerify"`   // Skip certificate verification
}

Event Handlers

The client supports custom handlers for connection events:

type MqttHandlers struct {
    OnConnect         paho.OnConnectHandler
    OnConnectionLost  paho.ConnectionLostHandler
    OnReconnecting    paho.ReconnectHandler
}

// Create client with custom handlers
handlers := &mqtt.MqttHandlers{
    OnConnect: func(c paho.Client) {
        log.Println("Connected to MQTT broker")
    },
    OnConnectionLost: func(c paho.Client, err error) {
        log.Printf("Connection lost: %v", err)
    },
}
client, err := mqtt.NewClientWithHandlers(cfg, handlers)