001
002 /**
003 * Title: Advanced Network Client Sample<p>
004 * Description: <p>
005 * Copyright: Copyright (C) 2009 Alexey Veremenko<p>
006 * Company: <p>
007 * @author Alexey Veremenko
008 * @version 1.0
009 */
010 package networking.base;
011
012 import networking.protocol.*;
013
014 import java.net.*;
015 import java.io.*;
016 import java.util.*;
017
018 /**
019 * SocketThread - this class represents a thread which owns a socket and
020 * maintains an additional message queue
021 */
022 public abstract class SocketThread extends ForkedThread
023 {
024 /**
025 * Sleep value used in wait message loop (milliseconds)
026 */
027 public static final long s_timeSleep = 10;
028
029 /**
030 * Socket
031 */
032 private Socket m_socket = null;
033
034 /**
035 * Output socket stream
036 */
037 private OutputStream m_out = null;
038
039 /**
040 * Input socket stream
041 */
042 private InputStream m_in = null;
043
044 /**
045 * Output stream for internal queue processing
046 */
047 private PipedOutputStream m_innerOut = null;
048
049 /**
050 * Input stream for internal queue processing
051 */
052 private PipedInputStream m_innerIn = null;
053
054 /**
055 * Exit flag
056 */
057 private boolean m_exit = false;
058
059 /**
060 * Maximum idle time (milliseconds) after which the thread exits
061 */
062 private long m_timeOut = 1000L * 60 * 10; // 10 min
063
064 /**
065 * Construct new SocketThread object
066 */
067 public SocketThread()
068 {
069 }
070
071 /**
072 * Construct new SocketThread object
073 * @param socket socket
074 */
075 public SocketThread(Socket socket)
076 {
077 m_socket = socket;
078 }
079
080 /**
081 * Copy constructor
082 * @param st SocketThread object
083 */
084 public SocketThread(SocketThread t)
085 {
086 m_fork = CHILD;
087 m_socket = t.m_socket;
088 m_innerOut = t.m_innerOut;
089 m_innerIn = t.m_innerIn;
090 m_out = t.m_out;
091 m_in = t.m_in;
092 }
093
094 /**
095 * Attach socket after construction
096 * @param socket socket
097 */
098 protected void setSocket(Socket socket)
099 {
100 if (m_socket != null)
101 throw new UnsupportedOperationException(
102 "The socket was set by constructor");
103
104 m_socket = socket;
105 }
106
107 /**
108 * Process a message
109 * @param msg message
110 */
111 protected void processMessage(Message msg) throws Exception
112 {
113 switch (msg.getType())
114 {
115 /* Exit messages */
116 case Message.BYE:
117 {
118 exit();
119 break;
120 }
121 case Message.DISCONNECT:
122 {
123 // Inform remote end
124 send(new Message(Message.BYE));
125 exit();
126 break;
127 }
128 case Message.TIMEOUT:
129 {
130 // Ignore by default
131 break;
132 }
133
134 /* Unexpected message */
135 default:
136 {
137 throw new IllegalArgumentException("Unexpected message has arrived");
138 //break;
139 }
140 }
141 }
142
143 /**
144 * Exit message loop
145 */
146 protected final void exit()
147 {
148 m_exit = true;
149 }
150
151 /**
152 * Set time-out value
153 * @param millis time-out value (milliseconds)
154 */
155 public final void setTimeOut(long millis)
156 {
157 if (millis != 0 && millis <= s_timeSleep)
158 throw new IllegalArgumentException("Time-out value is too small");
159
160 m_timeOut = millis;
161 }
162
163 /**
164 * Wait for InputStream become ready for reading,
165 * or exit was called or time-out has expired
166 * @param in InputStream
167 * @throw ServerException if time-out has expired
168 */
169 protected final void wait(InputStream in) throws Exception
170 {
171 long idle = 0;
172
173 while (!m_exit)
174 {
175 if (in.available() > 0)
176 return;
177
178 // Sleep
179 sleep(s_timeSleep);
180 idle += s_timeSleep;
181
182 // Check time-out
183 if (idle > m_timeOut)
184 //!!!
185 throw new Exception("Time-out expired");
186 }
187 }
188
189 /**
190 * Wait for one of the InputStreams become ready for reading,
191 * or exit was called or time-out has expired
192 * @throw ServerException if time-out has expired
193 */
194 private InputStream waitOneOf() throws Exception
195 {
196 long idle = 0;
197
198 while (!m_exit)
199 {
200 if (m_in.available() > 0)
201 return m_in;
202
203 if (m_innerIn.available() > 0)
204 return m_innerIn;
205
206 // Sleep
207 sleep(s_timeSleep);
208 idle += s_timeSleep;
209
210 // Check time-out
211 if (idle > m_timeOut)
212 {
213 send(new Message(Message.TIMEOUT));
214 send(new Message(Message.BYE));
215 // Exit
216 //!!!
217 throw new Exception("Time-out expired");
218 }
219 }
220
221 // Exit was called
222 return null;
223 }
224
225 /**
226 * Convenience routine
227 * @throw Exception
228 */
229 protected void run1() throws Exception
230 {
231 // Listen to the socket and inner queue in a loop
232 while (!m_exit)
233 {
234 // Wait while any stream become alvailable
235 InputStream in = waitOneOf();
236 // Check exit
237 if (in == null)
238 break;
239 // Process message
240 processMessage(Message.recv(in));
241 }
242 }
243
244 /**
245 * Init inner queue
246 */
247 private void init1() throws Exception
248 {
249 if (m_innerOut == null)
250 {
251 // Connect inner queue
252 m_innerOut = new PipedOutputStream();
253 m_innerIn = new PipedInputStream(m_innerOut);
254 }
255 }
256
257 /*
258 * Override ForkedThread
259 */
260 protected void init() throws Exception
261 {
262 // Init inner queue
263 init1();
264
265 // Init socket streams
266 m_out = m_socket.getOutputStream();
267 m_in = m_socket.getInputStream();
268 }
269
270 /*
271 * Override ForkedThread
272 */
273 protected void deinit() throws Exception
274 {
275 // Release socket
276 m_out.close();
277 m_in.close();
278 m_socket.close();
279 // Release inner queue
280 m_innerIn.close();
281 m_innerOut.close();
282 // Null it all :)
283 m_out = null;
284 m_in = null;
285 m_socket = null;
286 m_innerIn = null;
287 m_innerOut = null;
288 }
289
290 /*
291 * Enqueue message for internal processing
292 * @param msg message
293 */
294 synchronized public final void enque(Message msg) throws Exception
295 {
296 // Inner queue must be ready immediately after construction, i.e.
297 // even before thread starts
298 init1();
299 // Send to queue
300 msg.send(m_innerOut);
301 }
302
303 /*
304 * Send message to the remote end
305 * @param msg message
306 */
307 synchronized protected final void send(Message msg) throws Exception
308 {
309 msg.send(m_out);
310 }
311
312 /*
313 * Receive message from the remote end
314 * @return message
315 */
316 synchronized protected final Message recv() throws Exception
317 {
318 return Message.recv(m_in);
319 }
320
321 /**
322 * Get socket output stream
323 * @return socket output stream
324 */
325 synchronized protected final OutputStream getOut()
326 {
327 return m_out;
328 }
329
330 /**
331 * Get socket input stream
332 * @return socket input stream
333 */
334 synchronized protected final InputStream getIn()
335 {
336 return m_in;
337 }
338
339 /**
340 * Report error
341 * @param e exception
342 */
343 abstract protected void error(Exception e);
344 }
|