/***************************************************************************
 *
 *	CQL compiled code examples in Java
 *	(without TimeBasedWindow)
 *
 *	Coded by Yasusi Kanada
 *
 *	2009-2-14  Initial version
 *
 **************************************************************************/

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Enumeration;

/***
 * General-purpose Tuples
 ***/

/**
 * Simple tuple.
 *
 * Root of the tuple classes in this file; it carries only an identifier.
 * The subclasses' constructors fill the identifier from an int converted
 * to its decimal string form.
 **/
class Tuple {
    /** Identifier of the tuple, kept as a string. */
    public String itemID;
}

/**
 * Tuple with timestamp.
 *
 * Base class of the stream elements; InStream compares the timestamp
 * against the simulation clock to decide when an element becomes available.
 **/
class TimeStampedTuple extends Tuple {
    /** Time of the tuple, in the same units as the simulation clock. */
    public long timestamp;
}

/***
 * Domain-specific Tuples
 ***/

/**
 * Item tuple: an item with its display name and the category it belongs to.
 * The identifier inherited from Tuple holds the item number.
 **/
class Item extends Tuple {
    /** Display name of the item. */
    public String name;
    /** Category number, kept as a decimal string. */
    public String categoryID;

    /**
     * Creates an item.
     *
     * @param i item number; stored in itemID as a decimal string
     * @param n item name
     * @param c category number; stored in categoryID as a decimal string
     */
    public Item(int i, String n, int c) {
        this.itemID = String.valueOf(i);
        this.name = n;
        this.categoryID = String.valueOf(c);
    }
}

/**
 * Person tuple: a person with a name and a country.
 * The identifier inherited from Tuple (itemID) holds the person number.
 **/
class Person extends Tuple {
    /** Name of the person. */
    public String name;
    /** Country of the person. */
    public String country;

    /**
     * Creates a person.
     *
     * @param i person number; stored in itemID as a decimal string
     * @param n person name
     * @param c country
     */
    public Person(int i, String n, String c) {
        this.itemID = String.valueOf(i);
        this.name = n;
        this.country = c;
    }
}

/**
 * Open auction stream element (tuple with timestamp).
 * Announces that an auction for an item has started at a given price.
 **/
class OpenAuction extends TimeStampedTuple {
    /** Seller number, kept as a decimal string. */
    public String sellerID;
    /** Price at which the auction starts. */
    public int start_price;

    /**
     * Creates an open-auction element.
     *
     * @param t timestamp of the element
     * @param i item number; stored in itemID as a decimal string
     * @param s seller number; stored in sellerID as a decimal string
     * @param p starting price
     */
    public OpenAuction(long t, int i, int s, int p) {
        this.timestamp = t;
        this.itemID = String.valueOf(i);
        this.sellerID = String.valueOf(s);
        this.start_price = p;
    }
}

/**
 * Closed auction stream element (tuple with timestamp).
 * Announces that the auction for an item has ended.
 **/
class ClosedAuction extends TimeStampedTuple {
    /**
     * Buyer number, kept as a decimal string.
     * (The field name is misspelled; it is kept as is for compatibility.)
     */
    public String byerID;

    /**
     * Creates a closed-auction element.
     *
     * @param t  timestamp of the element
     * @param ii item number; stored in itemID as a decimal string
     * @param bi buyer number; stored in byerID as a decimal string
     */
    public ClosedAuction(long t, int ii, int bi) {
        this.timestamp = t;
        this.itemID = String.valueOf(ii);
        this.byerID = String.valueOf(bi);
    }
}

/**
 * Bid stream element (tuple with timestamp).
 * Records one bid placed on an item.
 **/
class Bid extends TimeStampedTuple {
    /** Price offered by this bid. */
    public int bidPrice;
    /** Bidder number, kept as a decimal string. */
    public String bidderID;

    /**
     * Creates a bid element.
     *
     * @param t  timestamp of the element
     * @param ii item number; stored in itemID as a decimal string
     * @param p  bid price
     * @param bi bidder number; stored in bidderID as a decimal string
     */
    public Bid(long t, int ii, int p, int bi) {
        this.timestamp = t;
        this.itemID = String.valueOf(ii);
        this.bidPrice = p;
        this.bidderID = String.valueOf(bi);
    }
}

/**
 * Price tuple (for temporary use).
 *
 * AverageClosingPrice uses it in two roles: as the current price of an item
 * (itemID and price set) and as an entry of the closing-price window
 * (categoryID, price and timestamp set).
 **/
class PriceTuple extends TimeStampedTuple {
    /** Category the price belongs to; set for closing-price window entries. */
    public String categoryID;
    /** The price itself. */
    public int price;
}

/**
 * Price sum tuple (for temporary use).
 *
 * Running aggregate for one category: AverageClosingPrice adds a price and
 * increments the count when a closing price enters its window, and reverses
 * both when the price leaves the window.
 **/
class PriceSumTuple extends TimeStampedTuple {
    /** Category this aggregate belongs to. */
    public String categoryID;
    /** Number of prices currently included in priceSum. */
    public int count;
    /** Sum of the prices currently included. */
    public int priceSum;
}

/**
 * Category-price tuple (for temporary use).
 *
 * One row of the result written to OutStream: a category and its average
 * price.
 **/
class CategoryPriceTuple extends TimeStampedTuple {
    /** Category the average refers to. */
    public String categoryID;
    /** Average price of the category; computed with integer division. */
    public int Avg_price;
}

/***
 * Input Stream
 *
 * Replays a fixed array of timestamped tuples in array order. An element
 * becomes available once the time passed to hasNext() has reached its
 * timestamp.
 ***/
class InStream {
    /** Elements of the stream, in the order they are delivered. */
    TimeStampedTuple[] Data;
    /** Position of the next element to deliver. */
    int index;

    /**
     * Creates a stream positioned at the first element of the array.
     *
     * @param d elements to deliver; the array is used as is, not copied
     */
    public InStream(TimeStampedTuple[] d) {
        this.Data = d;
        this.index = 0;
    }

    /**
     * Tells whether the next element is due.
     *
     * @param time current time
     * @return true if an element is left and its timestamp is not later
     *         than the given time
     */
    public boolean hasNext(int time) {
        if (index >= Data.length) {
            return false;
        }
        return Data[index].timestamp <= time;
    }

    /**
     * Delivers the next element and advances the stream by one position.
     * Callers cast the result to the concrete element type.
     *
     * @return the element at the current position
     */
    public Object get() {
        // Advance before the array access so the position moves on even
        // when the access fails, exactly like Data[index++].
        int position = index;
        index = position + 1;
        return Data[position];
    }
}

/***
 * Output Stream (special-purpose version)
 *
 * Writes category/average-price rows to standard output.
 ***/
class OutStream {
    /** Creates an output stream; there is no state to initialize. */
    public OutStream() {
    }

    /**
     * Prints one line per tuple, in list order.
     *
     * @param l rows to print
     */
    public void put(ArrayList<CategoryPriceTuple> l) {
        for (CategoryPriceTuple row : l) {
            String line = "{ categoryID => " + row.categoryID
                + ", Avg_price => " + row.Avg_price + " }";
            System.out.println(line);
        }
    }
}

/***
 * Class for computing average closing prices
 *
 * Tracks the highest price seen for every item (start price and bids).
 * When an auction closes, that price enters a time-limited window as the
 * closing price of the item's category. Every change of the window prints
 * the per-category averages through an OutStream.
 ***/
class AverageClosingPrice {
    /**
     * Length of the closing-price window, in clock units. A window entry
     * expires once its timestamp is at least this far behind the clock.
     */
    private static final int CLOSING_PRICE_WINDOW = 1500;

    /** Current simulation time; main() advances it by one per iteration. */
    int clock;

    /** Highest price seen so far per item, keyed by itemID. */
    Hashtable<String,PriceTuple> CurrentPrice;
    /** Sum and count of the windowed closing prices, keyed by categoryID. */
    Hashtable<String,PriceSumTuple> PriceSum;
    /** Primary index of the items, keyed by itemID. */
    Hashtable<String,Item> ItemPrimIx;
    /** Closing prices inside the window, oldest first. */
    ArrayList<PriceTuple> ClosingPriceWindow;
    /** Destination of the computed averages. */
    OutStream out;

    /**
     * Creates the operator state and indexes the given items by itemID.
     *
     * @param Items the item table
     */
    public AverageClosingPrice(Item[] Items) {
        CurrentPrice = new Hashtable<String,PriceTuple>();
        PriceSum = new Hashtable<String,PriceSumTuple>();
        ClosingPriceWindow = new ArrayList<PriceTuple>();

        out = new OutStream();

        // Get primary index of Items
        ItemPrimIx = new Hashtable<String,Item>();
        for (Item item : Items) {
            ItemPrimIx.put(item.itemID, item);
        }
    }

    /**
     * Prints the change marker and the clock, then the current average of
     * every category that has at least one price in the window.
     *
     * @param change 1 when a price was added, -1 when one was removed
     */
    void put_result(int change) {
        System.out.println(change + " " + clock);
        ArrayList<CategoryPriceTuple> avgPrice = new ArrayList<CategoryPriceTuple>();
        for (Enumeration<String> e = PriceSum.keys(); e.hasMoreElements();) {
            String categoryID = e.nextElement();
            PriceSumTuple o = PriceSum.get(categoryID);
            // Categories whose prices have all expired are left out; this
            // also avoids a division by zero.
            if (o.count > 0) {
                CategoryPriceTuple t = new CategoryPriceTuple();
                t.categoryID = categoryID;
                t.Avg_price = o.priceSum / o.count;	// integer division
                avgPrice.add(t);
            }
        }
        out.put(avgPrice);
    }

    /**
     * Adds a price to the aggregate of its category, creating the aggregate
     * on first use, and prints the updated result.
     *
     * @param t price entering the window
     */
    void add_price(PriceTuple t) {
        PriceSumTuple sum = PriceSum.get(t.categoryID);
        if (sum == null) {
            // First price of this category: start from an empty aggregate.
            sum = new PriceSumTuple();
            sum.categoryID = t.categoryID;
            PriceSum.put(t.categoryID, sum);
        }
        sum.priceSum += t.price;
        sum.count++;
        put_result(1);
    }

    /**
     * Removes a price from the aggregate of its category and prints the
     * updated result. The price must have been added with add_price()
     * before.
     *
     * @param t price leaving the window
     */
    void subtract_price(PriceTuple t) {
        PriceSumTuple sum = PriceSum.get(t.categoryID);
        sum.priceSum -= t.price;
        sum.count--;
        put_result(-1);
    }

    /**
     * Puts a closing price into the window, stamped with the current clock,
     * after removing every entry that has expired.
     *
     * @param categoryID category of the closed item
     * @param price      closing price
     */
    void put_closingPriceWindow(String categoryID, int price) {
        PriceTuple t = new PriceTuple();
        t.categoryID = categoryID;
        t.price = price;

        // Entries are in timestamp order, so all expired ones are at the
        // front. Remove all of them, not just the first: several entries
        // may have expired since the previous closing.
        while (ClosingPriceWindow.size() > 0 &&
               ClosingPriceWindow.get(0).timestamp <= clock - CLOSING_PRICE_WINDOW) {
            subtract_price(ClosingPriceWindow.remove(0));
        }
        t.timestamp = clock;
        ClosingPriceWindow.add(t);

        add_price(t);
    }

    /**
     * Records a price for an item, keeping the highest price seen so far.
     *
     * @param itemID item the price refers to
     * @param price  start price or bid price
     */
    void currentPrice(String itemID, int price) {
        PriceTuple CP = CurrentPrice.get(itemID);
        if (CP == null) {
            PriceTuple t = new PriceTuple();
            t.itemID = itemID;
            t.price = price;
            CurrentPrice.put(itemID, t);
        } else if (price > CP.price) {
            CP.price = price;
        }
    }

    /**
     * Handles an open-auction element: the start price becomes a candidate
     * for the item's current price.
     *
     * @param auction the opened auction
     */
    void put_open(OpenAuction auction) {
        currentPrice(auction.itemID, auction.start_price);
    }

    /**
     * Handles a closed-auction element: the item's current price enters the
     * closing-price window under the item's category.
     *
     * Throws NullPointerException if the item is not in the item index or
     * no price has been recorded for it.
     *
     * @param auction the closed auction
     */
    void put_next(ClosedAuction auction) {
        String itemID = auction.itemID;
        put_closingPriceWindow(ItemPrimIx.get(itemID).categoryID,
                               CurrentPrice.get(itemID).price);
    }

    /**
     * Handles a bid element: the bid price becomes a candidate for the
     * item's current price.
     *
     * @param bid the bid
     */
    void put_bid(Bid bid) {
        currentPrice(bid.itemID, bid.bidPrice);
    }

    /***
     * Main program
     *
     * Replays the sample streams against a clock that runs from 0 up to
     * (but not including) 8000.
     ***/
    public static void main(String[] args) {
        Item[] items =
            new Item[]{new Item(0, "Item 1", 1),
                       new Item(1, "Item 2", 2),
                       new Item(2, "Item 3", 2),
                       new Item(3, "Item 4", 1)};

        InStream openAuctions =
            new InStream(new OpenAuction[]{new OpenAuction(3100, 3, 12,  80),
                                           new OpenAuction(3150, 2, 39, 110),
                                           new OpenAuction(3300, 1, 18, 150),
                                           new OpenAuction(3600, 0, 30,  30)});

        InStream closedAuctions =
            new InStream(new ClosedAuction[]{new ClosedAuction(5400, 3, 31),
                                             new ClosedAuction(6500, 2, 16),
                                             new ClosedAuction(7050, 1, 55),
                                             new ClosedAuction(7200, 0, 30)});

        InStream bids =
            new InStream(new Bid[]{new Bid(3105, 3,  82, 22), new Bid(3175, 2, 115, 19),
                                   new Bid(3196, 2, 120, 39), new Bid(3306, 1, 155, 45),
                                   new Bid(3380, 3, 103,  0), new Bid(3497, 2, 122, 38),
                                   new Bid(3662, 0,  33, 41), new Bid(3663, 3, 130, 19),
                                   new Bid(3852, 1, 171, 11), new Bid(4395, 1, 181,  9),
                                   new Bid(4396, 1, 201, 20), new Bid(4573, 3, 148, 33),
                                   new Bid(4574, 3, 152, 15), new Bid(4758, 3, 166, 44),
                                   new Bid(4972, 3, 169,  1), new Bid(5227, 3, 191, 53),
                                   new Bid(5360, 3, 216, 31), new Bid(5506, 1, 226, 33),
                                   new Bid(5536, 1, 249, 36), new Bid(6040, 0,  43, 18),
                                   new Bid(6041, 1, 260, 33), new Bid(6282, 0,  53,  9),
                                   new Bid(6391, 0,  58, 13), new Bid(6462, 2, 126, 16),
                                   new Bid(6570, 1, 280, 14), new Bid(6975, 1, 302, 55),
                                   new Bid(7171, 0,  80, 30)});

        final int endOfTime = 8000;

        AverageClosingPrice self = new AverageClosingPrice(items);
        self.clock = 0;
        for (int i = 0; i < endOfTime; i++) {
            // Drain every element that is due at this tick. Taking only one
            // per tick would delay elements that share a timestamp and could
            // reorder them against the other streams.
            while (closedAuctions.hasNext(self.clock)) {
                self.put_next((ClosedAuction)closedAuctions.get());
            }
            while (bids.hasNext(self.clock)) {
                self.put_bid((Bid)bids.get());
            }
            while (openAuctions.hasNext(self.clock)) {
                self.put_open((OpenAuction)openAuctions.get());
            }
            self.clock++;
        }
    }
}
