ITPub博客

首页 > 大数据 > Hadoop > 通过CompressionCodec对数据流进行压缩和解压缩

通过CompressionCodec对数据流进行压缩和解压缩

Hadoop 作者:karlesar 时间:2014-03-15 10:01:29 0 删除 编辑
CompressionCodec包含两个函数,可以轻松用于压缩和解压缩数据。如果要对写入数据输出数据流的数据进行压缩,可用 createOutputStream(OutputStream out)方法在底层的数据流中对需要以压缩格式写入在此之前尚未压缩的数据新建一个CompressionOutputStream对象。相反,对输入数据流中读取的数据进行解压缩的时候,则调用createInputStream(InputStream in)获取CompressionInputStream,可通过该方法从底层数据流读取解压缩后的数据。
CompressionOutputStream 对象和CompressionInputStream对象,类似于java.util.zip.DeflaterOutputStream和 java.util.zip.DeflaterInputStream,只不过前两者能够重置其底层的压缩和解压缩的方法,对于某些将部分数据流 (section of data stream)压缩为单独数据块(block)的应用--例如SequenceFile(详见SequenceFile日志),这个能力是非常重要的。

例4-1显示了如何利用API来压缩从本地输入到HDFS压缩  几种压缩方式比较

package hadoop.compression;
import java.io.BufferedInputStream; 
import java.io.FileInputStream; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.net.URI; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.compress.CompressionCodec; 
import org.apache.hadoop.io.compress.CompressionCodecFactory; 
import org.apache.hadoop.io.compress.GzipCodec; 
import org.apache.hadoop.util.ReflectionUtils; 
public class HadoopCodec { 
     
    public static void main(String[] args) throws Exception { 
        // TODO Auto-generated method stub 
        String inputFile = "/home/hduser/11"; 
        String outputFolder = "hdfs://192.168.13.26:54310/"; 
        // String outputFile="bigfile.gz"; 
        // 读取Hadoop文件系统的配置 
        Configuration conf = new Configuration(); 
        conf.set("mapred.job.tracker", "192.168.13.27:54311");
        conf.set("Hadoop.job.ugi", "hadoop-user,hadoop-user"); 
        //测试各种压缩格式的效率 
       // gzip 
        long gzipTime = copyAndZipFile(conf, inputFile, outputFolder, "org.apache.hadoop.io.compress.GzipCodec", "gz"); 
        //bzip2 
        long bzip2Time = copyAndZipFile(conf, inputFile, outputFolder, "org.apache.hadoop.io.compress.BZip2Codec", "bz2"); 
        //deflate 
        long deflateTime = copyAndZipFile(conf, inputFile, outputFolder, "org.apache.hadoop.io.compress.DefaultCodec", "deflate"); 
       
     
        System.out.println("被压缩的文件名为: "+inputFile); 
        System.out.println("使用gzip压缩,时间为: "+gzipTime+"毫秒!"); 
        System.out.println("使用bzip2压缩,时间为: "+bzip2Time+"毫秒!"); 
        System.out.println("使用deflate压缩,时间为: "+deflateTime+"毫秒!");
   
    public static long copyAndZipFile(Configuration conf, String inputFile, String outputFolder, String codecClassName, 
            String suffixName) throws Exception { 
        long startTime = System.currentTimeMillis(); 
        // 因为本地文件系统是基于java.io包的,所以我们创建一个本地文件输入流 
        InputStream in = new BufferedInputStream(new FileInputStream(inputFile)); 
       
        //去掉扩展名提取basename 
        String baseName = inputFile.substring(0); 
        //构造输出文件名,它是路径名+基本名+扩展名 
        String outputFile = outputFolder + baseName + "."+suffixName; 
      
        FileSystem fs = FileSystem.get(URI.create(outputFile), conf); 
        // 创建一个编码解码器,通过反射机制根据传入的类名来动态生成其实例 
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(codecClassName), conf); 
        // 创建一个指向HDFS目标文件的压缩文件输出流 
        OutputStream out = codec.createOutputStream(fs.create(new Path(outputFile))); 
        // 用IOUtils工具将文件从本地文件系统复制到HDFS目标文件中 
        try { 
            IOUtils.copyBytes(in, out, conf); 
        } finally { 
            IOUtils.closeStream(in); 
            IOUtils.closeStream(out); 
       
        long endTime = System.currentTimeMillis(); 
        return endTime - startTime; 
   

 
<!-- 正文结束 -->

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/25579401/viewspace-1120580/,如需转载,请注明出处,否则将追究法律责任。

下一篇: 没有了~
请登录后发表评论 登录
全部评论

注册时间:2011-03-24

最新文章