/*
 * Copyright (C) 2013 Miguel Angel Astor Romero
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package ve.ucv.ciens.ccg.nxtcam.network;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;

import ve.ucv.ciens.ccg.networkdata.VideoFrameDataMessage;
import ve.ucv.ciens.ccg.networkdata.VideoStreamingControlMessage;
import ve.ucv.ciens.ccg.nxtcam.camera.CameraImageMonitor;
import ve.ucv.ciens.ccg.nxtcam.network.protocols.VideoStreamingProtocol;
import ve.ucv.ciens.ccg.nxtcam.utils.Logger;
import ve.ucv.ciens.ccg.nxtcam.utils.ProjectConstants;
import android.graphics.ImageFormat;
import android.graphics.Rect;
import android.graphics.YuvImage;

/**
 * Thread that connects to the video server and streams camera frames to it.
 *
 * <p>Each frame is fetched from the {@link CameraImageMonitor}, compressed to
 * JPEG and written to the server as a {@link VideoFrameDataMessage} over an
 * {@link ObjectOutputStream}. The loop runs until {@link #finish()} is called
 * or a write to the server fails; the socket is closed when the loop ends.</p>
 *
 * <p>{@link #finish()}, {@link #disconnect()} and {@link #isConnected()} may be
 * called from threads other than the streaming thread.</p>
 */
public class VideoStreamingThread extends Thread{
	private static final String TAG = "IM_THREAD";
	private static final String CLASS_NAME = VideoStreamingThread.class.getSimpleName();

	/** JPEG quality (0-100) used when compressing every frame. */
	private static final int JPEG_QUALITY = 90;

	private enum ProtocolState_t {WAIT_FOR_ACK, WAIT_FOR_READY, CAN_SEND, END_STREAM}

	/** Guarded by {@link #threadPauseMonitor}. */
	private boolean pause;
	/** Written by {@link #finish()} from other threads, hence volatile. */
	private volatile boolean done;
	private final Object threadPauseMonitor;
	private final CameraImageMonitor camMonitor;
	/** Assigned on the streaming thread, inspected from other threads. */
	private volatile Socket socket;
	private ObjectOutputStream writer;
	private ObjectInputStream reader;
	private final String serverIp;
	private ProtocolState_t protocolState;
	/** Scratch buffer reused for the JPEG data of every frame. */
	private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

	/**
	 * Creates a streaming thread for the given server. No connection is
	 * attempted until the thread is started.
	 *
	 * @param serverIp host name or IP address of the video server.
	 */
	public VideoStreamingThread(String serverIp){
		this.serverIp = serverIp;
		pause = false;
		done = false;
		threadPauseMonitor = new Object();
		socket = null;
		writer = null;
		reader = null;
		camMonitor = CameraImageMonitor.getInstance();
		protocolState = ProtocolState_t.WAIT_FOR_READY;
	}

	/*
	 * NOTE: an earlier, handshake-driven version of run() walked through the
	 * ProtocolState_t states (WAIT_FOR_READY, WAIT_FOR_ACK, CAN_SEND and
	 * END_STREAM) using readMessage(), validateImageTransferProtocolMessage()
	 * and sendUnrecognizedMessage(). The current run() streams frames
	 * unconditionally, so those helpers, checkPause() and protocolState are
	 * presently unused by it.
	 */

	/**
	 * Connects to the server and sends frames until the thread is asked to
	 * finish or sending fails. The socket is always closed on the way out.
	 */
	@Override
	public void run(){
		connectToServer();

		// isConnected() is null-safe: connectToServer() leaves socket == null on failure.
		if(!isConnected()){
			Logger.log_e(TAG, CLASS_NAME + ".run() :: Not connected to a server. Finishing thread.");
			return;
		}

		try{
			while(!done){
				sendImage();
			}
		}finally{
			// Release the connection however the loop ended.
			disconnect();
		}

		Logger.log_d(TAG, CLASS_NAME + ".run() :: Thread finish reached.");
	}

	/**
	 * Grabs the current frame from the camera monitor, compresses it to JPEG
	 * and writes it to the server. A failed write ends the streaming loop.
	 */
	private void sendImage(){
		byte[] image;
		YuvImage yuvImage;
		VideoFrameDataMessage message;
		Rect imageSize;

		image = camMonitor.getImageData();
		imageSize = camMonitor.getImageParameters();

		// Compress the image as Jpeg.
		Logger.log_d(TAG, CLASS_NAME + ".sendImage() :: Compressing image.");
		yuvImage = new YuvImage(image, ImageFormat.NV21, imageSize.width(), imageSize.height(), null);
		yuvImage.compressToJpeg(imageSize, JPEG_QUALITY, outputStream);

		Logger.log_d(TAG, CLASS_NAME + ".sendImage() :: Building message.");
		message = new VideoFrameDataMessage();
		message.data = outputStream.toByteArray();
		message.imageWidth = imageSize.width();
		message.imageHeight = imageSize.height();

		try{
			Logger.log_d(TAG, CLASS_NAME + ".sendImage() :: Sending message.");
			writer.writeObject(message);
			writer.flush();
			// ObjectOutputStream remembers every object it has written so it can
			// emit back-references; without reset() every frame stays reachable.
			writer.reset();
			Logger.log_d(TAG, CLASS_NAME + ".sendImage() :: Message sent successfully: ");
		}catch(IOException io){
			Logger.log_e(TAG, CLASS_NAME + ".sendImage() :: Error sending image to the server: " + io.getMessage());
			// The connection is unusable after a failed write; stop instead of
			// compressing and failing on every subsequent frame.
			done = true;
		}finally{
			Logger.log_d(TAG, CLASS_NAME + ".sendImage() :: Cleaning.");
			outputStream.reset();
		}
	}

	/**
	 * Opens the socket to the server and wraps it in object streams. On
	 * failure the error is logged and no connection state is left behind.
	 */
	private void connectToServer(){
		try{
			Logger.log_i(TAG, CLASS_NAME + ".connectToServer() :: Connecting to the server at " + serverIp);
			socket = new Socket(InetAddress.getByName(serverIp), ProjectConstants.SERVER_TCP_PORT_1);
			writer = new ObjectOutputStream(socket.getOutputStream());
			reader = new ObjectInputStream(socket.getInputStream());
			Logger.log_i(TAG, CLASS_NAME + ".connectToServer() :: Connection successful.");
		}catch(IOException io){
			Logger.log_e(TAG, CLASS_NAME + ".connectToServer() :: Connection failed with message: " + io.getMessage());
			// The socket may have been opened before a stream constructor failed;
			// close it so run() does not try to use a half-built connection.
			disconnect();
			writer = null;
			reader = null;
		}
	}

	/**
	 * Closes the socket if it is open. Safe to call more than once and before
	 * any connection has been made.
	 */
	public void disconnect(){
		Socket current = socket;

		if(current != null && !current.isClosed()){
			try{
				Logger.log_d(TAG, CLASS_NAME + ".disconnect() :: Closing socket.");
				current.close();
			}catch (IOException io) {
				Logger.log_e(TAG, CLASS_NAME + ".disconnect() :: " + io.getMessage());
			}
		}
	}

	/**
	 * Asks the streaming loop to stop after the frame currently being sent.
	 */
	public synchronized void finish(){
		done = true;
		Logger.log_i(TAG, CLASS_NAME + ".finish() :: Finishing thread.");
	}

	/**
	 * Blocks while a pause is requested. Returns early, with the interrupt
	 * flag restored, if the thread is interrupted while waiting.
	 */
	private void checkPause(){
		synchronized (threadPauseMonitor){
			while(pause){
				Logger.log_d(TAG, CLASS_NAME + ".checkPause() :: Pause requested.");
				try{
					threadPauseMonitor.wait();
				}catch(InterruptedException ie){
					// Keep the interruption visible to callers and stop waiting;
					// looping again would make wait() throw immediately.
					Thread.currentThread().interrupt();
					return;
				}
			}
		}
	}

	/**
	 * Reads one object from the server.
	 *
	 * @return the object read, or null if reading or deserialization failed.
	 */
	private Object readMessage(){
		// Read a message from the server stream.
		try{
			return reader.readObject();
		}catch(IOException io){
			Logger.log_e(TAG, CLASS_NAME + ".run() :: IOException when reading in WAIT_FOR_READY state.");
			return null;
		}catch(ClassNotFoundException cn){
			Logger.log_e(TAG, CLASS_NAME + ".run() :: ClassNotFoundException when reading in WAIT_FOR_READY state.");
			return null;
		}
	}

	/**
	 * Checks that a received object is a streaming control message.
	 *
	 * @param message the object read from the server; may be null.
	 * @return true if message is a non-null {@link VideoStreamingControlMessage}.
	 */
	private boolean validateImageTransferProtocolMessage(Object message){
		// instanceof is already false for null.
		return message instanceof VideoStreamingControlMessage;
	}

	/**
	 * Sends an UNRECOGNIZED control message to the server. Failures are logged.
	 */
	private void sendUnrecognizedMessage(){
		VideoStreamingControlMessage message = new VideoStreamingControlMessage();
		message.message = VideoStreamingProtocol.UNRECOGNIZED;

		try{
			writer.writeObject(message);
			writer.flush();
			// Only report success when the write actually went through.
			Logger.log_d(TAG, CLASS_NAME + ".run() :: UNRECOGNIZED message sent.");
		}catch(IOException io){
			Logger.log_e(TAG, CLASS_NAME + ".run() :: IOException when writing UNRECOGNIZED in WAIT_FOR_READY state.");
		}
	}

	/**
	 * Requests a pause. The flag is only honoured where checkPause() is called.
	 */
	public synchronized void pauseThread(){
		// Written under the same monitor checkPause() reads it under.
		synchronized (threadPauseMonitor) {
			pause = true;
		}
		Logger.log_d(TAG, CLASS_NAME + ".pauseThread() :: Pausing thread.");
	}

	/**
	 * Clears a pause request and wakes any thread waiting in checkPause().
	 */
	public synchronized void resumeThread(){
		Logger.log_d(TAG, CLASS_NAME + ".resumeThread() :: Resuming thread.");
		synchronized (threadPauseMonitor) {
			pause = false;
			threadPauseMonitor.notifyAll();
		}
	}

	/**
	 * Reports whether the thread currently holds an open connection.
	 *
	 * @return true if the socket was connected and has not been closed.
	 */
	public boolean isConnected(){
		Socket current = socket;
		// Socket.isConnected() stays true after close(), so isClosed() is checked too.
		return current != null && current.isConnected() && !current.isClosed();
	}
}