― Web Technology and Life ―

Perlのメモリ保存型Job Queueフレームワーク入門覚書 ~ClutchとGearmanとWorkerのFork~

2012-03-26
お仕事をメモリに保存するタイプのジョブキューフレームワークのGearmanClutchをそれぞれ使ってみて、実用的なワーカーのforkのコードを書いた、そのあたりの備忘録を、Hachojio.pmでLTした内容です。

やりたいこと ~なんでJob Queueとか使おうとしたのか~

なんで入門するかって話ですが、こんな経緯があったのです。「いくつかのAPIをたたいてその結果を集約して返したい」、あるいは、「たくさんAPIをたたきたいんだけど短時間で処理して、結果を返したい」と思ったんです。つまり、「並列処理をしつつ、結果をまとめて返したい」ということ実現する必要が出たんですね。それで、Perlでじっそうするとなると、以下の4つから選ぶんだと思います。

  • Coro
  • AnyEvent
  • Fork
  • Job Queue

AnyEventとかCoroとかは並列処理をさせる中身によって気にしなきゃいけないことが増えるし、そのままForkすると自分が親プロセスになって子プロセスを管理しなきゃいけなくなってまたまた拡張性が微妙になる気がするしAPIって利用制限とかあったりするから無尽蔵にオラオラでリクエスト送信しすぎても困るわけで、そこんとこ早めに切りだしておいた方が吉ということで、Job Queueに決めたわけです。一方で、日曜大工としては別のプロセス書くっていうのは大掛かりだし、結局ワーカーをForkするからメモリを食うので日曜大工用の512MメモリのVPSには不向きかなーというところもあります。しかし、日曜大工からブレイクするサービスもあるみたいだから、拡張性重視で、やっぱりJob Queue!

What is Job Queue? ~なぜGearmanやClutchにしようと思ったか~

それで、Job Queueにするにしても、

  • メモリに仕事を貯めるタイプ(Gearmanなど)
  • DBに仕事を貯めるタイプ(Q4M、TheSchwartz、Qudoなど)

とタイプがあるわけですね。詳しくは「Job Queueってそもそも何?どんなときに使うものなの?」って話は『ジョブキューで後回し大作戦』をご参考に頂くとして話を進めると、今回は「ワーカーがした仕事の結果をそのままクライアントで受け取りたい」ということをしたいんで、そいういうのって前者の「メモリに仕事を貯めるタイプ」がサポートしていていいんですね。それで、「Gearmanをつかってみよー。あと、そういえば、nekokakさんがClutchって作っていたなー」となったわけです。

それで具体的な仕様を整理すると、こんなかんじの実装になりますね。

  • クライアント(ウェブアプリ的には、コントローラーから呼び出されて、ワーカーに仕事を依頼して結果を受け取るロジックのクラスにします)
  • ワーカー(並列処理したい部分。1ワーカーだけだと並列じゃないので、「複数ワーカーを作る=forkする」。あと、子プロセス開始時と終了時にフックをかけたい)

ではではこんな文脈で「Gearman」と「Clutch」に入門していきたいと思います。

Clutchに入門

Clutchの概要は、Clutch - distributed job systemとか、Clutchを参考になります。特徴としては、

  • ジョブキュー
  • メモリに貯めるタイプ
  • クライアントがワーカーの処理結果受け取れる
  • 基本的に中間デーモンいらない
  • ワーカーが最初からforkしてくれる
  • 2011-03-05現在、まだ「一気に仕事を依頼して、一気に仕事の結果を受け取る」ということはきない

ということで、今回の要件では使えず。。。残念。

【追記:2012-03-26 13:20】『Clutch-0.04 released』ということで、最新版の0.04のClutchでは、「一気に仕事を依頼して、一気に仕事の結果を受け取る」機能が追加されていたそうです。。。気づかずにすみません。。。試してみたら、ココに追記しますー。

Gearmanに入門

簡単に使う方法は 『今さらGearman入門』がとても参考になりました。また、トリッキーな使い方は、『4Gbpsを超えるWebサービス構築術』の「すぱむちゃんぷるー」の実装のコードが参考になりますね。『Gearmanのはまりどころ』がかなり要チェックですね。

一気に仕事を依頼して、一気に仕事の結果を受け取るコードの書き方

clientのサンプルコード

use strict;
use warnings;
use utf8;
use Gearman::Client;
use Data::MessagePack;
use Data::Dumper;

my $client = Gearman::Client->new(job_servers => ['127.0.0.1']);

my $ts = $client->new_task_set;

my $returns = +{};#workerの返り値をクロージャーで入れる用のリファレンス
my $arg = Data::MessagePack->pack({arg_1 => 'hoge', arg_2 => 100});#workerに渡す引数を作成、いまどきData::MessagePack

$ts->add_task(#work_aという仕事をworkerに依頼
    "work_a",
    \$arg,
    +{
        on_complete => sub { $returns->{work_a}->{result}  = ${$_[0]} },
        on_fail     => sub { $returns->{work_a}->{is_fail} = 1        },
        retry_count => 5,
    }
);
$ts->add_task(#work_bという仕事をworkerに依頼
    "work_b",
    \$arg,
    +{
        on_complete => sub { $returns->{work_b}->{result}  = ${$_[0]} },
        on_fail     => sub { $returns->{work_b}->{is_fail} = 1        },
        retry_count => 5,
    }
);

$ts->wait;#結果が返ってくるまで待機

warn Dumper $returns;

workerの返り値がundefの場合、on_failが呼ばれ、workerの返り値がある場合、on_cmpleteが呼ばれます。

Workerのサンプルコード

use strict;
use warnings;
use utf8;
use Gearman::Worker;
use Data::MessagePack;
use Data::Dumper;

my $worker = Gearman::Worker->new(job_servers => ['127.0.0.1']);

$worker->register_function( 'work_a' => sub {
    sleep(1);

    my $arg = Data::MessagePack->unpack($_[0]->arg);

    warn 'work_a';
    print Dumper $arg;

    return Data::MessagePack->pack({'work_a' => 'return'});

} );

$worker->register_function( 'work_b' => sub {
    sleep(1);

    my $arg = Data::MessagePack->unpack($_[0]->arg);

    warn 'work_b';
    print Dumper $arg;

    return Data::MessagePack->pack({'work_b' => 'return'});

} );

$worker->work(stop_if => sub { 0 } );

こんな感じに書くみたいですね。

個人的なGearmanの微妙なところ

で、実際にちゃんとしたコードをGearmanで書こうと思うと、以下が微妙なのです。

  • gearmandという中間daemonをテストのときにどうするのか考えるのがめんどい
  • Gearman::Worker::workが巨大でよくわからん
  • register_functionにコールバック渡すのしっくりこない

gearmandという中間daemonをテストのときにどうするのか考えるのがめんどい

Proc::Guardでもしちゃう?とか思う時点でめんどくさいですねー(苦笑)まぁ、それでもいいんですけどね。テスト書くには、tokuhiromさんのProc::GuardとTest::TCP使ったテスト案の記事がとても参考になります。

Gearman::Worker::workが巨大でよくわからん

Gearman::Workerのworkメソッドのコールバック群をいろいろ利用してLivedoor本に載っていたような頑張ったコードを書けないなーと思いましたし、あと、「ドキュメントと実装が全然違うよー!」ということで、覚えているうちはいいんですが、メンテナンスしようとしたときに忘れて、いちからGearman::Workerのコード読みだすのだるいっすね。。。

register_functionにコールバック渡すのしっくりこない

QudoとかTheSchwartzはお仕事を登録するようのクラスがあって、それを継承するカタチでオレオレクラスをかけるのですが、なんかコールバックで渡すのは、コードリファレンスが入ってそれって、ちょっとレベル高い感の漂うコードなので、基本的に敬遠気味wまぁ、結局、ゴニョゴニョ書いて専用のクラスは作るのですが・・・、まぁそういう意味だとフレームワークと題しましたがちょっとフレームワークっぽくないかもなー。

まぁ、そうはいっても今回はGearmanでやるしかなさそうなので、とりあえず、Gearman::Workerでforkのコード書きます。

ちょっと休憩 ~その他の戯言~

Gearmanでみつけた見慣れぬコード

 my Gearman::Taskset $ts = shift;

「Gearman::Tasksetって部分何なの?」「なんかいっぱい出て来てなんか疲れるんですが・・・。」って思いました。ただ、これ(インスタンス変数の前にそのインスタンスのクラス名書くこと/書けること)のメリットとして、

  • ショートカット的な変数名つけたときにわかりやすい
  • ちょっとしたバリデーターとしてもつかえる

ということになりますので、そう考えるとなかなかモジュールも使わずこんなことできるのって結構Perlいけているかもみたいな。

ディストリビューションがフェイク

Perlで使うときに、cpanディストリビューション的には、Gearmanじゃなくて、Gearman::Serverを入れるのがフェイクだと思うんですね。あと、Gearman::XSとかあるんですねー。Gearman::XSのほうが高速なんでしょうねー。でも、こんなツイートがあるんで微妙なのかも。以上、戯言でした。。。では実用的なforkのコードに入っていきましょう。

Gearman::Workerでforkのコード

forkのコードのポイント

nekokakさんのジョブキューを使う際の注意点 - ジョブキューで後回し大作戦によれば、ジョブキューのワーカーを書くには以下に気をつける必要があるということです。

  • 途中でkillしたときに、データの不整合が起きないように、今やっている処理が終わってからkillされるようにする
  • メモリリーク対策として一定回数処理したら自動的に再起動するようにする

これに今回の要件を加えつつ、今回書くforkのコードのポイントは以下になります。

  • TERMシグナル(kill -TERM $pid)おくって処理中のジョブを終わらせてから終了
  • HUPシグナル(kill -HUP $pid)おくって処理中のジョブ終わってから、再起動
  • メモリリークしていても定期的に子プロセスを殺すことによっていい感じの按配のメモリ使用率加減を維持
  • 子プロセスの開始時と終了時にいい感じのHookをかけられる
  • 【追記:2012-03-27】Gearman::Workerはfork safeじゃないんでfork後生成

【追記:2012-03-27】@nihenさんに「はまりどころを参考にしていただいて恐縮ですが、fork後worker生成になってないようですよっと。 」という風にツッコミ頂きましたー。ありがとうございます。なんか、最後まで注意が及ばずでした、、、ホント助かりました。後述のコードも修正しました!

その他自分に向けた参考情報など

結果的に書いたWorkerコード

基本、ベースはQudoです。Workerのforkさせるモジュールはこんな感じで


package MyWorker;
use strict;
use warnings;
use utf8;
use UNIVERSAL::require;
use Smart::Args;
use Parallel::Prefork;
use Gearman::Worker;
use Data::MessagePack;
use Data::Recursive::Encode;

sub new {
    args my $class,
         my $servers                => 'ArrayRef',
         my $max_workers            => 'Int',
         my $max_requests_per_child => 'Int',
         my $manager_abilities      => 'ArrayRef',
         my $default_hooks          => 'ArrayRef';

    my $self = bless {
        servers                 => $servers,
        max_workers             => $max_workers,
        max_requests_per_child  => $max_requests_per_child,
        manager_abilities       => $manager_abilities,
        default_hooks           => $default_hooks,
        hooks                   => +{},
    },$class;

    $self->register_hooks(@{$self->{default_hooks}});

    return $self;
}

sub hooks  { $_[0]->{hooks} }

sub call_hook {
    my ($self, $hook_point) = @_;

    for my $module (keys %{$self->hooks->{$hook_point}}) {
        my $code = $self->hooks->{$hook_point}->{$module};
        $code->();
    }
}

sub register_hooks {
    my ($self, @hook_modules) = @_;

    for my $module (@hook_modules) {
        $module->use or Carp::croak $@;
        $module->load($self);
    }
}

sub worker {
    my ($self) = @_;

    my $worker = Gearman::Worker->new(
        job_servers => $self->{servers}
    );

    for my $pkg (@{$self->{manager_abilities}}) {
        $pkg = $pkg =~ s/^\+// ? $pkg : __PACKAGE__."Work::$pkg";

        $pkg->use or Carp::croak $@;

        $worker->register_function( $pkg => sub {
            my $arg = Data::Recursive::Encode->decode_utf8(Data::MessagePack->unpack($_[0]->arg));

            my $res = $pkg->work($arg);

            return Data::MessagePack->pack($res);
        });
    }

    $worker;
}


sub run {
    my ($self) = @_;

    my $pm = Parallel::Prefork->new(
        +{
            max_workers => $self->{max_workers},
            trap_signals => +{
                TERM => 'TERM',
                HUP  => 'TERM',
                USR1 => undef,
            },
        }
    );

    while ( $pm->signal_received ne 'TERM' ) {
        $pm->start and next;
        $self->call_hook('on_start_child');

        my $worker = $self->worker;

        my $counter = 0;
        my $max_requests_per_child = $self->{max_requests_per_child};
        $SIG{TERM} = sub { $counter = $max_requests_per_child };

        $worker->work(
            on_start => sub { $counter++ },
            stop_if  => sub { $counter >= $max_requests_per_child },
        );

        $self->call_hook('on_end_child');
        $pm->finish;
    }

    $pm->wait_all_children;
}

1;

ワーカークラスはこんな感じに用意して。


package MyWork::ApiRequest;
use Furl;

sub work {
.
.
.
    my $f = Furl->new;

    my $res = $f->get($url);
.
.
.

}

実際に使うにはこんな感じ。(HookはQudoのHookと同じ感じに用意)


MyWorker->new(
    servers                 => +['127.0.0.1'],
    max_workers             => 19,
    max_requests_per_child  => 20,
    default_hooks           => [qw/
        MyWorker::Hook::YourHook    
    /],
    manager_abilities       => +[qw/
        +MyWork::ApiRequest#or ApiRequest
    /],
)->run;

クライアントのコードは上述のをホネホネとカスタマイズすればいいと思います。あと、上述のコードは、実際に書いたコードよりも一般的に今書きなおしたので、ちょっと動かないかもw

感想

Hachioji.pmで発表したら、「Qudoでいんじゃない?」とか「jsでキックバックしたら?」とFBもらい、「確かに!」と思いつつ、「js書けないんだよなー」と思って悶々としました。そろそろその入門する機会かーと思い、サンプルスクリプト探してきて、コピペしつつカスタマイズしながら、Jqueryに入門していこうかなーと思いました。

Gearmanはよくできるているなーと思うんだけど、ドキュメントがこれほどひどい有名モジュールははじめて見ました(苦笑)「ここが変だよhirobanex!」とか「メモリに仕事を貯めるタイプのジョブキューでこんなもいいよ!」とか、いろいろあったお気軽にtweetとかはてぶとかコメント下さい!
いろんな意味で、そろそろネタ切れ感のただよる年度末ですが、Clutchを応援したいけど、Socket通信とかよくわからんからパッチも送れず・・・もっとさくっと書けるようになりたいと思う今日この頃。
最後に、このエントリーはhachioji.pm#15の発表をもとにしています。thanks for Hachioji.pm!!

Perl hachioji.pm update_at : 2012-03-27T02:02:49
hirobanex.netの更新情報の取得
 RSSリーダーで購読する   
blog comments powered by Disqus