@@ -792,17 +792,36 @@ type StreamingConn struct {
792
792
suite network.Suite
793
793
}
794
794
795
+ // StreamingReadOpts contains options for the ReadMessageWithOpts. It allows us
796
+ // to add new options in the future without making breaking changes.
797
+ type StreamingReadOpts struct {
798
+ Deadline time.Time
799
+ }
800
+
795
801
// ReadMessage read more data from the connection, it will block if there are
796
802
// no messages.
797
803
func (c * StreamingConn ) ReadMessage (ret interface {}) error {
798
- if err := c .conn .SetReadDeadline (time .Now ().Add (5 * time .Minute )); err != nil {
804
+ opts := StreamingReadOpts {
805
+ Deadline : time .Now ().Add (5 * time .Minute ),
806
+ }
807
+
808
+ return c .readMsg (ret , opts )
809
+ }
810
+
811
+ // ReadMessageWithOpts does the same as ReadMessage and allows to pass options.
812
+ func (c * StreamingConn ) ReadMessageWithOpts (ret interface {}, opts StreamingReadOpts ) error {
813
+ return c .readMsg (ret , opts )
814
+ }
815
+
816
+ func (c * StreamingConn ) readMsg (ret interface {}, opts StreamingReadOpts ) error {
817
+ if err := c .conn .SetReadDeadline (opts .Deadline ); err != nil {
799
818
return xerrors .Errorf ("read deadline: %v" , err )
800
819
}
801
820
// No need to add bytes to counter here because this function is only
802
821
// called by the client.
803
822
_ , buf , err := c .conn .ReadMessage ()
804
823
if err != nil {
805
- return xerrors .Errorf ("connection read: %v " , err )
824
+ return xerrors .Errorf ("connection read: %w " , err )
806
825
}
807
826
err = protobuf .DecodeWithConstructors (buf , ret , network .DefaultConstructors (c .suite ))
808
827
if err != nil {
@@ -811,6 +830,11 @@ func (c *StreamingConn) ReadMessage(ret interface{}) error {
811
830
return nil
812
831
}
813
832
833
+ // Ping sends a ping message. Data can be nil.
834
+ func (c * StreamingConn ) Ping (data []byte , deadline time.Time ) error {
835
+ return c .conn .WriteControl (websocket .PingMessage , data , deadline )
836
+ }
837
+
814
838
// Stream will send a request to start streaming, it returns a connection where
815
839
// the client can continue to read values from it.
816
840
func (c * Client ) Stream (dst * network.ServerIdentity , msg interface {}) (StreamingConn , error ) {
0 commit comments