ITPub博客

首页 > 大数据 > Hadoop > 将数据按指定格式存入zookeeper

将数据按指定格式存入zookeeper

原创 Hadoop 作者:hz_ganwei 时间:2018-08-22 18:10:31 0 删除 编辑

环境:

  scala版本:2.11.8

  zookeeper版本:3.4.5-cdh5.7.0

package com.ruozedata.zk
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
 * Demo application that stores consumer offsets in ZooKeeper and reads them back.
 *
 * Created by ganwei on 2018/08/21.
 *
 * Requirements:
 *  1. `storeOffsets` writes the offsets to ZooKeeper using this node layout:
 *     {{{
 *     /consumers/G322/offsets/ruoze_offset_topic/partition/0
 *     /consumers/G322/offsets/ruoze_offset_topic/partition/1
 *     /consumers/G322/offsets/ruoze_offset_topic/partition/2
 *     }}}
 *  2. `obtainOffsets` reads the stored values back; `main` prints them as
 *     tab-separated fields, one line per partition:
 *     {{{
 *     topic:ruoze_offset_topic  partition:0  offset:7
 *     topic:ruoze_offset_topic  partition:1  offset:3
 *     topic:ruoze_offset_topic  partition:2  offset:5
 *     }}}
 */
object ZkConnectApp {
  val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass)

  /**
   * Shared Curator client, started eagerly when this object is initialised.
   * Every path handed to it is resolved under the "consumers" namespace, so
   * "/G322" is stored as "/consumers/G322" on the ZooKeeper server.
   */
  val client = {
    val framework = CuratorFrameworkFactory
      .builder
      .connectString("172.16.100.31:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("consumers")
      .build()
    framework.start()
    framework
  }

  /**
   * Runs `body` while holding the inter-process mutex at `path`.
   * Blocks until the mutex is acquired and always releases it afterwards.
   *
   * @param path ZooKeeper path (relative to the client namespace) used as the lock node
   * @param body code to execute while the lock is held
   */
  def lock(path: String)(body: => Unit): Unit = {
    val mutex = new InterProcessMutex(client, path)
    mutex.acquire()
    try {
      body
    } finally {
      mutex.release()
    }
  }

  /**
   * Runs `body` only if the inter-process mutex at `path` can be acquired
   * within 10 seconds.
   *
   * @param path ZooKeeper path (relative to the client namespace) used as the lock node
   * @param body code to execute while the lock is held
   * @return `true` if the lock was obtained and `body` completed, `false` if the
   *         lock could not be obtained in time (in which case `body` is not run)
   */
  def tryDo(path: String)(body: => Unit): Boolean = {
    val mutex = new InterProcessMutex(client, path)
    if (mutex.acquire(10, TimeUnit.SECONDS)) {
      try {
        LOG.info("获准运行")
        body
        true
      } finally {
        mutex.release()
        LOG.info(s"释放锁 {$path}")
      }
    } else {
      LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出")
      false
    }
  }

  /**
   * Creates the node at `path`, together with any missing parents, when it
   * does not exist yet.
   *
   * NOTE(review): the existence check and the create are two separate calls,
   * so another client creating the same node in between would make the create
   * fail; this matches the original behaviour.
   *
   * @param path ZooKeeper path relative to the client namespace
   */
  def ensurePathExists(path: String): Unit = {
    // checkExists() yields null when the node is absent; wrap it to avoid a raw null test.
    if (Option(client.checkExists().forPath(path)).isEmpty) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  /**
   * Offset range of one topic partition, used when storing offsets.
   *
   * @param topic      topic name
   * @param partition  partition number
   * @param fromOffset starting offset
   * @param utilOffset ending offset; this is the value `storeOffsets` persists
   *                   (the field name is kept as-is for compatibility)
   */
  case class OffsetRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    utilOffset: Long
  )

  /**
   * Persists the ending offset of every range under the given consumer group.
   *
   * Node layout (including the client namespace):
   * {{{
   * /consumers/<groupName>/offsets/<topic>/partition/<partition>
   * }}}
   * The node data is the decimal string of `utilOffset`, encoded as UTF-8.
   *
   * @param OffsetsRanges offset ranges to store
   * @param groupName     consumer group name
   */
  def storeOffsets(OffsetsRanges: Array[OffsetRange], groupName: String): Unit = {
    val offsetRootPath = s"/$groupName"
    // Create the group node up front so it exists even when no ranges are given.
    ensurePathExists(offsetRootPath)
    OffsetsRanges.foreach { range =>
      val path = s"$offsetRootPath/offsets/${range.topic}/partition/${range.partition}"
      // Explicit charset so the stored bytes do not depend on the JVM default encoding.
      val data = range.utilOffset.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8)
      ensurePathExists(path)
      client.setData().forPath(path, data)
    }
  }

  /**
   * Key identifying one partition of a topic, used when reading offsets back.
   *
   * @param topic     topic name
   * @param partition partition number
   */
  case class TopicAndPartition(
    topic: String,
    partition: Int
  )

  /**
   * Reads the stored offsets of every partition of `topic` for the given group.
   *
   * When the partition root node does not exist, its path is printed to the
   * console (to help check the path) and an empty map is returned.
   *
   * @param topic     topic name
   * @param groupName consumer group name
   * @return offsets keyed by topic/partition; iteration order is ascending
   *         partition number
   * @throws NumberFormatException if a child node name or its stored data is
   *                               not a valid number
   */
  def obtainOffsets(topic: String, groupName: String): Map[TopicAndPartition, Long] = {
    import java.nio.charset.StandardCharsets
    import scala.collection.JavaConverters._
    import scala.collection.immutable.ListMap

    val offsetRootPath = s"/$groupName/offsets/$topic/partition"

    Option(client.checkExists().forPath(offsetRootPath)) match {
      case None =>
        // Path is missing: print the path itself (not the null Stat) for troubleshooting.
        println(offsetRootPath)
        Map.empty[TopicAndPartition, Long]

      case Some(_) =>
        // Each direct child of the root is one partition; its data holds the offset.
        val offsets = client.getChildren.forPath(offsetRootPath).asScala.map { child =>
          val raw = new String(
            client.getData().forPath(s"$offsetRootPath/$child"),
            StandardCharsets.UTF_8
          )
          TopicAndPartition(topic, child.toInt) -> raw.toLong
        }
        // ListMap keeps insertion order, so the partition ordering survives;
        // a plain .toMap loses it once there are more than four entries.
        ListMap(offsets.sortBy(_._1.partition): _*)
    }
  }

  /**
   * Entry point: stores three sample offsets for group "G322" and prints them
   * back, one tab-separated line per partition. The shared client is closed
   * on exit so the ZooKeeper session is released promptly.
   */
  def main(args: Array[String]): Unit = {
    val topic = "ruoze_offset_topic"
    val group = "G322"
    // Sample data to store.
    val ranges = Array(
      OffsetRange(topic, 0, 0, 7),
      OffsetRange(topic, 1, 0, 3),
      OffsetRange(topic, 2, 0, 5)
    )
    try {
      storeOffsets(ranges, group)
      obtainOffsets(topic, group).foreach { case (key, offset) =>
        println(s"topic:${key.topic}\tpartition:${key.partition}\toffset:$offset")
      }
    } finally {
      client.close()
    }
  }
}

  


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/29609890/viewspace-2212606/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论

注册时间:2014-04-15

  • 博文量
    44
  • 访问量
    38897