maestro, 用于构建健壮ETL作业的类型化数据管道框架

分享于 

15分钟阅读

GitHub

  繁體 雙語
Data Pipelines that are well typed and enforce good convention
  • 源代码名称:maestro
  • 源代码网址:http://www.github.com/CommBank/maestro
  • maestro源代码文档
  • maestro源代码下载
  • Git URL:
    git://www.github.com/CommBank/maestro.git
    Git Clone代码到本地:
    git clone http://www.github.com/CommBank/maestro
    Subversion代码到本地:
    $ svn co --depth empty http://www.github.com/CommBank/maestro
    Checked out revision 1.
    $ cd repo
    $ svn up trunk
    
    大师

    Stories in ReadyBuild StatusGitter chat

    
    maestro: a distinguished conductor
    
    
    
    

    maestro 库为ETL类型的数据编组和编制提供了一种通用的框架,用于组合各种数据 api。

    maestro的主要目标是使它的易于管理,以牺牲安全性或者健壮性来管理数据集合。 这是通过坚固地粘贴强类型模式来描述数据的固定结构,并提供适合于 100s 列数据集的api。

    Scaladoc

    为相关 Scaladoc 提供方便的链接

    起始点

    maestro 被设计用来处理高度结构化的数据。 在某种程度上,由 maestro 操作的所有数据集都有良好定义的行模式和固定的列集。

    此时,maestro 支持模式定义的thrift

    maestro 使用 Thrift 模式定义来派生尽可能多的元数据和自定义处理( 例如打印和解析)的实现,因为它可以。 然后它提供使用这些"数据类型"特定工具的api来提供通用的"任务",比如生成分析视图。

    5分钟快速启动

    定义 Thrift 架构。

    如果你想要更完整的文档,那么这里不是完整的Thrift 教程,那么 http://diwakergupta.github.io/thrift-missing-guide/插件是一个非常好的参考。

    因这里,如果数据集将在系统上进行覆盖,我们将定义一个准确定义列和类型的模式:

    
    
    
    
    #@namespace scala au.com.cba.omnia.etl.customer.thrift
    
    
    
    struct Customer {
    
    
     1 : string CUSTOMER_ID
    
    
     2 : string CUSTOMER_NAME
    
    
     3 : string CUSTOMER_ACCT
    
    
     4 : string CUSTOMER_CAT
    
    
     5 : string CUSTOMER_SUB_CAT
    
    
     6 : i32 CUSTOMER_BALANCE
    
    
     7 : string EFFECTIVE_DATE
    
    
     }
    
    
    
    
    

    这是一个简化的例子,实际数据集可以能有 100s 个列,但是应该足以演示。 重要的一点是顺序重要,结构应该定义为与输入数据相同的字段,并且类型,它们应该准确地描述数据类型( 将用于推断数据的解析和验证方式)。

    构建 maestro 作业

    可以使用 maestro的特性快速。安全地实现大多数ETL任务,但是它也设计为易于适应定制代码,如烫伤。hive。hdfs和 sqoop。

    通过 Execution ( 请参见 scalding 和以下概念部分中的Execution monad ) 来定义 maestro 作业,通常涉及多个步骤,每个步骤都是 Execution 本身,并且可能取决于前面步骤的结果。 这通常被用 Scala for - yield 理解,如下面的例子所示。

    在配置过程中,maestro 可以方便地从hive查询。导入/导出。hdfs操作。烫伤管道以及scalding管道等各种方便的操作和组合步骤的方式。

    示例 maestroCustomerJob.scala 中的示例工作将客户数据文件加载到配置单元表中。 提取后,会给出执行的味道和它们的配置。

    caseclassCustomerAutomapConfig(config: Config) {
     valmaestro=MaestroConfig(
     conf = config,
     source ="customer",
     domain ="customer",
     tablename ="customer" )
     valupload= maestro.upload()
     valload= maestro.load[Customer](none ="null")
     valacctTable= maestro.partitionedHiveTable[Account, (String, String, String)](
     partition =Partition.byDate(Fields[Account].EffectiveDate),
     tablename ="account" )
    }objectCustomerAutomapJobextendsMaestroJob {
     defjob:Execution[JobStatus] = {
     @automap defcustomerToAccount (x: Customer):Account= {
     id := x.acct
     customer := x.id
     balance := x.balance /100 balanceCents := x.balance %100 }
     for {
     conf <-Execution.getConfig.map(CustomerAutomapConfig(_))
     uploadInfo <- upload(conf.upload)
     sources <- uploadInfo.withSources
     (pipe, loadInfo) <- load[Customer](conf.load, uploadInfo.files)
     acctPipe = pipe.map(customerToAccount)
     loadSuccess <- loadInfo.withSuccess
     count <- viewHive(conf.acctTable, acctPipe)
     if count == loadSuccess.actual
     } yieldJobFinished }
    . . .
    }

    概念

    为字段和记录生成支持

    maestro 将使用 Thrift 定义中可用的元数据来生成自定义的基础结构,以便为你的特定的记录类型。 通过这种方式,我们可以在一种很容易被检查和验证的方法中引用代码中的字段。验证。行筛选和转换。 这种字段引用可以有有趣的元数据,它可能让我们自动解析,打印,验证,过滤,以一种我们在运行代码( 对于有效架构) 之前就知道的方式对数据进行分割。

    用户定义的类型映射

    Thrift 类型配置单元类型 Scala 类型
    布尔值:一个布尔值( true 或者 false ),一个字节布尔型bool
    字节:有符号字节TINYINT ( 1-byte有符号整数,从 -128到 127 )字节
    i16: 一个 16位 有符号整数int ( 2-byte有符号整数,从 -32,768到 32,767 )
    i32: 一个 32位 有符号整数INT ( 从 -2,147,483,648,到 2,147,483,647 )int
    i64: 一个 64位 有符号整数BIGINT ( 从 -9到 223,372,036,854,775,808到 9,223,,,0 )BigInteger
    double: 64位 浮点数双精度浮点浮点数( 8-byte双精度浮点数)
    字符串:编码不可识别的文本或者二进制字符串字符串( 仅 非二进制 )字符串

    类似于嵌套结构,列表,集和映射的复杂 Thrift 类型不是直接支持的,至少不是当前。 尽管,许多底层库确实支持复杂 Thrift 类型的某些形式。

    执行清单

    在 scalding scalding中,execution是一个关键概念,见 com.twitter.scalding.Execution特性对象,以及RichExecutionRichExecutionObject maestro。

    执行是一个具有 Execution[T] 类型的对象,表示一些可以提供 T 类型的工作,如果成功,则失败。 工作可以依赖配置信息,可以涉及各种较小的步骤,包括通过,。 许多小型 Execution 可以被链接在一个较大的Execution 中,使用前面例子中的for - yield -comprehension。

    for - yield -comprehensions为 Execution[T]的结构与它的他类型构造函数( 如 List[T]Iterable[T] ) 具有相同的结构。 这是因为这些类型构造函数都是 monads,这大致意味着它们允许通过调用 flatMap 来形成链。 对于此类链,Scala for - yield 理解实际上只是语法糖,有关更多细节,请参见这个常见问题解答。

    失败

    当发现意外情况时,作业通常会失败,以便采取适当的措施来修复这种情况。 如果条件是 false ( 通过 filter 方法,请参见常见问题解答。),则在理解中使用 if( 条件 ) 使整个 Execution 失败,并出现错误。 自定义执行( 请参见下面) 中的异常也导致失败。

    有关故障的一些有用方法包括 recoverWithbracketensuringonException

    自定义执行

    尽管 Execution.from 允许任何 Scala 代码包含在 Execution 中,但是应该小心地进行处理,包括考虑处理错误和其他异常情况。 如果代码抛出异常,则捕获并将它的转换为失败的Execution

    只返回值,从不抛出异常,也不执行效果的纯表达式应该包含在使用 x =.. . 而不是 x <- Execution.from(...)的语法。

    Hive和Hdfs操作应该包含在 Execution的通过他们自己的清单中,如下所示。

    配置单元

    大师允许你直接写入配置文件表,并作为一个大师作业的一部分运行查询。

    viewHive 允许大师作业将 TypedPipe ( 比如从一个负荷) 中的数据写入到地图中的分区配置单元表中。 但是,如果它还不存在,还会创建配置单元表,或者验证该模式是否存在。

    或者 HiveTable 实例允许你引用特定的配置单元表。 HiveTable 上的sourcesink 方法为类型化管道提供烫磨源和接收器以读取或者写入表。 name 提供了一个完全 qualifed NAME,可以在hql内部使用。

    Execution.fromHive 通过 Hive[T] monad作为 Execution的一部分执行hive操作,提供了来自 Execution 配置的适当配置。 Hive[T] monad类似于 Execution[T] monad,但具体用于构建配置单元操作,包括创建数据库和表,以及执行查询和查询的附加支持,以及对检查条件和生成复合操作的一些额外支持。

    配置单元限制和问题

    • 目前,我们的实现不可能从分区列中读取数据。 相反,预期所有数据只包含在表本身的核心列中。 因此,不能在与 Thrift 结构( 需要一个具有不同 NAME的重复列)的字段相同的列上进行分区。 分区列只能用于配置单元性能,不能承载信息。

    • 为了使作业工作,当作业启动和每个 node 时,hive-site.xml 需要在类路径上。

    • 仅在metastore被指定为 Thrift 端点而不是数据库时,写入配置单元文件才有效。

      
       <property>
      
      
       <name>hive.metastore.uris</name>
      
      
       <value>thrift://metastore:9083</value>
      
      
       </property>
      
      
      
      
    • 为了运行查询,hive-site.xml 需要包括 yarn.resourcemanager.address 属性,即使值是假的。

      
       <property>
      
      
       <name>yarn.resourcemanager.address</name>
      
      
       <value>bogus</value>
      
      
       </property>
      
      
      
      
    • 为了运行分区查询,需要将分区模式设置为 nonstrict。

      
       <property>
      
      
       <name>hive.exec.dynamic.partition.mode</name>
      
      
       <value>nonstrict</value>
      
      
       </property>
      
      
      
      

    你可以从示例 hive-site.xml 开始。 使用它可以在集群中安装它,或者将它添加到项目目录的资源中,以便它包含在你的jar。

    Hdfs

    composer为Hdfs操作提供支持,类似于上面描述的Hive[T] monad的支持。

    Partitioners

    Partitioners非常简单。Partitioners只是一个字段列表,用于分区由。

    主api是要进行分区的字段列表:

    Partition.byFields(Fields.CUSTOMER_CAT, Fields.CUSTOMER_SUB_CAT)

    api还支持日期格式,例如:

    Partition.byDate(Fields.EFFECTIVE_DATE, "yyyy-MM-dd")

    使用这个字段,但将分区分割为 yyyy。MM和dd的3部分。

    验证器

    验证器可以认为是从记录类型到错误消息或者"行"批准的函数。 在很多情况下,这种理解可以简化为说明它是一个 Field的组合,并且应用了 Check。 提供了一些内置检查,如果你想要进行自定义检查,可以退回定定自定义函数。

    Validator.all(
     Validator.of(fields.EFFECTIVE_DATE, Check.isDate),
     Validator.of(fields.CUSTOMER_CAT, Check.oneOf("BANK", "INSURE")),
     Validator.of(fields.CUSTOMER_NAME, Check.nonempty),
     Validator.of(fields.CUSTOMER_ID, Check.matches("d+")),
     Validator.by[Customer](_.customerAcct.length ==4, "Customer accounts should always be a length of 4")
    )

    数据  构建  type  job  PIP  管道  
    相关文章