streamsx.mqtt package
MQTT integration for IBM Streams topology applications
For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:
This package exposes the com.ibm.streamsx.mqtt toolkit for use with Python.
Overview
Provides functions:
to connect to an MQTT server
to subscribe to topics and receive events
to publish events
Sample
A simple Streams application that uses the MQTTSink and MQTTSource classes:
from streamsx.mqtt import MQTTSource, MQTTSink
from streamsx.topology import context
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
mqtt_server_uri = 'tcp://host.domain:1883'
s = 'Each character will be an MQTT message'
topology = Topology()
data = topology.source([c for c in s]).as_string()
# publish to MQTT
data.for_each(MQTTSink(server_uri=mqtt_server_uri, topic='topic'),
              name='MQTTpublish')
# subscribe for data and print to stdout
received = topology.source(MQTTSource(mqtt_server_uri, schema=CommonSchema.String, topics='topic'))
received.print()
context.submit(context.ContextTypes.DISTRIBUTED, topology)
# the Streams Job keeps running and must be cancelled manually
-
class streamsx.mqtt.MQTTSink(server_uri, topic=None, topic_attribute_name=None, data_attribute_name=None, **options)
Bases: streamsx.mqtt._mqtt.MQTTComposite, streamsx.topology.composite.ForEach
The MQTTSink represents a stream termination that publishes messages to one or more MQTT topics. Instances can be passed to Stream.for_each() to create a sink (stream termination).
Parameters:
server_uri (str) – The MQTT server URI.
topic (str) – The topic to publish the messages to. Mutually exclusive with topic_attribute_name.
topic_attribute_name (str) – The name of a tuple attribute denoting the destination topic. Mutually exclusive with topic.
data_attribute_name (str) – The name of the tuple attribute containing the message data to be published. The default is data.
**options (kwargs) – Optional parameters as keyword arguments.
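The mutual exclusivity of topic and topic_attribute_name can be illustrated with a small stand-alone check. This is only a sketch: the helper name check_topic_args is hypothetical and not part of streamsx.mqtt, and the rule that at least one of the two must be given is an assumption, not something the parameter list states.

```python
def check_topic_args(topic=None, topic_attribute_name=None):
    """Return 'fixed' when one topic is used for all tuples,
    'per_tuple' when the topic is read from a tuple attribute."""
    if topic is not None and topic_attribute_name is not None:
        raise ValueError("topic and topic_attribute_name are mutually exclusive")
    if topic is None and topic_attribute_name is None:
        # Assumption: without either argument the destination is unknown.
        raise ValueError("one of topic or topic_attribute_name is required")
    return "fixed" if topic is not None else "per_tuple"


if __name__ == "__main__":
    print(check_topic_args(topic="sensors/temperature"))
    print(check_topic_args(topic_attribute_name="topic"))
```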
-
property app_config_name
The name of an application configuration with username and password for login to the MQTT server. The application configuration must contain the properties username and password and must exist when the topology is submitted. Username and password given in an application configuration override the password and username properties.
Type: str
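The override rule can be pictured as a simple precedence lookup. The sketch below is plain Python and does not call streamsx.mqtt; the function effective_credentials and the dictionary standing in for an application configuration are illustrative assumptions.

```python
def effective_credentials(username=None, password=None, app_config=None):
    """Return the credentials that win when both sources are present.

    app_config is a plain dict standing in for an application configuration.
    """
    creds = {"username": username, "password": password}
    for key in ("username", "password"):
        # Values from the application configuration take precedence.
        if app_config and key in app_config:
            creds[key] = app_config[key]
    return creds


if __name__ == "__main__":
    print(effective_credentials("local_user", "local_pw",
                                {"username": "cfg_user", "password": "cfg_pw"}))
```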
-
property client_cert
A client certificate or a filename with a client certificate. Certificates must be given in PEM format or as a filename that contains the PEM formatted certificate.
When no keystore file is given with keystore, a new keystore is created and added as a file dependency to the topology. Otherwise the certificate is added to the given keystore, and the keystore is added as a file dependency.
The private key of the certificate must be provided in client_private_key.
Type: str
-
property client_id
A unique identifier for a connection to the MQTT server. The MQTT broker only allows a single connection for a particular clientID. By default a unique client ID is automatically generated.
Type: str
-
property client_private_key
The private RSA key of a client certificate or a filename with the private key. The key must be given in PEM format literally or as a filename. The client certificate must be provided in client_cert.
Example:
private_key = """
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDBqvzZWSC+cF0T
cTk6xC9iJDdWyqamh/9HnOVWDLzAKQ0XsdSHONVLjs3fwY8bM1tZywBDUujCujJT
NaAW3YDkiVG8v+j90eR6UUc/zlG5zC6tZ6/UlHgXSUOCVuB7d3Y3v9Aoiz8HfPEi
0uV+6WOHCPg9df0gwmzPRYDtobMWat0jCXLkekLjc2gx8muPsOnMAaepNHYmfvFn
...
I4ahg9LgF3pT8BEr5xxfwX3U/i0pHz5qc+kuuXxfAoGBAI0SmUDSGT7rTgxvwy7o
fRIOJz3KlIYZ+JugC6Zj7GYv7fISzVRVENpaeL6jPGmhId/Pu++yT7x9FdmlGfMv
UGu65oHRMYGb+dMCeYF6XymSXy1MOHTH38AE27/BHm8bDe/CYJgcKXo+3FvCRal1
YBdL1LnNMCOKPo2gPB19HgR7
-----END PRIVATE KEY-----
"""
mqtt = ...
mqtt.client_private_key = private_key
Example with the key stored in a file:
mqtt = ...
mqtt.client_private_key = '/home/user/private_key.pem'
Type: str
-
property command_timeout_millis
The maximum time in milliseconds to wait for an MQTT connect or publish action to complete. A value of 0 causes the client to wait infinitely. The default is 0.
Type: int
-
property keep_alive_seconds
Automatically generate an MQTT ping message to the server if a message or ping hasn’t been sent or received in the last keepAliveInterval seconds. Enables the client to detect if the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keepalive processing. The default is 60.
Type: int
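The keepalive rule amounts to comparing the idle time against the configured interval. The following sketch shows that decision in isolation; ping_due is a hypothetical helper and says nothing about how the underlying MQTT client actually schedules its pings.

```python
def ping_due(last_activity, now, keep_alive_seconds=60):
    """Decide whether a keepalive ping should be sent.

    last_activity and now are timestamps in seconds.
    """
    if keep_alive_seconds == 0:
        # A value of 0 disables keepalive processing.
        return False
    return (now - last_activity) >= keep_alive_seconds


if __name__ == "__main__":
    print(ping_due(last_activity=100.0, now=130.0))
    print(ping_due(last_activity=100.0, now=161.0))
```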
-
property keystore
The filename of a keystore with client certificate and its private key. This keystore file is added as a file dependency to the topology.
Note: When this property is set, keystore_password must also be given.
-
property keystore_password
The password of the keystore given in keystore. The same password is used to decrypt the private key given in client_private_key.
Type: str
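Because a keystore or truststore filename is only usable together with its password, a configuration can be checked before submission. The sketch below works on a plain dictionary of settings; missing_passwords is a hypothetical helper, and whether streamsx.mqtt performs a comparable check itself is not stated here.

```python
def missing_passwords(settings):
    """Return the password properties that are required but not set."""
    pairs = {
        "keystore": "keystore_password",
        "truststore": "truststore_password",
    }
    return [password for store, password in pairs.items()
            if settings.get(store) and not settings.get(password)]


if __name__ == "__main__":
    # Hypothetical file name, used only for illustration.
    print(missing_passwords({"keystore": "client.jks"}))
```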
-
property password
The password for login to the MQTT server. This property is overridden by the password given in an application configuration.
Type: str
-
populate(topology, stream, name, **options)
Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
sink = input_stream.for_each(myForEachComposite)
Parameters:
topology – Topology containing the composite map.
stream – Stream to be transformed.
name – Name passed into for_each.
**options – Future options passed to for_each.
Returns: Termination for this composite transformation of stream.
Return type: Sink
-
property qos
The QoS value for publishing messages. Allowed values are 0, 1, and 2.
Type: int
-
property reconnection_bound
Number of reconnection attempts in case of connection failures. Specify 0 for no retry, a positive number n for n retries, or -1 for infinite retries.
Type: int
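The three cases (0, a positive n, and -1) can be expressed as one retry loop. This is a stand-alone sketch of the semantics only: connect_with_retries is a hypothetical helper, the connect argument is any callable that raises ConnectionError on failure, and a real client would typically wait between attempts.

```python
def connect_with_retries(connect, reconnection_bound):
    """Call connect() until it succeeds or the retry budget is used up.

    Returns the number of retries that were needed. With a bound of -1
    the loop retries until connect() succeeds.
    """
    retries = 0
    while True:
        try:
            connect()
            return retries
        except ConnectionError:
            if reconnection_bound != -1 and retries >= reconnection_bound:
                raise  # budget exhausted: propagate the last failure
            retries += 1


if __name__ == "__main__":
    attempts = []

    def flaky_connect():
        # Fails twice, then succeeds.
        attempts.append(1)
        if len(attempts) <= 2:
            raise ConnectionError("connection refused")

    print(connect_with_retries(flaky_connect, reconnection_bound=3))
```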
-
property retain
Indicates if messages should be retained on the MQTT server if there is no subscription for the topic. The default is False.
Type: bool
-
property server_uri
The URI of the MQTT server, either tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]. The port defaults to 1883 for “tcp:” and 8883 for “ssl:” URIs.
Type: str
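The URI format and its default ports can be checked with the standard library alone. The sketch below is independent of streamsx.mqtt; split_server_uri is a hypothetical helper that only mirrors the rule described above.

```python
from urllib.parse import urlsplit

# Default ports as described for the two supported schemes.
DEFAULT_PORTS = {"tcp": 1883, "ssl": 8883}


def split_server_uri(uri):
    """Split an MQTT server URI into (scheme, host, port)."""
    parts = urlsplit(uri)
    if parts.scheme not in DEFAULT_PORTS or not parts.hostname:
        raise ValueError("expected tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]")
    port = parts.port if parts.port is not None else DEFAULT_PORTS[parts.scheme]
    return parts.scheme, parts.hostname, port


if __name__ == "__main__":
    print(split_server_uri("tcp://host.domain"))
    print(split_server_uri("ssl://host.domain:8884"))
```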
-
property ssl_debug
When True, enables verbose SSL debug output at runtime.
Type: bool
-
property ssl_protocol
The SSL protocol for SSL connections. The default value is TLSv1.2.
Type: str
-
property trusted_certs
Certificates or filenames with certificates in PEM format that shall be trusted when SSL connections are made. Self-signed server certificates or the root certificate of server certificates must go in here. When no truststore is given with truststore, a new truststore is created and added as a file dependency to the topology. Otherwise the certificates are added to the given truststore, and the truststore is added as a file dependency.
Example with one certificate given literally:
mqtt_comp = ...
mqtt_comp.trusted_certs = """
-----BEGIN CERTIFICATE-----
MIIDJzCCAg+gAwIBAgIJAJhu1pO2qbj7MA0GCSqGSIb3DQEBCwUAMCoxEzARBgNV
BAoMCmlvLnN0cmltemkxEzARBgNVBAMMCmNsdXN0ZXItY2EwHhcNMTkwODE2MDc1
YSTwYvxfkrxGKtDiLjIZ6Q6LJjMcWLG4x3I0WmGgvytTs04S4B+1vp721jmqRKm9
...
ocAT5iL3ZDUj/lwqJRptmzGFcdko+woFae68HRx1ygSgROls7bXy/CwgME0LFFQp
B+2YAhUw1sPU410JUxU3/v6R5vJfI9imE75aha3U7UbeOX8+1+Cu3HOT1QMn80k2
6LnZeMCCgCBp+Yz3YNeUMRejMU6x4WlhTPO7bBq3tKGgwCoyGIX25wMM1Q==
-----END CERTIFICATE-----
"""
Example with two certificates stored in files in /tmp:
mqtt_comp = ...
mqtt_comp.trusted_certs = ['/tmp/ca_cert_1.pem', '/tmp/ca_cert_2.pem']
Type: str|list
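Since the property accepts either a single string or a list, and each entry may be a literal PEM certificate or a filename, one way to picture the input handling is the sketch below. The helper classify_certs and its marker-based heuristic are assumptions made for illustration; how streamsx.mqtt tells the two forms apart is not described here.

```python
PEM_MARKER = "-----BEGIN CERTIFICATE-----"


def classify_certs(trusted_certs):
    """Return a list of (kind, value) pairs, kind being 'pem' or 'file'."""
    # Accept a single string as well as a list of strings.
    entries = [trusted_certs] if isinstance(trusted_certs, str) else list(trusted_certs)
    result = []
    for entry in entries:
        # Heuristic (assumption): literal certificates contain the PEM header.
        kind = "pem" if PEM_MARKER in entry else "file"
        result.append((kind, entry.strip()))
    return result


if __name__ == "__main__":
    print(classify_certs(['/tmp/ca_cert_1.pem', '/tmp/ca_cert_2.pem']))
```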
-
property truststore
The filename of a truststore with certificates that shall be trusted when SSL connections are made. This truststore file is added as a file dependency to the topology.
Note: When this property is set, truststore_password must also be given.
Type: str
-
property truststore_password
The password of the truststore given in truststore.
Type: str
-
property username
The username for login to the MQTT server. This property is overridden by the username given in an application configuration.
Type: str
-
property vm_arg
Arguments for the Java Virtual Machine used at runtime, for example -Xmx2G. For multiple arguments, use a list:
mqtt.vm_arg = ["-Xmx2G", "-Xms1G"]
Type: str|list
-
class streamsx.mqtt.MQTTSource(server_uri, topics, schema, data_attribute_name=None, topic_attribute_name=None, **options)
Bases: streamsx.mqtt._mqtt.MQTTComposite, streamsx.topology.composite.Source
Represents a source for messages read from an MQTT server, which can be passed to Topology.source() to create a stream.
Parameters:
server_uri (str) – The MQTT server URI.
topics (str|list) – The topic or topics to subscribe to.
schema – The schema of the created stream.
data_attribute_name (str) – The name of the tuple attribute containing the message data. The default is data.
topic_attribute_name (str) – The name of a tuple attribute denoting the source topic of received messages.
**options (kwargs) – Optional parameters as keyword arguments.
-
property app_config_name
The name of an application configuration with username and password for login to the MQTT server. The application configuration must contain the properties username and password and must exist when the topology is submitted. Username and password given in an application configuration override the password and username properties.
Type: str
-
property client_cert
A client certificate or a filename with a client certificate. Certificates must be given in PEM format or as a filename that contains the PEM formatted certificate.
When no keystore file is given with keystore, a new keystore is created and added as a file dependency to the topology. Otherwise the certificate is added to the given keystore, and the keystore is added as a file dependency.
The private key of the certificate must be provided in client_private_key.
Type: str
-
property client_id
A unique identifier for a connection to the MQTT server. The MQTT broker only allows a single connection for a particular clientID. By default a unique client ID is automatically generated.
Type: str
-
property client_private_key
The private RSA key of a client certificate or a filename with the private key. The key must be given in PEM format literally or as a filename. The client certificate must be provided in client_cert.
Example:
private_key = """
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDBqvzZWSC+cF0T
cTk6xC9iJDdWyqamh/9HnOVWDLzAKQ0XsdSHONVLjs3fwY8bM1tZywBDUujCujJT
NaAW3YDkiVG8v+j90eR6UUc/zlG5zC6tZ6/UlHgXSUOCVuB7d3Y3v9Aoiz8HfPEi
0uV+6WOHCPg9df0gwmzPRYDtobMWat0jCXLkekLjc2gx8muPsOnMAaepNHYmfvFn
...
I4ahg9LgF3pT8BEr5xxfwX3U/i0pHz5qc+kuuXxfAoGBAI0SmUDSGT7rTgxvwy7o
fRIOJz3KlIYZ+JugC6Zj7GYv7fISzVRVENpaeL6jPGmhId/Pu++yT7x9FdmlGfMv
UGu65oHRMYGb+dMCeYF6XymSXy1MOHTH38AE27/BHm8bDe/CYJgcKXo+3FvCRal1
YBdL1LnNMCOKPo2gPB19HgR7
-----END PRIVATE KEY-----
"""
mqtt = ...
mqtt.client_private_key = private_key
Example with the key stored in a file:
mqtt = ...
mqtt.client_private_key = '/home/user/private_key.pem'
Type: str
-
property command_timeout_millis
The maximum time in milliseconds to wait for an MQTT connect or publish action to complete. A value of 0 causes the client to wait infinitely. The default is 0.
Type: int
-
property keep_alive_seconds
Automatically generate an MQTT ping message to the server if a message or ping hasn’t been sent or received in the last keepAliveInterval seconds. Enables the client to detect if the server is no longer available without having to wait for the TCP/IP timeout. A value of 0 disables keepalive processing. The default is 60.
Type: int
-
property keystore
The filename of a keystore with client certificate and its private key. This keystore file is added as a file dependency to the topology.
Note: When this property is set, keystore_password must also be given.
-
property keystore_password
The password of the keystore given in keystore. The same password is used to decrypt the private key given in client_private_key.
Type: str
-
property message_queue_size
The size, in number of messages, of an internal receive buffer. The receiver stops fetching messages when the buffer is full. The default is 500.
Type: int
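The effect of a bounded receive buffer can be sketched with the standard library queue. This is not the operator's implementation; fill_buffer is a hypothetical helper that only illustrates "stop fetching when the buffer is full".

```python
import queue


def fill_buffer(messages, message_queue_size=500):
    """Fetch messages into a bounded buffer and stop once it is full.

    Returns the buffer and the number of messages that were fetched.
    """
    buffer = queue.Queue(maxsize=message_queue_size)
    fetched = 0
    for message in messages:
        try:
            buffer.put_nowait(message)
        except queue.Full:
            # Buffer is full: stop fetching until a consumer drains it.
            break
        fetched += 1
    return buffer, fetched


if __name__ == "__main__":
    buf, count = fill_buffer((f"msg-{i}" for i in range(10)), message_queue_size=3)
    print(count, buf.qsize())
```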
-
property password
The password for login to the MQTT server. This property is overridden by the password given in an application configuration.
Type: str
-
populate(topology, name, **options)
Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:
topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters:
topology – Topology containing the source.
name – Name passed into source.
**options – Future options passed to source.
Returns: Single stream representing the source.
Return type: Stream
-
property qos
The QoS values for the subscribed messages.
Example:
mqttSource = MQTTSource('tcp://host.domain:1883', ['topic1', 'topic2'], CommonSchema.String)
mqttSource.qos = 2
The qos value can also be a list of integers:
mqttSource = MQTTSource('tcp://host.domain:1883', ['topic1', 'topic2'], CommonSchema.String)
mqttSource.qos = [1, 2]
Type: int|list
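Accepting either a single integer or a list can be handled by normalizing to a list and validating each entry. The sketch below uses the three QoS levels defined by MQTT (0, 1 and 2); normalize_qos is a hypothetical helper, and how the values are matched to the subscribed topics is not covered.

```python
def normalize_qos(qos):
    """Return qos as a list of integers, checking each MQTT QoS level."""
    values = [qos] if isinstance(qos, int) else list(qos)
    for value in values:
        if value not in (0, 1, 2):
            raise ValueError(f"invalid QoS value: {value!r}")
    return values


if __name__ == "__main__":
    print(normalize_qos(2))
    print(normalize_qos([1, 2]))
```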
-
property reconnection_bound
Number of reconnection attempts in case of connection failures. Specify 0 for no retry, a positive number n for n retries, or -1 for infinite retries.
Type: int
-
property server_uri
The URI of the MQTT server, either tcp://<hostid>[:<port>] or ssl://<hostid>[:<port>]. The port defaults to 1883 for “tcp:” and 8883 for “ssl:” URIs.
Type: str
-
property ssl_debug
When True, enables verbose SSL debug output at runtime.
Type: bool
-
property ssl_protocol
The SSL protocol for SSL connections. The default value is TLSv1.2.
Type: str
-
property trusted_certs
Certificates or filenames with certificates in PEM format that shall be trusted when SSL connections are made. Self-signed server certificates or the root certificate of server certificates must go in here. When no truststore is given with truststore, a new truststore is created and added as a file dependency to the topology. Otherwise the certificates are added to the given truststore, and the truststore is added as a file dependency.
Example with one certificate given literally:
mqtt_comp = ...
mqtt_comp.trusted_certs = """
-----BEGIN CERTIFICATE-----
MIIDJzCCAg+gAwIBAgIJAJhu1pO2qbj7MA0GCSqGSIb3DQEBCwUAMCoxEzARBgNV
BAoMCmlvLnN0cmltemkxEzARBgNVBAMMCmNsdXN0ZXItY2EwHhcNMTkwODE2MDc1
YSTwYvxfkrxGKtDiLjIZ6Q6LJjMcWLG4x3I0WmGgvytTs04S4B+1vp721jmqRKm9
...
ocAT5iL3ZDUj/lwqJRptmzGFcdko+woFae68HRx1ygSgROls7bXy/CwgME0LFFQp
B+2YAhUw1sPU410JUxU3/v6R5vJfI9imE75aha3U7UbeOX8+1+Cu3HOT1QMn80k2
6LnZeMCCgCBp+Yz3YNeUMRejMU6x4WlhTPO7bBq3tKGgwCoyGIX25wMM1Q==
-----END CERTIFICATE-----
"""
Example with two certificates stored in files in /tmp:
mqtt_comp = ...
mqtt_comp.trusted_certs = ['/tmp/ca_cert_1.pem', '/tmp/ca_cert_2.pem']
Type: str|list
-
property truststore
The filename of a truststore with certificates that shall be trusted when SSL connections are made. This truststore file is added as a file dependency to the topology.
Note: When this property is set, truststore_password must also be given.
Type: str
-
property truststore_password
The password of the truststore given in truststore.
Type: str
-
property username
The username for login to the MQTT server. This property is overridden by the username given in an application configuration.
Type: str
-
property vm_arg
Arguments for the Java Virtual Machine used at runtime, for example -Xmx2G. For multiple arguments, use a list:
mqtt.vm_arg = ["-Xmx2G", "-Xms1G"]
Type: str|list