1 module hunt.net.NetServerImpl;
2 
3 import hunt.net.Connection;
4 import hunt.net.codec;
5 import hunt.net.NetServer;
6 import hunt.net.NetServerOptions;
7 import hunt.net.TcpConnection;
8 
9 import hunt.event; 
10 import hunt.io;
11 import hunt.logging;
12 import hunt.util.AbstractLifecycle;
13 import hunt.util.Lifecycle;
14 
15 import core.atomic;
16 import std.conv;
17 import std.parallelism;
18 import std.socket;
19 
20 enum ThreadMode {
21     Single,
22     Multi
23 }
24 
25 import hunt.util.DateTime;
26 
27 shared static this() {
28     DateTime.startClock();
29 }
30 
31 shared static ~this() @nogc {
32     DateTime.stopClock();
33 }
34 
35 
36 /**
37  * 
38  */
39 class NetServerImpl(ThreadMode threadModel = ThreadMode.Single) : AbstractLifecycle, NetServer {
40     private string _host = NetServerOptions.DEFAULT_HOST;
41     private int _port = NetServerOptions.DEFAULT_PORT;
42     protected bool _isStarted;
43     private shared int _connectionId;
44     protected EventLoopGroup _group = null;
45     private NetServerOptions _options;
46     private Codec _codec;
47     private NetConnectionHandler _connectHandler;
48 
49 	protected Address _address;
50 
51     this() {
52         this(new NetServerOptions());
53     }
54 
55     this(NetServerOptions options) {
56         this(new EventLoopGroup(options.ioThreadSize(), options.workerThreadSize()), new NetServerOptions());
57     }
58 
59     this(EventLoopGroup loopGroup, NetServerOptions options) {
60         _group = loopGroup;
61         _options = options;
62 
63         version(Posix) {
64             // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
65             // https://github.com/huntlabs/hunt-framework/issues/161
66             import core.sys.posix.signal;
67             sigset_t sigset;
68             sigemptyset(&sigset);
69             sigaction_t siginfo;
70             siginfo.sa_mask = sigset;
71             siginfo.sa_flags = SA_RESTART;
72             siginfo.sa_handler = SIG_IGN;
73             sigaction(SIGPIPE, &siginfo, null);
74         }        
75     }
76 
77     EventLoopGroup eventLoopGroup() {
78         return _group;
79     }
80 
81     NetServerOptions getOptions() {
82         return _options;
83     }
84     
85     // NetServer setOptions(NetServerOptions options) {
86     //     _options = options;
87     //     return this;
88     // }
89 
90     NetServer setCodec(Codec codec) {
91         this._codec = codec;
92         return this;
93     }
94 
95     Codec getCodec() {
96         return this._codec;
97     }
98 
99     NetConnectionHandler getHandler() {
100         return _connectHandler;
101     }
102 
103     NetServer setHandler(NetConnectionHandler handler) {
104         _connectHandler = handler;
105         return this;
106     }
107 
108     @property Address bindingAddress() {
109 		return _address;
110 	}
111 
112 
113     void listen() {
114         listen("0.0.0.0", 0);
115     }
116 
117     void listen(int port) {
118         listen("0.0.0.0", port);
119     }
120 
121     void listen(string host, int port) {
122         _host = host;
123         _port = port;
124 
125         if (_isStarted)
126 			return;
127         _address = new InternetAddress(host, cast(ushort)port);
128 
129 		version(HUNT_DEBUG) infof("Start to listen on %s:%d", host, port);
130         version(HUNT_THREAD_DEBUG) {
131             import core.thread;
132             warningf("Threads: %d", Thread.getAll().length);
133         }
134         _group.start();
135 
136         try {
137 
138             static if(threadModel == ThreadMode.Multi) {   
139                 listeners = new TcpListener[_group.size];         
140                 for (size_t i = 0; i < _group.size; ++i) {
141                     listeners[i] = createServer(_group[i]);
142                     version(HUNT_DEBUG) infof("lister[%d] created", i);
143                 }
144                 version(HUNT_DEBUG) infof("All the servers are listening on %s.", _address.toString());
145             } else {
146                 tcpListener = new TcpSocket();
147 
148                 version (Windows) {
149                     import core.sys.windows.winsock2;
150                     bool flag = this._options.isReuseAddress() || this._options.isReusePort();
151                     tcpListener.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
152                 } else {
153                     import core.sys.posix.sys.socket : SO_REUSEPORT;
154 
155                     tcpListener.setOption(SocketOptionLevel.SOCKET, 
156                         SocketOption.REUSEADDR, _options.isReuseAddress());
157 
158                     tcpListener.setOption(SocketOptionLevel.SOCKET, 
159                         cast(SocketOption) SO_REUSEPORT, _options.isReusePort());
160                 }
161 
162                 tcpListener.bind(_address);
163                 tcpListener.listen(1000);
164 
165                 version(HUNT_DEBUG) {
166                     infof("Server is listening on %s%s.", _address.toString(), 
167                         _options.isSsl ? " (with SSL)" : "");
168                 }
169             }     
170 
171 		    _isStarted = true;
172             
173         } catch (Exception e) {
174             warning(e.message);
175             if (_connectHandler !is null)
176                 _connectHandler.failedOpeningConnection(0, e);
177         }
178 
179         // if (handler !is null)
180         //     handler(result);
181 
182         static if(threadModel == ThreadMode.Single) {
183             auto theTask = task(&waitingForAccept);
184             // taskPool.put(theTask);
185             theTask.executeInNewThread();
186            // waitingForAccept();
187         }
188     }
189 
190     override protected void initialize() {
191         listen(_host, _port);
192     }
193 
194 static if(threadModel == ThreadMode.Multi){
195     private TcpListener[] listeners;
196 
197     protected TcpListener createServer(EventLoop loop) {
198 		TcpListener listener = new TcpListener(loop, _address.addressFamily);
199 
200 		listener.reusePort(true);
201 		listener.bind(_address).listen(1024);
202         listener.onConnectionAccepted((TcpListener sender, TcpStream stream) {
203                 auto currentId = atomicOp!("+=")(_connectionId, 1);
204                 version(HUNT_DEBUG) tracef("new tcp connection: id=%d", currentId);
205                 TcpConnection connection = new TcpConnection(currentId, _options, _connectHandler, stream);
206                 // connection.setState(ConnectionState.Opened);
207                 if (_connectHandler !is null)
208                     _connectHandler.notifyConnectionOpened(connection);
209             });
210 		listener.start();
211 
212         return listener;
213 	}
214 
215     override protected void destroy() {
216         if(_isStarted) {
217             foreach(TcpListener ls; listeners) {
218                 if (ls !is null)
219                     ls.close();
220             }
221         }
222         
223         version(HUNT_DEBUG) warning("stopping the EventLoopGroup...");
224         _group.stop();
225     }
226 
227 } else {
228     private Socket tcpListener;
229 
230     private void waitingForAccept() {
231         while (_isStarted) {
232 			try {
233                 version(HUNT_THREAD_DEBUG) {
234                     import core.thread;
235                     tracef("Waiting for accept on %s:%d...(Threads: %d)", _host, _port, Thread.getAll().length);
236                 } else version (HUNT_DEBUG) {
237                     tracef("Waiting for accept on %s:%d...", _host, _port);
238                 }
239 				Socket client = tcpListener.accept();
240                 processClient(client);
241                 
242                 // auto processTask = task(&processClient, client);
243                 // taskPool.put(processTask);
244 			} catch (SocketAcceptException e) {
245 				warningf("Failure on accept %s", e.msg);
246 				version(HUNT_DEBUG) warning(e);
247 				_isStarted = false;
248 			}
249 		}
250     }
251     
252 	private void processClient(Socket socket) {
253         version(HUNT_METRIC) {
254             import core.time;
255             import hunt.util.DateTime;
256             debug trace("processing client...");
257             MonoTime startTime = MonoTime.currTime;
258         }
259         
260 		version (HUNT_DEBUG) {
261 			infof("new connection from %s, fd=%d", socket.remoteAddress.toString(), socket.handle());
262 		}
263 
264         TcpStreamOptions streamOptions = _options.toStreamOptions();
265 
266 		EventLoop loop = _group.nextLoop(cast(size_t)socket.handle());
267 		TcpStream stream = new TcpStream(loop, socket, streamOptions);
268 
269         auto currentId = atomicOp!("+=")(_connectionId, 1);
270         version(HUNT_DEBUG) tracef("New tcp connection: id=%d", currentId);
271         Connection connection = new TcpConnection(currentId, _options, _connectHandler, _codec, stream);
272         // connection.setState(ConnectionState.Opened);
273         if (_connectHandler !is null) {
274             _connectHandler.connectionOpened(connection);
275         }
276 		stream.start();
277 
278         version(HUNT_METRIC) { 
279             Duration timeElapsed = MonoTime.currTime - startTime;
280             warningf("peer connection processing done in: %d microseconds",
281                 timeElapsed.total!(TimeUnit.Microsecond)());
282         }
283 	}
284 
285     int actualPort() {
286         return _port;
287     }
288 
289     override void close() {
290         this.stop();
291     }
292 
293     override protected void destroy() {
294         if(_isStarted && tcpListener !is null) {
295             tcpListener.close();
296         }
297     }
298 
299     bool isOpen() {
300         return _isStarted;
301     }
302 }    
303 }