1 module hunt.net.TcpConnection;
2 
3 import hunt.net.AbstractConnection;
4 import hunt.net.Connection;
5 import hunt.net.codec;
6 import hunt.net.TcpSslOptions;
7 
8 import hunt.collection;
9 import hunt.io.TcpStream;
10 import hunt.io.channel;
11 import hunt.Exceptions;
12 import hunt.Functions;
13 import hunt.logging;
14 import hunt.util.Common;
15 import hunt.util.DateTime;
16 
17 import core.atomic;
18 import core.time;
19 import std.socket;
20 
21 
22 /**
23  * Represents a socket-like interface to a TCP connection on either the
24  * client or the server side.
25  */
26 class TcpConnection : AbstractConnection {
27 
28 version(HUNT_METRIC) {
29     private long openTime;
30     private long closeTime;
31     private long lastReadTime;
32     private long lastWrittenTime;
33     private size_t readBytes = 0;
34     private size_t writtenBytes = 0;
35 } 
36 
37     protected shared bool _isShutdownOutput = false;
38     protected shared bool _isShutdownInput = false;
39     protected shared bool _isWaitingForClose = false;
40 
41     this(int connectionId, TcpSslOptions options, NetConnectionHandler handler, Codec codec, TcpStream tcp) {
42         super(connectionId, options, tcp, codec, handler);
43         version(HUNT_METRIC) this.openTime = DateTime.currentTimeMillis();
44         version(HUNT_DEBUG) {
45             tracef("Initializing TCP connection %d...", connectionId);
46         }
47     }  
48 
49 version(HUNT_METRIC) {
50 
51     override DataHandleStatus onDataReceived(ByteBuffer buffer) {
52         readBytes += buffer.limit();
53         return super.onDataReceived(buffer);
54     }
55 
56     override void write(const ubyte[] data) {
57         writtenBytes += data.length;
58         super.write(data);
59     }
60     alias write = AbstractConnection.write; 
61 
62     long getOpenTime() {
63         return openTime;
64     }
65 
66     long getCloseTime() {
67         return closeTime;
68     }
69 
70     long getDuration() {
71         if (closeTime > 0) {
72             return closeTime - openTime;
73         } else {
74             return DateTime.currentTimeMillis - openTime;
75         }
76     }
77 
78     long getLastReadTime() {
79         return lastReadTime;
80     }
81 
82     long getLastWrittenTime() {
83         return lastWrittenTime;
84     }
85 
86     long getLastActiveTime() {
87         import std.algorithm;
88         return max(max(lastReadTime, lastWrittenTime), openTime);
89     }
90 
91     size_t getReadBytes() {
92         return readBytes;
93     }
94 
95     size_t getWrittenBytes() {
96         return writtenBytes;
97     }
98 
99     long getIdleTimeout() {
100         return DateTime.currentTimeMillis - getLastActiveTime();
101     }
102 
103     void reset() {
104         readBytes = 0;
105         writtenBytes = 0;
106     }
107 
108     override string toString() {
109         import std.conv;
110         return "[connectionId=" ~ _connectionId.to!string() ~ ", openTime="
111                 ~ openTime.to!string() ~ ", closeTime="
112                 ~ closeTime.to!string() ~ ", duration=" ~ getDuration().to!string()
113                 ~ ", readBytes=" ~ readBytes.to!string() ~ ", writtenBytes=" ~ writtenBytes.to!string() ~ "]";
114     }
115 }
116 
117     // override void close() {
118     //     // if(cas(&_isClosed, false, true)) {
119     //         try {
120     //             super.close();                
121     //         } catch (AsynchronousCloseException e) {
122     //             warningf("The connection %d asynchronously close exception", _connectionId);
123     //         } catch (IOException e) {
124     //             errorf("The connection %d close exception: %s", _connectionId, e.msg);
125     //         } 
126     //         // finally {
127     //         //     _eventHandler.notifyConnectionClosed(this);
128     //         // }
129     //     // } else {
130     //     //     infof("The connection %d already closed", _connectionId);
131     //     // }
132     // }
133 
134     // override void closeNow() {
135     //     this.close();
136     // }
137 
138     override protected void notifyClose() {
139         version(HUNT_METRIC) {
140             closeTime = DateTime.currentTimeMillis();
141             // version(HUNT_DEBUG) 
142             // tracef("The connection %d closed.", _connectionId);
143         } else {
144             version(HUNT_DEBUG) tracef("The connection %d closed.", _connectionId);
145         }
146         super.notifyClose();
147     }
148 
149     private void shutdownSocketChannel() {
150         shutdownOutput();
151         shutdownInput();
152     }
153 
154     void shutdownOutput() {
155         if (_isShutdownOutput) {
156             tracef("The connection %d is already shutdown output", _connectionId);
157         } else {
158             _isShutdownOutput = true;
159             try {
160                 _tcp.shutdownOutput();
161                 tracef("The connection %d is shutdown output", _connectionId);
162             } catch (ClosedChannelException e) {
163                 warningf("Shutdown output exception. The connection %d is closed", _connectionId);
164             } catch (IOException e) {
165                 errorf("The connection %d shutdown output I/O exception. %s", _connectionId, e.message);
166             }
167         }
168     }
169 
170     void shutdownInput() {
171         if (_isShutdownInput) {
172             tracef("The connection %d is already shutdown input", _connectionId);
173         } else {
174             _isShutdownInput = true;
175             try {
176                 _tcp.shutdownInput();
177                 tracef("The connection %d is shutdown input", _connectionId);
178             } catch (ClosedChannelException e) {
179                 warningf("Shutdown input exception. The connection %d is closed", _connectionId);
180             } catch (IOException e) {
181                 errorf("The connection %d shutdown input I/O exception. %s", _connectionId, e.message);
182             }
183         }
184     }    
185 
186     bool isShutdownOutput() {
187         return _isShutdownOutput;
188     }
189 
190     bool isShutdownInput() {
191         return _isShutdownInput;
192     }
193 
194     bool isWaitingForClose() {
195         return _isWaitingForClose;
196     }
197 
198     Address getLocalAddress() {
199         return localAddress();
200     }
201 
202     Address getRemoteAddress() {
203         return remoteAddress();
204     }
205 
206     Duration getMaxIdleTimeout() {
207         return _options.getIdleTimeout();
208     }
209 
210 }