1 module hunt.net.AbstractConnection;
2 
3 import hunt.net.Connection;
4 import hunt.net.TcpSslOptions;
5 import hunt.net.codec;
6 
7 import hunt.Boolean;
8 import hunt.io.ByteBuffer;
9 import hunt.Exceptions;
10 import hunt.Functions;
11 import hunt.io.channel;
12 import hunt.io.TcpStream;
13 import hunt.logging.ConsoleLogger;
14 import hunt.util.Common;
15 
16 import std.format;
17 import std.socket;
18 
19 
/**
 * Abstract base class for TCP connections.
 *
 * Wraps a TcpStream and adds a connection id, a ConnectionState, an optional
 * Codec (encoder/decoder pair), a NetConnectionHandler that receives
 * connection events, and a string-keyed attribute map.
 *
 * The constructor registers notifyClose and onDataReceived as the stream's
 * closed/received callbacks. Incoming data is processed inside
 * synchronized(this); the attribute map and the other fields are not guarded
 * by any lock in this class.
 */
abstract class AbstractConnection : Connection {
    /// Identifier supplied by the creator of the connection.
    protected int _connectionId;
    /// Underlying stream; asserted non-null in the constructor.
    protected TcpStream _tcp;
    /// Options supplied at construction; read by setCodec for the encoder buffer size.
    protected TcpSslOptions _options;
    protected DataReceivedHandler _dataReceivedHandler;
    /// Per-connection attributes, see getAttribute/setAttribute and friends.
    protected Object[string] attributes;
    private Codec _codec;
    /// Encoder taken from the current codec (null when no codec is set).
    protected Encoder _encoder;
    /// Decoder taken from the current codec (null when no codec is set).
    protected Decoder _decoder;
    /// Receiver of message/close/exception events; may be null.
    protected NetConnectionHandler _netHandler;
    protected shared ConnectionState _connectionState;
    /// Latched to true the first time setState(ConnectionState.Secured) is called.
    private bool _isSecured = false;


    /**
     * Creates a connection in the Ready state and registers this object's
     * close and data callbacks on the stream.
     *
     * Params:
     *   connectionId = identifier reported by getId()
     *   options      = options reported by getOptions()
     *   tcp          = underlying stream, must not be null
     */
    this(int connectionId, TcpSslOptions options, TcpStream tcp) {
        assert(tcp !is null);
        _tcp = tcp;
        this._options = options;
        this._connectionId = connectionId;
        this._connectionState = ConnectionState.Ready;

        _tcp.closed(&notifyClose);
        _tcp.received(&onDataReceived);
    }

    /**
     * Creates a connection with an event handler and, optionally, a codec.
     *
     * Params:
     *   connectionId = identifier reported by getId()
     *   options      = options reported by getOptions()
     *   tcp          = underlying stream, must not be null
     *   codec        = codec to install, or null for none
     *   eventHandler = handler for connection events, must not be null
     */
    this(int connectionId, TcpSslOptions options, TcpStream tcp,
            Codec codec, NetConnectionHandler eventHandler) {
        assert(eventHandler !is null);

        // The handler is assigned before the delegated constructor registers
        // the stream callbacks, so it is already in place when they are hooked up.
        this._netHandler = eventHandler;
        this(connectionId, options, tcp);
        if(codec !is null) {
            this.setCodec(codec);
        }
    }

    /// Returns the identifier given at construction.
    int getId() {
        return _connectionId;
    }

    /// Returns the options given at construction.
    TcpSslOptions getOptions() {
        return _options;
    }

    /// Returns the underlying stream.
    TcpStream getStream() {
        return _tcp;
    }

    /// Returns the current connection state.
    ConnectionState getState() {
        return this._connectionState;
    }

    /**
     * Sets the connection state.
     *
     * Passing ConnectionState.Secured additionally latches the flag reported
     * by isSecured(); later state changes do not reset that flag.
     */
    void setState(ConnectionState state) {
        if(state == ConnectionState.Secured) {
            _isSecured = true;
        }
        this._connectionState = state;
    }

    /**
     * Installs a codec and caches its encoder and decoder.
     *
     * The encoder's buffer size is taken from the connection options when
     * both are available. Passing null removes the codec together with the
     * cached encoder and decoder, so a previously installed decoder no
     * longer handles incoming data.
     *
     * Returns: this connection, for chaining.
     */
    AbstractConnection setCodec(Codec codec) {
        this._codec = codec;
        if(codec is null) {
            // Keep _encoder/_decoder consistent with getCodec() returning null.
            this._encoder = null;
            this._decoder = null;
            return this;
        }

        this._encoder = codec.getEncoder();
        if(this._encoder !is null && _options !is null) {
            this._encoder.setBufferSize(_options.getEncoderBufferSize());
        }
        this._decoder = codec.getDecoder();
        return this;
    }

    /// Returns the codec set through setCodec (or the constructor), or null.
    Codec getCodec() {
        return this._codec;
    }

    /**
     * Replaces the event handler.
     *
     * Returns: this connection, for chaining.
     */
    AbstractConnection setHandler(NetConnectionHandler handler) {
        this._netHandler = handler;
        return this;
    }

    /// Returns the current event handler, which may be null.
    NetConnectionHandler getHandler() {
        return _netHandler;
    }

    /// Reports whether the underlying stream is connected.
    bool isConnected() {
        return _tcp.isConnected();
    }

    /// Currently reports the same value as the stream's isConnected().
    bool isActive() {
        // FIXME: Needing refactor or cleanup -@zxp at 8/1/2019, 6:04:44 PM
        //
        return _tcp.isConnected();
    }

    /// Reports whether the underlying stream is closing.
    bool isClosing() {
        return _tcp.isClosing();
    }

    /// Reports whether the state has ever been set to ConnectionState.Secured.
    bool isSecured() {
        return _isSecured;
    }

    /**
     * Stream callback for incoming data (registered in the constructor).
     *
     * Processes a clone of the buffer while holding this object's monitor.
     */
    protected void onDataReceived(ByteBuffer buffer) {
        synchronized (this) {
            import hunt.io.BufferUtils;
            // Work on a copy so the data stays stable and thread safe;
            // presumably the stream may reuse the original buffer — TODO confirm.
            handleReceivedData(BufferUtils.clone(buffer));
        }
    }

    /**
     * Dispatches received data.
     *
     * With a decoder installed the buffer is passed to it; anything the
     * decoder throws is logged and not rethrown. Without a decoder the raw
     * buffer is delivered to the handler's messageReceived, if a handler is set.
     */
    private void handleReceivedData(ByteBuffer buffer) {
        version(HUNT_NET_DEBUG) {
            auto data = cast(ubyte[]) buffer.getRemaining();
            tracef("data received (%d bytes): ", data.length);
            version(HUNT_NET_DEBUG_MORE) {
                infof("%(%02X %)", data[0 .. $]);
            } else {
                // Dump at most the first 64 bytes.
                if(data.length<=64)
                    infof("%(%02X %)", data[0 .. $]);
                else
                    infof("%(%02X %) ...", data[0 .. 64]);
            }
        }

        if(_decoder is null) {
            if(_netHandler !is null) {
                _netHandler.messageReceived(this, cast(Object)buffer);
            }
            return;
        }

        version(HUNT_NET_DEBUG_MORE) {
            trace("Running decoder...");
        }

        try {
            _decoder.decode(buffer, this);
            version(HUNT_NET_DEBUG_MORE) info("Decoding done.");
        } catch(Throwable ex) {
            // Best effort: a decoding failure is only logged.
            warning(ex.msg);
            warning(ex);
        }
    }

    /**
     * Starts closing the connection.
     *
     * Does nothing when the state is already Closing or Closed; otherwise
     * sets the state to Closing and closes the underlying stream.
     */
    void close() {
        version(HUNT_NET_DEBUG) infof("Closing connection %d...The state: %s", this.getId(), _connectionState);
        if(_connectionState == ConnectionState.Closing || _connectionState == ConnectionState.Closed)
            return;
        setState(ConnectionState.Closing);
        _tcp.close();
    }

    /// Local address of the underlying stream.
    @property Address localAddress() {
        return _tcp.localAddress;
    }

    /// Remote address of the underlying stream.
    @property Address remoteAddress() {
        return _tcp.remoteAddress;
    }

    /// Writes raw bytes to the underlying stream.
    void write(const(ubyte)[] data) {
        version(HUNT_NET_DEBUG) {
            tracef("writting data (%d bytes)...", data.length);
            if(data.length<=64)
                infof("%(%02X %)", data[0 .. $]);
            else
                infof("%(%02X %) ...", data[0 .. 64]);
        } else version(HUNT_NET_DEBUG_MORE) {
            tracef("writting data (%d bytes)...", data.length);
            infof("%(%02X %)", data[0 .. $]);
        }
        _tcp.write(data);
    }

    /// Writes the bytes of a string to the underlying stream.
    void write(string str) {
        // Reinterpret as const bytes; the data is only read, never modified.
        write(cast(const(ubyte)[]) str);
    }

    /// Hands a buffer to the underlying stream for writing.
    void write(ByteBuffer buffer) {
        version(HUNT_NET_DEBUG) {
            tracef("writting buffer (%s bytes)...", buffer.toString());
            auto data = buffer.getRemaining();
            if(data.length<=64)
                infof("%(%02X %)", data[0 .. $]);
            else
                infof("%(%02X %) ...", data[0 .. 64]);
        } else version(HUNT_NET_DEBUG_MORE) {
            tracef("writting buffer (%s bytes)...", buffer.toString());
            auto data = buffer.getRemaining();
            infof("%(%02X %)", data[0 .. $]);
        }
        _tcp.write(buffer);
    }

    /**
     * Writes the remaining bytes of a buffer, then reports success.
     *
     * Params:
     *   buffer   = buffer whose remaining bytes are written
     *   callback = notified through succeeded() once the write call has
     *              returned; may be null, in which case nothing is notified
     */
    void write(ByteBuffer buffer, Callback callback) {
        write(cast(ubyte[]) buffer.getRemaining());
        if(callback !is null)
            callback.succeeded();
    }

    /**
     * Writes the remaining bytes of each buffer in order, then reports success.
     *
     * Params:
     *   buffers  = buffers to write, in order
     *   callback = notified through succeeded() after the last write call has
     *              returned; may be null, in which case nothing is notified
     */
    void write(ByteBuffer[] buffers, Callback callback) {
        foreach (ByteBuffer buffer; buffers) {
            version (HUNT_DEBUG)
                tracef("writting buffer: %s", buffer.toString());

            write(cast(ubyte[]) buffer.getRemaining());
        }
        if(callback !is null)
            callback.succeeded();
    }

    /**
     * Returns the attribute stored under key, or null when there is none.
     */
    Object getAttribute(string key) {
        return getAttribute(key, null);
    }

    /**
     * Returns the attribute stored under key, or defaultValue when there is none.
     */
    Object getAttribute(string key, Object defaultValue) {
        return attributes.get(key, defaultValue);
    }

    /**
     * Stores value under key.
     *
     * Returns: the value previously stored under key, or null if there was none.
     */
    Object setAttribute(string key, Object value) {
        Object oldValue = null;
        auto itemPtr = key in attributes;
        if(itemPtr !is null) {
            oldValue = *itemPtr;
        }
        attributes[key] = value;
        return oldValue;
    }

    /**
     * Stores Boolean.TRUE under key.
     *
     * Returns: the value previously stored under key, or null if there was none.
     */
    Object setAttribute(string key) {
        return setAttribute(key, Boolean.TRUE);
    }

    /**
     * Stores value under key only when the key is not present yet.
     *
     * Returns: null when the value was stored, otherwise the existing value
     *          (which is left unchanged).
     */
    Object setAttributeIfAbsent(string key, Object value) {
        auto itemPtr = key in attributes;
        if(itemPtr !is null) {
            return *itemPtr;
        }
        attributes[key] = value;
        return null;
    }

    /**
     * Stores Boolean.TRUE under key only when the key is not present yet.
     *
     * Returns: null when the value was stored, otherwise the existing value.
     */
    Object setAttributeIfAbsent(string key) {
        return setAttributeIfAbsent(key, Boolean.TRUE);
    }

    /**
     * Removes the attribute stored under key.
     *
     * Returns: the removed value, or null when the key was not present.
     */
    Object removeAttribute(string key) {
        auto itemPtr = key in attributes;
        if(itemPtr is null) {
            return null;
        }
        // Read the value before removal invalidates the pointer.
        Object oldValue = *itemPtr;
        attributes.remove(key);
        return oldValue;
    }

    /**
     * Removes the attribute stored under key only when it equals value.
     *
     * Returns: true when the entry was removed.
     */
    bool removeAttribute(string key, Object value) {
        auto itemPtr = key in attributes;
        if(itemPtr !is null && *itemPtr == value) {
            attributes.remove(key);
            return true;
        }
        return false;
    }

    /**
     * Replaces the attribute stored under key with newValue only when the
     * current value equals oldValue.
     *
     * Returns: true when the entry was replaced.
     */
    bool replaceAttribute(string key, Object oldValue, Object newValue) {
        auto itemPtr = key in attributes;
        if(itemPtr !is null && *itemPtr == oldValue) {
            attributes[key] = newValue;
            return true;
        }
        return false;
    }

    /**
     * Reports whether an attribute is stored under key.
     */
    bool containsAttribute(string key) {
        return (key in attributes) !is null;
    }

    /**
     * Returns the keys of all stored attributes.
     */
    string[] getAttributeKeys() {
        return attributes.keys();
    }

    /// Encodes and sends a message; equivalent to encode(message).
    void write(Object message) {
        encode(message);
    }

    /**
     * Passes a message to the installed encoder.
     *
     * A missing encoder (reported as an IOException) or any Exception thrown
     * by the encoder is not propagated to the caller; it is forwarded to
     * notifyException instead.
     */
    void encode(Object message) {
        try {
            if(this._encoder is null) {
                throw new IOException("No encoder set.");
            } else {
                this._encoder.encode(message, this);
            }
        } catch (Exception t) {
            version(HUNT_DEBUG) {
                string msg = format("Connection %d exception: %s", this.getId(), t.msg);
                warning(msg);
            }
            version(HUNT_NET_DEBUG_MORE) warning(t);
            notifyException(t);
        }
    }

    /**
     * Stream callback for a closed stream (registered in the constructor).
     *
     * Marks the connection Closed and informs the handler, if one is set.
     */
    protected void notifyClose() {
        this._connectionState = ConnectionState.Closed;
        if(_netHandler !is null)
            _netHandler.connectionClosed(this);
    }

    /// Forwards an exception to the handler's exceptionCaught, if a handler is set.
    void notifyException(Exception t) {
        if(_netHandler !is null)
            _netHandler.exceptionCaught(this, t);
    }

    version(HUNT_METRIC) {
        /// Returns an empty string when built with HUNT_METRIC.
        override string toString() {
            return "";
        }
    }
}
428 
429