module hunt.net.NetClientImpl;

import hunt.net.TcpConnection;
import hunt.net.Connection;
import hunt.net.codec.Codec;
import hunt.net.EventLoopPool;
import hunt.net.NetClient;
import hunt.net.NetClientOptions;

import hunt.event.EventLoop;
import hunt.Exceptions;
import hunt.Functions;
import hunt.io.TcpStream;
import hunt.io.TcpStreamOptions;
import hunt.io.IoError;
import hunt.logging;
import hunt.util.ByteOrder;
import hunt.util.AbstractLifecycle;
import hunt.util.Lifecycle;
import hunt.util.pool;

import core.atomic;
import core.thread;
import std.format;
import std.parallelism;


/**
 * TCP client that opens a single outgoing connection on an event loop and
 * reports connection events to a NetConnectionHandler.
 *
 * The event loop is either supplied directly or borrowed from an
 * EventLoopPool; in the latter case it is handed back to the pool when the
 * client is destroyed.
 */
class NetClientImpl : AbstractLifecycle, NetClient {

    /// Host used by the parameterless connect().
    enum string DefaultLocalHost = "127.0.0.1";
    /// Port used by the parameterless connect().
    enum int DefaultLocalPort = 8080;

    private string _host = DefaultLocalHost;
    private int _port = DefaultLocalPort;
    private string _serverName;
    // Process-wide counter from which every client draws its ID.
    private static shared int _connectionId;
    private int _currentId;
    private NetClientOptions _options;
    private Codec _codec;
    private NetConnectionHandler _eventHandler;
    private TcpConnection _tcpConnection;
    // Null when the loop was supplied directly instead of borrowed.
    private EventLoopPool _pool;
    private EventLoop _loop;
    private int _loopIdleTime = -1;
    private Action _onClosed = null;
    // Declared shared, so it is only touched through core.atomic.
    private shared bool _isConnected = false;

    /**
     * Creates a client running on the given event loop.
     *
     * Params:
     *   loop    = the event loop driving the connection; it is stopped when
     *             the client is destroyed
     *   options = the client options
     */
    this(EventLoop loop, NetClientOptions options) {
        _loop = loop;

        this._options = options;

        _currentId = atomicOp!("+=")(_connectionId, 1);
        version (HUNT_NET_DEBUG)
            tracef("Client ID: %d", _currentId);
    }

    /**
     * Creates a client with default options on a loop borrowed from `pool`.
     */
    this(EventLoopPool pool) {
        this(pool, new NetClientOptions());
    }

    /**
     * Creates a client on a loop borrowed from `pool`.
     *
     * Params:
     *   pool    = the pool the event loop is borrowed from and returned to
     *   options = the client options; its connect timeout is passed to
     *             pool.borrow
     */
    this(EventLoopPool pool, NetClientOptions options) {
        _pool = pool;
        _loop = pool.borrow(options.getConnectTimeout, false);

        this._options = options;

        _currentId = atomicOp!("+=")(_connectionId, 1);
        version (HUNT_NET_DEBUG)
            tracef("Client ID: %d", _currentId);
    }

    ~this() @nogc {
        // Intentionally empty: the client is not stopped from the destructor.
    }

    /// Returns the ID assigned to this client at construction.
    int getId() {
        return _currentId;
    }

    /// Returns the host given to the most recent connect() call.
    string getHost() {
        return _host;
    }

    /// Returns the port given to the most recent connect() call.
    int getPort() {
        return _port;
    }

    /// Returns the current client options.
    NetClientOptions getOptions() {
        return _options;
    }

    /**
     * Replaces the client options.
     *
     * Throws: IOException if the client is already connected.
     */
    NetClient setOptions(NetClientOptions options) {
        if (isConnected()) {
            throw new IOException("The options can't be set after the connection created.");
        }
        this._options = options;
        return this;
    }

    /// Sets the codec handed to the TcpConnection created on connect.
    NetClientImpl setCodec(Codec codec) {
        this._codec = codec;
        return this;
    }

    /// Returns the configured codec.
    Codec getCodec() {
        return this._codec;
    }

    /**
     * Registers a callback invoked after the underlying stream has closed.
     *
     * Only the first registration takes effect; later calls are ignored
     * while a callback is already set.
     */
    void setOnClosed(Action callback) {
        if (_onClosed is null) {
            _onClosed = callback;
        }
    }

    /// Returns the handler that receives connection events.
    NetConnectionHandler getHandler() {
        return this._eventHandler;
    }

    /// Sets the handler that receives connection events.
    NetClientImpl setHandler(NetConnectionHandler handler) {
        this._eventHandler = handler;
        return this;
    }

    /// Connects to DefaultLocalHost:DefaultLocalPort.
    void connect() {
        connect(DefaultLocalHost, DefaultLocalPort, "");
    }

    /// Connects to `host`:`port` with an empty server name.
    void connect(string host, int port) {
        connect(host, port, "");
    }

    /**
     * Starts connecting to `host`:`port`.
     *
     * If the client is already running but not yet connected, a warning is
     * logged and the call returns without doing anything.
     *
     * Params:
     *   host       = the remote host
     *   port       = the remote port, 0..65535
     *   serverName = stored on the client; not used elsewhere in this class
     *
     * Throws: IOException if the client is already connected, or if `port`
     *         does not fit in an unsigned 16-bit value.
     */
    void connect(string host, int port, string serverName) {
        if (isConnected()) {
            throw new IOException("The connection has been created.");
        }

        if (isRunning()) {
            warning("Busy with connecting...");
            return;
        }

        // The port is later narrowed with cast(ushort); reject values that
        // would be silently truncated to a different port.
        if (port < 0 || port > ushort.max) {
            throw new IOException(format("Invalid port: %d", port));
        }

        this._host = host;
        this._port = port;
        this._serverName = serverName;

        super.start();
    }

    /**
     * Lifecycle hook that performs the actual connect.
     *
     * NOTE(review): presumably invoked by AbstractLifecycle.start() - confirm.
     * When the loop is not ready yet, the work is handed to the loop through
     * runAsync instead of being run inline.
     */
    override protected void initialize() {
        if (_loop.isReady()) {
            initializeClient();
        } else {
            _loop.runAsync(_loopIdleTime, &initializeClient);
        }
    }

    /**
     * Creates the stream and connection, wires the closed / error /
     * connected callbacks, and issues the connect request.
     */
    private void initializeClient() {
        TcpStreamOptions options = _options.toStreamOptions();
        TcpStream stream = new TcpStream(_loop, options);
        _tcpConnection = new TcpConnection(_currentId, _options,
                _eventHandler, _codec, stream);

        stream.closed(() {
            // Snapshot the field: destroy() resets it to null.
            TcpConnection conn = _tcpConnection;
            if (conn is null) {
                version (HUNT_DEBUG) trace("The connection has already been closed.");
            } else {
                version (HUNT_NET_DEBUG) {
                    infof("Connection %d closed", conn.getId());
                }
                conn.setState(ConnectionState.Closed);
            }

            this.close();
            if (_onClosed !is null) {
                _onClosed();
            }

            // Reconnecting is not implemented. Earlier sketch:
            //auto runTask = task((){
            //    Thread.sleep(options.retryInterval);
            //    _tcpStream.reconnect();
            //});
            //taskPool.put(runTask);
        });

        stream.error((IoError error) {
            // Snapshot so the null check and the uses see the same object.
            TcpConnection conn = _tcpConnection;
            if (conn !is null)
                conn.setState(ConnectionState.Error);

            if (_eventHandler !is null)
                _eventHandler.exceptionCaught(conn, new Exception(error.errorMsg()));
        });

        stream.connected((bool suc) {
            if (suc) {
                version (HUNT_DEBUG) trace("Connected with ", stream.remoteAddress.toString());
                atomicStore(_isConnected, true);
                if (_eventHandler !is null) {
                    _eventHandler.connectionOpened(_tcpConnection);
                }
            } else {
                string msg = format("Failed to connect to %s:%d", _host, _port);
                version (HUNT_DEBUG) warning(msg);
                atomicStore(_isConnected, false);

                if (_tcpConnection !is null) {
                    _tcpConnection.setState(ConnectionState.Error);
                }

                if (_eventHandler !is null)
                    _eventHandler.failedOpeningConnection(_currentId, new IOException(msg));
            }
        });

        stream.connect(_host, cast(ushort) _port);
    }

    /**
     * Stops the client if it is running; otherwise does nothing.
     */
    void close() {
        version (HUNT_NET_DEBUG) tracef("isRunning: %s", this.isRunning());
        if (this.isRunning()) {
            this.stop();
        }
    }

    /**
     * Lifecycle hook that tears the client down: closes the connection if it
     * is not already closing, notifies the handler, then stops the loop or
     * returns it to the pool it was borrowed from.
     *
     * NOTE(review): presumably invoked by AbstractLifecycle.stop() - confirm.
     */
    override protected void destroy() {
        // Detach the connection first so callbacks see it as gone.
        TcpConnection conn = _tcpConnection;
        _tcpConnection = null;

        if (conn !is null) {
            version (HUNT_NET_DEBUG) {
                tracef("connection state: %s, isConnected: %s, isClosing: %s",
                        conn.getState(),
                        conn.isConnected(), conn.isClosing());
            }

            if (!conn.isClosing()) {
                conn.close();
            }

            if (_eventHandler !is null) {
                version (HUNT_NET_DEBUG) {
                    infof("Notifying connection %d with %s closed.",
                            conn.getId(), conn.remoteAddress());
                }
                _eventHandler.connectionClosed(conn);
            }
        }

        assert(_loop !is null);

        if (_pool is null) {
            _loop.stop();
        } else {
            _pool.returnObject(_loop);
        }

        atomicStore(_isConnected, false);
    }

    /**
     * Returns true once the stream has reported a successful connect and the
     * client has not been destroyed since.
     */
    bool isConnected() {
        return atomicLoad(_isConnected);
    }
}