-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support ZVIEW command #1329
Support ZVIEW command #1329
Conversation
| ZSTREAM | ||
| ZVIEW | ||
| ZTABLE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| ZSTREAM | |
| ZVIEW | |
| ZTABLE | |
| ZSTREAM | |
| ZTABLE | |
| ZVIEW |
(alphabetical)
|
||
write "CREATE ZSTREAM weather (\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call this commands
instead so it fits better to have reply-to replies
, and structure the columns accordingly.
" reply_to = 'replies',\n" | ||
" function = compute_status_and_correlation_id,\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to create these too, so that the example is self-documenting and self-consistent, agree?
Also, after this first pass, we wanted the function
to become less aware of correlation_id, and handle that implicitly, agreed?
How do we route specific commands to specific functions?
This seems to currently imply every function has its own ZSTREAM and therefore it's own pair of command and reply topics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How many root
pgsql
connections are created to risingwave
per regular user pgsql
connection to zilla
?
@@ -207,17 +207,17 @@ public void shouldHandleEmptyDropStatement() | |||
@Test | |||
public void shouldParseDropView() | |||
{ | |||
String sql = "DROP VIEW test_view;"; | |||
String sql = "DROP ZVIEW test_view;"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this change be reverted since this is shouldParseDropView()
and we also have a separate shouldParseDropZView()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
@@ -19,6 +19,18 @@ accept "zilla://streams/app1" | |||
|
|||
accepted | |||
|
|||
read zilla:begin.ext ${pgsql:beginEx() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
read zilla:begin.ext ${pgsql:beginEx() | |
read zilla:begin.ext ${pgsql:matchBeginEx() |
Same feedback for other scripts, where we are reading the pgsql begin extension.
binding.routes.forEach(r -> | ||
{ | ||
final long clientId = r.id + DEFAULT_USER_HASH; | ||
streamsByRouteIds.put(clientId, | ||
new PgsqlClient(this, routedId, r.id, clientId, DEFAULT_USER)); | ||
}); | ||
|
||
final long clientId = risingwaveRouteId + ZILLABASE_USER_HASH; | ||
streamsByRouteIds.put(clientId, | ||
new PgsqlClient(this, routedId, risingwaveRouteId, clientId, ZILLABASE_USER)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should discuss the design problem you are trying to solve with this approach.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we discussed changing this, where zillabase
is the system user that could potentially be done just once (per core). Let me know if there are any changes you'd like to make to prepare for that and we can file an enhancement to track the optimization for after the feature work is completed.
One |
@@ -29,6 +29,7 @@ public class SqlAlterStreamTopicListener extends PostgreSqlParserBaseListener | |||
{ | |||
private final TokenStream tokens; | |||
|
|||
private String schema; | |||
private String name; | |||
private final List<AlterExpression> alterExpressions = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move final
field up with other final
fields and initialize in constructor as it is not a (pseudo-) constant.
@@ -29,6 +29,7 @@ public class SqlAlterTableTopicListener extends PostgreSqlParserBaseListener | |||
{ | |||
private final TokenStream tokens; | |||
|
|||
private String schema; | |||
private String name; | |||
private final List<AlterExpression> alterExpressions = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same feedback.
name = ctx.getText(); | ||
String text = ctx.getText(); | ||
String[] split = text.split("\\."); | ||
schema = split.length > 1 ? split[0] : "public"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move this default "public"
to a constant.
name = ctx.getText(); | ||
String text = ctx.getText(); | ||
String[] split = text.split("\\."); | ||
schema = split.length > 1 ? split[0] : "public"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same feedback.
name = ctx.create_mv_target().qualified_name().getText(); | ||
String text = ctx.create_mv_target().qualified_name().getText(); | ||
String[] split = text.split("\\."); | ||
schema = split.length > 1 ? split[0] : "public"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same feedback.
{ | ||
String text = n.getText(); | ||
String[] split = text.split("\\."); | ||
String schema = split.length > 1 ? split[0] : "public"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same feedback.
@@ -52,7 +53,7 @@ public void shouldParseWithPrimaryKeySql() | |||
} | |||
|
|||
@Test | |||
public void shouldCreateParseWithPrimaryKeysSql() | |||
public void shouldCreateTableParseWithPrimaryKeysSql() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public void shouldCreateTableParseWithPrimaryKeysSql() | |
public void shouldCreateTableWithPrimaryKey() |
works?
Note: we are in the parser test class, so no need to use parse in the name of any of the test methods, please review all of the test method names.
assertEquals("public", drops.get(0).schema()); | ||
assertEquals("table1", drops.get(0).name()); | ||
assertEquals("public", drops.get(1).schema()); | ||
assertEquals("table2", drops.get(1).name()); | ||
} | ||
|
||
@Test(expected = ParseCancellationException.class) | ||
public void shouldHandleEmptyDropStatement() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public void shouldHandleEmptyDropStatement() | |
public void shouldDropTable() |
binding.routes.forEach(r -> | ||
{ | ||
final long clientId = r.id + DEFAULT_USER_HASH; | ||
streamsByRouteIds.put(clientId, | ||
new PgsqlClient(this, routedId, r.id, clientId, DEFAULT_USER)); | ||
}); | ||
|
||
final long clientId = risingwaveRouteId + ZILLABASE_USER_HASH; | ||
streamsByRouteIds.put(clientId, | ||
new PgsqlClient(this, routedId, risingwaveRouteId, clientId, ZILLABASE_USER)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we discussed changing this, where zillabase
is the system user that could potentially be done just once (per core). Let me know if there are any changes you'd like to make to prepare for that and we can file an enhancement to track the optimization for after the feature work is completed.
@@ -31,15 +31,15 @@ public PgsqlKafkaKeyAvroSchemaTemplate( | |||
|
|||
public String generate( | |||
String database, | |||
Table table) | |||
CreateTable createTable) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CreateTable createTable) | |
CreateTable command) |
Seems more readable, agreed?
Let's update the other generate
methods too, so it's all consistent.
@@ -17,6 +17,7 @@ | |||
import java.util.List; | |||
|
|||
public record Alter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AlterZView
AlterZTable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't support alter zview and ztable is addressed in Ztable PR
@@ -16,7 +16,8 @@ | |||
|
|||
import java.util.Map; | |||
|
|||
public record Stream( | |||
public record CreateStream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CreateZStream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will be in another PR
@@ -17,7 +17,8 @@ | |||
import java.util.List; | |||
import java.util.Set; | |||
|
|||
public record Table( | |||
public record CreateTable( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CreateZTable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is covered in ZTABLE PR
Description
Support Zview statement