module hunt.net.NetClientImpl;

import hunt.net.TcpConnection;
import hunt.net.Connection;
import hunt.net.codec.Codec;
import hunt.net.EventLoopPool;
import hunt.net.NetClient;
import hunt.net.NetClientOptions;

import hunt.event.EventLoop;
import hunt.Exceptions;
import hunt.Functions;
import hunt.io.TcpStream;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;
import hunt.logging;
import hunt.util.ByteOrder;
import hunt.util.AbstractLifecycle;
import hunt.util.Lifecycle;
import hunt.util.pool;

import core.atomic;
import core.thread;
import std.format;
import std.parallelism;


/**
 * TCP client that owns a single `TcpConnection` and drives it on an `EventLoop`.
 *
 * The event loop is either supplied directly or borrowed from an
 * `EventLoopPool`; in the latter case it is handed back to the pool in
 * `destroy()`, otherwise the loop itself is stopped there.
 *
 * Connection events (opened, failed, closed, error) are forwarded to the
 * `NetConnectionHandler` installed with `setHandler`, when one is set.
 *
 * The connected flag is a `shared` field and is only touched through
 * `core.atomic`, because it is written from the stream callbacks and read
 * through `isConnected()`.
 */
class NetClientImpl : AbstractLifecycle, NetClient {

    /// Host used by the parameterless `connect()`.
    enum string DefaultLocalHost = "127.0.0.1";
    /// Port used by the parameterless `connect()`.
    enum int DefaultLocalPort = 8080;

    private string _host = DefaultLocalHost;
    private int _port = DefaultLocalPort;
    private string _serverName;
    // Process-wide counter from which each client draws its id.
    private static shared int _connectionId;
    private int _currentId;
    private NetClientOptions _options;
    private Codec _codec;
    private NetConnectionHandler _eventHandler;
    private TcpConnection _tcpConnection;
    // Null when the client was constructed with an explicit event loop.
    private EventLoopPool _pool;
    private EventLoop _loop;
    private int _loopIdleTime = -1;
    private Action _onClosed = null;
    // Accessed only via atomicLoad/atomicStore.
    private shared bool _isConnected = false;

    /**
     * Creates a client bound to an explicitly supplied event loop.
     *
     * Params:
     *   loop    = event loop the TCP stream will be created on
     *   options = client options; later converted with `toStreamOptions()`
     */
    this(EventLoop loop, NetClientOptions options) {
        _loop = loop;

        this._options = options;

        _currentId = atomicOp!("+=")(_connectionId, 1);
        version (HUNT_NET_DEBUG)
            tracef("Client ID: %d", _currentId);
    }

    /**
     * Creates a client with default options whose event loop is borrowed
     * from `pool`.
     */
    this(EventLoopPool pool) {
        this(pool, new NetClientOptions());
    }

    /**
     * Creates a client whose event loop is borrowed from `pool`, using the
     * options' connect timeout as the borrow timeout.
     *
     * Params:
     *   pool    = pool to borrow the event loop from (returned in `destroy()`)
     *   options = client options
     */
    this(EventLoopPool pool, NetClientOptions options) {
        _pool = pool;
        Duration timeout = options.getConnectTimeout;
        version(HUNT_NET_DEBUG) tracef("Try to get a eventloop in %s", timeout);
        _loop = pool.borrow(timeout, false);

        this._options = options;

        _currentId = atomicOp!("+=")(_connectionId, 1);
        version (HUNT_NET_DEBUG)
            tracef("Client ID: %d", _currentId);
    }

    /// Intentionally empty: the client is not stopped from the destructor.
    ~this() @nogc {
    }

    /// Returns: the id assigned to this client at construction.
    int getId() {
        return _currentId;
    }

    /// Returns: the host passed to the most recent `connect` call (or the default).
    string getHost() {
        return _host;
    }

    /// Returns: the port passed to the most recent `connect` call (or the default).
    int getPort() {
        return _port;
    }

    /// Returns: the options currently in effect.
    NetClientOptions getOptions() {
        return _options;
    }

    /**
     * Replaces the client options.
     *
     * Throws: IOException if the client is already connected.
     */
    NetClient setOptions(NetClientOptions options) {
        if(isConnected()) {
            throw new IOException("The options can't be set after the connection created.");
        }
        this._options = options;
        return this;
    }

    /// Sets the codec handed to the `TcpConnection` created on connect.
    NetClientImpl setCodec(Codec codec) {
        this._codec = codec;
        return this;
    }

    /// Returns: the codec set with `setCodec`, or null.
    Codec getCodec() {
        return this._codec;
    }

    /**
     * Registers a callback invoked after the stream's closed event has been
     * handled. Only the first registration takes effect; later calls are
     * ignored while a callback is already set.
     */
    void setOnClosed(Action callback)
    {
        if (_onClosed is null)
        {
            _onClosed = callback;
        }
    }

    /// Returns: the handler set with `setHandler`, or null.
    NetConnectionHandler getHandler() {
        return this._eventHandler;
    }

    /// Sets the handler that receives connection events.
    NetClientImpl setHandler(NetConnectionHandler handler) {
        this._eventHandler = handler;
        return this;
    }

    /// Connects to `DefaultLocalHost`:`DefaultLocalPort`.
    void connect() {
        connect(DefaultLocalHost, DefaultLocalPort, "");
    }

    /// Connects to `host`:`port` with an empty server name.
    void connect(string host, int port) {
        connect(host, port, "");
    }

    /**
     * Records the target address and starts the lifecycle, which performs
     * the actual connect in `initialize()`.
     *
     * If the lifecycle is already running (a connect is in progress) a
     * warning is logged and the call returns without doing anything.
     *
     * Params:
     *   host       = remote host
     *   port       = remote port, must fit in an unsigned 16-bit value
     *   serverName = stored on the client; not otherwise used in this class
     *
     * Throws: IOException if already connected, or if `port` is outside
     *         0..65535 (it would otherwise be silently truncated when
     *         narrowed to `ushort` for the stream).
     */
    void connect(string host, int port, string serverName) {

        if(isConnected()) {
            throw new IOException("The connection has been created.");
        }

        if(isRunning()) {
            warning("Busy with connecting...");
            return;
        }

        // Reject values that cast(ushort) would wrap to a different port.
        if(port < 0 || port > ushort.max) {
            throw new IOException(format("Invalid port: %d (expected 0-%d)", port, ushort.max));
        }

        this._host = host;
        this._port = port;
        this._serverName = serverName;

        super.start();
    }

    /**
     * Lifecycle hook that performs the connect: runs `initializeClient`
     * immediately when the loop reports ready, otherwise hands it to
     * `runAsync` on the loop.
     */
    override protected void initialize() {
        if(_loop.isReady()) {
            initializeClient();
        } else {
            _loop.runAsync(_loopIdleTime, &initializeClient);
        }
    }

    /**
     * Creates the TCP stream and connection, wires the closed / error /
     * connected callbacks, and initiates the connect.
     */
    private void initializeClient() {
        TcpStreamOptions options = _options.toStreamOptions();
        TcpStream tcpStream = new TcpStream(_loop, options);
        _tcpConnection = new TcpConnection(_currentId, _options,
            _eventHandler, _codec, tcpStream);

        tcpStream.closed(() {
            TcpConnection conn = _tcpConnection;
            if(!isConnected() || conn is null) {
                version(HUNT_NET_DEBUG) trace("The connection has already been closed.");
            } else {
                version(HUNT_NET_DEBUG) {
                    infof("Connection %d closed", conn.getId());
                }
                conn.setState(ConnectionState.Closed);
            }

            // Tear the client down first, then notify the registered callback.
            this.close();
            if (_onClosed !is null) {
                _onClosed();
            }
        });

        tcpStream.error((IoError error) {
            if(_tcpConnection !is null)
                _tcpConnection.setState(ConnectionState.Error);

            if (_eventHandler !is null)
                _eventHandler.exceptionCaught(_tcpConnection, new Exception(error.errorMsg()));
        });

        tcpStream.connected((bool suc) {
            if (suc) {
                version (HUNT_DEBUG) trace("Connected with ", tcpStream.remoteAddress.toString());
                atomicStore(_isConnected, true);
                if (_eventHandler !is null) {
                    _eventHandler.connectionOpened(_tcpConnection);
                }
            }
            else {
                string msg = format("Failed to connect to %s:%d", _host, _port);
                version(HUNT_DEBUG) warning(msg);
                atomicStore(_isConnected, false);

                if(_tcpConnection !is null) {
                    _tcpConnection.setState(ConnectionState.Error);
                }

                if(_eventHandler !is null)
                    _eventHandler.failedOpeningConnection(_currentId, new IOException(msg));
            }

        });

        // Safe narrowing: connect() has already range-checked the port.
        tcpStream.connect(_host, cast(ushort)_port);
    }

    /// Stops the lifecycle if it is currently running; otherwise does nothing.
    void close() {
        version(HUNT_NET_DEBUG) tracef("isRunning: %s", this.isRunning());
        if(this.isRunning()) {
            this.stop();
        }
    }

    /**
     * Lifecycle hook that releases the connection and the event loop.
     *
     * Clears the stored connection and the connected flag, closes the
     * connection unless it is already closing, notifies the handler via
     * `connectionClosed`, then either stops the loop (when it was supplied
     * directly) or returns it to the pool it was borrowed from.
     */
    override protected void destroy() {
        TcpConnection conn = _tcpConnection;

        if (conn !is null) {
            version(HUNT_DEBUG) {
                tracef("connection state: %s, isConnected: %s, isClosing: %s",
                    conn.getState(),
                    conn.isConnected(), conn.isClosing());
            }

            // Drop our reference and flag before closing, so the stream's
            // closed callback sees the client as already disconnected.
            _tcpConnection = null;
            atomicStore(_isConnected, false);

            if(!conn.isClosing()) {
                conn.close();
            }

            if (_eventHandler !is null) {
                version(HUNT_NET_DEBUG) {
                    infof("Notifying connection %d with %s closed.",
                        conn.getId(), conn.remoteAddress());
                }
                _eventHandler.connectionClosed(conn);
            }
        }

        assert(_loop !is null);

        if(_pool is null) {
            version(HUNT_NET_DEBUG) {
                tracef("Stopping the event loop %d", _loop.getId());
            }
            _loop.stop();
        } else {
            _pool.returnObject(_loop);
        }
    }

    /// Returns: true once the stream has reported a successful connect and
    /// until the client is destroyed or a connect attempt fails.
    bool isConnected() {
        return atomicLoad(_isConnected);
    }
}