1 module hunt.net.NetServerImpl; 2 3 import hunt.net.Connection; 4 import hunt.net.codec; 5 import hunt.net.NetServer; 6 import hunt.net.NetServerOptions; 7 import hunt.net.TcpConnection; 8 9 import hunt.event; 10 import hunt.io; 11 import hunt.logging; 12 import hunt.util.AbstractLifecycle; 13 import hunt.util.Lifecycle; 14 15 import core.atomic; 16 import std.conv; 17 import std.parallelism; 18 import std.socket; 19 20 enum ThreadMode { 21 Single, 22 Multi 23 } 24 25 import hunt.util.DateTime; 26 27 shared static this() { 28 DateTime.startClock(); 29 } 30 31 shared static ~this() @nogc { 32 DateTime.stopClock(); 33 } 34 35 36 /** 37 * 38 */ 39 class NetServerImpl(ThreadMode threadModel = ThreadMode.Single) : AbstractLifecycle, NetServer { 40 private string _host = NetServerOptions.DEFAULT_HOST; 41 private int _port = NetServerOptions.DEFAULT_PORT; 42 protected bool _isStarted; 43 private shared int _connectionId; 44 protected EventLoopGroup _group = null; 45 private NetServerOptions _options; 46 private Codec _codec; 47 private NetConnectionHandler _connectHandler; 48 49 protected Address _address; 50 51 this() { 52 this(new NetServerOptions()); 53 } 54 55 this(NetServerOptions options) { 56 this(new EventLoopGroup(options.ioThreadSize(), options.workerThreadSize()), new NetServerOptions()); 57 } 58 59 this(EventLoopGroup loopGroup, NetServerOptions options) { 60 _group = loopGroup; 61 _options = options; 62 63 version(Posix) { 64 // https://stackoverflow.com/questions/6824265/sigpipe-broken-pipe 65 // https://github.com/huntlabs/hunt-framework/issues/161 66 import core.sys.posix.signal; 67 sigset_t sigset; 68 sigemptyset(&sigset); 69 sigaction_t siginfo; 70 siginfo.sa_mask = sigset; 71 siginfo.sa_flags = SA_RESTART; 72 siginfo.sa_handler = SIG_IGN; 73 sigaction(SIGPIPE, &siginfo, null); 74 } 75 } 76 77 EventLoopGroup eventLoopGroup() { 78 return _group; 79 } 80 81 NetServerOptions getOptions() { 82 return _options; 83 } 84 85 // NetServer setOptions(NetServerOptions options) { 86 // _options = options; 87 // return this; 88 // } 89 90 NetServer setCodec(Codec codec) { 91 this._codec = codec; 92 return this; 93 } 94 95 Codec getCodec() { 96 return this._codec; 97 } 98 99 NetConnectionHandler getHandler() { 100 return _connectHandler; 101 } 102 103 NetServer setHandler(NetConnectionHandler handler) { 104 _connectHandler = handler; 105 return this; 106 } 107 108 @property Address bindingAddress() { 109 return _address; 110 } 111 112 113 void listen() { 114 listen("0.0.0.0", 0); 115 } 116 117 void listen(int port) { 118 listen("0.0.0.0", port); 119 } 120 121 void listen(string host, int port) { 122 _host = host; 123 _port = port; 124 125 if (_isStarted) 126 return; 127 _address = new InternetAddress(host, cast(ushort)port); 128 129 version(HUNT_DEBUG) infof("Start to listen on %s:%d", host, port); 130 version(HUNT_THREAD_DEBUG) { 131 import core.thread; 132 warningf("Threads: %d", Thread.getAll().length); 133 } 134 _group.start(); 135 136 try { 137 138 static if(threadModel == ThreadMode.Multi) { 139 listeners = new TcpListener[_group.size]; 140 for (size_t i = 0; i < _group.size; ++i) { 141 listeners[i] = createServer(_group[i]); 142 version(HUNT_DEBUG) infof("lister[%d] created", i); 143 } 144 version(HUNT_DEBUG) infof("All the servers are listening on %s.", _address.toString()); 145 } else { 146 tcpListener = new TcpSocket(); 147 148 version (Windows) { 149 import core.sys.windows.winsock2; 150 bool flag = this._options.isReuseAddress() || this._options.isReusePort(); 151 tcpListener.setOption(SocketOptionLevel.SOCKET, cast(SocketOption) SO_EXCLUSIVEADDRUSE, !flag); 152 } else { 153 import core.sys.posix.sys.socket : SO_REUSEPORT; 154 155 tcpListener.setOption(SocketOptionLevel.SOCKET, 156 SocketOption.REUSEADDR, _options.isReuseAddress()); 157 158 tcpListener.setOption(SocketOptionLevel.SOCKET, 159 cast(SocketOption) SO_REUSEPORT, _options.isReusePort()); 160 } 161 162 tcpListener.bind(_address); 163 tcpListener.listen(1000); 164 165 version(HUNT_DEBUG) { 166 infof("Server is listening on %s%s.", _address.toString(), 167 _options.isSsl ? " (with SSL)" : ""); 168 } 169 } 170 171 _isStarted = true; 172 173 } catch (Exception e) { 174 warning(e.message); 175 if (_connectHandler !is null) 176 _connectHandler.failedOpeningConnection(0, e); 177 } 178 179 // if (handler !is null) 180 // handler(result); 181 182 static if(threadModel == ThreadMode.Single) { 183 auto theTask = task(&waitingForAccept); 184 // taskPool.put(theTask); 185 theTask.executeInNewThread(); 186 // waitingForAccept(); 187 } 188 } 189 190 override protected void initialize() { 191 listen(_host, _port); 192 } 193 194 static if(threadModel == ThreadMode.Multi){ 195 private TcpListener[] listeners; 196 197 protected TcpListener createServer(EventLoop loop) { 198 TcpListener listener = new TcpListener(loop, _address.addressFamily); 199 200 listener.reusePort(true); 201 listener.bind(_address).listen(1024); 202 listener.onConnectionAccepted((TcpListener sender, TcpStream stream) { 203 auto currentId = atomicOp!("+=")(_connectionId, 1); 204 version(HUNT_DEBUG) tracef("new tcp connection: id=%d", currentId); 205 TcpConnection connection = new TcpConnection(currentId, _options, _connectHandler, stream); 206 // connection.setState(ConnectionState.Opened); 207 if (_connectHandler !is null) 208 _connectHandler.notifyConnectionOpened(connection); 209 }); 210 listener.start(); 211 212 return listener; 213 } 214 215 override protected void destroy() { 216 if(_isStarted) { 217 foreach(TcpListener ls; listeners) { 218 if (ls !is null) 219 ls.close(); 220 } 221 } 222 223 version(HUNT_DEBUG) warning("stopping the EventLoopGroup..."); 224 _group.stop(); 225 } 226 227 } else { 228 private Socket tcpListener; 229 230 private void waitingForAccept() { 231 while (_isStarted) { 232 try { 233 version(HUNT_THREAD_DEBUG) { 234 import core.thread; 235 tracef("Waiting for accept on %s:%d...(Threads: %d)", _host, _port, Thread.getAll().length); 236 } else version (HUNT_DEBUG) { 237 tracef("Waiting for accept on %s:%d...", _host, _port); 238 } 239 Socket client = tcpListener.accept(); 240 processClient(client); 241 242 // auto processTask = task(&processClient, client); 243 // taskPool.put(processTask); 244 } catch (SocketAcceptException e) { 245 warningf("Failure on accept %s", e.msg); 246 version(HUNT_DEBUG) warning(e); 247 _isStarted = false; 248 } 249 } 250 } 251 252 private void processClient(Socket socket) { 253 version(HUNT_METRIC) { 254 import core.time; 255 import hunt.util.DateTime; 256 debug trace("processing client..."); 257 MonoTime startTime = MonoTime.currTime; 258 } 259 260 version (HUNT_DEBUG) { 261 infof("new connection from %s, fd=%d", socket.remoteAddress.toString(), socket.handle()); 262 } 263 264 TcpStreamOptions streamOptions = _options.toStreamOptions(); 265 266 EventLoop loop = _group.nextLoop(cast(size_t)socket.handle()); 267 TcpStream stream = new TcpStream(loop, socket, streamOptions); 268 269 auto currentId = atomicOp!("+=")(_connectionId, 1); 270 version(HUNT_DEBUG) tracef("New tcp connection: id=%d", currentId); 271 Connection connection = new TcpConnection(currentId, _options, _connectHandler, _codec, stream); 272 // connection.setState(ConnectionState.Opened); 273 if (_connectHandler !is null) { 274 _connectHandler.connectionOpened(connection); 275 } 276 stream.start(); 277 278 version(HUNT_METRIC) { 279 Duration timeElapsed = MonoTime.currTime - startTime; 280 warningf("peer connection processing done in: %d microseconds", 281 timeElapsed.total!(TimeUnit.Microsecond)()); 282 } 283 } 284 285 int actualPort() { 286 return _port; 287 } 288 289 override void close() { 290 this.stop(); 291 } 292 293 override protected void destroy() { 294 if(_isStarted && tcpListener !is null) { 295 tcpListener.close(); 296 } 297 } 298 299 bool isOpen() { 300 return _isStarted; 301 } 302 } 303 }