Skip to content

Commit 60045e7

Browse files
committed
[2.x] Use predictable serialization logic for transport headers
This change will prevent new clusters from using the 'custom serialization' format that was causing performance impact with customers in OpenSearch 2.11. Background: the serialization changes from opensearch-project#2802 introduced issues where for certain serialization headers that were previously very small for over the wire transmission become much larger. The root cause of this was that the JDK serialization process was able to detect duplicate entries and then use an encoding format to make it compressible. Adding this logic into the serialization system from OpenSearch is non-trivial and is not being invested in. Signed-off-by: Peter Nied <petern@amazon.com>
1 parent badea0d commit 60045e7

File tree

7 files changed

+113
-28
lines changed

7 files changed

+113
-28
lines changed

src/main/java/org/opensearch/security/ssl/transport/SecuritySSLRequestHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.opensearch.security.ssl.util.ExceptionUtils;
3535
import org.opensearch.security.ssl.util.SSLRequestHelper;
3636
import org.opensearch.security.support.ConfigConstants;
37+
import org.opensearch.security.support.SerializationFormat;
3738
import org.opensearch.tasks.Task;
3839
import org.opensearch.threadpool.ThreadPool;
3940
import org.opensearch.transport.TransportChannel;
@@ -92,7 +93,7 @@ public final void messageReceived(T request, TransportChannel channel, Task task
9293

9394
threadContext.putTransient(
9495
ConfigConstants.USE_JDK_SERIALIZATION,
95-
channel.getVersion().before(ConfigConstants.FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION)
96+
SerializationFormat.determineFormat(channel.getVersion()) == SerializationFormat.JDK
9697
);
9798

9899
if (SSLRequestHelper.containsBadHeader(threadContext, "_opendistro_security_ssl_")) {

src/main/java/org/opensearch/security/support/ConfigConstants.java

-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import com.google.common.collect.ImmutableList;
3636
import com.google.common.collect.ImmutableSet;
3737

38-
import org.opensearch.Version;
3938
import org.opensearch.common.settings.Settings;
4039
import org.opensearch.security.auditlog.impl.AuditCategory;
4140

@@ -332,7 +331,6 @@ public enum RolesMappingResolution {
332331
public static final String TENANCY_GLOBAL_TENANT_DEFAULT_NAME = "";
333332

334333
public static final String USE_JDK_SERIALIZATION = "plugins.security.use_jdk_serialization";
335-
public static final Version FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION = Version.V_2_11_0;
336334

337335
// On-behalf-of endpoints settings
338336
// CS-SUPPRESS-SINGLE: RegexpSingleline get Extensions Settings
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
12+
package org.opensearch.security.support;
13+
14+
import org.opensearch.Version;
15+
16+
public enum SerializationFormat {
17+
/** Uses Java's native serialization system */
18+
JDK,
19+
/** Uses a custom serializer built ontop of OpenSearch 2.11 */
20+
CustomSerializer_2_11;
21+
22+
private static final Version FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION = Version.V_2_11_0;
23+
private static final Version CUSTOM_SERIALIZATION_NO_LONGER_SUPPORTED_OS_VERSION = Version.V_2_14_0;
24+
25+
/**
26+
* Determines the format of serialization that should be used from a version identifier
27+
*/
28+
public static SerializationFormat determineFormat(final Version version) {
29+
if (version.onOrAfter(FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION)
30+
&& version.before(CUSTOM_SERIALIZATION_NO_LONGER_SUPPORTED_OS_VERSION)) {
31+
return SerializationFormat.CustomSerializer_2_11;
32+
}
33+
return SerializationFormat.JDK;
34+
}
35+
}

src/main/java/org/opensearch/security/transport/SecurityInterceptor.java

+18-12
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.opensearch.security.support.Base64Helper;
6363
import org.opensearch.security.support.ConfigConstants;
6464
import org.opensearch.security.support.HeaderHelper;
65+
import org.opensearch.security.support.SerializationFormat;
6566
import org.opensearch.security.user.User;
6667
import org.opensearch.threadpool.ThreadPool;
6768
import org.opensearch.transport.Transport.Connection;
@@ -150,7 +151,8 @@ public <T extends TransportResponse> void sendRequestDecorate(
150151
final String origCCSTransientMf = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_CCS);
151152

152153
final boolean isDebugEnabled = log.isDebugEnabled();
153-
final boolean useJDKSerialization = connection.getVersion().before(ConfigConstants.FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION);
154+
155+
final var serializationFormat = SerializationFormat.determineFormat(connection.getVersion());
154156
final boolean isSameNodeRequest = localNode != null && localNode.equals(connection.getNode());
155157

156158
try (ThreadContext.StoredContext stashedContext = getThreadContext().stashContext()) {
@@ -228,25 +230,28 @@ && getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROL
228230
);
229231
}
230232

231-
if (useJDKSerialization) {
232-
Map<String, String> jdkSerializedHeaders = new HashMap<>();
233-
HeaderHelper.getAllSerializedHeaderNames()
234-
.stream()
235-
.filter(k -> headerMap.get(k) != null)
236-
.forEach(k -> jdkSerializedHeaders.put(k, Base64Helper.ensureJDKSerialized(headerMap.get(k))));
237-
headerMap.putAll(jdkSerializedHeaders);
233+
try {
234+
if (serializationFormat == SerializationFormat.JDK) {
235+
Map<String, String> jdkSerializedHeaders = new HashMap<>();
236+
HeaderHelper.getAllSerializedHeaderNames()
237+
.stream()
238+
.filter(k -> headerMap.get(k) != null)
239+
.forEach(k -> jdkSerializedHeaders.put(k, Base64Helper.ensureJDKSerialized(headerMap.get(k))));
240+
headerMap.putAll(jdkSerializedHeaders);
241+
}
242+
getThreadContext().putHeader(headerMap);
243+
} catch (IllegalArgumentException iae) {
244+
log.debug("Failed to add headers information onto on thread context", iae);
238245
}
239246

240-
getThreadContext().putHeader(headerMap);
241-
242247
ensureCorrectHeaders(
243248
remoteAddress0,
244249
user0,
245250
origin0,
246251
injectedUserString,
247252
injectedRolesString,
248253
isSameNodeRequest,
249-
useJDKSerialization
254+
serializationFormat
250255
);
251256

252257
if (actionTraceEnabled.get()) {
@@ -275,7 +280,7 @@ private void ensureCorrectHeaders(
275280
final String injectedUserString,
276281
final String injectedRolesString,
277282
final boolean isSameNodeRequest,
278-
final boolean useJDKSerialization
283+
final SerializationFormat format
279284
) {
280285
// keep original address
281286

@@ -313,6 +318,7 @@ && getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_ORIGIN_HEADE
313318
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER, injectedUserString);
314319
}
315320
} else {
321+
final var useJDKSerialization = format == SerializationFormat.JDK;
316322
if (transportAddress != null) {
317323
getThreadContext().putHeader(
318324
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER,

src/test/java/org/opensearch/security/support/Base64HelperTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@
1111
package org.opensearch.security.support;
1212

1313
import java.io.Serializable;
14+
import java.util.HashMap;
15+
import java.util.stream.IntStream;
1416

1517
import org.junit.Assert;
1618
import org.junit.Test;
1719

20+
import static org.hamcrest.Matchers.closeTo;
21+
import static org.hamcrest.Matchers.equalTo;
1822
import static org.opensearch.security.support.Base64Helper.deserializeObject;
1923
import static org.opensearch.security.support.Base64Helper.serializeObject;
24+
import static org.junit.Assert.assertThat;
2025

2126
public class Base64HelperTest {
2227

@@ -48,4 +53,22 @@ public void testEnsureJDKSerialized() {
4853
Assert.assertEquals(jdkSerialized, Base64Helper.ensureJDKSerialized(jdkSerialized));
4954
Assert.assertEquals(jdkSerialized, Base64Helper.ensureJDKSerialized(customSerialized));
5055
}
56+
57+
@Test
58+
public void testDuplicatedItemSizes() {
59+
var largeObject = new HashMap<String, Object>();
60+
var hm = new HashMap<>();
61+
IntStream.range(0, 100).forEach(i -> { hm.put("c" + i, "cvalue" + i); });
62+
IntStream.range(0, 100).forEach(i -> { largeObject.put("b" + i, hm); });
63+
64+
final var jdkSerialized = Base64Helper.serializeObject(largeObject, true);
65+
final var customSerialized = Base64Helper.serializeObject(largeObject, false);
66+
final var customSerializedOnlyHashMap = Base64Helper.serializeObject(hm, false);
67+
68+
assertThat(jdkSerialized.length(), equalTo(3832));
69+
// The custom serializer is ~50x larger than the jdk serialized version
70+
assertThat(customSerialized.length(), equalTo(184792));
71+
// Show that the majority of the size of the custom serialized large object is the map duplicated ~100 times
72+
assertThat((double) customSerializedOnlyHashMap.length(), closeTo(customSerialized.length() / 100, 70d));
73+
}
5174
}

src/test/java/org/opensearch/security/transport/SecurityInterceptorTests.java

+15-13
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
import java.net.InetAddress;
1313
import java.net.InetSocketAddress;
1414
import java.net.UnknownHostException;
15-
import java.util.concurrent.ExecutorService;
16-
import java.util.concurrent.Executors;
15+
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.atomic.AtomicReference;
1718

1819
import org.junit.Before;
1920
import org.junit.Test;
@@ -120,7 +121,7 @@ public class SecurityInterceptorTests {
120121

121122
private AsyncSender sender;
122123
private AsyncSender serializedSender;
123-
private AsyncSender nullSender;
124+
private AtomicReference<CountDownLatch> senderLatch = new AtomicReference<>(new CountDownLatch(1));
124125

125126
@Before
126127
public void setup() {
@@ -208,6 +209,7 @@ public <T extends TransportResponse> void sendRequest(
208209
) {
209210
String serializedUserHeader = threadPool.getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
210211
assertEquals(serializedUserHeader, Base64Helper.serializeObject(user, true));
212+
senderLatch.get().countDown();
211213
}
212214
};
213215

@@ -222,6 +224,7 @@ public <T extends TransportResponse> void sendRequest(
222224
) {
223225
User transientUser = threadPool.getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER);
224226
assertEquals(transientUser, user);
227+
senderLatch.get().countDown();
225228
}
226229
};
227230

@@ -249,17 +252,16 @@ final void completableRequestDecorate(
249252
TransportResponseHandler handler,
250253
DiscoveryNode localNode
251254
) {
255+
securityInterceptor.sendRequestDecorate(sender, connection, action, request, options, handler, localNode);
256+
verifyOriginalContext(user);
257+
try {
258+
senderLatch.get().await(1, TimeUnit.SECONDS);
259+
} catch (final InterruptedException e) {
260+
throw new RuntimeException(e);
261+
}
252262

253-
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
254-
255-
singleThreadExecutor.execute(() -> {
256-
try {
257-
securityInterceptor.sendRequestDecorate(sender, connection, action, request, options, handler, localNode);
258-
verifyOriginalContext(user);
259-
} finally {
260-
singleThreadExecutor.shutdown();
261-
}
262-
});
263+
// Reset the latch so another request can be processed
264+
senderLatch.set(new CountDownLatch(1));
263265
}
264266

265267
@Test

src/test/java/org/opensearch/security/transport/SecuritySSLRequestHandlerTests.java

+20
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ public void testUseJDKSerializationHeaderIsSetOnMessageReceived() throws Excepti
9292
when(transportChannel.getVersion()).thenReturn(Version.V_2_11_0);
9393
Assert.assertThrows(Exception.class, () -> securitySSLRequestHandler.messageReceived(transportRequest, transportChannel, task));
9494
Assert.assertFalse(threadPool.getThreadContext().getTransient(ConfigConstants.USE_JDK_SERIALIZATION));
95+
96+
threadPool.getThreadContext().stashContext();
97+
when(transportChannel.getVersion()).thenReturn(Version.V_2_13_0);
98+
Assert.assertThrows(Exception.class, () -> securitySSLRequestHandler.messageReceived(transportRequest, transportChannel, task));
99+
Assert.assertFalse(threadPool.getThreadContext().getTransient(ConfigConstants.USE_JDK_SERIALIZATION));
100+
101+
threadPool.getThreadContext().stashContext();
102+
when(transportChannel.getVersion()).thenReturn(Version.V_2_14_0);
103+
Assert.assertThrows(Exception.class, () -> securitySSLRequestHandler.messageReceived(transportRequest, transportChannel, task));
104+
Assert.assertTrue(threadPool.getThreadContext().getTransient(ConfigConstants.USE_JDK_SERIALIZATION));
95105
}
96106

97107
@Test
@@ -111,6 +121,16 @@ public void testUseJDKSerializationHeaderIsSetWithWrapperChannel() throws Except
111121
when(transportChannel.getVersion()).thenReturn(Version.V_2_11_0);
112122
Assert.assertThrows(Exception.class, () -> securitySSLRequestHandler.messageReceived(transportRequest, wrappedChannel, task));
113123
Assert.assertFalse(threadPool.getThreadContext().getTransient(ConfigConstants.USE_JDK_SERIALIZATION));
124+
125+
threadPool.getThreadContext().stashContext();
126+
when(transportChannel.getVersion()).thenReturn(Version.V_2_13_0);
127+
Assert.assertThrows(Exception.class, () -> securitySSLRequestHandler.messageReceived(transportRequest, wrappedChannel, task));
128+
Assert.assertFalse(threadPool.getThreadContext().getTransient(ConfigConstants.USE_JDK_SERIALIZATION));
129+
130+
threadPool.getThreadContext().stashContext();
131+
when(transportChannel.getVersion()).thenReturn(Version.V_2_14_0);
132+
Assert.assertThrows(Exception.class, () -> securitySSLRequestHandler.messageReceived(transportRequest, wrappedChannel, task));
133+
Assert.assertTrue(threadPool.getThreadContext().getTransient(ConfigConstants.USE_JDK_SERIALIZATION));
114134
}
115135

116136
@Test

0 commit comments

Comments
 (0)