Kafka Consumer Connector
The RadiantOne Kafka capture connector uses the Apache Kafka Consumer API to subscribe to topics and process the streams of records published to them.
The Kafka capture and apply connectors are independent, meaning that they do not need to be deployed together. The source(s) and target(s) can be any combination of Kafka topics, LDAP-accessible data stores, JDBC-accessible data stores, or web services (SCIM or REST accessible).
This section assumes that you have access to an existing Kafka instance that you can connect to. Once you gain access, follow the instructions in this section.
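Before you begin, it can help to confirm that the broker's host and port are reachable from the RadiantOne server. The following is a minimal sketch, not part of RadiantOne, that assumes Python 3 is available on that machine. The function name broker_reachable is hypothetical, and the host and port in the commented call are taken from the example string later in this section. A successful TCP connection only shows that the port is open; it does not verify encryption or authentication settings.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host name and opens a TCP connection.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Example call (hypothetical broker address; replace with your own):
# broker_reachable("kafka.mycompany.com", 9094)
```

If the function returns False, check the host name, port, and any firewall rules between the RadiantOne server and the broker before continuing.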
To sync between RadiantOne and Kafka topics, you need to configure the following.
Configuring the Consumer Data Source
A data source is required for the Consumer connector.
To configure the Consumer data source:
- In the Main Control Panel, navigate to Settings > Server Backend > Custom Data Sources.
- From the Add Custom menu, select Kafka.
- Name the data source. In this example, the data source is named kafkaconsumer.
- Enter any value for each of the following properties; a value for these properties is required but ignored.
  - Topic Name
  - Message Format
  - Producer Properties File
Figure 1: Kafka Consumer Data Source
Note: Additional data source settings are configured in Configuring Synchronization as a Kafka Consumer.
- Click Save.
Note: If a note displays stating that the connection to the data source failed, click Yes to save anyway.
Configuring the Consumer Schema
If you use the same schema for both your consumer and producer, no further schema configuration is required. If you plan to use a different message format for publishing to Kafka, that schema must also be created. See Configuring the Producer Schema for more information.
Mounting the Virtual View
In this section, a new naming context representing the incoming Kafka Consumer is added. If you plan to publish identity data to a Kafka topic, a separate view is required. If you use both a consumer and producer, and have already set up your producer schema, no further schema configuration is required.
- In the Main Control Panel, navigate to the Context Builder tab.
- On the View Designer tab, click .
- Name the view. In this example, the view is named kafkaExample.
- Select the schema you created in Configuring the Consumer Schema. Click OK.
- In the pane on the left, select the view’s folder icon and click New Content.
- Select your table name and click OK.
Figure 2: New View Definition
- Make any updates you need for the RDN and Attribute settings.
Figure 3: New View with Attributes
- Click Save.
- Navigate to the Directory Namespace tab.
- Click .
- Enter a context name, for example, o=kafkaexample.
- Choose Virtual Tree and click Next.
- Select Use an existing view.
- Click Browse. Navigate to the saved .dvx file for the view you created. Click OK.
- Click OK.
- Click OK to acknowledge the alert that your virtual tree has been created.
If you’re configuring synchronization to act as both a consumer and producer with Kafka, you need a separate mounted view with the appropriate schema.
Configuring Synchronization as a Kafka Consumer
- In the Main Control Panel, navigate to the Synchronization tab.
- Click .
- Select your source and destination naming contexts and click OK.
- Click Configure.
- Click the Capture tile and fill in the following fields.
Field Name | Description
Topic Name | The name of your topic (for example, Workday).
Kafka Consumer Properties | Contains bootstrap.servers= followed by the name and port number for your Kafka server. See the note below for more information.
Message Format | The name of your changeConvertor, such as GoldenGateJSONConvertor or KafkaExample (without the .java extension).
Table Name | Required only if you are using KafkaGoldenGate formatting.
Figure 4: Capture connector properties using KafkaExample changeConvertor
The minimum requirement for the Kafka Consumer Properties field is the bootstrap.servers property, which specifies your Kafka broker and port number. The example shown above is for an unencrypted session without authentication. If your Kafka broker requires encryption and/or authentication, add the additional properties to the field as a comma-separated list. For example, the following string adds sasl.mechanism, security.protocol, and ssl.truststore.location to enable SSL. It also sets sasl.jaas.config so that a username and password are used to log in to the broker. This method also applies when authenticating with a Client Token and Secret.
bootstrap.servers=kafka.mycompany.com:9094,sasl.mechanism=PLAIN,security.protocol=SASL_SSL,ssl.truststore.location=/radiantone/vds/vds_server/custom/truststore.jks,sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="accountname" password="password";
For encrypted Kafka connections, you must create a Java truststore containing the trusted CAs which issued the certificate used by the Kafka broker. The ssl.truststore.location value must be set to the full path to this file on your RadiantOne server. It is recommended that you store the truststore.jks file in your $RLI_HOME/vds_server/custom directory so that it replicates to follower nodes in your cluster.
- Click Save.
- Click the Transformation tile. Select a Transformation Type from the drop-down menu.
- Expand the Mappings section and map attributes as required.
Figure 5: Sample Mappings
- Click the Apply tile and start your pipeline.
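A malformed Kafka Consumer Properties string is easy to produce when several properties are chained together. The following is a minimal sketch of how such a string could be checked before you paste it into the Capture tile. It is an illustration only and does not reflect how RadiantOne parses the field internally. The function names parse_consumer_properties and check_consumer_properties are hypothetical. The sketch assumes a single broker in bootstrap.servers and no commas inside any value, and its two checks are taken from the statements in this section: bootstrap.servers is the minimum requirement, and an encrypted connection needs ssl.truststore.location.

```python
def parse_consumer_properties(text: str) -> dict:
    """Split a comma-separated 'key=value' string into a dict.

    Assumption: values contain no commas. Each pair is split on the first
    '=' only, so a value such as sasl.jaas.config may itself contain '='.
    """
    props = {}
    for pair in text.split(","):
        pair = pair.strip()
        if not pair:
            continue
        key, sep, value = pair.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"not a key=value pair: {pair!r}")
        props[key.strip()] = value.strip()
    return props


def check_consumer_properties(props: dict) -> list:
    """Return a list of problems; an empty list means none were found."""
    problems = []
    if not props.get("bootstrap.servers"):
        problems.append("bootstrap.servers is required")
    protocol = props.get("security.protocol", "PLAINTEXT")
    # Per this section, encrypted connections need a truststore path.
    if protocol.endswith("SSL") and not props.get("ssl.truststore.location"):
        problems.append("ssl.truststore.location is required for " + protocol)
    return problems


# The example string from this section, split across lines for readability.
example = (
    "bootstrap.servers=kafka.mycompany.com:9094,"
    "sasl.mechanism=PLAIN,"
    "security.protocol=SASL_SSL,"
    "ssl.truststore.location=/radiantone/vds/vds_server/custom/truststore.jks,"
    "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule "
    'required username="accountname" password="password";'
)
props = parse_consumer_properties(example)
print(check_consumer_properties(props))  # prints []
```

Splitting each pair on the first equals sign keeps the sasl.jaas.config value intact, because that value contains its own username= and password= assignments.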