diff --git a/CHANGELOG.md b/CHANGELOG.md index a5511b69ca519..87be4b481e58e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) - Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604)) +- Add support for serializing `InetSocketAddress` in StreamOutput/StreamInput ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/common/io/stream/StreamInput.java b/server/src/main/java/org/opensearch/common/io/stream/StreamInput.java index 1deeb4c60d469..650fc84a4ab40 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/StreamInput.java @@ -63,6 +63,8 @@ import java.io.FilterInputStream; import java.io.IOException; import java.math.BigInteger; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.nio.file.AccessDeniedException; import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.DirectoryNotEmptyException; @@ -346,6 +348,14 @@ public BigInteger readBigInteger() throws IOException { return new BigInteger(readString()); } + public InetSocketAddress readInetSocketAddress() throws IOException { + String host = this.readString(); + byte[] addressBytes = this.readByteArray(); + InetAddress addr = InetAddress.getByAddress(host, addressBytes); + int port = this.readInt(); + return new InetSocketAddress(addr, port); + } + @Nullable public Text readOptionalText() throws IOException { int length = readInt(); @@ -742,6 +752,8 @@ public Object readGenericValue() throws IOException { return readCollection(StreamInput::readGenericValue, HashSet::new, Collections.emptySet()); case 26: return readBigInteger(); + case 27: + return readInetSocketAddress(); default: throw new IOException("Can't read unknown type [" + type + "]"); } diff --git a/server/src/main/java/org/opensearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/opensearch/common/io/stream/StreamOutput.java index b0f4f6c8a6139..b0e44c099f1c4 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/StreamOutput.java @@ -63,6 +63,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.math.BigInteger; +import java.net.InetSocketAddress; import java.nio.file.AccessDeniedException; import java.nio.file.AtomicMoveNotSupportedException; import java.nio.file.DirectoryNotEmptyException; @@ -791,6 +792,13 @@ public final void writeOptionalInstant(@Nullable Instant instant) throws IOExcep o.writeByte((byte) 26); o.writeString(v.toString()); }); + writers.put(InetSocketAddress.class, (o, v) -> { + final InetSocketAddress inetSocketAddress = (InetSocketAddress) v; + o.writeByte((byte) 27); + o.writeString(inetSocketAddress.getHostString()); + o.writeByteArray(inetSocketAddress.getAddress().getAddress()); + o.writeInt(inetSocketAddress.getPort()); + }); WRITERS = Collections.unmodifiableMap(writers); } diff --git a/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java b/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java index bd970be5e977d..7b1f9c898c298 100644 --- a/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java +++ b/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java @@ -46,6 +46,8 @@ import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.time.Instant; import java.time.ZoneOffset; import java.util.ArrayList; @@ -513,6 +515,14 @@ public void testMapIsWriteable() throws IOException { assertNotWriteable(Collections.singletonMap("a", new Unwriteable()), Unwriteable.class); } + public void testInetSocketAddress() throws IOException { + InetSocketAddress toWrite = new InetSocketAddress(InetAddress.getLoopbackAddress(), 80); + BytesStreamOutput out = new BytesStreamOutput(); + out.writeGenericValue(toWrite); + assertEquals(toWrite, getStreamInput(out.bytes()).readGenericValue()); + out.close(); + } + public void testObjectArrayIsWriteable() throws IOException { StreamOutput.checkWriteable(new Object[] { "a", "b" }); assertNotWriteable(new Object[] { new Unwriteable() }, Unwriteable.class); diff --git a/server/src/test/java/org/opensearch/common/io/stream/BytesStreamsTests.java b/server/src/test/java/org/opensearch/common/io/stream/BytesStreamsTests.java index 36b84560bce41..a89e77750bb5b 100644 --- a/server/src/test/java/org/opensearch/common/io/stream/BytesStreamsTests.java +++ b/server/src/test/java/org/opensearch/common/io/stream/BytesStreamsTests.java @@ -48,6 +48,8 @@ import java.io.EOFException; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; @@ -311,6 +313,7 @@ public void testSimpleStreams() throws Exception { float[] floatArray = { 1.1f, 2.2f, 3.3f }; out.writeGenericValue(floatArray); double[] doubleArray = { 1.1, 2.2, 3.3 }; + InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 80); out.writeGenericValue(doubleArray); out.writeString("hello"); out.writeString("goodbye"); @@ -324,6 +327,7 @@ public void testSimpleStreams() throws Exception { out.writeOptionalTimeZone(DateTimeZone.getDefault()); out.writeOptionalTimeZone(null); out.writeGenericValue(new DateTime(123456, DateTimeZone.forID("America/Los_Angeles"))); + out.writeGenericValue(inetSocketAddress); final byte[] bytes = BytesReference.toBytes(out.bytes()); StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes())); assertEquals(in.available(), bytes.length); @@ -361,6 +365,7 @@ public void testSimpleStreams() throws Exception { JodaCompatibleZonedDateTime jdt = (JodaCompatibleZonedDateTime) dt; assertThat(jdt.getZonedDateTime().toInstant().toEpochMilli(), equalTo(123456L)); assertThat(jdt.getZonedDateTime().getZone(), equalTo(ZoneId.of("America/Los_Angeles"))); + assertEquals(inetSocketAddress, in.readGenericValue()); assertEquals(0, in.available()); IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> out.writeGenericValue(new Object() { @Override