ストリーム・データ処理は通常はデータベースの処理と同様にインタプリタ的に実行される. しかし,ここではクエリの実行のしくみをみるために,あえてそれを Perl のプログラム (“コンパイル・コード”) でシミュレートしてみる. とりあげる例題は STREAM (Stanford Stream Data Manager) や ATLaS (Aggregate & Table Language and System) でも使用されているオンライン・オークションである. なお,このページでは構造的な Perl に翻訳するやりかたをしめす. 「ストリーム・データ処理によるオンライン・オークションのシミュレーション (非構造化版)」 においては goto 文をつかった非構造的な版のプログラムをしめす.
目次
- はじめに
- 使用するデータ
- 両替計算
- 特定のオークションを選択するクエリ
- Local Item Suggestion Query
- 開催中のオークションに関するクエリ
- 終了時価格に関するクエリ
- 終了時価格のカテゴリーごとの平均値に関するクエリ
- その他
はじめに
“Stream Query Repository” というサイトには,CQL (Continuous Query Language) の文とその実行例がいろいろ書かれている. そこにあるをすこしかえて,Perl に翻訳してみることにする. このページのなかにもひととおりのプログラムを収容するが,全部をつないだ ソースコード もある.
使用するデータ
“Stream Query Repository: Online Auctions” でつかわれているデータを一部だけ変更したものをしめす.
関係
ここであつかういくつかの関係 (relation) は配列によって表現する. オークションにおいてあつかわれる品目は Item という関係の要素 (record または tuple) とみなすことができる. 品目は識別子 (id),なまえ (name),種類の識別子 (categoryID),登録時刻 (registrationTime) などの属性をもつ. 以下のテスト・データは NEXMark ベンチマークのテスト・データのごく一部の部分を変更して使用している.
my @Item =
({id => 0, name => 'Item 1',
description => '',
categoryID => 1, registrationTime => 0},
{id => 1, name => 'Item 2',
description => '',
categoryID => 2, registrationTime => 0},
{id => 2, name => 'Item 3',
description => '',
categoryID => 2, registrationTime => 0},
{id => 3, name => 'Item 4',
description => '',
categoryID => 1, registrationTime => 0});
もとのベンチマークであつかわれている多数の品目のうち,ここでは 4 品目だけをあつかっている.
また,売り手,買い手や入札者はひとであり,それは Person という関係の要素だとかんがえることができる. 各人はなまえ,E-メイル・アドレス,国,市などの属性をもつが,国,市などは不明なばあいもある.
my @Person =
({id => 0,
name => 'Luitpold Martucci', emailAddress => 'Martucci@toronto.edu',
city => '', country => ''},
{id => 1,
name => 'Takakazu Schonegge', emailAddress => 'Schonegge@panasonic.com',
city => 'Gothenburg', country => 'United States'},
{id => 9,
name => 'Leonid Nour', emailAddress => 'Nour@llnl.gov',
city => '', country => ''},
{id => 11,
name => 'Ravindra Abdelmoty', emailAddress => 'Abdelmoty@whizbang.com',
city => 'Manchester', country => 'United States'},
{id => 13,
name => 'Annette Klaiber', emailAddress => 'Klaiber@propel.com',
city => '', country => ''},
{id => 14,
name => 'Jacinto Ceccarelli', emailAddress => 'Ceccarelli@unbc.ca',
city => 'Corpus', country => 'United States'},
{id => 15,
name => 'Janick Blaauw', emailAddress => 'Blaauw@ogi.edu',
city => '', country => ''},
{id => 16,
name => 'Warwich Marsiglia', emailAddress => 'Marsiglia@cmu.edu',
city => 'South', country => 'United States'},
{id => 18,
name => 'Justus Binkley', emailAddress => 'Binkley@ogi.edu',
city => '', country => ''},
{id => 19,
name => 'Kunsoo Raghavendran', emailAddress => 'Raghavendran@umass.edu',
city => 'Paris', country => 'Tuvalu'},
{id => 20,
name => 'Takuji Liedekerke', emailAddress => 'Liedekerke@cnr.it',
city => '', country => ''},
{id => 22,
name => 'Shiyi Polster', emailAddress => 'Polster@ucd.ie',
city => '', country => ''},
{id => 30,
name => 'Keiko Nastansky', emailAddress => 'Nastansky@co.jp',
city => '', country => ''},
{id => 31,
name => 'Keumog Vuskovic', emailAddress => 'Vuskovic@filemaker.com',
city => 'Cancun', country => 'Tajikistan'},
{id => 33,
name => 'Armand Impagliazzo', emailAddress => 'Impagliazzo@cwi.nl',
city => 'Oakland', country => 'Heard and Mcdonald Island'},
{id => 36,
name => 'Zoe Holmback', emailAddress => 'Holmback@ucsb.edu',
city => '', country => ''},
{id => 39,
name => 'Rajamani Pinzani', emailAddress => 'Pinzani@mitre.org',
city => 'Gothenburg', country => 'United States'},
{id => 44,
name => 'Jianying Parikh', emailAddress => 'Parikh@sleepycat.com',
city => 'Torreon', country => 'United States'},
{id => 45,
name => 'Edmond Rajcani', emailAddress => 'Rajcani@okcu.edu',
city => '', country => ''},
{id => 53,
name => 'Guijun Cosette', emailAddress => 'Cosette@unl.edu',
city => '', country => ''},
{id => 55,
name => 'Fabio Denna', emailAddress => 'Denna@cmu.edu',
city => 'Wilmington', country => 'Australia'});
関係の内容を印刷するためのサブルーティン print_result() を用意する. その定義はつぎのとおりである.
sub print_result(\@$) {
my ($result, $keys) = @_;
foreach my $key (@$keys) {
print "\t$key";
}
print "\n\t----------------\n";
foreach my $record (@$result) {
foreach my $key (@$keys) {
print "\t$record->{$key}";
}
print "\n";
}
}
ストリーム
ストリームの要素はこのシミュレーションにおいてはあらかじめ配列にいれ,必要なときにとりだすことにする. オークションの開始をあらわすストリーム OpenAuction の要素を配列 @OpenAuction にいれる.
my @OpenAuction =
({itemID => 3, sellerID => 12, start_price => 80, timestamp => 3100},
{itemID => 2, sellerID => 39, start_price => 110, timestamp => 3150},
{itemID => 1, sellerID => 18, start_price => 150, timestamp => 3300},
{itemID => 0, sellerID => 30, start_price => 30, timestamp => 3600});
また,オークションの終了をあらわすストリーム ClosedAuction の要素を配列 @ClosedAuction にいれる.
my @ClosedAuction =
({itemID => 3, byerID => 31, timestamp => 5400},
{itemID => 2, byerID => 16, timestamp => 6500},
{itemID => 1, byerID => 55, timestamp => 7050},
{itemID => 0, byerID => 30, timestamp => 7200});
競りにおけるビッドのストリーム Bid の要素を配列 @Bid にいれる.
my @Bid =
({itemID => 3, bidPrice => 82, bidderID => 22, timestamp => 3105},
{itemID => 2, bidPrice => 115, bidderID => 19, timestamp => 3175},
{itemID => 2, bidPrice => 120, bidderID => 39, timestamp => 3196},
{itemID => 1, bidPrice => 155, bidderID => 45, timestamp => 3306},
{itemID => 3, bidPrice => 103, bidderID => 0, timestamp => 3380},
{itemID => 2, bidPrice => 122, bidderID => 38, timestamp => 3497},
{itemID => 0, bidPrice => 33, bidderID => 41, timestamp => 3662},
{itemID => 3, bidPrice => 130, bidderID => 19, timestamp => 3663},
{itemID => 1, bidPrice => 171, bidderID => 11, timestamp => 3852},
{itemID => 1, bidPrice => 181, bidderID => 9, timestamp => 4395},
{itemID => 1, bidPrice => 201, bidderID => 20, timestamp => 4396},
{itemID => 3, bidPrice => 148, bidderID => 33, timestamp => 4573},
{itemID => 3, bidPrice => 152, bidderID => 15, timestamp => 4574},
{itemID => 3, bidPrice => 166, bidderID => 44, timestamp => 4758},
{itemID => 3, bidPrice => 169, bidderID => 1, timestamp => 4972},
{itemID => 3, bidPrice => 191, bidderID => 53, timestamp => 5227},
{itemID => 3, bidPrice => 216, bidderID => 31, timestamp => 5360},
{itemID => 1, bidPrice => 226, bidderID => 33, timestamp => 5506},
{itemID => 1, bidPrice => 249, bidderID => 36, timestamp => 5536},
{itemID => 0, bidPrice => 43, bidderID => 18, timestamp => 6040},
{itemID => 1, bidPrice => 260, bidderID => 33, timestamp => 6041},
{itemID => 0, bidPrice => 53, bidderID => 9, timestamp => 6282},
{itemID => 0, bidPrice => 58, bidderID => 13, timestamp => 6391},
{itemID => 2, bidPrice => 126, bidderID => 16, timestamp => 6462},
{itemID => 1, bidPrice => 280, bidderID => 14, timestamp => 6570},
{itemID => 1, bidPrice => 302, bidderID => 55, timestamp => 6975},
{itemID => 0, bidPrice => 80, bidderID => 30, timestamp => 7171});
もとのデータにおいては同一のタイム・スタンプをもつビッドがふくまれていたが,プログラムにおけるあつかいを単純化するため,タイム・スタンプをずらして,かさならないようにしている.
時間の経過は $clock という変数によってシミュレートする. tick() がよびだされるごとに時刻が 1 (秒) すすむものとする (実用的にはもっとこまかい単位でタイムスタンプをつける必要があるが,ここではかんたんのために 1 秒単位としている). また,ここではかんたんのため,1 秒に 2 個ストリーム要素が到着することはないものとする.
my $clock = 0;
sub tick () {
return ++$clock;
}
本来は外部から到着するはずの,配列にいれたストリームの要素は, instream_get() という関数によって 1 個ずつとりだして使用する. 初期設定のためにあらかじめ instream_open() をよびだすことにする. これらの関数の定義はつぎのとおりである.
sub instream_open(\@) {
my ($array) = @_;
my $stream = {array => $array, index => 0};
return $stream;
}
sub instream_get($) {
my ($stream) = @_;
my $array = $stream->{array};
my $next = $array->[$stream->{index}];
if ($stream->{index} >= @$array ||
$next->{timestamp} > $clock) {
return '';
} else {
$stream->{index}++;
return $next;
}
}
instream_open() はストリームのハンドルをかえすが,これを instream_get() の第 1 引数として使用する.
また,外部に出力する (このシミュレーションにおいては印刷する) ストリームは outstream_open() によってひらき,outstream_put() によって出力する. outstream_open() の第 1 引数は本来はストリームを指定するが,現在は出力先を標準出力にかぎっているため使用していない. 第 2 引数は出力するフィールド名のリストを指定する. outstream_open() はストリームのハンドルをかえすが,それを outstream_put() の第 1 引数にわたす.
sub outstream_open($$) {
my ($stream, $keys) = @_;
foreach my $key (@$keys) {
print "\t$key";
}
print "\n";
return $keys;
}
sub outstream_put($$) {
my ($stream, $record) = @_;
my $keys = $stream;
foreach my $key (@$keys) {
print "\t$record->{$key}";
}
print "\n";
}
両替計算
米ドルによるビッドにおける価格をユーロに変換する. ビッドがストリームとして入力されるので,結果もストリームとして出力する. CQL によるプログラムはつぎのとおりである.
stream EuroBid is Select itemID, DolToEuro(bidPrice) euroBidPrice, bidderID From Bid;
これを Perl に翻訳するとつぎのようになる.
sub DolToEuro($) {
my ($dollar) = @_;
return 0.71 * $dollar;
}
#--- euroBid object
#- create an euroBid object
sub new_euroBid($) {
my ($output) = @_;
return {output => $output};
}
#- euroBid entry (for new bidding)
sub euroBid_dolBid($$) {
my ($self, $tuple) = @_;
my $tuple2 = {itemID => $tuple->{itemID},
euroBidPrice => DolToEuro($tuple->{bidPrice}),
bidderID => $tuple->{bidderID}};
outstream_put($self->{output}, $tuple2); # send a message to the parent
}
#--- scheduler
sub euroBid_scheduler() {
my $dolBid = instream_open(@Bid);
my $euroBid =
new_euroBid(outstream_open('', ['itemID', 'euroBidPrice', 'bidderID']));
for (my $time = 0; $time < $Max_timestamp; $time++) {
tick();
if (instream_hasNext($dolBid)) { # From Bid
euroBid_dolBid($euroBid, instream_next($dolBid));
# send a message to the euroBid object (leaf node)
}
}
}
サブルーティン euroBid_scheduler() は入力ストリームの要素を 1 個よむごとにその値を変換し,出力ストリームに出力する. すなわち,この処理は 1 本のパイプだけで実行することができる. ここではややオブジェクト指向風にプログラムを記述している. ただし,Perl のオブジェクト指向機能はつかっていない. euroBid() を実行すれば,つぎのような結果がえられる.
itemID euroBidPrice bidderID 3 58.22 22 2 81.65 19 2 85.2 39 1 110.05 45 3 73.13 0 2 86.62 38 0 23.43 41 3 92.3 19 1 121.41 11 1 128.51 9 1 142.71 20 3 105.08 33 3 107.92 15 3 117.86 44 3 119.99 1 3 135.61 53 3 153.36 31 1 160.46 33 1 176.79 36 0 30.53 18 1 184.6 33 0 37.63 9 0 41.18 13 2 89.46 16 1 198.8 14 1 214.42 55 0 56.8 30
特定のオークションを選択するクエリ
品目識別子が 1 または 2 のビッドだけを抽出するには,つぎのような CQL クエリを実行すればよい.
stream Bid_1_2 is Select itemID, bidPrice From Bid where itemID = 1 or itemID = 2;
これを Perl に翻訳するとつぎのようになる.
#--- bid_1_2 object
#- create a bid_1_2 object
sub new_bid_1_2($) {
my ($output) = @_;
return {output => $output};
}
#- bid_1_2 entry (for new bidding)
sub bid_1_2_bid($$) {
my ($self, $tuple) = @_;
if ($tuple->{itemID} == 1 || $tuple->{itemID} == 2) {
outstream_put($self->{output}, $tuple); # send a message to the parent
}
}
#--- scheuler
sub bid_1_2_scheduler() {
my $bid = instream_open(@Bid);
my $bid_1_2 = new_bid_1_2(outstream_open('', ['itemID', 'bidPrice']));
for (my $time = 0; $time < $Max_timestamp; $time++) {
tick();
if (instream_hasNext($bid)) {
bid_1_2_bid($bid_1_2, instream_next($bid));
# send a message to the bid_1_2 object (leaf node)
}
}
}
この処理も 1 本のパイプだけで実行することができる. bid_1_2_scheduler() を実行すれば,つぎのような結果がえられる.
itemID bidPrice 2 115 2 120 1 155 2 122 1 171 1 181 1 201 1 226 1 249 1 260 2 126 1 280 1 302
[TBD]
Local Item Suggestion Query
米国の売り手が出品した品種が 2 の品目をすべて報告するには,つぎのクエリを実行すればよい.
stream ItemsCat2US is
Select Istream(P.name, P.city, O.itemID)
From OpenAuction [Now] O, Person P, Item I
Where O.sellerID = P.id and P.country = 'United States' and O.itemID = I.id
and I.categoryID = 2;
これを Perl に翻訳するとつぎのようになる.
#--- ItemsCat2US object
#- create an ItemsCat2US object
sub new_itemsCat2US($) {
my ($output) = @_;
# Generate index to relation: id -> person
my %hashedPerson = ();
foreach my $record (@Person) {
$hashedPerson{$record->{id}} = $record; # id is assumed to be unique
}
# print %{$hashedPerson{39}}, "\n";
# Generate index to relation: id -> item
my %hashedItem = ();
foreach my $record (@Item) {
$hashedItem{$record->{id}} = $record; # id is assumed to be unique
}
# print %{$hashedItem{2}}, "\n";
return {output => $output,
hashedPerson => \%hashedPerson, hashedItem => \%hashedItem};
}
#- itemsCat2US entry for open auction stream
sub itemsCat2US($$) {
my ($self, $tuple) = @_;
my $person = $self->{hashedPerson}->{$tuple->{sellerID}};
my $item = $self->{hashedItem}->{$tuple->{itemID}};
if ($person->{country} eq 'United States' && $item->{categoryID} == 2) {
my $tuple2 = {name => $person->{name},
city => $person->{city},
itemID => $tuple->{itemID}}; # hashed join
outstream_put($self->{output}, $tuple2); # send a message to the parent
}
}
#--- Scheduler
sub itemsCat2US_scheduler() {
my $is_auction = instream_open(@OpenAuction);
my $select_obj =
new_itemsCat2US(outstream_open('', ['name', 'city', 'itemID']));
for (my $time = 0; $time < $Max_timestamp; $time++) {
tick();
if (instream_hasNext($is_auction)) {
itemsCat2US($select_obj, instream_next($is_auction));
# send a message to the itemsCat2US object (leaf node)
}
}
}
関係 Person および Item のインデクス (primary index) を生成して使用している. いずれにおいても id が一意であることを仮定している. これらの関係の処理をのぞいたストリームの入力,処理,出力の処理に関しては,上記の 2 つの例題とおおきなちがいはない.
itemsCat2US_scheduler() をよびだすと,結果はつぎのように表示される.
name city itemID Rajamani Pinzani Gothenburg 2
開催中のオークションに関するクエリ
現在開催中のオークションからなるテーブル (関係) を維持するには,つぎのようなクエリを使用すればよい.
temporal relation OpenAuctionItemID is Select itemID From OpenAuction [range .. now]; temporal relation ClosedAuctionItemID is Select itemID From ClosedAuction [range .. now]; temporal relation CurrentAuctions is OpenAuctionItemID Minus ClosedAuctionItemID;
これを Perl に翻訳するとつぎのようになる. このプログラムにおいては複数のパイプがくみあわされるが,このプログラムにおいてはサブルーティンよびだしによってパイプをつないでいる.
#--- Toplevel: printer
sub new_CAresult_put() {
}
sub CAresult_put($) {
my ($tuple) = @_;
my @result = ();
foreach my $itemID (keys %$tuple) {
push(@result, {itemID => $itemID});
}
print "$clock\n";
print_result(@result, ['itemID']);
print "\n";
}
#--- Minus
sub new_hashed_minus() {
new_CAresult_put();
return {hashed2ndArg => {}, result => {}};
}
#- Minus entry for the first argument
sub hashed_minus1($$) {
my ($self, $tuple) = @_;
my $itemID = $tuple->{itemID};
if (!$self->{hashed2ndArg}->{$itemID}) {
$self->{result}->{$itemID} = $tuple;
CAresult_put($self->{result}); # send a message to the parent
};
}
#- Minus entry for the second argument
sub hashed_minus2($$) {
my ($self, $tuple) = @_;
my $itemID = $tuple->{itemID};
$self->{hashed2ndArg}->{$itemID} = $tuple;
if (delete $self->{result}->{$itemID}) {
CAresult_put($self->{result}); # send a message to the parent
}
}
#--- currentAuctions object
#- create a currentAuctions object
sub new_currentAuctions() {
return {minus_obj => new_hashed_minus()};
}
#- currentAuctions entry for open auction stream
sub currentAuctions_open($$) {
my ($self, $tuple) = @_;
my $tuple2 = {itemID => $tuple->{itemID}}; # select itemID
hashed_minus1($self->{minus_obj}, $tuple2); # send a message to the parent (minus)
}
#- currentAuctions entry for closed auction stream
sub currentAuctions_closed($$) {
my ($self, $tuple) = @_;
my $tuple2 = {itemID => $tuple->{itemID}}; # select itemID
hashed_minus2($self->{minus_obj}, $tuple2); # send a message to the parent (minus)
}
#--- scheduler
sub currentAuctions_scheduler() {
my $CA_obj = new_currentAuctions();
my $openAuction = instream_open(@OpenAuction);
my $closedAuction = instream_open(@ClosedAuction);
for (my $time = 0; $time < $Max_timestamp; $time++) {
tick();
if (instream_hasNext($openAuction)) {
currentAuctions_open($CA_obj, instream_next($openAuction));
# send a message to the currentAuctions object (leaf node)
}
if (instream_hasNext($closedAuction)) {
currentAuctions_closed($CA_obj, instream_next($closedAuction));
# send a message to the currentAuctions object (leaf node)
}
}
}
このプログラムにおいて,“# Dispatcher” というコメントをつけた部分が本来なら外部からとどくストリーム要素の内容によって 2 個のパイプ (“OpenAuctionItemID” と “ClosedAuctionItemID”) のうちのいずれかにその要素をふりわける.
currentAuctions_scheduler() をよびだすと,結果はつぎのように表示される.
3100 itemID ---------------- 3 3150 itemID ---------------- 3 2 3300 itemID ---------------- 1 3 2 3600 itemID ---------------- 1 0 3 2 5400 itemID ---------------- 1 0 2 6500 itemID ---------------- 1 0 7050 itemID ---------------- 0 7200 itemID ----------------
おなじ意味をもつプログラムをつぎのように記述することもできる.
stream CurrentAuction2 is Select * From OpenAuction Where itemID Not in (Select itemID From ClosedAuction);
Perl に翻訳してえられるプログラムは,まえのプログラムとほぼおなじなので省略する.
終了時価格に関するクエリ
各オークションの終了時の価格を報告するには,つぎのクエリを実行すればよい.
stream BidPrice is Select itemID, bidPrice as price From Bid; stream OpenPrice is Select itemID, start_price as price From OpenAuction; temporal relation P is BidPrice Union All OpenPrice temporal relation CurrentPrice is Select P.itemID, Max(P.price) as price From P [range .. now] Group By P.itemID; stream Result is Select Rstream(C.itemID, P.price) From ClosedAuction [Now] C, CurrentPrice P Where C.itemID = P.itemID;
これを Perl に翻訳するとつぎのようになる.
#--- ClosingPrice object
sub new_ClosingPrice($$) {
my ($CurrentPrice_object, $output) = @_;
return {CurrentPrice => $CurrentPrice_object->{CurrentPrice},
output => $output};
}
#- ClosingPrice part
sub ClosingPrice_join($$) {
my ($self, $itemID) = @_;
my $currentPrice = $self->{CurrentPrice}->{$itemID};
if ($currentPrice) {
my $Rstream_record = {itemID => $itemID,
price => $currentPrice->{price},
timestamp => $clock};
outstream_put($self->{output}, $Rstream_record); # send a message to the parent
}
}
#- ClosingPrice entry for closed auction stream
sub ClosingPrice_closed($$) {
my ($self, $tuple) = @_;
my $now = $tuple->{itemID};
ClosingPrice_join($self, $now);
}
#--- CurrentPrice
sub new_CurrentPrice() {
return {CurrentPrice => {}};
}
sub CurrentPrice($$$) {
my ($self, $itemID, $price) = @_;
my $CurrentPrice = $self->{CurrentPrice};
# Max(price) & Group By itemID:
if (!$CurrentPrice->{$itemID}) {
$CurrentPrice->{$itemID} = {itemID => $itemID, price => $price};
} elsif ($price > $CurrentPrice->{$itemID}->{price}) {
$CurrentPrice->{$itemID}->{price} = $price;
}
}
#--- P (implicit union all)
#- P entry for both left and right arguments
sub P($$$) {
my ($self, $itemID, $price) = @_;
CurrentPrice($self, $itemID, $price);
}
#- OpenPrice entry for open auction stream
sub OpenPrice_open($$) {
my ($self, $tuple) = @_;
P($self, $tuple->{itemID}, $tuple->{start_price});
}
#- BidPrice entry for bidding stream
sub BidPrice_bid($$) {
my ($self, $tuple) = @_;
P($self, $tuple->{itemID}, $tuple->{bidPrice});
}
#--- Scheduler
sub closingPrice_scheduler() {
my $openAuction = instream_open(@OpenAuction);
my $closedAuction = instream_open(@ClosedAuction);
my $bid = instream_open(@Bid);
my $CurrentPrice_object = new_CurrentPrice();
my $ClosingPrice_object =
new_ClosingPrice($CurrentPrice_object,
outstream_open('', ['itemID', 'price', 'timestamp']));
for (my $time = 0; $time < $Max_timestamp; $time++) {
tick();
if (instream_hasNext($closedAuction)) {
ClosingPrice_closed($ClosingPrice_object,
instream_next($closedAuction));
}
if (instream_hasNext($bid)) {
BidPrice_bid($CurrentPrice_object, instream_next($bid));
}
if (instream_hasNext($openAuction)) {
OpenPrice_open($CurrentPrice_object, instream_next($openAuction));
}
}
}
closingPrices_scheduler() をよびだすと,結果はつぎのように表示される.
itemID price timestamp 3 216 5400 2 126 6500 1 302 7050 0 80 7200
終了時価格のカテゴリーごとの平均値に関するクエリ
Monitor the average closing price across items in each category over the last hour.
temporal relation CurrentPrice is
Select P.itemID, Max(P.price) as price
From ((Select itemID, bid_price as price
From Bid) Union All
(Select itemID, start_price as price
From OpenAuction)) [range .. now] P
Group By P.itemID
stream ClosingPriceStream is
Select Rstream(I.categoryID as categoryID, P.price as price)
From ClosedAuction [Now] C, CurrentPrice P, Item I
Where C.itemID = P.itemID and C.itemID = I.id
temporal relation AvgPrice is
Select catID, Avg(price)
From ClosingPriceStream [Range 1 Hour]
Group By catID
Perl ではつぎのように表現することができる. ただし,ウィンドウ・サイズは 1 時間 (3600 sec) でなく 1500 sec にしている.
sub new_averageClosingPrice() {
my $openAuction = instream_open(@OpenAuction);
my $closedAuction = instream_open(@ClosedAuction);
my $bid = instream_open(@Bid);
my %CurrentPrice = ();
my @ClosingPriceWindow = ();
my %PriceSum = ();
# Generate Item primary index: relation (@Item) -> hashed relation (%hashedItem)
my %hashedItem = ();
foreach my $record (@Item) {
$hashedItem{$record->{id}} = $record;
}
return {openAuction => $openAuction, closedAuction => $closedAuction,
bid => $bid, CurrentPrice => \%CurrentPrice,
ClosingPriceWindow => \@ClosingPriceWindow,
PriceSum => \%PriceSum, hashedItem => \%hashedItem};
}
sub averageClosingPrice_result_put($$) {
my ($self, $change) = @_;
my $PriceSum = $self->{PriceSum};
print "$change $clock\n";
my @result = ();
foreach my $categoryID (keys %$PriceSum) {
my $record = $PriceSum->{$categoryID};
if ($record->{count} > 0) {
push(@result,
{categoryID => $categoryID,
Avg_price => $record->{priceSum} / $record->{count}});
}
}
print_result(@result, ['categoryID', 'Avg_price']);
print "\n";
}
sub averageClosingPrice_avgPrice_put($$$) {
my ($self, $categoryID, $price) = @_;
my $PriceSum = $self->{PriceSum};
if (!$PriceSum->{$categoryID}) {
$PriceSum->{$categoryID} =
{categoryID => $categoryID, priceSum => $price, count => 1};
} else {
$PriceSum->{$categoryID}->{priceSum} += $price;
$PriceSum->{$categoryID}->{count}++;
}
averageClosingPrice_result_put($self, 1);
}
sub averageClosingPrice_avgPrice_negate($$$) {
my ($self, $categoryID, $price) = @_;
my $PriceSum = $self->{PriceSum};
$PriceSum->{$categoryID}->{priceSum} -= $price;
$PriceSum->{$categoryID}->{count}--;
averageClosingPrice_result_put($self, -1);
}
sub averageClosingPrice_closingPriceWindow_put($$$) {
my ($self, $categoryID, $price) = @_;
my $ClosingPriceWindow = $self->{ClosingPriceWindow};
# check expiration
if (@$ClosingPriceWindow > 0 &&
$ClosingPriceWindow->[0]->{timestamp} <= $clock - 1500) {
# $ClosingPriceWindow->[0]->{timestamp} <= $clock - 3600) {
my $removed = shift @$ClosingPriceWindow;
averageClosingPrice_avgPrice_negate($self, $removed->{categoryID},
$removed->{price});
}
my $ClosingPrice_record =
{categoryID => $categoryID, price => $price, timestamp => $clock};
# Get ClosingPriceStream [Range 1 Hour] as ClosingPrice
push(@$ClosingPriceWindow, $ClosingPrice_record);
averageClosingPrice_avgPrice_put($self, $categoryID, $price);
}
sub averageClosingPrice_closingPriceStream($$) {
my ($self, $closedAuction_tuple) = @_;
my $itemID = $closedAuction_tuple->{itemID};
my $categoryID = $self->{hashedItem}->{$itemID}->{categoryID};
my $currentPrice = $self->{CurrentPrice}->{$itemID}->{price};
averageClosingPrice_closingPriceWindow_put($self, $categoryID, $currentPrice);
}
sub averageClosingPrice_currentPrice($$$) {
my ($self, $itemID, $price) = @_;
# Max(price) & Group By itemID:
my $CurrentPrice = $self->{CurrentPrice};
if (!$CurrentPrice->{$itemID}) {
$CurrentPrice->{$itemID} = {itemID => $itemID, price => $price};
} elsif ($price > $CurrentPrice->{$itemID}->{price}) {
$CurrentPrice->{$itemID}->{price} = $price;
}
}
sub averageClosingPrice_open_next($$) {
my ($self, $openAuction_record) = @_;
averageClosingPrice_currentPrice($self, $openAuction_record->{itemID},
$openAuction_record->{start_price});
}
sub averageClosingPrice_closed_next($$) {
my ($self, $tuple) = @_;
averageClosingPrice_closingPriceStream($self, $tuple);
}
sub averageClosingPrice_bid_next($$) {
my ($self, $bid_record) = @_;
averageClosingPrice_currentPrice($self, $bid_record->{itemID},
$bid_record->{bidPrice});
}
sub averageClosingPrice() {
my $self = new_averageClosingPrice();
for (my $time = 0; $time < $Max_timestamp; $time++) {
tick();
if (instream_hasNext($self->{closedAuction})) {
averageClosingPrice_closed_next($self,
instream_next($self->{closedAuction}));
}
if (instream_hasNext($self->{bid})) {
averageClosingPrice_bid_next($self, instream_next($self->{bid}));
}
if (instream_hasNext($self->{openAuction})) {
averageClosingPrice_open_next($self, instream_next($self->{openAuction}));
}
}
}
このプログラムがこれまでのプログラムともっともおおきくことなる点は,スライディング・ウィンドウ [Range 1 Hour] から,ふるい要素が脱落していくために,再計算が必要になることである. その再計算をおこなっているのが averageClosingPrice_closingPriceWindow_put の前半部分と,そこからよびだされる averageClosingPrice_avgPrice_negate である. すなわち,要素を追加する際にあわせて要素の削除もおこなっている.
averageClosingPrice() をよびだすと,結果はつぎのように表示される.
1 5400 categoryID Avg_price ---------------- 1 216 1 6500 categoryID Avg_price ---------------- 1 216 2 126 -1 6900 categoryID Avg_price ---------------- 2 126 1 7050 categoryID Avg_price ---------------- 2 214 1 7200 categoryID Avg_price ---------------- 1 80 2 214 -1 8000 categoryID Avg_price ---------------- 1 80 2 302
おなじ機能を Java で記述したプログラムもある: スライディング・ウィンドウ うめこみ版,スライディング・ウィンドウ 分離版
その他
オンライン・オークションの例題のなかには,さらに以下のような 5 題の問題がふくまれているが,これらは当面,省略する.
# 7. Short Auctions Query: Report all auctions that closed within five hours of their opening. # # Select Rstream(OpenAuction.*) # From OpenAuction [Range 5 Hour] O, ClosedAuction [Now] C # Where O.itemID = C.itemID # 8. Hot Item Query: Select the item(s) with the most bids in the past hour. Update the results every minute. # # HotItemStream: # Select Rstream(itemID) # From (Select B1.itemID as itemID, Count(*) as num # From Bid [Range 60 Minute # Slide 1 Minute] B1 # Group By B1.itemID) # Where num >= All (Select Count(*) # From Bid [Range 60 Minute # Slide 1 Minute] B2 # Group By B2.itemID) # # Select * # From HotItemStream [Range 1 Minute] # 9. Average Selling Price By Seller Query: For each seller, maintain the average selling price over the last 10 items sold. # # CurrentPrice: # Select P.itemID, Max(P.price) as price # From ((Select itemID, bid_price as price # From Bid) Union All # (Select itemID, start_price as price # From OpenAuction)) P # Group By P.itemID # # ClosingPriceStream: # Select Rstream(O.sellerID as sellerID, P.price as price) # From ClosedAuction [Now] C, CurrentPrice P, # OpenAuction O # Where C.itemID = P.itemID and C.itemID = O.itemID # # AvgSellingPrice: # Select sellerID, Avg(price) # From ClosingPriceStream [Partition By sellerID Rows 10] # Group By sellerID # 10. Highest Bid Query: Every 10 minutes return the highest bid(s) in the recent 10 minutes. # # Select Rstream(itemID, bid_price) # From Bid [Range 10 Minute # Slide 10 Minute] # Where bid_price = (Select Max(bid_price) # From Bid [Range 10 Minute] # Slide 10 Minute] # 11. Monitor New Users Query: Find people who put up something for sale within 12 hours of registering to use the auction service. # # NewPersonStream: # Select Istream(P.id, P.name) # From Person P # # Select Distinct(P.id, P.name) # From Select Rstream(P.id, P.name) # From NewPersonStream [Range 12 Hour] P, OpenAuction A [Now] # Where P.id = A.sellerID
