Install Thrift by following the official documentation.
Note: If you encounter a problem with openssl on Mac, follow this link
We are going to add the maven-thrift-plugin to our pom.xml to perform Thrift code generation:
<plugin>
  <groupId>org.apache.thrift.tools</groupId>
  <artifactId>maven-thrift-plugin</artifactId>
  <version>0.1.11</version>
  <configuration>
    <thriftExecutable>/usr/local/bin/thrift</thriftExecutable>
  </configuration>
  <executions>
    <execution>
      <id>thrift-sources</id>
      <phase>generate-sources</phase>
      <goals>
        <goal>compile</goal>
      </goals>
    </execution>
    <execution>
      <id>thrift-test-sources</id>
      <phase>generate-test-sources</phase>
      <goals>
        <goal>testCompile</goal>
      </goals>
    </execution>
  </executions>
</plugin>
Create a file named prices.thrift
namespace java prices
typedef i64 long // We can use typedef to get pretty names for the types we are using
service KafkaStateService {
  map<string,long> getAll(1:string store)
}
We declare a single method that will be exposed by our endpoint for anyone to query. This getAll method returns the total price per item.
Run the thrift code generation using the plugin
mvn thrift:compile
A service class should have been generated under target/generated-sources/thrift/prices. We need to implement the actual code of the getAll method in a handler class using the generated interface.
Our implementation uses the Kafka Streams API to read from the local state stores.
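Here is a minimal sketch of what such a handler might look like. The class name KafkaStateHandler is an assumption, and the store lookup uses the pre-2.5 streams.store() API (newer versions use StoreQueryParameters):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import prices.KafkaStateService;

public class KafkaStateHandler implements KafkaStateService.Iface {

    private final KafkaStreams streams;

    public KafkaStateHandler(KafkaStreams streams) {
        this.streams = streams;
    }

    @Override
    public Map<String, Long> getAll(String store) {
        // Fetch the local key-value store by the name passed in by the client
        ReadOnlyKeyValueStore<String, Long> prices =
                streams.store(store, QueryableStoreTypes.<String, Long>keyValueStore());

        // Copy every entry into a plain map so Thrift can serialize it
        Map<String, Long> result = new HashMap<>();
        try (KeyValueIterator<String, Long> it = prices.all()) {
            while (it.hasNext()) {
                KeyValue<String, Long> kv = it.next();
                result.put(kv.key, kv.value);
            }
        }
        return result;
    }
}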
Next, implement the state server class that boots the Thrift server and exposes our endpoint, i.e. our getAll method.
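A minimal sketch of that server, assuming a blocking TThreadPoolServer and the handler class sketched above; the class name, the port value and the background thread are assumptions:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;

import prices.KafkaStateService;

public class KafkaStateServer {

    public static final int APPLICATION_SERVER_PORT = 9090;

    public static void run(final KafkaStreams streams) {
        // Run the Thrift server in its own thread so it does not block
        // the Kafka Streams application
        new Thread(() -> {
            try {
                KafkaStateService.Processor<KafkaStateHandler> processor =
                        new KafkaStateService.Processor<>(new KafkaStateHandler(streams));
                TServerSocket transport = new TServerSocket(APPLICATION_SERVER_PORT);
                TServer server = new TThreadPoolServer(
                        new TThreadPoolServer.Args(transport).processor(processor));
                server.serve();
            } catch (TTransportException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}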
Add the property application.server to your Kafka Streams configuration and make it point to the same port you’ve configured for your Thrift server.
settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + String.valueOf(KafkaStateServer.APPLICATION_SERVER_PORT));
We also need to add the line that starts the server in our Kafka Streams application:
KafkaStateServer.run(streams);
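For context, a sketch of how the pieces fit together, assuming the StreamsBuilder/Properties API; builder and settings stand in for your existing topology and configuration:

// Build and start the topology, then boot the Thrift server
KafkaStreams streams = new KafkaStreams(builder.build(), settings);
streams.start();
// Expose the state stores to remote clients over Thrift
KafkaStateServer.run(streams);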
Next, let’s package our application
mvn package
Send the jar file to the server and run the application
java -jar yourjar.jar
You should see your Kafka Streams application start. Now we need a client to actually query our aggregates.
Note: If you were to publish an updated version of your Kafka Streams application while some old instances are already running, Kafka Streams would reassign tasks from the existing instances to the updated one.
The same happens when shutting down an instance of the application: all the tasks running on that instance are migrated to other available instances, if any.
Note: The code for the client is available at https://github.com/ArthurBaudry/kafka-stream-basket-client
Let’s design our client. The whole point of using Thrift is to be able to generate multiple clients or servers in different languages from the same thrift file. We’ll stay in the same language and generate a Java client in exactly the same way we did for the server.
Create a new project and use the same thrift file called prices.thrift to generate your thrift service class.
This time, we’ll use the client part to query our remote state store. Make sure port 9090 is open on your instance.
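Here is a minimal client sketch; the host, the store name and the class name are assumptions:

import java.util.Map;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import prices.KafkaStateService;

public class KafkaStateClient {

    public static void main(String[] args) throws Exception {
        // Connect to the Thrift server exposed by the Kafka Streams application
        TTransport transport = new TSocket("your-server-host", 9090);
        transport.open();
        try {
            KafkaStateService.Client client =
                    new KafkaStateService.Client(new TBinaryProtocol(transport));
            // Query the remote state store and print every item with its total price
            Map<String, Long> prices = client.getAll("prices-store");
            for (Map.Entry<String, Long> entry : prices.entrySet()) {
                System.out.println("Key is " + entry.getKey() + " and value is " + entry.getValue());
            }
        } finally {
            transport.close();
        }
    }
}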
Our getAll method returns a map of items and their total price, so our code simply displays the content of the map. Let’s run our code from our IDE. The client should connect to the Thrift server without any trouble.
Now let’s go back to our RDS instance and insert more rows. We’ll insert the same item several times to see the aggregation at work.
psql -h <rds-end-point-url> -U <user> -d <database>
INSERT INTO PRICES VALUES ('laptop', 2000);
INSERT INTO PRICES VALUES ('laptop', 2000);
INSERT INTO PRICES VALUES ('laptop', 2000);
We inserted the same item three times and the client displays the running total for that item:
Key is laptop and value is 6000
Congratulations! You now have an end-to-end pipeline that carries data from a database to S3 for archiving or further processing while exposing real-time data to third-party applications.
Sources: