diff --git a/examples/trident/word_count_query.rb b/examples/trident/word_count_query.rb new file mode 100644 index 0000000..87f36e9 --- /dev/null +++ b/examples/trident/word_count_query.rb @@ -0,0 +1,33 @@ +require "red_storm" +require "json" + +java_import "backtype.storm.utils.DRPCClient" + +# Usage: +# +# This is a DRPC client that will query a Storm cluster trident drpc topology. +# See the trident word_count_topology.rb for runnnig the drpc topology. +# +# Edit the host and port below. + +module Example + + # this is not a topology, the redstorm topology_launcher will launch any class with the + # start method in the correct storm environment + + class TridentWordCountQuery + RedStorm::Configuration.topology_class = self + + def start(env) + puts("TridentWordCountQuery starting") + + client = DRPCClient.new("localhost", 3772) + loop do + json_result = client.execute("words", "cat the dog jumped") + puts("DRPC execute=#{JSON.parse(json_result)[0][0]}") + + sleep(2) + end + end + end +end \ No newline at end of file diff --git a/examples/trident/word_count_topology.rb b/examples/trident/word_count_topology.rb new file mode 100644 index 0000000..f73f0b2 --- /dev/null +++ b/examples/trident/word_count_topology.rb @@ -0,0 +1,153 @@ +require 'red_storm' +require 'json' + +java_import "backtype.storm.LocalCluster" +java_import "backtype.storm.LocalDRPC" +java_import "backtype.storm.StormSubmitter" +java_import "backtype.storm.generated.StormTopology" +java_import "backtype.storm.tuple.Fields" +java_import "backtype.storm.tuple.Values" +java_import "storm.trident.TridentState" +java_import "storm.trident.TridentTopology" +java_import "storm.trident.operation.BaseFunction" +java_import "storm.trident.operation.TridentCollector" +java_import "storm.trident.operation.builtin.Count" +java_import "storm.trident.operation.builtin.FilterNull" +java_import "storm.trident.operation.builtin.MapGet" +java_import "storm.trident.operation.builtin.Sum" +java_import "storm.trident.testing.FixedBatchSpout" +java_import "storm.trident.testing.MemoryMapState" +java_import "storm.trident.tuple.TridentTuple" + +java_import 'redstorm.storm.jruby.JRubyTridentFunction' + +REQUIRE_PATH = Pathname.new(__FILE__).relative_path_from(Pathname.new(RedStorm::BASE_PATH)).to_s + +# Usage: +# +# Local mode: +# +# $ redstorm install +# $ redstorm examples +# $ restorm local examples/trident/word_count_topology.rb +# +# Cluster mode: +# +# $ redstorm install +# $ redstorm examples +# $ redstorm jar examples +# $ redstorm cluster examples/trident/word_count_topology.rb +# +# After submission, wait a bit for topology to startup and launch the drpc query example: +# Edit word_count_query.rb to set the host/port of your cluster drpc daemon. +# +# $ redstorm local examples/trident/word_count_query.rb + +module Examples + class TridentSplit + + def execute(tuple, collector) + tuple[0].split(" ").each do |word| + collector.emit(Values.new(word)) + end + end + + def prepare(conf, context); end + def cleanup;end + end + + class TridentWordCountTopology + RedStorm::Configuration.topology_class = self + + def build_topology(local_drpc) + spout = FixedBatchSpout.new( + Fields.new("sentence"), 3, + Values.new("the cow jumped over the moon"), + Values.new("the man went to the store and bought some candy"), + Values.new("four score and seven years ago"), + Values.new("how many apples can you eat"), + Values.new("to be or not to be the person") + ) + spout.cycle = true + + topology = TridentTopology.new + + stream = topology.new_stream("spout1", spout) + .parallelism_hint(3) + .each( + Fields.new("sentence"), + JRubyTridentFunction.new(REQUIRE_PATH, "Examples::TridentSplit"), + Fields.new("word") + ) + .groupBy( + Fields.new("word") + ) + .persistentAggregate( + MemoryMapState::Factory.new, + Count.new, + Fields.new("count") + ) + .parallelism_hint(3) + + # topology.newDRPCStream("words", drpc) + topology.newDRPCStream("words", local_drpc) + .each( + Fields.new("args"), + JRubyTridentFunction.new(REQUIRE_PATH, "Examples::TridentSplit"), + Fields.new("word") + ) + .groupBy( + Fields.new("word") + ) + .stateQuery( + stream, + Fields.new("word"), + MapGet.new, + Fields.new("count") + ) + .each( + Fields.new("count"), + FilterNull.new + ) + .aggregate( + Fields.new("count"), + Sum.new, + Fields.new("sum") + ) + + topology.build + end + + def display_drpc(client) + loop do + sleep(2) + + json_result = client.execute("words", "cat the dog jumped") + puts("DRPC execute=#{JSON.parse(json_result)[0][0]}") + end + end + + def start(env) + conf = Backtype::Config.new + conf.debug = false + conf.max_spout_pending = 20 + + case env + when :local + local_drpc = LocalDRPC.new + submitter = LocalCluster.new + conf.num_workers = 1 # set to 1 in local, see https://issues.apache.org/jira/browse/STORM-113 + when :cluster + local_drpc = nil + submitter = StormSubmitter + conf.put("drpc.servers", ["localhost"]) + conf.num_workers = 3 + end + + submitter.submit_topology("trident_word_count", conf, build_topology(local_drpc)); + + display_drpc(local_drpc) if local_drpc + end + end + +end