Implementing Comprehensive E2E Testing for Kafka Applications
Introduction to E2E Testing
In this guide, we will detail the process of executing End-to-End (E2E) testing for two Spring Boot applications: the News Producer and News Consumer. You can access the full code and detailed implementation through the link provided below. We encourage you to follow the steps outlined here to get started.
The purpose of E2E testing for both the News Producer and News Consumer is to ensure that the complete workflow—from news creation to its eventual consumption—operates seamlessly. This testing validates that news is accurately transmitted from the producer to Kafka and subsequently received and processed by the consumer, thus verifying the system’s functionality in real-world situations.
We will also use Testcontainers to start Kafka, which runs an embedded Zookeeper by default in the Testcontainers Kafka module, along with Docker containers for the News Producer and News Consumer during the E2E testing phase.
Let's dive in!
Building Docker Images
Creating the News Producer Docker Image
Open your terminal and navigate to the directory housing the News Producer application. Execute the following command:
./mvnw clean spring-boot:build-image -DskipTests
This command will create the Docker image labeled news-producer:0.0.1-SNAPSHOT.
Creating the News Consumer Docker Image
Similarly, navigate to the News Consumer application directory in your terminal and run:
./mvnw clean spring-boot:build-image -DskipTests
This will build the Docker image named news-consumer:0.0.1-SNAPSHOT.
Creating the E2E Testing Spring Boot Application
Next, let’s set up a Spring Boot application through Spring Initializr. The application will be named news-e2e-testing, and we will include the necessary dependencies: Spring Web and Testcontainers. We will utilize Spring Boot version 3.1.5 and Java 17. You can find the setup details in the link provided earlier.
Once you click the GENERATE button, a zip file will be downloaded. Extract this file into the news-app folder and then open the news-e2e-testing project using your IDE.
Updating the pom.xml
We will now modify the pom.xml by adding the Kafka Testcontainers dependency:
<project xmlns="http://maven.apache.org/POM/4.0.0"
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
        ...
    </dependencies>
</project>
Creating the AbstractTestcontainers Class
In the src/test/java directory, let’s create the AbstractTestcontainers class within the com.example.newse2etesting package. This class should include the following content:
package com.example.newse2etesting;

import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
public abstract class AbstractTestcontainers {

    private static final KafkaContainer kafkaContainer =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.1"));

    // Images built earlier with spring-boot:build-image
    protected static final GenericContainer<?> newsProducerContainer =
            new GenericContainer<>("news-producer:0.0.1-SNAPSHOT");
    protected static final GenericContainer<?> newsConsumerContainer =
            new GenericContainer<>("news-consumer:0.0.1-SNAPSHOT");

    protected static final int NEWS_PRODUCER_EXPOSED_PORT = 8080;

    // Runs before the Spring context loads; used here to start the containers in order
    @DynamicPropertySource
    private static void dynamicProperties(DynamicPropertyRegistry registry) {
        // All three containers join the same Docker network
        Network network = Network.SHARED;

        // Kafka starts first and is reachable by the other containers as "kafka"
        kafkaContainer.withNetwork(network)
                .withNetworkAliases("kafka")
                .start();

        newsProducerContainer.withNetwork(network)
                .withNetworkAliases("news-producer")
                .withEnv("KAFKA_HOST", "kafka")
                .withEnv("KAFKA_PORT", "9092")
                .withExposedPorts(NEWS_PRODUCER_EXPOSED_PORT)
                .waitingFor(Wait.forLogMessage(".*Tomcat started on port.*", 1))
                .start();

        newsConsumerContainer.withNetwork(network)
                .withNetworkAliases("news-consumer")
                .withEnv("KAFKA_HOST", "kafka")
                .withEnv("KAFKA_PORT", "9092")
                .waitingFor(Wait.forLogMessage(".*Tomcat started on port.*", 1))
                .start();
    }
}
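The AbstractTestcontainers class serves as the foundation for the tests in this application. The @Testcontainers annotation activates the Testcontainers JUnit 5 extension. The class declares three containers: Kafka, News Producer, and News Consumer. Because none of them is annotated with @Container, they are configured and started explicitly within the dynamicProperties() method, which attaches all three to a shared Docker network so that the applications can reach the broker under the alias kafka on port 9092.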
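Both application containers receive the broker location through the KAFKA_HOST and KAFKA_PORT environment variables. How the News Producer and News Consumer read these variables is not shown in this guide, so the sketch below only illustrates one plausible way to combine them into a bootstrap-servers string. The class name KafkaAddress and the fallback values localhost and 9092 are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical helper: not part of the News Producer or News Consumer code.
final class KafkaAddress {

    private KafkaAddress() {
    }

    // Builds "host:port" from an environment map.
    // The fallbacks (localhost, 9092) are assumed defaults for a local run.
    static String bootstrapServers(Map<String, String> env) {
        String host = env.getOrDefault("KAFKA_HOST", "localhost");
        String port = env.getOrDefault("KAFKA_PORT", "9092");
        return host + ":" + port;
    }
}
```

Calling KafkaAddress.bootstrapServers(System.getenv()) inside a container started with the settings above would yield kafka:9092, which is the address the containers on the shared network use to reach the broker.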
Updating the NewsE2eTestingApplicationTests Class
In the src/test/java directory, let’s completely revise the NewsE2eTestingApplicationTests class found in the com.example.newse2etesting package to include the following content:
package com.example.newse2etesting;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.testcontainers.containers.output.OutputFrame.OutputType;
import org.testcontainers.containers.output.WaitingConsumer;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
class NewsE2eTestingApplicationTests extends AbstractTestcontainers {

    @Autowired
    private TestRestTemplate testRestTemplate;

    @Test
    void testPublishNews() {
        // Start following the consumer's stdout before publishing
        WaitingConsumer waitingConsumer = new WaitingConsumer();
        newsConsumerContainer.followOutput(waitingConsumer, OutputType.STDOUT);

        // The producer's port 8080 is mapped to a random host port
        String newsProducerApiUrl = "http://localhost:%s/api/news"
                .formatted(newsProducerContainer.getMappedPort(NEWS_PRODUCER_EXPOSED_PORT));

        NewsDto newsDto = new NewsDto("News Test 123456 ABCDEF");
        ResponseEntity<Void> responseEntity =
                testRestTemplate.postForEntity(newsProducerApiUrl, newsDto, Void.class);
        assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.CREATED);

        // Wait up to 5 seconds for the consumer to log the received news
        try {
            waitingConsumer.waitUntil(frame ->
                    frame.getUtf8String().contains("Received News! \"News Test 123456 ABCDEF\""),
                    5, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            fail("The expected message was not received");
        }
    }

    private record NewsDto(String title) {
    }
}
Let’s briefly clarify the key components:
- @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT): This annotation starts the news-e2e-testing Spring Boot application on a randomly assigned port. With a running web environment, Spring Boot also registers the TestRestTemplate bean used by the test.
- extends AbstractTestcontainers: This extension allows the class to inherit the shared container setup defined in AbstractTestcontainers.
- @Autowired TestRestTemplate testRestTemplate: This injects an instance of TestRestTemplate, which the test uses to send HTTP requests to the News Producer container through its mapped port.
- testPublishNews(): This method tests the end-to-end flow of publishing a news item from the producer to the consumer. A WaitingConsumer is attached to the stdout of the news consumer container before the request is sent. The test then posts a news item to the producer's /api/news endpoint, expects a 201 Created response, and waits up to 5 seconds for the consumer to log the received title.
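The wait-for-a-log-line step in testPublishNews() can be pictured with a small stand-alone sketch. It is not how Testcontainers implements WaitingConsumer; LogWaiter and its methods are hypothetical names that only mimic the idea of buffering output lines and blocking until one of them matches or a timeout expires.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical stand-in for the wait-until-matching-line idea; not Testcontainers code.
final class LogWaiter {

    // Lines are buffered so that output arriving before waitUntil() is not lost
    private final BlockingQueue<String> lines = new LinkedBlockingQueue<>();

    // Called by whatever produces the log output
    void accept(String line) {
        lines.add(line);
    }

    // Blocks until a buffered or newly arriving line satisfies the matcher,
    // or throws TimeoutException once the time budget is used up
    void waitUntil(Predicate<String> matcher, long timeout, TimeUnit unit)
            throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                throw new TimeoutException("No matching log line within the timeout");
            }
            String line = lines.poll(remaining, TimeUnit.NANOSECONDS);
            if (line == null) {
                throw new TimeoutException("No matching log line within the timeout");
            }
            if (matcher.test(line)) {
                return;
            }
        }
    }
}
```

In this sketch, lines are queued as soon as they are accepted, so a message logged before the wait begins is still found. That is the reason for attaching the log consumer before sending the request rather than after it.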
Running the E2E Test
To initiate the tests, open a terminal and navigate to the news-e2e-testing root folder. Execute the following command:
./mvnw clean test
Testcontainers will start Kafka, which runs its own embedded Zookeeper by default, along with the News Producer and News Consumer Docker containers before the E2E test executes. The test should then pass.
Conclusion
In this guide, we successfully implemented End-to-End (E2E) testing for the two Spring Boot Kafka applications: News Producer and News Consumer. Utilizing Testcontainers allowed us to set up Zookeeper, Kafka, and Docker containers for both the News Producer and Consumer during the testing phase. We concluded with a straightforward test case where we published news through the News Producer API and confirmed its receipt by the News Consumer application.