Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -492,20 +491,6 @@ protected void handleCoprocessorThrowable(final E env, final Throwable e) throws
}
}

/**
* Used to limit legacy handling to once per Coprocessor class per classloader.
*/
private static final Set<Class<? extends Coprocessor>> legacyWarning =
new ConcurrentSkipListSet<>(new Comparator<Class<? extends Coprocessor>>() {
@Override
public int compare(Class<? extends Coprocessor> c1, Class<? extends Coprocessor> c2) {
if (c1.equals(c2)) {
return 0;
}
return c1.getName().compareTo(c2.getName());
}
});

/**
* Implementations defined function to get an observer of type {@code O} from a coprocessor of
* type {@code C}. Concrete implementations of CoprocessorHost define one getter for each observer
Expand All @@ -521,22 +506,45 @@ private abstract class ObserverOperation<O> extends ObserverContextImpl<E> {
ObserverGetter<C, O> observerGetter;

ObserverOperation(ObserverGetter<C, O> observerGetter) {
this(observerGetter, null);
this(observerGetter, (ObserverRpcCallContext) null);
}

ObserverOperation(ObserverGetter<C, O> observerGetter, User user) {
this(observerGetter, user, false);
this(observerGetter, createRpcCallContext(user), false);
}

ObserverOperation(ObserverGetter<C, O> observerGetter, ObserverRpcCallContext rpcCallContext) {
this(observerGetter, rpcCallContext, false);
}

ObserverOperation(ObserverGetter<C, O> observerGetter, boolean bypassable) {
this(observerGetter, null, bypassable);
this(observerGetter, (ObserverRpcCallContext) null, bypassable);
}

ObserverOperation(ObserverGetter<C, O> observerGetter, User user, boolean bypassable) {
super(user != null ? user : RpcServer.getRequestUser().orElse(null), bypassable);
this(observerGetter, createRpcCallContext(user), bypassable);
}

ObserverOperation(ObserverGetter<C, O> observerGetter, ObserverRpcCallContext rpcCallContext,
boolean bypassable) {
super(rpcCallContext != null ? rpcCallContext : createRpcCallContext(), bypassable);
this.observerGetter = observerGetter;
}

private static ObserverRpcCallContext createRpcCallContext() {
return RpcServer.getRequestUser()
.map(user -> new ObserverRpcCallContextImpl(user, RpcServer.getConnectionAttributes()))
.orElse(null);
}

private static ObserverRpcCallContext createRpcCallContext(User user) {
if (user == null) {
return null;
} else {
return new ObserverRpcCallContextImpl(user, RpcServer.getConnectionAttributes());
}
}

abstract void callObserver() throws IOException;

protected void postEnvCall() {
Expand All @@ -557,11 +565,21 @@ public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User
super(observerGetter, user);
}

public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter,
ObserverRpcCallContext rpcCallContext) {
super(observerGetter, rpcCallContext);
}

public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter, User user,
boolean bypassable) {
super(observerGetter, user, bypassable);
}

public ObserverOperationWithoutResult(ObserverGetter<C, O> observerGetter,
ObserverRpcCallContext rpcCallContext, boolean bypassable) {
super(observerGetter, rpcCallContext, bypassable);
}

/**
* In case of coprocessors which have many kinds of observers (for eg, {@link RegionCoprocessor}
* has BulkLoadObserver, RegionObserver, etc), some implementations may not need all observers,
Expand All @@ -587,19 +605,30 @@ public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result

public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
boolean bypassable) {
this(observerGetter, result, null, bypassable);
this(observerGetter, result, (ObserverRpcCallContext) null, bypassable);
}

public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user) {
this(observerGetter, result, user, false);
}

private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
ObserverRpcCallContext rpcCallContext) {
this(observerGetter, result, rpcCallContext, false);
}

public ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result, User user,
boolean bypassable) {
super(observerGetter, user, bypassable);
this.result = result;
}

private ObserverOperationWithResult(ObserverGetter<C, O> observerGetter, R result,
ObserverRpcCallContext rpcCallContext, boolean bypassable) {
super(observerGetter, rpcCallContext, bypassable);
this.result = result;
}

protected R getResult() {
return this.result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,20 @@ public interface ObserverContext<E extends CoprocessorEnvironment> {
void bypass();

/**
* Returns the active user for the coprocessor call. If an explicit {@code User} instance was
* provided to the constructor, that will be returned, otherwise if we are in the context of an
* RPC call, the remote user is used. May not be present if the execution is outside of an RPC
* context.
* Returns the {@link ObserverRpcCallContext} of an RPC call. May not be present if the execution
* is outside an RPC context.
* @return the context.
*/
Optional<User> getCaller();
Optional<ObserverRpcCallContext> getRpcCallContext();

/**
* Returns the active user for the coprocessor call. May not be present if the execution is
* outside an RPC context.
* @return the {@link User}.
* @deprecated will be removed in 4.0.0. Use {@link #getRpcCallContext()} instead.
*/
@Deprecated
default Optional<User> getCaller() {
return getRpcCallContext().map(ObserverRpcCallContext::getUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.coprocessor;

import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcServer;
Expand All @@ -35,14 +36,14 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
* Is this operation bypassable?
*/
private final boolean bypassable;
private final User caller;
private final ObserverRpcCallContext rpcCallContext;

public ObserverContextImpl(User caller) {
this(caller, false);
public ObserverContextImpl(ObserverRpcCallContext rpcCallContext) {
this(rpcCallContext, false);
}

public ObserverContextImpl(User caller, boolean bypassable) {
this.caller = caller;
public ObserverContextImpl(ObserverRpcCallContext rpcCallContext, boolean bypassable) {
this.rpcCallContext = rpcCallContext;
this.bypassable = bypassable;
}

Expand Down Expand Up @@ -83,8 +84,8 @@ public boolean shouldBypass() {
}

@Override
public Optional<User> getCaller() {
return Optional.ofNullable(caller);
public Optional<ObserverRpcCallContext> getRpcCallContext() {
return Optional.ofNullable(rpcCallContext);
}

/**
Expand All @@ -98,8 +99,13 @@ public Optional<User> getCaller() {
@Deprecated
// TODO: Remove this method, ObserverContext should not depend on RpcServer
public static <E extends CoprocessorEnvironment> ObserverContext<E> createAndPrepare(E env) {
ObserverContextImpl<E> ctx = new ObserverContextImpl<>(RpcServer.getRequestUser().orElse(null));
Optional<User> user = RpcServer.getRequestUser();
ObserverRpcCallContext rpcCallContext =
user.map(value -> new ObserverRpcCallContextImpl(value, Map.of())).orElse(null);

ObserverContextImpl<E> ctx = new ObserverContextImpl<>(rpcCallContext);
ctx.prepare(env);

return ctx;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;

import java.util.Map;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
* RPC Call parameters for coprocessor context.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface ObserverRpcCallContext {
/**
* Returns the active user for the coprocessor call.
* @return the {@link User}, it must not be {@code null}.
*/
User getUser();

/**
* Returns the connection attributes for the coprocessor call. These parameters are passed by the
* client through {@code ConnectionHeader} protobuf.
* @return the attributes, it must not be {@code null}.
*/
Map<String, byte[]> getAttributes();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;

import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class ObserverRpcCallContextImpl implements ObserverRpcCallContext {
private final User user;
private final Map<String, byte[]> attributes;

public ObserverRpcCallContextImpl(User user, Map<String, byte[]> attributes) {
this.user = Objects.requireNonNull(user, "user must not be null.");
this.attributes = Objects.requireNonNull(attributes, "attributes must not be null.");
}

@Override
public User getUser() {
return user;
}

@Override
public Map<String, byte[]> getAttributes() {
return attributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,16 @@ public static Optional<User> getRequestUser() {
return ctx.isPresent() ? ctx.get().getRequestUser() : Optional.empty();
}

/**
* Returns the RPC connection attributes for the current RPC request. These attributes are sent by
* the client when initiating a new connection to the HBase server. The attributes are sent in
* {@code ConnectionHeader.attribute} protobuf message.
* @return the attribute map. It will be empty if the method is called outside of an RPC context.
*/
public static Map<String, byte[]> getConnectionAttributes() {
return getCurrentCall().map(RpcCall::getConnectionAttributes).orElse(Map.of());
}

/**
* The number of open RPC conections
* @return the number of open rpc connections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverRpcCallContext;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
Expand Down Expand Up @@ -202,8 +203,12 @@ public MasterObserverOperation(User user) {
super(masterObserverGetter, user);
}

public MasterObserverOperation(User user, boolean bypassable) {
super(masterObserverGetter, user, bypassable);
public MasterObserverOperation(ObserverRpcCallContext rpcCallContext) {
super(masterObserverGetter, rpcCallContext);
}

public MasterObserverOperation(ObserverRpcCallContext rpcCallContext, boolean bypassable) {
super(masterObserverGetter, rpcCallContext, bypassable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.MetricsCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.ObserverRpcCallContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
Expand Down Expand Up @@ -500,13 +501,22 @@ public RegionObserverOperationWithoutResult(User user) {
super(regionObserverGetter, user);
}

public RegionObserverOperationWithoutResult(ObserverRpcCallContext rpcCallContext) {
super(regionObserverGetter, rpcCallContext);
}

public RegionObserverOperationWithoutResult(boolean bypassable) {
super(regionObserverGetter, null, bypassable);
super(regionObserverGetter, (ObserverRpcCallContext) null, bypassable);
}

public RegionObserverOperationWithoutResult(User user, boolean bypassable) {
super(regionObserverGetter, user, bypassable);
}

public RegionObserverOperationWithoutResult(ObserverRpcCallContext rpcCallContext,
boolean bypassable) {
super(regionObserverGetter, rpcCallContext, bypassable);
}
}

abstract class BulkLoadObserverOperation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.ObserverRpcCallContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
Expand Down Expand Up @@ -2342,7 +2343,7 @@ public void preSwitchExceedThrottleQuota(ObserverContext<MasterCoprocessorEnviro
*/
private User getActiveUser(ObserverContext<?> ctx) throws IOException {
// for non-rpc handling, fallback to system user
Optional<User> optionalUser = ctx.getCaller();
Optional<User> optionalUser = ctx.getRpcCallContext().map(ObserverRpcCallContext::getUser);
if (optionalUser.isPresent()) {
return optionalUser.get();
}
Expand Down
Loading