Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support pgsql binding #1200

Merged
merged 50 commits into from
Aug 23, 2024
Merged

Support pgsql binding #1200

merged 50 commits into from
Aug 23, 2024

Conversation

akrambek
Copy link
Contributor

@akrambek akrambek commented Aug 14, 2024

Support psql binding with server and client kinds, so that pgsql client can be proxied to a pgsql server.

Fixes #1057


read 43 # type
17 # length
"CREATE_TABLE" # tag
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the ready message after completed too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename the scenario to create.table.with.primary.key and avoid the nested directory structure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename the scenario to create.table.with.primary.key and avoid the nested directory structure.


write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.pgsqlQuery()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.pgsqlQuery()
.query()

The context of pgsql is already clear, no need to repeat.


read zilla:flush.ext ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.pgsqlCompleted()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.pgsqlCompleted()
.completed()

read zilla:flush.ext ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.pgsqlReady()
.status(73)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have an enum for status; IDLE, TRANSACTION, ERROR.

"CREATE TABLE IF NOT EXISTS balances (name VARCHAR, type VARCHAR, description VARCHAR, PRIMARY KEY (name));"


read 43 # type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
read 43 # type
read 43 # type C


connected

write 51 # type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
write 51 # type
write 51 # type Q


struct PgsqlReadyFlushEx
{
int32 status;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum PgsqlStatus (uint32)
{
    IDLE(73),
    TRANSACTION(84),
    ERROR(69)
}
Suggested change
int32 status;
PgsqlStatus status;

int32 typeOid;
int16 length;
int16 modifier;
int16 text;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum PgsqlFormat (uint16)
{
    TEXT(0),
    BINARY(1)
}
Suggested change
int16 text;
PgsqlFormat format;

jfallows
jfallows previously approved these changes Aug 15, 2024
Copy link
Contributor

@jfallows jfallows left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

Remember to update module-info.java as well.

}

@Test
public void shouldEncodeAmqpDataExtensionWithDeferred()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo 😄

Comment on lines 43 to 47
final byte[] array = dataEx()
.typeId(0)
.query()
.build()
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naturally we need to get the test coverage to have a successful build.


public class PgsqlConfiguration extends Configuration
{
private static final ConfigurationDef ECHO_CONFIG;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static final ConfigurationDef ECHO_CONFIG;
private static final ConfigurationDef PGSQL_CONFIG;

Comment on lines 143 to 155

@FunctionalInterface
private interface PgsqlServerDecoder
{
int decode(
PgsqlServer server,
long traceId,
long authorization,
long budgetId,
DirectBuffer buffer,
int offset,
int limit);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move to the end of the class.

Comment on lines 573 to 589

private void doNetworkData(
long traceId,
long authorization,
int flags,
long budgetId,
DirectBuffer buffer,
int offset,
int length)
{
final int reserved = length + replyPad;

doData(network, originId, routedId, replyId, replySeq, replyAck, replyMax, traceId, authorization,
flags, budgetId, reserved, buffer, offset, length, EMPTY_EXTENSION);

replySeq += reserved;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this up with the other doNetworkXxx methods.


private final BufferPool bufferPool;
private final MutableDirectBuffer writeBuffer;
private final MutableDirectBuffer frameBuffer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PGSQL protocol specification, they use the terminology "message", not "frame".

Let's replace all references to "frame" with "message" here to convey the same concept precisely.

@akrambek akrambek marked this pull request as ready for review August 22, 2024 06:01
@akrambek akrambek changed the title Pgsql support Support pgsql binding Aug 22, 2024
@@ -0,0 +1,66 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename to NetworkIT and move up one level.

@@ -24,7 +24,7 @@
</licenses>

<properties>
<jacoco.coverage.ratio>1.00</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.79</jacoco.coverage.ratio>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be above 80% please.

}
}

private void doNetworkData(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private void doNetworkData(
private void doNetworkData(


final PgsqlAuthenticationMessageFW pgsqlAuth = authMessageRO.tryWrap(buffer, offset, limit);

if (pgsqlAuth != null && pgsqlAuth.authenticationType() == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if a different authentication type is present?
Perhaps we need to reject.

Comment on lines +1391 to +1399
final PgsqlMessageFW pgsqlType = messageRO.tryWrap(buffer, offset, limit);

if (pgsqlType != null)
{
progressOffset = pgsqlType.limit();

short fieldCount = buffer.getShort(progressOffset, BIG_ENDIAN);
progressOffset += Short.BYTES;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this approach work if pgsql message wraps but the remaining fields following do not?

I think we need to make sure that the progress is only advanced in the returned value if we're moving to a different decoder state, otherwise we would attempt reentry here but not aligned with the decoding logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah so I thought about this since I don't know if it succeeds to fully decode the message I have below logic at the end of the if condition

if (columns.fieldCount() == fieldCount)
            {
                client.onDecodeMessageType(traceId, authorization, columns);

                client.decoder = decodePgsqlMessage;
            }

doNetworkData(traceId, authorization, FLAGS_COMP, 0L, messageBuffer, 0, backendKeyMessage.limit());

doEncodeParamStatus(traceId, "client_encoding", "UTF8");
doEncodeParamStatus(traceId, "standard_confirming_strings", "on");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
doEncodeParamStatus(traceId, "standard_confirming_strings", "on");
doEncodeParamStatus(traceId, "standard_conforming_strings", "on");

doEncodeParamStatus(traceId, "client_encoding", "UTF8");
doEncodeParamStatus(traceId, "standard_confirming_strings", "on");
doEncodeParamStatus(traceId, "server_version", "1.0.0");
doEncodeParamStatus(traceId, "application_name", "psql");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not zilla?

Comment on lines 1440 to 1443
if (buffer.getByte(progress.value) == (byte) 0x00)
{
progress.addAndGet(Byte.BYTES);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this part conditional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an indication of the end of a parameter list


return length;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

Comment on lines 1629 to 1646
private int getLengthOfString(
DirectBuffer buffer,
int startingOffset)
{
int length = -1;

length:
for (int i = startingOffset; i < buffer.capacity(); i++)
{
if (buffer.getByte(i) == 0x00)
{
length = i - startingOffset + 1;
break length;
}
}

return length;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private int getLengthOfString(
DirectBuffer buffer,
int startingOffset)
{
int length = -1;
length:
for (int i = startingOffset; i < buffer.capacity(); i++)
{
if (buffer.getByte(i) == 0x00)
{
length = i - startingOffset + 1;
break length;
}
}
return length;
}
private int getLengthOfString(
DirectBuffer buffer,
int offset)
{
int length = -1;
loop:
for (int progress = offset; i < buffer.capacity(); progress++)
{
if (buffer.getByte(progress) == 0x00)
{
length = progress - offset + 1;
break loop;
}
}
return length;
}

Comment on lines 30 to 39
write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.deferred(41)
.build()
.build()}
write "CREATE TABLE IF NOT EXISTS balances (name VARCHAR, type VARCHAR, description VARCHAR, timestamp VARCHAR, PRIMARY KEY (name));"
[0x00]

write flush
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this verify fragmented DATA frames?
Don't we need to control the flags explicitly to send 2 different DATA frames, one with INIT flag and deferred in extension, and one with FIN flag plus the remaining deferred payload bytes?

@jfallows jfallows merged commit 20230bf into aklivity:develop Aug 23, 2024
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support pgsql binding
2 participants