module hunt.net.TcpConnection;

import hunt.net.AbstractConnection;
import hunt.net.Connection;
import hunt.net.codec;
import hunt.net.TcpSslOptions;

import hunt.collection;
import hunt.io.TcpStream;
import hunt.io.channel;
import hunt.Exceptions;
import hunt.Functions;
import hunt.logging;
import hunt.util.Common;
import hunt.util.DateTime;

import core.atomic;
import core.time;
import std.socket;


/**
 * Represents a socket-like interface to a TCP connection on either the
 * client or the server side.
 *
 * When compiled with `version(HUNT_METRIC)` the connection additionally
 * records open/close timestamps, last read/write timestamps and byte
 * counters, and exposes them through the accessor methods below.
 */
class TcpConnection : AbstractConnection {

    version(HUNT_METRIC) {
        // Timestamps are the values returned by DateTime.currentTimeMillis().
        private long openTime;
        private long closeTime;       // stays 0 until notifyClose() runs
        private long lastReadTime;    // stays 0 until the first onDataReceived()
        private long lastWrittenTime; // stays 0 until the first write(const ubyte[])
        private size_t readBytes = 0;
        private size_t writtenBytes = 0;
    }

    protected shared bool _isShutdownOutput = false;
    protected shared bool _isShutdownInput = false;
    protected shared bool _isWaitingForClose = false;

    /**
     * Creates a connection and forwards all collaborators to AbstractConnection.
     *
     * Params:
     *   connectionId = identifier used in log messages and in toString()
     *   options      = connection options; getMaxIdleTimeout() reads its idle timeout
     *   handler      = connection event handler, passed to the base class
     *   codec        = codec, passed to the base class
     *   tcp          = underlying TCP stream, passed to the base class
     */
    this(int connectionId, TcpSslOptions options, NetConnectionHandler handler, Codec codec, TcpStream tcp) {
        super(connectionId, options, tcp, codec, handler);
        version(HUNT_METRIC) this.openTime = DateTime.currentTimeMillis();
        version(HUNT_DEBUG) {
            tracef("Initializing TCP connection %d...", connectionId);
        }
    }

    version(HUNT_METRIC) {

        /**
         * Records read metrics, then delegates to the base implementation.
         */
        override DataHandleStatus onDataReceived(ByteBuffer buffer) {
            // NOTE(review): counts buffer.limit(), not the remaining bytes --
            // confirm that the buffer position is always 0 here.
            readBytes += buffer.limit();
            // Stamp the read so getLastReadTime()/getLastActiveTime() reflect real activity.
            lastReadTime = DateTime.currentTimeMillis();
            return super.onDataReceived(buffer);
        }

        /**
         * Records write metrics, then delegates to the base implementation.
         */
        override void write(const ubyte[] data) {
            writtenBytes += data.length;
            // Stamp the write so getLastWrittenTime()/getLastActiveTime() reflect real activity.
            lastWrittenTime = DateTime.currentTimeMillis();
            super.write(data);
        }
        // Keep the remaining AbstractConnection.write overloads visible.
        // NOTE(review): whether those overloads route through write(const ubyte[])
        // (and are therefore counted) is not visible from this file.
        alias write = AbstractConnection.write;

        /// Returns: the timestamp taken in the constructor.
        long getOpenTime() {
            return openTime;
        }

        /// Returns: the timestamp taken in notifyClose(), or 0 if not closed yet.
        long getCloseTime() {
            return closeTime;
        }

        /**
         * Returns: closeTime - openTime once the connection has been closed,
         * otherwise the time elapsed since openTime.
         */
        long getDuration() {
            if (closeTime > 0) {
                return closeTime - openTime;
            } else {
                return DateTime.currentTimeMillis - openTime;
            }
        }

        /// Returns: the timestamp of the last onDataReceived() call, or 0 if none.
        long getLastReadTime() {
            return lastReadTime;
        }

        /// Returns: the timestamp of the last write(const ubyte[]) call, or 0 if none.
        long getLastWrittenTime() {
            return lastWrittenTime;
        }

        /// Returns: the latest of the last read, last write and open timestamps.
        long getLastActiveTime() {
            import std.algorithm : max;
            return max(max(lastReadTime, lastWrittenTime), openTime);
        }

        /// Returns: the number of bytes counted by onDataReceived() since the last reset().
        size_t getReadBytes() {
            return readBytes;
        }

        /// Returns: the number of bytes counted by write(const ubyte[]) since the last reset().
        size_t getWrittenBytes() {
            return writtenBytes;
        }

        /// Returns: the time elapsed since getLastActiveTime().
        long getIdleTimeout() {
            return DateTime.currentTimeMillis - getLastActiveTime();
        }

        /// Clears the byte counters; timestamps are left untouched.
        void reset() {
            readBytes = 0;
            writtenBytes = 0;
        }

        /// Returns: a one-line summary of the connection id and its metrics.
        override string toString() {
            import std.conv;
            return "[connectionId=" ~ _connectionId.to!string() ~ ", openTime="
                ~ openTime.to!string() ~ ", closeTime="
                ~ closeTime.to!string() ~ ", duration=" ~ getDuration().to!string()
                ~ ", readBytes=" ~ readBytes.to!string() ~ ", writtenBytes=" ~ writtenBytes.to!string() ~ "]";
        }
    }

    // override void close() {
    //     // if(cas(&_isClosed, false, true)) {
    //         try {
    //             super.close();
    //         } catch (AsynchronousCloseException e) {
    //             warningf("The connection %d asynchronously close exception", _connectionId);
    //         } catch (IOException e) {
    //             errorf("The connection %d close exception: %s", _connectionId, e.msg);
    //         }
    //         // finally {
    //         //     _eventHandler.notifyConnectionClosed(this);
    //         // }
    //     // } else {
    //     //     infof("The connection %d already closed", _connectionId);
    //     // }
    // }

    // override void closeNow() {
    //     this.close();
    // }

    /**
     * Records the close timestamp (HUNT_METRIC) or traces the close
     * (HUNT_DEBUG without HUNT_METRIC), then delegates to the base class.
     */
    override protected void notifyClose() {
        version(HUNT_METRIC) {
            closeTime = DateTime.currentTimeMillis();
            // version(HUNT_DEBUG)
            // tracef("The connection %d closed.", _connectionId);
        } else {
            version(HUNT_DEBUG) tracef("The connection %d closed.", _connectionId);
        }
        super.notifyClose();
    }

    /// Shuts down the output side, then the input side.
    private void shutdownSocketChannel() {
        shutdownOutput();
        shutdownInput();
    }

    /**
     * Shuts down the output side of the underlying stream at most once.
     *
     * ClosedChannelException and IOException raised by the stream are
     * logged and not propagated. The flag stays set even if the stream
     * call fails.
     */
    void shutdownOutput() {
        // Atomic test-and-set: only the caller that flips the flag performs the shutdown.
        if (!cas(&_isShutdownOutput, false, true)) {
            tracef("The connection %d is already shutdown output", _connectionId);
            return;
        }

        try {
            _tcp.shutdownOutput();
            tracef("The connection %d is shutdown output", _connectionId);
        } catch (ClosedChannelException e) {
            warningf("Shutdown output exception. The connection %d is closed", _connectionId);
        } catch (IOException e) {
            errorf("The connection %d shutdown output I/O exception. %s", _connectionId, e.message);
        }
    }

    /**
     * Shuts down the input side of the underlying stream at most once.
     *
     * ClosedChannelException and IOException raised by the stream are
     * logged and not propagated. The flag stays set even if the stream
     * call fails.
     */
    void shutdownInput() {
        // Atomic test-and-set: only the caller that flips the flag performs the shutdown.
        if (!cas(&_isShutdownInput, false, true)) {
            tracef("The connection %d is already shutdown input", _connectionId);
            return;
        }

        try {
            _tcp.shutdownInput();
            tracef("The connection %d is shutdown input", _connectionId);
        } catch (ClosedChannelException e) {
            warningf("Shutdown input exception. The connection %d is closed", _connectionId);
        } catch (IOException e) {
            errorf("The connection %d shutdown input I/O exception. %s", _connectionId, e.message);
        }
    }

    /// Returns: true once shutdownOutput() has been invoked.
    bool isShutdownOutput() {
        return atomicLoad(_isShutdownOutput);
    }

    /// Returns: true once shutdownInput() has been invoked.
    bool isShutdownInput() {
        return atomicLoad(_isShutdownInput);
    }

    /// Returns: the current value of the waiting-for-close flag (never set in this class).
    bool isWaitingForClose() {
        return atomicLoad(_isWaitingForClose);
    }

    /// Returns: the result of localAddress().
    Address getLocalAddress() {
        return localAddress();
    }

    /// Returns: the result of remoteAddress().
    Address getRemoteAddress() {
        return remoteAddress();
    }

    /// Returns: the idle timeout configured in the connection options.
    Duration getMaxIdleTimeout() {
        return _options.getIdleTimeout();
    }

}