001 
002 /**
003  * Title:        Advanced Network Client Sample<p>
004  * Description:  <p>
005  * Copyright:    Copyright (C) 2009 Alexey Veremenko<p>
006  * Company:      <p>
007  @author Alexey Veremenko
008  @version 1.0
009  */
010 package networking.base;
011 
012 import networking.protocol.*;
013 
014 import java.net.*;
015 import java.io.*;
016 import java.util.*;
017 
018 /**
019  * SocketThread - this class represents a thread which owns a socket and
020  * maintains an additional message queue
021  */
022 public abstract class SocketThread extends ForkedThread
023 {
024     /**
025      * Sleep value used in wait message loop (milliseconds)
026      */
027     public static final long s_timeSleep = 10;
028 
029     /**
030      * Socket
031      */
032     private Socket m_socket = null;
033 
034     /**
035      * Output socket stream
036      */
037     private OutputStream m_out = null;
038 
039     /**
040      * Input socket stream
041      */
042     private InputStream m_in = null;
043 
044     /**
045      * Output stream for internal queue processing
046      */
047     private PipedOutputStream m_innerOut = null;
048 
049     /**
050      * Input stream for internal queue processing
051      */
052     private PipedInputStream m_innerIn = null;
053 
054     /**
055      * Exit flag
056      */
057     private boolean m_exit = false;
058 
059     /**
060      * Maximum idle time (milliseconds) after which the thread exits
061      */
062     private long m_timeOut = 1000L 60 10;  // 10 min
063 
064     /**
065      * Construct new SocketThread object
066      */
067     public SocketThread()
068     {
069     }
070 
071     /**
072      * Construct new SocketThread object
073      @param socket socket
074      */
075     public SocketThread(Socket socket)
076     {
077         m_socket = socket;
078     }
079 
080     /**
081      * Copy constructor
082      @param st SocketThread object
083      */
084     public SocketThread(SocketThread t)
085     {
086         m_fork = CHILD;
087         m_socket = t.m_socket;
088         m_innerOut = t.m_innerOut;
089         m_innerIn = t.m_innerIn;
090         m_out = t.m_out;
091         m_in = t.m_in;
092     }
093 
094     /**
095      * Attach socket after construction
096      @param socket socket
097      */
098     protected void setSocket(Socket socket)
099     {
100         if (m_socket != null)
101             throw new UnsupportedOperationException(
102                   "The socket was set by constructor");
103 
104         m_socket = socket;
105     }
106 
107     /**
108      * Process a message
109      @param msg message
110      */
111     protected void processMessage(Message msgthrows Exception
112     {
113         switch (msg.getType())
114         {
115             /* Exit messages */
116             case Message.BYE:
117             {
118                 exit();
119                 break;
120             }
121             case Message.DISCONNECT:
122             {
123                 // Inform remote end
124                 send(new Message(Message.BYE));
125                 exit();
126                 break;
127             }
128             case Message.TIMEOUT:
129             {
130                 // Ignore by default
131                 break;
132             }
133 
134             /* Unexpected message */
135             default:
136             {
137                 throw new IllegalArgumentException("Unexpected message has arrived");
138                 //break;
139             }
140         }
141     }
142 
143     /**
144      * Exit message loop
145      */
146     protected final void exit()
147     {
148         m_exit = true;
149     }
150 
151     /**
152      * Set time-out value
153      @param millis time-out value (milliseconds)
154      */
155     public final void setTimeOut(long millis)
156     {
157         if (millis != && millis <= s_timeSleep)
158            throw new IllegalArgumentException("Time-out value is too small");
159 
160         m_timeOut = millis;
161     }
162 
163     /**
164      * Wait for InputStream become ready for reading,
165      * or exit was called or time-out has expired
166      @param in InputStream
167      * @throw ServerException if time-out has expired
168      */
169     protected final void wait(InputStream inthrows Exception
170     {
171         long idle = 0;
172 
173         while (!m_exit)
174         {
175             if (in.available() 0)
176                return;
177 
178             // Sleep
179             sleep(s_timeSleep);
180             idle += s_timeSleep;
181 
182             // Check time-out
183             if (idle > m_timeOut)
184                //!!!
185                throw new Exception("Time-out expired");
186         }
187     }
188 
189     /**
190      * Wait for one of the InputStreams become ready for reading,
191      * or exit was called or time-out has expired
192      * @throw ServerException if time-out has expired
193      */
194     private InputStream waitOneOf() throws Exception
195     {
196         long idle = 0;
197 
198         while (!m_exit)
199         {
200             if (m_in.available() 0)
201                return m_in;
202 
203             if (m_innerIn.available() 0)
204                return m_innerIn;
205 
206             // Sleep
207             sleep(s_timeSleep);
208             idle += s_timeSleep;
209 
210             // Check time-out
211             if (idle > m_timeOut)
212             {
213                 send(new Message(Message.TIMEOUT));
214                 send(new Message(Message.BYE));
215                 // Exit
216                 //!!!
217                 throw new Exception("Time-out expired");
218             }
219         }
220 
221         // Exit was called
222         return null;
223     }
224 
225     /**
226      * Convenience routine
227      * @throw Exception
228      */
229     protected void run1() throws Exception
230     {
231         // Listen to the socket and inner queue in a loop
232         while (!m_exit)
233         {
234             // Wait while any stream become alvailable
235             InputStream in = waitOneOf();
236             // Check exit
237             if (in == null)
238                break;
239             // Process message
240             processMessage(Message.recv(in));
241         }
242     }
243 
244     /**
245      * Init inner queue
246      */
247     private void init1() throws Exception
248     {
249         if (m_innerOut == null)
250         {
251             // Connect inner queue
252             m_innerOut = new PipedOutputStream();
253             m_innerIn = new PipedInputStream(m_innerOut);
254         }
255     }
256 
257     /*
258      * Override ForkedThread
259      */
260     protected void init() throws Exception
261     {
262         // Init inner queue
263         init1();
264 
265         // Init socket streams
266         m_out = m_socket.getOutputStream();
267         m_in = m_socket.getInputStream();
268     }
269 
270     /*
271      * Override ForkedThread
272      */
273     protected void deinit() throws Exception
274     {
275         // Release socket
276         m_out.close();
277         m_in.close();
278         m_socket.close();
279         // Release inner queue
280         m_innerIn.close();
281         m_innerOut.close();
282         // Null it all :)
283         m_out = null;
284         m_in = null;
285         m_socket = null;
286         m_innerIn = null;
287         m_innerOut = null;
288     }
289 
290     /*
291      * Enqueue message for internal processing
292      * @param msg message
293      */
294     synchronized public final void enque(Message msgthrows Exception
295     {
296         // Inner queue must be ready immediately after construction, i.e.
297         // even before thread starts
298         init1();
299         // Send to queue
300         msg.send(m_innerOut);
301     }
302 
303     /*
304      * Send message to the remote end
305      * @param msg message
306      */
307     synchronized protected final void send(Message msgthrows Exception
308     {
309         msg.send(m_out);
310     }
311 
312     /*
313      * Receive message from the remote end
314      * @return message
315      */
316     synchronized protected final Message recv() throws Exception
317     {
318         return Message.recv(m_in);
319     }
320 
321     /**
322      * Get socket output stream
323      @return socket output stream
324      */
325     synchronized protected final OutputStream getOut()
326     {
327         return m_out;
328     }
329 
330     /**
331      * Get socket input stream
332      @return socket input stream
333      */
334     synchronized protected final InputStream getIn()
335     {
336         return m_in;
337     }
338 
339     /**
340      * Report error
341      @param e exception
342      */
343     abstract protected void error(Exception e);
344 }
Java2html