1 module hunt.net.AbstractConnection; 2 3 import hunt.net.Connection; 4 import hunt.net.TcpSslOptions; 5 import hunt.net.codec; 6 7 import hunt.Boolean; 8 import hunt.io.ByteBuffer; 9 import hunt.Exceptions; 10 import hunt.Functions; 11 import hunt.io.channel; 12 import hunt.io.TcpStream; 13 import hunt.logging.ConsoleLogger; 14 import hunt.util.Common; 15 16 import std.format; 17 import std.socket; 18 19 20 /** 21 * Abstract base class for TCP connections. 22 * 23 */ 24 abstract class AbstractConnection : Connection { 25 protected int _connectionId; 26 protected TcpStream _tcp; 27 protected TcpSslOptions _options; 28 protected DataReceivedHandler _dataReceivedHandler; 29 protected Object[string] attributes; 30 private Codec _codec; 31 protected Encoder _encoder; 32 protected Decoder _decoder; 33 protected NetConnectionHandler _netHandler; 34 protected shared ConnectionState _connectionState; 35 private bool _isSecured = false; 36 37 38 this(int connectionId, TcpSslOptions options, TcpStream tcp) { 39 assert(tcp !is null); 40 _tcp = tcp; 41 this._options = options; 42 this._connectionId = connectionId; 43 this._connectionState = ConnectionState.Ready; 44 45 _tcp.closed(¬ifyClose); 46 _tcp.received(&onDataReceived); 47 } 48 49 /// 50 this(int connectionId, TcpSslOptions options, TcpStream tcp, 51 Codec codec, NetConnectionHandler eventHandler) { 52 assert(eventHandler !is null); 53 54 this._netHandler = eventHandler; 55 this(connectionId, options, tcp); 56 if(codec !is null) { 57 this.setCodec(codec); 58 } 59 60 } 61 62 int getId() { 63 return _connectionId; 64 } 65 66 TcpSslOptions getOptions() { 67 return _options; 68 } 69 70 TcpStream getStream() { 71 return _tcp; 72 } 73 74 ConnectionState getState() { 75 return this._connectionState; 76 } 77 78 /** 79 * 80 */ 81 void setState(ConnectionState state) { 82 if(state == ConnectionState.Secured) { 83 _isSecured = true; 84 } 85 this._connectionState = state; 86 } 87 88 AbstractConnection setCodec(Codec codec) { 89 this._codec = codec; 90 if(codec !is null) { 91 this._encoder = codec.getEncoder(); 92 this._encoder.setBufferSize(_options.getEncoderBufferSize()); 93 this._decoder = codec.getDecoder; 94 } 95 return this; 96 } 97 98 Codec getCodec() { 99 return this._codec; 100 } 101 102 /// 103 AbstractConnection setHandler(NetConnectionHandler handler) { 104 this._netHandler = handler; 105 return this; 106 } 107 108 NetConnectionHandler getHandler() { 109 return _netHandler; 110 } 111 112 bool isConnected() { 113 return _tcp.isConnected(); 114 } 115 116 bool isActive() { 117 // FIXME: Needing refactor or cleanup -@zxp at 8/1/2019, 6:04:44 PM 118 // 119 return _tcp.isConnected(); 120 } 121 122 bool isClosing() { 123 return _tcp.isClosing(); 124 } 125 126 bool isSecured() { 127 return _isSecured; 128 } 129 130 protected void onDataReceived(ByteBuffer buffer) { 131 synchronized (this) { 132 import hunt.io.BufferUtils; 133 // Make usre data and thread safe 134 handleReceivedData(BufferUtils.clone(buffer)); 135 } 136 } 137 138 private void handleReceivedData(ByteBuffer buffer) { 139 version(HUNT_NET_DEBUG) { 140 auto data = cast(ubyte[]) buffer.getRemaining(); 141 tracef("data received (%d bytes): ", data.length); 142 version(HUNT_NET_DEBUG_MORE) { 143 infof("%(%02X %)", data[0 .. $]); 144 } else { 145 if(data.length<=64) 146 infof("%(%02X %)", data[0 .. $]); 147 else 148 infof("%(%02X %) ...", data[0 .. 64]); 149 } 150 } else version(HUNT_DEBUG) { 151 // auto data = cast(string) buffer.getRemaining(); 152 // tracef("data received (%d bytes): ", data.length); 153 // infof("%(%02X %)", data[0 .. $]); 154 } 155 156 if(_decoder !is null) { 157 version(HUNT_NET_DEBUG_MORE) { 158 trace("Running decoder..."); 159 } 160 161 try { 162 _decoder.decode(buffer, this); 163 version(HUNT_NET_DEBUG_MORE) info("Decoding done."); 164 } catch(Throwable ex) { 165 warning(ex.msg); 166 warning(ex); 167 } 168 } else { 169 if(_netHandler !is null) { 170 _netHandler.messageReceived(this, cast(Object)buffer); 171 } 172 } 173 } 174 175 /// 176 void close() { 177 version(HUNT_NET_DEBUG) infof("Closing connection %d...The state: %s", this.getId(), _connectionState); 178 if(_connectionState == ConnectionState.Closing || _connectionState == ConnectionState.Closed) 179 return; 180 setState(ConnectionState.Closing); 181 _tcp.close(); 182 } 183 184 /// 185 @property Address localAddress() { 186 return _tcp.localAddress; 187 } 188 189 //// 190 @property Address remoteAddress() { 191 return _tcp.remoteAddress; 192 } 193 194 //// 195 void write(const(ubyte)[] data) { 196 version(HUNT_NET_DEBUG) { 197 tracef("writting data (%d bytes)...", data.length); 198 if(data.length<=64) 199 infof("%(%02X %)", data[0 .. $]); 200 else 201 infof("%(%02X %) ...", data[0 .. 64]); 202 } else version(HUNT_NET_DEBUG_MORE) { 203 tracef("writting data (%d bytes)...", data.length); 204 infof("%(%02X %)", data[0 .. $]); 205 } 206 //if (_tcp !is null && _tcp.isConnected) 207 _tcp.write(data); 208 } 209 210 //// 211 void write(string str) { 212 write(cast(ubyte[]) str); 213 } 214 215 void write(ByteBuffer buffer) { 216 217 version(HUNT_NET_DEBUG) { 218 tracef("writting buffer (%s bytes)...", buffer.toString()); 219 auto data = buffer.getRemaining(); 220 if(data.length<=64) 221 infof("%(%02X %)", data[0 .. $]); 222 else 223 infof("%(%02X %) ...", data[0 .. 64]); 224 } else version(HUNT_NET_DEBUG_MORE) { 225 tracef("writting buffer (%s bytes)...", buffer.toString()); 226 auto data = buffer.getRemaining(); 227 infof("%(%02X %)", data[0 .. $]); 228 } else version(HUNT_DEBUG) { 229 // tracef("writting buffer (%s bytes)...", buffer.toString()); 230 } 231 //if (_tcp !is null && _tcp.isConnected) 232 _tcp.write(buffer); 233 } 234 235 void write(ByteBuffer buffer, Callback callback) { 236 // byte[] data = buffer.array; 237 // int start = buffer.position(); 238 // int end = buffer.limit(); 239 240 // write(cast(ubyte[]) data[start .. end]); 241 242 write(cast(ubyte[])buffer.getRemaining()); 243 callback.succeeded(); 244 } 245 246 void write(ByteBuffer[] buffers, Callback callback) { 247 foreach (ByteBuffer buffer; buffers) { 248 version (HUNT_DEBUG) 249 tracef("writting buffer: %s", buffer.toString()); 250 251 // byte[] data = buffer.array; 252 // int start = buffer.position(); 253 // int end = buffer.limit(); 254 255 // write(cast(ubyte[]) data[start .. end]); 256 write(cast(ubyte[])buffer.getRemaining()); 257 } 258 callback.succeeded(); 259 } 260 261 /** 262 * {@inheritDoc} 263 */ 264 Object getAttribute(string key) { 265 return getAttribute(key, null); 266 } 267 268 /** 269 * {@inheritDoc} 270 */ 271 Object getAttribute(string key, Object defaultValue) { 272 return attributes.get(key, defaultValue); 273 } 274 275 /** 276 * {@inheritDoc} 277 */ 278 Object setAttribute(string key, Object value) { 279 auto itemPtr = key in attributes; 280 Object oldValue = null; 281 if(itemPtr !is null) { 282 oldValue = *itemPtr; 283 } 284 attributes[key] = value; 285 return oldValue; 286 } 287 288 /** 289 * {@inheritDoc} 290 */ 291 Object setAttribute(string key) { 292 return setAttribute(key, Boolean.TRUE); 293 } 294 295 /** 296 * {@inheritDoc} 297 */ 298 Object setAttributeIfAbsent(string key, Object value) { 299 auto itemPtr = key in attributes; 300 if(itemPtr is null) { 301 attributes[key] = value; 302 return null; 303 } else { 304 return *itemPtr; 305 } 306 } 307 308 /** 309 * {@inheritDoc} 310 */ 311 Object setAttributeIfAbsent(string key) { 312 return setAttributeIfAbsent(key, Boolean.TRUE); 313 } 314 315 /** 316 * {@inheritDoc} 317 */ 318 Object removeAttribute(string key) { 319 auto itemPtr = key in attributes; 320 if(itemPtr is null) { 321 return null; 322 } else { 323 Object oldValue = *itemPtr; 324 attributes.remove(key); 325 return oldValue; 326 } 327 } 328 329 /** 330 * {@inheritDoc} 331 */ 332 bool removeAttribute(string key, Object value) { 333 auto itemPtr = key in attributes; 334 if(itemPtr !is null && *itemPtr == value) { 335 attributes.remove(key); 336 return true; 337 } 338 return false; 339 } 340 341 /** 342 * {@inheritDoc} 343 */ 344 bool replaceAttribute(string key, Object oldValue, Object newValue) { 345 auto itemPtr = key in attributes; 346 if(itemPtr !is null && *itemPtr == oldValue) { 347 attributes[key] = newValue; 348 return true; 349 } 350 return false; 351 } 352 353 /** 354 * {@inheritDoc} 355 */ 356 bool containsAttribute(string key) { 357 auto itemPtr = key in attributes; 358 return itemPtr !is null; 359 } 360 361 /** 362 * {@inheritDoc} 363 */ 364 string[] getAttributeKeys() { 365 return attributes.keys(); 366 } 367 368 void write(Object message) { 369 encode(message); 370 } 371 372 void encode(Object message) { 373 try { 374 if(this._encoder is null) { 375 throw new IOException("No encoder set."); 376 } else { 377 this._encoder.encode(message, this); 378 } 379 } catch (Exception t) { 380 version(HUNT_DEBUG) { 381 string msg = format("Connection %d exception: %s", this.getId(), t.msg); 382 warning(msg); 383 } 384 version(HUNT_NET_DEBUG_MORE) warning(t); 385 notifyException(t); 386 } 387 } 388 389 // void encode(ByteBuffer message) { 390 // try { 391 // _config.getEncoder().encode(message, this); 392 // } catch (Exception t) { 393 // _netHandler.notifyExceptionCaught(this, t); 394 // } 395 // } 396 397 // void encode(ByteBuffer[] messages) { 398 // try { 399 // foreach (ByteBuffer message; messages) { 400 // this._encoder.encode(message, this); 401 // } 402 // } catch (Exception t) { 403 // _netHandler.exceptionCaught(this, t); 404 // } 405 // } 406 407 // void notifyMessageReceived(Object message) { 408 // implementationMissing(false); 409 // } 410 411 protected void notifyClose() { 412 this._connectionState = ConnectionState.Closed; 413 if(_netHandler !is null) 414 _netHandler.connectionClosed(this); 415 } 416 417 void notifyException(Exception t) { 418 if(_netHandler !is null) 419 _netHandler.exceptionCaught(this, t); 420 } 421 422 version(HUNT_METRIC) { 423 override string toString() { 424 return ""; 425 } 426 } 427 } 428 429