Quick Start

This guide demonstrates how to create a Publisher and a Subscriber using rtpstalk and how to interact with other RTPS participants. The complete demonstration project is available on GitHub.

Defining message type

In RTPS each topic supports only messages of a certain type. These types are defined using IDL and they are part of the application logic.

In our examples we are going to use the HelloWorld message taken from the Fast-DDS examples:

struct HelloWorld
{
    unsigned long index;
    string message;
};

rtpstalk does not know anything about the application data which is transferred through RTPS. It represents all data as byte[] and does all interactions through RtpsTalkDataMessage. It is up to the users to serialize the application data properly, according to the data format they use. Since we are going to use the HelloWorld data format, we need to map its definition to a Java class and implement serialization and deserialization for it:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

record HelloWorld(int index, String message) {
    /**
     * Deserialize HelloWorld object from byte[]
     * which would be received from the remote Publisher
     */
    static HelloWorld read(byte[] data) {
        var buf = ByteBuffer.wrap(data);
        // To make sure that both Publisher and Subscriber use same ordering
        // when processing the data, we set it explicitly to little-endian
        buf.order(ByteOrder.LITTLE_ENDIAN);
        var id = buf.getInt();
        // each String is prefixed with its length, which also counts the
        // terminating NUL byte, so we read it first
        var len = buf.getInt();
        // and now we read the String, leaving the terminating NUL out
        var bytes = new byte[Math.max(len - 1, 0)];
        buf.get(bytes);
        return new HelloWorld(id, new String(bytes, StandardCharsets.UTF_8));
    }

    /**
     * Converts message to byte[] which can be then sent to remote RTPS Subscribers
     */
    public byte[] toByteArray() {
        // encode once, with an explicit charset, so that the buffer size
        // matches the number of bytes which are actually written
        var bytes = message.getBytes(StandardCharsets.UTF_8);
        // index + String length + String bytes + terminating NUL
        var buf = ByteBuffer.allocate(Integer.BYTES * 2 + bytes.length + 1);
        // To make sure that both Publisher and Subscriber use same ordering
        // when processing the data, we set it explicitly to little-endian
        buf.order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(index);
        buf.putInt(bytes.length + 1);
        buf.put(bytes);
        // the last byte of the buffer stays 0 and acts as the terminating NUL
        return buf.array();
    }
}

See complete code
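
To make the byte layout described above easier to picture, here is a minimal sketch that encodes one message and prints the resulting bytes as hex. The class name HelloWorldLayoutSketch and its helper methods are made up for this illustration and are not part of rtpstalk. The sketch assumes ASCII text and the same layout as toByteArray above: little-endian index, little-endian string length (counting the terminating NUL), string bytes, NUL.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only (not part of rtpstalk)
final class HelloWorldLayoutSketch {
    /** Encodes index and message using the layout described in this section. */
    static byte[] encode(int index, String message) {
        byte[] text = message.getBytes(StandardCharsets.US_ASCII);
        var buf = ByteBuffer.allocate(Integer.BYTES * 2 + text.length + 1);
        buf.order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(index);            // 4 bytes: index
        buf.putInt(text.length + 1);  // 4 bytes: string length including the NUL
        buf.put(text);                // string bytes, the final byte stays 0
        return buf.array();
    }

    /** Renders bytes as space-separated hex pairs. */
    static String hex(byte[] data) {
        var sb = new StringBuilder();
        for (byte b : data) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: 01 00 00 00 03 00 00 00 48 69 00
        System.out.println(hex(encode(1, "Hi")));
    }
}
```

Reading the output left to right: 01 00 00 00 is the index 1, 03 00 00 00 is the length of "Hi" plus the NUL, 48 69 are the characters, and the trailing 00 is the terminator.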

Writing Subscriber

If you are familiar with Flow.Subscriber in Java, then subscribing to an RTPS topic will look like subscribing to any other Flow.Publisher:

try (var client = new RtpsTalkClient(new RtpsTalkConfiguration.Builder()
        .networkInterface("lo") // name of network interface where to run participant discovery
        .build())) {
    var topicName = "HelloWorldTopic";
    var topicType = "HelloWorld";
    var future = new CompletableFuture<Void>();
    // register a new subscriber
    client.subscribe(topicName, topicType, new Subscriber<RtpsTalkDataMessage>() {
        private Subscription subscription;
        private int counter;
    
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            // once we subscribed we request for messages
            subscription.request(1);
        }
    
        @Override
        public void onNext(RtpsTalkDataMessage message) {
            message.data().ifPresent(
                    data -> System.out.println("Received " + HelloWorld.read(data)));
            if (++counter < 10)
                subscription.request(1);
            else {
                subscription.cancel();
                future.complete(null);
                System.out.println("Closing...");
            }
        }
    
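        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            // unblock future.get() below, otherwise we would wait forever
            future.completeExceptionally(throwable);
        }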
    
        @Override
        public void onComplete() {
    
        }
    });
    future.get();
}

See complete code
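
The request(1) pattern used by the subscriber above can be tried out without any RTPS participant. The following sketch relies only on the JDK: a SubmissionPublisher stands in for the remote Publisher, and the hypothetical TakeN subscriber asks for one item at a time and cancels once it has enough. The names DemandSketch, TakeN and run are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only (not part of rtpstalk)
final class DemandSketch {
    /** Takes at most limit items, one request at a time, then cancels. */
    static final class TakeN implements Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CompletableFuture<Void> done = new CompletableFuture<>();
        private final int limit;
        private Subscription subscription;

        TakeN(int limit) { this.limit = limit; }

        @Override public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(1); // nothing is delivered until we ask for it
        }

        @Override public void onNext(String item) {
            received.add(item);
            if (received.size() < limit) {
                subscription.request(1);
            } else {
                subscription.cancel();
                done.complete(null);
            }
        }

        @Override public void onError(Throwable t) { done.completeExceptionally(t); }

        @Override public void onComplete() { done.complete(null); }
    }

    /** Publishes five items to a subscriber that wants at most limit of them. */
    static List<String> run(int limit) {
        var subscriber = new TakeN(limit);
        var publisher = new SubmissionPublisher<String>();
        publisher.subscribe(subscriber);
        for (int i = 1; i <= 5; i++) publisher.submit("msg-" + i);
        publisher.close(); // triggers onComplete once buffered items are consumed
        try {
            subscriber.done.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [msg-1, msg-2, msg-3]
    }
}
```

Because the subscriber never requests more than one item at a time, the publisher cannot push items it did not ask for, which is why exactly three items arrive even though five were submitted.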

Writing Publisher

Good news: Java already provides SubmissionPublisher, a Publisher implementation which can be used to publish messages to RTPS topics, so we just use it:

try (var client = new RtpsTalkClient(new RtpsTalkConfiguration.Builder()
        .networkInterface("lo") // name of network interface where to run participant discovery
        .build())) {
    var topicName = "HelloWorldTopic";
    var topicType = "HelloWorld";
    var publisher = new SubmissionPublisher<RtpsTalkDataMessage>();
    client.publish(topicName, topicType, publisher);
    int c = 1;
    // keep publishing messages every 100ms
    while (true) {
        var m = new RtpsTalkDataMessage(new HelloWorld(c++, "Hello from Java").toByteArray());
        publisher.submit(m);
        Thread.sleep(100);
    }
}  

See complete code

Building

To build the examples we need to add a dependency on rtpstalk.

rtpstalk supports Java modularity and it is preferable to use it as a module. For the examples, however, we will use it as a library and put it on the classpath.

Download the entire demonstration project with Maven and Gradle setup and build it:

git clone https://github.com/lambdaprime/bootstrap
cd rtpstalk

Maven:

mvn clean install

Gradle:

gradle clean build

Testing

In the same folder where rtpstalk examples were built, open two terminals: one for Publisher and another for Subscriber.

Both Participants should be able to discover each other and start interacting.

Subscriber terminal:

Received HelloWorld[index=1, message=Hello from Java]
Received HelloWorld[index=2, message=Hello from Java]
Received HelloWorld[index=3, message=Hello from Java]
Received HelloWorld[index=4, message=Hello from Java]
Received HelloWorld[index=5, message=Hello from Java]
Received HelloWorld[index=6, message=Hello from Java]
Received HelloWorld[index=7, message=Hello from Java]
Received HelloWorld[index=8, message=Hello from Java]
Received HelloWorld[index=9, message=Hello from Java]
Received HelloWorld[index=10, message=Hello from Java]

Testing with Fast-DDS

To test interaction with other RTPS providers we will use HelloWorldExample from Fast-DDS.

First we will need to compile it:

git clone https://github.com/eProsima/Fast-DDS.git
cd Fast-DDS/
git checkout v2.6.0
mkdir build
cd build/
cmake -DTHIRDPARTY=ON -DCOMPILE_EXAMPLES=ON ..
make
DESTDIR=$(pwd)/install make install

This will produce the HelloWorldExample executable, which we are going to use.

Testing subscriber:

This should produce the following output in the Subscriber terminal:

Received HelloWorld[index=1, message=HelloWorld]
Received HelloWorld[index=2, message=HelloWorld]
Received HelloWorld[index=3, message=HelloWorld]
Received HelloWorld[index=4, message=HelloWorld]
Received HelloWorld[index=5, message=HelloWorld]
Received HelloWorld[index=6, message=HelloWorld]
Received HelloWorld[index=7, message=HelloWorld]
Received HelloWorld[index=8, message=HelloWorld]
Received HelloWorld[index=9, message=HelloWorld]
Received HelloWorld[index=10, message=HelloWorld]
Closing...

Logging

To keep 3rd party dependencies of rtpstalk to a minimum it uses java.util.logging (JUL) for logging. Many logging libraries provide JUL adapters so you can still use them to manage its logging.

rtpstalk comes with a debug logging configuration. You can enable it by setting the "java.util.logging.config.file" system property:

java -Djava.util.logging.config.file=rtpstalk-debug.properties -cp rtpstalkexample.jar id.bootstrap.rtpstalk.SubscriberApp

All the logging will be available in your system temporary folder inside rtpstalk-debug.log.
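
The contents of rtpstalk-debug.properties are not shown here. As a rough illustration of how a JUL configuration takes effect, the sketch below loads a small configuration from a string instead of a file. The logger name example.rtps and the class JulConfigSketch are hypothetical and chosen only for the example; the real logger names used by rtpstalk may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.LogManager;
import java.util.logging.Logger;

// Hypothetical demo class, for illustration only (not part of rtpstalk)
final class JulConfigSketch {
    // Hypothetical logger name, rtpstalk's real logger names are not assumed here
    static final String LOGGER_NAME = "example.rtps";

    // Same key=value format as a file passed via java.util.logging.config.file
    static final String CONFIG = String.join("\n",
            "handlers=java.util.logging.ConsoleHandler",
            "java.util.logging.ConsoleHandler.level=ALL",
            LOGGER_NAME + ".level=FINE");

    /** Applies CONFIG to the global LogManager and returns the demo logger. */
    static Logger configure() {
        try (var in = new ByteArrayInputStream(CONFIG.getBytes(StandardCharsets.UTF_8))) {
            LogManager.getLogManager().readConfiguration(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Logger.getLogger(LOGGER_NAME);
    }

    public static void main(String[] args) {
        var logger = configure();
        logger.fine("visible, because the logger level is FINE");
        logger.finest("not visible, FINEST is below the configured level");
    }
}
```

A configuration file works the same way: the JVM reads it at startup when java.util.logging.config.file points to it, so no code changes are needed in the application.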

Debugging

First make sure to enable logging as described in this document.

Reader flow

The following flow helps to establish why a local Reader may not be receiving messages from a remote Writer:

  1. Remote participant is detected by the rtpstalk and registered in ParticipantsRegistry:
    Adding new participant { "guidPrefix": { "value": "*" }, "entityId": "ENTITYID_PARTICIPANT" } to the registry
  2. Configuration of builtin endpoints between rtpstalk and remote Participant with SedpBuiltinEndpointsConfigurator:
    Configuring builtin endpoints for Participant { "value": "*" }
  3. Discovery of topics which are published by remote Participant with TopicSubscriptionsManager:
    Discovered Publisher for topic HelloWorldTopic0 type HelloWorld with following details
    {
      "endpointGuid": {
        "guidPrefix": {
          "value": "*"
        },
        "entityId": "00000103"
      },
      "writerUnicastLocator": [
        {
          "transportType": "LOCATOR_KIND_UDPv4",
          "port": "*",
          "address": "/*"
        }
      ],
      "reliabilityKind": "RELIABLE"
    }
    
  4. Subscribing Reader to remote Writer. This happens when both Reader and Writer are compatible (based on topic name, type, QOS policies etc) and is done by TopicSubscriptionsManager:
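
The first three steps above each leave a recognizable message in the log, so a small helper can report which step was not reached. This is only a sketch: the class ReaderFlowCheck is hypothetical, the message prefixes are copied from the steps above, and the sample log lines in main are abbreviated.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper, for illustration only (not part of rtpstalk)
final class ReaderFlowCheck {
    // Message prefixes copied from steps 1-3, in the order they are expected
    static final List<String> STEP_MARKERS = List.of(
            "Adding new participant",
            "Configuring builtin endpoints for Participant",
            "Discovered Publisher for topic");

    /** Returns the 1-based number of the first step with no matching log line. */
    static Optional<Integer> firstMissingStep(List<String> logLines) {
        for (int i = 0; i < STEP_MARKERS.size(); i++) {
            var marker = STEP_MARKERS.get(i);
            if (logLines.stream().noneMatch(line -> line.contains(marker))) {
                return Optional.of(i + 1);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Abbreviated sample lines: discovery of the topic (step 3) never happened
        var lines = List.of(
                "Adding new participant { ... } to the registry",
                "Configuring builtin endpoints for Participant { ... }");
        System.out.println(firstMissingStep(lines)); // prints Optional[3]
    }
}
```

In practice the lines could be read with Files.readAllLines from rtpstalk-debug.log in the system temporary folder, assuming debug logging was enabled as described earlier.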

After all steps are done, the rtpstalk Reader will be waiting for data from the remote Writer. If data is not received it is mostly because:

QOS policies matching rules

RELIABILITY:
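
As a point of reference, the DDS specification treats RELIABILITY as a requested-versus-offered policy: the kinds are ordered BEST_EFFORT < RELIABLE, and a Writer matches a Reader only when the offered kind is not lower than the requested one. The sketch below expresses that rule; it is not taken from rtpstalk, and the names ReliabilityMatchSketch and isCompatible are hypothetical.

```java
// Hypothetical demo class, for illustration only (not part of rtpstalk)
final class ReliabilityMatchSketch {
    // Declaration order matters: BEST_EFFORT < RELIABLE, as in the DDS specification
    enum ReliabilityKind { BEST_EFFORT, RELIABLE }

    /** A Writer's offer satisfies a Reader's request when offered >= requested. */
    static boolean isCompatible(ReliabilityKind offeredByWriter,
                                ReliabilityKind requestedByReader) {
        return offeredByWriter.compareTo(requestedByReader) >= 0;
    }

    public static void main(String[] args) {
        // Print the full compatibility matrix
        for (var offered : ReliabilityKind.values()) {
            for (var requested : ReliabilityKind.values()) {
                System.out.printf("writer=%s reader=%s compatible=%b%n",
                        offered, requested, isCompatible(offered, requested));
            }
        }
    }
}
```

The only incompatible combination is a BEST_EFFORT Writer with a RELIABLE Reader, so the "reliabilityKind" value in the discovery log shown earlier is worth comparing against what the local Reader requests.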

Metrics

rtpstalk integrates with OpenTelemetry for emitting metrics.

OpenTelemetry is similar to SLF4J in that it serves as a simple facade or abstraction for various observability backends (Prometheus, Elasticsearch etc).

On top of that it is available for many programming languages.

If you plan to export metrics to Elasticsearch or CSV files you can try opentelemetry-exporters-pack. For the complete list of exporters available for OpenTelemetry see Registry for OpenTelemetry ecosystem.
