Coverage Report - ca.uhn.hl7v2.hoh.raw.client.HohRawClientMultithreaded
 
Classes in this File Line Coverage Branch Coverage Complexity
HohRawClientMultithreaded
73%
52/71
70%
17/24
3.182
HohRawClientMultithreaded$1
N/A
N/A
3.182
HohRawClientMultithreaded$TimeoutTask
85%
24/28
80%
8/10
3.182
 
 1  
 package ca.uhn.hl7v2.hoh.raw.client;
 2  
 
 3  
 import java.io.IOException;
 4  
 import java.net.Socket;
 5  
 import java.net.URL;
 6  
 import java.text.SimpleDateFormat;
 7  
 import java.util.ArrayList;
 8  
 import java.util.Date;
 9  
 import java.util.IdentityHashMap;
 10  
 import java.util.Iterator;
 11  
 import java.util.List;
 12  
 import java.util.Map;
 13  
 import java.util.Map.Entry;
 14  
 import java.util.concurrent.Executors;
 15  
 import java.util.concurrent.ScheduledExecutorService;
 16  
 import java.util.concurrent.TimeUnit;
 17  
 
 18  
 import ca.uhn.hl7v2.hoh.api.IClientMultithreaded;
 19  
 import ca.uhn.hl7v2.hoh.util.Validate;
 20  
 
 21  
 /**
 22  
  * <p>
 23  
  * Raw message sender using the HL7 over HTTP specification which uses a
 24  
  * {@link ScheduledExecutorService} to provide advanced functionality such as
 25  
  * persistent connections which time out and close automatically.
 26  
  * </p>
 27  
  * <p>
 28  
  * This connector uses an executor service which can start worker threads, so
 29  
  * use caution if embedding within a J2EE container.
 30  
  * </p>
 31  
  */
 32  245
 public class HohRawClientMultithreaded extends AbstractRawClient implements IClientMultithreaded {
 33  
 
 34  
         /**
 35  
          * Default {@link #setSocketTimeout(long) Socket Timeout}, 10000ms
 36  
          */
 37  
         public static final long DEFAULT_SOCKET_TIMEOUT = 10000;
 38  
 
 39  5
         private static final org.slf4j.Logger ourLog = org.slf4j.LoggerFactory.getLogger(HohRawClientMultithreaded.class);
 40  
 
 41  
         private final ScheduledExecutorService myExecutorService;
 42  40
         private Map<Socket, Long> myIdleSocketsToTimeBecameIdle = new IdentityHashMap<Socket, Long>();
 43  40
         private final SimpleDateFormat myLogTimeFormat = new SimpleDateFormat("HH:mm:ss,SSS");
 44  
         private boolean myReapingScheduled;
 45  40
         private long mySocketTimeout = DEFAULT_SOCKET_TIMEOUT;
 46  
 
 47  
         /**
 48  
          * Constructor
 49  
          */
 50  40
         public HohRawClientMultithreaded() {
 51  40
                 myExecutorService = Executors.newScheduledThreadPool(1);
 52  40
         }
 53  
 
 54  
         /**
 55  
          * Constructor
 56  
          * 
 57  
          * @param theHost
 58  
          *            The HOST (name/address). E.g. "192.168.1.1"
 59  
          * @param thePort
 60  
          *            The PORT. E.g. "8080"
 61  
          * @param thePath
 62  
          *            The path being requested (must either be blank or start with
 63  
          *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
 64  
          */
 65  
         public HohRawClientMultithreaded(String theHost, int thePort, String thePath) {
 66  20
                 this();
 67  
 
 68  20
                 setHost(theHost);
 69  20
                 setPort(thePort);
 70  20
                 setUriPath(thePath);
 71  20
         }
 72  
 
 73  
         /**
 74  
          * Constructor
 75  
          * 
 76  
          * @param theHost
 77  
          *            The HOST (name/address). E.g. "192.168.1.1"
 78  
          * @param thePort
 79  
          *            The PORT. E.g. "8080"
 80  
          * @param theUriPath
 81  
          *            The URI path being requested (must either be blank or start with
 82  
          *            '/' and contain a path). E.g. "/Apps/Receiver.jsp"
 83  
          * @param theExecutorService
 84  
          *            The executor service to use for detecting stale sockets
 85  
          */
 86  
         public HohRawClientMultithreaded(String theHost, int thePort, String theUriPath, ScheduledExecutorService theExecutorService) {
 87  0
                 super(theHost, thePort, theUriPath);
 88  0
                 Validate.notNull(theExecutorService, "executorService");
 89  
 
 90  0
                 myExecutorService = theExecutorService;
 91  0
         }
 92  
 
 93  
         /**
 94  
          * Constructor
 95  
          * 
 96  
          * @param theUrl
 97  
          *            The URL to connect to
 98  
          * @param theExecutorService
 99  
          *            The executor service to use for detecting stale sockets
 100  
          */
 101  
         public HohRawClientMultithreaded(URL theUrl) {
 102  0
                 this();
 103  0
                 setUrl(theUrl);
 104  0
         }
 105  
 
 106  
         /**
 107  
          * Constructor
 108  
          * 
 109  
          * @param theUrl
 110  
          *            The URL to connect to
 111  
          * @param theExecutorService
 112  
          *            The executor service to use for detecting stale sockets
 113  
          */
 114  
         public HohRawClientMultithreaded(URL theUrl, ScheduledExecutorService theExecutorService) {
 115  0
                 super(theUrl);
 116  0
                 Validate.notNull(theExecutorService, "executorService");
 117  
 
 118  0
                 myExecutorService = theExecutorService;
 119  0
         }
 120  
 
 121  
         @Override
 122  
         protected synchronized Socket provideSocket() throws IOException {
 123  
                 Socket retVal;
 124  75
                 if (myIdleSocketsToTimeBecameIdle.size() == 0) {
 125  55
                         ourLog.info("Creating new remote connection to {}:{}", getHost(), getPort());
 126  55
                         retVal = connect();
 127  
                 } else {
 128  20
                         retVal = myIdleSocketsToTimeBecameIdle.keySet().iterator().next();
 129  20
                         myIdleSocketsToTimeBecameIdle.remove(retVal);
 130  20
                         if (retVal.isClosed()) {
 131  0
                                 ourLog.trace("Found existing remote connection to {}:{} but it was closed, to going to open a new one", getHost(), getPort());
 132  0
                                 retVal = connect();
 133  
                         } else {
 134  20
                                 ourLog.trace("Returning existing remote connection to {}:{}", getHost(), getPort());
 135  
                         }
 136  
                 }
 137  70
                 return retVal;
 138  
         }
 139  
 
 140  
         /**
 141  
          * Returns a socket to the pool. If the socket is closed, it will
 142  
          * not be returned.
 143  
          */
 144  
         @Override
 145  
         protected synchronized void returnSocket(Socket theSocket) {
 146  70
                 if (theSocket.isClosed()) {
 147  20
                         return;
 148  
                 }
 149  
                 
 150  50
                 long now = System.currentTimeMillis();
 151  
 
 152  
                 // TODO: reap immediately if timeout is 0
 153  
                 
 154  50
                 if (ourLog.isDebugEnabled()) {
 155  0
                         if (mySocketTimeout == -1) {
 156  0
                                 ourLog.debug("Returning socket, will not attempt to reap");
 157  
                         } else {
 158  0
                                 ourLog.debug("Returning socket, will be eligible for reaping at " + myLogTimeFormat.format(new Date(now + mySocketTimeout)));
 159  
                         }
 160  
                 }
 161  
 
 162  50
                 myIdleSocketsToTimeBecameIdle.put(theSocket, now);
 163  50
                 scheduleReaping();
 164  50
         }
 165  
 
 166  
         private void scheduleReaping() {
 167  105
                 long now = System.currentTimeMillis();
 168  105
                 if (myReapingScheduled) {
 169  15
                         ourLog.debug("Reaping already scheduled");
 170  15
                         return;
 171  
                 }
 172  
 
 173  90
                 if (myIdleSocketsToTimeBecameIdle.size() < 1) {
 174  40
                         return;
 175  
                 }
 176  
 
 177  50
                 if (mySocketTimeout == -1) {
 178  10
                         return;
 179  
                 }
 180  
                 
 181  40
                 long earliestReapingTime = Long.MAX_VALUE;
 182  40
                 for (Long next : myIdleSocketsToTimeBecameIdle.values()) {
 183  40
                         long nextReapingTime = next + mySocketTimeout;
 184  40
                         if (nextReapingTime < earliestReapingTime) {
 185  40
                                 earliestReapingTime = nextReapingTime;
 186  
                         }
 187  40
                 }
 188  
 
 189  40
                 long delay = earliestReapingTime - now;
 190  40
                 if (ourLog.isDebugEnabled()) {
 191  0
                         ourLog.debug("Scheduling socket reaping in {} ms at {}", delay, myLogTimeFormat.format(new Date(earliestReapingTime)));
 192  
                 }
 193  
 
 194  40
                 myExecutorService.schedule(new TimeoutTask(), delay, TimeUnit.MILLISECONDS);
 195  40
                 myReapingScheduled = true;
 196  40
         }
 197  
 
 198  
         /**
 199  
          * {@inheritDoc}
 200  
          */
 201  
         public long getSocketTimeout() {
 202  0
                 return mySocketTimeout;
 203  
         }
 204  
 
 205  
         /**
 206  
          * {@inheritDoc}
 207  
          */
 208  
         public synchronized void setSocketTimeout(long theSocketTimeout) {
 209  20
                 if (mySocketTimeout < -1) {
 210  0
                         throw new IllegalArgumentException("Socket timeout must be -1, 0, or a positive integer");
 211  
                 }
 212  20
                 mySocketTimeout = theSocketTimeout;
 213  20
                 myReapingScheduled = false;
 214  20
                 scheduleReaping();
 215  20
         }
 216  
 
 217  80
         private class TimeoutTask implements Runnable {
 218  
                 public void run() {
 219  
 
 220  35
                         if (mySocketTimeout == -1) {
 221  0
                                 return;
 222  
                         }
 223  
                         
 224  35
                         ourLog.debug("Beginning socket reaping pass");
 225  
                         try {
 226  
 
 227  35
                                 List<Socket> socketsToClose = new ArrayList<Socket>();
 228  35
                                 long closeIfActiveBefore = System.currentTimeMillis() - mySocketTimeout;
 229  35
                                 synchronized (HohRawClientMultithreaded.this) {
 230  
 
 231  35
                                         for (Iterator<Map.Entry<Socket, Long>> iter = myIdleSocketsToTimeBecameIdle.entrySet().iterator(); iter.hasNext();) {
 232  35
                                                 Entry<Socket, Long> nextEntry = iter.next();
 233  35
                                                 if (nextEntry.getValue() <= closeIfActiveBefore) {
 234  20
                                                         Socket key = nextEntry.getKey();
 235  20
                                                         socketsToClose.add(key);
 236  20
                                                         ourLog.info("Closing idle socket with local port {} because it has been idle since {}", key.getLocalPort(), new Date(nextEntry.getValue()));
 237  20
                                                         iter.remove();
 238  20
                                                 } else {
 239  15
                                                         if (ourLog.isDebugEnabled()) {
 240  0
                                                                 ourLog.debug("Next socket has " + (nextEntry.getValue() - closeIfActiveBefore) + "ms remaining");
 241  
                                                         }
 242  
                                                 }
 243  35
                                         }
 244  
 
 245  35
                                         myReapingScheduled = false;
 246  35
                                         scheduleReaping();
 247  35
                                 }
 248  
 
 249  35
                                 for (Socket next : socketsToClose) {
 250  20
                                         closeSocket(next);
 251  20
                                 }
 252  0
                         } catch (Throwable e) {
 253  0
                                 ourLog.error("Failure during reaper pass", e);
 254  35
                         }
 255  35
                 }
 256  
         }
 257  
 
 258  
 }