Hadoop streaming broken pipe issue
I was working on a simple tool (a distributed grep) using Hadoop streaming in Bash. Everything worked fine when I tested it locally with the standard approach:
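The exact local command is not shown. The usual way to test a streaming job locally is to pipe the input through the mapper, sort and the reducer. The sketch below wraps that in a hypothetical local_test helper and turns on set -o pipefail. Without pipefail, bash reports only the exit status of the last command, so a writer that is killed by SIGPIPE fails silently. This is one way a broken pipe can go unnoticed in a local test.

```shell
#!/usr/bin/env bash
# Hypothetical helper mirroring the usual local test of a streaming job:
#   cat input | mapper | sort | reducer
# The mapper and reducer are passed as command strings, e.g. './mapper.sh'.
local_test() {
  local input=$1 mapper=$2 reducer=$3
  # Subshell so that pipefail does not leak into the caller's shell.
  (
    # With pipefail the pipeline's status is non-zero if ANY stage fails,
    # including a writer (cat) killed by SIGPIPE; without it, bash reports
    # only the status of the last command (the reducer).
    set -o pipefail
    cat -- "$input" | bash -c "$mapper" | sort | bash -c "$reducer"
  )
}
```

For example, local_test input.txt './mapper.sh' './reducer.sh' prints the reducer's output and returns a non-zero status if any stage of the pipeline failed.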
However, when I ran the job on a larger input on a single-node cluster, it kept failing with the following error:
Streaming Job Failed!
The command used to run the Hadoop streaming job was:
The log shows:
java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
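The root cause was that mapper.sh did not fully consume its input. The stack trace shows Hadoop's PipeMapper writing input records to the mapper's standard input. Once the mapper process has exited without reading everything, the read end of that pipe is closed, and the next write fails with java.io.IOException: Broken pipe. See the related post on Stack Overflow and a related issue.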
This is the original mapper.sh that fails:
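The original script is not shown. As a hypothetical illustration of the same failure mode, the sketch below makes three assumptions: the mapper uses grep -q, it reads the pattern from a PATTERN environment variable, and it gets the file name from map_input_file (the variable Hadoop 1.x streaming should export for map.input.file, with dots replaced by underscores). It is written as a shell function so it is easy to test. In mapper.sh, the function body would be the script.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of a mapper that FAILS under Hadoop streaming.
# Assumptions: PATTERN holds the search pattern; map_input_file is the
# environment variable Hadoop 1.x streaming exports for map.input.file.
mapper() {
  # grep -q exits as soon as it finds the first match and leaves the
  # rest of stdin unread; whoever is still writing to us gets EPIPE.
  if grep -q -- "$PATTERN"; then
    echo "$map_input_file"
  fi
}
```

Piping a large input into it, e.g. set -o pipefail; seq 1 500000 | mapper, prints the file name. However, the pipeline fails, because seq (playing Hadoop's role as the writer) hits the closed pipe.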
This is the mapper.sh that works:
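A hypothetical sketch of the fixed mapper. It makes three assumptions: the mapper uses grep -q, the pattern is in a PATTERN variable, and the file name is in map_input_file as exported by Hadoop 1.x streaming. The only change is a trailing cat - that reads and discards whatever grep left on standard input.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of the WORKING mapper.
# Assumptions: PATTERN holds the search pattern; map_input_file is the
# environment variable Hadoop 1.x streaming exports for map.input.file.
mapper() {
  if grep -q -- "$PATTERN"; then
    echo "$map_input_file"
  fi
  # Read and discard the rest of stdin so the writer (Hadoop) can finish
  # writing the whole split instead of hitting a closed pipe.
  cat - > /dev/null
}
```

Because of the if statement and the trailing cat -, this version also exits with status 0 when nothing matches.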
The only difference from the original script is the added cat - command, which makes sure that the input stream to the mapper script is fully consumed. It seems that grep does not always read its full input before determining its exit code. With -q (and similarly with -l or -m NUM), for example, grep stops reading and exits as soon as it has found a match, leaving the rest of the stream unread. This would also explain why the job only failed on larger inputs: with a small input, Hadoop has presumably finished writing everything before grep exits.
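To see how much of a piped stream grep -q leaves behind, the following sketch (the function names are hypothetical) lets grep -q and wc -l share the same standard input. wc therefore counts only the lines grep did not consume. Adding cat - > /dev/null in between, as in the working mapper, drains everything.

```shell
#!/usr/bin/env bash
# Hypothetical helpers: all commands in each function share the same stdin,
# so wc -l only counts lines that the earlier commands did not consume.
unread_after_grep_q() {
  grep -q -- "$1"   # stops at the first match
  wc -l             # counts what grep left in the pipe
}

unread_after_drain() {
  grep -q -- "$1"
  cat - > /dev/null # drain the remainder, as the working mapper does
  wc -l             # nothing is left, so this prints 0
}
```

For example, seq 1 500000 | unread_after_grep_q 1 prints a large line count. The exact value depends on how much grep read before exiting. In contrast, seq 1 500000 | unread_after_drain 1 prints 0.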
For completeness, the reducer.sh code:
The mapper outputs the input file name if its content matches the pattern. The reducer just removes potential duplicates.
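The reducer script is not shown. Below is a minimal hypothetical sketch that matches the description, written as a function. Hadoop sorts the map output by key before the reduce phase, so duplicate file names reach the reducer on adjacent lines, and uniq is enough to drop them.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of reducer.sh: remove duplicate file names.
# Hadoop delivers the map output sorted, so duplicates are adjacent and
# uniq suffices; for unsorted input, sort -u would be needed instead.
reducer() {
  uniq
}
```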