hadoop 输出中文乱码问题

时间:2023-03-10 03:03:19
hadoop 输出中文乱码问题

本文转载至:

  http://www.aboutyun.com/thread-7358-1-1.html

hadoop涉及输出文本的默认输出编码统一用没有BOM的UTF-8的形式,但是对于中文的输出window系统默认的是GBK,有些格式文件例如CSV格式的文件用excel打开输出编码为没有BOM的UTF-8文件时,输出的结果为乱码,只能由UE或者记事本打开才能正常显示。因此将hadoop默认输出编码更改为GBK成为非常常见的需求。 
      默认的情况下MR主程序中,设定输出编码的设置语句为:

  1. job.setOutputFormatClass(TextOutputFormat.class);

复制代码

  1. TextOutputFormat.class

复制代码

的代码如下:

  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements.  See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership.  The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License.  You may obtain a copy of the License at
  9. *
  10. *     http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.mapreduce.lib.output;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import java.io.UnsupportedEncodingException;
  22. import org.apache.hadoop.classification.InterfaceAudience;
  23. import org.apache.hadoop.classification.InterfaceStability;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.fs.FSDataOutputStream;
  28. import org.apache.hadoop.io.NullWritable;
  29. import org.apache.hadoop.io.Text;
  30. import org.apache.hadoop.io.compress.CompressionCodec;
  31. import org.apache.hadoop.io.compress.GzipCodec;
  32. import org.apache.hadoop.mapreduce.OutputFormat;
  33. import org.apache.hadoop.mapreduce.RecordWriter;
  34. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  35. import org.apache.hadoop.util.*;
  36. /** An {@link OutputFormat} that writes plain text files. */
  37. @InterfaceAudience.Public
  38. @InterfaceStability.Stable
  39. public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  40. public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  41. protected static class LineRecordWriter<K, V>
  42. extends RecordWriter<K, V> {
  43. private static final String utf8 = "UTF-8";  // 将UTF-8转换成GBK
  44. private static final byte[] newline;
  45. static {
  46. try {
  47. newline = "\n".getBytes(utf8);
  48. } catch (UnsupportedEncodingException uee) {
  49. throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  50. }
  51. }
  52. protected DataOutputStream out;
  53. private final byte[] keyValueSeparator;
  54. public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  55. this.out = out;
  56. try {
  57. this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
  58. } catch (UnsupportedEncodingException uee) {
  59. throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  60. }
  61. }
  62. public LineRecordWriter(DataOutputStream out) {
  63. this(out, "\t");
  64. }
  65. /**
  66. * Write the object to the byte stream, handling Text as a special
  67. * case.
  68. * @param o the object to print
  69. * @throws IOException if the write throws, we pass it on
  70. */
  71. private void writeObject(Object o) throws IOException {
  72. if (o instanceof Text) {
  73. Text to = (Text) o;   // 将此行代码注释掉
  74. out.write(to.getBytes(), 0, to.getLength());  // 将此行代码注释掉
  75. } else { // 将此行代码注释掉
  76. out.write(o.toString().getBytes(utf8));
  77. }
  78. }
  79. public synchronized void write(K key, V value)
  80. throws IOException {
  81. boolean nullKey = key == null || key instanceof NullWritable;
  82. boolean nullValue = value == null || value instanceof NullWritable;
  83. if (nullKey && nullValue) {
  84. return;
  85. }
  86. if (!nullKey) {
  87. writeObject(key);
  88. }
  89. if (!(nullKey || nullValue)) {
  90. out.write(keyValueSeparator);
  91. }
  92. if (!nullValue) {
  93. writeObject(value);
  94. }
  95. out.write(newline);
  96. }
  97. public synchronized
  98. void close(TaskAttemptContext context) throws IOException {
  99. out.close();
  100. }
  101. }
  102. public RecordWriter<K, V>
  103. getRecordWriter(TaskAttemptContext job
  104. ) throws IOException, InterruptedException {
  105. Configuration conf = job.getConfiguration();
  106. boolean isCompressed = getCompressOutput(job);
  107. String keyValueSeparator= conf.get(SEPERATOR, "\t");
  108. CompressionCodec codec = null;
  109. String extension = "";
  110. if (isCompressed) {
  111. Class<? extends CompressionCodec> codecClass =
  112. getOutputCompressorClass(job, GzipCodec.class);
  113. codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
  114. extension = codec.getDefaultExtension();
  115. }
  116. Path file = getDefaultWorkFile(job, extension);
  117. FileSystem fs = file.getFileSystem(conf);
  118. if (!isCompressed) {
  119. FSDataOutputStream fileOut = fs.create(file, false);
  120. return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  121. } else {
  122. FSDataOutputStream fileOut = fs.create(file, false);
  123. return new LineRecordWriter<K, V>(new DataOutputStream
  124. (codec.createOutputStream(fileOut)),
  125. keyValueSeparator);
  126. }
  127. }
  128. }

复制代码

从上述代码的第48行可以看出hadoop已经限定此输出格式统一为UTF-8,因此为了改变hadoop的输出代码的文本编码只需定义一个和TextOutputFormat相同的类GbkOutputFormat同样继承FileOutputFormat(注意是org.apache.hadoop.mapreduce.lib.output.FileOutputFormat)即可,如下代码:

  1. import java.io.DataOutputStream;
  2. import java.io.IOException;
  3. import java.io.UnsupportedEncodingException;
  4. import org.apache.hadoop.classification.InterfaceAudience;
  5. import org.apache.hadoop.classification.InterfaceStability;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.fs.FSDataOutputStream;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.compress.CompressionCodec;
  13. import org.apache.hadoop.io.compress.GzipCodec;
  14. import org.apache.hadoop.mapreduce.OutputFormat;
  15. import org.apache.hadoop.mapreduce.RecordWriter;
  16. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.util.*;
  19. @InterfaceAudience.Public
  20. @InterfaceStability.Stable
  21. public class GbkOutputFormat<K, V> extends FileOutputFormat<K, V> {
  22. public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  23. protected static class LineRecordWriter<K, V>
  24. extends RecordWriter<K, V> {
  25. private static final String utf8 = "GBK";
  26. private static final byte[] newline;
  27. static {
  28. try {
  29. newline = "\n".getBytes(utf8);
  30. } catch (UnsupportedEncodingException uee) {
  31. throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  32. }
  33. }
  34. protected DataOutputStream out;
  35. private final byte[] keyValueSeparator;
  36. public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  37. this.out = out;
  38. try {
  39. this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
  40. } catch (UnsupportedEncodingException uee) {
  41. throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  42. }
  43. }
  44. public LineRecordWriter(DataOutputStream out) {
  45. this(out, "\t");
  46. }
  47. /**
  48. * Write the object to the byte stream, handling Text as a special
  49. * case.
  50. * @param o the object to print
  51. * @throws IOException if the write throws, we pass it on
  52. */
  53. private void writeObject(Object o) throws IOException {
  54. if (o instanceof Text) {
  55. //        Text to = (Text) o;
  56. //        out.write(to.getBytes(), 0, to.getLength());
  57. //      } else {
  58. out.write(o.toString().getBytes(utf8));
  59. }
  60. }
  61. public synchronized void write(K key, V value)
  62. throws IOException {
  63. boolean nullKey = key == null || key instanceof NullWritable;
  64. boolean nullValue = value == null || value instanceof NullWritable;
  65. if (nullKey && nullValue) {
  66. return;
  67. }
  68. if (!nullKey) {
  69. writeObject(key);
  70. }
  71. if (!(nullKey || nullValue)) {
  72. out.write(keyValueSeparator);
  73. }
  74. if (!nullValue) {
  75. writeObject(value);
  76. }
  77. out.write(newline);
  78. }
  79. public synchronized
  80. void close(TaskAttemptContext context) throws IOException {
  81. out.close();
  82. }
  83. }
  84. public RecordWriter<K, V>
  85. getRecordWriter(TaskAttemptContext job
  86. ) throws IOException, InterruptedException {
  87. Configuration conf = job.getConfiguration();
  88. boolean isCompressed = getCompressOutput(job);
  89. String keyValueSeparator= conf.get(SEPERATOR, "\t");
  90. CompressionCodec codec = null;
  91. String extension = "";
  92. if (isCompressed) {
  93. Class<? extends CompressionCodec> codecClass =
  94. getOutputCompressorClass(job, GzipCodec.class);
  95. codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
  96. extension = codec.getDefaultExtension();
  97. }
  98. Path file = getDefaultWorkFile(job, extension);
  99. FileSystem fs = file.getFileSystem(conf);
  100. if (!isCompressed) {
  101. FSDataOutputStream fileOut = fs.create(file, false);
  102. return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  103. } else {
  104. FSDataOutputStream fileOut = fs.create(file, false);
  105. return new LineRecordWriter<K, V>(new DataOutputStream
  106. (codec.createOutputStream(fileOut)),
  107. keyValueSeparator);
  108. }
  109. }
  110. }

复制代码

最后将输出编码类型设置成GbkOutputFormat.class,如:

  1. job.setOutputFormatClass(GbkOutputFormat.class);

复制代码

参考:

  1. http://semantic.iteye.com/blog/1846238

复制代码