[ トップページ ]

« ストリーム・データ処理によるオンライン・オークションのシミュレーション (非構造化版) | メイン | Perl でフォームデータから UTF-8 日本語文字をとりだす方法 »

ストリームデータ処理

ストリーム・データ処理によるオンライン・オークションのシミュレーション (構造化版)

ストリーム・データ処理は通常はデータベースの処理と同様にインタプリタ的に実行される. しかし,ここではクエリの実行のしくみをみるために,あえてそれを Perl のプログラム (“コンパイル・コード”) でシミュレートしてみる. とりあげる例題は STREAM (Stanford Stream Data Manager) や ATLaS (Aggregate & Table Language and System) でも使用されているオンライン・オークションである. なお,このページでは構造的な Perl に翻訳するやりかたをしめす. 「ストリーム・データ処理によるオンライン・オークションのシミュレーション (非構造化版)」 においては goto 文をつかった非構造的な版のプログラムをしめす.

目次

はじめに

Stream Query Repository” というサイトには,CQL (Continuous Query Language) の文とその実行例がいろいろ書かれている. そこにあるをすこしかえて,Perl に翻訳してみることにする. このページのなかにもひととおりのプログラムを収容するが,全部をつないだ ソースコード もある.

使用するデータ

Stream Query Repository: Online Auctions” でつかわれているデータを一部だけ変更したものをしめす.

関係

ここであつかういくつかの関係 (relation) は配列によって表現する. オークションにおいてあつかわれる品目は Item という関係の要素 (record または tuple) とみなすことができる. 品目は識別子 (id),なまえ (name),種類の識別子 (categoryID),登録時刻 (registrationTime) などの属性をもつ. 以下のテスト・データは NEXMark ベンチマークのテスト・データのごく一部の部分を変更して使用している.

my @Item =
    ({id => 0, name => 'Item 1',
      description => '',
      categoryID => 1, registrationTime => 0},
     {id => 1, name => 'Item 2',
      description => '',
      categoryID => 2, registrationTime => 0},
     {id => 2, name => 'Item 3',
      description => '',
      categoryID => 2, registrationTime => 0},
     {id => 3, name => 'Item 4',
      description => '',
      categoryID => 1, registrationTime => 0});

もとのベンチマークであつかわれている多数の品目のうち,ここでは 4 品目だけをあつかっている.

また,売り手,買い手や入札者はひとであり,それは Person という関係の要素だとかんがえることができる. 各人はなまえ,E-メイル・アドレス,国,市などの属性をもつが,国,市などは不明なばあいもある.

my @Person = 
    ({id => 0,
      name => 'Luitpold Martucci', emailAddress => 'Martucci@toronto.edu',
      city => '', country => ''},
     {id => 1,
      name => 'Takakazu Schonegge', emailAddress => 'Schonegge@panasonic.com',
      city => 'Gothenburg', country => 'United States'},
     {id => 9,
      name => 'Leonid Nour', emailAddress => 'Nour@llnl.gov',
      city => '', country => ''},
     {id => 11,
      name => 'Ravindra Abdelmoty', emailAddress => 'Abdelmoty@whizbang.com',
      city => 'Manchester', country => 'United States'},
     {id => 13,
      name => 'Annette Klaiber', emailAddress => 'Klaiber@propel.com',
      city => '', country => ''},
     {id => 14,
      name => 'Jacinto Ceccarelli', emailAddress => 'Ceccarelli@unbc.ca',
      city => 'Corpus', country => 'United States'},
     {id => 15,
      name => 'Janick Blaauw', emailAddress => 'Blaauw@ogi.edu',
      city => '', country => ''},
     {id => 16,
      name => 'Warwich Marsiglia', emailAddress => 'Marsiglia@cmu.edu',
      city => 'South', country => 'United States'},
     {id => 18,
      name => 'Justus Binkley', emailAddress => 'Binkley@ogi.edu',
      city => '', country => ''},
     {id => 19,
      name => 'Kunsoo Raghavendran', emailAddress => 'Raghavendran@umass.edu',
      city => 'Paris', country => 'Tuvalu'},
     {id => 20,
      name => 'Takuji Liedekerke', emailAddress => 'Liedekerke@cnr.it',
      city => '', country => ''},
     {id => 22,
      name => 'Shiyi Polster', emailAddress => 'Polster@ucd.ie',
      city => '', country => ''},
     {id => 30,
      name => 'Keiko Nastansky', emailAddress => 'Nastansky@co.jp',
      city => '', country => ''},
     {id => 31,
      name => 'Keumog Vuskovic', emailAddress => 'Vuskovic@filemaker.com',
      city => 'Cancun', country => 'Tajikistan'},
     {id => 33,
      name => 'Armand Impagliazzo', emailAddress => 'Impagliazzo@cwi.nl',
      city => 'Oakland', country => 'Heard and Mcdonald Island'},
     {id => 36,
      name => 'Zoe Holmback', emailAddress => 'Holmback@ucsb.edu',
      city => '', country => ''},
     {id => 39,
      name => 'Rajamani Pinzani', emailAddress => 'Pinzani@mitre.org',
      city => 'Gothenburg', country => 'United States'},
     {id => 44,
      name => 'Jianying Parikh', emailAddress => 'Parikh@sleepycat.com',
      city => 'Torreon', country => 'United States'},
     {id => 45,
      name => 'Edmond Rajcani', emailAddress => 'Rajcani@okcu.edu',
      city => '', country => ''},
     {id => 53,
      name => 'Guijun Cosette', emailAddress => 'Cosette@unl.edu',
      city => '', country => ''},
     {id => 55,
      name => 'Fabio Denna', emailAddress => 'Denna@cmu.edu',
      city => 'Wilmington', country => 'Australia'});

関係の内容を印刷するためのサブルーティン print_result() を用意する. その定義はつぎのとおりである.

sub print_result(\@$) {
    my ($result, $keys) = @_;
    foreach my $key (@$keys) {
	print "\t$key";
    }
    print "\n\t----------------\n";

    foreach my $record (@$result) {
	foreach my $key (@$keys) {
	    print "\t$record->{$key}";
	}
	print "\n";
    }
}

ストリーム

ストリームの要素はこのシミュレーションにおいてはあらかじめ配列にいれ,必要なときにとりだすことにする. オークションの開始をあらわすストリーム OpenAuction の要素を配列 @OpenAuction にいれる.

my @OpenAuction =
    ({itemID => 3, sellerID => 12, start_price =>  80, timestamp => 3100},
     {itemID => 2, sellerID => 39, start_price => 110, timestamp => 3150},
     {itemID => 1, sellerID => 18, start_price => 150, timestamp => 3300},
     {itemID => 0, sellerID => 30, start_price =>  30, timestamp => 3600});

また,オークションの終了をあらわすストリーム ClosedAuction の要素を配列 @ClosedAuction にいれる.

my @ClosedAuction =
    ({itemID => 3, byerID => 31, timestamp => 5400},
     {itemID => 2, byerID => 16, timestamp => 6500},
     {itemID => 1, byerID => 55, timestamp => 7050},
     {itemID => 0, byerID => 30, timestamp => 7200});

競りにおけるビッドのストリーム Bid の要素を配列 @Bid にいれる.

my @Bid =
    ({itemID => 3, bidPrice =>  82, bidderID => 22, timestamp => 3105},
     {itemID => 2, bidPrice => 115, bidderID => 19, timestamp => 3175},
     {itemID => 2, bidPrice => 120, bidderID => 39, timestamp => 3196},
     {itemID => 1, bidPrice => 155, bidderID => 45, timestamp => 3306},
     {itemID => 3, bidPrice => 103, bidderID =>  0, timestamp => 3380},
     {itemID => 2, bidPrice => 122, bidderID => 38, timestamp => 3497},
     {itemID => 0, bidPrice =>  33, bidderID => 41, timestamp => 3662},
     {itemID => 3, bidPrice => 130, bidderID => 19, timestamp => 3663},
     {itemID => 1, bidPrice => 171, bidderID => 11, timestamp => 3852},
     {itemID => 1, bidPrice => 181, bidderID =>  9, timestamp => 4395},
     {itemID => 1, bidPrice => 201, bidderID => 20, timestamp => 4396},
     {itemID => 3, bidPrice => 148, bidderID => 33, timestamp => 4573},
     {itemID => 3, bidPrice => 152, bidderID => 15, timestamp => 4574},
     {itemID => 3, bidPrice => 166, bidderID => 44, timestamp => 4758},
     {itemID => 3, bidPrice => 169, bidderID =>  1, timestamp => 4972},
     {itemID => 3, bidPrice => 191, bidderID => 53, timestamp => 5227},
     {itemID => 3, bidPrice => 216, bidderID => 31, timestamp => 5360},
     {itemID => 1, bidPrice => 226, bidderID => 33, timestamp => 5506},
     {itemID => 1, bidPrice => 249, bidderID => 36, timestamp => 5536},
     {itemID => 0, bidPrice =>  43, bidderID => 18, timestamp => 6040},
     {itemID => 1, bidPrice => 260, bidderID => 33, timestamp => 6041},
     {itemID => 0, bidPrice =>  53, bidderID =>  9, timestamp => 6282},
     {itemID => 0, bidPrice =>  58, bidderID => 13, timestamp => 6391},
     {itemID => 2, bidPrice => 126, bidderID => 16, timestamp => 6462},
     {itemID => 1, bidPrice => 280, bidderID => 14, timestamp => 6570},
     {itemID => 1, bidPrice => 302, bidderID => 55, timestamp => 6975},
     {itemID => 0, bidPrice =>  80, bidderID => 30, timestamp => 7171});

もとのデータにおいては同一のタイム・スタンプをもつビッドがふくまれていたが,プログラムにおけるあつかいを単純化するため,タイム・スタンプをずらして,かさならないようにしている.

時間の経過は $clock という変数によってシミュレートする. tick() がよびだされるごとに時刻が 1 (秒) すすむものとする (実用的にはもっとこまかい単位でタイムスタンプをつける必要があるが,ここではかんたんのために 1 秒単位としている). また,ここではかんたんのため,1 秒に 2 個ストリーム要素が到着することはないものとする.

my $clock = 0;

sub tick () {
    return ++$clock;
}

本来は外部から到着するはずの,配列にいれたストリームの要素は, instream_get() という関数によって 1 個ずつとりだして使用する. 初期設定のためにあらかじめ instream_open() をよびだすことにする. これらの関数の定義はつぎのとおりである.

sub instream_open(\@) {
    my ($array) = @_;
    my $stream = {array => $array, index => 0};

    return $stream;
}

sub instream_get($) {
    my ($stream) = @_;
    my $array = $stream->{array};
    my $next = $array->[$stream->{index}];
    if ($stream->{index} >= @$array ||
	$next->{timestamp} > $clock) {
	return '';
    } else {
	$stream->{index}++;
	return $next;
    }
}

instream_open() はストリームのハンドルをかえすが,これを instream_get() の第 1 引数として使用する.

また,外部に出力する (このシミュレーションにおいては印刷する) ストリームは outstream_open() によってひらき,outstream_put() によって出力する. outstream_open() の第 1 引数は本来はストリームを指定するが,現在は出力先を標準出力にかぎっているため使用していない. 第 2 引数は出力するフィールド名のリストを指定する. outstream_open() はストリームのハンドルをかえすが,それを outstream_put() の第 1 引数にわたす.

sub outstream_open($$) {
    my ($stream, $keys) = @_;
    foreach my $key (@$keys) {
	print "\t$key";
    }
    print "\n";
    return $keys;
}

sub outstream_put($$) {
    my ($stream, $record) = @_;
    my $keys = $stream;
    foreach my $key (@$keys) {
	print "\t$record->{$key}";
    }
    print "\n";
}

両替計算

米ドルによるビッドにおける価格をユーロに変換する. ビッドがストリームとして入力されるので,結果もストリームとして出力する. CQL によるプログラムはつぎのとおりである.

stream EuroBid is
Select itemID, DolToEuro(bidPrice) euroBidPrice, bidderID
From Bid;

これを Perl に翻訳するとつぎのようになる.

sub DolToEuro($) {
    my ($dollar) = @_;
    return 0.71 * $dollar;
}

#--- euroBid object

#- create an euroBid object
sub new_euroBid($) {
    my ($output) = @_;
    return {output => $output};
}

#- euroBid entry (for new bidding)
sub euroBid_dolBid($$) {
    my ($self, $tuple) = @_;
    my $tuple2 = {itemID => $tuple->{itemID},
		  euroBidPrice => DolToEuro($tuple->{bidPrice}),
		  bidderID => $tuple->{bidderID}};

    outstream_put($self->{output}, $tuple2); # send a message to the parent
}

#--- scheduler
sub euroBid_scheduler() {
    my $dolBid = instream_open(@Bid);
    my $euroBid =
	new_euroBid(outstream_open('', ['itemID', 'euroBidPrice', 'bidderID']));
    for (my $time = 0; $time < $Max_timestamp; $time++) {
	tick();
	if (instream_hasNext($dolBid)) { # From Bid
	    euroBid_dolBid($euroBid, instream_next($dolBid));
	    # send a message to the euroBid object (leaf node)
	}
    }	
}

サブルーティン euroBid_scheduler() は入力ストリームの要素を 1 個よむごとにその値を変換し,出力ストリームに出力する. すなわち,この処理は 1 本のパイプだけで実行することができる. ここではややオブジェクト指向風にプログラムを記述している. ただし,Perl のオブジェクト指向機能はつかっていない. euroBid() を実行すれば,つぎのような結果がえられる.

	itemID	euroBidPrice	bidderID
	3	58.22	22
	2	81.65	19
	2	85.2	39
	1	110.05	45
	3	73.13	0
	2	86.62	38
	0	23.43	41
	3	92.3	19
	1	121.41	11
	1	128.51	9
	1	142.71	20
	3	105.08	33
	3	107.92	15
	3	117.86	44
	3	119.99	1
	3	135.61	53
	3	153.36	31
	1	160.46	33
	1	176.79	36
	0	30.53	18
	1	184.6	33
	0	37.63	9
	0	41.18	13
	2	89.46	16
	1	198.8	14
	1	214.42	55
	0	56.8	30

特定のオークションを選択するクエリ

品目識別子が 1 または 2 のビッドだけを抽出するには,つぎのような CQL クエリを実行すればよい.

stream Bid_1_2 is
Select itemID, bidPrice
From Bid
where itemID = 1 or itemID = 2;

これを Perl に翻訳するとつぎのようになる.

#--- bid_1_2 object

#- create a bid_1_2 object
sub new_bid_1_2($) {
    my ($output) = @_;
    return {output => $output};
}

#- bid_1_2 entry (for new bidding)
sub bid_1_2_bid($$) {
    my ($self, $tuple) = @_;
    if ($tuple->{itemID} == 1 || $tuple->{itemID} == 2) {
	outstream_put($self->{output}, $tuple); # send a message to the parent
    }
}

#--- scheuler
sub bid_1_2_scheduler() {
    my $bid = instream_open(@Bid);
    my $bid_1_2 = new_bid_1_2(outstream_open('', ['itemID', 'bidPrice']));
    for (my $time = 0; $time < $Max_timestamp; $time++) {
	tick();
	if (instream_hasNext($bid)) {
	    bid_1_2_bid($bid_1_2, instream_next($bid));
	    # send a message to the bid_1_2 object (leaf node)
	}
    }
}

この処理も 1 本のパイプだけで実行することができる. bid_1_2_scheduler() を実行すれば,つぎのような結果がえられる.

	itemID	bidPrice
	2	115
	2	120
	1	155
	2	122
	1	171
	1	181
	1	201
	1	226
	1	249
	1	260
	2	126
	1	280
	1	302

[TBD]

Local Item Suggestion Query

米国の売り手が出品した品種が 2 の品目をすべて報告するには,つぎのクエリを実行すればよい.

stream ItemsCat2US is
Select Istream(P.name, P.city, O.itemID)
From OpenAuction [Now] O, Person P, Item I
Where O.sellerID = P.id and P.country = 'United States' and O.itemID = I.id
      and I.categoryID = 2;

これを Perl に翻訳するとつぎのようになる.

#--- ItemsCat2US object

#- create an ItemsCat2US object
sub new_itemsCat2US($) {
    my ($output) = @_;

    # Generate index to relation: id -> person
    my %hashedPerson = ();
    foreach my $record (@Person) {
	$hashedPerson{$record->{id}} = $record; # id is assumed to be unique
    }
    # print %{$hashedPerson{39}}, "\n";

    # Generate index to relation: id -> item
    my %hashedItem = ();
    foreach my $record (@Item) {
	$hashedItem{$record->{id}} = $record; # id is assumed to be unique
    }
    # print %{$hashedItem{2}}, "\n";

    return {output => $output,
	    hashedPerson => \%hashedPerson, hashedItem => \%hashedItem};
}

#- itemsCat2US entry for open auction stream
sub itemsCat2US($$) {
    my ($self, $tuple) = @_;

    my $person = $self->{hashedPerson}->{$tuple->{sellerID}};
    my $item = $self->{hashedItem}->{$tuple->{itemID}};
    if ($person->{country} eq 'United States' && $item->{categoryID} == 2) {
	my $tuple2 = {name => $person->{name},
		      city => $person->{city},
		      itemID => $tuple->{itemID}}; # hashed join

	outstream_put($self->{output}, $tuple2); # send a message to the parent
    }
}

#--- Scheduler
sub itemsCat2US_scheduler() {
    my $is_auction = instream_open(@OpenAuction);
    my $select_obj =
	new_itemsCat2US(outstream_open('', ['name', 'city', 'itemID']));
    for (my $time = 0; $time < $Max_timestamp; $time++) {
	tick();
	if (instream_hasNext($is_auction)) {
	    itemsCat2US($select_obj, instream_next($is_auction));
	    # send a message to the itemsCat2US object (leaf node)
	}
    }
}

関係 Person および Item のインデクス (primary index) を生成して使用している. いずれにおいても id が一意であることを仮定している. これらの関係の処理をのぞいたストリームの入力,処理,出力の処理に関しては,上記の 2 つの例題とおおきなちがいはない.

itemsCat2US_scheduler() をよびだすと,結果はつぎのように表示される.

	name	city	itemID
	Rajamani Pinzani	Gothenburg	2

開催中のオークションに関するクエリ

現在開催中のオークションからなるテーブル (関係) を維持するには,つぎのようなクエリを使用すればよい.

temporal relation OpenAuctionItemID is
Select itemID
From OpenAuction [range .. now];

temporal relation ClosedAuctionItemID is
Select itemID
From ClosedAuction [range .. now];

temporal relation CurrentAuctions is
OpenAuctionItemID Minus ClosedAuctionItemID;

これを Perl に翻訳するとつぎのようになる. このプログラムにおいては複数のパイプがくみあわされるが,このプログラムにおいてはサブルーティンよびだしによってパイプをつないでいる.

#--- Toplevel: printer
sub new_CAresult_put() {
}

sub CAresult_put($) {
    my ($tuple) = @_;
    my @result = ();
    foreach my $itemID (keys %$tuple) {
	push(@result, {itemID => $itemID});
    }
    print "$clock\n";
    print_result(@result, ['itemID']);
    print "\n";
}

#--- Minus
sub new_hashed_minus() {
    new_CAresult_put();
    return {hashed2ndArg => {}, result => {}};
}

#- Minus entry for the first argument
sub hashed_minus1($$) {
    my ($self, $tuple) = @_;
    my $itemID = $tuple->{itemID};
    if (!$self->{hashed2ndArg}->{$itemID}) {
	$self->{result}->{$itemID} = $tuple;
	CAresult_put($self->{result}); # send a message to the parent
    };
}

#- Minus entry for the second argument
sub hashed_minus2($$) {
    my ($self, $tuple) = @_;
    my $itemID = $tuple->{itemID};
    $self->{hashed2ndArg}->{$itemID} = $tuple;
    if (delete $self->{result}->{$itemID}) {
	CAresult_put($self->{result}); # send a message to the parent
    }
}

#--- currentAuctions object

#- create a currentAuctions object
sub new_currentAuctions() {
    return {minus_obj => new_hashed_minus()};
}

#- currentAuctions entry for open auction stream
sub currentAuctions_open($$) {
    my ($self, $tuple) = @_;
    my $tuple2 = {itemID => $tuple->{itemID}}; # select itemID
    hashed_minus1($self->{minus_obj}, $tuple2); # send a message to the parent (minus)
}

#- currentAuctions entry for closed auction stream
sub currentAuctions_closed($$) {
    my ($self, $tuple) = @_;
    my $tuple2 = {itemID => $tuple->{itemID}}; # select itemID
    hashed_minus2($self->{minus_obj}, $tuple2); # send a message to the parent (minus)
}

#--- scheduler
sub currentAuctions_scheduler() {
    my $CA_obj = new_currentAuctions();
    my $openAuction = instream_open(@OpenAuction);
    my $closedAuction = instream_open(@ClosedAuction);

    for (my $time = 0; $time < $Max_timestamp; $time++) {
	tick();
	if (instream_hasNext($openAuction)) {
	    currentAuctions_open($CA_obj, instream_next($openAuction));
	    # send a message to the currentAuctions object (leaf node)
	}
	if (instream_hasNext($closedAuction)) {
	    currentAuctions_closed($CA_obj, instream_next($closedAuction));
	    # send a message to the currentAuctions object (leaf node)
	}
    }
}

このプログラムにおいて,“# Dispatcher” というコメントをつけた部分が本来なら外部からとどくストリーム要素の内容によって 2 個のパイプ (“OpenAuctionItemID” と “ClosedAuctionItemID”) のうちのいずれかにその要素をふりわける.

currentAuctions_scheduler() をよびだすと,結果はつぎのように表示される.

3100
	itemID
	----------------
	3

3150
	itemID
	----------------
	3
	2

3300
	itemID
	----------------
	1
	3
	2

3600
	itemID
	----------------
	1
	0
	3
	2

5400
	itemID
	----------------
	1
	0
	2

6500
	itemID
	----------------
	1
	0

7050
	itemID
	----------------
	0

7200
	itemID
	----------------

おなじ意味をもつプログラムをつぎのように記述することもできる.

stream CurrentAuction2 is
Select *
From OpenAuction
Where itemID Not in (Select itemID From ClosedAuction);

Perl に翻訳してえられるプログラムは,まえのプログラムとほぼおなじなので省略する.

終了時価格に関するクエリ

各オークションの終了時の価格を報告するには,つぎのクエリを実行すればよい.

stream BidPrice is
Select itemID, bidPrice as price
From Bid;

stream OpenPrice is
Select itemID, start_price as price
From OpenAuction;

temporal relation P is
BidPrice Union All OpenPrice

temporal relation CurrentPrice is
Select P.itemID, Max(P.price) as price
From P [range .. now]
Group By P.itemID;

stream Result is
Select Rstream(C.itemID, P.price)
From ClosedAuction [Now] C, CurrentPrice P
Where C.itemID = P.itemID;

これを Perl に翻訳するとつぎのようになる.

#--- ClosingPrice object
sub new_ClosingPrice($$) {
    my ($CurrentPrice_object, $output) = @_;
    return {CurrentPrice => $CurrentPrice_object->{CurrentPrice},
	    output => $output};
}

#- ClosingPrice part
sub ClosingPrice_join($$) {
    my ($self, $itemID) = @_;
    my $currentPrice = $self->{CurrentPrice}->{$itemID};
    if ($currentPrice) {
	my $Rstream_record = {itemID => $itemID,
			      price => $currentPrice->{price},
			      timestamp => $clock};
	outstream_put($self->{output}, $Rstream_record); # send a message to the parent
    }
}

#- ClosingPrice entry for closed auction stream
sub ClosingPrice_closed($$) {
    my ($self, $tuple) = @_;
    my $now = $tuple->{itemID};
    ClosingPrice_join($self, $now);
}

#--- CurrentPrice
sub new_CurrentPrice() {
    return {CurrentPrice => {}};
}

sub CurrentPrice($$$) {
    my ($self, $itemID, $price) = @_;
    my $CurrentPrice = $self->{CurrentPrice};
    # Max(price) & Group By itemID:
    if (!$CurrentPrice->{$itemID}) {
	$CurrentPrice->{$itemID} = {itemID => $itemID, price => $price};
    } elsif ($price > $CurrentPrice->{$itemID}->{price}) {
	$CurrentPrice->{$itemID}->{price} = $price;
    }
}

#--- P (implicit union all)
#- P entry for both left and right arguments
sub P($$$) {
    my ($self, $itemID, $price) = @_;
    CurrentPrice($self, $itemID, $price);
}

#- OpenPrice entry for open auction stream
sub OpenPrice_open($$) {
    my ($self, $tuple) = @_;
    P($self, $tuple->{itemID}, $tuple->{start_price});
}

#- BidPrice entry for bidding stream
sub BidPrice_bid($$) {
    my ($self, $tuple) = @_;
    P($self, $tuple->{itemID}, $tuple->{bidPrice});
}

#--- Scheduler
sub closingPrice_scheduler() {
    my $openAuction = instream_open(@OpenAuction);
    my $closedAuction = instream_open(@ClosedAuction);
    my $bid = instream_open(@Bid);
    my $CurrentPrice_object = new_CurrentPrice();
    my $ClosingPrice_object =
	new_ClosingPrice($CurrentPrice_object,
			 outstream_open('', ['itemID', 'price', 'timestamp']));

    for (my $time = 0; $time < $Max_timestamp; $time++) {
	tick();
	if (instream_hasNext($closedAuction)) {
	    ClosingPrice_closed($ClosingPrice_object,
				instream_next($closedAuction));
	}
	if (instream_hasNext($bid)) {
	    BidPrice_bid($CurrentPrice_object, instream_next($bid));
	}
	if (instream_hasNext($openAuction)) {
	    OpenPrice_open($CurrentPrice_object, instream_next($openAuction));
	}
    }
}

closingPrices_scheduler() をよびだすと,結果はつぎのように表示される.

	itemID	price	timestamp
	3	216	5400
	2	126	6500
	1	302	7050
	0	80	7200

終了時価格のカテゴリーごとの平均値に関するクエリ

Monitor the average closing price across items in each category over the last hour.

temporal relation CurrentPrice is
    Select    P.itemID, Max(P.price) as price
    From      ((Select itemID, bid_price as price 
                From   Bid) Union All
               (Select itemID, start_price as price 
                From OpenAuction)) [range .. now] P
    Group By  P.itemID

stream ClosingPriceStream is
    Select   Rstream(I.categoryID as categoryID, P.price as price)
    From     ClosedAuction [Now] C, CurrentPrice P, Item I
    Where    C.itemID = P.itemID and C.itemID = I.id

temporal relation AvgPrice is
    Select   catID, Avg(price)
    From     ClosingPriceStream [Range 1 Hour]
    Group By catID 

Perl ではつぎのように表現することができる. ただし,ウィンドウ・サイズは 1 時間 (3600 sec) でなく 1500 sec にしている.

sub new_averageClosingPrice() {
    my $openAuction = instream_open(@OpenAuction);
    my $closedAuction = instream_open(@ClosedAuction);
    my $bid = instream_open(@Bid);
    my %CurrentPrice = ();
    my @ClosingPriceWindow = ();
    my %PriceSum = ();

    # Generate Item primary index: relation (@Item) -> hashed relation (%hashedItem)
    my %hashedItem = ();
    foreach my $record (@Item) {
	$hashedItem{$record->{id}} = $record;
    }

    return {openAuction => $openAuction, closedAuction => $closedAuction,
	    bid => $bid, CurrentPrice => \%CurrentPrice,
	    ClosingPriceWindow => \@ClosingPriceWindow,
	    PriceSum => \%PriceSum, hashedItem => \%hashedItem};
}

sub averageClosingPrice_result_put($$) {
    my ($self, $change) = @_;
    my $PriceSum = $self->{PriceSum};
    print "$change $clock\n";
    my @result = ();
    foreach my $categoryID (keys %$PriceSum) {
	my $record = $PriceSum->{$categoryID};
	if ($record->{count} > 0) {
	    push(@result,
		 {categoryID => $categoryID,
		  Avg_price => $record->{priceSum} / $record->{count}});
	}
    }
    print_result(@result, ['categoryID', 'Avg_price']);
    print "\n";
}

sub averageClosingPrice_avgPrice_put($$$) {
    my ($self, $categoryID, $price) = @_;
    my $PriceSum = $self->{PriceSum};
    if (!$PriceSum->{$categoryID}) {
	$PriceSum->{$categoryID} =
	{categoryID => $categoryID, priceSum => $price, count => 1};
    } else {
	$PriceSum->{$categoryID}->{priceSum} += $price;
	$PriceSum->{$categoryID}->{count}++;
    }
    averageClosingPrice_result_put($self, 1);
}

sub averageClosingPrice_avgPrice_negate($$$) {
    my ($self, $categoryID, $price) = @_;
    my $PriceSum = $self->{PriceSum};
    $PriceSum->{$categoryID}->{priceSum} -= $price;
    $PriceSum->{$categoryID}->{count}--;
    averageClosingPrice_result_put($self, -1);
}

sub averageClosingPrice_closingPriceWindow_put($$$) {
    my ($self, $categoryID, $price) = @_;
    my $ClosingPriceWindow = $self->{ClosingPriceWindow};

    # check expiration
    if (@$ClosingPriceWindow > 0 &&
	$ClosingPriceWindow->[0]->{timestamp} <= $clock - 1500) {
	# $ClosingPriceWindow->[0]->{timestamp} <= $clock - 3600) {
	my $removed = shift @$ClosingPriceWindow;
	averageClosingPrice_avgPrice_negate($self, $removed->{categoryID},
					    $removed->{price});
    }

    my $ClosingPrice_record =
    {categoryID => $categoryID, price => $price, timestamp => $clock};

    # Get ClosingPriceStream [Range 1 Hour] as ClosingPrice
    push(@$ClosingPriceWindow, $ClosingPrice_record);
    averageClosingPrice_avgPrice_put($self, $categoryID, $price);
}

sub averageClosingPrice_closingPriceStream($$) {
    my ($self, $closedAuction_tuple) = @_;
    my $itemID = $closedAuction_tuple->{itemID};
    my $categoryID = $self->{hashedItem}->{$itemID}->{categoryID};
    my $currentPrice = $self->{CurrentPrice}->{$itemID}->{price};
    averageClosingPrice_closingPriceWindow_put($self, $categoryID, $currentPrice);
}

sub averageClosingPrice_currentPrice($$$) {
    my ($self, $itemID, $price) = @_;
    # Max(price) & Group By itemID:
    my $CurrentPrice = $self->{CurrentPrice};
    if (!$CurrentPrice->{$itemID}) {
	$CurrentPrice->{$itemID} = {itemID => $itemID, price => $price};
    } elsif ($price > $CurrentPrice->{$itemID}->{price}) {
	$CurrentPrice->{$itemID}->{price} = $price;
    }
}

sub averageClosingPrice_open_next($$) {
    my ($self, $openAuction_record) = @_;
    averageClosingPrice_currentPrice($self, $openAuction_record->{itemID},
				     $openAuction_record->{start_price});
}

sub averageClosingPrice_closed_next($$) {
    my ($self, $tuple) = @_;
    averageClosingPrice_closingPriceStream($self, $tuple);
}

sub averageClosingPrice_bid_next($$) {
    my ($self, $bid_record) = @_;
    averageClosingPrice_currentPrice($self, $bid_record->{itemID},
				     $bid_record->{bidPrice});
}

sub averageClosingPrice() {
    my $self = new_averageClosingPrice();

    for (my $time = 0; $time < $Max_timestamp; $time++) {
	tick();
	if (instream_hasNext($self->{closedAuction})) {
	    averageClosingPrice_closed_next($self,
					    instream_next($self->{closedAuction}));
	}
	if (instream_hasNext($self->{bid})) {
	    averageClosingPrice_bid_next($self, instream_next($self->{bid}));
	}
	if (instream_hasNext($self->{openAuction})) {
	    averageClosingPrice_open_next($self, instream_next($self->{openAuction}));
	}
    }
}

このプログラムがこれまでのプログラムともっともおおきくことなる点は,スライディング・ウィンドウ [Range 1 Hour] から,ふるい要素が脱落していくために,再計算が必要になることである. その再計算をおこなっているのが averageClosingPrice_closingPriceWindow_put の前半部分と,そこからよびだされる averageClosingPrice_avgPrice_negate である. すなわち,要素を追加する際にあわせて要素の削除もおこなっている.

averageClosingPrice() をよびだすと,結果はつぎのように表示される.

1 5400
	categoryID	Avg_price
	----------------
	1	216

1 6500
	categoryID	Avg_price
	----------------
	1	216
	2	126

-1 6900
	categoryID	Avg_price
	----------------
	2	126

1 7050
	categoryID	Avg_price
	----------------
	2	214

1 7200
	categoryID	Avg_price
	----------------
	1	80
	2	214

-1 8000
	categoryID	Avg_price
	----------------
	1	80
	2	302

おなじ機能を Java で記述したプログラムもある: スライディング・ウィンドウ うめこみ版スライディング・ウィンドウ 分離版

その他

オンライン・オークションの例題のなかには,さらに以下のような 5 題の問題がふくまれているが,これらは当面,省略する.

# 7. Short Auctions Query: Report all auctions that closed within five hours of their opening.
# 
# Select Rstream(OpenAuction.*)
# From   OpenAuction [Range 5 Hour] O, ClosedAuction [Now] C
# Where  O.itemID = C.itemID


# 8. Hot Item Query: Select the item(s) with the most bids in the past hour. Update the results every minute.
# 
# HotItemStream: 
#     Select Rstream(itemID)
#     From   (Select   B1.itemID as itemID, Count(*) as num
#             From     Bid [Range 60 Minute
#                           Slide 1 Minute] B1
#             Group By B1.itemID) 
#     Where   num >= All (Select   Count(*)
#                         From     Bid [Range 60 Minute
#                                       Slide 1 Minute] B2
#                         Group By B2.itemID)
# 
# Select *
# From   HotItemStream [Range 1 Minute]


# 9. Average Selling Price By Seller Query: For each seller, maintain the average selling price over the last 10 items sold.
# 
# CurrentPrice: 
#     Select    P.itemID, Max(P.price) as price
#     From      ((Select itemID, bid_price as price 
#                 From   Bid) Union All
#                (Select itemID, start_price as price 
#                 From OpenAuction)) P
#     Group By  P.itemID
# 
# ClosingPriceStream:
#     Select   Rstream(O.sellerID as sellerID, P.price as price)
#     From     ClosedAuction [Now] C, CurrentPrice P, 
#              OpenAuction O
#     Where    C.itemID = P.itemID and C.itemID = O.itemID
# 
# AvgSellingPrice:
#     Select   sellerID, Avg(price)
#     From     ClosingPriceStream [Partition By sellerID Rows 10] 
#     Group By sellerID 


# 10. Highest Bid Query: Every 10 minutes return the highest bid(s) in the recent 10 minutes.
# 
# Select  Rstream(itemID, bid_price)
# From    Bid [Range 10 Minute
#              Slide 10 Minute]
# Where   bid_price = (Select  Max(bid_price)
#                      From    Bid [Range 10 Minute]
#                                   Slide 10 Minute]


# 11. Monitor New Users Query: Find people who put up something for sale within 12 hours of registering to use the auction service.
# 
# NewPersonStream:
#   Select Istream(P.id, P.name)
#   From   Person P
# 
# Select Distinct(P.id, P.name)
# From   Select Rstream(P.id, P.name)
#        From   NewPersonStream [Range 12 Hour] P, OpenAuction A [Now]
#        Where  P.id = A.sellerID
Keywords:

トラックバック

このエントリーのトラックバックURL:
http://www.kanadas.com/mt/mt-tb.cgi/3577

コメントを投稿

このページについて

2009-01-25 00:18 に投稿されたエントリーのページです。

他にも多くのエントリーがあります。メインページアーカイブページも見てください。

Creative Commons License
このブログは、次のライセンスで保護されています。 クリエイティブ・コモンズ・ライセンス.
Powered by
Movable Type 3.36