シュワルツカウンタ

最近本を読んでいたらシュワルツカウンタを使って、Static メンバ変数の初期化順序を保証するという話が出てきました。シュワルツカウンタ自体は C++ で静的変数の利用前に初期化され、利用中に廃棄しないことを保証する仕組みらしいです。

この仕組みは More C++ Idioms で紹介されています。この記事によると、std::cout もシュワルツカウンタで初期化されているとのことです。

実際に記事のサンプルをたよりに作ってみると以下のようになりました。

#ifndef HOGE_INITALIZER_H
#define HOGE_INITALIZER_H

class HogeInitializer;

class Hoge {
friend class HogeInitializer;

public:
static int* global_count;
private:
Hoge () {}
};

class HogeInitializer {
public:
HogeInitializer ();
~HogeInitializer ();
};

static HogeInitializer initializer; // NOTE!

#endif //HOGE_INITALIZER_H

HogeInitializer のインスタンス initializer がヘッダファイル (hoge.hpp)で定義されている点がミソです。 HogeInitalizer は Hoge の static 変数 global_count を初期化します。具体的には以下のように定義されています。

#include "hoge.hpp"

static int nifty_counter = 0;
int* Hoge::global_count = 0;

HogeInitializer::HogeInitializer ()
{
if (0 == nifty_counter++)
{
// Initialize the statics in Hoge
Hoge::global_count = new int(1);
}
}

HogeInitializer::~HogeInitializer ()
{
if (0 == --nifty_counter)
{
delete Hoge::global_count;
// delete statics in Hoge if needed.
}
}

hoge.hpp を include した翻訳単位で HogeInitializer が生成されます。そのたびに nifty_counter がインクリメントされますが、はじめの呼び出しのときだけ、static 変数への代入がおこります。逆に Hoge が利用されなくなると、nifty_counter の値はデクリメントされ、最後の呼び出しのときだけ static 変数が削除されるので、static 変数を複数の場所で使用していても、変数の生存が保証されるそうです。ほへ〜〜。

#include
#include "hoge.hpp"

int main(int argc, char** argv) {
std::cout << "main: couter is " << *Hoge::global_count << std::endl;
}

Hoge を利用するフィアルはhoge.hpp をインクルードすることにより、Hogeの初期化クラス(HogeInitializer) のインスタンスを読み込むことになるため、必ず global_count の初期化は済んでいることになります。main 関数の外等で Hoge::global_count を利用する様に書き直しても、期待通り利用前に初期化されデストラクタが呼ばれた後に global_count がdelete されるのが確認できました。

さて自分の環境の(gcc 4.2.1)の iostream を読むと、ios_base::init_ioinit という static 変数が宣言があり、cout, cin 等の初期化に利用されています。

// For construction of filebuffers for cout, cin, cerr, clog et. al.
static ios_base::Init __ioinit;

Initのコンストラクタを見ると、確かにシュワルツカウンタが利用されていることが確認できます(間違っていたらすみませぬ)。

ios_base::Init::Init()
{
if (++_S_ios_base_init == 1)
{
// Standard streams default to synced with "C" operations.
ios_base::Init::_S_synced_with_stdio = true;
_S_ios_create(ios_base::Init::_S_synced_with_stdio);
}
}

Hadoop: Counters

Hadoopを利用してMap関数やReduce関数を利用する際、処理し終わったデータがどのようなものだったのか知りたいものです。調べてみると Hadoop の象さん本に書いてありますね。

MapReduce 処理が終わった後、Jobオブジェクトの getCounters メソッドを使うと Counters というクラスのオブジェクトがかえされます。このオブジェクトを調べると、Map関数に対する入力行数など処理したデータに関する情報が得られます。

以下のプログラムはワードカウントするだけですが、最後に入力ファイルの行数を出力します。

import java.io.IOException;
import java.lang.Iterable;
import java.lang.InterruptedException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counters;

public class WordCountJob {

    public static class WordCountMapper
	extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value,
                            Context context) throws IOException, InterruptedException {
             StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");

            while (tokenizer.hasMoreTokens()) {
                String str = tokenizer.nextToken();
                context.write(new Text(str), new LongWritable(1));
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                                  Context context) throws IOException, InterruptedException  {
            int sum = 0;

            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception, InterruptedException {
        Job job = new Job();
        job.setJarByClass(WordCountJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.waitForCompletion(true);
        Counters counters = job.getCounters();
        long mapIn = counters.findCounter("org.apache.hadoop.mapred.Task$Counter",
                                         "MAP_INPUT_RECORDS").getValue();
        System.out.println("the number of lines in input is " + mapIn);
    }
}

たとえば、入力ファイルが以下のように三行からなるとき、

this is a pen
that is a hoge
those are hogehoge

先のプログラムは以下のように、入力ファイルの行数を出力します。

the number of lines in input is 3

デフォルトでも Map に対する入力だけでなく、Reduce を含め色々な入出力データに対するカウンタが定義してあります。

ちなみに、以下のページはデフォルトカウンタの種類や自分用にカスタマイズしたカウンタの作り方について親切に書いてます。
http://www.umiacs.umd.edu/~jimmylin/cloud9/docs/content/counters.html

Hadoop: 自作クラスのシリアライズ方法

以前 Hadoop におけるオブジェクトのシリアライズの仕方について述べました。その際、ArrayWritable オブジェクトをシリアライズして HBase に登録する簡単なサンプルプログラムを紹介しました。

Hadoop 本には, ArrayWritable, IntWritable や Text のように Hadoop ライブラリに存在する Writable クラスの使用方法だけでなく、Writable インターフェースを拡張して自作オブジェクトをシリアライズする例が載っています。このテクニックを使うことで HBase や HDFS に自作オブジェクトをシリアライズして追加することができ、必要になった際は、(テキストからオブジェクトを生成するよりは)高速に取り出すことができます。

勉強のために、前回作った '図書館' の例を下敷きにして、自作クラスの Writable クラス (UserWritable) の動作確認をしてみました。以下 UserWritable クラスです。

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ArrayWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public final class UserWritable implements Writable {

    public static class LongArrayWritable extends ArrayWritable {
        public LongArrayWritable() {
            super(LongWritable.class);
        }
    }

    private Text name;
    private Text job;
    private LongArrayWritable borrowedBooks;

    public UserWritable() {
        this.name = new Text();
        this.job = new Text();
        this.borrowedBooks = new LongArrayWritable();
    }

    public UserWritable(Text name, Text job,LongArrayWritable borrowedBooks) {
        this.name = name;
        this.job = job;
        this.borrowedBooks = borrowedBooks;
    }

    public Text getJob() {
        return this.job;
    }

    public Text getName() {
        return this.name;
    }

    public LongArrayWritable getBorrowedHistory() {
        return borrowedBooks;
    }

    @Override
        public void write(DataOutput out) throws IOException {
        name.write(out);
        job.write(out);
        borrowedBooks.write(out);
    }

    @Override
        public void readFields(DataInput in) throws IOException {
        name.readFields(in);
        job.readFields(in);
        borrowedBooks.readFields(in);
    }
}

UserWritable クラスはユーザの名前 (name), 職業(job), 借りた本の履歴 (borrowedBooks) フィールドを持ちます。さらに, UserWritable クラスは Writable インターフェースを実装するため、Writableインターフェースに存在する二つのメソッド (write, readFields) を実装する必要があります。

UserWritable オブジェクトが正常にシリアライズされるかを以下のサンプルプログラム (HBaseObjectSerialization) で試してみました。

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;

public class HBaseObjectSerialization {

    static final String TABLE_NAME  = "library2";
    static final String BOOK_FAMILY = "users";

    public static void main(String[] args) throws Exception {
        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin          = new HBaseAdmin(config);

        if (admin.tableExists(TABLE_NAME)) {
            admin.disableTable(TABLE_NAME);
            admin.deleteTable(TABLE_NAME);
        }

        // create table
        HTableDescriptor desc = new HTableDescriptor(TABLE_NAME.getBytes());
        desc.addFamily(new HColumnDescriptor(BOOK_FAMILY.getBytes()));
        admin.createTable(desc);

        HTable table   = new HTable(config, TABLE_NAME);

        // put ids of users and user profile to HBase
        Put userOnePut = new Put(Writables.getBytes(new LongWritable(1))); // user 1
        Put userTwoPut = new Put(Writables.getBytes(new LongWritable(2))); // user 2

        UserWritable.LongArrayWritable userOneHistory = new UserWritable.LongArrayWritable();
        UserWritable.LongArrayWritable userTwoHistory = new UserWritable.LongArrayWritable();

        userOneHistory.set(new LongWritable[] {new LongWritable(21),
                                               new LongWritable(121)});
        userTwoHistory.set(new LongWritable[] {new LongWritable(821),
                                               new LongWritable(26),
                                               new LongWritable(40)});

        UserWritable userOne = new UserWritable(new Text("John"),
                                                new Text("Professor"),
                                                userOneHistory);
        UserWritable userTwo = new UserWritable(new Text("Mike"),
                                                new Text("Engineer"),
                                                userTwoHistory);

        byte[] ueserOneHistoryByte = Writables.getBytes(userOne);
        byte[] ueserTwoHistoryByte = Writables.getBytes(userTwo);

        userOnePut.add((BOOK_FAMILY).getBytes(), null, ueserOneHistoryByte);
        userTwoPut.add((BOOK_FAMILY).getBytes(), null, ueserTwoHistoryByte);

        table.put(userOnePut);
        table.put(userTwoPut);

        // get the borrowing histories from HBase
        Scan scan = new Scan();
        scan.addFamily(BOOK_FAMILY.getBytes());
        ResultScanner scanner = table.getScanner(scan);

        for (Result result : scanner) {
            LongWritable userId = (LongWritable) Writables.getWritable(result.getRow(),
                                                                       new LongWritable());
            UserWritable deserializedUser =
                (UserWritable) Writables.getWritable(result.getValue(Bytes.toBytes(BOOK_FAMILY)),
                                                     new UserWritable());
            System.out.println("The job of " + deserializedUser.getName()
                               + " is '"     + deserializedUser.getJob() + "'");
            Object[] userHistoryArray = deserializedUser.getBorrowedHistory().get();

            for (int j = 0; j < userHistoryArray.length; j++) {
                LongWritable bookId = (LongWritable) userHistoryArray[j];
                System.out.println("\t he/she borrwowed "
                                   + new String(Long.toString(bookId.get())));
            }
        }
    }
}

このプログラムでは二人の図書館ユーザ ("John" と "Mike") を表すオブジェクト (UserWritable) を作り、それをシリアライズして HBase に追加します。その後 HBase から Scan で追加済みのシリアライズされたオブジェクトを入手し、最後に getWritable メソッドをシリアライズされたオブジェクトに適用することで、オブジェクトを復元しています。

このプログラムを実行すると, 以下のような出力が得られ、UserWritable オブジェクトに保存されている各ユーザの情報が取り出せている (デシリアライズされている) ことが分かります。

The job of John is 'Professor'
he borrwowed 21
he borrwowed 121
The job of Mike is 'Engineer'
he borrwowed 821
he borrwowed 26
he borrwowed 40

ちなみに本ブログで載っているプログラムは全て私の環境で動作テストはしておりますが、あくまで自分が Hadoop を理解するために書いてあるものですので、例外への対処等は十分に行われていませんので悪しからず。

とはいえ、英語、日本語を含めてあまりにも参照できるドキュメントやソースコードが少ないので、おいておくとどなたかの役に立つことを期待しております。紹介させていただいているソースにあるやり方よりも良い、もしくは新しい方法があればご指摘いただけると幸いです。

Hadoop プログラムの単純なデバッグ方法について

最近 Hadoop ライブラリを用いて書かれたプログラムをデバッグする方法について調べてました。標準エラー出力を使用する方法と Context オブジェクトを利用する方法が簡単なようです。

標準エラー出力Hadoop プログラムから出力すると、プロンプトには出力されないのですが、とあるログファイルには出力されます。また、オブジェクト Context (昔は Reporter オブジェクトを使用していましたが 0.2 以降は Context を使用する方が良いらしいです) を利用して、別の場所に出力することもできます。

以下デバッグ用の文を含む単語カウントプログラムを書きました。このプログラムは通常の単語カウントプログラムなのですが、'invalid' という単語の場合だけはカウントを行わず、不正な入力として警告します (人工的なサンプルで申し訳ございません)。また入力ファイルに入っている各単語が標準エラー出力に出力されます。

import java.io.IOException;
import java.lang.Iterable;
import java.lang.InterruptedException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

    public static class WordCountMapper
        extends Mapper<LongWritable, Text, Text, LongWritable> {

        private final static String invalidWord = new String("invalid");

        @Override
            public void map(LongWritable key, Text value,
                            Context context) throws IOException, InterruptedException {

            System.err.println("Enter WordCountMapper");
            StringTokenizer tokenizer = new StringTokenizer(value.toString(), " ");

            while (tokenizer.hasMoreTokens()) {
                String str = tokenizer.nextToken();
                System.err.println("Input str: " + str);

                if (str.equals(invalidWord)) {
                    context.setStatus("Detected invalid word: " + str);
                    continue;
                } else {
                    context.write(new Text(str), new LongWritable(1));
                }
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
            protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedEx$
            int sum = 0;

            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception, InterruptedException {
        Job job = new Job();
        job.setJarByClass(WordCountJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

このプログラムを jar コマンドで WordCount.jar とまとめます。次に以下の内容を持つテキストファイルを作り、HDFS上に invalid.txt という名前で保存します。

this sentence contains word, invalid

最後に Hadoop からプログラム (WordCount.jar) を起動してください。

hadoop jar WordCount.jar WordCountJob invalid.txt output-wc

残念ながら、プロンプトには期待したデバッグ用の出力が現れませんが、その代わりデバッグ用の出力は Hadoop のログファイルの中に出力されています。標準エラー出力への出力は ${HADOOP_LOG_DIR}/userlogs/attempt___/stderr に出力されます。ここで attempt-id はタスクの ID です。たとえば、上記のプログラムの場合にはファイル stderr に以下の内容が記載されています。

Enter WordCountMapper
input str: this
input str: sentence
input str: contains
input str: word,
input str: invalid

ちなみに上記のプログラムでのもうひとつのデバッグ用のコードがあります。このコードは入力となる単語が invalid のときだけ "Detected possibly invalid word" と不平を出力します。この出力、

context.setStatus("Detected possibly invalid word: " + str);

はタスクトラッカー用のログファイル 
${HADOOP_LOG_DIR}/hadoop-username-tasktracker-nodename.log に出力されます。
たとえば上記の例だとファイルに以下のような行が出力が記載されます。

2009-10-23 14:41:08,378 INFO org.apache.hadoop.mapred.TaskTracker: attempt_200910231149_0013_m_000000_0 1.0% Detected possibly invalid record: invalid

このようなデバッグログを見るには、Hadoopのログディレクトリを探すのも良いですが、http://node-url:50030/jobtracker.jsp にアクセスすると、ブラウザ上からもログファイルを見ることができ便利です。

Hadoopには他にもいろいろなデバッグ方法がありますが、今回はこんな感じで。

Mockito と Junit を使用した Hadoop Unit Test

まだまだ、Hadoop の勉強中です。今は Hadoop プログラムのテストおよびデバッグ方法について調べてました。

Hadoop 本 (http://oreilly.com/catalog/9780596521981)には Junit と Mockito を利用したテストの書き方が紹介されています。勉強がてら簡単なプログラムを書いてみました。

まずは、今回テストされる単語をカウントするためのマッパークラスを以下のように定義します。このクラスの map メソッドは入力文をスペースで分割してキーが単語 (Text), バリューが 1 (LongWritable) の要素を Context.write によって追加します。

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, LongWritable> {

  @Override
  protected void map(LongWritable key, Text value,
                     Context context) throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
          context.write(new Text(itr.nextToken()), new LongWritable(1));
      }
  }
}

この map メソッドが期待通り動いているのかチェックするために、以下のテストプログラムを書きました。このプログラムでは、WordCountMapper内の map メソッドで期待した出力が Context オブジェクトの write メソッドを通じて期待された回数だけ出力されているのかを verify メソッドでチェックします。

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

import org.junit.*;
import org.junit.runner.JUnitCore;
import static org.mockito.Mockito.*;

import java.lang.InterruptedException;
import java.io.IOException;

public class WordCountMapperTest {

    public static void main(String[] args) {
        JUnitCore.main(WordCountMapperTest.class.getName());
    }

    @Test
        public void testValidInput() throws IOException, InterruptedException {
        WordCountMapper wordCountMapper = new WordCountMapper();
        Text    value   = new Text("This is a pen and that is a note.");
        Context mock_context = mock(Context.class);

        wordCountMapper.map(null, value, mock_context);

        verify(mock_context, times(1)).write(new Text("This"), new LongWritable(1));
        verify(mock_context, times(2)).write(new Text("is"), new LongWritable(1));
        verify(mock_context, times(2)).write(new Text("a"), new LongWritable(1));
        verify(mock_context, times(1)).write(new Text("pen"), new LongWritable(1));
        verify(mock_context, times(1)).write(new Text("that"), new LongWritable(1));
        verify(mock_context, times(1)).write(new Text("note."), new LongWritable(1));
    }
}

Hadoop 簡単いんすとーる

最近個人用の計算機をいただきました。このコンピュータにまた Hadoop をインストールするのは面倒だと思いましたが、 Hadoop 本に簡単インストールの方法が書かれているのを思い出しました。このやり方だと、ユーザを新しく作ったり、XML の設定ファイルを編集したりなどの手続きが必要ないため簡単にインストールができます。ただし、RedHat 系もしくは Debian 系 の Linux のみ対応です。

詳しくは Cloudera のページ http://archive.cloudera.com/docs/_installing_hadoop_pseudo_distributed_mode.html に書かれていますが、以下 Debian系の場合についての流れを日本語にしました。

  • Apt のレポジトリに Cloudera を追加
    • /etc/apt/sources.list.d/cloudera.list というファイルを作る
    • /etc/apt/sources.list.d/cloudera.list に以下の二行を追加

deb http://archive.cloudera.com/debian DISTRO-stable contrib
deb-src http://archive.cloudera.com/debian DISTRO-stable contrib

ここで DISTRO は使っているシステムにあわせて書き換える必要があります。 "lsb_release -c" コマンドで使っているディストリビューションの名前が分かります*1

  • レポジトリキーの追加 

以下のコマンドを実行

curl -s http://archive.cloudera.com/debian/archive.key | sudo apt-key add -

  • Apt のパッケージ索引を更新 

以下のコマンドを実行

sudo apt-get update

  • hadoop パッケージのインストール

apt-cache search hadoop
sudo apt-get install hadoop

  • hadoop-conf-pseudo パッケージのインストール

apt-get install hadoop-conf-pseude

  • サービスの立ち上げ

以下のコマンドを実行

for service in /etc/init.d/hadoop-*
do
sudo $service start
done

サービスを立ち上げた後は、ユーザを作ったり HDFS を初期化するなどの処理が必要なく、もうそのまま使えます。たとえば、

hadoop fs -mkdir hogehoge

とすると HDFS 上に hogehoge というディレクトリがくつられるはずです。

ここでは仮想クラスタの場合のインストールですが、このパッケージはマルチノードのインストールにも対応しています。

*1:ディストリビューション jaunty には stableのパッケージがないそうなので、その場合は "intrepid" と書き換えること。

HBase と Serialization

Hadoop で計算したデータは HDFS にファイルとして保存するのが手軽ですが、出力されたファイルに含まれるデータ片にアクセスするにはファイルを全ロードする必要があって面倒です (MapFile にはランダムアクセスできますが)。このような場合データベースにデータを格納すると格納された個々のデータ片アクセスできて便利です。

そこで HBase という データベースにデータを格納し、後でそのデータを取り出すという処理について調べてました。HBase は Hadoop のサブプロジェクトであり、キーバリューペアのデータを格納できます。HBase ではシリアライズされたオブジェクトを入れておいて、後でデシリアライズすることでオブジェクトを元通り復元することができます。

HBase のシリアライズの仕方については、HBase の Serialization テストを見ると書いてあります。ただ少々と分かりづらいので勉強がてら単純なサンプルを作ってみました。このサンプルプログラムで格納するデータは図書館の利用履歴です。ユーザ ID がキーで、借りられた本の ID からなる配列がバリューです。サンプルのシリアライズは Writable というインターフェースに基づいています。処理は単純で HBase のテーブル "library" を作って、そのテーブルに ユーザ IDと利用履歴を表す Array (LongArrayWritable) データを入れてからそれを取り出してデシリアライズする処理を行っています。このサンプルプログラムを実行すると、以下の用に本とそれを借りたユーザペアが表示されます。

book 21 is borrowed by 1-th user.
book 121 is borrowed by 1-th user.
book 821 is borrowed by 2-th user.
book 26 is borrowed by 2-th user.
book 40 is borrowed by 2-th user.

動作確認はできたので次はもうちっと本格的な自作のクラスのシリアライズをやってみようと思います。自作クラスをシリアライズするには Writable インタフェースを実装する必要があるそうです。ただシリアライズの方法も他にあるそうなのでそこいら辺をチェックしなくては。

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;

class LongArrayWritable extends ArrayWritable {
  public LongArrayWritable() {
    super(LongWritable.class);
  }
}

public class HBaseSampleAppendArray {

    static final String TABLE_NAME  = "library";
    static final String BOOK_FAMILY = "book_number";

    public static void main(String[] args) throws Exception {
        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin          = new HBaseAdmin(config);

        if (admin.tableExists(TABLE_NAME)) {
            admin.disableTable(TABLE_NAME);
            admin.deleteTable(TABLE_NAME);
        }

        // create table
        HTableDescriptor desc = new HTableDescriptor(TABLE_NAME.getBytes());
        desc.addFamily(new HColumnDescriptor(BOOK_FAMILY.getBytes()));
        admin.createTable(desc);

        HTable table   = new HTable(config, TABLE_NAME);

        // put ids of users and borrowed books to HBase
        Put userOnePut = new Put(Writables.getBytes(new LongWritable(1)));
        Put userTwoPut = new Put(Writables.getBytes(new LongWritable(2)));

        LongArrayWritable userOneHistory = new LongArrayWritable();
        LongArrayWritable userTwoHistory = new LongArrayWritable();

        userOneHistory.set(new LongWritable[] {new LongWritable(21),
                                               new LongWritable(121)});
        userTwoHistory.set(new LongWritable[] {new LongWritable(821),
                                               new LongWritable(26),
                                               new LongWritable(40)});

        byte[] ueserOneHistoryByte = Writables.getBytes(userOneHistory);
        byte[] ueserTwoHistoryByte = Writables.getBytes(userTwoHistory);

        userOnePut.add((BOOK_FAMILY).getBytes(), null, ueserOneHistoryByte);
        userTwoPut.add((BOOK_FAMILY).getBytes(), null, ueserTwoHistoryByte);

        table.put(userOnePut);
        table.put(userTwoPut);

        // get the borrowing histories from HBase
        Scan scan = new Scan();
        scan.addFamily(BOOK_FAMILY.getBytes());
        ResultScanner scanner = table.getScanner(scan);

        for (Result result : scanner) {
            LongWritable userId = (LongWritable) Writables.getWritable(result.getRow(), new LongWritable());
            for(KeyValue key : result.sorted()) {
                LongArrayWritable deserializedUserHistory =
                    (LongArrayWritable) Writables.getWritable(result.getValue(Bytes.toBytes(BOOK_FAMILY)),
                                                              new LongArrayWritable());

                Object [] userHistoryArray = deserializedUserHistory.get();
                for (int j = 0; j < userHistoryArray.length; j++) {
                    LongWritable bookId = (LongWritable) userHistoryArray[j];
                    System.out.println("book "
                                       + new String(Long.toString(bookId.get()))
                                       + " is borrowed by " + userId + "-th user." );
                }
            }
        }
    }
}