C#

Our C# SDK is designed to be used for high performance telemetry services where you need to process high volumes of data in a nanosecond response time.

We also provides HTTP and Websockets services to read and write realtime telemetry data for less performance oriented use cases like Mobile or Web applications.

Connecting to Quix

Streams are written and read using an instance of StreamingClient class.

You can instantiate an instance of StreamingClient with a string containing your Quix Kafka server list and some SecurityOptions to access it. You can find your Kafka configuration parameters using the automatically generated Samples in the platform for your specific workspace.

var security = new SecurityOptions(CERTIFICATES_FOLDER, QUIX_USER, QUIX_PASSWORD);
var client = new Quix.Sdk.Streaming.StreamingClient("kafka-k1.quix.ai:9093,kafka-k2.quix.ai:9093", security);

Writing Data to Quix

You need a Topic to write data to Quix. You can create one in platform. This instance allow you to write new Streams into the specified Topic.

You can instantiate an instance of OutputTopic with a string containing your Topic Id. You can find your Topic Id on Topics option of the platform or just using the automated generated Samples in the platform for your specific Workspace and Topic.

var outputTopic = client.CreateOutputTopic(TOPIC_ID);

Writing Streams

Once you have the OutputTopic instance you can create as many streams as you want using the method CreateStream. The Stream Id is autogenerated but you can also pass a StreamId to the method. If you do that, you can append existing stream that is already closed.

var stream = outputTopic.CreateStream();

Stream Properties

As an option, you can add context to your streams by adding metadata to the stream.

You can add this metadata to a stream by using the Properties options of the generated stream instance.

stream.Properties.Name = "Hello World C# stream";
stream.Properties.Location = "/test/location";
stream.Properties.Metadata["meta"] = "is";
stream.Properties.Metadata["working"] = "well";
Stream Location

The stream location property is particularly important as it defines the hierarchy of your data in the data catalouge.

For example, the following location:

stream.Properties.Location = $"/Game/Codemasters/F1-2019/{track}"

Would result in this hierarchy in the catalogue:

hierarchy

Any streams sent without a location property will be located under Root by default.

Writing Parameters

You can now start writing parameter data to your stream. We reccommend that you do this using the built-in buffer feature.

Timestamps

Our SDK provides several helper functions to add new timestamps to a Buffer, ParameterData and EventData instances with several types of date time formats.

Some of these functions use the default Epoch defined at Stream level. This Epoch is very useful to avoid specifying the date part of each timestamp you add with the SDK.

These are all the common helpers functions:

  • AddTimestamp(DateTime dateTime) : Add a new timestamp in DateTime format. Default Epoch will never be added to this.
  • AddTimestamp(TimeSpan timeSpan) : Add a new timestamp in TimeSpan format since the default Epoch determined in the stream.
  • AddTimestampMilliseconds(long timeMilliseconds) : Add a new timestamp in milliseconds since the default Epoch determined in the stream.
  • AddTimestampNanoseconds(long timeNanoseconds) : Add a new timestamp in nanoseconds since the default Epoch determined in the stream.

Adding data without epoch:

stream.Parameters.Buffer
    .AddTimestamp(DateTime.UtcNow)
    .AddValue("ParameterA", 10)
    .AddValue("ParameterB", "hello")
    .Write();

or we can add timestamp 1000ms from epoch:

stream.Epoch = DateTime.UtcNow;

stream.Parameters.Buffer
    .AddTimestampInMilliseconds(1000)
    .AddValue("ParameterA", 10)
    .AddValue("ParameterB", "hello")
    .Write();

Buffer

Our SDK provides a built in Buffer to help you achieve high performance data streaming without the complexity of managing underlying streaming technologies. Instead, you just have to configure the buffer with your requirements. For example the following configuration means that the SDK will send a packet when the size of the buffer reaches 100 timestamps:

stream.Parameters.Buffer.PacketSize = 100;

Writing a parameter to that buffer is as simple as using the AddTimestamp method and AddValue for each Parameter value we want to write in. At the end we use Write method to write the timestamp to the buffer.

stream.Parameters.Buffer
    .AddTimestamp(DateTime.UtcNow)
    .AddValue("ParameterA", 10)
    .AddValue("ParameterB", "hello")
    .AddValue("ParameterC", Encoding.ASCII.GetBytes("Hello Quix!")) // Write binary data as a byte array.
    .Write();

You can configure multiple conditions to determine when the Buffer has to release data, if any of these conditions become true, the buffer will release a new packet of data and that data is cleared from the buffer:

  • Buffer.BufferTimeout : The maximum duration in milliseconds for which the buffer will be held before releasing the data. A packet of data is released when the configured timeout value has elapsed from the last data received in the buffer.
  • Buffer.PacketSize : The maximum packet size in terms of number of timestamps. Each time the buffer has this amount of timestamps the packet of data is released.
  • Buffer.TimeSpanInNanoseconds : The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number the packet of data is released.
  • Buffer.TimeSpanInMilliseconds : The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number the packet of data is released. Note: This is a millisecond converter on top of TimeSpanInNanoseconds. They both work with same underlying value.
  • Buffer.CustomTriggerBeforeEnqueue : Custom function which is invoked before adding a new timestamp to the buffer. If returned true, the packet of data is released before adding the timestamp to it.
  • Buffer.CustomTrigger : Custom function which is invoked after adding a new timestamp to the buffer. If returned true, the packet of data is released with the entire buffer content.
  • Buffer.Filter : Custom function to filter the incoming data before adding it to the buffer. If returned true, data is added otherwise not.
Examples

This buffer configuration will send data every 100ms window or if no data is buffered in 1 second timout period, it will empty buffer anyway.

stream.Parameters.Buffer.PacketSize = 100;
stream.Parameters.Buffer.BufferTimeout = 1000

This buffer configuration will send data every 100ms window or if critical data arrives, it will empty buffer anyway.

stream.Parameters.Buffer.PacketSize = 100;
stream.Parameters.Buffer.CustomTrigger = data => data.Timestamps[0].Tags["is_critical"] == "True"

Parameter Definitions

Quix SDK allows you to define some visualization configuration and metadata for Parameters and Events. You can define things like human readable names, descriptions, ranges, etc. Quix uses some of these configuration in the Visualise in the platform but you can use them aswell in your own models, bridges or visualization implementations.

We call these configurations Definitions and all you need to do is to use add_definition helper function either for stream.parameters or stream.events:

  • Parameters.AddDefinition(string parameterId, string name = null, string description = null)
  • events.AddDefinition(string eventId, string name = null, string description = null)
stream.Parameters
    .AddLocation("vehicle/ecu")
    .AddDefinition("vehicle-speed", "Vehicle speed", "Current vehicle speed measured using wheel sensor")
    .SetRange(0, 400)
    .SetUnit("kmh");

The Min and Max range definition sets the Y axis range in the waveform visualisation view. The following definition

.AddDefinition($"{playerPrefix}Speed").SetRange(0, 400)

will set this view in Visualise:

visualise

Adding additional Definitions for each parameter allows you to see data with diffent ranges on the same waveform view:

ranges

You can also define a Location before adding parameter and event definitions. Locations are used to organize the Parameters and Events in hierarchy groups in the data catalogue. To add a Location you should use AddLocation function before adding the definitions you want to include in that group.

For example, setting the following parameter location

stream.Parameters
    .AddLocation("/Player/Motion/Car") 
    .AddDefinition("Pitch")
    .AddDefinition("Roll")
    .AddDefinition("Yaw");

will result in this parameter hierarchy in the parameter selection dialogue:

parameterlocation

Once you have add a new definition you can also attach some additional configurations to it. This is the whole list of visualization and metadata options we can attach to a ParameterDefinition:

  • SetRange(double minimumValue, double maximumValue) : Set the minimum and maximum range of the parameter.
  • SetUnit(string unit) : Set the unit of the parameter.
  • SetFormat(string format) : Set the format of the parameter.
  • SetCustomProperties(string customProperties) : Set the custom properties of the parameter.

Writing Events

You can also write events to Quix.

Writing events to a stream is identical to writing parameters, although you can start without the buffer feature because events don't need high performance throughput.

stream.Events
    .AddTimestamp(DateTime.UtcNow)
    .AddValue("EventA", "Nice!")
    .AddValue("EventB", "High Five")
    .Write();

Event Definitions

Likewise, you can write Definitions to each event.

This is the whole list of visualization and metadata options we can attach to a EventDefinition:

  • SetLevel(EventLevel level) : Set severity level of the event.
  • SetCustomProperties(string customProperties) : Set the custom properties of the event.

For example the following code is defining a human readable name and a Severity level for the EventA.

stream.Events.AddDefinition("EventA", "The Event A").SetLevel(EventLevel.Critical);

Tags

Using tags alongside parameters and events practically index persisted data in the database. This means you will be able to filter and group data by those tags in fast queries. Tags have to be chosen carefully as excessive cardinality leads to performance degradation in the database.

Good example: This will allow you later to query maximum speed for driver "Peter" per car.

stream.Events
    .AddTimestamp(DateTime.UtcNow)
    .AddTag("vehicle-plate", "SL96 XCX") 
    .AddTag("driver-id", "Peter") 
    .AddValue("Speed", 53) 
    .AddValue("Gear", 4) 
    .Write();

Wrong example: This will lead to excessive cardinality as there will be a massive amount of different values for specified tag Speed.

stream.Events
    .AddTimestamp(DateTime.UtcNow)
    .AddTag("Speed", 53) 
    .AddValue("Gear", 4) 
    .Write();

Minimal example

This is minimal code example needed to write data to a topic using Quix SDK.

using System;
using System.Threading;
using Quix.Sdk.Streaming.Configuration;

namespace WriteHelloWorld
{
    class Program
    {
        /// <summary>
        /// Main will be invoked when you run the application
        /// </summary>
        static void Main()
        {
            // Create a client which holds generic details for creating input and output topics
            var client = new Quix.Sdk.Streaming.StreamingClient(
                BROKER_CLUSTER,
                new SecurityOptions(
                    "certificates/ca.cert",
                    WORKSPACE_ID,
                    PASSWORD));

            using var outputTopic = client.OpenOutputTopic(TOPIC_ID);

            var stream = outputTopic.CreateStream();

            stream.Properties.Name = "Hello World stream";

            Console.WriteLine("Sending values for 30 seconds");
            for (var index = 0; index < 3000; index++)
            {
                stream.Parameters.Buffer
                    .AddTimestamp(DateTime.UtcNow)
                    .AddValue("ParameterA", index)
                    .Write(); 

                Thread.Sleep(10);
            }

            Console.WriteLine("Closing stream");
            stream.Close();
            Console.WriteLine("Done!");
        }
    }
}

Reading from Quix

In order to read streams, you need an InputTopic instance. This instance allow you to read all the incoming streams on the specified Topic.

You can instantiate an instance of InputTopic with a string containing your Topic Id. You can find your Topic Id on Topics page of the platform or by using the automated generated Samples in the platform for your specific Workspace and Topic.

var inputTopic = client.CreateInputTopic(TOPIC_ID);

Reading Streams

Once you have the InputTopic instance you can start reading streams. For each stream received to the specified topic, InputTopic will execute the event OnStreamReceived. For example the following code is printing the StreamId for each newStream received on that Topic:

inputTopic.OnStreamReceived += (s, newStream) =>
{
    Console.WriteLine($"New stream read: {newStream.StreamId}");
};

inputTopic.StartReading();

Reading Parameters

You can also use buffers to read data, this helps you to to develop Models with a high performance throughput.

var buffer = newStream.Parameters.CreateBuffer();

You just have to configure these buffers with your input requirements using the available built in configuration. For example the following configuration means that the Buffer will release a packet when the Time Span between first and last timestamp reaches 100 milliseconds:

buffer.TimeSpanInMilliseconds = 100;

Reading parameter data from that buffer is as simple as use the OnRead event. For each paramater data packet released from the buffer the SDK will execute the OnRead event with the parameter data as a given parameter. For example the following code is printing the ParameterA value of first timestamp of the packet received:

buffer.OnRead += (data) =>
{
    var helloWorldValue = data.Timestamps[0].Parameters['ParameterA'].NumericValue;
    Console.WriteLine($"ParameterA - {data.Timestamps[0].Timestamp}: {helloWorldValue}");

    // Reading binary data into a byte array.
    var bData = data.Timestamps[0].Parameters["ParameterC"].BinaryValue;
    Console.WriteLine($"binary_param - {parameterData.Timestamps[0].Timestamp}: {Encoding.ASCII.GetString(bData)}");
};

You can configure multiple conditions to determine when the Buffer has to release data, if any of these conditions become true, the buffer will release a new packet of data and that data is cleared from the buffer:

  • Buffer.BufferTimeout : The maximum duration in milliseconds for which the buffer will be held before releasing the data. A packet of data is released when the configured timeout value has elapsed from the last data received in the buffer.
  • Buffer.PacketSize : The maximum packet size in terms of number of timestamps. Each time the buffer has this amount of timestamps the packet of data is released.
  • Buffer.TimeSpanInNanoseconds : The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number the packet of data is released.
  • Buffer.TimeSpanInMilliseconds : The maximum time between timestamps in nanoseconds. When the difference between the earliest and latest buffered timestamp surpasses this number the packet of data is released. Note: This is a millisecond converter on top of TimeSpanInNanoseconds. They both work with same underlying value.
  • Buffer.CustomTriggerBeforeEnqueue : Custom function which is invoked before adding a new timestamp to the buffer. If returned true, the packet of data is released before adding the timestamp to it.
  • Buffer.CustomTrigger : Custom function which is invoked after adding a new timestamp to the buffer. If returned true, the packet of data is released with the entire buffer content.
  • Buffer.Filter : Custom function to filter the incoming data before adding it to the buffer. If returned true, data is added otherwise not.
Examples

This buffer configuration will send data every 100ms window or if no data is buffered in 1 second timout period, it will empty buffer anyway.

stream.Parameters.Buffer.PacketSize = 100;
stream.Parameters.Buffer.BufferTimeout = 1000

This buffer configuration will send data every 100ms window or if critical data arrives, it will empty buffer anyway.

stream.Parameters.Buffer.PacketSize = 100;
stream.Parameters.Buffer.CustomTrigger = data => data.Timestamps[0].Tags["is_critical"] == "True"

Reading Events

Reading events from a stream is as easy as reading parameter data. In that case Quix SDK is not using a Buffer because we don't need high performance throughput, but the way we read Event Data from a newStream is identical.

newStream.Events.OnRead += (data) =>
{
    Console.WriteLine($"Event read for stream {newStream.StreamId}, Event Id: {data.Id}");
};

Minimal example

This is minimal code example needed to read data from a topic using Quix SDK.

using System;
using System.Linq;
using System.Threading;
using Quix.Sdk.Streaming.Configuration;
using Quix.Sdk.Streaming.Models;

namespace ReadHelloWorld
{
    class Program
    {
        /// <summary>
        /// Main will be invoked when you run the application
        /// </summary>
        static void Main()
        {
            // Create a client which holds generic details for creating input and output topics
            var client = new Quix.Sdk.Streaming.StreamingClient(
                BROKER_CLUSTER,
                new SecurityOptions(
                    "certificates/ca.cert",
                    WORKSPACE_ID,
                    PASSWORD));

            using var inputTopic = client.OpenInputTopic(TOPIC_ID);

            // Hook up events before initiating read to avoid losing out on any data
            inputTopic.OnStreamReceived += (s, streamReader) =>
            {
                Console.WriteLine($"New stream read: {streamReader.StreamId}");

                var buffer = streamReader.Parameters.CreateBuffer();

                buffer.OnRead += parameterData =>
                {
                    Console.WriteLine(
                        $"ParameterA - {parameterData.Timestamps[0].Timestamp}: {parameterData.Timestamps.Average(a => a.Parameters["ParameterA"].NumericValue)}");
                };
            };

            inputTopic.StartReading(); // initiate read
            Console.WriteLine("Listening for streams");

            // Hook up to termination signal (for docker image) and CTRL-C
            var exitEvent = new ManualResetEventSlim();
            Console.CancelKeyPress += (s, e) =>
            {
                e.Cancel = true; // In order to allow the application to cleanly exit instead of terminating it
                exitEvent.Set();
            }; 
            // Wait for CTRL-C
            exitEvent.Wait();
            Console.WriteLine("Exiting");
        }
    }
}

WebSockets and Streaming Data

Applications can react to changes occurring inside the Quix platform by subscribing to events.

Each workspce has it's own WebSockets service allowing data to be read from any topic on that workspace.

The URL to access the WebSockets interface is:

https://reader-{workspaceID}.platform.quix.ai/hub

This URL points to a SignalR hub dedicated to serving your workspace.

Note that authentication uses the same token as the rest of the Quix API's. This can be obtained in the portal under "Profile".

There are 2 steps to connecting to the WebSockets interface. - Connect - Subscribe

Connect

Using C# import or obtain the relevant SignalR implementation for your platform. E.g. for .net core use "Microsoft.AspNetCore.SignalR.Client"

Using the 'HubConnectionBuilder' from the SignalR client together with the URL and TOKEN specific to your workspace; build a connection object.

var connection = new HubConnectionBuilder()
.WithUrl(URL, options => { options.AccessTokenProvider = () => TOKEN; })
.Build();

Start the connection

connection.StartAsync()

Handle Message from the Hub

Listen for data being emitted from the interface. Once successfully started (and you should check by inspecting the connections 'State' property) you can subscribe to messages coming from the WebSockets interface.

There are 2 stages to this.

  • Tell Quix which messages you'd like to receive
  • Handle messages being recieved from Quix

We will tackle the event handler first.

Use one of the following lines of code as required:

connection.On<ParameterData>("ParameterDataReceived", (data) => { //handle the data here });
connection.On("ParameterDataReceived", (ParameterData data) => { //handle the data here });
//or use JsonConvert
connection.On<object>("ParameterDataReceived", (data) => { 
    var paramData = JsonConvert.DeserializeObject<ParameterData>(data.ToString());
});

On Data Received Subscriptions

The complete list of events emitted by the SignalR hub and that are subscribable in the .On method are:

ParameterDataReceived

Raised when parameter data is received. Type: ParameterData

ParameterDefinitionsUpdated

Raised when parameter definitions are updated Type: ParameterDefinitions

EventDataReceived

Raised when event data is received Type: EventData

EventDefinitionsUpdated

Raised when event definitions are updated Type: EventDefinitions

Subscribe

Now that your'e ready to receive the data, tell Quix what data you want.

You'll need to invoke the 'SubscribeToParameter' method on the WebSockets interface.

connection.invoke("SubscribeToParameter", TOPIC, STREAM, PARAMETER);

SubscribeToParameter takes 3 parameters. - Topic is the name of the topic you wish to listen to. - Stream is the ID of the stream to listen to. - Parameter is the ID of the parameter to listen to.

This combination uniquely identifies the Parameter to listen for.

When the specified parameter receives data, the 'On' event handler will fire, allowing your code to handle the fresh data.

WebSockets Subscriptions

The complete list of subscriptions allowed by the Quix WebSockets interface are as follows:

SubscribeToParameter

Subscribe to a specific parameter, stream topic combination Parameters: TopicName, StreamId, ParameterId

UnsubscribeFromParameter

Unsubscribe from a parameter Parameters: TopicName, StreamId, ParameterId

SubscribeToParameterDefinitions

Subscribe to updates to the parameters defined for a given topic and stream combination Parameters: TopicName, StreamId

SuscribeToEvent

Subscribe to a specific event, stream topic combination Parameters: TopicName, StreamId, EventId

UnsubscribeFromEvent

Unsubscribe from an event Parameters: TopicName, StreamId, EventId

SubscribeToEventDefinitions

Subscribe to updates to the events defined for a given topic and stream combination Parameters: TopicName, StreamId

UnsubscribeFromStream

Unsubscribe from all subscriptions to a topic, stream combination

Schemas

Schemas for the objects received from the WebSockets interface on a particular environment can be seen and obtained from the following swagger interface.

https://writer-quix-[YOUR-WORKSPACE-NAME].platform.quix.ai/index.html

This can link can also be found in Quix Platform's 'Write Topic Data' HTTP sample project.