hermann, 提供跨平台 Kafka 生产者和消费者支持的gem

分享于 

5分钟阅读

GitHub

  繁體 雙語
A Ruby gem wrapping the librdkafka library as a C extension.
  • 源代码名称:hermann
  • 源代码网址:http://www.github.com/reiseburo/hermann
  • hermann源代码文档
  • hermann源代码下载
  • Git URL:
    git://www.github.com/reiseburo/hermann.git
    Git Clone代码到本地:
    git clone http://www.github.com/reiseburo/hermann
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/reiseburo/hermann
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    

    Gitter chatBuild Status

    实现 Kafka 发布者和消费者的ruby gem

    MRI ( C 基于 ruby ),这个库包装了 librdkafka库,它是在C 中实现的。

    在JRuby上这个库声明了 jar 依赖项,在 .gemspec 中表示依赖于 Kafka 项目提供的java库库。 像 jbundler 这样的工具将正确处理这些声明。

    用法

    用法是关于 kafka gem的模型,非常简单。

    • 支持 Kafka 0.8.
    • 针对 ruby 1.9.3。2.1.1和JRuby进行了测试

    搜索引擎发现

    通过zookeeper发现 Kafka 代理。 查看Zookeeper中的/brokers/ids 以查找代理列表。

    require'hermann/producer'require'hermann/discovery/zookeeper'broker_ids_array =Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokers
    producer =Hermann::Producer.new('topic', broker_ids_array)
    promise = producer.push('hello world') # send message to kafkapromise.value # forces the Concurrent::Promise to finish excuting (#value!)promise.state # the state of the promise
    仅适用于 x 射线成像
    require'hermann/producer'broker_ids_array =Hermann::Discovery::Zookeeper.new('localhost:2181').get_brokersp=Hermann::Producer.new('topic', broker_ids_array) # arguments topic, list of brokersf =p.push('hello world from mri')
    f.statep.tick_reactor
    f.state

    客户

    通过调用方法并传递块来处理已经生成消息,可以使用消息。 consume方法块,因此要小心处理该功能( 例如。 使用并发:: promise,线程等。

    ( JRuby )
    require'hermann'require'hermann/consumer'require'hermann_jars'topic ='topic'new_topic ='other_topic'the_consumer =Hermann::Consumer.new(topic, zookeepers:"localhost:2181", group_id:"group1")
    the_consumer.consume(new_topic) do |msg| # can change topic with optional argument to. consumeputs"Recv: #{msg}"end
    ( MRI )

    MRI目前没有 zookeeper/客户群支持。

    require'hermann'require'hermann/consumer'topic ='topic'new_topic ='other_topic'the_consumer =Hermann::Consumer.new(topic, brokers:"localhost:9092", partition:1)
    the_consumer.consume(new_topic) do |msg, key, offset| # can change topic with optional argument to. consumeputs"Recv: #{msg}, key: #{key}, offset: #{offset}"end

    元数据请求( 仅适用于 mri )

    主题和群集元数据可以通过查询 Kafka 代理在MRI版本中检索。

    require'hermann'require'hermann/discovery/metadata'c =Hermann::Discovery::Metadata.new( "localhost:9092" )
    topic = c.topic("topic")puts topic.partitions.first
    consumers = topic.partitions.map do |partition|
     partition.consumerend
    生成 & 单元测试

    第一次( 从干净的仓库): bundle install && bundle exec rake

    之后: bundle exec rake spec

    测试

    要运行集成测试,请执行以下操作:

    • 启动你自己的zookeeper/Kafka 实例
    • rspec spec/integration/producer_spec.rb
    如何从使用 jruby-kafka转换
    • Gemfile
      • 删除 jruby
      • 添加 gem"hermann"
      • bundle install
    • Jarfile
      • 从Jarfile中删除不必要的jars ( 例如。 Kafka,Log4J )
      • jar 依赖项自动包含在Hermann中
      • jbundle install
    • 测试上面的生产者/消费者示例

    SUP  平台  CONS  跨平台  GEM  kafka  
    相关文章