Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
Expand Down Expand Up @@ -498,7 +498,7 @@ class BrokerServer(
quotaManagers,
),
new ScramPublisher(
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
credentialProvider),
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
Expand All @@ -38,9 +38,9 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
Expand Down Expand Up @@ -350,7 +350,7 @@ class ControllerServer(

// Set up the SCRAM publisher.
metadataPublishers.add(new ScramPublisher(
config,
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
credentialProvider
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.{BrokerState, MetadataCache}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
Expand Down
71 changes: 0 additions & 71 deletions core/src/main/scala/kafka/server/metadata/ScramPublisher.scala

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/TestRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{Exit, Time, Utils}
import org.apache.kafka.common.{TopicPartition, Uuid, protocol}
import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.SimpleApiVersionManager
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.common.serialization.RecordSerde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils._
import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.common.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, SimpleApiVersionManager}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
import org.apache.kafka.server.config.QuotaConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, ShareVersion}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.publisher;

import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.server.common.CredentialProvider;
import org.apache.kafka.server.fault.FaultHandler;

import java.util.Optional;

public class ScramPublisher implements MetadataPublisher {
private final int nodeId;
private final FaultHandler faultHandler;
private final String nodeType;
private final CredentialProvider credentialProvider;

public ScramPublisher(int nodeId, FaultHandler faultHandler, String nodeType, CredentialProvider credentialProvider) {
this.nodeId = nodeId;
this.faultHandler = faultHandler;
this.nodeType = nodeType;
this.credentialProvider = credentialProvider;
}

@Override
public final String name() {
return "ScramPublisher " + nodeType + " id=" + nodeId;
}

@Override
public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
onMetadataUpdate(delta, newImage);
}

public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why we need the onMetadataUpdate(MetadataDelta, MetadataImage) method. It is only used by BrokerMetadataPublisher#onMetadataUpdate, but we could just call onMetadataUpdate(MetadataDelta, MetadataImage, LoaderManifest) instead

@see-quick @mimaison WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree I'm not sure why we have onMetadataUpdate(MetadataDelta, MetadataImage). It seems all the callers have a LoaderManifest instance so they could, and probably should, use onMetadataUpdate(MetadataDelta, MetadataImage, LoaderManifest).

I prefer keeping the code "as is" for the Java to Scala conversion. We can tackle this type of refactorings in follow up PRs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the case across most (all?) the *Publisher classes. So maybe a PR (or set of PRs) addressing them all together would make sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So maybe a PR (or set of PRs) addressing them all together would make sense.

+1 to "a PR"

String deltaName = "MetadataDelta up to " + newImage.highestOffsetAndEpoch().offset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please create this variable in the catch block? It is only used by the error handler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the PR comparing line by line with the Scala code which had that as well. Not a big deal but we can indeed move that into the catch block.

Looking at the catch block, I wonder if we should have one in FeaturesPublisher. At a glance, it seems metadataVersionOrThrow() could throw, or it's terribly named!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the catch block, I wonder if we should have one in FeaturesPublisher. At a glance, it seems metadataVersionOrThrow() could throw, or it's terribly named!

I prefer to let the caller handle the exception if all FeaturesPublisher does is call faultHandler.handleFault

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point was consistency. Maybe there's an explanation but looking at these classes they all catch Throwable and run faultHandler.handleFault() apart from FeaturesPublisher

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mimaison you are right. +1 to add catch to FeaturesPublisher

try {
// Apply changes to SCRAM credentials.
Optional.ofNullable(delta.scramDelta()).ifPresent(scramDelta -> {
scramDelta.changes().forEach((mechanism, userChanges) -> {
userChanges.forEach((userName, change) -> {
Comment on lines +56 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In both cases, the body of the block is a single expression, so we can remove the brackets{ }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this could be added to checkstyle, I assume. Updated.

if (change.isPresent()) {
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential());
} else {
credentialProvider.removeCredentials(mechanism, userName);
}
});
});
});
} catch (Throwable t) {
faultHandler.handleFault("Uncaught exception while publishing SCRAM changes from " + deltaName, t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.security;
package org.apache.kafka.server.common;

import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.security.authenticator.CredentialCache;
Expand Down