[ トップページ ]

« 開催中のオークションをもとめるクエリの実行法 | メイン | ストリーム・データ処理によるオンライン・オークションのシミュレーション (構造化版) »

ストリームデータ処理

ストリーム・データ処理によるオンライン・オークションのシミュレーション (非構造化版)

ストリーム・データ処理は通常はデータベースの処理と同様にインタプリタ的に実行される. しかし,ここではクエリの実行のしくみをみるために,あえてそれを Perl のプログラム (“コンパイル・コード”) でシミュレートしてみる. とりあげる例題は STREAM (Stanford Stream Data Manager) や ATLaS (Aggregate & Table Language and System) でも使用されているオンライン・オークションである. なお,このページでは非構造的な Perl に翻訳するやりかたをしめす. 「ストリーム・データ処理によるオンライン・オークションのシミュレーション (構造化版)」 において,構造化されたプログラムをしめす.

目次

はじめに

Stream Query Repository” というサイトには,CQL (Continuous Query Language) の文とその実行例がいろいろ書かれている. そこにあるをすこしかえて,Perl に翻訳してみることにする. このページのなかにもひととおりのプログラムを収容するが,全部をつないだ ソースコード もある. (また,このコードより 構造化されているがアドホックなソースコード もある.)

使用するデータ

Stream Query Repository: Online Auctions” でつかわれているデータを一部だけ変更したものをしめす.

関係

ここであつかういくつかの関係 (relation) は配列によって表現する. オークションにおいてあつかわれる品目は Item という関係の要素 (record または tuple) とみなすことができる. 品目は識別子 (id),なまえ (name),種類の識別子 (categoryID),登録時刻 (registrationTime) などの属性をもつ. 以下のテスト・データは NEXMark ペンチマークのテスト・データのごく一部の部分を変更して使用している.

my @Item =
    ({id => 0, name => 'Item 1',
      description => '',
      categoryID => 1, registrationTime => 0},
     {id => 1, name => 'Item 2',
      description => '',
      categoryID => 2, registrationTime => 0},
     {id => 2, name => 'Item 3',
      description => '',
      categoryID => 2, registrationTime => 0},
     {id => 3, name => 'Item 4',
      description => '',
      categoryID => 1, registrationTime => 0});

もとのベンチマークであつかわれている多数の品目のうち,ここでは 4 品目だけをあつかっている.

また,売り手,買い手や入札者はひとであり,それは Person という関係の要素だとかんがえることができる. 各人はなまえ,E-メイル・アドレス,国,市などの属性をもつが,国,市などは不明なばあいもある.

my @Person = 
    ({id => 0,
      name => 'Luitpold Martucci', emailAddress => 'Martucci@toronto.edu',
      city => '', country => ''},
     {id => 1,
      name => 'Takakazu Schonegge', emailAddress => 'Schonegge@panasonic.com',
      city => 'Gothenburg', country => 'United States'},
     {id => 9,
      name => 'Leonid Nour', emailAddress => 'Nour@llnl.gov',
      city => '', country => ''},
     {id => 11,
      name => 'Ravindra Abdelmoty', emailAddress => 'Abdelmoty@whizbang.com',
      city => 'Manchester', country => 'United States'},
     {id => 13,
      name => 'Annette Klaiber', emailAddress => 'Klaiber@propel.com',
      city => '', country => ''},
     {id => 14,
      name => 'Jacinto Ceccarelli', emailAddress => 'Ceccarelli@unbc.ca',
      city => 'Corpus', country => 'United States'},
     {id => 15,
      name => 'Janick Blaauw', emailAddress => 'Blaauw@ogi.edu',
      city => '', country => ''},
     {id => 16,
      name => 'Warwich Marsiglia', emailAddress => 'Marsiglia@cmu.edu',
      city => 'South', country => 'United States'},
     {id => 18,
      name => 'Justus Binkley', emailAddress => 'Binkley@ogi.edu',
      city => '', country => ''},
     {id => 19,
      name => 'Kunsoo Raghavendran', emailAddress => 'Raghavendran@umass.edu',
      city => 'Paris', country => 'Tuvalu'},
     {id => 20,
      name => 'Takuji Liedekerke', emailAddress => 'Liedekerke@cnr.it',
      city => '', country => ''},
     {id => 22,
      name => 'Shiyi Polster', emailAddress => 'Polster@ucd.ie',
      city => '', country => ''},
     {id => 30,
      name => 'Keiko Nastansky', emailAddress => 'Nastansky@co.jp',
      city => '', country => ''},
     {id => 31,
      name => 'Keumog Vuskovic', emailAddress => 'Vuskovic@filemaker.com',
      city => 'Cancun', country => 'Tajikistan'},
     {id => 33,
      name => 'Armand Impagliazzo', emailAddress => 'Impagliazzo@cwi.nl',
      city => 'Oakland', country => 'Heard and Mcdonald Island'},
     {id => 36,
      name => 'Zoe Holmback', emailAddress => 'Holmback@ucsb.edu',
      city => '', country => ''},
     {id => 39,
      name => 'Rajamani Pinzani', emailAddress => 'Pinzani@mitre.org',
      city => 'Gothenburg', country => 'United States'},
     {id => 44,
      name => 'Jianying Parikh', emailAddress => 'Parikh@sleepycat.com',
      city => 'Torreon', country => 'United States'},
     {id => 45,
      name => 'Edmond Rajcani', emailAddress => 'Rajcani@okcu.edu',
      city => '', country => ''},
     {id => 53,
      name => 'Guijun Cosette', emailAddress => 'Cosette@unl.edu',
      city => '', country => ''},
     {id => 55,
      name => 'Fabio Denna', emailAddress => 'Denna@cmu.edu',
      city => 'Wilmington', country => 'Australia'});

関係の内容を印刷するためのサブルーティン print_result() を用意する. その定義はつぎのとおりである.

sub print_result(\@$) {
    my ($result, $keys) = @_;
    foreach my $key (@$keys) {
	print "\t$key";
    }
    print "\n\t----------------\n";

    foreach my $record (@$result) {
	foreach my $key (@$keys) {
	    print "\t$record->{$key}";
	}
	print "\n";
    }
}

ストリーム

ストリームの要素はこのシミュレーションにおいてはあらかじめ配列にいれ,必要なときにとりだすことにする. オークションの開始をあらわすストリーム OpenAuction の要素を配列 @OpenAuction にいれる.

my @OpenAuction =
    ({itemID => 3, sellerID => 12, start_price =>  80, timestamp => 3100},
     {itemID => 2, sellerID => 39, start_price => 110, timestamp => 3150},
     {itemID => 1, sellerID => 18, start_price => 150, timestamp => 3300},
     {itemID => 0, sellerID => 30, start_price =>  30, timestamp => 3600});

また,オークションの終了をあらわすストリーム ClosedAuction の要素を配列 @ClosedAuction にいれる.

my @ClosedAuction =
    ({itemID => 3, byerID => 31, timestamp => 5400},
     {itemID => 2, byerID => 16, timestamp => 6500},
     {itemID => 1, byerID => 55, timestamp => 7050},
     {itemID => 0, byerID => 30, timestamp => 7200});

競りにおけるビッドのストリーム Bid の要素を配列 @Bid にいれる.

my @Bid =
    ({itemID => 3, bidPrice =>  82, bidderID => 22, timestamp => 3105},
     {itemID => 2, bidPrice => 115, bidderID => 19, timestamp => 3175},
     {itemID => 2, bidPrice => 120, bidderID => 39, timestamp => 3196},
     {itemID => 1, bidPrice => 155, bidderID => 45, timestamp => 3306},
     {itemID => 3, bidPrice => 103, bidderID =>  0, timestamp => 3380},
     {itemID => 2, bidPrice => 122, bidderID => 38, timestamp => 3497},
     {itemID => 0, bidPrice =>  33, bidderID => 41, timestamp => 3662},
     {itemID => 3, bidPrice => 130, bidderID => 19, timestamp => 3663},
     {itemID => 1, bidPrice => 171, bidderID => 11, timestamp => 3852},
     {itemID => 1, bidPrice => 181, bidderID =>  9, timestamp => 4395},
     {itemID => 1, bidPrice => 201, bidderID => 20, timestamp => 4396},
     {itemID => 3, bidPrice => 148, bidderID => 33, timestamp => 4573},
     {itemID => 3, bidPrice => 152, bidderID => 15, timestamp => 4574},
     {itemID => 3, bidPrice => 166, bidderID => 44, timestamp => 4758},
     {itemID => 3, bidPrice => 169, bidderID =>  1, timestamp => 4972},
     {itemID => 3, bidPrice => 191, bidderID => 53, timestamp => 5227},
     {itemID => 3, bidPrice => 216, bidderID => 31, timestamp => 5360},
     {itemID => 1, bidPrice => 226, bidderID => 33, timestamp => 5506},
     {itemID => 1, bidPrice => 249, bidderID => 36, timestamp => 5536},
     {itemID => 0, bidPrice =>  43, bidderID => 18, timestamp => 6040},
     {itemID => 1, bidPrice => 260, bidderID => 33, timestamp => 6041},
     {itemID => 0, bidPrice =>  53, bidderID =>  9, timestamp => 6282},
     {itemID => 0, bidPrice =>  58, bidderID => 13, timestamp => 6391},
     {itemID => 2, bidPrice => 126, bidderID => 16, timestamp => 6462},
     {itemID => 1, bidPrice => 280, bidderID => 14, timestamp => 6570},
     {itemID => 1, bidPrice => 302, bidderID => 55, timestamp => 6975},
     {itemID => 0, bidPrice =>  80, bidderID => 30, timestamp => 7171});

もとのデータにおいては同一のタイム・スタンプをもつビッドがふくまれていたが,プログラムにおけるあつかいを単純化するため,タイム・スタンプをずらして,かさならないようにしている.

時間の経過は $clock という変数によってシミュレートする. tick() がよびだされるごとに時刻が 1 (秒) すすむものとする (実用的にはもっとこまかい単位でタイムスタンプをつける必要があるが,ここではかんたんのために 1 秒単位としている). また,ここではかんたんのため,1 秒に 2 個ストリーム要素が到着することはないものとする.

my $clock = 0;

sub tick () {
    return ++$clock;
}

本来は外部から到着するはずの,配列にいれたストリームの要素は, instream_get() という関数によって 1 個ずつとりだして使用する. 初期設定のためにあらかじめ instream_open() をよびだすことにする. これらの関数の定義はつぎのとおりである.

sub instream_open(\@) {
    my ($array) = @_;
    my $stream = {array => $array, index => 0};

    return $stream;
}

sub instream_get($) {
    my ($stream) = @_;
    my $array = $stream->{array};
    my $next = $array->[$stream->{index}];
    if ($stream->{index} >= @$array ||
	$next->{timestamp} > $clock) {
	return '';
    } else {
	$stream->{index}++;
	return $next;
    }
}

instream_open() はストリームのハンドルをかえすが,これを instream_get() の第 1 引数として使用する.

また,外部に出力する (このシミュレーションにおいては印刷する) ストリームは outstream_open() によってひらき,outstream_put() によって出力する. outstream_open() の第 1 引数は本来はストリームを指定するが,現在は出力先を標準出力にかぎっているため使用していない. 第 2 引数は出力するフィールド名のリストを指定する. outstream_open() はストリームのハンドルをかえすが,それを outstream_put() の第 1 引数にわたす.

sub outstream_open($$) {
    my ($stream, $keys) = @_;
    foreach my $key (@$keys) {
	print "\t$key";
    }
    print "\n";
    return $keys;
}

sub outstream_put($$) {
    my ($stream, $record) = @_;
    my $keys = $stream;
    foreach my $key (@$keys) {
	print "\t$record->{$key}";
    }
    print "\n";
}

両替計算

米ドルによるビッドにおける価格をユーロに変換する. ビッドがストリームとして入力されるので,結果もストリームとして出力する. CQL によるプログラムはつぎのとおりである.

stream EuroBid is
Select itemID, DolToEuro(bidPrice) euroBidPrice, bidderID
From Bid;

これを Perl に翻訳するとつぎのようになる.

sub DolToEuro($) {
    my ($dollar) = @_;
    return 0.71 * $dollar;
}

sub euroBid() {
    my $dolBid = instream_open(@Bid);
    my $euroBid = outstream_open('', ['itemID', 'euroBidPrice', 'bidderID']);
    for (my $t = 0; $t < $Max_timestamp; $t++) {
	tick();

	my $record;
	if ($record = instream_get($dolBid)) { # From Bid
	    outstream_put($euroBid,
			  {itemID => $record->{itemID},
			   euroBidPrice => DolToEuro($record->{bidPrice}),
			   bidderID => $record->{bidderID}}); # Select ...
	}
    }	
}

サブルーティン euroBid() は入力ストリームの要素を 1 個よむごとにその値を変換し,出力ストリームに出力する. すなわち,この処理は 1 本のパイプだけで実行することができる. euroBid() を実行すれば,つぎのような結果がえられる.

	itemID	euroBidPrice	bidderID
	3	58.22	22
	2	81.65	19
	2	85.2	39
	1	110.05	45
	3	73.13	0
	2	86.62	38
	0	23.43	41
	3	92.3	19
	1	121.41	11
	1	128.51	9
	1	142.71	20
	3	105.08	33
	3	107.92	15
	3	117.86	44
	3	119.99	1
	3	135.61	53
	3	153.36	31
	1	160.46	33
	1	176.79	36
	0	30.53	18
	1	184.6	33
	0	37.63	9
	0	41.18	13
	2	89.46	16
	1	198.8	14
	1	214.42	55
	0	56.8	30

特定のオークションを選択するクエリ

品目識別子が 1 または 2 のビッドだけを抽出するには,つぎのような CQL クエリを実行すればよい.

stream Bid_1_2 is
Select itemID, bidPrice
From Bid
where itemID = 1 or itemID = 2;

これを Perl に翻訳するとつぎのようになる.

sub bid_1_2() {
    my $bid = instream_open(@Bid);
    my $selected = outstream_open('', ['itemID', 'bidPrice']);
    for (my $t = 0; $t < $Max_timestamp; $t++) {
	tick();

	my $record;
	if ($record = instream_get($bid)) {
	    if ($record->{itemID} == 1 || $record->{itemID} == 2) {
		outstream_put($selected, $record);
	    }
	}
    }
}

この処理も 1 本のパイプだけで実行することができる. bid_1_2() を実行すれば,つぎのような結果がえられる.

	itemID	bidPrice
	2	115
	2	120
	1	155
	2	122
	1	171
	1	181
	1	201
	1	226
	1	249
	1	260
	2	126
	1	280
	1	302

[TBD]

Local Item Suggestion Query

米国の売り手が出品した品種が 2 の品目をすべて報告するには,つぎのクエリを実行すればよい.

stream Items_cat2_US is
Select Istream(P.name, P.city, O.itemID)
From OpenAuction [Now] O, Person P, Item I
Where O.sellerID = P.id and P.country = 'United States' and O.itemID = I.id
      and I.categoryID = 2;

これを Perl に翻訳するとつぎのようになる.

sub items_cat2_US() {
    # Generate index to relation: id -> person
    my %personByID = ();
    foreach my $record (@Person) {
	$personByID{$record->{id}} = $record; # id is assumed to be unique
    }
    # print %{$personByID{39}}, "\n";

    # Generate index to relation: id -> item
    my %itemByID = ();
    foreach my $record (@Item) {
	$itemByID{$record->{id}} = $record; # id is assumed to be unique
    }
    # print %{$itemByID{2}}, "\n";

    my $is_auction = instream_open(@OpenAuction);
    my $selected = outstream_open('', ['name', 'city', 'itemID']);
    for (my $t = 0; $t < $Max_timestamp; $t++) {
	tick();

	my $auction;
	if ($auction = instream_get($is_auction)) {
	    my $person = $personByID{$auction->{sellerID}};
	    my $item = $itemByID{$auction->{itemID}};
	    if ($person->{country} eq 'United States' &&
		$item->{categoryID} == 2) {
		outstream_put($selected,
			      {name => $person->{name},
			       city => $person->{city},
			       itemID => $auction->{itemID}});
	    }
	}
    }
}

関係 Person および Item のインデクス (primary index) を生成して使用している. いずれにおいても id が一意であることを仮定している. これらの関係の処理をのぞいたストリームの入力,処理,出力の処理に関しては,上記の 2 つの例題とおおきなちがいはない.

items_cat2_US() をよびだすと,結果はつぎのように表示される.

	name	city	itemID
	Rajamani Pinzani	Gothenburg	2

開催中のオークションに関するクエリ

現在開催中のオークションからなるテーブル (関係) を維持するには,つぎのようなクエリを使用すればよい.

temporal relation OpenAuctionItemID is
Select itemID
From OpenAuction [range .. now];

temporal relation ClosedAuctionItemID is
Select itemID
From ClosedAuction [range .. now];

temporal relation CurrentAuctions is
OpenAuctionItemID Minus ClosedAuctionItemID;

これを Perl に翻訳するとつぎのようになる. このプログラムにおいては複数のパイプがくみあわされるので,このプログラムにおいてはパイプごとにラベルをつけ,そのあいだを goto 文によってつなぎあわせる (移動する) ようにしている.

sub currentAuctions() {
    my $openAuction = instream_open(@OpenAuction);
    my $closedAuction = instream_open(@ClosedAuction);
    my %closedAuction_itemID = ();
    my %minus_itemID = ();

    for (my $t = 0; $t < $Max_timestamp; $t++) {
	tick();
	my $record;
	my $minus_input;

	# Dispatcher:
	if ($record = instream_get($openAuction)) {
	    goto OpenAuctionItemID;
	} elsif ($record = instream_get($closedAuction)) {
	    goto ClosedAuctionItemID;
	}
	goto Next;

      OpenAuctionItemID: {
	  my $itemID = $record->{itemID};
	  if (!$closedAuction_itemID{$itemID}) {
	      $minus_itemID{$itemID} = $record;
	      $minus_input = 1;
	      goto CurrentAuctions;
	  };
	  goto Next;
      }

      ClosedAuctionItemID: {
	  my $itemID = $record->{itemID};
	  $closedAuction_itemID{$itemID} = $record;
	  delete $minus_itemID{$itemID};
	  $minus_input = -1;
	  goto CurrentAuctions;
      }

      CurrentAuctions: {
	  my @result = ();
	  foreach my $itemID (keys %minus_itemID) {
	      push(@result, {itemID => $itemID});
	  }
	  print "$minus_input $clock\n";
	  print_result(@result, ['itemID']);
	  print "\n";
	  goto Next;
      }

      Next:
    }
}

このプログラムにおいて,“# Dispatcher” というコメントをつけた部分が本来なら外部からとどくストリーム要素の内容によって 2 個のパイプ (“OpenAuctionItemID” と “ClosedAuctionItemID”) のうちのいずれかにその要素をふりわける.

currentAuctions() をよびだすと,結果はつぎのように表示される.

1 3100
	itemID
	----------------
	3

1 3150
	itemID
	----------------
	3
	2

1 3300
	itemID
	----------------
	1
	3
	2

1 3600
	itemID
	----------------
	1
	0
	3
	2

-1 5400
	itemID
	----------------
	1
	0
	2

-1 6500
	itemID
	----------------
	1
	0

-1 7050
	itemID
	----------------
	0

-1 7200
	itemID
	----------------

おなじ意味をもつプログラムをつぎのように記述することもできる.

stream CurrentAuction2 is
Select *
From OpenAuction
Where itemID Not in (Select itemID From ClosedAuction);

このプログラムを Perl ではつぎのように表現することができる.

sub currentAuctions2() {
    my $openAuction = instream_open(@OpenAuction);
    my $closedAuction = instream_open(@ClosedAuction);
    my %closedAuction = ();
    my %result = ();

    for (my $t = 0; $t < $Max_timestamp; $t++) {
	tick();
	my $record;
	my $changed = 0;

	if ($record = instream_get($openAuction)) {
	    goto OpenAuctionItemID;
	} elsif ($record = instream_get($closedAuction)) {
	    goto ClosedAuctionItemID;
	}
	goto Next;

      OpenAuctionItemID: {
	  my $itemID = $record->{itemID};
	  if (!$closedAuction{$itemID}) {
	      $result{$itemID} = $record;
	      $changed = 1;
	      goto Result;
	  };
	  goto Next;
      }

      ClosedAuctionItemID: {
	  my $itemID = $record->{itemID};
	  $closedAuction{$itemID} = $record;
	  delete $result{$itemID}; # where itemID Not in ...
	  $changed = -1;
	  goto Result;
      }

      Result: {
	  my @result = ();
	  foreach my $itemID (keys %result) {
	      push(@result, {itemID => $itemID});
	  }
	  print "$changed $clock\n";
	  print_result(@result, ['itemID']);
	  print "\n";
	  goto Next;
      }

      Next:
    }
}

[TBD]

currentAuctions2() の実行によってえられる結果は currentAuctions() のそれとひとしい.

終了時価格に関するクエリ

各オークションの終了時の価格を報告するには,つぎのクエリを実行すればよい.

stream BidPrice is
Select itemID, bidPrice as price
From Bid;

stream OpenPrice is
Select itemID, start_price as price
From OpenAuction;

temporal relation P is
BidPrice Union All OpenPrice

temporal relation CurrentPrice is
Select P.itemID, Max(P.price) as price
From P [range .. now]
Group By P.itemID;

stream Result is
Select Rstream(C.itemID, P.price)
From ClosedAuction [Now] C, CurrentPrice P
Where C.itemID = P.itemID;

これを Perl に翻訳するとつぎのようになる. ここでもパイプどうしを goto 文によってつないでいる.

sub closedPrices() {
    my $openAuction = instream_open(@OpenAuction);
    my $closedAuction = instream_open(@ClosedAuction);
    my $bid = instream_open(@Bid);
    my %CurrentPrice = ();
    my $selected = outstream_open('', ['itemID', 'price', 'timestamp']);

    for (my $t = 0; $t < $Max_timestamp; $t++) {
	tick();
	my $record;
	my $itemID;
	my $price;

	# Input: external stream $closedAuction, $bid, $openAuction ->
	#	stream ($record)
	if ($record = instream_get($closedAuction)) {
	    goto Result;
	} elsif ($record = instream_get($bid)) {
	    goto BidPrice;
	} elsif ($record = instream_get($openAuction)) {
	    goto OpenPrice;
	}
	goto Next;

	# BidPrice: stream $record->{itemID, bidPrice} ->
	#	stream ($itemID, $price)
      BidPrice: {
	  $itemID = $record->{itemID};
	  $price = $record->{bidPrice};
	  goto CurrentPrice; # implicit union
      }

	# OpenPrice: stream $record->{itemID, start_price} ->
	#	stream ($itemID, $price)
      OpenPrice: {
	  $itemID = $record->{itemID};
	  $price = $record->{start_price};
	  goto CurrentPrice; # implicit union
      }

	# CurrentPrice: stream ($itemID, $price) -> hashed relation (%CurrentPrice)
      CurrentPrice: {
	  # Max(price) & Group By itemID:
	  if (!$CurrentPrice{$itemID}) {
	      $CurrentPrice{$itemID} = {itemID => $itemID, price => $price};
	      # goto Result;
	  } elsif ($price > $CurrentPrice{$itemID}->{price}) {
	      $CurrentPrice{$itemID}->{price} = $price;
	      # goto Result;
	  }
	  goto Next;
      }

	# Result: hashed relation (%CurrentPrice) -> external stream $selected
      Result: {
	  my $itemID = $record->{itemID};
	  my $currentPrice = $CurrentPrice{$itemID};
	  my $Rstream_record = {itemID => $itemID,
				price => $currentPrice->{price},
				timestamp => $clock};

	  outstream_put($selected, $Rstream_record);
	  goto Next;
      }

      Next:
    }
}

## [Original CQL Query]
# CurrentPrice:
#   Select P.itemID, Max(P.price) as price
#   From ((Select itemID, bidPrice as price
# 	   From Bid) Union All
# 	  (Select itemID, start_price as price
# 	   From OpenAuction)) P
#   Group By P.itemID;
# 
# Select   Rstream(C.itemID, I.categoryID, P.price)
# From     ClosedAuction [Now] C, CurrentPrice P, Item I
# Where    C.itemID = P.itemID and C.itemID = I.id

closedPrices() をよびだすと,結果はつぎのように表示される.

	itemID	price	timestamp
	3	216	5400
	2	126	6500
	1	302	7050
	0	80	7200

終了時価格のカテゴリーごとの平均値に関するクエリ

Monitor the average closing price across items in each category over the last hour.

temporal relation CurrentPrice is
    Select    P.itemID, Max(P.price) as price
    From      ((Select itemID, bid_price as price 
                From   Bid) Union All
               (Select itemID, start_price as price 
                From OpenAuction)) [range .. now] P
    Group By  P.itemID

stream ClosingPriceStream is
    Select   Rstream(I.categoryID as categoryID, P.price as price)
    From     ClosedAuction [Now] C, CurrentPrice P, Item I
    Where    C.itemID = P.itemID and C.itemID = I.id

temporal relation AvgPrice is
    Select   catID, Avg(price)
    From     ClosingPriceStream [Range 1 Hour]
    Group By catID 
## [Original]
# ClosingPriceStream:
#     Select   Rstream(T.id as catID, P.price as price)
#     From     ClosedAuction [Now] C, CurrentPrice P, 
#              Item I, Category T    
#     Where    C.itemID = P.itemID and C.itemID = I.id and I.categoryID = T.id 

Perl ではつぎのように表現することができる. ただし,ウィンドウ・サイズは 1 時間 (3600 sec) でなく 1500 sec にしている.

sub averageClosingPrice() {
    my $openAuction = instream_open(@OpenAuction);
    my $closedAuction = instream_open(@ClosedAuction);
    my $bid = instream_open(@Bid);
    my %CurrentPrice = ();
    my @ClosingPriceWindow = ();
    my %AvgPrice = ();

    # Generate Item primary index: relation (@Item) -> hashed relation (%ItemById)
    my %ItemById = ();
    foreach my $record (@Item) {
	$ItemById{$record->{id}} = $record;
    }

    for (my $t = 0; $t < $Max_timestamp; $t++) {
	my $changed = 0;
	my $closedAuction_record;
	my $bid_record;
	my $openAuction_record;
	my $categoryID;
	my $itemID;
	my $price;

	tick();

	my $changed1 = 0;
      ClosingPriceWindow_remove: {
	  if (@ClosingPriceWindow > 0 &&
	      $ClosingPriceWindow[0]->{timestamp} <= $clock - 1500) {
	      # $ClosingPriceWindow[0]->{timestamp} <= $clock - 3600) {
	      my $removedFromClosingPrice = shift @ClosingPriceWindow;
	      $categoryID = $removedFromClosingPrice->{categoryID};
	      $price = $removedFromClosingPrice->{price};
	      $changed1 = 1;
	  }
      }

      AvgPrice_remove: {
	  if ($changed1) {
	      $AvgPrice{$categoryID}->{priceSum} -= $price;
	      $AvgPrice{$categoryID}->{count}--;
	      $changed = -1;
	  }
      }

      Result_remove: {
	  if ($changed) {
	      print "$changed $clock\n";
	      my @result = ();
	      foreach my $categoryID (keys %AvgPrice) {
		  my $record = $AvgPrice{$categoryID};
		  if ($record->{count} > 0) {
		      push(@result,
			   {categoryID => $categoryID,
			    Avg_price => $record->{priceSum} / $record->{count}});
		  }
	      }
	      print_result(@result, ['categoryID', 'Avg_price']);
	      print "\n";
	  }
      }

	# Input: external stream $closedAuction, $bid, $openAuction ->
	#	stream ($record)
	if ($closedAuction_record = instream_get($closedAuction)) {
	    goto ClosingPriceStream;
	} elsif ($bid_record = instream_get($bid)) {
	    goto BidPrice;
	} elsif ($openAuction_record = instream_get($openAuction)) {
	    goto OpenPrice;
	}
	goto Next;

	# BidPrice: stream $record->{itemID, bidPrice} ->
	#	stream ($itemID, $price)
      BidPrice: {
	  $itemID = $bid_record->{itemID};
	  $price = $bid_record->{bidPrice};
	  goto CurrentPrice;
      }

	# OpenPrice: stream $record->{itemID, start_price} ->
	#	stream ($itemID, $price)
      OpenPrice: {
	  $itemID = $openAuction_record->{itemID};
	  $price = $openAuction_record->{start_price};
	  goto CurrentPrice;
      }

	# CurrentPrice: stream ($itemID, $price) -> hashed relation (%CurrentPrice)
      CurrentPrice: {
	  # Max(price) & Group By itemID:
	  if (!$CurrentPrice{$itemID}) {
	      $CurrentPrice{$itemID} = {itemID => $itemID, price => $price};
	      goto ClosingPriceStream;
	  } elsif ($price > $CurrentPrice{$itemID}->{price}) {
	      $CurrentPrice{$itemID}->{price} = $price;
	      goto ClosingPriceStream;
	  }

	  goto Next;
      }

	# ClosingPriceStream:
	#	external stream ($closedAuction), hashed relation (%CurrentPrice) ->
	#	stream ($categoryID, $price)
      ClosingPriceStream: {
	  if ($closedAuction_record) {
	      my $itemID = $closedAuction_record->{itemID};
	      my $currentPrice = $CurrentPrice{$itemID};
	      if ($currentPrice) {
		  $categoryID = $ItemById{$itemID}->{categoryID};
		  $price = $currentPrice->{price};
		  goto ClosingPriceWindow_add;
	      }
	  }
	  goto Next;
      }

	# ClosingPriceWindow: stream ($categoryID, $price) ->
	#	relation (@ClosingPriceWindow)
      ClosingPriceWindow_add: {
	  my $ClosingPrice_record =
	  {categoryID => $categoryID, price => $price, timestamp => $clock};

	  # Get ClosingPriceStream [Range 1 Hour] as ClosingPrice
	  push(@ClosingPriceWindow, $ClosingPrice_record);
	  goto AvgPrice_add;
      }

	# AvgPrice: signed stream ($categoryID, $price) ->
	#	hashed relation (%AvgPrice)
      AvgPrice_add: {
	  if (!$AvgPrice{$categoryID}) {
	      $AvgPrice{$categoryID} = {categoryID => $categoryID,
					priceSum => $price,
					count => 1};
	  } else {
	      $AvgPrice{$categoryID}->{priceSum} += $price;
	      $AvgPrice{$categoryID}->{count}++;
	  }
	  $changed = 1;
	  goto Result;
      }

	# Result: hashed relation (%AvgPrice) -> view of relation @result
      Result: {
	  print "$changed $clock\n";
	  my @result = ();
	  foreach my $categoryID (keys %AvgPrice) {
	      my $record = $AvgPrice{$categoryID};
	      if ($record->{count} > 0) {
		  push(@result,
		       {categoryID => $categoryID,
			Avg_price => $record->{priceSum} / $record->{count}});
	      }
	  }
	  print_result(@result, ['categoryID', 'Avg_price']);
	  print "\n";
	  goto Next;
      }

      Next:
    }
}

このプログラムがこれまでのプログラムともっともおおきくことなる点は,スライディング・ウィンドウ [Range 1 Hour] から,ふるい要素が脱落していくために,再計算が必要になることである. その再計算をおこなっているのが ClosingPriceWindow_remove と AvgPrice_remove という 2 つのパイプであり,再計算した値を Result_remove によって印刷している. これらが for 文の最初の部分におかれているのは,tick() によって時刻が 1 秒すすめられたときに,必要ならすぐに要素をウィンドウから削除するためである.

averageClosingPrice() をよびだすと,結果はつぎのように表示される.

1 5400
	categoryID	Avg_price
	----------------
	1	216

1 6500
	categoryID	Avg_price
	----------------
	1	216
	2	126

-1 6900
	categoryID	Avg_price
	----------------
	2	126

1 7050
	categoryID	Avg_price
	----------------
	2	214

1 7200
	categoryID	Avg_price
	----------------
	1	80
	2	214

-1 8000
	categoryID	Avg_price
	----------------
	1	80
	2	302

その他

オンライン・オークションの例題のなかには,さらに以下のような 5 題の問題がふくまれているが,これらは当面,省略する.

# 7. Short Auctions Query: Report all auctions that closed within five hours of their opening.
# 
# Select Rstream(OpenAuction.*)
# From   OpenAuction [Range 5 Hour] O, ClosedAuction [Now] C
# Where  O.itemID = C.itemID


# 8. Hot Item Query: Select the item(s) with the most bids in the past hour. Update the results every minute.
# 
# HotItemStream: 
#     Select Rstream(itemID)
#     From   (Select   B1.itemID as itemID, Count(*) as num
#             From     Bid [Range 60 Minute
#                           Slide 1 Minute] B1
#             Group By B1.itemID) 
#     Where   num >= All (Select   Count(*)
#                         From     Bid [Range 60 Minute
#                                       Slide 1 Minute] B2
#                         Group By B2.itemID)
# 
# Select *
# From   HotItemStream [Range 1 Minute]


# 9. Average Selling Price By Seller Query: For each seller, maintain the average selling price over the last 10 items sold.
# 
# CurrentPrice: 
#     Select    P.itemID, Max(P.price) as price
#     From      ((Select itemID, bid_price as price 
#                 From   Bid) Union All
#                (Select itemID, start_price as price 
#                 From OpenAuction)) P
#     Group By  P.itemID
# 
# ClosingPriceStream:
#     Select   Rstream(O.sellerID as sellerID, P.price as price)
#     From     ClosedAuction [Now] C, CurrentPrice P, 
#              OpenAuction O
#     Where    C.itemID = P.itemID and C.itemID = O.itemID
# 
# AvgSellingPrice:
#     Select   sellerID, Avg(price)
#     From     ClosingPriceStream [Partition By sellerID Rows 10] 
#     Group By sellerID 


# 10. Highest Bid Query: Every 10 minutes return the highest bid(s) in the recent 10 minutes.
# 
# Select  Rstream(itemID, bid_price)
# From    Bid [Range 10 Minute
#              Slide 10 Minute]
# Where   bid_price = (Select  Max(bid_price)
#                      From    Bid [Range 10 Minute]
#                                   Slide 10 Minute]


# 11. Monitor New Users Query: Find people who put up something for sale within 12 hours of registering to use the auction service.
# 
# NewPersonStream:
#   Select Istream(P.id, P.name)
#   From   Person P
# 
# Select Distinct(P.id, P.name)
# From   Select Rstream(P.id, P.name)
#        From   NewPersonStream [Range 12 Hour] P, OpenAuction A [Now]
#        Where  P.id = A.sellerID
Keywords:

トラックバック

このエントリーのトラックバックURL:
http://www.kanadas.com/mt/mt-tb.cgi/3448

コメントを投稿

このページについて

2009-01-05 00:39 に投稿されたエントリーのページです。

他にも多くのエントリーがあります。メインページアーカイブページも見てください。

Creative Commons License
このブログは、次のライセンスで保護されています。 クリエイティブ・コモンズ・ライセンス.
Powered by
Movable Type 3.36