Distributed Computing in Java 9
上QQ阅读APP看书,第一时间看更新

Streams

As we have seen, once a connection is established with the remote machine, data gets exchanged in the form of binary streams. To access binary data, the java.io package provides two basic types of classes: one for incoming and other for outgoing data, namely java.io.InputStream and java.io.OutputStream, respectively. Both InputStream and OutputStream support communication through bytes and expose the methods to read or write the information in the form of bytes or byte array. They have many subclasses meant for writing to different destinations and reading from different sources (for example, string buffers and files). But, if we want to send character data (16-bit), then one has to go for the java.io.Reader and java.io.Writer classes. They also have multiple subclasses specialized for different conditions. The class hierarchy of Java streams is shown in the following screenshot:

Once a socket is created, we can get the input or output streams from it using the getInputStream() or getOutputStream() methods, respectively; they return an instance of OutputStream and InputStream, respectively. If we choose to interact with the applications based on the character stream, then InputStream will need to be wrapped in InputStreamReader (the reader's subclass) and OutputStream will need to be wrapped in OutputStreamWriter (the writer's subclass).

Interprocess communication can also be established through the java.lang.Runtime interface. It provides the Process object that can be used to get both the input and output streams. This can be used to execute a local native subtask. Here's sample code of this:

Runtime rt1 = Runtime.getRuntime();
Process proc = rt1.exec("/usr/local/bin/performTask.sh");
InputStream in = proc.getInputStream();
OutputStream out = proc.getOutputStream();

If you want to transform data or provide additional functionality, then you can use FilterInputStream and FilterOuputStream. Using BufferedInputStream or BufferedReader, we can reduce the overhead associated with data read requests and also reduce latency as it performs large data reads into a buffer. Alternatively, we can use the ProcessBuilder class API to obtain InputStream and OutputStream.

Using PushbackInputStream or PushbackReader, we can push the data back to the stage where it will receive the information stream. This type of read-and-parse mechanism really helps in scenarios where the data of one level of the tree is parsed and reviewed to figure out the next level of the tree structure of the data reading process.

If you want to read and write information in the portal binary format, use DataInputStream and DataOutputStream, which are subclasses of the FilterInputStream and FilterOutputStream classes, respectively.

FilterInputStream and FilterOutputStream are the DataInputStream and DataOutputStream classes, respectively. Use them to transform portable binary data that can be used by any other system.

Different classes in the java.io package support different sources/destinations of data; they are given in the following table:

PipedInputStream is capable of reading data from PipedOutputStream, and PipedOutputStream is capable of writing data to PipedInputStream. Generally, a piped input stream and piped output stream are connected. PipedInputStream writes data bytes to PipedOutputStream. Typically, one thread reads data from PipedInputStream, and another thread writes data to PipedOutputStream. If both are required, having activities done from a single thread is not recommended, as it may lead to performance bottlenecks. PipedInputStream has a buffer that decouples the read operation from the write operation to a certain extent. If a thread that provides data bytes is no longer alive, then the pipe is called broken:

//Piped Server

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class SimplePipedServer extends Thread
{
PipedInputStream pipeIn;
PipedOutputStream pipeOut;
public SimplePipedServer(PipedInputStream pipeIn,
PipedOutputStream pipeOut)
{
this.pipeIn = pipeIn;
this.pipeOut = pipeOut;
}
@SuppressWarnings("deprecation")
public void run()
{
// Wrap piped input and output streams with
// data input and output streams
DataInputStream dataIn = new DataInputStream(pipeIn);
DataOutputStream dataOut = new DataOutputStream(pipeOut);
// Accept the client communication
try
{
System.out.println("SimplePipedServer:
Reading message from client : ");

String clientMessage = dataIn.readUTF();
System.out.println("SimplePipedServer:
Client message: " + clientMessage);
}
catch (IOException ex)
{
System.out.println("SimplePipedServer: IO Exception :
Couldn't read the
message from the client.");
stop();
}
try
{
System.out.println("SimplePipedServer:
Writing response to the client : ");
dataOut.writeChars("Message from the server.n");
}
catch (IOException ex)
{
System.out.println("SimplePipedServer: IO Exception :
Failed to connect to client.");
}
stop();
}
}

The following piped client program corresponds to the preceding server program:

//Piped Client

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class SimplePipedClient extends Thread {
PipedInputStream pipeIn;
PipedOutputStream pipeOut;
public SimplePipedClient(PipedInputStream pipeIn,
PipedOutputStream pipeOut) {
this.pipeIn = pipeIn;
this.pipeOut = pipeOut;
}
@SuppressWarnings("deprecation")
public void run() {
DataInputStream dataIn = new DataInputStream(pipeIn);
DataOutputStream dataOut = new DataOutputStream(pipeOut);
try {
System.out.println("SimplePipedClient:
Writing message to the server : ");
dataOut.writeChars("Message from the
SimplePipedClient to the Servern");
} catch (IOException ex) {
System.out.println("SimplePipedClient: IOException :
Couldn't get the response from the server.");
System.exit(1);
}
// Server responds
try {
System.out.println("SimplePipedClient:
Reading response from the server : ");
String response = dataIn.readUTF();
System.out.println("SimplePipedClient: Server response : "
+ response);
} catch (IOException e) {
System.out.println("SimplePipedClient: IO Exception :
Failed to connect to peer.");
}

stop();
}
}

A similar program with PipeStream is given as follows:

//PipeStream Sample

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class SimplePipeStream {
public static void main(String[] args) throws IOException {
final PipedOutputStream pipedOut = new PipedOutputStream();
final PipedInputStream pipedIn = new PipedInputStream(pipedOut);
Thread threadOne = new Thread(new Runnable() {
@Override
public void run() {
try {
pipedOut.write("Hello world, pipe!".getBytes());
} catch (IOException e) {
}
}
});
Thread threadTwo = new Thread(new Runnable() {
@Override
public void run() {
try {
int pipedData = pipedIn.read();
while(pipedData != -1){
System.out.print((char) pipedData);
pipedData = pipedIn.read();
}
} catch (IOException e) {
}
}
});
threadOne.start();
threadTwo.start();
}
}