1 | |
package ca.uhn.hl7v2.hoh.raw.client; |
2 | |
|
3 | |
import java.io.IOException; |
4 | |
import java.net.Socket; |
5 | |
import java.net.URL; |
6 | |
import java.text.SimpleDateFormat; |
7 | |
import java.util.ArrayList; |
8 | |
import java.util.Date; |
9 | |
import java.util.IdentityHashMap; |
10 | |
import java.util.Iterator; |
11 | |
import java.util.List; |
12 | |
import java.util.Map; |
13 | |
import java.util.Map.Entry; |
14 | |
import java.util.concurrent.Executors; |
15 | |
import java.util.concurrent.ScheduledExecutorService; |
16 | |
import java.util.concurrent.TimeUnit; |
17 | |
|
18 | |
import ca.uhn.hl7v2.hoh.api.IClientMultithreaded; |
19 | |
import ca.uhn.hl7v2.hoh.util.Validate; |
20 | |
|
21 | |
|
22 | |
|
23 | |
|
24 | |
|
25 | |
|
26 | |
|
27 | |
|
28 | |
|
29 | |
|
30 | |
|
31 | |
|
32 | 245 | public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded { |
33 | |
|
34 | |
|
35 | |
|
36 | |
|
37 | |
public static final long DEFAULT_SOCKET_TIMEOUT = 10000; |
38 | |
|
39 | 5 | private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class); |
40 | |
|
41 | |
private final ScheduledExecutorService myExecutorService; |
42 | 40 | private Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<Socket, Long>(); |
43 | 40 | private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS"); |
44 | |
private boolean myReapingScheduled; |
45 | 40 | private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT; |
46 | |
|
47 | |
|
48 | |
|
49 | |
|
50 | 40 | public HohRawClientMultithreaded() { |
51 | 40 | myExecutorService = Executors.newScheduledThreadPool(1); |
52 | 40 | } |
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
|
58 | |
|
59 | |
|
60 | |
|
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
public HohRawClientMultithreaded(String theHost, int thePort, String thePath) { |
66 | 20 | this(); |
67 | |
|
68 | 20 | setHost(theHost); |
69 | 20 | setPort(thePort); |
70 | 20 | setUriPath(thePath); |
71 | 20 | } |
72 | |
|
73 | |
|
74 | |
|
75 | |
|
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | |
|
82 | |
|
83 | |
|
84 | |
|
85 | |
|
86 | |
public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) { |
87 | 0 | super(theHost, thePort, theUriPath); |
88 | 0 | Validate.notNull(theExecutorService, "executorService"); |
89 | |
|
90 | 0 | myExecutorService = theExecutorService; |
91 | 0 | } |
92 | |
|
93 | |
|
94 | |
|
95 | |
|
96 | |
|
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
public HohRawClientMultithreaded(URL theUrl) { |
102 | 0 | this(); |
103 | 0 | setUrl(theUrl); |
104 | 0 | } |
105 | |
|
106 | |
|
107 | |
|
108 | |
|
109 | |
|
110 | |
|
111 | |
|
112 | |
|
113 | |
|
114 | |
public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) { |
115 | 0 | super(theUrl); |
116 | 0 | Validate.notNull(theExecutorService, "executorService"); |
117 | |
|
118 | 0 | myExecutorService = theExecutorService; |
119 | 0 | } |
120 | |
|
121 | |
@Override |
122 | |
protected synchronized Socket provideSocket() throws IOException { |
123 | |
Socket retVal; |
124 | 75 | if (myIdleSocketsToTimeBecameIdle.size() == 0) { |
125 | 55 | ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort()); |
126 | 55 | retVal = connect(); |
127 | |
} else { |
128 | 20 | retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next(); |
129 | 20 | myIdleSocketsToTimeBecameIdle.remove(retVal); |
130 | 20 | if (retVal.isClosed()) { |
131 | 0 | ourLog.trace("Found existing remote connection to {}:{} but it was closed, to going to open a new one", getHost(), getPort()); |
132 | 0 | retVal = connect(); |
133 | |
} else { |
134 | 20 | ourLog.trace("Returning existing remote connection to {}:{}", getHost(), getPort()); |
135 | |
} |
136 | |
} |
137 | 70 | return retVal; |
138 | |
} |
139 | |
|
140 | |
|
141 | |
|
142 | |
|
143 | |
|
144 | |
@Override |
145 | |
protected synchronized void returnSocket(Socket theSocket) { |
146 | 70 | if (theSocket.isClosed()) { |
147 | 20 | return; |
148 | |
} |
149 | |
|
150 | 50 | long now = System.currentTimeMillis(); |
151 | |
|
152 | |
|
153 | |
|
154 | 50 | if (ourLog.isDebugEnabled()) { |
155 | 0 | if (mySocketTimeout == -1) { |
156 | 0 | ourLog.debug("Returning socket, will not attempt to reap"); |
157 | |
} else { |
158 | 0 | ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout))); |
159 | |
} |
160 | |
} |
161 | |
|
162 | 50 | myIdleSocketsToTimeBecameIdle.put(theSocket, now); |
163 | 50 | scheduleReaping(); |
164 | 50 | } |
165 | |
|
166 | |
private void scheduleReaping() { |
167 | 105 | long now = System.currentTimeMillis(); |
168 | 105 | if (myReapingScheduled) { |
169 | 15 | ourLog.debug("Reaping already scheduled"); |
170 | 15 | return; |
171 | |
} |
172 | |
|
173 | 90 | if (myIdleSocketsToTimeBecameIdle.size() < 1) { |
174 | 40 | return; |
175 | |
} |
176 | |
|
177 | 50 | if (mySocketTimeout == -1) { |
178 | 10 | return; |
179 | |
} |
180 | |
|
181 | 40 | long earliestReapingTime = Long.MAX_VALUE; |
182 | 40 | for (Long next : myIdleSocketsToTimeBecameIdle.values()) { |
183 | 40 | long nextReapingTime = next + mySocketTimeout; |
184 | 40 | if (nextReapingTime < earliestReapingTime) { |
185 | 40 | earliestReapingTime = nextReapingTime; |
186 | |
} |
187 | 40 | } |
188 | |
|
189 | 40 | long delay = earliestReapingTime - now; |
190 | 40 | if (ourLog.isDebugEnabled()) { |
191 | 0 | ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime))); |
192 | |
} |
193 | |
|
194 | 40 | myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS); |
195 | 40 | myReapingScheduled = true; |
196 | 40 | } |
197 | |
|
198 | |
|
199 | |
|
200 | |
|
201 | |
public long getSocketTimeout() { |
202 | 0 | return mySocketTimeout; |
203 | |
} |
204 | |
|
205 | |
|
206 | |
|
207 | |
|
208 | |
public synchronized void setSocketTimeout(long theSocketTimeout) { |
209 | 20 | if (mySocketTimeout < -1) { |
210 | 0 | throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer"); |
211 | |
} |
212 | 20 | mySocketTimeout = theSocketTimeout; |
213 | 20 | myReapingScheduled = false; |
214 | 20 | scheduleReaping(); |
215 | 20 | } |
216 | |
|
217 | 80 | private class TimeoutTask implements Runnable { |
218 | |
public void run() { |
219 | |
|
220 | 35 | if (mySocketTimeout == -1) { |
221 | 0 | return; |
222 | |
} |
223 | |
|
224 | 35 | ourLog.debug("Beginning socket reaping pass"); |
225 | |
try { |
226 | |
|
227 | 35 | List<Socket> socketsToClose = new ArrayList<Socket>(); |
228 | 35 | long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout; |
229 | 35 | synchronized (HohRawClientMultithreaded.this) { |
230 | |
|
231 | 35 | for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) { |
232 | 35 | Entry<Socket, Long> nextEntry = iter.next(); |
233 | 35 | if (nextEntry.getValue() <= closeIfActiveBefore) { |
234 | 20 | Socket key = nextEntry.getKey(); |
235 | 20 | socketsToClose.add(key); |
236 | 20 | ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue())); |
237 | 20 | iter.remove(); |
238 | 20 | } else { |
239 | 15 | if (ourLog.isDebugEnabled()) { |
240 | 0 | ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining"); |
241 | |
} |
242 | |
} |
243 | 35 | } |
244 | |
|
245 | 35 | myReapingScheduled = false; |
246 | 35 | scheduleReaping(); |
247 | 35 | } |
248 | |
|
249 | 35 | for (Socket next : socketsToClose) { |
250 | 20 | closeSocket(next); |
251 | 20 | } |
252 | 0 | } catch (Throwable e) { |
253 | 0 | ourLog.error("Failure during reaper pass", e); |
254 | 35 | } |
255 | 35 | } |
256 | |
} |
257 | |
|
258 | |
} |