streamsx.mqtt package

MQTT integration for IBM Streams topology applications

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:

This package exposes the com.ibm.streamsx.mqtt toolkit for use with Python.

Overview

Provides functions:

  • to connect to an MQTT server

  • to subscribe to topics and receive events

  • to publish events

Sample

A simple example of a Streams application uses the MQTTSink and MQTTSource classes:

from streamsx.mqtt import MQTTSource, MQTTSink
from streamsx.topology import context
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema

mqtt_server_uri = 'tcp://host.domain:1883'
s = 'Each character will be an MQTT message'
topology = Topology()
data = topology.source([c for c in s]).as_string()
# publish to MQTT
data.for_each(MQTTSink(server_uri=mqtt_server_uri, topic='topic'),
              name='MQTTpublish')

# subscribe for data and print to stdout
received = topology.source(MQTTSource(mqtt_server_uri, schema=CommonSchema.String, topics='topic'))
received.print()

context.submit(context.ContextTypes.DISTRIBUTED, topology)
# the Streams Job keeps running and must be cancelled manually
class streamsx.mqtt.MQTTSink(server_uri, topic=None, topic_attribute_name=None, data_attribute_name=None, **options)

Bases: streamsx.mqtt._mqtt.MQTTComposite, streamsx.topology.composite.ForEach

The MQTTSink represents a stream termination that publishes messages to one or more MQTT topics. Instances can be passed to Stream.for_each() to create a sink (stream termination).

Parameters
  • server_uri (str) – The MQTT server URI

  • topic (str) – The topic to publish the messages to. Mutually exclusive with topic_attribute_name.

  • topic_attribute_name (str) – The name of a tuple attribute denoting the destination topic. Mutually exclusive with topic.

  • data_attribute_name (str) – The name of the tuple attribute containing the message data to be published. data is assumed as default.

  • **options (kwargs) – optional parameters as keyword arguments

property app_config_name

The name of an application configuration with username and password for login to the MQTT server. The application configuration must contain the properties username and password and exist when the topology is submitted. Username and password given in an application configuration override password and username.

Type

str

property client_cert

A client certificate or a filename with a client certificate. Certificates must be given in PEM format or as a filename that contains the PEM formatted certificate.

When no keystore file is given with keystore, a new keystore is created and added as a file dependency to the topology. Otherwise the certificate is added to the given keystore, and the keystore is added as a file dependency.

The private key of the certificate must be provided in client_private_key.

Type

str

property client_id

A unique identifier for a connection to the MQTT server. The MQTT broker only allows a single connection for a particular clientID. By default a unique client ID is automatically generated.

Type

str

property client_private_key

The private RSA key of a client certificate or a filename with the private key. The key must be given in PEM format literally or as a filename. The client certificate must be provided in client_cert.

Example:

private_key = """
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDBqvzZWSC+cF0T
cTk6xC9iJDdWyqamh/9HnOVWDLzAKQ0XsdSHONVLjs3fwY8bM1tZywBDUujCujJT
NaAW3YDkiVG8v+j90eR6UUc/zlG5zC6tZ6/UlHgXSUOCVuB7d3Y3v9Aoiz8HfPEi
0uV+6WOHCPg9df0gwmzPRYDtobMWat0jCXLkekLjc2gx8muPsOnMAaepNHYmfvFn
...
I4ahg9LgF3pT8BEr5xxfwX3U/i0pHz5qc+kuuXxfAoGBAI0SmUDSGT7rTgxvwy7o
fRIOJz3KlIYZ+JugC6Zj7GYv7fISzVRVENpaeL6jPGmhId/Pu++yT7x9FdmlGfMv
UGu65oHRMYGb+dMCeYF6XymSXy1MOHTH38AE27/BHm8bDe/CYJgcKXo+3FvCRal1
YBdL1LnNMCOKPo2gPB19HgR7
-----END PRIVATE KEY-----
"""

mqtt = ...
mqtt.client_private_key = private_key

Key is stored in a file:

mqtt = ...
mqtt.client_private_key = '/home/user/private_key.pem'
Type

str

property command_timeout_millis

The maximum time in milliseconds to wait for an MQTT connect or publish action to complete. A value of 0 causes the client to wait infinitely. The default is 0.

Type

int

property keep_alive_seconds

Automatically generate an MQTT ping message to the server if a message or ping hasn’t been sent or received in the last keelAliveInterval seconds. Enables the client to detect if the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keepalive processing. The default is 60.

Type

int

property keystore

The filename of a keystore with client certificate and its private key. This keystore file is added as a file dependency to the topology.

Note

When this property is set, also the keystore_password must be given.

property keystore_password

The password of the keystore given in keystore. The same password is used to decrypt the the private key given in client_private_key.

Type

str

property password

The password for login to the MQTT server. This property is overridden by the password given in an application configuration.

Type

str

populate(topology, stream, name, **options)

Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)
Parameters
  • topology – Topology containing the composite map.

  • stream – Stream to be transformed.

  • name – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink

property qos

qos value for publishing messages. Allowed values are 0, 1, and 2.

Type

int

property reconnection_bound

Number of reconnection attempts in case of connect failures. Specify 0 for no retry, a non-zero positive number n for n retries, or -1 for inifinite retry.

Type

int

property retain

Indicates if messages should be retained on the MQTT server if there is no subscription for the topic. The default is False.

Type

bool

property server_uri

The URI of the MQTT server, either tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]. The port defaults to 1883 for “tcp:” and 8883 for “ssl:” URIs.

Type

str

property ssl_debug

When True the property enables verbose SSL debug output at runtime.

Type

bool

property ssl_protocol

The SSL protocol for SSL connections. The default value is TLSv1.2.

Type

str

property trusted_certs

Certificates or filenames with certificates in PEM format that shall be trusted when SSL connections are made. Self-signed server certificates or the root certificate of server certificates must go in here. When no truststore is given with truststore, a new truststore is created and added as a file dependency to the topology. Otherwise the certificates are added to the given truststore, and the truststore is added as a file dependency.

Example with one certificate literally:

mqtt_comp = ...
mqtt_comp.trusted_certs = """
-----BEGIN CERTIFICATE-----
MIIDJzCCAg+gAwIBAgIJAJhu1pO2qbj7MA0GCSqGSIb3DQEBCwUAMCoxEzARBgNV
BAoMCmlvLnN0cmltemkxEzARBgNVBAMMCmNsdXN0ZXItY2EwHhcNMTkwODE2MDc1
YSTwYvxfkrxGKtDiLjIZ6Q6LJjMcWLG4x3I0WmGgvytTs04S4B+1vp721jmqRKm9
...
ocAT5iL3ZDUj/lwqJRptmzGFcdko+woFae68HRx1ygSgROls7bXy/CwgME0LFFQp
B+2YAhUw1sPU410JUxU3/v6R5vJfI9imE75aha3U7UbeOX8+1+Cu3HOT1QMn80k2
6LnZeMCCgCBp+Yz3YNeUMRejMU6x4WlhTPO7bBq3tKGgwCoyGIX25wMM1Q==
-----END CERTIFICATE-----
"""

Example with two certificates stored in files in /tmp:

mqtt_comp = ...
mqtt_comp.trusted_certs = ['/tmp/ca_cert_1.pem', '/tmp/ca_cert_2.pem']
Type

str|list

property truststore

The filename of a truststore with certificates that shall be trusted when SSL connections are made. This truststore file is added as a file dependency to the topology.

Note

When this property is set, also the truststore_password must be given.

Type

str

property truststore_password

The password of the truststore given in truststore.

Type

str

property username

The username for login to the MQTT server. This property is overridden by the username given in an application configuration.

Type

str

property vm_arg

Arguments for the Java Virtual Machine used at Runtime, for example -Xmx2G. For multiple arguments, use a list:

mqtt.vm_arg = ["-Xmx2G", "-Xms1G"]
Type

str|list

class streamsx.mqtt.MQTTSource(server_uri, topics, schema, data_attribute_name=None, topic_attribute_name=None, **options)

Bases: streamsx.mqtt._mqtt.MQTTComposite, streamsx.topology.composite.Source

Represents a source for messages read from an MQTT server, which can be passed to Topology.source() to create a stream.

Parameters
  • server_uri (str) – The MQTT server URI

  • topics (str|list) – The topic or topics to subscribe for messages.

  • schema – The schema of the created stream

  • data_attribute_name (str) – The name of the tuple attribute containing the message data. data is assumed as default.

  • topic_attribute_name (str) – The name of a tuple attribute denoting the source topic of received messages.

  • **options (kwargs) – optional parameters as keyword arguments

property app_config_name

The name of an application configuration with username and password for login to the MQTT server. The application configuration must contain the properties username and password and exist when the topology is submitted. Username and password given in an application configuration override password and username.

Type

str

property client_cert

A client certificate or a filename with a client certificate. Certificates must be given in PEM format or as a filename that contains the PEM formatted certificate.

When no keystore file is given with keystore, a new keystore is created and added as a file dependency to the topology. Otherwise the certificate is added to the given keystore, and the keystore is added as a file dependency.

The private key of the certificate must be provided in client_private_key.

Type

str

property client_id

A unique identifier for a connection to the MQTT server. The MQTT broker only allows a single connection for a particular clientID. By default a unique client ID is automatically generated.

Type

str

property client_private_key

The private RSA key of a client certificate or a filename with the private key. The key must be given in PEM format literally or as a filename. The client certificate must be provided in client_cert.

Example:

private_key = """
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDBqvzZWSC+cF0T
cTk6xC9iJDdWyqamh/9HnOVWDLzAKQ0XsdSHONVLjs3fwY8bM1tZywBDUujCujJT
NaAW3YDkiVG8v+j90eR6UUc/zlG5zC6tZ6/UlHgXSUOCVuB7d3Y3v9Aoiz8HfPEi
0uV+6WOHCPg9df0gwmzPRYDtobMWat0jCXLkekLjc2gx8muPsOnMAaepNHYmfvFn
...
I4ahg9LgF3pT8BEr5xxfwX3U/i0pHz5qc+kuuXxfAoGBAI0SmUDSGT7rTgxvwy7o
fRIOJz3KlIYZ+JugC6Zj7GYv7fISzVRVENpaeL6jPGmhId/Pu++yT7x9FdmlGfMv
UGu65oHRMYGb+dMCeYF6XymSXy1MOHTH38AE27/BHm8bDe/CYJgcKXo+3FvCRal1
YBdL1LnNMCOKPo2gPB19HgR7
-----END PRIVATE KEY-----
"""

mqtt = ...
mqtt.client_private_key = private_key

Key is stored in a file:

mqtt = ...
mqtt.client_private_key = '/home/user/private_key.pem'
Type

str

property command_timeout_millis

The maximum time in milliseconds to wait for an MQTT connect or publish action to complete. A value of 0 causes the client to wait infinitely. The default is 0.

Type

int

property keep_alive_seconds

Automatically generate an MQTT ping message to the server if a message or ping hasn’t been sent or received in the last keelAliveInterval seconds. Enables the client to detect if the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keepalive processing. The default is 60.

Type

int

property keystore

The filename of a keystore with client certificate and its private key. This keystore file is added as a file dependency to the topology.

Note

When this property is set, also the keystore_password must be given.

property keystore_password

The password of the keystore given in keystore. The same password is used to decrypt the the private key given in client_private_key.

Type

str

property message_queue_size

The size, in number of messages, of an internal receive buffer. The receiver stops fetching messages when the buffer is full. The default is 500.

Type

int

property password

The password for login to the MQTT server. This property is overridden by the password given in an application configuration.

Type

str

populate(topology, name, **options)

Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters
  • topology – Topology containing the source.

  • name – Name passed into source.

  • **options – Future options passed to source.

Returns

Single stream representing the source.

Return type

Stream

property qos

qos values for the messages to subscribe.

Example:

mqttSource = MQTTSource('tcp://host.domain:1883', ['topic1', 'topic2'], CommonSchema.String)
mqttSource.qos = 2

The qos value can also be a list of integers:

mqttSource = MQTTSource('tcp://host.domain:1883', ['topic1', 'topic2'], CommonSchema.String)
mqttSource.qos = [1, 2]
Type

int|list

property reconnection_bound

Number of reconnection attempts in case of connect failures. Specify 0 for no retry, a non-zero positive number n for n retries, or -1 for inifinite retry.

Type

int

property server_uri

The URI of the MQTT server, either tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]. The port defaults to 1883 for “tcp:” and 8883 for “ssl:” URIs.

Type

str

property ssl_debug

When True the property enables verbose SSL debug output at runtime.

Type

bool

property ssl_protocol

The SSL protocol for SSL connections. The default value is TLSv1.2.

Type

str

property trusted_certs

Certificates or filenames with certificates in PEM format that shall be trusted when SSL connections are made. Self-signed server certificates or the root certificate of server certificates must go in here. When no truststore is given with truststore, a new truststore is created and added as a file dependency to the topology. Otherwise the certificates are added to the given truststore, and the truststore is added as a file dependency.

Example with one certificate literally:

mqtt_comp = ...
mqtt_comp.trusted_certs = """
-----BEGIN CERTIFICATE-----
MIIDJzCCAg+gAwIBAgIJAJhu1pO2qbj7MA0GCSqGSIb3DQEBCwUAMCoxEzARBgNV
BAoMCmlvLnN0cmltemkxEzARBgNVBAMMCmNsdXN0ZXItY2EwHhcNMTkwODE2MDc1
YSTwYvxfkrxGKtDiLjIZ6Q6LJjMcWLG4x3I0WmGgvytTs04S4B+1vp721jmqRKm9
...
ocAT5iL3ZDUj/lwqJRptmzGFcdko+woFae68HRx1ygSgROls7bXy/CwgME0LFFQp
B+2YAhUw1sPU410JUxU3/v6R5vJfI9imE75aha3U7UbeOX8+1+Cu3HOT1QMn80k2
6LnZeMCCgCBp+Yz3YNeUMRejMU6x4WlhTPO7bBq3tKGgwCoyGIX25wMM1Q==
-----END CERTIFICATE-----
"""

Example with two certificates stored in files in /tmp:

mqtt_comp = ...
mqtt_comp.trusted_certs = ['/tmp/ca_cert_1.pem', '/tmp/ca_cert_2.pem']
Type

str|list

property truststore

The filename of a truststore with certificates that shall be trusted when SSL connections are made. This truststore file is added as a file dependency to the topology.

Note

When this property is set, also the truststore_password must be given.

Type

str

property truststore_password

The password of the truststore given in truststore.

Type

str

property username

The username for login to the MQTT server. This property is overridden by the username given in an application configuration.

Type

str

property vm_arg

Arguments for the Java Virtual Machine used at Runtime, for example -Xmx2G. For multiple arguments, use a list:

mqtt.vm_arg = ["-Xmx2G", "-Xms1G"]
Type

str|list

Indices and tables