Python

Our Python SDK is designed for high-performance telemetry services that need to process high volumes of data with nanosecond-scale response times.

We also provide HTTP and WebSocket services to read and write real-time telemetry data for less performance-oriented use cases, such as mobile or web applications.

Connecting to Quix

Streams are written and read using an instance of the StreamingClient class.

You can instantiate a StreamingClient with a string containing your Quix Kafka server list and a SecurityOptions instance for accessing it. You can find your Kafka configuration parameters in the automatically generated Samples in the platform for your specific Workspace.

security = SecurityOptions(CERTIFICATES_FOLDER, QUIX_USER, QUIX_PASSWORD)
client = StreamingClient('kafka-k1.quix.ai:9093,kafka-k2.quix.ai:9093', security)

Writing to Quix

You need a Topic to write data to Quix. You can create one in the platform. An OutputTopic instance allows you to write new Streams into the specified Topic.

You can instantiate an OutputTopic with a string containing your Topic Id. You can find your Topic Id on the Topics page of the platform, or by using the automatically generated Samples in the platform for your specific Workspace and Topic.

output_topic = client.open_output_topic(TOPIC_ID)

Writing Streams

Once you have an OutputTopic instance you can create as many streams as you want using the create_stream method. The Stream Id is autogenerated, but you can also pass a StreamId to the method. If you do, you can append to an existing stream, even one that has already been closed.

stream = output_topic.create_stream()

Stream Properties

Optionally, you can add context to your streams by attaching metadata.

You can add this metadata to a stream using the properties of the generated stream instance.

stream.properties.name = "Hello World python stream"
stream.properties.location = "/test/location"
stream.properties.metadata["meta"] = "is"
stream.properties.metadata["working"] = "well"

Stream Location

The stream location property is particularly important as it defines the hierarchy of your data in the data catalogue.

For example, the following location:

stream.properties.location = f"/Game/Codemasters/F1-2019/{track}"

Would result in this hierarchy in the catalogue:

[Image: stream hierarchy in the data catalogue]

Any streams sent without a location property will be located under Root by default.
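Conceptually, the location is just a slash-separated path, and each segment becomes a level in the catalogue hierarchy. The following is a plain-Python sketch (no SDK required, and the function name is only illustrative) of how such a path maps to hierarchy levels:

```python
# Hypothetical illustration: how a slash-separated location path
# maps to hierarchy levels in the catalogue.
def location_levels(location: str) -> list:
    # Drop empty segments; a missing location falls back to Root.
    segments = [s for s in location.split("/") if s]
    return segments if segments else ["Root"]

print(location_levels("/Game/Codemasters/F1-2019/Monza"))
# -> ['Game', 'Codemasters', 'F1-2019', 'Monza']
print(location_levels(""))
# -> ['Root']
```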

Writing Parameters

You can now start writing parameter data to your stream. We recommend that you do this using the built-in buffer feature.

Timestamps

Our SDK gives you several helper functions to add new timestamps to Buffer, ParameterData and EventData instances, supporting several date-time formats.

Some of these functions use the default epoch defined at the stream level. This epoch is useful to avoid specifying the date part of each timestamp added with the SDK.

These are all the common helper functions:

  • add_timestamp(datetime: datetime) : Add a new timestamp in datetime format. The default epoch is not applied.
  • add_timestamp(time: timedelta) : Add a new timestamp in timedelta format, relative to the default epoch defined in the stream.
  • add_timestamp_milliseconds(milliseconds: int) : Add a new timestamp in milliseconds since the default epoch defined in the stream.
  • add_timestamp_nanoseconds(nanoseconds: int) : Add a new timestamp in nanoseconds since the default epoch defined in the stream.

Adding data without epoch:

stream.parameters.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("ParameterA", 10) \
    .add_value("ParameterB", "hello") \
    .write()

Or we can add a timestamp 1000 ms after the epoch:

stream.epoch = datetime.datetime.utcnow()

stream.parameters.buffer \
    .add_timestamp_milliseconds(1000) \
    .add_value("ParameterA", 10) \
    .add_value("ParameterB", "hello") \
    .write()
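
Under the hood, an epoch-relative timestamp simply resolves to epoch + offset. A quick standard-library sketch of that arithmetic (no SDK involved):

```python
import datetime

# A fixed epoch, standing in for stream.epoch.
epoch = datetime.datetime(2021, 1, 1)

# add_timestamp_milliseconds(1000) conceptually resolves to:
absolute = epoch + datetime.timedelta(milliseconds=1000)
print(absolute)  # 2021-01-01 00:00:01

# add_timestamp(timedelta) works the same way:
absolute2 = epoch + datetime.timedelta(seconds=1)
assert absolute == absolute2
```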

Buffer

Our SDK provides a built-in Buffer to help you achieve high-performance data streaming without the complexity of managing underlying streaming technologies. Instead, you just configure the buffer with your requirements. For example, the following configuration means the SDK will send a packet when the buffer reaches 100 timestamps:

stream.parameters.buffer.packet_size = 100

Writing a parameter to that buffer is as simple as calling the add_timestamp method, then add_value for each parameter value we want to write. Finally, we call the write method to commit the timestamp to the buffer.

stream.parameters.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("ParameterA", 10) \
    .add_value("ParameterB", "hello") \
    .add_value("ParameterC", bytearray("hello, Quix!", 'utf-8')) \
    .write()  # use bytearray to write binary data to a stream

You can configure multiple conditions to determine when the buffer releases data. If any of these conditions becomes true, the buffer releases a new packet of data and clears that data from the buffer:

  • buffer.buffer_timeout : The maximum duration, in milliseconds, the buffer is held before releasing data. A packet of data is released when the configured timeout has elapsed since the last data received in the buffer.
  • buffer.packet_size : The maximum packet size, in number of timestamps. Each time the buffer holds this many timestamps, a packet of data is released.
  • buffer.time_span_in_nanoseconds : The maximum time between timestamps, in nanoseconds. When the difference between the earliest and latest buffered timestamp exceeds this number, a packet of data is released.
  • buffer.time_span_in_milliseconds : The maximum time between timestamps, in milliseconds. When the difference between the earliest and latest buffered timestamp exceeds this number, a packet of data is released. Note: this is a millisecond converter on top of time_span_in_nanoseconds; both work on the same underlying value.
  • buffer.custom_trigger_before_enqueue : A custom function invoked before a new timestamp is added to the buffer. If it returns true, a packet of data is released before the timestamp is added.
  • buffer.custom_trigger : A custom function invoked after a new timestamp is added to the buffer. If it returns true, a packet of data is released with the entire buffer content.
  • buffer.filter : A custom function that filters incoming data before it is added to the buffer. If it returns true, the data is added; otherwise it is discarded.
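
To make the release conditions concrete, here is a simplified, SDK-free sketch of a buffer that releases on packet_size or time span. This is an illustration of the release logic only, not the real SDK buffer (which also handles timeouts and custom triggers):

```python
class SimpleBuffer:
    """Simplified sketch of the buffer release logic; not the real SDK buffer."""
    def __init__(self, packet_size=None, time_span_in_nanoseconds=None):
        self.packet_size = packet_size
        self.time_span_in_nanoseconds = time_span_in_nanoseconds
        self.timestamps = []          # buffered timestamps, in nanoseconds
        self.released_packets = []    # packets released so far

    @property
    def time_span_in_milliseconds(self):
        # Millisecond view over the same underlying nanosecond value.
        return self.time_span_in_nanoseconds // 1_000_000

    def add_timestamp(self, ts_nanoseconds):
        self.timestamps.append(ts_nanoseconds)
        # Release when the buffer holds packet_size timestamps...
        if self.packet_size is not None and len(self.timestamps) >= self.packet_size:
            self._release()
        # ...or when the span between earliest and latest exceeds the limit.
        elif (self.time_span_in_nanoseconds is not None
              and max(self.timestamps) - min(self.timestamps) > self.time_span_in_nanoseconds):
            self._release()

    def _release(self):
        self.released_packets.append(self.timestamps)
        self.timestamps = []


buf = SimpleBuffer(packet_size=3)
for ts in [0, 1, 2, 3, 4, 5]:
    buf.add_timestamp(ts)
print(len(buf.released_packets))  # 2 packets of 3 timestamps each
```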

Examples

This buffer configuration will release data in 100 ms windows, or, if no data arrives within a 1 second timeout period, it will empty the buffer anyway.

stream.parameters.buffer.time_span_in_milliseconds = 100
stream.parameters.buffer.buffer_timeout = 1000

This buffer configuration will release data in 100 ms windows, or, if critical data arrives, it will empty the buffer immediately.

stream.parameters.buffer.time_span_in_milliseconds = 100
stream.parameters.buffer.custom_trigger = lambda data: data.timestamps[0].tags["is_critical"] == 'True'

Parameter Definitions

The Quix SDK allows you to define visualization configuration and metadata for Parameters and Events. You can define things like human-readable names, descriptions, ranges, etc. Quix uses some of this configuration in Visualise in the platform, but you can also use it in your own models, bridges or visualization implementations.

We call these configurations Definitions, and all you need to do is use the add_definition helper function on either stream.parameters or stream.events:

  • parameters.add_definition(parameter_id: str, name: str = None, description: str = None)
  • events.add_definition(event_id: str, name: str = None, description: str = None)

Example:

stream.parameters \
    .add_location("vehicle/ecu") \
    .add_definition("vehicle-speed", "Vehicle speed", "Current vehicle speed measured using wheel sensor") \
    .set_unit("kmh") \
    .set_range(0, 400)

The Min and Max range definition sets the Y axis range in the waveform visualisation view. The following definition

.add_definition("Speed").set_range(0, 400)

will set this view in Visualise:

[Image: waveform view in Visualise with the Y axis range set to 0-400]

Adding additional Definitions for each parameter allows you to see data with different ranges on the same waveform view:

[Image: parameters with different ranges on the same waveform view]

You can also define a Location before adding parameter and event definitions. Locations are used to organize Parameters and Events into hierarchical groups in the data catalogue. To add a Location, use the add_location function before adding the definitions you want to include in that group.

For example, setting the following parameter location

stream.parameters \
    .add_location("/Player/Motion/Car") \
    .add_definition("Pitch") \
    .add_definition("Roll") \
    .add_definition("Yaw")

will result in this parameter hierarchy in the parameter selection dialogue:

[Image: parameter hierarchy in the parameter selection dialogue]

Once you have added a new definition you can also attach some additional configuration to it. This is the whole list of visualization and metadata options you can attach to a ParameterDefinition:

  • set_range(minimum_value: float, maximum_value: float) : Set the minimum and maximum range of the parameter.
  • set_unit(unit: str) : Set the unit of the parameter.
  • set_format(format: str) : Set the format of the parameter.
  • set_custom_properties(custom_properties: str) : Set the custom properties of the parameter.

Example:

stream.parameters \
    .add_definition("ParameterA", "The Parameter A") \
    .set_range(-1.2, 1.2)

Writing Events

You can also write events to Quix.

Writing events to a stream is almost identical to writing parameters, although you can skip the buffer feature because events don't need high-performance throughput.

stream.events \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_value("EventA", "Nice!") \
    .add_value("EventB", "High Five") \
    .write()

Event Definitions

Like parameters, you can write Definitions to each event.

This is the whole list of visualization and metadata options you can attach to an EventDefinition:

  • set_level(level: EventLevel) : Set severity level of the event.
  • set_custom_properties(custom_properties: str) : Set the custom properties of the event.

For example, the following code defines a human-readable name and a severity level for EventA:

stream.events \
    .add_definition("EventA", "The Event A") \
    .set_level(EventLevel.Critical)

Tags

Tags attached to parameters and events effectively index the persisted data in the database. This means you can filter and group data by those tags in fast queries. Choose tags carefully, as excessive cardinality leads to performance degradation in the database.

Good example: this will later allow you to query the maximum speed for driver "Peter", per car.

stream.parameters.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_tag("vehicle-plate", "SL96 XCX") \
    .add_tag("driver-id", "Peter") \
    .add_value("Speed", 53) \
    .add_value("Gear", 4) \
    .write()

Wrong example: this will lead to excessive cardinality, as there will be a massive number of different values for the Speed tag.

stream.parameters.buffer \
    .add_timestamp(datetime.datetime.utcnow()) \
    .add_tag("Speed", 53) \
    .add_value("Gear", 4) \
    .write()
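
To illustrate the kind of query tags enable, here is an SDK-free sketch that groups tagged rows and computes the maximum speed per driver. The row layout is hypothetical; it only stands in for how tagged data might be persisted, and the grouping is the shape of work the database can do efficiently when the tag is indexed:

```python
# Hypothetical persisted rows: tags plus parameter values.
rows = [
    {"tags": {"driver-id": "Peter", "vehicle-plate": "SL96 XCX"}, "Speed": 53},
    {"tags": {"driver-id": "Peter", "vehicle-plate": "SL96 XCX"}, "Speed": 71},
    {"tags": {"driver-id": "Maria", "vehicle-plate": "AB12 CDE"}, "Speed": 64},
]

# Group by an indexed tag and aggregate: the shape of a fast tag query.
max_speed = {}
for row in rows:
    driver = row["tags"]["driver-id"]
    max_speed[driver] = max(max_speed.get(driver, 0), row["Speed"])

print(max_speed)  # {'Peter': 71, 'Maria': 64}
```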

Minimal example

This is the minimal code example needed to write data to a topic using the Quix SDK.

import time
import datetime

from quixstreaming import *
# Create a client. Client helps you to create input reader or output writer for specified topic.
security = SecurityOptions('../certificates/ca.cert', WORKSPACE_ID, PASSWORD)
client = StreamingClient(BROKER_CLUSTER, security)

output_topic = client.open_output_topic(TOPIC_ID)

stream = output_topic.create_stream()

stream.properties.name = "Hello World python stream"


for index in range(0, 3000):
    stream.parameters \
        .buffer \
        .add_timestamp(datetime.datetime.utcnow()) \
        .add_value("ParameterA", index) \
        .write()
    time.sleep(0.01)
print("Closing stream")
stream.close()

Reading from Quix

In order to read streams, you need an InputTopic instance. This instance allows you to read all incoming streams on the specified Topic.

You can instantiate an InputTopic with a string containing your Topic Id. You can find your Topic Id on the Topics page of the platform, or by using the automatically generated Samples in the platform for your specific Workspace and Topic.

input_topic = client.open_input_topic(TOPIC_ID)

Reading Streams

Once you have the InputTopic instance you can start reading streams. For each stream received on the specified topic, the InputTopic fires the on_stream_received event. For example, the following code prints the StreamId for each new_stream received on that Topic:

def read_stream(new_stream: StreamReader):
    print("New stream read:" + new_stream.stream_id)

input_topic.on_stream_received += read_stream
input_topic.start_reading()

Reading Parameters

You can also use buffers to read data; this helps you develop models with high-performance throughput.

buffer = new_stream.parameters.create_buffer()

or, if you want to receive only rows where certain parameters are present:

buffer = new_stream.parameters.create_buffer('Speed', 'Gear')

You just have to configure these buffers with your input requirements using the available built-in configuration. For example, the following configuration means the buffer will release a packet when the time span between the first and last timestamp reaches 100 milliseconds:

buffer.time_span_in_milliseconds = 100

Reading parameter data from that buffer is as simple as using the on_read event. For each parameter data packet released from the buffer, the SDK invokes the on_read event with the parameter data as an argument. For example, the following code prints the ParameterA value of the first timestamp of the received packet:

def on_parameter_data_handler(data: ParameterData):
    hello_world_value = data.timestamps[0].parameters['ParameterA'].numeric_value
    binary_value = data.timestamps[1].parameters['ParameterC'].binary_value # reading binary data as a byte array.
    print("ParameterA - " + str(data.timestamps[0]) + ": " + str(hello_world_value))
    print("ParameterC - " + str(data.timestamps[1]) + ": " + binary_value.decode('utf-8'))

buffer.on_read += on_parameter_data_handler
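
The `+=` syntax used for on_read and on_stream_received is an event-subscription pattern. A minimal sketch of how such an event can be modeled in plain Python (this is an illustration of the pattern, not the SDK's actual implementation):

```python
class Event:
    """Minimal sketch of an event you can subscribe to with +=."""
    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        # `event += handler` appends the handler and rebinds event to self.
        self._handlers.append(handler)
        return self

    def fire(self, *args):
        # Invoke every subscribed handler in subscription order.
        for handler in self._handlers:
            handler(*args)


received = []

on_read = Event()
on_read += lambda data: received.append(data)  # subscribe a handler
on_read.fire({"ParameterA": 10})               # fired when a packet is released
print(received)  # [{'ParameterA': 10}]
```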

You can configure multiple conditions to determine when the buffer releases data. If any of these conditions becomes true, the buffer releases a new packet of data and clears that data from the buffer:

  • buffer.buffer_timeout : The maximum duration, in milliseconds, the buffer is held before releasing data. A packet of data is released when the configured timeout has elapsed since the last data received in the buffer.
  • buffer.packet_size : The maximum packet size, in number of timestamps. Each time the buffer holds this many timestamps, a packet of data is released.
  • buffer.time_span_in_nanoseconds : The maximum time between timestamps, in nanoseconds. When the difference between the earliest and latest buffered timestamp exceeds this number, a packet of data is released.
  • buffer.time_span_in_milliseconds : The maximum time between timestamps, in milliseconds. When the difference between the earliest and latest buffered timestamp exceeds this number, a packet of data is released. Note: this is a millisecond converter on top of time_span_in_nanoseconds; both work on the same underlying value.
  • buffer.custom_trigger_before_enqueue : A custom function invoked before a new timestamp is added to the buffer. If it returns true, a packet of data is released before the timestamp is added.
  • buffer.custom_trigger : A custom function invoked after a new timestamp is added to the buffer. If it returns true, a packet of data is released with the entire buffer content.
  • buffer.filter : A custom function that filters incoming data before it is added to the buffer. If it returns true, the data is added; otherwise it is discarded.

Examples

This buffer configuration will release data in 100 ms windows, or, if no data arrives within a 1 second timeout period, it will empty the buffer anyway.

buffer.time_span_in_milliseconds = 100
buffer.buffer_timeout = 1000

This buffer configuration will release data in 100 ms windows, or, if critical data arrives, it will empty the buffer immediately.

buffer.time_span_in_milliseconds = 100
buffer.custom_trigger = lambda data: data.timestamps[0].tags["is_critical"] == 'True'

Reading Events

Reading events from a stream is as easy as reading parameter data. In this case the SDK does not use a buffer, because events don't need high-performance throughput, but the way we read Event Data from a new_stream is otherwise identical.

def on_event_data_handler(data: EventData):
    print("Event data read for stream: " + new_stream.stream_id)
    print("  Time:", data.timestamp)
    print("  Id:", data.id)
    print("  Value: " + data.value)

new_stream.events.on_read += on_event_data_handler

Minimal example

This is the minimal code example needed to read data from a topic using the Quix SDK.

from quixstreaming import *
import signal
import threading

# Create a client. Client helps you to create input reader or output writer for specified topic.
security = SecurityOptions('../certificates/ca.cert', WORKSPACE_ID, PASSWORD)
client = StreamingClient(BROKER_CLUSTER, security)

input_topic = client.open_input_topic(TOPIC_ID)

# read streams
def read_stream(new_stream: StreamReader):

    buffer = new_stream.parameters.create_buffer()

    def on_parameter_data_handler(data: ParameterData):

        df = data.to_panda_frame()
        print(df.to_string())

    buffer.on_read += on_parameter_data_handler

# Hook up events before initiating read to avoid losing out on any data
input_topic.on_stream_received += read_stream
input_topic.start_reading()  # initiate read

# Hook up to termination signal (for docker image) and CTRL-C
print("Listening to streams. Press CTRL-C to exit.")

event = threading.Event() 
def signal_handler(sig, frame):
    print('Exiting...')
    event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
event.wait()