riak mysql_[Translate] 从SQL数据库迁移到Riak
例子
關(guān)系數(shù)據(jù)庫: 創(chuàng)建表
CREATE TABLE posts (
id SERIAL PRIMARY KEY,
author VARCHAR(30) NOT NULL,
title VARCHAR(50) NOT NULL,
body TEXT NOT NULL,
created DATE NOT NULL
);
關(guān)系數(shù)據(jù)庫: 查詢
SELECT * FROM posts WHERE id = 99;
關(guān)系數(shù)據(jù)庫: 查詢結(jié)果
id | author | title | body | created
----+------------+--------------------------------+----------------------------+------------
99 | John Daily | Riak Development Anti-Patterns | Writing an application ... | 2014-01-07
基本轉(zhuǎn)換和存儲(chǔ)過程如下
每一個(gè)表行被轉(zhuǎn)換為一個(gè)JSON對(duì)象,包含除id字段外的所有其他字段.
id字段不會(huì)存儲(chǔ)在Riak中的JSON對(duì)象中. 它不會(huì)作為對(duì)象的鍵. 而是把標(biāo)題作為鍵, 30字符長(zhǎng)度限制, 小寫, 并且用-作為單詞之間的連字符. 比如上面的例子把riak-development-anti-patterns作為鍵.
所有產(chǎn)生自posts表的JSON對(duì)象存儲(chǔ)在名為posts的單個(gè)Riak桶當(dāng)中.
鍵存在在Riak set中, 需要的時(shí)候, 使所有對(duì)象可以被一次查詢出來.
把Table轉(zhuǎn)換為L(zhǎng)ist
下面, 我們使用psycopg2庫來把表轉(zhuǎn)換為列表
import psycopg2
connection = psycopg2.connection('dbname=blog_db')
cursor = connection.cursor()
用該游標(biāo)(cursor)執(zhí)行SELECT * FROM posts查詢,然后使用fetchall函數(shù)從游標(biāo)當(dāng)中獲取信息:
cursor.execute('SELECT * FROM posts')
table = cursor.fetchall()
table對(duì)象由一個(gè)Python元組列表構(gòu)成, 如下:
[(1, 'John Doe', 'Post 1 title', 'Post body ...', datetime.date(2014, 1, 1)),
(2, 'Jane Doe', 'Post 2 title', 'Post body ...', datetime.date(2014, 1, 2)),
# more posts in the list
]
轉(zhuǎn)換行為JSON對(duì)象
上面的代碼把posts表中的每一行數(shù)據(jù)轉(zhuǎn)換為包含5個(gè)元素的元組, 為了更好的進(jìn)行轉(zhuǎn)換, 我們把元組轉(zhuǎn)換為能夠在Riak中更好的處理的Python字典數(shù)據(jù)類型, 官方的Riak Python客戶端能夠自動(dòng)地把Python字段轉(zhuǎn)換為能夠在Riak中存儲(chǔ)的JSON對(duì)象. 一旦我們有了一個(gè)字典列表, 就可以直接在Riak中存儲(chǔ)這些字典類型的數(shù)據(jù).
下面通過Python代碼把數(shù)據(jù)庫中的數(shù)據(jù)轉(zhuǎn)換為JSON格式:
import datetime
def convert_row_to_dict(row):
return {
'author': row[1],
'title': row[2],
'body': row[3],
'created': row[4].strftime('%m-%d-%Y')
}
產(chǎn)生如下的JSON數(shù)據(jù):
{
'author': 'John Daily',
'title': 'Riak Development Anti-Patterns',
'body': 'Writing an application ...',
'created': '01-07-2014'
}
存儲(chǔ)行對(duì)象
現(xiàn)在使用store_row_in_riak把posts表中的數(shù)據(jù)存儲(chǔ)到Riak中:
用博客的標(biāo)題來構(gòu)造鍵, 獲取簽名30字符, 字符全部轉(zhuǎn)換為小寫, 并用-替換空格.
每行將被轉(zhuǎn)換為一個(gè)正確的Riak對(duì)象,并存儲(chǔ)
下面是這個(gè)函數(shù)的實(shí)現(xiàn):
bucket = client.bucket('posts')
def store_row_in_riak(row):
key = row[2][0:29].lower().replace(' ', '-')
obj = RiakObject(client, bucket, key)
obj.content_type = 'application/json'
obj.data = convert_row_to_dict(row)
obj.store()
上面我們?cè)谝粋€(gè)Riak set存儲(chǔ)所有對(duì)象的鍵, 以幫助我們?cè)趯砟軌虿樵冞@些對(duì)象. 我們修改上的store_row_in_riak函數(shù)添加每一個(gè)鍵到一個(gè)set(集合):
from riak.datatypes import Set
objects_bucket = client.bucket('posts')
key_set = Set(client.bucket_type('sets').bucket('key_sets'), 'posts')
def store_row_in_riak(row):
key = row[0]
obj = RiakObject(client, bucket, key)
obj.content_type = 'application/json'
obj.data = convert_row_to_dict(row)
obj.store()
現(xiàn)在,我們編寫一個(gè)迭代器用于存儲(chǔ)所有的行:
# Using our "table" object from above:
for row in table:
store_row_in_riak(row)
一旦所有對(duì)象存儲(chǔ)到Riak, 我們可以執(zhí)行正常的鍵/值操作來一個(gè)一個(gè)地獲取. 下面是一個(gè)使用Riak HTTP API的例子:
curl http://localhost:8098/buckets/posts/keys/99
這將返回一個(gè)博客(Blog)對(duì)象:
{
"author": "John Daily",
"title": "Riak Development Anti-Patterns",
"body": "Writing an application ...",
"created": "01-07-2014"
}
需要的時(shí)候, 我們也可以一次獲取所有這些對(duì)象. 前面我們?cè)赗aik set中存儲(chǔ)了所有對(duì)象的鍵. 我們可以編寫一個(gè)函數(shù)從set中獲取所有對(duì)象的key, 以及和這些key相關(guān)的所有對(duì)象:
from riak.datatypes import Set
set_bucket = client.bucket_type('sets').bucket('key_sets')
posts_bucket = client.bucket('posts')
def fetch_all_objects(table_name):
keys = Set(client, bucket, table_name)
for key in keys:
return posts_bucket.get(key)
fetch_all_objects('posts')
這回返回之前存儲(chǔ)的Python字典的完整列表.
使用二級(jí)索引
添加一個(gè)關(guān)鍵字字段(keywords), 來描述一篇博客的關(guān)鍵字列表:
CREATE TABLE posts (
id SERIAL PRIMARY KEY,
author VARCHAR(30) NOT NULL,
title VARCHAR(50) NOT NULL,
body TEXT NOT NULL,
created DATE NOT NULL,
keywords TEXT[] NOT NULL
);
插入數(shù)據(jù):
INSERT INTO posts (author, title, body, created, keywords) VALUES
('Basho', 'Moving from MySQL to Riak', 'Traditional database architectures...',
current_date, '{"mysql","riak","migration","rdbms"}');
為沒一個(gè)博客的關(guān)鍵字列表添加一個(gè)二級(jí)索引. 下面我們編寫一個(gè)函數(shù)對(duì)每一個(gè)關(guān)鍵字附加一個(gè)二級(jí)索引.
def add_keyword_2i_to_object(obj, keywords):
for keywork in keywords:
obj.add_index('keywords_bin', keyword)
然后我們可以插入該函數(shù)到store_row_in_riak函數(shù)中:
bucket = client.bucket('posts')
def store_row_in_riak(row):
obj = RiakObject(client, bucket, row[0])
obj.content_type = 'application/json'
obj.data = convert_row_to_dict(row)
add_keyword_2i_to_object(obj, row[5])
obj.store()
現(xiàn)在我們可以基于一篇博客的關(guān)鍵字來查詢:
bucket = client.bucket('posts')
def fetch_posts_by_keyword(keyword):
for key in bucket.get_index('keywords_bin', keyword):
return bucket.get(key)
該函數(shù)會(huì)返回所有包含特定關(guān)鍵字的博客列表.
總結(jié)
以上是生活随笔為你收集整理的riak mysql_[Translate] 从SQL数据库迁移到Riak的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: STM32寄存器操作端口模式CRL/CR
- 下一篇: Redis缓存对象相关