lua-resty-cassandra, 使用CQL二进制协议的纯Lua客户端

分享于 

13分钟阅读

GitHub

  繁體 雙語
Cassandra client for Lua Nginx module using CQL3, binary protocol v2
  • 源代码名称:lua-resty-cassandra
  • 源代码网址:http://www.github.com/jbochi/lua-resty-cassandra
  • lua-resty-cassandra源代码文档
  • lua-resty-cassandra源代码下载
  • Git URL:
    git://www.github.com/jbochi/lua-resty-cassandra.git
    Git Clone代码到本地:
    git clone http://www.github.com/jbochi/lua-resty-cassandra
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/jbochi/lua-resty-cassandra
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    

    这里 repo 为积极维护。 我强烈建议你尝试

    lua-resty-cassandra

    Build StatusCoverage StatusModule VersionJoin the chat at https://gitter.im/jbochi/lua-resty-cassandra

    使用CQL二进制协议v2的纯Lua客户端。

    如果在 Nginx/openresty中使用,它是 100%个非阻塞,但也可以用于 luasocket。

    安装

    Luarocks

    建议通过 luarocks 安装:

    $ luarocks install cassandra
    手册

    复制 src/ 文件夹并要求 cassandra.lua

    用法

    概述:

    local cassandra =require"cassandra"local session = cassandra.new()
    session:set_timeout(1000) -- 1000ms timeoutlocal connected, err = session:connect("127.0.0.1", 9042)
    session:set_keyspace("lua_tests")-- simple querylocal table_created, err = session:execute[[ CREATE TABLE users( user_id uuid PRIMARY KEY, name varchar, age int )]]-- query with argumentslocal ok, err = session:execute([[ INSERT INTO users(name, age, user_id) VALUES(?,?,?)]], {"John O'Reilly", 42, cassandra.uuid("1144bada-852c-11e3-89fb-e0b9a54a6d11")})-- select statementlocal users, err = session:execute("SELECT name, age, user_id from users")assert(1==#users)local user = users[1]
    ngx.say(user.name) --"John O'Reilly"ngx.say(user.user_id) --"1144bada-852c-11e3-89fb-e0b9a54a6d11"ngx.say(user.age) -- 42

    你可以在这里的测试或者中查看更多示例。

    套接字方法

    会话,err = cassandra.new( )

    创建一个新会话。如果可用,创建一个带有 cosocket API的套接字,否则返回 luasocket。

    返回值:

    • session: lua-resty-cassandra会话。
    • err: 创建套接字过程中遇到的任何错误。

    会话:set_timeout ( 超时)

    设置超时( 微秒)。使用 Nginx tcpsock:settimeout

    参数:

    • timeout: 毫秒中的超时数

    ,err = 会话:连接( contact_points,端口)

    连接到给定端口上的单个或者多个主机。

    参数:

    • contact_points: 字符串或者字符串的array,用于连接到。
      • 注意:如果要给其中一个主机提供不同的端口,请将字符串设置为: "主机:特定接触点的端口。 指定的port 值将覆盖该联系人点的connectport 参数。
    • port: 端口号缺省值: 9042

    返回值:

    • ok: true 如果连接,则为 false。 无会话没有套接字。
    • err: 遇到任何遇到的错误。

    ,err = 会话:set_keepalive ( max_idle_timeout,pool_size ) -- Nginx

    将当前的Cassandra连接置于 ngx_lua cosocket连接池中。

    注意 : 只在你要调用close方法的地方调用这里方法。 调用这里方法将立即将当前的cassandra会话对象转换为closed状态。 除 connect() 以外的任何后续操作都将返回关闭的错误。

    参数:

    • max_idle_timeout: 当连接处于池中时,最大空闲超时( 在 ms )
    • pool_size: 每个 Nginx 工作进程池的最大大小。

    返回值:

    • ok: 如果成功,则为 1,否则为零。
    • err: 如果有任何错误,则遇到错误

    次,err = 会话:get_reused_times ( ) -- Nginx

    这里方法返回当前连接的( 成功) 重用时间。 如果出现错误,则返回 nil 和描述错误的字符串。

    注意:如果当前连接不是内置连接池,那么这个方法总是返回 0,即,连接从来没有被重用过,即( 但是)。 如果连接来自连接池,则返回值始终为非零。 所以这个方法也可以用来确定当前连接是否来自池。

    返回值:

    • times: 当前连接成功重用的次数,如果错误,则为零
    • err: 如果有任何错误,则遇到错误

    ,: session,err = session,close ( )

    关闭当前连接并返回状态。

    返回值:

    • ok: 如果成功,则为 1,否则为零。
    • err: 如果有任何错误,则遇到错误

    客户端方法

    本节中函数返回的所有错误都是具有以下属性的表:

    • code: cassandra.contants 中的一个 error_codes 中的字符串。
    • raw_message: 由Cassandra返回的错误消息。
    • message: 带有 code + raw_message的构造错误消息。

    错误表实现 __tostring 方法,因此可以打印。 字符串错误表将输出它的message 属性。

    ,err = 会话:set_keyspace ( keyspace_name )

    将会话keyspace设置为给定的keyspace_name

    参数:

    • keyspace_name: 要使用的keyspace的名称。

    返回值:

    查看 :execute()

    ,err = 会话:准备( 查询,选项)

    准备语句以便以后执行。

    参数:

    • query: 表示要准备的查询的字符串。
    • options: :execute() 上可用的选项。

    返回值:

    • stmt: :execute() 使用的prepareed语句,如果准备失败,则为 nil。
    • err: 如果有任何错误,则遇到错误。

    返回( 查询,参数,选项)的结果:

    执行查询或者以前准备好的语句。

    参数:

    • query: 表示查询或者先前准备好的语句的字符串。
    • args: 要绑定到查询的参数的array。 这些参数可以是注释类型( 例如: cassandra.bigint(4) 如果没有注释,驱动程序将尝试推断一个类型。 由于整数以 4字节的形式序列化为 int,如果我们尝试将它的插入到bigint列中,则Cassandra将返回错误。
    • options 是一个选项表:
      • consistency_level: 例如 cassandra.consistency.ONE
      • tracing: 如果设置为 true,则启用对此查询的跟踪。 此时,结果表将包含一个名为 tracing_id的键,其中包含跟踪会话的uuid。
      • page_size: 要获取的页面的最大大小( 默认: 5000 )。
      • auto_paging: 如果设置为 true,execute 将返回一个迭代器。 有关如何使用自动分页的信息,请参阅下面的示例。

    返回值:

    • result: 包含查询结果的表,如果成功,则为 ni。 表可以包含其他键:
      • type: 结果集的类型,可以是"空心","行","set_keyspace"或者"schema_change"。
      • meta: 如果结果类型为"行"且结果有更多尚未返回的页,则这里属性将包含 2个值: has_more_pagespaging_state。有关如何使用分页的信息,请参阅下面的示例。
    • err: 如果有任何错误,则遇到错误。

    批处理,err = cassandra.BatchStatement(type )

    Initialized Initialized statement有关如何使用批处理语句和的信息,请参阅下面的示例。有关要使用的批处理类型的信息,请参见下面的示例。

    参数:

    • type: 批处理语句的类型。 可以是其中之一:
      • cassandra.batch_types.LOGGED ( 默认值)
      • cassandra.batch_types.UNLOGGED
      • cassandra.batch_types.COUNTER

    返回值:

    • batch: 在它的上添加操作的空批处理语句。
    • err: 如果有任何错误,则遇到错误。

    批处理:添加( 查询,参数)

    向批处理语句中添加操作。 有关如何使用批处理语句的示例,请参阅下面的示例。

    参数:

    • query: 表示查询或者先前准备好的语句的字符串。
    • args: 要绑定到查询的参数的array,类似于 :execute()

    跟踪,err = 会话:get_trace ( 结果)

    如果可能,返回给定结果的跟踪。

    参数:

    • result: 以前的查询结果。

    返回值:

    trace: 是一个具有以下键的表( 来自 system_traces.sessionssystem_traces.events系统跟踪表:

    • 协调员
    • 工期
    • 参数
    • 请求
    • started_at
    • 事件:具有以下键的表的array:
      • event_id
      • 活动
      • source_elapsed
      • 线程

    err: 如果有任何错误,则遇到错误。

    示例

    批处理:

    -- Create a batch statementlocal batch = cassandra.BatchStatement()-- Add a querybatch:add("INSERT INTO users (name, age, user_id) VALUES (?,?,?)",
     {"James", 32, cassandra.uuid("2644bada-852c-11e3-89fb-e0b9a54a6d93")})-- Add a prepared statementlocal stmt, err = session:prepare("INSERT INTO users (name, age, user_id) VALUES (?,?,?)")
    batch:add(stmt, {"John", 45, cassandra.uuid("1144bada-852c-11e3-89fb-e0b9a54a6d11")})-- Execute the batchlocal result, err = session:execute(batch)

    分页可能对生成 Web服务 非常有用:

    -- Assuming our users table contains 1000 rowslocal query ="SELECT * FROM users"local rows, err = session:execute(query, nil, {page_size =500}) -- default page_size is 5000assert.same(500, #rows) -- rows contains the 500 first rowsif rows.meta.has_more_pagesthenlocal next_rows, err = session:execute(query, nil, {paging_state = rows.meta.paging_state})
     assert.same(500, #next_rows) -- next_rows contains the next (and last) 500 rowsend

    自动分页:

    -- Assuming our users table now contains 10.000 rowslocal query ="SELECT * FROM users"for _, rows, page, err in session:execute(query, nil, {auto_paging=true}) do assert.same(5000, #rows) -- rows contains 5000 rows on each iteration in this case-- page: will be 1 on the first iteration, 2 on the second-- err: in case any fetch returns an error-- _: (the first for argument) is the current paging_state used to fetch the rowsend

    运行单元测试

    我们使用 busted,需要 luasocket 来模拟 ngx.socket.tcp()。 要运行测试,启动一个本地的cassandra实例并运行:

    $ luarocks install busted
    $ make test

    运行覆盖

    $ luarocks install luacov
    $ make coverage

    报告将在 ./luacov.report.out 中。

    运行 linting

    $ luarocks install luacheck
    $ make lint

    贡献者

    Juarez Bochi ( @jbochi )

    ,Charbonnier ( @thibaultCha ) -> 几个贡献,包括分页支持,改进的批语句,更好的文档和代码样式。

    Leandro Moreira ( @leandromoreira ) -> 增加对double的支持

    Marco Palladino ( @thefosk )


    BIN  proto  protocol  protoc  CAS  LUA  
    相关文章