1 module hunt.net.AbstractConnection;
2 
3 import hunt.net.Connection;
4 import hunt.net.TcpSslOptions;
5 import hunt.net.codec;
6 
7 import hunt.Boolean;
8 import hunt.Exceptions;
9 import hunt.Functions;
10 import hunt.io.ByteBuffer;
11 import hunt.io.BufferUtils;
12 import hunt.io.channel;
13 import hunt.io.TcpStream;
14 import hunt.logging;
15 import hunt.util.Common;
16 
17 import std.format;
18 import std.socket;
19 
20 
/**
 * Abstract base class for TCP connections.
 *
 * Wraps a TcpStream and adds, on top of it, an optional Codec
 * (encoder/decoder pair), a NetConnectionHandler that is told about
 * received data, close events and exceptions, a ConnectionState, and a
 * string-keyed attribute map.
 */
abstract class AbstractConnection : Connection {
    // Identifier supplied to the constructor; exposed through getId().
    protected int _connectionId;
    // Underlying stream; every read, write and close in this class goes through it.
    protected TcpStream _tcp;
    // Options supplied to the constructor; setCodec() reads the encoder buffer size from it.
    protected TcpSslOptions _options;
    // Not referenced anywhere else in this class; presumably reserved for subclasses.
    protected DataReceivedHandler _dataReceivedHandler;
    // Backing store for the *Attribute methods.
    protected Object[string] attributes;
    // Codec installed by setCodec(); _encoder and _decoder are obtained from it.
    private Codec _codec;
    protected Encoder _encoder;
    protected Decoder _decoder;
    // Receives messageReceived / connectionClosed / exceptionCaught notifications.
    protected NetConnectionHandler _netHandler;
    // Current lifecycle state; starts as Ready in the constructor.
    protected shared ConnectionState _connectionState;
    // Set to true by setState(Secured); nothing in this class resets it.
    private bool _isSecured = false;
37 
38 
39     this(int connectionId, TcpSslOptions options, TcpStream tcp) {
40         assert(tcp !is null);
41         _tcp = tcp;
42         this._options = options;
43         this._connectionId = connectionId;
44         this._connectionState = ConnectionState.Ready;
45 
46         _tcp.closed(&notifyClose);
47         _tcp.received(&onDataReceived);
48     }
49 
50     ///
51     this(int connectionId, TcpSslOptions options, TcpStream tcp,
52             Codec codec, NetConnectionHandler eventHandler) {
53         assert(eventHandler !is null);
54 
55         this._netHandler = eventHandler;
56         this(connectionId, options, tcp);
57         if(codec !is null) {
58             this.setCodec(codec);
59         }
60 
61     }
62 
63     int getId() {
64         return _connectionId;
65     }
66 
67     TcpSslOptions getOptions() {
68         return _options;
69     }
70 
71     TcpStream getStream() {
72         return _tcp;
73     }
74 
75     ConnectionState getState() {
76         return this._connectionState;
77     }
78 
79     /**
80      *
81      */
82     void setState(ConnectionState state) {
83         if(state == ConnectionState.Secured) {
84             _isSecured = true;
85         }
86         this._connectionState = state;
87     }
88 
89     AbstractConnection setCodec(Codec codec) {
90         this._codec = codec;
91         if(codec !is null) {
92             this._encoder = codec.getEncoder();
93             this._encoder.setBufferSize(_options.getEncoderBufferSize());
94             this._decoder = codec.getDecoder;
95         }
96         return this;
97     }
98 
99     Codec getCodec() {
100         return this._codec;
101     }
102 
103     ///
104     AbstractConnection setHandler(NetConnectionHandler handler) {
105         this._netHandler = handler;
106         return this;
107     }
108 
109     NetConnectionHandler getHandler() {
110         return _netHandler;
111     }
112 
113     bool isConnected() {
114         return _tcp.isConnected();
115     }
116 
117     bool isActive() {
118         // FIXME: Needing refactor or cleanup -@zxp at 8/1/2019, 6:04:44 PM
119         //
120         return _tcp.isConnected();
121     }
122 
123     bool isClosing() {
124         return _tcp.isClosing();
125     }
126 
127     bool isSecured() {
128         return _isSecured;
129     }
130 
131     protected DataHandleStatus onDataReceived(ByteBuffer buffer) {
132         version(HUNT_NET_DEBUG) {
133             auto data = cast(ubyte[]) buffer.peekRemaining();
134             tracef("data received (%d bytes): ", data.length);
135             version(HUNT_NET_DEBUG_MORE) {
136                 infof("%(%02X %)", data[0 .. $]);
137             } else {
138                 if(data.length<=64)
139                     infof("%(%02X %)", data[0 .. $]);
140                 else
141                     infof("%(%02X %) ...", data[0 .. 64]);
142             }
143         } else version(HUNT_DEBUG) {
144             // auto data = cast(string) buffer.peekRemaining();
145             // tracef("data received (%d bytes): ", data.length);
146             // infof("%(%02X %)", data[0 .. $]);
147         }
148 
149         // synchronized (this) {
150         //     import hunt.io.BufferUtils;
151         //     // Make usre data and thread safe
152         //     handleReceivedData(BufferUtils.clone(buffer));
153         // }
154         DataHandleStatus status = DataHandleStatus.Done;
155 
156         try {
157             // Make usre data and thread safe
158             // status = handleReceivedData(BufferUtils.clone(buffer));
159             status = handleReceivedData(buffer);
160         } catch(Throwable ex) {
161             warning(ex.msg);
162             warning(ex);
163             
164             BufferUtils.clear(buffer);
165         }
166 
167         return status;
168     }
169 
170     private DataHandleStatus handleReceivedData(ByteBuffer buffer) {
171         DataHandleStatus result = DataHandleStatus.Done;
172 
173         if(_decoder !is null) {
174             version(HUNT_NET_DEBUG) {
175                 trace("Running decoder...");
176             }
177             
178             result = _decoder.decode(buffer, this);
179             version(HUNT_NET_DEBUG_) info("Decoding done.");
180 
181         } else if(_netHandler !is null) {
182             result = _netHandler.messageReceived(this, cast(Object)buffer);
183         } else {
184             // do nothing
185             buffer.clear();
186             buffer.flip();            
187         }
188 
189         return result;
190     }
191 
192     ///
193     void close() {
194         version(HUNT_NET_DEBUG) infof("Closing connection %d...The state: %s", this.getId(), _connectionState);
195         if(_connectionState == ConnectionState.Closing || _connectionState == ConnectionState.Closed)
196             return;
197         setState(ConnectionState.Closing);
198         _tcp.close();
199     }
200 
201     ///
202     @property Address localAddress() {
203         return _tcp.localAddress;
204     }
205 
206     ////
207     @property Address remoteAddress() {
208         return _tcp.remoteAddress;
209     }
210 
211     ////
212     void write(const(ubyte)[] data) {
213         version(HUNT_NET_DEBUG) {
214             tracef("writting data (%d bytes)...", data.length);
215             if(data.length<=64)
216                 infof("%(%02X %)", data[0 .. $]);
217             else
218                 infof("%(%02X %) ...", data[0 .. 64]);
219         } else version(HUNT_NET_DEBUG_MORE) {
220             tracef("writting data (%d bytes)...", data.length);
221             infof("%(%02X %)", data[0 .. $]);
222         }
223         //if (_tcp !is null && _tcp.isConnected)
224         _tcp.write(data);
225     }
226 
227     ////
228     void write(string str) {
229         write(cast(ubyte[]) str);
230     }
231 
232     void write(ByteBuffer buffer) {
233 
234         version(HUNT_NET_DEBUG) {
235             tracef("writting buffer (%s bytes)...", buffer.toString());
236             auto data = buffer.peekRemaining();
237             if(data.length<=64)
238                 infof("%(%02X %)", data[0 .. $]);
239             else
240                 infof("%(%02X %) ...", data[0 .. 64]);
241         } else version(HUNT_NET_DEBUG_MORE) {
242             tracef("writting buffer (%s bytes)...", buffer.toString());
243             auto data = buffer.peekRemaining();
244             infof("%(%02X %)", data[0 .. $]);
245         } else version(HUNT_DEBUG) {
246             // tracef("writting buffer (%s bytes)...", buffer.toString());
247         }
248         //if (_tcp !is null && _tcp.isConnected)
249         _tcp.write(buffer);
250     }
251 
252     void write(ByteBuffer buffer, Callback callback) {
253         // byte[] data = buffer.array;
254         // int start = buffer.position();
255         // int end = buffer.limit();
256 
257         // write(cast(ubyte[]) data[start .. end]);
258 
259         write(cast(ubyte[])buffer.peekRemaining());
260         callback.succeeded();
261     }
262 
263     void write(ByteBuffer[] buffers, Callback callback) {
264         foreach (ByteBuffer buffer; buffers) {
265             version (HUNT_DEBUG)
266                 tracef("writting buffer: %s", buffer.toString());
267 
268             // byte[] data = buffer.array;
269             // int start = buffer.position();
270             // int end = buffer.limit();
271 
272             // write(cast(ubyte[]) data[start .. end]);
273             write(cast(ubyte[])buffer.peekRemaining());
274         }
275         callback.succeeded();
276     }
277 
278     /**
279      * {@inheritDoc}
280      */
281     Object getAttribute(string key) {
282         return getAttribute(key, null);
283     }
284 
285     /**
286      * {@inheritDoc}
287      */
288     Object getAttribute(string key, Object defaultValue) {
289         return attributes.get(key, defaultValue);
290     }
291 
292     /**
293      * {@inheritDoc}
294      */
295     Object setAttribute(string key, Object value) {
296         auto itemPtr = key in attributes;
297 		Object oldValue = null;
298         if(itemPtr !is null) {
299             oldValue = *itemPtr;
300         }
301         attributes[key] = value;
302 		return oldValue;
303     }
304 
305     /**
306      * {@inheritDoc}
307      */
308     Object setAttribute(string key) {
309         return setAttribute(key, Boolean.TRUE);
310     }
311 
312     /**
313      * {@inheritDoc}
314      */
315     Object setAttributeIfAbsent(string key, Object value) {
316         auto itemPtr = key in attributes;
317         if(itemPtr is null) {
318             attributes[key] = value;
319             return null;
320         } else {
321             return *itemPtr;
322         }
323     }
324 
325     /**
326      * {@inheritDoc}
327      */
328     Object setAttributeIfAbsent(string key) {
329         return setAttributeIfAbsent(key, Boolean.TRUE);
330     }
331 
332     /**
333      * {@inheritDoc}
334      */
335     Object removeAttribute(string key) {
336         auto itemPtr = key in attributes;
337         if(itemPtr is null) {
338             return null;
339         } else {
340             Object oldValue = *itemPtr;
341             attributes.remove(key);
342             return oldValue;
343         }
344     }
345 
346     /**
347      * {@inheritDoc}
348      */
349     bool removeAttribute(string key, Object value) {
350         auto itemPtr = key in attributes;
351         if(itemPtr !is null && *itemPtr == value) {
352             attributes.remove(key);
353             return true;
354         }
355         return false;
356     }
357 
358     /**
359      * {@inheritDoc}
360      */
361     bool replaceAttribute(string key, Object oldValue, Object newValue) {
362         auto itemPtr = key in attributes;
363         if(itemPtr !is null && *itemPtr == oldValue) {
364             attributes[key] = newValue;
365             return true;
366         }
367         return false;
368     }
369 
370     /**
371      * {@inheritDoc}
372      */
373     bool containsAttribute(string key) {
374         auto itemPtr = key in attributes;
375         return itemPtr !is null;
376     }
377 
378     /**
379      * {@inheritDoc}
380      */
381     string[] getAttributeKeys() {
382         return attributes.keys();
383     }
384 
385     void write(Object message) {
386         encode(message);
387     }
388 
389     void encode(Object message) {
390         try {
391             if(this._encoder is null) {
392                 throw new IOException("No encoder set.");
393             } else {
394                 this._encoder.encode(message, this);
395             }
396         } catch (Exception t) {
397             version(HUNT_DEBUG) {
398                 string msg = format("Connection %d exception: %s", this.getId(), t.msg);
399                 warning(msg);
400             }
401             version(HUNT_NET_DEBUG_MORE) warning(t);
402             notifyException(t);
403         }
404     }
405 
406     // void encode(ByteBuffer message) {
407     //     try {
408     //         _config.getEncoder().encode(message, this);
409     //     } catch (Exception t) {
410     //         _netHandler.notifyExceptionCaught(this, t);
411     //     }
412     // }
413 
414     // void encode(ByteBuffer[] messages) {
415     //     try {
416     //         foreach (ByteBuffer message; messages) {
417     //             this._encoder.encode(message, this);
418     //         }
419     //     } catch (Exception t) {
420     //         _netHandler.exceptionCaught(this, t);
421     //     }
422     // }
423 
424     // void notifyMessageReceived(Object message) {
425     //     implementationMissing(false);
426     // }
427 
428     protected void notifyClose() {
429         this._connectionState = ConnectionState.Closed;
430         if(_netHandler !is null)
431             _netHandler.connectionClosed(this);
432     }
433 
434     void notifyException(Exception t) {
435         if(_netHandler !is null)
436             _netHandler.exceptionCaught(this, t);
437     }
438 
    // Only compiled when the HUNT_METRIC version identifier is set:
    // the connection then renders as an empty string.
    version(HUNT_METRIC) {
        override string toString() {
            return "";
        }
    }
444 }
445 
446