1 module hunt.net.AbstractConnection; 2 3 import hunt.net.Connection; 4 import hunt.net.TcpSslOptions; 5 import hunt.net.codec; 6 7 import hunt.Boolean; 8 import hunt.Exceptions; 9 import hunt.Functions; 10 import hunt.io.ByteBuffer; 11 import hunt.io.BufferUtils; 12 import hunt.io.channel; 13 import hunt.io.TcpStream; 14 import hunt.logging; 15 import hunt.util.Common; 16 17 import std.format; 18 import std.socket; 19 20 21 /** 22 * Abstract base class for TCP connections. 23 * 24 */ 25 abstract class AbstractConnection : Connection { 26 protected int _connectionId; 27 protected TcpStream _tcp; 28 protected TcpSslOptions _options; 29 protected DataReceivedHandler _dataReceivedHandler; 30 protected Object[string] attributes; 31 private Codec _codec; 32 protected Encoder _encoder; 33 protected Decoder _decoder; 34 protected NetConnectionHandler _netHandler; 35 protected shared ConnectionState _connectionState; 36 private bool _isSecured = false; 37 38 39 this(int connectionId, TcpSslOptions options, TcpStream tcp) { 40 assert(tcp !is null); 41 _tcp = tcp; 42 this._options = options; 43 this._connectionId = connectionId; 44 this._connectionState = ConnectionState.Ready; 45 46 _tcp.closed(¬ifyClose); 47 _tcp.received(&onDataReceived); 48 } 49 50 /// 51 this(int connectionId, TcpSslOptions options, TcpStream tcp, 52 Codec codec, NetConnectionHandler eventHandler) { 53 assert(eventHandler !is null); 54 55 this._netHandler = eventHandler; 56 this(connectionId, options, tcp); 57 if(codec !is null) { 58 this.setCodec(codec); 59 } 60 61 } 62 63 int getId() { 64 return _connectionId; 65 } 66 67 TcpSslOptions getOptions() { 68 return _options; 69 } 70 71 TcpStream getStream() { 72 return _tcp; 73 } 74 75 ConnectionState getState() { 76 return this._connectionState; 77 } 78 79 /** 80 * 81 */ 82 void setState(ConnectionState state) { 83 if(state == ConnectionState.Secured) { 84 _isSecured = true; 85 } 86 this._connectionState = state; 87 } 88 89 AbstractConnection setCodec(Codec codec) { 90 this._codec = codec; 91 if(codec !is null) { 92 this._encoder = codec.getEncoder(); 93 this._encoder.setBufferSize(_options.getEncoderBufferSize()); 94 this._decoder = codec.getDecoder; 95 } 96 return this; 97 } 98 99 Codec getCodec() { 100 return this._codec; 101 } 102 103 /// 104 AbstractConnection setHandler(NetConnectionHandler handler) { 105 this._netHandler = handler; 106 return this; 107 } 108 109 NetConnectionHandler getHandler() { 110 return _netHandler; 111 } 112 113 bool isConnected() { 114 return _tcp.isConnected(); 115 } 116 117 bool isActive() { 118 // FIXME: Needing refactor or cleanup -@zxp at 8/1/2019, 6:04:44 PM 119 // 120 return _tcp.isConnected(); 121 } 122 123 bool isClosing() { 124 return _tcp.isClosing(); 125 } 126 127 bool isSecured() { 128 return _isSecured; 129 } 130 131 protected DataHandleStatus onDataReceived(ByteBuffer buffer) { 132 version(HUNT_NET_DEBUG) { 133 auto data = cast(ubyte[]) buffer.peekRemaining(); 134 tracef("data received (%d bytes): ", data.length); 135 version(HUNT_NET_DEBUG_MORE) { 136 infof("%(%02X %)", data[0 .. $]); 137 } else { 138 if(data.length<=64) 139 infof("%(%02X %)", data[0 .. $]); 140 else 141 infof("%(%02X %) ...", data[0 .. 64]); 142 } 143 } else version(HUNT_DEBUG) { 144 // auto data = cast(string) buffer.peekRemaining(); 145 // tracef("data received (%d bytes): ", data.length); 146 // infof("%(%02X %)", data[0 .. $]); 147 } 148 149 // synchronized (this) { 150 // import hunt.io.BufferUtils; 151 // // Make usre data and thread safe 152 // handleReceivedData(BufferUtils.clone(buffer)); 153 // } 154 DataHandleStatus status = DataHandleStatus.Done; 155 156 try { 157 // Make usre data and thread safe 158 // status = handleReceivedData(BufferUtils.clone(buffer)); 159 status = handleReceivedData(buffer); 160 } catch(Throwable ex) { 161 warning(ex.msg); 162 warning(ex); 163 164 BufferUtils.clear(buffer); 165 } 166 167 return status; 168 } 169 170 private DataHandleStatus handleReceivedData(ByteBuffer buffer) { 171 DataHandleStatus result = DataHandleStatus.Done; 172 173 if(_decoder !is null) { 174 version(HUNT_NET_DEBUG) { 175 trace("Running decoder..."); 176 } 177 178 result = _decoder.decode(buffer, this); 179 version(HUNT_NET_DEBUG_) info("Decoding done."); 180 181 } else if(_netHandler !is null) { 182 result = _netHandler.messageReceived(this, cast(Object)buffer); 183 } else { 184 // do nothing 185 buffer.clear(); 186 buffer.flip(); 187 } 188 189 return result; 190 } 191 192 /// 193 void close() { 194 version(HUNT_NET_DEBUG) infof("Closing connection %d...The state: %s", this.getId(), _connectionState); 195 if(_connectionState == ConnectionState.Closing || _connectionState == ConnectionState.Closed) 196 return; 197 setState(ConnectionState.Closing); 198 _tcp.close(); 199 } 200 201 /// 202 @property Address localAddress() { 203 return _tcp.localAddress; 204 } 205 206 //// 207 @property Address remoteAddress() { 208 return _tcp.remoteAddress; 209 } 210 211 //// 212 void write(const(ubyte)[] data) { 213 version(HUNT_NET_DEBUG) { 214 tracef("writting data (%d bytes)...", data.length); 215 if(data.length<=64) 216 infof("%(%02X %)", data[0 .. $]); 217 else 218 infof("%(%02X %) ...", data[0 .. 64]); 219 } else version(HUNT_NET_DEBUG_MORE) { 220 tracef("writting data (%d bytes)...", data.length); 221 infof("%(%02X %)", data[0 .. $]); 222 } 223 //if (_tcp !is null && _tcp.isConnected) 224 _tcp.write(data); 225 } 226 227 //// 228 void write(string str) { 229 write(cast(ubyte[]) str); 230 } 231 232 void write(ByteBuffer buffer) { 233 234 version(HUNT_NET_DEBUG) { 235 tracef("writting buffer (%s bytes)...", buffer.toString()); 236 auto data = buffer.peekRemaining(); 237 if(data.length<=64) 238 infof("%(%02X %)", data[0 .. $]); 239 else 240 infof("%(%02X %) ...", data[0 .. 64]); 241 } else version(HUNT_NET_DEBUG_MORE) { 242 tracef("writting buffer (%s bytes)...", buffer.toString()); 243 auto data = buffer.peekRemaining(); 244 infof("%(%02X %)", data[0 .. $]); 245 } else version(HUNT_DEBUG) { 246 // tracef("writting buffer (%s bytes)...", buffer.toString()); 247 } 248 //if (_tcp !is null && _tcp.isConnected) 249 _tcp.write(buffer); 250 } 251 252 void write(ByteBuffer buffer, Callback callback) { 253 // byte[] data = buffer.array; 254 // int start = buffer.position(); 255 // int end = buffer.limit(); 256 257 // write(cast(ubyte[]) data[start .. end]); 258 259 write(cast(ubyte[])buffer.peekRemaining()); 260 callback.succeeded(); 261 } 262 263 void write(ByteBuffer[] buffers, Callback callback) { 264 foreach (ByteBuffer buffer; buffers) { 265 version (HUNT_DEBUG) 266 tracef("writting buffer: %s", buffer.toString()); 267 268 // byte[] data = buffer.array; 269 // int start = buffer.position(); 270 // int end = buffer.limit(); 271 272 // write(cast(ubyte[]) data[start .. end]); 273 write(cast(ubyte[])buffer.peekRemaining()); 274 } 275 callback.succeeded(); 276 } 277 278 /** 279 * {@inheritDoc} 280 */ 281 Object getAttribute(string key) { 282 return getAttribute(key, null); 283 } 284 285 /** 286 * {@inheritDoc} 287 */ 288 Object getAttribute(string key, Object defaultValue) { 289 return attributes.get(key, defaultValue); 290 } 291 292 /** 293 * {@inheritDoc} 294 */ 295 Object setAttribute(string key, Object value) { 296 auto itemPtr = key in attributes; 297 Object oldValue = null; 298 if(itemPtr !is null) { 299 oldValue = *itemPtr; 300 } 301 attributes[key] = value; 302 return oldValue; 303 } 304 305 /** 306 * {@inheritDoc} 307 */ 308 Object setAttribute(string key) { 309 return setAttribute(key, Boolean.TRUE); 310 } 311 312 /** 313 * {@inheritDoc} 314 */ 315 Object setAttributeIfAbsent(string key, Object value) { 316 auto itemPtr = key in attributes; 317 if(itemPtr is null) { 318 attributes[key] = value; 319 return null; 320 } else { 321 return *itemPtr; 322 } 323 } 324 325 /** 326 * {@inheritDoc} 327 */ 328 Object setAttributeIfAbsent(string key) { 329 return setAttributeIfAbsent(key, Boolean.TRUE); 330 } 331 332 /** 333 * {@inheritDoc} 334 */ 335 Object removeAttribute(string key) { 336 auto itemPtr = key in attributes; 337 if(itemPtr is null) { 338 return null; 339 } else { 340 Object oldValue = *itemPtr; 341 attributes.remove(key); 342 return oldValue; 343 } 344 } 345 346 /** 347 * {@inheritDoc} 348 */ 349 bool removeAttribute(string key, Object value) { 350 auto itemPtr = key in attributes; 351 if(itemPtr !is null && *itemPtr == value) { 352 attributes.remove(key); 353 return true; 354 } 355 return false; 356 } 357 358 /** 359 * {@inheritDoc} 360 */ 361 bool replaceAttribute(string key, Object oldValue, Object newValue) { 362 auto itemPtr = key in attributes; 363 if(itemPtr !is null && *itemPtr == oldValue) { 364 attributes[key] = newValue; 365 return true; 366 } 367 return false; 368 } 369 370 /** 371 * {@inheritDoc} 372 */ 373 bool containsAttribute(string key) { 374 auto itemPtr = key in attributes; 375 return itemPtr !is null; 376 } 377 378 /** 379 * {@inheritDoc} 380 */ 381 string[] getAttributeKeys() { 382 return attributes.keys(); 383 } 384 385 void write(Object message) { 386 encode(message); 387 } 388 389 void encode(Object message) { 390 try { 391 if(this._encoder is null) { 392 throw new IOException("No encoder set."); 393 } else { 394 this._encoder.encode(message, this); 395 } 396 } catch (Exception t) { 397 version(HUNT_DEBUG) { 398 string msg = format("Connection %d exception: %s", this.getId(), t.msg); 399 warning(msg); 400 } 401 version(HUNT_NET_DEBUG_MORE) warning(t); 402 notifyException(t); 403 } 404 } 405 406 // void encode(ByteBuffer message) { 407 // try { 408 // _config.getEncoder().encode(message, this); 409 // } catch (Exception t) { 410 // _netHandler.notifyExceptionCaught(this, t); 411 // } 412 // } 413 414 // void encode(ByteBuffer[] messages) { 415 // try { 416 // foreach (ByteBuffer message; messages) { 417 // this._encoder.encode(message, this); 418 // } 419 // } catch (Exception t) { 420 // _netHandler.exceptionCaught(this, t); 421 // } 422 // } 423 424 // void notifyMessageReceived(Object message) { 425 // implementationMissing(false); 426 // } 427 428 protected void notifyClose() { 429 this._connectionState = ConnectionState.Closed; 430 if(_netHandler !is null) 431 _netHandler.connectionClosed(this); 432 } 433 434 void notifyException(Exception t) { 435 if(_netHandler !is null) 436 _netHandler.exceptionCaught(this, t); 437 } 438 439 version(HUNT_METRIC) { 440 override string toString() { 441 return ""; 442 } 443 } 444 } 445 446