NoSQL with Apache Cassandra and Mule

Database · William

Apache Cassandra is a column-based, distributed NoSQL database.  Until recently the only way to interact with Cassandra databases from Mule was to reuse one of the existing Java clients, like Hector or Astyanax, in a component.  Mule’s Cassandra DB Module now provides message processors to insert, update, query and delete data in Cassandra.

To show off some of the features of the Cassandra module I’ll show how to implement a simple account management API.  This API will allow clients to perform CRUD operations on accounts, behaving similarly to something like an LDAP directory.

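Column Insertion

The Cassandra module uses Java Maps to define how data is inserted into and retrieved from a Cassandra keyspace. In this example we’ll use Mule’s JSON transformers to handle the data sent over HTTP. Let’s see what the account data looks like: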

{
    "Accounts":{
        "engineering":{
            "joe@acmesoft.com":{
                "Name":"Joe Developer",
                "Password":"286755fad04869ca523320acce0dc6a4",
                "passwordAge": 731400
            },
            "jane@acmesoft.com":{
                "Name":"Jane Developer",
                "Password":"10b222970537b97919db36ec757370d2",
                "passwordAge": 10082400
            },
            "john@acmesoft.com":{
                "Name":"Jane Developer",
                "Password":"10b222970537b97919db36ec757370d2",
                "passwordAge": 1080000
            }
        },
        "operations":{
            "bill@acmesoft.com":{
                "Name":"Bill SysAdmin",
                "Password":"f1f16683f3e0208131b46d37a79c8921",
                "passwordAge": 4343100
            },
            "jill@acmesoft.com":{
                "Name":"Jill NetworkAdmin",
                "Password":"32a3571fa12b39266a58d42234836839",
                "passwordAge": 41923143
            }
        }
    }
}

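When this JSON is persisted to Cassandra, the column family will be “Accounts”, each organizational unit will be a row key (i.e., “engineering” and “operations”), and the account information (name, password and password age) will be contained in a super column named after the account’s email address.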
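
The mapping just described can be sketched in a few lines of Python. This is only an illustration of how the nested Map lines up with the column family, row key, super column and column levels; the `flatten_accounts` helper is hypothetical and is not part of the Cassandra module.

```python
import json

# A trimmed copy of the account payload shown above.
payload = json.loads("""
{
    "Accounts": {
        "operations": {
            "bill@acmesoft.com": {
                "Name": "Bill SysAdmin",
                "Password": "f1f16683f3e0208131b46d37a79c8921",
                "passwordAge": 4343100
            }
        }
    }
}
""")


def flatten_accounts(data):
    """Yield (column_family, row_key, super_column, column, value) tuples."""
    for column_family, rows in data.items():
        for row_key, super_columns in rows.items():
            for super_column, columns in super_columns.items():
                for column, value in columns.items():
                    yield (column_family, row_key, super_column, column, value)


cells = list(flatten_accounts(payload))
for cell in cells:
    print(cell)
```

Each printed tuple corresponds to one stored column value, addressed by the four levels above it.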

Let’s configure a Mule flow to persist this data over HTTP.

<flow name="AccountCreate" doc:name="AccountsCreate">
        <http:inbound-endpoint 
            exchange-pattern="request-response" 
            host="localhost" 
            port="8081"
            path="account/create" 
            mimeType="application/json" />
        <json:json-to-object-transformer 
               returnClass="java.util.Map"/>
        <cassandradb:insert config-ref="CassandraDB" />
        <json:object-to-json-transformer />        
</flow>

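This flow will accept the JSON account data we just saw over HTTP, transform it to a Map, use the Cassandra connector’s “insert” message processor to persist the data and then transform the payload back to JSON to return to the client.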

Column Serialization

One of the benefits, as well as challenges, with Cassandra is that all data is stored as byte arrays.  This makes it extremely flexible in terms of data storage but also means that type information is lost.  The Cassandra module makes use of Hector’s serializers to let you specify how data is transformed when pulling data out of a column.

Let’s take a look at how this works by specifying two query operations for the API. The first will allow us to query for a user based on organizational unit and email address, which you’ll recall map to the row key and the super column name respectively.

 <flow name="AccountGet" doc:name="AccountGet">
        <http:inbound-endpoint 
              exchange-pattern="request-response"
              host="localhost"
              port="8081" 
              path="account/get"   
              doc:name="HTTP"/>            
        <cassandradb:get config-ref="CassandraDB" 
               columnPath=
"Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]"
                rowKey=
"#[message.inboundProperties['http.relative.path'].split('/')[0]]"/>         
        <json:object-to-json-transformer doc:name="Object to JSON"/>
</flow>

We’re using the Mule Expression Language to parse the URI and infer the rowKey and columnPath from it. In this case the rowKey will be “operations” and the columnPath will be “Accounts:bill@acmesoft.com”. We can now query for Bill’s account as follows:

http://localhost:8081/account/get/operations/bill@acmesoft.com
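
As a rough illustration, the two expressions in the flow can be mimicked in Python. This sketch assumes that `http.relative.path` holds the part of the URL after the endpoint path, without a leading slash (here `operations/bill@acmesoft.com`); the `parse_relative_path` function is a hypothetical stand-in for the MEL expressions, not something Mule provides.

```python
def parse_relative_path(relative_path):
    """Mimic split('/')[0] and split('/')[1] from the flow above."""
    parts = relative_path.split("/")
    row_key = parts[0]                    # organizational unit
    column_path = "Accounts:" + parts[1]  # column family plus super column
    return row_key, column_path


# Assumed value of http.relative.path for the URL above.
row_key, column_path = parse_relative_path("operations/bill@acmesoft.com")
print(row_key)
print(column_path)
```

If the relative path carried a leading slash, the indexes would shift by one, so it is worth logging the property once before relying on the expressions.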

There’s one problem though. When the response comes back it looks like this:

{
   "bill@acmesoft.com":{
      "passwordAge":"\u0000BE<",
      "Name":"Bill SysAdmin",
      "Password":"f1f16683f3e0208131b46d37a79c8921"
   }
}

The password age comes back as a garbled string instead of an integer. This is because the Cassandra module defaults to string serialization unless an explicit column-serializer is defined. Let’s add one to fix the flow.

<flow name="AccountGet" doc:name="AccountGet">
        <http:inbound-endpoint 
           exchange-pattern="request-response"
           host="localhost" 
           port="8081" 
           path="account/get" />            
        <cassandradb:get config-ref="CassandraDB" 
          columnPath="Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]"
         rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]" doc:name="Cassandradb">
           <cassandradb:column-serializers>
                <cassandradb:column-serializer 
                     key="passwordAge" 
                     type="java.lang.Integer"/>
            </cassandradb:column-serializers>
         </cassandradb:get>       
        <json:object-to-json-transformer/>
    </flow>

Now when we refresh the URL something like this should appear:

{
   "bill@acmesoft.com":{
      "passwordAge":4343100,
      "Name":"Bill SysAdmin",
      "Password":"f1f16683f3e0208131b46d37a79c8921"
   }
}

Column serialization is available for all data types supported by Hector.
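
To see where the garbled value in the first response came from, here is a small Python sketch. It assumes the integer was written as a 4-byte big-endian value, which is consistent with the output shown earlier; it only reproduces the byte arithmetic and does not call Hector.

```python
import json
import struct

password_age = 4343100

# Store the integer as 4 big-endian bytes (assumed storage format).
raw = struct.pack(">i", password_age)

# Reading those bytes back as a string is what produces the garbage.
as_text = raw.decode("utf-8")
print(json.dumps({"passwordAge": as_text}))

# Reading them back as an integer recovers the original value.
restored = struct.unpack(">i", raw)[0]
print(json.dumps({"passwordAge": restored}))
```

The bytes are 0x00, 0x42, 0x45 and 0x3C, which read as text are a NUL character followed by “BE<”.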

Column Slices

The Cassandra module also supports querying by column slices. The following flow returns all accounts for a given organizational unit (row key):

 <flow name="AccountList" doc:name="AccountsList">
        <http:inbound-endpoint 
             exchange-pattern="request-response" 
             host="localhost" port="8081"
             path="account/list"/>          
         <cassandradb:get-slice 
              config-ref="CassandraDB" 
              rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]" 
              columnParent="Accounts" count="100">
             <cassandradb:column-serializer 
                  key="passwordAge" type="java.lang.Integer"/>
         </cassandradb:get-slice>
        <json:object-to-json-transformer/>
</flow>

This returns at most 100 results. For example, http://localhost:8081/account/list/operations returns something like the following:

[
   {
      "bill@acmesoft.com":{
         "passwordAge":4343100,
         "Name":"Bill SysAdmin",
         "Password":"f1f16683f3e0208131b46d37a79c8921"
      }
   },
   {
      "jill@acmesoft.com":{
         "passwordAge":41923143,
         "Name":"Jill NetworkAdmin",
         "Password":"32a3571fa12b39266a58d42234836839"
      }
   }
]
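
The shape of that response can be modelled with a short Python sketch. The dictionary below is an in-memory stand-in for the “Accounts” column family, and `get_slice` is a hypothetical function that only imitates the row key lookup and the `count` limit; ordering super columns by name is an assumption made for the example.

```python
# Row key -> super columns (one per account), trimmed for brevity.
accounts = {
    "operations": {
        "jill@acmesoft.com": {"Name": "Jill NetworkAdmin", "passwordAge": 41923143},
        "bill@acmesoft.com": {"Name": "Bill SysAdmin", "passwordAge": 4343100},
    }
}


def get_slice(column_family, row_key, count):
    """Return up to `count` super columns of a row as single-key maps."""
    super_columns = column_family.get(row_key, {})
    names = sorted(super_columns)[:count]  # assumed name ordering
    return [{name: super_columns[name]} for name in names]


result = get_slice(accounts, "operations", 100)
print(result)
```

Lowering `count` truncates the list, which is how the flow caps the number of accounts returned per request.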

Column Deletion

Deleting columns is just as easy.  The following flow demonstrates how to remove a column from a row:

<flow name="AccountDelete" doc:name="AccountDelete">
        <http:inbound-endpoint 
              exchange-pattern="request-response" 
              host="localhost" 
              port="8081" 
              path="account/delete"/>            
        <cassandradb:remove config-ref="CassandraDB" 
              columnPath="Accounts:#[message.inboundProperties['http.relative.path'].split('/')[1]]"
             rowKey="#[message.inboundProperties['http.relative.path'].split('/')[0]]" doc:name="Cassandradb"/>         
        <json:object-to-json-transformer />
</flow>

So to delete Bill’s account we’d use a URL as follows:

http://localhost:8081/account/delete/operations/bill@acmesoft.com

Summary and What’s Next

Cassandra is a powerful contender in the NoSQL landscape. It’s particularly suited for large data sets that need to span multiple datacenters. Some features we’re hoping to add to the module, and cover here, are support for a Cassandra-backed Mule object store as well as support for CQL as an alternative query mechanism.

via:oschina
