streamsx.mqtt package

MQTT integration for IBM Streams topology applications

For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:

This package exposes the com.ibm.streamsx.mqtt toolkit for use with Python.

Overview

Provides functions:

  • to connect to an MQTT server
  • to subscribe to topics and receive events
  • to publish events

Sample

A simple Streams application that uses the MQTTSink and MQTTSource classes:

from streamsx.mqtt import MQTTSource, MQTTSink
from streamsx.topology import context
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema

mqtt_server_uri = 'tcp://host.domain:1883'
s = 'Each character will be an MQTT message'
topology = Topology()
data = topology.source([c for c in s]).as_string()
# publish to MQTT
data.for_each(MQTTSink(server_uri=mqtt_server_uri, topic='topic'),
              name='MQTTpublish')

# subscribe for data and print to stdout
received = topology.source(MQTTSource(mqtt_server_uri, schema=CommonSchema.String, topics='topic'))
received.print()

context.submit(context.ContextTypes.DISTRIBUTED, topology)
# The Streams job keeps running and must be cancelled manually.

class streamsx.mqtt.MQTTSink(server_uri, topic=None, topic_attribute_name=None, data_attribute_name=None, **options)

Bases: streamsx.mqtt._mqtt.MQTTComposite, streamsx.topology.composite.ForEach

The MQTTSink represents a stream termination that publishes messages to one or more MQTT topics. Instances can be passed to Stream.for_each() to create a sink (stream termination).

Parameters:
  • server_uri (str) – The MQTT server URI
  • topic (str) – The topic to publish the messages to. Mutually exclusive with topic_attribute_name.
  • topic_attribute_name (str) – The name of a tuple attribute denoting the destination topic. Mutually exclusive with topic.
  • data_attribute_name (str) – The name of the tuple attribute containing the message data to be published. data is assumed as default.
  • **options (kwargs) – optional parameters as keyword arguments
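The mutual exclusivity of topic and topic_attribute_name can be pictured with a small check. This is an illustrative sketch only: check_topic_args is a hypothetical helper that is not part of streamsx.mqtt, and raising ValueError when both are given is an assumption about how such a conflict might be reported.

```python
def check_topic_args(topic=None, topic_attribute_name=None):
    """Hypothetical check: reject calls that set both options."""
    if topic is not None and topic_attribute_name is not None:
        raise ValueError("topic and topic_attribute_name are mutually exclusive")
    # Report which mode would apply: one fixed topic for all messages,
    # a per-tuple topic read from an attribute, or neither given.
    if topic is not None:
        return ("fixed", topic)
    if topic_attribute_name is not None:
        return ("attribute", topic_attribute_name)
    return ("unset", None)
```

With a fixed topic every tuple goes to the same destination; with an attribute name the destination is read from each tuple.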
app_config_name

The name of an application configuration with username and password for login to the MQTT server. The application configuration must contain the properties username and password, and it must exist when the topology is submitted. A username and password given in an application configuration override the username and password properties.

Type:str
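The precedence described above can be modelled in a few lines. This is a sketch under stated assumptions, not the package's implementation: resolve_credentials is a hypothetical helper, and the app_config dictionary merely stands in for the properties of an application configuration.

```python
def resolve_credentials(username=None, password=None, app_config=None):
    """Hypothetical model of the documented precedence rule."""
    if app_config is not None:
        # Per the description, the configuration must define both properties.
        missing = [k for k in ("username", "password") if k not in app_config]
        if missing:
            raise KeyError("application configuration lacks: " + ", ".join(missing))
        # Values from the application configuration win over the properties.
        return app_config["username"], app_config["password"]
    return username, password
```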
client_cert

A client certificate or a filename with a client certificate. Certificates must be given in PEM format or as a filename that contains the PEM formatted certificate.

When no keystore file is given with keystore, a new keystore is created and added as a file dependency to the topology. Otherwise the certificate is added to the given keystore, and the keystore is added as a file dependency.

The private key of the certificate must be provided in client_private_key.

Type:str
client_id

A unique identifier for a connection to the MQTT server. The MQTT broker allows only a single connection per client ID. By default, a unique client ID is generated automatically.

Type:str
client_private_key

The private RSA key of a client certificate or a filename with the private key. The key must be given in PEM format literally or as a filename. The client certificate must be provided in client_cert.

Example:

private_key = """
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDBqvzZWSC+cF0T
cTk6xC9iJDdWyqamh/9HnOVWDLzAKQ0XsdSHONVLjs3fwY8bM1tZywBDUujCujJT
NaAW3YDkiVG8v+j90eR6UUc/zlG5zC6tZ6/UlHgXSUOCVuB7d3Y3v9Aoiz8HfPEi
0uV+6WOHCPg9df0gwmzPRYDtobMWat0jCXLkekLjc2gx8muPsOnMAaepNHYmfvFn
...
I4ahg9LgF3pT8BEr5xxfwX3U/i0pHz5qc+kuuXxfAoGBAI0SmUDSGT7rTgxvwy7o
fRIOJz3KlIYZ+JugC6Zj7GYv7fISzVRVENpaeL6jPGmhId/Pu++yT7x9FdmlGfMv
UGu65oHRMYGb+dMCeYF6XymSXy1MOHTH38AE27/BHm8bDe/CYJgcKXo+3FvCRal1
YBdL1LnNMCOKPo2gPB19HgR7
-----END PRIVATE KEY-----
"""

mqtt = ...
mqtt.client_private_key = private_key

Key is stored in a file:

mqtt = ...
mqtt.client_private_key = '/home/user/private_key.pem'
Type:str
command_timeout_millis

The maximum time in milliseconds to wait for an MQTT connect or publish action to complete. A value of 0 causes the client to wait infinitely. The default is 0.

Type:int
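The meaning of 0 is easy to get wrong when converting to other timeout conventions. The following sketch maps the millisecond value to the seconds-or-None convention used by many Python APIs; wait_timeout_seconds is a hypothetical helper, and rejecting negative values is an assumption.

```python
def wait_timeout_seconds(command_timeout_millis=0):
    """Convert the documented millisecond value to a Python-style timeout.

    None means "block without limit", matching the documented meaning of 0.
    """
    if command_timeout_millis < 0:
        # Assumption: negative values are treated as invalid here.
        raise ValueError("command_timeout_millis must not be negative")
    if command_timeout_millis == 0:
        return None
    return command_timeout_millis / 1000.0
```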
keep_alive_seconds

Automatically sends an MQTT ping message to the server if no message or ping has been sent or received within the keep-alive interval (keep_alive_seconds). This enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keep-alive processing. The default is 60.

Type:int
keystore

The filename of a keystore with client certificate and its private key. This keystore file is added as a file dependency to the topology.

Note

When this property is set, keystore_password must also be given.

Type:str
keystore_password

The password of the keystore given in keystore. The same password is used to decrypt the private key given in client_private_key.

Type:str
password

The password for login to the MQTT server. This property is overridden by the password given in an application configuration.

Type:str
populate(topology, stream, name, **options)

Populate the topology with this composite for_each transformation.

Parameters:
  • topology – Topology containing the composite map.
  • stream – Stream to be transformed.
  • name – Name passed into for_each.
  • **options – Future options passed to for_each.
Returns:

Termination for this composite transformation of stream.

Return type:

Sink

qos

The quality of service (QoS) level for publishing messages. Allowed values are 0, 1, and 2.

Type:int
reconnection_bound

The number of reconnection attempts after a connection failure. Specify 0 for no retry, a positive number n for n retries, or -1 for infinite retries.

Type:int
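The three cases (0, n, and -1) can be illustrated with a plain retry loop. This is a minimal sketch, not the toolkit's logic: connect_with_retries is a hypothetical helper, the initial attempt is assumed not to count as a retry, and no delay between attempts is modelled.

```python
def connect_with_retries(connect, reconnection_bound):
    """Call connect() once, then retry according to reconnection_bound.

    connect is any callable that raises ConnectionError on failure.
    Returns the total number of attempts made once a connection succeeds.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            connect()
            return attempts
        except ConnectionError:
            # -1 retries without limit; otherwise allow at most
            # reconnection_bound retries after the first attempt.
            if reconnection_bound != -1 and attempts > reconnection_bound:
                raise
```

With a bound of 2, a connection that succeeds on the third attempt is still accepted; with a bound of 0, the first failure is final.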
retain

Indicates if messages should be retained on the MQTT server if there is no subscription for the topic. The default is False.

Type:bool
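Several of the publishing properties might be combined as follows. This is a sketch rather than a tested application: the broker URI, topic name, and the function build_publisher_topology are placeholders, and the options are set as attributes after construction, following the pattern of the other examples in this reference. The streamsx imports are deferred into the function so that defining it does not require the packages to be installed.

```python
def build_publisher_topology(server_uri="tcp://host.domain:1883"):
    """Build (but do not submit) a topology that publishes three strings."""
    # Imported lazily: calling this requires streamsx and streamsx.mqtt.
    from streamsx.mqtt import MQTTSink
    from streamsx.topology.topology import Topology

    topology = Topology()
    data = topology.source(["a", "b", "c"]).as_string()

    sink = MQTTSink(server_uri=server_uri, topic="topic")
    sink.qos = 1                  # allowed values: 0, 1, 2
    sink.retain = True            # retain messages when there is no subscription
    sink.reconnection_bound = -1  # retry the connection without limit
    data.for_each(sink, name="MQTTpublish")
    return topology
```

The returned topology would then be submitted with context.submit, as in the sample at the top of this page.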
server_uri

The URI of the MQTT server, either tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]. The port defaults to 1883 for “tcp:” and 8883 for “ssl:” URIs.

Type:str
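The default-port rule can be checked with the standard library. The helper below is a hypothetical illustration, not part of streamsx.mqtt; it only mirrors the two URI forms and the port defaults stated above.

```python
from urllib.parse import urlsplit

# Default ports as stated in the description of server_uri.
DEFAULT_PORTS = {"tcp": 1883, "ssl": 8883}

def split_server_uri(server_uri):
    """Return (scheme, host, port), applying the default port if none is given."""
    parts = urlsplit(server_uri)
    if parts.scheme not in DEFAULT_PORTS:
        raise ValueError("unsupported scheme: %r" % parts.scheme)
    if not parts.hostname:
        raise ValueError("missing host in %r" % server_uri)
    port = parts.port if parts.port is not None else DEFAULT_PORTS[parts.scheme]
    return parts.scheme, parts.hostname, port
```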
ssl_debug

When True, verbose SSL debug output is enabled at runtime.

Type:bool
ssl_protocol

The SSL protocol for SSL connections. The default value is TLSv1.2.

Type:str
trusted_certs

Certificates or filenames with certificates in PEM format that shall be trusted when SSL connections are made. Self-signed server certificates or the root certificate of server certificates must go in here. When no truststore is given with truststore, a new truststore is created and added as a file dependency to the topology. Otherwise the certificates are added to the given truststore, and the truststore is added as a file dependency.

Example with one certificate literally:

mqtt_comp = ...
mqtt_comp.trusted_certs = """
-----BEGIN CERTIFICATE-----
MIIDJzCCAg+gAwIBAgIJAJhu1pO2qbj7MA0GCSqGSIb3DQEBCwUAMCoxEzARBgNV
BAoMCmlvLnN0cmltemkxEzARBgNVBAMMCmNsdXN0ZXItY2EwHhcNMTkwODE2MDc1
YSTwYvxfkrxGKtDiLjIZ6Q6LJjMcWLG4x3I0WmGgvytTs04S4B+1vp721jmqRKm9
...
ocAT5iL3ZDUj/lwqJRptmzGFcdko+woFae68HRx1ygSgROls7bXy/CwgME0LFFQp
B+2YAhUw1sPU410JUxU3/v6R5vJfI9imE75aha3U7UbeOX8+1+Cu3HOT1QMn80k2
6LnZeMCCgCBp+Yz3YNeUMRejMU6x4WlhTPO7bBq3tKGgwCoyGIX25wMM1Q==
-----END CERTIFICATE-----
"""

Example with two certificates stored in files in /tmp:

mqtt_comp = ...
mqtt_comp.trusted_certs = ['/tmp/ca_cert_1.pem', '/tmp/ca_cert_2.pem']
Type:str|list
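Because trusted_certs accepts either a single string or a list, and each entry may be a PEM literal or a filename, code that prepares the value may want to normalize it first. The sketch below is hypothetical: classify_trusted_certs is not part of the package, and detecting a literal by its PEM header line is an assumption about how the two forms could be told apart.

```python
PEM_HEADER = "-----BEGIN CERTIFICATE-----"

def classify_trusted_certs(trusted_certs):
    """Return a list of (kind, value) pairs, where kind is 'pem' or 'file'."""
    # Accept a single string as well as a list of strings.
    if isinstance(trusted_certs, str):
        items = [trusted_certs]
    else:
        items = list(trusted_certs)
    result = []
    for item in items:
        # Assumption: a value containing the PEM header is a literal
        # certificate; anything else is treated as a filename.
        kind = "pem" if PEM_HEADER in item else "file"
        result.append((kind, item.strip()))
    return result
```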
truststore

The filename of a truststore with certificates that shall be trusted when SSL connections are made. This truststore file is added as a file dependency to the topology.

Note

When this property is set, truststore_password must also be given.

Type:str
truststore_password

The password of the truststore given in truststore.

Type:str
username

The username for login to the MQTT server. This property is overridden by the username given in an application configuration.

Type:str
vm_arg

Arguments for the Java Virtual Machine used at runtime, for example -Xmx2G. For multiple arguments, use a list:

mqtt.vm_arg = ["-Xmx2G", "-Xms1G"]
Type:str|list
class streamsx.mqtt.MQTTSource(server_uri, topics, schema, data_attribute_name=None, topic_attribute_name=None, **options)

Bases: streamsx.mqtt._mqtt.MQTTComposite, streamsx.topology.composite.Source

Represents a source for messages read from an MQTT server, which can be passed to Topology.source() to create a stream.

Parameters:
  • server_uri (str) – The MQTT server URI
  • topics (str|list) – The topic or topics to subscribe for messages.
  • schema – The schema of the created stream
  • data_attribute_name (str) – The name of the tuple attribute containing the message data. data is assumed as default.
  • topic_attribute_name (str) – The name of a tuple attribute denoting the source topic of received messages.
  • **options (kwargs) – optional parameters as keyword arguments
app_config_name

The name of an application configuration with username and password for login to the MQTT server. The application configuration must contain the properties username and password and exist when the topology is submitted. Username and password given in an application configuration override password and username.

Type:str
client_cert

A client certificate or a filename with a client certificate. Certificates must be given in PEM format or as a filename that contains the PEM formatted certificate.

When no keystore file is given with keystore, a new keystore is created and added as a file dependency to the topology. Otherwise the certificate is added to the given keystore, and the keystore is added as a file dependency.

The private key of the certificate must be provided in client_private_key.

Type:str
client_id

A unique identifier for a connection to the MQTT server. The MQTT broker allows only a single connection per client ID. By default, a unique client ID is generated automatically.

Type:str
client_private_key

The private RSA key of a client certificate or a filename with the private key. The key must be given in PEM format literally or as a filename. The client certificate must be provided in client_cert.

Example:

private_key = """
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDBqvzZWSC+cF0T
cTk6xC9iJDdWyqamh/9HnOVWDLzAKQ0XsdSHONVLjs3fwY8bM1tZywBDUujCujJT
NaAW3YDkiVG8v+j90eR6UUc/zlG5zC6tZ6/UlHgXSUOCVuB7d3Y3v9Aoiz8HfPEi
0uV+6WOHCPg9df0gwmzPRYDtobMWat0jCXLkekLjc2gx8muPsOnMAaepNHYmfvFn
...
I4ahg9LgF3pT8BEr5xxfwX3U/i0pHz5qc+kuuXxfAoGBAI0SmUDSGT7rTgxvwy7o
fRIOJz3KlIYZ+JugC6Zj7GYv7fISzVRVENpaeL6jPGmhId/Pu++yT7x9FdmlGfMv
UGu65oHRMYGb+dMCeYF6XymSXy1MOHTH38AE27/BHm8bDe/CYJgcKXo+3FvCRal1
YBdL1LnNMCOKPo2gPB19HgR7
-----END PRIVATE KEY-----
"""

mqtt = ...
mqtt.client_private_key = private_key

Key is stored in a file:

mqtt = ...
mqtt.client_private_key = '/home/user/private_key.pem'
Type:str
command_timeout_millis

The maximum time in milliseconds to wait for an MQTT connect or publish action to complete. A value of 0 causes the client to wait infinitely. The default is 0.

Type:int
keep_alive_seconds

Automatically sends an MQTT ping message to the server if no message or ping has been sent or received within the keep-alive interval (keep_alive_seconds). This enables the client to detect that the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keep-alive processing. The default is 60.

Type:int
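The keep-alive rule amounts to a comparison against the time of the last activity. The following sketch shows that decision in isolation; ping_due is a hypothetical helper, and the timestamps are plain numbers of seconds chosen for illustration.

```python
def ping_due(last_activity, now, keep_alive_seconds=60):
    """Return True when a keep-alive ping should be sent.

    last_activity and now are timestamps in seconds.
    """
    if keep_alive_seconds == 0:
        return False  # 0 disables keep-alive processing
    return now - last_activity >= keep_alive_seconds
```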
keystore

The filename of a keystore with client certificate and its private key. This keystore file is added as a file dependency to the topology.

Note

When this property is set, keystore_password must also be given.

Type:str
keystore_password

The password of the keystore given in keystore. The same password is used to decrypt the private key given in client_private_key.

Type:str
message_queue_size

The size, in number of messages, of an internal receive buffer. The receiver stops fetching messages when the buffer is full. The default is 500.

Type:int
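The behaviour of a bounded receive buffer can be sketched with the standard library's queue module. This is an analogy under stated assumptions, not the operator's implementation: fill_receive_buffer is a hypothetical helper that fetches from an in-memory list instead of an MQTT connection.

```python
import queue

def fill_receive_buffer(messages, message_queue_size=500):
    """Fetch messages until the bounded buffer is full.

    Returns the buffer and the messages that were left unfetched.
    """
    buffer = queue.Queue(maxsize=message_queue_size)
    pending = list(messages)
    # Stop fetching as soon as the buffer holds message_queue_size items.
    while pending and not buffer.full():
        buffer.put_nowait(pending.pop(0))
    return buffer, pending
```

A larger buffer absorbs longer bursts at the cost of memory; a smaller one pushes back on the receiver sooner.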
password

The password for login to the MQTT server. This property is overridden by the password given in an application configuration.

Type:str
populate(topology, name, **options)

Populate the topology with this composite source.

Parameters:
  • topology – Topology containing the source.
  • name – Name passed into source.
  • **options – Future options passed to source.
Returns:

Single stream representing the source.

Return type:

Stream

qos

The quality of service (QoS) level or levels for the subscribed topics.

Example:

mqttSource = MQTTSource('tcp://host.domain:1883', ['topic1', 'topic2'], CommonSchema.String)
mqttSource.qos = 2

The qos value can also be a list of integers:

mqttSource = MQTTSource('tcp://host.domain:1883', ['topic1', 'topic2'], CommonSchema.String)
mqttSource.qos = [1, 2]
Type:int|list
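How a QoS value relates to a list of topics is not spelled out here. The sketch below shows one plausible reading, in which a single integer applies to every topic and a list is matched to the topics by position. Both rules, the length check, and the helper pair_topics_with_qos itself are assumptions rather than documented behaviour.

```python
def pair_topics_with_qos(topics, qos):
    """Return a list of (topic, qos) pairs under the assumptions above."""
    topic_list = [topics] if isinstance(topics, str) else list(topics)
    if isinstance(qos, int):
        # Assumption: one integer is used for all topics.
        qos_list = [qos] * len(topic_list)
    else:
        # Assumption: a list is matched to the topics by position.
        qos_list = list(qos)
    if len(qos_list) != len(topic_list):
        raise ValueError("need one qos value per topic")
    for level in qos_list:
        if level not in (0, 1, 2):
            raise ValueError("qos must be 0, 1, or 2")
    return list(zip(topic_list, qos_list))
```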
reconnection_bound

The number of reconnection attempts after a connection failure. Specify 0 for no retry, a positive number n for n retries, or -1 for infinite retries.

Type:int
server_uri

The URI of the MQTT server, either tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]. The port defaults to 1883 for “tcp:” and 8883 for “ssl:” URIs.

Type:str
ssl_debug

When True, verbose SSL debug output is enabled at runtime.

Type:bool
ssl_protocol

The SSL protocol for SSL connections. The default value is TLSv1.2.

Type:str
trusted_certs

Certificates or filenames with certificates in PEM format that shall be trusted when SSL connections are made. Self-signed server certificates or the root certificate of server certificates must go in here. When no truststore is given with truststore, a new truststore is created and added as a file dependency to the topology. Otherwise the certificates are added to the given truststore, and the truststore is added as a file dependency.

Example with one certificate literally:

mqtt_comp = ...
mqtt_comp.trusted_certs = """
-----BEGIN CERTIFICATE-----
MIIDJzCCAg+gAwIBAgIJAJhu1pO2qbj7MA0GCSqGSIb3DQEBCwUAMCoxEzARBgNV
BAoMCmlvLnN0cmltemkxEzARBgNVBAMMCmNsdXN0ZXItY2EwHhcNMTkwODE2MDc1
YSTwYvxfkrxGKtDiLjIZ6Q6LJjMcWLG4x3I0WmGgvytTs04S4B+1vp721jmqRKm9
...
ocAT5iL3ZDUj/lwqJRptmzGFcdko+woFae68HRx1ygSgROls7bXy/CwgME0LFFQp
B+2YAhUw1sPU410JUxU3/v6R5vJfI9imE75aha3U7UbeOX8+1+Cu3HOT1QMn80k2
6LnZeMCCgCBp+Yz3YNeUMRejMU6x4WlhTPO7bBq3tKGgwCoyGIX25wMM1Q==
-----END CERTIFICATE-----
"""

Example with two certificates stored in files in /tmp:

mqtt_comp = ...
mqtt_comp.trusted_certs = ['/tmp/ca_cert_1.pem', '/tmp/ca_cert_2.pem']
Type:str|list
truststore

The filename of a truststore with certificates that shall be trusted when SSL connections are made. This truststore file is added as a file dependency to the topology.

Note

When this property is set, truststore_password must also be given.

Type:str
truststore_password

The password of the truststore given in truststore.

Type:str
username

The username for login to the MQTT server. This property is overridden by the username given in an application configuration.

Type:str
vm_arg

Arguments for the Java Virtual Machine used at runtime, for example -Xmx2G. For multiple arguments, use a list:

mqtt.vm_arg = ["-Xmx2G", "-Xms1G"]
Type:str|list