1 module hunt.net.NetServerImpl;
2 
3 import hunt.net.Connection;
4 import hunt.net.codec;
5 import hunt.net.NetServer;
6 import hunt.net.NetServerOptions;
7 import hunt.net.TcpConnection;
8 
9 import hunt.event; 
10 import hunt.io;
11 import hunt.logging;
12 import hunt.util.AbstractLifecycle;
13 import hunt.util.Lifecycle;
14 
15 import core.atomic;
16 import std.conv;
17 import std.parallelism;
18 import std.socket;
19 
/**
 * Selects how NetServerImpl accepts incoming connections.
 *
 * The numeric values are spelled out explicitly; they match the values the
 * compiler assigns by default.
 */
enum ThreadMode : int {
    /// One blocking listener socket served by a dedicated accept thread.
    Single = 0,
    /// One listener per event loop of the server's EventLoopGroup.
    Multi = 1
}
24 
25 import hunt.util.DateTime;
26 
// Shared module constructor: the D runtime runs it once per process, before
// main(), so DateTime.startClock() is called as soon as this module is linked in.
// NOTE(review): presumably this starts a cached-time ticker — confirm in hunt.util.DateTime.
shared static this() {
    DateTime.startClock();
}
30 
// Shared module destructor: the D runtime runs it once per process during
// shutdown; it pairs with the startClock() call made by the module constructor.
// Declared @nogc, so DateTime.stopClock() must be callable without GC allocation.
shared static ~this() @nogc {
    DateTime.stopClock();
}
34 
35 
/**
 * TCP server that accepts connections and hands them to an EventLoopGroup.
 *
 * The template parameter selects the accept strategy:
 *
 * $(UL
 *   $(LI `ThreadMode.Single` (default): a blocking `std.socket` listener is
 *        served by one dedicated accept thread; every accepted socket is
 *        wrapped in a `TcpStream` on a loop obtained from the group.)
 *   $(LI `ThreadMode.Multi`: one `TcpListener` is created per event loop of
 *        the group, all bound to the same address.)
 * )
 *
 * Connection events are reported to the handler installed with setHandler().
 */
class NetServerImpl(ThreadMode threadModel = ThreadMode.Single) : AbstractLifecycle, NetServer {
    private string _host = NetServerOptions.DEFAULT_HOST;
    private int _port = NetServerOptions.DEFAULT_PORT;
    // Set once bind/listen succeeded; cleared by the accept loop when accept fails.
    protected bool _isStarted;
    // Source of connection ids; incremented atomically for every accepted connection.
    private shared int _connectionId;
    protected EventLoopGroup _group = null;
    private NetServerOptions _options;
    private Codec _codec;
    private NetConnectionHandler _connectHandler;

    // Address the server is (or will be) bound to; null until listen() is called.
    protected Address _address;

    /// Creates a server with default options.
    this() {
        this(new NetServerOptions());
    }

    /**
     * Creates a server with its own EventLoopGroup, sized from `options`.
     *
     * Params:
     *   options = server options; also used for every accepted connection.
     */
    this(NetServerOptions options) {
        // Pass the caller's options on. Creating a fresh NetServerOptions here
        // would silently discard everything except the thread sizes.
        this(new EventLoopGroup(options.ioThreadSize(), options.workerThreadSize()), options);
    }

    /**
     * Creates a server on an existing event loop group.
     *
     * Params:
     *   loopGroup = event loops that will drive accepted connections.
     *   options   = server options.
     */
    this(EventLoopGroup loopGroup, NetServerOptions options) {
        _group = loopGroup;
        _options = options;

        version(Posix) {
            // Ignore SIGPIPE so that writing to a connection closed by the peer
            // surfaces as an error instead of terminating the process.
            // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
            // https://github.com/huntlabs/hunt-framework/issues/161
            import core.sys.posix.signal;
            sigset_t sigset;
            sigemptyset(&sigset);
            sigaction_t siginfo;
            siginfo.sa_mask = sigset;
            siginfo.sa_flags = SA_RESTART;
            siginfo.sa_handler = SIG_IGN;
            sigaction(SIGPIPE, &siginfo, null);
        }
    }

    /// Returns the event loop group given to (or created by) the constructor.
    EventLoopGroup eventLoopGroup() {
        return _group;
    }

    /// Returns the options this server was constructed with.
    NetServerOptions getOptions() {
        return _options;
    }

    /// Sets the codec passed to connections accepted in Single mode. Returns this server.
    NetServer setCodec(Codec codec) {
        this._codec = codec;
        return this;
    }

    /// Returns the codec installed with setCodec(), or null.
    Codec getCodec() {
        return this._codec;
    }

    /// Returns the connection handler installed with setHandler(), or null.
    NetConnectionHandler getHandler() {
        return _connectHandler;
    }

    /// Installs the handler notified about opened and failed connections. Returns this server.
    NetServer setHandler(NetConnectionHandler handler) {
        _connectHandler = handler;
        return this;
    }

    /// Address the server is bound to; null before the first listen() call.
    @property Address bindingAddress() {
        return _address;
    }

    /// Listens on all IPv4 interfaces on a port chosen by the operating system.
    void listen() {
        listen("0.0.0.0", 0);
    }

    /// Listens on all IPv4 interfaces on the given port.
    void listen(int port) {
        listen("0.0.0.0", port);
    }

    /**
     * Binds to `host:port` and starts accepting connections.
     *
     * Does nothing when the server is already started. Failures while binding
     * are logged and reported through the handler's failedOpeningConnection();
     * they are not rethrown.
     *
     * Params:
     *   host = IPv4 address or host name to bind to.
     *   port = TCP port; 0 lets the operating system choose one.
     */
    void listen(string host, int port) {
        // Check first: a repeated call must not overwrite the endpoint that is
        // actually bound, otherwise actualPort() and the logs would lie.
        if (_isStarted)
            return;

        _host = host;
        _port = port;
        _address = new InternetAddress(host, cast(ushort) port);

        version(HUNT_DEBUG) infof("Start to listen on %s:%d", host, port);
        version(HUNT_THREAD_DEBUG) {
            import core.thread;
            warningf("Threads: %d", Thread.getAll().length);
        }
        _group.start();

        try {
            static if (threadModel == ThreadMode.Multi) {
                listeners = new TcpListener[_group.size];
                for (size_t i = 0; i < _group.size; ++i) {
                    listeners[i] = createServer(_group[i]);
                    version(HUNT_DEBUG) infof("lister[%d] created", i);
                }
                version(HUNT_DEBUG) infof("All the servers are listening on %s.", _address.toString());
            } else {
                tcpListener = new TcpSocket();

                version (Windows) {
                    import core.sys.windows.winsock2;
                    // Exclusive address use is the inverse of "reuse allowed".
                    bool flag = this._options.isReuseAddress() || this._options.isReusePort();
                    tcpListener.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
                } else {
                    import core.sys.posix.sys.socket : SO_REUSEPORT;

                    tcpListener.setOption(SocketOptionLevel.SOCKET,
                        SocketOption.REUSEADDR, _options.isReuseAddress());

                    tcpListener.setOption(SocketOptionLevel.SOCKET,
                        cast(SocketOption) SO_REUSEPORT, _options.isReusePort());
                }

                tcpListener.bind(_address);
                tcpListener.listen(1000);

                // With port 0 the operating system picks the port. Read the
                // bound endpoint back so actualPort() and bindingAddress
                // report the real one instead of 0.
                if (cast(ushort) port == 0) {
                    auto bound = cast(InternetAddress) tcpListener.localAddress;
                    if (bound !is null) {
                        _address = bound;
                        _port = bound.port;
                    }
                }

                version(HUNT_DEBUG) {
                    infof("Server is listening on %s%s.", _address.toString(),
                        _options.isSsl ? " (with SSL)" : "");
                }
            }

            _isStarted = true;

        } catch (Exception e) {
            warning(e.message);
            if (_connectHandler !is null)
                _connectHandler.failedOpeningConnection(0, e);
        }

        static if (threadModel == ThreadMode.Single) {
            // Only spawn the accept thread when the listener is really up;
            // after a failed bind there is nothing to accept on.
            if (_isStarted) {
                auto theTask = task(&waitingForAccept);
                theTask.executeInNewThread();
            }
        }
    }

    /// Lifecycle hook: starts listening on the currently configured host and port.
    override protected void initialize() {
        listen(_host, _port);
    }

    static if (threadModel == ThreadMode.Multi) {
        // One listener per event loop; filled by listen().
        private TcpListener[] listeners;

        /**
         * Creates, binds and starts a listener that runs on `loop`.
         *
         * NOTE(review): this path differs from the Single-mode path — it does
         * not pass `_codec` to TcpConnection, calls `notifyConnectionOpened`
         * instead of `connectionOpened`, and always enables port reuse
         * regardless of `_options`. Confirm against the current TcpConnection
         * and NetConnectionHandler APIs.
         */
        protected TcpListener createServer(EventLoop loop) {
            TcpListener listener = new TcpListener(loop, _address.addressFamily);

            listener.reusePort(true);
            listener.bind(_address).listen(1024);
            listener.onConnectionAccepted((TcpListener sender, TcpStream stream) {
                    auto currentId = atomicOp!("+=")(_connectionId, 1);
                    version(HUNT_DEBUG) tracef("new tcp connection: id=%d", currentId);
                    TcpConnection connection = new TcpConnection(currentId, _options, _connectHandler, stream);
                    if (_connectHandler !is null)
                        _connectHandler.notifyConnectionOpened(connection);
                });
            listener.start();

            return listener;
        }

        /// Lifecycle hook: closes every listener (if started) and stops the event loop group.
        override protected void destroy() {
            if (_isStarted) {
                foreach (TcpListener ls; listeners) {
                    if (ls !is null)
                        ls.close();
                }
            }

            version(HUNT_DEBUG) warning("stopping the EventLoopGroup...");
            _group.stop();
        }

    } else {
        // Blocking listener socket used by the accept thread.
        private Socket tcpListener;

        /**
         * Accept loop executed on the dedicated accept thread.
         *
         * Runs until `_isStarted` becomes false; a failing accept() ends the loop.
         * NOTE(review): only SocketAcceptException is handled here — any other
         * exception escaping processClient() terminates the accept thread
         * while `_isStarted` stays true.
         */
        private void waitingForAccept() {
            while (_isStarted) {
                try {
                    version(HUNT_THREAD_DEBUG) {
                        import core.thread;
                        tracef("Waiting for accept on %s:%d...(Threads: %d)", _host, _port, Thread.getAll().length);
                    } else version (HUNT_DEBUG) {
                        tracef("Waiting for accept on %s:%d...", _host, _port);
                    }
                    Socket client = tcpListener.accept();
                    processClient(client);
                } catch (SocketAcceptException e) {
                    warningf("Failure on accept %s", e.msg);
                    version(HUNT_DEBUG) warning(e);
                    _isStarted = false;
                }
            }
        }

        /**
         * Wraps an accepted socket in a TcpStream/TcpConnection, notifies the
         * handler and starts the stream.
         *
         * Params:
         *   socket = socket returned by accept().
         */
        private void processClient(Socket socket) {
            version(HUNT_METRIC_DEBUG) {
                import core.time;
                import hunt.util.DateTime;
                debug trace("processing client...");
                MonoTime startTime = MonoTime.currTime;
            }

            version (HUNT_DEBUG) {
                infof("new connection from %s, fd=%d", socket.remoteAddress.toString(), socket.handle());
            }

            TcpStreamOptions streamOptions = _options.toStreamOptions();

            // The socket handle is the key handed to the group for loop selection.
            EventLoop loop = _group.nextLoop(cast(size_t) socket.handle());
            TcpStream stream = new TcpStream(loop, socket, streamOptions);

            auto currentId = atomicOp!("+=")(_connectionId, 1);
            version(HUNT_DEBUG) tracef("New tcp connection: id=%d", currentId);
            Connection connection = new TcpConnection(currentId, _options, _connectHandler, _codec, stream);
            // The handler is notified before the stream is started.
            if (_connectHandler !is null) {
                _connectHandler.connectionOpened(connection);
            }
            stream.start();

            version(HUNT_METRIC_DEBUG) {
                Duration timeElapsed = MonoTime.currTime - startTime;
                warningf("peer connection processing done in: %d microseconds",
                    timeElapsed.total!(TimeUnit.Microsecond)());
            }
        }

        /**
         * Port of the most recent listen() request. After a successful bind to
         * port 0 this is the port assigned by the operating system.
         */
        int actualPort() {
            return _port;
        }

        /// Stops the server through the lifecycle's stop().
        override void close() {
            this.stop();
        }

        /// Lifecycle hook: closes the listener socket if the server was started.
        override protected void destroy() {
            if (_isStarted && tcpListener !is null) {
                tcpListener.close();
            }
        }

        /// True while the server is started.
        bool isOpen() {
            return _isStarted;
        }
    }
}