1 module hunt.net.NetServerImpl;
2
3 import hunt.net.Connection;
4 import hunt.net.codec;
5 import hunt.net.NetServer;
6 import hunt.net.NetServerOptions;
7 import hunt.net.TcpConnection;
8
9 import hunt.event;
10 import hunt.io;
11 import hunt.logging;
12 import hunt.util.AbstractLifecycle;
13 import hunt.util.Lifecycle;
14
15 import core.atomic;
16 import std.conv;
17 import std.parallelism;
18 import std.socket;
19
/**
 * Selects how `NetServerImpl` accepts incoming connections.
 */
enum ThreadMode {
    /// A single blocking listening socket; clients are accepted on one dedicated thread.
    Single,

    /// One `TcpListener` is created for every event loop in the group.
    Multi
}
24
25 import hunt.util.DateTime;
26
/// Module constructor: starts the shared `DateTime` clock when the module is initialized.
shared static this()
{
    DateTime.startClock();
}
30
/// Module destructor: stops the shared `DateTime` clock started by the module constructor.
shared static ~this() @nogc
{
    DateTime.stopClock();
}
34
35
/**
 * TCP server implementation of `NetServer`.
 *
 * The `threadModel` template parameter selects the accept strategy:
 *
 * - `ThreadMode.Single`: a blocking `TcpSocket` is bound and a dedicated thread
 *   accepts clients, handing each one to an event loop chosen from the group.
 * - `ThreadMode.Multi`: one `TcpListener` is created for every event loop in
 *   the group.
 *
 * NOTE(review): `close()`, `isOpen()` and `actualPort()` are only declared in the
 * `Single` branch; confirm whether `NetServer` requires them for `Multi` as well.
 */
class NetServerImpl(ThreadMode threadModel = ThreadMode.Single) : AbstractLifecycle, NetServer {
    private string _host = NetServerOptions.DEFAULT_HOST;
    private int _port = NetServerOptions.DEFAULT_PORT;
    protected bool _isStarted;
    private shared int _connectionId;
    protected EventLoopGroup _group = null;
    private NetServerOptions _options;
    private Codec _codec;
    private NetConnectionHandler _connectHandler;

    protected Address _address;

    /// Creates a server with default options and its own event loop group.
    this() {
        this(new NetServerOptions());
    }

    /**
     * Creates a server with its own event loop group sized from `options`.
     *
     * Params:
     *   options = server options; also kept as the options of this server.
     */
    this(NetServerOptions options) {
        // Forward the caller's options instead of a fresh default instance,
        // otherwise every setting except the thread sizes would be lost.
        this(new EventLoopGroup(options.ioThreadSize(), options.workerThreadSize()), options);
    }

    /**
     * Creates a server on top of an existing event loop group.
     *
     * Params:
     *   loopGroup = event loop group used for the accepted connections.
     *   options   = server options.
     */
    this(EventLoopGroup loopGroup, NetServerOptions options) {
        _group = loopGroup;
        _options = options;

        version(Posix) {
            // Ignore SIGPIPE so that writing to a closed peer does not kill the process.
            // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
            // https://github.com/huntlabs/hunt-framework/issues/161
            import core.sys.posix.signal;
            sigset_t sigset;
            sigemptyset(&sigset);
            sigaction_t siginfo;
            siginfo.sa_mask = sigset;
            siginfo.sa_flags = SA_RESTART;
            siginfo.sa_handler = SIG_IGN;
            sigaction(SIGPIPE, &siginfo, null);
        }
    }

    /// Returns the event loop group used by this server.
    EventLoopGroup eventLoopGroup() {
        return _group;
    }

    /// Returns the options this server was created with.
    NetServerOptions getOptions() {
        return _options;
    }

    /// Sets the codec passed to new connections (Single mode); returns `this` for chaining.
    NetServer setCodec(Codec codec) {
        this._codec = codec;
        return this;
    }

    /// Returns the codec set through `setCodec`, or `null` if none was set.
    Codec getCodec() {
        return this._codec;
    }

    /// Returns the connection handler set through `setHandler`, or `null`.
    NetConnectionHandler getHandler() {
        return _connectHandler;
    }

    /// Sets the handler notified about connection events; returns `this` for chaining.
    NetServer setHandler(NetConnectionHandler handler) {
        _connectHandler = handler;
        return this;
    }

    /// Returns the address passed to the most recent effective `listen` call.
    @property Address bindingAddress() {
        return _address;
    }

    /// Listens on all IPv4 interfaces ("0.0.0.0") with port 0.
    void listen() {
        listen("0.0.0.0", 0);
    }

    /// Listens on all IPv4 interfaces ("0.0.0.0") on the given port.
    void listen(int port) {
        listen("0.0.0.0", port);
    }

    /**
     * Binds the server to `host:port` and starts accepting connections.
     *
     * Does nothing besides recording host and port when the server is already
     * started. Failures while creating or binding the listener are logged and
     * reported through `failedOpeningConnection` on the handler, if one is set;
     * the server then stays in the not-started state.
     *
     * Params:
     *   host = address to bind to.
     *   port = port to bind to; it is truncated to `ushort`.
     */
    void listen(string host, int port) {
        _host = host;
        _port = port;

        if (_isStarted)
            return;
        _address = new InternetAddress(host, cast(ushort)port);

        version(HUNT_DEBUG) infof("Start to listen on %s:%d", host, port);
        version(HUNT_THREAD_DEBUG) {
            import core.thread;
            warningf("Threads: %d", Thread.getAll().length);
        }
        _group.start();

        try {

            static if(threadModel == ThreadMode.Multi) {
                listeners = new TcpListener[_group.size];
                for (size_t i = 0; i < _group.size; ++i) {
                    listeners[i] = createServer(_group[i]);
                    version(HUNT_DEBUG) infof("lister[%d] created", i);
                }
                version(HUNT_DEBUG) infof("All the servers are listening on %s.", _address.toString());
            } else {
                tcpListener = new TcpSocket();

                version (Windows) {
                    import core.sys.windows.winsock2;
                    bool flag = this._options.isReuseAddress() || this._options.isReusePort();
                    tcpListener.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
                } else {
                    import core.sys.posix.sys.socket : SO_REUSEPORT;

                    tcpListener.setOption(SocketOptionLevel.SOCKET,
                        SocketOption.REUSEADDR, _options.isReuseAddress());

                    tcpListener.setOption(SocketOptionLevel.SOCKET,
                        cast(SocketOption) SO_REUSEPORT, _options.isReusePort());
                }

                tcpListener.bind(_address);
                tcpListener.listen(1000);

                version(HUNT_DEBUG) {
                    infof("Server is listening on %s%s.", _address.toString(),
                        _options.isSsl ? " (with SSL)" : "");
                }
            }

            _isStarted = true;

        } catch (Exception e) {
            warning(e.message);

            static if(threadModel == ThreadMode.Single) {
                // Release the half-configured listening socket instead of leaking it.
                if (tcpListener !is null) {
                    tcpListener.close();
                    tcpListener = null;
                }
            }

            if (_connectHandler !is null)
                _connectHandler.failedOpeningConnection(0, e);
        }

        static if(threadModel == ThreadMode.Single) {
            // Only spawn the accept thread when the listener is actually up.
            if (_isStarted) {
                auto theTask = task(&waitingForAccept);
                theTask.executeInNewThread();
            }
        }
    }

    /// Lifecycle hook: starts listening on the currently configured host and port.
    override protected void initialize() {
        listen(_host, _port);
    }

    static if(threadModel == ThreadMode.Multi) {
        private TcpListener[] listeners;

        /**
         * Creates, binds and starts a `TcpListener` on the given event loop.
         *
         * NOTE(review): unlike the Single branch, the connection is built without
         * the codec and the handler is notified through `notifyConnectionOpened`;
         * confirm both against the current `TcpConnection` / handler API.
         */
        protected TcpListener createServer(EventLoop loop) {
            TcpListener listener = new TcpListener(loop, _address.addressFamily);

            listener.reusePort(true);
            listener.bind(_address).listen(1024);
            listener.onConnectionAccepted((TcpListener sender, TcpStream stream) {
                auto currentId = atomicOp!("+=")(_connectionId, 1);
                version(HUNT_DEBUG) tracef("new tcp connection: id=%d", currentId);
                TcpConnection connection = new TcpConnection(currentId, _options, _connectHandler, stream);
                if (_connectHandler !is null)
                    _connectHandler.notifyConnectionOpened(connection);
            });
            listener.start();

            return listener;
        }

        /// Lifecycle hook: closes every listener and stops the event loop group.
        override protected void destroy() {
            if(_isStarted) {
                foreach(TcpListener ls; listeners) {
                    if (ls !is null)
                        ls.close();
                }
            }
            // Mark the server as stopped so a later listen() is not ignored.
            _isStarted = false;

            version(HUNT_DEBUG) warning("stopping the EventLoopGroup...");
            _group.stop();
        }

    } else {
        private Socket tcpListener;

        /**
         * Accept loop executed on its own thread while the server is started.
         *
         * A failing `accept` ends the loop. A failure while setting up a single
         * client is logged and the loop keeps accepting.
         */
        private void waitingForAccept() {
            while (_isStarted) {
                Socket client;
                try {
                    version(HUNT_THREAD_DEBUG) {
                        import core.thread;
                        tracef("Waiting for accept on %s:%d...(Threads: %d)", _host, _port, Thread.getAll().length);
                    } else version (HUNT_DEBUG) {
                        tracef("Waiting for accept on %s:%d...", _host, _port);
                    }
                    client = tcpListener.accept();
                } catch (SocketAcceptException e) {
                    // destroy() clears the flag before closing the listener, so
                    // this is a regular shutdown and not worth a warning.
                    if (!_isStarted)
                        break;

                    warningf("Failure on accept %s", e.msg);
                    version(HUNT_DEBUG) warning(e);
                    _isStarted = false;
                    break;
                }

                try {
                    processClient(client);
                } catch (Exception e) {
                    // One broken client (or a throwing handler) must not end the
                    // accept thread, otherwise the server silently stops serving.
                    warningf("Failed to process the accepted connection: %s", e.msg);
                    version(HUNT_DEBUG) warning(e);
                }
            }
        }

        /**
         * Wraps an accepted socket in a `TcpStream` bound to an event loop chosen
         * by the socket handle, creates the `TcpConnection`, notifies the handler
         * and starts the stream.
         */
        private void processClient(Socket socket) {
            version(HUNT_METRIC_DEBUG) {
                import core.time;
                import hunt.util.DateTime;
                debug trace("processing client...");
                MonoTime startTime = MonoTime.currTime;
            }

            version (HUNT_DEBUG) {
                infof("new connection from %s, fd=%d", socket.remoteAddress.toString(), socket.handle());
            }

            TcpStreamOptions streamOptions = _options.toStreamOptions();

            EventLoop loop = _group.nextLoop(cast(size_t)socket.handle());
            TcpStream stream = new TcpStream(loop, socket, streamOptions);

            auto currentId = atomicOp!("+=")(_connectionId, 1);
            version(HUNT_DEBUG) tracef("New tcp connection: id=%d", currentId);
            Connection connection = new TcpConnection(currentId, _options, _connectHandler, _codec, stream);
            if (_connectHandler !is null) {
                _connectHandler.connectionOpened(connection);
            }
            stream.start();

            version(HUNT_METRIC_DEBUG) {
                Duration timeElapsed = MonoTime.currTime - startTime;
                warningf("peer connection processing done in: %d microseconds",
                    timeElapsed.total!(TimeUnit.Microsecond)());
            }
        }

        /**
         * Returns the port the server is bound to.
         *
         * While the server is started the port is read from the listening
         * socket, so a server started on port 0 reports the port assigned by
         * the OS. Otherwise the configured port is returned.
         */
        int actualPort() {
            if (_isStarted && tcpListener !is null) {
                try {
                    auto bound = cast(InternetAddress) tcpListener.localAddress;
                    if (bound !is null)
                        return bound.port;
                } catch (SocketException e) {
                    // The socket may have been closed concurrently; fall back below.
                    version(HUNT_DEBUG) warning(e.msg);
                }
            }
            return _port;
        }

        /// Stops the server through the lifecycle `stop()`.
        override void close() {
            this.stop();
        }

        /**
         * Lifecycle hook: closes the listening socket.
         *
         * NOTE(review): the event loop group started by `listen` is not stopped
         * here (the Multi variant does stop it); confirm whether that is
         * intentional, e.g. because the group may be shared.
         */
        override protected void destroy() {
            bool wasStarted = _isStarted;
            // Clear the flag first so the accept loop treats the failing
            // accept() as a shutdown rather than an error.
            _isStarted = false;

            if(wasStarted && tcpListener !is null) {
                tcpListener.close();
            }
        }

        /// Returns `true` while the server is started.
        bool isOpen() {
            return _isStarted;
        }
    }
}