module hunt.net.NetServerImpl;

import hunt.net.Connection;
import hunt.net.codec;
import hunt.net.NetServer;
import hunt.net.NetServerOptions;
import hunt.net.TcpConnection;

import hunt.event;
import hunt.io;
import hunt.logging;
import hunt.util.AbstractLifecycle;
import hunt.util.Lifecycle;

import core.atomic;
import std.conv;
import std.parallelism;
import std.socket;

/**
 * Selects how NetServerImpl accepts connections.
 *
 * Single: one blocking listening socket whose accept loop runs on its own thread.
 * Multi:  one TcpListener is created for every event loop in the group.
 */
enum ThreadMode {
    Single,
    Multi
}

import hunt.util.DateTime;

// Start the shared DateTime clock when the module is loaded...
shared static this() {
    DateTime.startClock();
}

// ...and stop it again when the module is unloaded.
shared static ~this() @nogc {
    DateTime.stopClock();
}


/**
 * TCP server implementation of NetServer.
 *
 * The accepting strategy is chosen at compile time through `threadModel`:
 *
 * - ThreadMode.Single (default): a std.socket TcpSocket is bound and a
 *   dedicated thread blocks in accept(); each accepted socket is wrapped in a
 *   TcpStream attached to an event loop picked from the group.
 * - ThreadMode.Multi: a TcpListener is created on every event loop of the
 *   group, each bound to the same address with port reuse enabled.
 *
 * NOTE(review): the Multi branch declares no close()/isOpen()/actualPort() and
 * builds TcpConnection without the codec and notifies the handler through a
 * different callback than the Single branch -- confirm it still matches the
 * current NetServer / TcpConnection / NetConnectionHandler APIs.
 */
class NetServerImpl(ThreadMode threadModel = ThreadMode.Single) : AbstractLifecycle, NetServer {
    private string _host = NetServerOptions.DEFAULT_HOST;
    private int _port = NetServerOptions.DEFAULT_PORT;
    protected bool _isStarted;
    private shared int _connectionId;
    protected EventLoopGroup _group = null;
    private NetServerOptions _options;
    private Codec _codec;
    private NetConnectionHandler _connectHandler;

    protected Address _address;

    /// Creates a server with default options.
    this() {
        this(new NetServerOptions());
    }

    /**
     * Creates a server with its own event loop group sized from `options`.
     *
     * Params:
     *   options = server configuration; also used to size the event loop group.
     */
    this(NetServerOptions options) {
        // Fix: forward the caller's options. Previously a fresh
        // NetServerOptions was passed here, so everything except the thread
        // sizes was silently reset to defaults.
        this(new EventLoopGroup(options.ioThreadSize(), options.workerThreadSize()), options);
    }

    /**
     * Creates a server on an existing event loop group.
     *
     * Params:
     *   loopGroup = event loop group used for connection I/O.
     *   options   = server configuration.
     */
    this(EventLoopGroup loopGroup, NetServerOptions options) {
        _group = loopGroup;
        _options = options;

        version(Posix) {
            // Ignore SIGPIPE process-wide so that writing to a socket closed by
            // the peer does not terminate the process.
            // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe
            // https://github.com/huntlabs/hunt-framework/issues/161
            import core.sys.posix.signal;
            sigset_t sigset;
            sigemptyset(&sigset);
            sigaction_t siginfo;
            siginfo.sa_mask = sigset;
            siginfo.sa_flags = SA_RESTART;
            siginfo.sa_handler = SIG_IGN;
            sigaction(SIGPIPE, &siginfo, null);
        }
    }

    /// Returns the event loop group this server runs on.
    EventLoopGroup eventLoopGroup() {
        return _group;
    }

    /// Returns the options this server was constructed with.
    NetServerOptions getOptions() {
        return _options;
    }

    // NetServer setOptions(NetServerOptions options) {
    //     _options = options;
    //     return this;
    // }

    /// Sets the codec handed to new connections (Single mode); returns this for chaining.
    NetServer setCodec(Codec codec) {
        this._codec = codec;
        return this;
    }

    /// Returns the codec set through setCodec, or null if none was set.
    Codec getCodec() {
        return this._codec;
    }

    /// Returns the connection handler set through setHandler, or null if none was set.
    NetConnectionHandler getHandler() {
        return _connectHandler;
    }

    /// Sets the handler notified about connection events; returns this for chaining.
    NetServer setHandler(NetConnectionHandler handler) {
        _connectHandler = handler;
        return this;
    }

    /// Returns the address passed to the most recent effective listen() call (null before that).
    @property Address bindingAddress() {
        return _address;
    }

    /// Listens on 0.0.0.0 with port 0.
    void listen() {
        listen("0.0.0.0", 0);
    }

    /// Listens on 0.0.0.0 with the given port.
    void listen(int port) {
        listen("0.0.0.0", port);
    }

    /**
     * Binds to host:port and starts accepting connections.
     *
     * Does nothing if the server is already started. If creating or binding
     * the listener throws, the exception message is logged, the handler (when
     * set) receives failedOpeningConnection(0, e), and the server stays
     * stopped.
     *
     * Params:
     *   host = address to bind to.
     *   port = port to bind to; it is cast to ushort.
     */
    void listen(string host, int port) {
        if (_isStarted)
            return;

        // Fix: record the endpoint only once the call is known to be
        // effective, so an ignored listen() on a running server cannot change
        // what actualPort() reports.
        _host = host;
        _port = port;
        _address = new InternetAddress(host, cast(ushort)port);

        version(HUNT_DEBUG) infof("Start to listen on %s:%d", host, port);
        version(HUNT_THREAD_DEBUG) {
            import core.thread;
            warningf("Threads: %d", Thread.getAll().length);
        }
        _group.start();

        try {

            static if(threadModel == ThreadMode.Multi) {
                listeners = new TcpListener[_group.size];
                for (size_t i = 0; i < _group.size; ++i) {
                    listeners[i] = createServer(_group[i]);
                    version(HUNT_DEBUG) infof("lister[%d] created", i);
                }
                version(HUNT_DEBUG) infof("All the servers are listening on %s.", _address.toString());
            } else {
                tcpListener = new TcpSocket();

                version (Windows) {
                    import core.sys.windows.winsock2;
                    // Exclusive address use is switched off as soon as either
                    // reuse option is requested.
                    bool flag = this._options.isReuseAddress() || this._options.isReusePort();
                    tcpListener.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag);
                } else {
                    import core.sys.posix.sys.socket : SO_REUSEPORT;

                    tcpListener.setOption(SocketOptionLevel.SOCKET,
                        SocketOption.REUSEADDR, _options.isReuseAddress());

                    tcpListener.setOption(SocketOptionLevel.SOCKET,
                        cast(SocketOption) SO_REUSEPORT, _options.isReusePort());
                }

                tcpListener.bind(_address);
                tcpListener.listen(1000);

                version(HUNT_DEBUG) {
                    infof("Server is listening on %s%s.", _address.toString(),
                        _options.isSsl ? " (with SSL)" : "");
                }
            }

            _isStarted = true;

        } catch (Exception e) {
            warning(e.message);
            if (_connectHandler !is null)
                _connectHandler.failedOpeningConnection(0, e);
        }

        // if (handler !is null)
        //     handler(result);

        static if(threadModel == ThreadMode.Single) {
            // Fix: start the accept thread only when the listener is actually
            // up. Previously a thread was spawned after a failed bind as well,
            // only to exit immediately because _isStarted was false.
            if (_isStarted) {
                auto theTask = task(&waitingForAccept);
                // taskPool.put(theTask);
                theTask.executeInNewThread();
                // waitingForAccept();
            }
        }
    }

    /// Lifecycle hook: starts listening on the currently configured host and port.
    override protected void initialize() {
        listen(_host, _port);
    }

static if(threadModel == ThreadMode.Multi){
    // One listener per event loop; allocated in listen().
    private TcpListener[] listeners;

    /**
     * Creates, binds and starts a TcpListener on `loop`.
     *
     * Every accepted stream gets the next connection id (atomic increment of
     * _connectionId) and is wrapped in a TcpConnection; the handler, when set,
     * is told through notifyConnectionOpened.
     */
    protected TcpListener createServer(EventLoop loop) {
        TcpListener listener = new TcpListener(loop, _address.addressFamily);

        // All listeners bind the same address, so port reuse is always enabled here.
        listener.reusePort(true);
        listener.bind(_address).listen(1024);
        listener.onConnectionAccepted((TcpListener sender, TcpStream stream) {
            auto currentId = atomicOp!("+=")(_connectionId, 1);
            version(HUNT_DEBUG) tracef("new tcp connection: id=%d", currentId);
            TcpConnection connection = new TcpConnection(currentId, _options, _connectHandler, stream);
            // connection.setState(ConnectionState.Opened);
            if (_connectHandler !is null)
                _connectHandler.notifyConnectionOpened(connection);
        });
        listener.start();

        return listener;
    }

    /// Lifecycle hook: closes every created listener (if started) and stops the event loop group.
    override protected void destroy() {
        if(_isStarted) {
            foreach(TcpListener ls; listeners) {
                if (ls !is null)
                    ls.close();
            }
        }

        version(HUNT_DEBUG) warning("stopping the EventLoopGroup...");
        _group.stop();
    }

} else {
    // Blocking listening socket; created in listen().
    private Socket tcpListener;

    /**
     * Accept loop, run on its own thread by listen().
     *
     * Blocks in accept() and passes every client to processClient on the same
     * thread. A SocketAcceptException is logged and ends the loop by clearing
     * _isStarted.
     */
    private void waitingForAccept() {
        while (_isStarted) {
            try {
                version(HUNT_THREAD_DEBUG) {
                    import core.thread;
                    tracef("Waiting for accept on %s:%d...(Threads: %d)", _host, _port,
                        Thread.getAll().length);
                } else version (HUNT_DEBUG) {
                    tracef("Waiting for accept on %s:%d...", _host, _port);
                }
                Socket client = tcpListener.accept();
                processClient(client);

                // auto processTask = task(&processClient, client);
                // taskPool.put(processTask);
            } catch (SocketAcceptException e) {
                warningf("Failure on accept %s", e.msg);
                version(HUNT_DEBUG) warning(e);
                _isStarted = false;
            }
        }
    }

    /**
     * Turns an accepted socket into a started TcpConnection.
     *
     * The event loop is selected from the group using the socket handle. The
     * handler, when set, is notified through connectionOpened before the
     * stream is started.
     */
    private void processClient(Socket socket) {
        version(HUNT_METRIC_DEBUG) {
            import core.time;
            import hunt.util.DateTime;
            debug trace("processing client...");
            MonoTime startTime = MonoTime.currTime;
        }

        version (HUNT_DEBUG) {
            infof("new connection from %s, fd=%d", socket.remoteAddress.toString(), socket.handle());
        }

        TcpStreamOptions streamOptions = _options.toStreamOptions();

        EventLoop loop = _group.nextLoop(cast(size_t)socket.handle());
        TcpStream stream = new TcpStream(loop, socket, streamOptions);

        auto currentId = atomicOp!("+=")(_connectionId, 1);
        version(HUNT_DEBUG) tracef("New tcp connection: id=%d", currentId);
        Connection connection = new TcpConnection(currentId, _options, _connectHandler, _codec, stream);
        // connection.setState(ConnectionState.Opened);
        if (_connectHandler !is null) {
            _connectHandler.connectionOpened(connection);
        }
        stream.start();

        version(HUNT_METRIC_DEBUG) {
            Duration timeElapsed = MonoTime.currTime - startTime;
            warningf("peer connection processing done in: %d microseconds",
                timeElapsed.total!(TimeUnit.Microsecond)());
        }
    }

    /**
     * Returns the port passed to the most recent effective listen() call.
     *
     * NOTE(review): this is the requested port, so it is 0 when the server was
     * bound to an ephemeral port -- confirm whether callers expect the port
     * actually assigned by the OS.
     */
    int actualPort() {
        return _port;
    }

    /// Closes the server by delegating to stop().
    override void close() {
        this.stop();
    }

    /**
     * Lifecycle hook: closes the listening socket if the server is started.
     *
     * NOTE(review): unlike the Multi branch this does not stop the event loop
     * group -- confirm that is intended.
     */
    override protected void destroy() {
        if(_isStarted && tcpListener !is null) {
            tcpListener.close();
        }
    }

    /// Returns whether the server is currently marked as started.
    bool isOpen() {
        return _isStarted;
    }
}
}