Python
Our Python SDK is designed for high-performance telemetry services that need to process high volumes of data with nanosecond response times.
We also provide HTTP and WebSocket services to read and write real-time telemetry data for less performance-critical use cases like mobile or web applications.
Connecting to Quix
Streams are written and read using an instance of the StreamingClient class.
You can instantiate a StreamingClient with a string containing your Quix Kafka server list and a SecurityOptions instance to access it. You can find your Kafka configuration parameters using the automatically generated Samples in the platform for your specific workspace.
security = SecurityOptions(CERTIFICATES_FOLDER, QUIX_USER, QUIX_PASSWORD)
client = StreamingClient('kafka-k1.quix.ai:9093,kafka-k2.quix.ai:9093', security)
Writing to Quix
You need a Topic to write data to Quix; you can create one in the platform. An OutputTopic instance allows you to write new Streams into the specified Topic.
You can instantiate an OutputTopic with a string containing your Topic Id. You can find your Topic Id on the Topics page of the platform, or by using the automatically generated Samples in the platform for your specific Workspace and Topic.
output_topic = client.open_output_topic(TOPIC_ID)
Writing Streams
Once you have the OutputTopic instance, you can create as many streams as you want using the create_stream method. The Stream Id is auto-generated, but you can also pass a StreamId to the method. If you do, you can append data to an existing stream, even one that has already been closed.
stream = output_topic.create_stream()
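For example, a minimal sketch of appending to an existing stream by passing its id (the id shown here is a hypothetical placeholder):
stream = output_topic.create_stream("existing-stream-id")  # appends to the stream with this id, even if it was closed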
Stream Properties
Optionally, you can add context to your streams by attaching metadata to them.
You can add this metadata to a stream using the properties options of the generated stream instance.
stream.properties.name = "Hello World python stream"
stream.properties.location = "/test/location"
stream.properties.metadata["meta"] = "is"
stream.properties.metadata["working"] = "well"
Stream Location
The stream location property is particularly important as it defines the hierarchy of your data in the data catalogue.
For example, the following location:
stream.properties.location = f"/Game/Codemasters/F1-2019/{track}"
Would result in this hierarchy in the catalogue:
Any streams sent without a location property will be located under Root by default.
Writing Parameters
You can now start writing parameter data to your stream. We recommend that you do this using the built-in buffer feature.
Timestamps
Our SDK gives you several helper functions to add new timestamps to Buffer, ParameterData and EventData instances using several date/time formats.
Some of these functions use the default epoch defined at the stream level. This epoch is very useful because it lets you avoid specifying the date part of each timestamp you add with the SDK.
These are all the common helper functions:
- add_timestamp(datetime: datetime): Add a new timestamp in datetime format. The default epoch will never be added to this.
- add_timestamp(time: timedelta): Add a new timestamp in timedelta format since the default epoch determined in the stream.
- add_timestamp_milliseconds(milliseconds: int): Add a new timestamp in milliseconds since the default epoch determined in the stream.
- add_timestamp_nanoseconds(nanoseconds: int): Add a new timestamp in nanoseconds since the default epoch determined in the stream.
Adding data without an epoch:
stream.parameters.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("ParameterA", 10) \
    .add_value("ParameterB", "hello") \
    .write()
or we can add a timestamp 1000ms from the epoch:
stream.epoch = datetime.datetime.utcnow()
stream.parameters.buffer \
    .add_timestamp_milliseconds(1000) \
    .add_value("ParameterA", 10) \
    .add_value("ParameterB", "hello") \
    .write()
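The timedelta and nanosecond helpers listed above work the same way against the stream epoch. A minimal sketch writing the same instant two ways (values are illustrative):
stream.epoch = datetime.datetime.utcnow()
# one second after the epoch, expressed as a timedelta
stream.parameters.buffer \
    .add_timestamp(datetime.timedelta(seconds=1)) \
    .add_value("ParameterA", 10) \
    .write()
# the same instant, expressed in nanoseconds since the epoch
stream.parameters.buffer \
    .add_timestamp_nanoseconds(1000000000) \
    .add_value("ParameterA", 10) \
    .write()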
Buffer
Our SDK provides a built-in Buffer to help you achieve high-performance data streaming without the complexity of managing the underlying streaming technologies. Instead, you just configure the buffer with your requirements. For example, the following configuration means that the SDK will send a packet when the size of the buffer reaches 100 timestamps:
stream.parameters.buffer.packet_size = 100
Writing a parameter to that buffer is as simple as using the add_timestamp method, then add_value for each parameter value we want to write. At the end we use the write method to write the timestamp to the buffer.
# use bytearray to write binary data to a stream (ParameterC below).
stream.parameters.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("ParameterA", 10) \
    .add_value("ParameterB", "hello") \
    .add_value("ParameterC", bytearray("hello, Quix!", 'utf-8')) \
    .write()
You can configure multiple conditions to determine when the Buffer has to release data. If any of these conditions becomes true, the buffer releases a new packet of data and that data is cleared from the buffer:
- buffer.buffer_timeout: The maximum duration in milliseconds for which the buffer will be held before releasing the data. A packet of data is released when the configured timeout has elapsed since the last data received in the buffer.
- buffer.packet_size: The maximum packet size in terms of number of timestamps. Each time the buffer holds this many timestamps, the packet of data is released.
- buffer.time_span_in_nanoseconds: The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released.
- buffer.time_span_in_milliseconds: The maximum time between timestamps in milliseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released. Note: this is a millisecond converter on top of time_span_in_nanoseconds; they both work with the same underlying value.
- buffer.custom_trigger_before_enqueue: A custom function invoked before adding a new timestamp to the buffer. If it returns true, the packet of data is released before the timestamp is added to it.
- buffer.custom_trigger: A custom function invoked after adding a new timestamp to the buffer. If it returns true, the packet of data is released with the entire buffer content.
- buffer.filter: A custom function to filter incoming data before adding it to the buffer. If it returns true, the data is added; otherwise it is not.
Examples
This buffer configuration will release data in 100ms windows, or, if no data arrives within a 1 second timeout period, it will empty the buffer anyway.
stream.parameters.buffer.time_span_in_milliseconds = 100
stream.parameters.buffer.buffer_timeout = 1000
This buffer configuration will release data in 100ms windows, or, if critical data arrives, it will empty the buffer immediately.
stream.parameters.buffer.time_span_in_milliseconds = 100
stream.parameters.buffer.custom_trigger = lambda data: data.timestamps[0].tags["is_critical"] == 'True'
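buffer.filter and buffer.custom_trigger_before_enqueue can be configured the same way. A minimal sketch, assuming these callbacks receive the same ParameterData shape as the custom_trigger example above (the tag and parameter names are illustrative):
# only buffer rows that carry a Speed value (assumed callback payload)
stream.parameters.buffer.filter = lambda data: 'Speed' in data.timestamps[0].parameters
# release the current buffer content before enqueueing data tagged as a new lap (illustrative tag)
stream.parameters.buffer.custom_trigger_before_enqueue = lambda data: data.timestamps[0].tags["is_new_lap"] == 'True'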
Parameter Definitions
The Quix SDK allows you to define visualization configuration and metadata for Parameters and Events. You can define things like human-readable names, descriptions, ranges, etc. Quix uses some of these configurations in the Visualise area of the platform, but you can also use them in your own models, bridges or visualization implementations.
We call these configurations Definitions, and all you need to do is use the add_definition helper function on either stream.parameters or stream.events:
parameters.add_definition(parameter_id: str, name: str = None, description: str = None)
events.add_definition(event_id: str, name: str = None, description: str = None)
Example:
stream.parameters \
    .add_location("vehicle/ecu") \
    .add_definition("vehicle-speed", "Vehicle speed", "Current vehicle speed measured using wheel sensor") \
    .set_unit("kmh") \
    .set_range(0, 400)
The Min and Max range definition sets the Y axis range in the waveform visualisation view. The following definition
.add_definition("Speed").set_range(0, 400)
will set this view in Visualise:
Adding additional Definitions for each parameter allows you to see data with different ranges on the same waveform view:
You can also define a Location before adding parameter and event definitions. Locations are used to organize Parameters and Events into hierarchy groups in the data catalogue. To add a Location, use the add_location function before adding the definitions you want to include in that group.
For example, setting the following parameter location
stream.parameters \
    .add_location("/Player/Motion/Car") \
    .add_definition("Pitch") \
    .add_definition("Roll") \
    .add_definition("Yaw")
will result in this parameter hierarchy in the parameter selection dialogue:
Once you have added a new definition, you can attach additional configuration to it. This is the whole list of visualization and metadata options we can attach to a ParameterDefinition:
- set_range(minimum_value: float, maximum_value: float): Set the minimum and maximum range of the parameter.
- set_unit(unit: str): Set the unit of the parameter.
- set_format(format: str): Set the format of the parameter.
- set_custom_properties(custom_properties: str): Set the custom properties of the parameter.
Example:
stream.parameters \
    .add_definition("ParameterA", "The Parameter A") \
    .set_range(-1.2, 1.2)
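The remaining ParameterDefinition options chain in the same way. A minimal sketch (the format string and custom-properties payload are illustrative assumptions, not prescribed formats):
stream.parameters \
    .add_definition("ParameterB", "The Parameter B") \
    .set_unit("V") \
    .set_format("0.000") \
    .set_custom_properties('{"color": "red"}')  # illustrative payload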
Writing Events
You can also write events to Quix.
Writing events to a stream is identical to writing parameters, although you can do it without the buffer feature because events don't need high-performance throughput.
stream.events \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("EventA", "Nice!") \
    .add_value("EventB", "High Five") \
    .write()
Event Definitions
Like parameters, you can write Definitions for each event.
This is the whole list of visualization and metadata options we can attach to an EventDefinition:
- set_level(level: EventLevel): Set the severity level of the event.
- set_custom_properties(custom_properties: str): Set the custom properties of the event.
For example, the following code defines a human-readable name and a severity level for EventA:
stream.events \
    .add_definition("EventA", "The Event A") \
    .set_level(EventLevel.Critical)
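set_custom_properties chains onto an event definition in the same way; a minimal sketch (the payload is an illustrative assumption):
stream.events \
    .add_definition("EventB", "The Event B") \
    .set_custom_properties('{"icon": "flag"}')  # illustrative payload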
Tags
Using tags alongside parameters and events effectively indexes the persisted data in the database. This means you will be able to filter and group data by those tags in fast queries. Tags have to be chosen carefully, as excessive cardinality leads to performance degradation in the database.
Good example: this will later allow you to query the maximum speed for driver "Peter" per car.
stream.parameters.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_tag("vehicle-plate", "SL96 XCX") \
    .add_tag("driver-id", "Peter") \
    .add_value("Speed", 53) \
    .add_value("Gear", 4) \
    .write()
Wrong example: this will lead to excessive cardinality, as there will be a massive number of different values for the tag Speed.
stream.parameters.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_tag("Speed", 53) \
    .add_value("Gear", 4) \
    .write()
Minimal example
This is the minimal code example needed to write data to a topic using the Quix SDK.
import time
import datetime
from quixstreaming import *

# Create a client. The client helps you create an input reader or output writer for a specified topic.
security = SecurityOptions('../certificates/ca.cert', WORKSPACE_ID, PASSWORD)
client = StreamingClient(BROKER_CLUSTER, security)

output_topic = client.open_output_topic(TOPIC_ID)

stream = output_topic.create_stream()
stream.properties.name = "Hello World python stream"

for index in range(0, 3000):
    stream.parameters \
        .buffer \
        .add_timestamp(datetime.datetime.utcnow()) \
        .add_value("ParameterA", index) \
        .write()
    time.sleep(0.01)

print("Closing stream")
stream.close()
Reading from Quix
In order to read streams, you need an InputTopic instance. This instance allows you to read all the incoming streams on the specified Topic.
You can instantiate an InputTopic with a string containing your Topic Id. You can find your Topic Id on the Topics page of the platform or by using the automatically generated Samples in the platform for your specific Workspace and Topic.
input_topic = client.open_input_topic(TOPIC_ID)
Reading Streams
Once you have the InputTopic instance you can start reading streams. For each stream received on the specified topic, the InputTopic will execute the on_stream_received event. For example, the following code prints the StreamId for each new_stream received on that Topic:
def read_stream(new_stream: StreamReader):
    print("New stream read: " + new_stream.stream_id)

input_topic.on_stream_received += read_stream
input_topic.start_reading()
Reading Parameters
You can also use buffers to read data; this helps you develop models with high-performance throughput.
buffer = new_stream.parameters.create_buffer()
or, if you want to receive only rows where certain parameters are present:
buffer = new_stream.parameters.create_buffer('Speed', 'Gear')
You just have to configure these buffers with your input requirements using the available built-in configuration. For example, the following configuration means that the buffer will release a packet when the time span between the first and last timestamp reaches 100 milliseconds:
buffer.time_span_in_milliseconds = 100
Reading parameter data from that buffer is as simple as using the on_read event. For each parameter data packet released from the buffer, the SDK will execute the on_read event with the parameter data as an argument. For example, the following code prints the ParameterA value of the first timestamp of each packet received:
def on_parameter_data_handler(data: ParameterData):
    hello_world_value = data.timestamps[0].parameters['ParameterA'].numeric_value
    binary_value = data.timestamps[1].parameters['ParameterC'].binary_value  # reading binary data as a byte array.
    print("ParameterA - " + str(data.timestamps[0]) + ": " + str(hello_world_value))
    print("ParameterC - " + str(data.timestamps[1]) + ": " + binary_value.decode('utf-8'))

buffer.on_read += on_parameter_data_handler
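Tags written with add_tag (see the Tags section above) come back on each timestamp when reading. A minimal sketch, assuming the data was tagged as in that section:
def on_tagged_data_handler(data: ParameterData):
    ts = data.timestamps[0]
    # tag values are exposed per timestamp, alongside the parameter values
    print("driver: " + ts.tags["driver-id"] + ", speed: " + str(ts.parameters["Speed"].numeric_value))

buffer.on_read += on_tagged_data_handler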
You can configure multiple conditions to determine when the Buffer has to release data. If any of these conditions becomes true, the buffer releases a new packet of data and that data is cleared from the buffer:
- buffer.buffer_timeout: The maximum duration in milliseconds for which the buffer will be held before releasing the data. A packet of data is released when the configured timeout has elapsed since the last data received in the buffer.
- buffer.packet_size: The maximum packet size in terms of number of timestamps. Each time the buffer holds this many timestamps, the packet of data is released.
- buffer.time_span_in_nanoseconds: The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released.
- buffer.time_span_in_milliseconds: The maximum time between timestamps in milliseconds. When the difference between the earliest and latest buffered timestamp surpasses this number, the packet of data is released. Note: this is a millisecond converter on top of time_span_in_nanoseconds; they both work with the same underlying value.
- buffer.custom_trigger_before_enqueue: A custom function invoked before adding a new timestamp to the buffer. If it returns true, the packet of data is released before the timestamp is added to it.
- buffer.custom_trigger: A custom function invoked after adding a new timestamp to the buffer. If it returns true, the packet of data is released with the entire buffer content.
- buffer.filter: A custom function to filter incoming data before adding it to the buffer. If it returns true, the data is added; otherwise it is not.
Examples
This buffer configuration will release data in 100ms windows, or, if no data arrives within a 1 second timeout period, it will empty the buffer anyway.
buffer.time_span_in_milliseconds = 100
buffer.buffer_timeout = 1000
This buffer configuration will release data in 100ms windows, or, if critical data arrives, it will empty the buffer immediately.
buffer.time_span_in_milliseconds = 100
buffer.custom_trigger = lambda data: data.timestamps[0].tags["is_critical"] == 'True'
Reading Events
Reading events from a stream is as easy as reading parameter data. In this case the SDK does not use a Buffer, because events don't need high-performance throughput, but the way we read Event Data from a new_stream is identical.
def on_event_data_handler(data: EventData):
    print("Event data read for stream: " + new_stream.stream_id)
    print("  Time:", data.timestamp)
    print("  Id:", data.id)
    print("  Value: " + data.value)

new_stream.events.on_read += on_event_data_handler
Minimal example
This is the minimal code example needed to read data from a topic using the Quix SDK.
from quixstreaming import *
import signal
import threading

# Create a client. The client helps you create an input reader or output writer for a specified topic.
security = SecurityOptions('../certificates/ca.cert', WORKSPACE_ID, PASSWORD)
client = StreamingClient(BROKER_CLUSTER, security)

input_topic = client.open_input_topic(TOPIC_ID)

# read streams
def read_stream(new_stream: StreamReader):

    buffer = new_stream.parameters.create_buffer()

    def on_parameter_data_handler(data: ParameterData):
        df = data.to_panda_frame()
        print(df.to_string())

    buffer.on_read += on_parameter_data_handler

# Hook up events before initiating read to avoid losing out on any data
input_topic.on_stream_received += read_stream
input_topic.start_reading()  # initiate read

# Hook up to termination signal (for docker image) and CTRL-C
print("Listening to streams. Press CTRL-C to exit.")

event = threading.Event()

def signal_handler(sig, frame):
    print('Exiting...')
    event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

event.wait()