Skip to content

Commit d2ffdd7

Browse files
committed
2 parents a1ed1ea + d856feb commit d2ffdd7

8 files changed

+234
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.waylau.netty.demo.pojo;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.handler.codec.MessageToByteEncoder;
6+
7+
/**
8+
* @author wangjun
9+
* @date 2020-08-08
10+
*/
11+
public class MessageToByteTimeEncoder extends MessageToByteEncoder<UnixTime> {
12+
13+
@Override
14+
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
15+
out.writeInt((int)msg.value());
16+
}
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.waylau.netty.demo.pojo;
2+
3+
import io.netty.bootstrap.Bootstrap;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelInitializer;
6+
import io.netty.channel.ChannelOption;
7+
import io.netty.channel.EventLoopGroup;
8+
import io.netty.channel.nio.NioEventLoopGroup;
9+
import io.netty.channel.socket.SocketChannel;
10+
import io.netty.channel.socket.nio.NioSocketChannel;
11+
12+
public class TimeClient {
13+
14+
public static void main(String[] args) throws Exception {
15+
16+
String host = "127.0.0.1";// args[0];
17+
int port = 8080;//Integer.parseInt(args[1]);
18+
EventLoopGroup workerGroup = new NioEventLoopGroup();
19+
20+
try {
21+
Bootstrap b = new Bootstrap(); // (1)
22+
b.group(workerGroup); // (2)
23+
b.channel(NioSocketChannel.class); // (3)
24+
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
25+
b.handler(new ChannelInitializer<SocketChannel>() {
26+
@Override
27+
public void initChannel(SocketChannel ch) throws Exception {
28+
ch.pipeline().addLast(new TimeDecoder());
29+
ch.pipeline().addLast(new TimeClientHandler());
30+
}
31+
});
32+
33+
// 启动客户端
34+
ChannelFuture f = b.connect(host, port).sync(); // (5)
35+
36+
// 等待连接关闭
37+
f.channel().closeFuture().sync();
38+
} finally {
39+
workerGroup.shutdownGracefully();
40+
}
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.waylau.netty.demo.pojo;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.channel.ChannelInboundHandlerAdapter;
5+
6+
/**
7+
* @author wj89757
8+
* @date 2020-08-08
9+
*/
10+
public class TimeClientHandler extends ChannelInboundHandlerAdapter{
11+
12+
@Override
13+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
14+
UnixTime m = (UnixTime) msg;
15+
System.out.println(m);
16+
ctx.close();
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.waylau.netty.demo.pojo;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.handler.codec.ByteToMessageDecoder;
6+
7+
import java.util.List;
8+
9+
/**
10+
* TimeDecoder 处理数据拆分的问题
11+
* @author wj89757
12+
* @date 2020-08-08
13+
*/
14+
public class TimeDecoder extends ByteToMessageDecoder {
15+
16+
@Override
17+
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
18+
if (byteBuf.readableBytes() < 4) {
19+
return;
20+
}
21+
list.add(new UnixTime(byteBuf.readUnsignedInt()));
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.waylau.netty.demo.pojo;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.channel.ChannelOutboundHandlerAdapter;
6+
import io.netty.channel.ChannelPromise;
7+
8+
/**
9+
* @author wj89757
10+
* @date 2020-08-08
11+
*/
12+
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
13+
/**
14+
* 第一,通过 ChannelPromise,当编码后的数据被写到了通道上 Netty 可以通过这个对象标记是成功还是失败。
15+
* 第二, 我们不需要调用 cxt.flush()。因为处理器已经单独分离出了一个方法 void flush(ChannelHandlerContext cxt),
16+
* 如果像自己实现 flush() 方法内容可以自行覆盖这个方法。
17+
*/
18+
@Override
19+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
20+
UnixTime m = (UnixTime) msg;
21+
ByteBuf encoded = ctx.alloc().buffer(4);
22+
encoded.writeInt((int)m.value());
23+
ctx.write(encoded, promise); // (1)
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.waylau.netty.demo.pojo;
2+
3+
import io.netty.bootstrap.ServerBootstrap;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelInitializer;
6+
import io.netty.channel.ChannelOption;
7+
import io.netty.channel.EventLoopGroup;
8+
import io.netty.channel.nio.NioEventLoopGroup;
9+
import io.netty.channel.socket.SocketChannel;
10+
import io.netty.channel.socket.nio.NioServerSocketChannel;
11+
12+
/**
13+
* 时间服务器
14+
*/
15+
public class TimeServer {
16+
17+
private int port;
18+
19+
public TimeServer(int port) {
20+
this.port = port;
21+
}
22+
23+
public void run() throws Exception {
24+
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
25+
EventLoopGroup workerGroup = new NioEventLoopGroup();
26+
try {
27+
ServerBootstrap b = new ServerBootstrap(); // (2)
28+
b.group(bossGroup, workerGroup)
29+
.channel(NioServerSocketChannel.class) // (3)
30+
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
31+
@Override
32+
public void initChannel(SocketChannel ch) throws Exception {
33+
ch.pipeline().addLast(new TimeEncoder());
34+
ch.pipeline().addLast(new TimeServerHandler());
35+
}
36+
})
37+
.option(ChannelOption.SO_BACKLOG, 128) // (5)
38+
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
39+
40+
// 绑定端口,开始接收进来的连接
41+
ChannelFuture f = b.bind(port).sync(); // (7)
42+
43+
// 等待服务器 socket 关闭 。
44+
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
45+
f.channel().closeFuture().sync();
46+
} finally {
47+
workerGroup.shutdownGracefully();
48+
bossGroup.shutdownGracefully();
49+
}
50+
}
51+
52+
public static void main(String[] args) throws Exception {
53+
int port;
54+
if (args.length > 0) {
55+
port = Integer.parseInt(args[0]);
56+
} else {
57+
port = 8080;
58+
}
59+
new TimeServer(port).run();
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.waylau.netty.demo.pojo;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelFutureListener;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
8+
9+
/**
10+
* @author wj89757
11+
* @date 2020-08-08
12+
*/
13+
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
14+
15+
@Override
16+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
17+
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
18+
f.addListener(ChannelFutureListener.CLOSE);
19+
}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.waylau.netty.demo.pojo;
2+
3+
import java.util.Date;
4+
5+
/**
6+
* @author wj89757
7+
* @date 2020-08-08
8+
*/
9+
public class UnixTime {
10+
private final long value;
11+
12+
public UnixTime() {
13+
this(System.currentTimeMillis() / 1000L + 2208988800L);
14+
}
15+
16+
public UnixTime(long value) {
17+
this.value = value;
18+
}
19+
20+
public long value() {
21+
return value;
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return new Date((value() - 2208988800L) * 1000L).toString();
27+
}
28+
}

0 commit comments

Comments
 (0)