diff --git a/examples/simple/ruby_version_topology.rb b/examples/simple/ruby_version_topology.rb new file mode 100644 index 0000000..8c48ee1 --- /dev/null +++ b/examples/simple/ruby_version_topology.rb @@ -0,0 +1,32 @@ +require 'red_storm' + +# this example topology only prints the Ruby version string. No tuple is emitted. + +module RedStorm + module Examples + class VersionSpout < RedStorm::SimpleSpout + output_fields :dummy + on_init {log.info("****** JRuby version #{RUBY_VERSION}")} + on_send {} + end + + class RubyVersionTopology < RedStorm::SimpleTopology + spout VersionSpout + + configure do |env| + debug true + + # set the JRuby version property for this topology. this will only affect remote cluster execution + # for local execution use the --1.8|--1.9 switch when launching + set("topology.worker.childopts", "-Djruby.compat.version=RUBY1_9") + end + + on_submit do |env| + if env == :local + sleep(1) + cluster.shutdown + end + end + end + end +end \ No newline at end of file diff --git a/lib/red_storm/application.rb b/lib/red_storm/application.rb index ead1b1a..067644b 100644 --- a/lib/red_storm/application.rb +++ b/lib/red_storm/application.rb @@ -13,15 +13,19 @@ module RedStorm if ["install", "examples", "jar"].include?(args[0]) load(TASKS_FILE) Rake::Task[args.shift].invoke(*args) - elsif args.size == 2 && ["local"].include?(args[0]) && File.exist?(args[1]) - load(TASKS_FILE) - Rake::Task['launch'].invoke(*args) - else - usage + exit + elsif args.size >= 2 && args.include?("local") + args.delete("local") + version = args.delete("--1.8") || args.delete("--1.9") || "--1.8" + if args.size == 1 + file = args[0] + load(TASKS_FILE) + Rake::Task['launch'].invoke("local", version, file) + exit + end end - else - usage end + usage end end end \ No newline at end of file diff --git a/lib/red_storm/proxy/bolt.rb b/lib/red_storm/proxy/bolt.rb index f39488e..c739da6 100644 --- a/lib/red_storm/proxy/bolt.rb +++ b/lib/red_storm/proxy/bolt.rb @@ -8,6 +8,7 @@ java_import 'backtype.storm.tuple.Tuple' java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Values' java_import 'java.util.Map' +java_import 'org.apache.log4j.Logger' java_package 'redstorm.proxy' diff --git a/lib/red_storm/proxy/spout.rb b/lib/red_storm/proxy/spout.rb index 2789de0..f84c2d0 100644 --- a/lib/red_storm/proxy/spout.rb +++ b/lib/red_storm/proxy/spout.rb @@ -8,6 +8,7 @@ java_import 'backtype.storm.tuple.Tuple' java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Values' java_import 'java.util.Map' +java_import 'org.apache.log4j.Logger' java_package 'redstorm.proxy' @@ -25,6 +26,7 @@ java_package 'redstorm.proxy' # - fail(msg_id) # - close # + class Spout java_implements IRichSpout diff --git a/lib/red_storm/simple_bolt.rb b/lib/red_storm/simple_bolt.rb index 251da38..4f21183 100644 --- a/lib/red_storm/simple_bolt.rb +++ b/lib/red_storm/simple_bolt.rb @@ -5,6 +5,10 @@ module RedStorm # DSL class methods + def self.log + @log ||= Logger.getLogger(self.name) + end + def self.output_fields(*fields) @fields = fields.map(&:to_s) end @@ -27,6 +31,10 @@ module RedStorm # DSL instance methods + def log + self.class.log + end + def unanchored_emit(*values) @collector.emit(Values.new(*values)) end diff --git a/lib/red_storm/simple_spout.rb b/lib/red_storm/simple_spout.rb index bfe17b4..8ad1c5c 100644 --- a/lib/red_storm/simple_spout.rb +++ b/lib/red_storm/simple_spout.rb @@ -9,6 +9,10 @@ module RedStorm self.spout_options.merge!(options) end + def self.log + @log ||= Logger.getLogger(self.name) + end + def self.output_fields(*fields) @fields = fields.map(&:to_s) end @@ -43,6 +47,10 @@ module RedStorm @collector.emit(Values.new(*values)) end + def log + self.class.log + end + # Spout proxy interface def next_tuple diff --git a/lib/red_storm/simple_topology.rb b/lib/red_storm/simple_topology.rb index 6f203dc..b675a71 100644 --- a/lib/red_storm/simple_topology.rb +++ b/lib/red_storm/simple_topology.rb @@ -64,6 +64,10 @@ module RedStorm @config = Config.new end + def set(attribute, value) + @config.put(attribute, value) + end + def method_missing(sym, *args) config_method = "set#{self.class.camel_case(sym)}" @config.send(config_method, *args) diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index fc4b48b..07e5236 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -21,6 +21,8 @@ java_import 'backtype.storm.tuple.Values' java_import 'redstorm.storm.jruby.JRubyBolt' java_import 'redstorm.storm.jruby.JRubySpout' +java_import 'org.apache.log4j.Logger' + java_package 'redstorm' # TopologyLauncher is the application entry point when launching a topology. Basically it will @@ -36,7 +38,7 @@ class TopologyLauncher env = args[0].to_sym class_path = args[1] - require class_path + require "./#{class_path}" # ./ for 1.9 compatibility topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : '' puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment") diff --git a/lib/tasks/red_storm.rake b/lib/tasks/red_storm.rake index 645ce49..4b84f1e 100644 --- a/lib/tasks/red_storm.rake +++ b/lib/tasks/red_storm.rake @@ -25,9 +25,10 @@ JRUBY_SRC_DIR = "#{RedStorm::REDSTORM_HOME}/lib" SRC_EXAMPLES = "#{RedStorm::REDSTORM_HOME}/examples" DST_EXAMPLES = "#{CWD}/examples" -task :launch, :env, :class_file do |t, args| +task :launch, :env, :version, :class_file do |t, args| + version_token = args[:version] == "--1.9" ? "RUBY1_9" : "RUBY1_8" gem_home = ENV["GEM_HOME"].to_s.empty? ? " -Djruby.gem.home=`gem env home`" : "" - command = "java -cp \"#{TARGET_CLASSES_DIR}:#{TARGET_DEPENDENCY_DIR}/*\"#{gem_home} redstorm.TopologyLauncher #{args[:env]} #{args[:class_file]}" + command = "java -Djruby.compat.version=#{version_token} -cp \"#{TARGET_CLASSES_DIR}:#{TARGET_DEPENDENCY_DIR}/*\"#{gem_home} redstorm.TopologyLauncher #{args[:env]} #{args[:class_file]}" puts("launching #{command}") system(command) end