1 module hunt.net.NetClientImpl;
2 
3 import hunt.net.TcpConnection;
4 import hunt.net.Connection;
5 import hunt.net.codec.Codec;
6 import hunt.net.EventLoopPool;
7 import hunt.net.NetClient;
8 import hunt.net.NetClientOptions;
9 
10 import hunt.event.EventLoop;
11 import hunt.Exceptions;
12 import hunt.Functions;
13 import hunt.io.TcpStream;
14 import hunt.io.TcpStreamOptions;
15 import hunt.io.IoError;
16 import hunt.logging;
17 import hunt.util.ByteOrder;
18 import hunt.util.AbstractLifecycle;
19 import hunt.util.Lifecycle;
20 import hunt.util.pool;
21 
22 import core.atomic;
23 import core.thread;
24 import std.format;
25 import std.parallelism;
26 
27 
28 /**
29  *
30  */
31 class NetClientImpl : AbstractLifecycle, NetClient {
32 
33     enum string DefaultLocalHost = "127.0.0.1";
34     enum int DefaultLocalPort = 8080;
35 
36     private string _host = DefaultLocalHost;
37     private int _port = DefaultLocalPort;
38     private string _serverName;
39     private static shared int _connectionId;
40     private int _currentId;
41     private NetClientOptions _options;
42     private Codec _codec;
43     private NetConnectionHandler _eventHandler;
44     private TcpConnection _tcpConnection;
45     // private TcpStream _tcpStream;
46     private EventLoopPool _pool;
47     private EventLoop _loop;
48     private int _loopIdleTime = -1;
49     private Action _onClosed = null;
50     private shared bool _isConnected = false;
51 
52     this(EventLoop loop, NetClientOptions options) {
53         _loop = loop;
54         
55         this._options = options;
56 
57         _currentId = atomicOp!("+=")(_connectionId, 1);
58         version (HUNT_NET_DEBUG)
59             tracef("Client ID: %d", _currentId);
60     }
61 
62     this(EventLoopPool pool) {
63         this(pool, new NetClientOptions());
64     }
65 
66     this(EventLoopPool pool, NetClientOptions options) {
67         _pool = pool;
68         _loop = pool.borrow(options.getConnectTimeout, false);
69         
70         this._options = options;
71 
72         _currentId = atomicOp!("+=")(_connectionId, 1);
73         version (HUNT_NET_DEBUG)
74             tracef("Client ID: %d", _currentId);
75     }
76 
77     ~this() @nogc {
78         // this.stop();
79     }
80 
81     int getId() {
82         return _currentId;
83     }
84 
85     string getHost() {
86         return _host;
87     }
88 
89     int getPort() {
90         return _port;
91     }
92 
93     NetClientOptions getOptions() {
94         return _options;
95     }
96 
97     NetClient setOptions(NetClientOptions options) {
98         if(isConnected()) {
99             throw new IOException("The options can't be set after the connection created.");
100         }
101         this._options = options;
102         return this;
103     }
104 
105     NetClientImpl setCodec(Codec codec) {
106         this._codec = codec;
107         return this;
108     }
109 
110     Codec getCodec() {
111         return this._codec;
112     }
113 
114     void setOnClosed(Action callback)
115     {
116         if (_onClosed is null)
117         {
118             _onClosed = callback;
119         }
120     }
121 
122     NetConnectionHandler getHandler() {
123         return this._eventHandler;
124     }
125 
126     NetClientImpl setHandler(NetConnectionHandler handler) {
127         this._eventHandler = handler;
128         return this;
129     }
130 
131     void connect() {
132         connect(DefaultLocalHost, DefaultLocalPort, "");
133     }
134 
135     void connect(string host, int port) {
136         connect(host, port, "");
137     }
138 
139     void connect(string host, int port, string serverName) {
140         
141         if(isConnected()) {
142             throw new IOException("The connection has been created.");
143         }
144 
145         if(isRunning()) {
146             warning("Busy with connecting...");
147             return;
148         }
149 
150         this._host = host;
151         this._port = port;
152         this._serverName = serverName;
153 
154         super.start();
155     }
156 
157     override protected void initialize() { // doConnect
158         if(_loop.isReady()) {
159             initializeClient();
160         } else {
161             _loop.runAsync(_loopIdleTime, &initializeClient);
162         }
163         // initializeClient();
164     }
165 
166     private void initializeClient() {
167         TcpStreamOptions options = _options.toStreamOptions();
168         TcpStream _tcpStream = new TcpStream(_loop, options);
169         _tcpConnection = new TcpConnection(_currentId, _options,
170                 _eventHandler, _codec, _tcpStream);
171 
172         _tcpStream.closed(() {
173             TcpConnection conn = _tcpConnection;
174             if(conn is null) {
175                 version(HUNT_DEBUG) trace("The connection has already been closed.");
176             } else {
177                 version(HUNT_NET_DEBUG) {
178                     infof("Connection %d closed", _tcpConnection.getId());
179                 }
180                 conn.setState(ConnectionState.Closed);
181             }
182 
183             this.close();
184             if (_onClosed !is null) {
185                 _onClosed();
186             }
187 
188             // _isConnected = false;
189 
190             //auto runTask = task((){
191             //    Thread.sleep(options.retryInterval);
192             //    _tcpStream.reconnect();
193             //});
194             //taskPool.put(runTask);
195 
196         });
197 
198         _tcpStream.error((IoError error) {
199             if(_tcpConnection !is null)
200                 _tcpConnection.setState(ConnectionState.Error);
201                 
202             if (_eventHandler !is null)
203                 _eventHandler.exceptionCaught(_tcpConnection, new Exception(error.errorMsg()));
204         });
205 
206         _tcpStream.connected((bool suc) {
207             if (suc) {
208 			    version (HUNT_DEBUG) trace("Connected with ", _tcpStream.remoteAddress.toString());
209                 // _tcpConnection.setState(ConnectionState.Opened);
210                 _isConnected = true;
211                 if (_eventHandler !is null) {
212                     _eventHandler.connectionOpened(_tcpConnection);
213                 }
214             }
215             else {
216                 string msg = format("Failed to connect to %s:%d", _host, _port);
217                 version(HUNT_DEBUG) warning(msg);
218                 _isConnected = false;
219 
220                 if(_tcpConnection !is null) {
221                     _tcpConnection.setState(ConnectionState.Error);
222                 }
223 
224                 if(_eventHandler !is null)
225                     _eventHandler.failedOpeningConnection(_currentId, new IOException(msg));
226             }
227 
228         });
229 
230         // _tcpConnection.setState(ConnectionState.Opening);
231         _tcpStream.connect(_host, cast(ushort)_port);
232     }
233 
234     void close() {
235         // if(cas(&_isConnected, true, false) ) {
236         //     this.stop();
237         // } else {
238         //     version(HUNT_NET_DEBUG) trace("Closed already.");
239         // }
240         version(HUNT_NET_DEBUG) tracef("isRunning: %s", this.isRunning());
241         if(this.isRunning()) {
242             this.stop();
243         }
244     }
245 
246     override protected void destroy() {
247         TcpConnection conn = _tcpConnection;
248         _tcpConnection = null;
249 
250         if (conn !is null) {
251             version(HUNT_NET_DEBUG) {
252                 tracef("connection state: %s, isConnected: %s, isClosing: %s",
253                     conn.getState(),
254                     conn.isConnected(), conn.isClosing());
255             }
256 
257             if(!conn.isClosing()) {
258                 conn.close();
259             }
260 
261             if (_eventHandler !is null) {
262                 version(HUNT_NET_DEBUG) {
263                     infof("Notifying connection %d with %s closed.",
264                     conn.getId(), conn.remoteAddress());
265                 }
266                 _eventHandler.connectionClosed(conn);
267             }
268         }
269 
270         assert(_loop !is null);
271 
272         if(_pool is null) {
273             _loop.stop();
274         } else {
275             _pool.returnObject(_loop);
276         }
277         
278         _isConnected = false;
279     }
280 
281     bool isConnected() {
282         return _isConnected;
283         //if(_tcpConnection is null)
284         //    return false;
285         //else
286         //    return _tcpConnection.isConnected();
287     }
288 }