Skip to content

Commit

Permalink
KAFKA-18314: Fix to Kraft or remove tests associate with Zk Broker co…
Browse files Browse the repository at this point in the history
…nfig in KafkaApisTest
  • Loading branch information
frankvicky committed Dec 24, 2024
1 parent 5a4590c commit 5699f01
Showing 1 changed file with 4 additions and 52 deletions.
56 changes: 4 additions & 52 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,14 @@ class KafkaApisTest extends Logging {
overrideProperties: Map[String, String] = Map.empty,
featureVersions: Seq[FeatureVersion] = Seq.empty): KafkaApis = {
val properties = if (raftSupport) {
val properties = TestUtils.createBrokerConfig(brokerId, "")
val properties = TestUtils.createBrokerConfig(brokerId, null)
properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
val voterId = brokerId + 1
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093")
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
// properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
// properties.put("listeners", "PLAINTEXT://localhost:9092")
// properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
properties
} else {
TestUtils.createBrokerConfig(brokerId, "zk")
Expand Down Expand Up @@ -739,22 +741,6 @@ class KafkaApisTest extends Logging {
assertEquals(cmConfigs.size, configs.size)
}

@Test
def testDescribeQuorumNotAllowedForZkClusters(): Unit = {
val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
val request = buildRequest(requestBuilder.build(DescribeQuorumRequestData.HIGHEST_SUPPORTED_VERSION))

when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
kafkaApis = createKafkaApis(enableForwarding = true)
kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)

val response = verifyNoThrottling[DescribeQuorumResponse](request)
assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))
assertEquals(Errors.UNKNOWN_SERVER_ERROR.message(), response.data.errorMessage)
}

@Test
def testDescribeQuorumForwardedForKRaftClusters(): Unit = {
val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
Expand Down Expand Up @@ -11426,18 +11412,6 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
}

@Test
def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = {
val data = new GetTelemetrySubscriptionsRequestData()

val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build())
kafkaApis = createKafkaApis(enableForwarding = true)
kafkaApis.handle(request, RequestLocal.noCaching)

val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request)
assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))
}

@Test
def testGetTelemetrySubscriptions(): Unit = {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
Expand Down Expand Up @@ -11477,18 +11451,6 @@ class KafkaApisTest extends Logging {
assertEquals(expectedResponse, response.data)
}

@Test
def testPushTelemetryNotAllowedForZkClusters(): Unit = {
val data = new PushTelemetryRequestData()

val request = buildRequest(new PushTelemetryRequest.Builder(data, true).build())
kafkaApis = createKafkaApis(enableForwarding = true)
kafkaApis.handle(request, RequestLocal.noCaching)

val response = verifyNoThrottling[PushTelemetryResponse](request)
assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))
}

@Test
def testPushTelemetry(): Unit = {
val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build())
Expand Down Expand Up @@ -11523,16 +11485,6 @@ class KafkaApisTest extends Logging {
assertEquals(expectedResponse, response.data)
}

@Test
def testListClientMetricsResourcesNotAllowedForZkClusters(): Unit = {
val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build())
kafkaApis = createKafkaApis(enableForwarding = true)
kafkaApis.handle(request, RequestLocal.noCaching)

val response = verifyNoThrottling[ListClientMetricsResourcesResponse](request)
assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode))
}

@Test
def testListClientMetricsResources(): Unit = {
val request = buildRequest(new ListClientMetricsResourcesRequest.Builder(new ListClientMetricsResourcesRequestData()).build())
Expand Down

0 comments on commit 5699f01

Please sign in to comment.