1 module hunt.net.NetClientImpl;
2 
3 import hunt.net.TcpConnection;
4 import hunt.net.Connection;
5 import hunt.net.codec.Codec;
6 import hunt.net.EventLoopPool;
7 import hunt.net.NetClient;
8 import hunt.net.NetClientOptions;
9 
10 import hunt.event.EventLoop;
11 import hunt.Exceptions;
12 import hunt.Functions;
13 import hunt.io.TcpStream;
14 import hunt.io.TcpStreamOptions;
15 import hunt.io.IoError;
16 import hunt.logging;
17 import hunt.util.ByteOrder;
18 import hunt.util.AbstractLifecycle;
19 import hunt.util.Lifecycle;
20 import hunt.util.pool;
21 
22 import core.atomic;
23 import core.thread;
24 import std.format;
25 import std.parallelism;
26 
27 
/**
 * TCP client implementation of `NetClient`.
 *
 * `connect` records the target address and calls `super.start()`; `close` calls
 * `stop()` while the lifecycle is running. The `initialize` and `destroy`
 * overrides contain the actual set-up and tear-down code: `initialize` creates a
 * `TcpStream` on the event loop and wraps it in a `TcpConnection`, `destroy`
 * closes that connection and either stops the loop or returns it to the pool it
 * was borrowed from.
 */
class NetClientImpl : AbstractLifecycle, NetClient {

    /// Host used by the parameterless `connect()`.
    enum string DefaultLocalHost = "127.0.0.1";
    /// Port used by the parameterless `connect()`.
    enum int DefaultLocalPort = 8080;

    private string _host = DefaultLocalHost;
    private int _port = DefaultLocalPort;
    // Stored by connect(); not read anywhere else in this class.
    private string _serverName;
    // Counter shared by all clients; each constructor takes the next value as its id.
    private static shared int _connectionId;
    private int _currentId;
    private NetClientOptions _options;
    private Codec _codec;
    private NetConnectionHandler _eventHandler;
    private TcpConnection _tcpConnection;
    // Non-null only when the loop was borrowed from a pool (see destroy()).
    private EventLoopPool _pool;
    private EventLoop _loop;
    // Passed as the first argument to EventLoop.runAsync when the loop is not ready.
    // NOTE(review): -1 presumably means "no idle timeout" -- confirm against EventLoop.
    private int _loopIdleTime = -1;
    private Action _onClosed = null;
    private shared bool _isConnected = false;

    /**
     * Creates a client that runs on the given event loop.
     *
     * Because no pool is associated, `destroy` stops `loop` when the client is
     * stopped.
     *
     * Params:
     *   loop    = event loop the TCP stream is created on
     *   options = client options (converted to stream options on connect)
     */
    this(EventLoop loop, NetClientOptions options) {
        _loop = loop;
        this._options = options;

        assignClientId();
    }

    /**
     * Creates a client with default `NetClientOptions`, borrowing its event loop
     * from `pool`.
     */
    this(EventLoopPool pool) {
        this(pool, new NetClientOptions());
    }

    /**
     * Creates a client whose event loop is borrowed from `pool`.
     *
     * The borrow uses `options.getConnectTimeout` as its timeout. The loop is
     * handed back to the pool in `destroy`.
     *
     * Params:
     *   pool    = pool to borrow the event loop from
     *   options = client options
     */
    this(EventLoopPool pool, NetClientOptions options) {
        _pool = pool;
        Duration timeout = options.getConnectTimeout;
        version(HUNT_NET_DEBUG) tracef("Try to get a eventloop in %s", timeout);
        _loop = pool.borrow(timeout, false);

        this._options = options;

        assignClientId();
    }

    /// Takes the next value of the shared counter as this client's id.
    private void assignClientId() {
        _currentId = atomicOp!("+=")(_connectionId, 1);
        version (HUNT_NET_DEBUG)
            tracef("Client ID: %d", _currentId);
    }

    // Intentionally empty: the client is not stopped from the destructor.
    ~this() @nogc {
    }

    /// Returns: the id assigned to this client at construction.
    int getId() {
        return _currentId;
    }

    /// Returns: the host passed to the last accepted `connect` call, or the default.
    string getHost() {
        return _host;
    }

    /// Returns: the port passed to the last accepted `connect` call, or the default.
    int getPort() {
        return _port;
    }

    /// Returns: the options currently held by this client.
    NetClientOptions getOptions() {
        return _options;
    }

    /**
     * Replaces the client options.
     *
     * Returns: this client, for chaining.
     * Throws: IOException if the client is currently connected.
     */
    NetClient setOptions(NetClientOptions options) {
        if (isConnected()) {
            throw new IOException("The options can't be set after the connection created.");
        }
        this._options = options;
        return this;
    }

    /**
     * Sets the codec handed to the `TcpConnection` created on connect.
     *
     * Returns: this client, for chaining.
     */
    NetClientImpl setCodec(Codec codec) {
        this._codec = codec;
        return this;
    }

    /// Returns: the codec set through `setCodec`, or null.
    Codec getCodec() {
        return this._codec;
    }

    /**
     * Registers a callback invoked from the stream's closed handler.
     *
     * Only the first registration takes effect; once a callback is stored,
     * later calls are ignored.
     */
    void setOnClosed(Action callback) {
        if (_onClosed is null) {
            _onClosed = callback;
        }
    }

    /// Returns: the handler set through `setHandler`, or null.
    NetConnectionHandler getHandler() {
        return this._eventHandler;
    }

    /**
     * Sets the handler notified about connection events.
     *
     * Returns: this client, for chaining.
     */
    NetClientImpl setHandler(NetConnectionHandler handler) {
        this._eventHandler = handler;
        return this;
    }

    /// Connects to `DefaultLocalHost`:`DefaultLocalPort`.
    void connect() {
        connect(DefaultLocalHost, DefaultLocalPort, "");
    }

    /// Connects to `host`:`port` with an empty server name.
    void connect(string host, int port) {
        connect(host, port, "");
    }

    /**
     * Records the target address and starts the lifecycle.
     *
     * If the lifecycle is already running, a warning is logged and the call
     * returns without changing anything.
     *
     * Params:
     *   host       = remote host
     *   port       = remote port, must be within 0..65535
     *   serverName = stored in `_serverName`
     *
     * Throws:
     *   IOException if the client is already connected.
     *   IllegalArgumentException if `port` does not fit in an unsigned 16-bit value.
     */
    void connect(string host, int port, string serverName) {
        if (isConnected()) {
            throw new IOException("The connection has been created.");
        }

        if (isRunning()) {
            warning("Busy with connecting...");
            return;
        }

        // initializeClient() narrows the port with cast(ushort); without this check
        // an out-of-range value would be truncated and a different port dialled.
        if (port < 0 || port > ushort.max) {
            throw new IllegalArgumentException(
                    format("Invalid port: %d (expected 0-65535)", port));
        }

        this._host = host;
        this._port = port;
        this._serverName = serverName;

        super.start();
    }

    /**
     * Lifecycle start hook: runs `initializeClient` directly when the loop reports
     * ready, otherwise passes it to `EventLoop.runAsync` together with
     * `_loopIdleTime`.
     */
    override protected void initialize() {
        if (_loop.isReady()) {
            initializeClient();
        } else {
            _loop.runAsync(_loopIdleTime, &initializeClient);
        }
    }

    /**
     * Creates the stream and connection, installs the closed/error/connected
     * handlers and issues the connect call.
     */
    private void initializeClient() {
        TcpStream stream = new TcpStream(_loop, _options.toStreamOptions());
        _tcpConnection = new TcpConnection(_currentId, _options,
                _eventHandler, _codec, stream);

        stream.closed(() {
            TcpConnection conn = _tcpConnection;
            if (_isConnected && conn !is null) {
                version(HUNT_NET_DEBUG) {
                    infof("Connection %d closed", conn.getId());
                }
                conn.setState(ConnectionState.Closed);
            } else {
                version(HUNT_NET_DEBUG) trace("The connection has already been closed.");
            }

            // Stop the lifecycle first, then notify the registered callback.
            this.close();
            if (_onClosed !is null) {
                _onClosed();
            }
        });

        stream.error((IoError error) {
            if (_tcpConnection !is null) {
                _tcpConnection.setState(ConnectionState.Error);
            }

            // The handler receives _tcpConnection as-is; it is not null-checked here.
            if (_eventHandler !is null) {
                _eventHandler.exceptionCaught(_tcpConnection, new Exception(error.errorMsg()));
            }
        });

        stream.connected((bool suc) {
            if (suc) {
                version (HUNT_DEBUG) trace("Connected with ", stream.remoteAddress.toString());
                _isConnected = true;
                if (_eventHandler !is null) {
                    _eventHandler.connectionOpened(_tcpConnection);
                }
            } else {
                string msg = format("Failed to connect to %s:%d", _host, _port);
                version(HUNT_DEBUG) warning(msg);
                _isConnected = false;

                if (_tcpConnection !is null) {
                    _tcpConnection.setState(ConnectionState.Error);
                }

                if (_eventHandler !is null) {
                    _eventHandler.failedOpeningConnection(_currentId, new IOException(msg));
                }
            }
        });

        // connect() validated the range, so this narrowing cast cannot truncate.
        stream.connect(_host, cast(ushort) _port);
    }

    /// Stops the lifecycle if it is running; otherwise does nothing.
    void close() {
        version(HUNT_NET_DEBUG) tracef("isRunning: %s", this.isRunning());
        if (this.isRunning()) {
            this.stop();
        }
    }

    /**
     * Lifecycle stop hook.
     *
     * If a connection exists, it is detached from this client, the connected flag
     * is cleared, the connection is closed unless it is already closing, and the
     * handler (if any) receives `connectionClosed`. Afterwards the event loop is
     * stopped when it was supplied directly, or returned to the pool when it was
     * borrowed.
     */
    override protected void destroy() {
        TcpConnection conn = _tcpConnection;

        if (conn !is null) {
            version(HUNT_DEBUG) {
                tracef("connection state: %s, isConnected: %s, isClosing: %s",
                    conn.getState(),
                    conn.isConnected(), conn.isClosing());
            }

            // Clear the field and the flag before closing.
            _tcpConnection = null;
            _isConnected = false;

            if (!conn.isClosing()) {
                conn.close();
            }

            if (_eventHandler !is null) {
                version(HUNT_NET_DEBUG) {
                    infof("Notifying connection %d with %s closed.",
                    conn.getId(), conn.remoteAddress());
                }
                _eventHandler.connectionClosed(conn);
            }
        }

        assert(_loop !is null);

        if (_pool is null) {
            version(HUNT_NET_DEBUG) {
                tracef("Stopping the event loop %d", _loop.getId());
            }
            _loop.stop();
        } else {
            _pool.returnObject(_loop);
        }
    }

    /**
     * Returns: the connected flag, which is set to true by a successful connect
     * callback and cleared on connect failure and in `destroy`.
     */
    bool isConnected() {
        return _isConnected;
    }
}