diff --git a/generator.rb b/generator.rb new file mode 100644 index 0000000..a66c30b --- /dev/null +++ b/generator.rb @@ -0,0 +1,59 @@ +require 'erb' +require 'pry' + +require 'java' +require 'active_support/core_ext' + +Dir["../triggit-storm/target/dependency/storm/default/*"].each{|f| $CLASSPATH << File.expand_path(f) } + + +PROXY_JRUBY_TEMPLATE = File.read("./ruby_proxy.erb") +PROXY_JAVA_TEMPLATE = File.read("./java_proxy.erb") + +to_generate = ["storm.trident.operation.Function"] + + +# Return all java functions of a java class +def get_functions(jlass) + jlass.declared_instance_methods.concat( jlass.interfaces.map{|i| get_functions(i) }.flatten ) +end + +# Return all java deps of a class +def get_java_deps(functions, klass) + functions.map{|f| [f.argument_types.map{|at| at.name}, f.return_type ? f.return_type.name : "void"] }.flatten.uniq.reject{|t| t.split('.').count == 1} << klass +end + +to_generate.each do |klass| + _functions = get_functions(Object.const_get(java_import(klass)[0].to_s.split("::")[-1]).java_class) + + java_deps = get_java_deps(_functions, klass) + + + # Boil down functions to {:function_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } } + functions = _functions.reduce({}) do |memo, f| + before_serialization = %w{ }.include?(f.name.to_s) + memoize = %w{ prepare execute }.include?(f.name.to_s) + memo[:"#{f.name}"] = { + :return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void", + :args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)}, + :before_serialization => before_serialization, + :memoize => memoize + } + memo + end + + interface_name = klass.split(".")[-1] + + # IBlah to ProxyBlah if IBlah + ruby_class_name = "Proxy#{interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name}" + + java_class_name = "JRuby#{ruby_class_name}" + + # Rubyify java functions into {:method_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } } + methods = functions.map do |f_name, params| + {f_name.to_s.underscore.to_sym => {:return_type => params[:return_type], :args => params[:args].map{|name, type| {name.to_s.underscore.to_sym => type}}.reduce({}){|m,o| m.merge(o)} }} + end.reduce({}){|m,o| m.merge(o)} + + File.open("./lib/red_storm/proxy/#{ruby_class_name.underscore}.rb", 'w') {|f| f.write(ERB.new(PROXY_JRUBY_TEMPLATE).result(binding)) } + File.open("./src/main/redstorm/storm/jruby/#{java_class_name}.java", 'w') {|f| f.write(ERB.new(PROXY_JAVA_TEMPLATE).result(binding)) } +end diff --git a/java_proxy.erb b/java_proxy.erb new file mode 100644 index 0000000..c154169 --- /dev/null +++ b/java_proxy.erb @@ -0,0 +1,51 @@ +package redstorm.storm.jruby; +<% java_deps.each do |dep| %> +import <%= dep %>;<% end %> + +public class <%= java_class_name %> implements <%= interface_name %> { + <%= interface_name %> _proxy; + String _realClassName; + String _baseClassPath; + String[] _fields; + + public <%= java_class_name %>(final String baseClassPath, final String realClassName, final String[] fields) { + _baseClassPath = baseClassPath; + _realClassName = realClassName; + _fields = fields; + } + +<% functions.each do |function_name, params| %> + @Override + public <%= params[:return_type] %> <%= function_name %>(<%= params[:args].map{|n,t| ["final #{t}", n].join(' ') }.flatten.join(', ') %>) { + <% if function_name == :open %> + _proxy = newProxy(_baseClassPath, _realClassName); + _proxy.open(<%= params[:args].keys.flatten.join(', ') %>); + <% elsif function_name == :declareOutputFields %> + if (_fields.length > 0) { + <%= params[:args].values[0] %>.declare(new Fields(_fields)); + } else { + newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); + } + <% elsif params[:before_serialization] %> + newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); + <% elsif params[:memoize] %> + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); + <% else %> + _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); + <% end %> + } +<% end %> + + private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) { + try { + redstorm.proxy.<%= ruby_class_name %> proxy = new redstorm.proxy.<%= ruby_class_name %>(baseClassPath, realClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/lib/red_storm.rb b/lib/red_storm.rb index 4321e7e..ea6b0cd 100644 --- a/lib/red_storm.rb +++ b/lib/red_storm.rb @@ -6,3 +6,4 @@ require 'red_storm/configuration' require 'red_storm/simple_bolt' require 'red_storm/simple_spout' require 'red_storm/simple_topology' +require 'red_storm/simple_drpc_topology' diff --git a/lib/red_storm/application.rb b/lib/red_storm/application.rb index 370ac04..76d167b 100644 --- a/lib/red_storm/application.rb +++ b/lib/red_storm/application.rb @@ -7,7 +7,7 @@ TARGET_LIB_DIR = "#{TARGET_DIR}/lib" TARGET_SRC_DIR = "#{TARGET_DIR}/src" TARGET_GEM_DIR = "#{TARGET_DIR}/gems/gems" TARGET_SPECS_DIR = "#{TARGET_DIR}/gems/specifications" -TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes" +TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes" TARGET_DEPENDENCY_DIR = "#{TARGET_DIR}/dependency" TARGET_DEPENDENCY_UNPACKED_DIR = "#{TARGET_DIR}/dependency-unpacked" TARGET_CLUSTER_JAR = "#{TARGET_DIR}/cluster-topology.jar" @@ -26,9 +26,9 @@ CUSTOM_IVY_SETTINGS = "#{DST_IVY_DIR}/settings.xml" module RedStorm - - class Application - TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake" + + class Application + TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake" def self.local_storm_command(class_file, ruby_mode = nil) src_dir = File.expand_path(File.dirname(class_file)) @@ -38,7 +38,7 @@ module RedStorm def self.cluster_storm_command(class_file, ruby_mode = nil) "storm jar #{TARGET_CLUSTER_JAR} -Djruby.compat.version=#{RedStorm.jruby_mode_token(ruby_mode)} redstorm.TopologyLauncher cluster #{class_file}" end - + def self.usage puts("usage: redstorm version") puts(" redstorm install") @@ -82,4 +82,4 @@ module RedStorm end -end \ No newline at end of file +end diff --git a/lib/red_storm/proxy/batch_spout.rb b/lib/red_storm/proxy/batch_spout.rb new file mode 100644 index 0000000..21a4e9d --- /dev/null +++ b/lib/red_storm/proxy/batch_spout.rb @@ -0,0 +1,71 @@ +require 'java' + +java_import 'storm.trident.operation.TridentCollector' +java_import 'backtype.storm.task.TopologyContext' +java_import 'storm.trident.spout.IBatchSpout' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Values' +java_import 'java.util.Map' +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the +# Java artifacts when creating a spout. +# +# The real batch spout class implementation must define these methods: +# - open(conf, context, collector) +# - emitBatch +# - getOutputFields +# - ack(batch_id) +# +# and optionnaly: +# - close +# + +class BatchSpout + java_implements IBatchSpout + + java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)' + def initialize(base_class_path, real_spout_class_name) + @real_spout = Object.module_eval(real_spout_class_name).new + rescue NameError + require base_class_path + @real_spout = Object.module_eval(real_spout_class_name).new + end + + java_signature 'void open(Map, TopologyContext)' + def open(conf, context) + @real_spout.open(conf, context) + end + + java_signature 'void close()' + def close + @real_spout.close if @real_spout.respond_to?(:close) + end + + java_signature 'void emitBatch(long, TridentCollector)' + def emitBatch(batch_id, collector) + @real_spout.emit_batch(batch_id, collector) + end + + java_signature 'void ack(long)' + def ack(batch_id) + @real_spout.ack(batch_id) + end + + java_signature 'Fields getOutputFields()' + def getOutputFields + @real_spout.get_output_fields() + end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_spout.get_component_configuration + end + +end diff --git a/lib/red_storm/proxy/proxy_function.rb b/lib/red_storm/proxy/proxy_function.rb new file mode 100644 index 0000000..d0e7944 --- /dev/null +++ b/lib/red_storm/proxy/proxy_function.rb @@ -0,0 +1,48 @@ +require 'java' + + +java_import 'storm.trident.tuple.TridentTuple' + +java_import 'storm.trident.operation.TridentCollector' + +java_import 'java.util.Map' + +java_import 'storm.trident.operation.TridentOperationContext' + +java_import 'storm.trident.operation.Function' + + +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +class ProxyFunction + java_implements Function + + java_signature 'Function (String base_class_path, String real_class_name)' + def initialize(base_class_path, real_class_name) + @real = Object.module_eval(real_class_name).new + rescue NameError + require base_class_path + @real = Object.module_eval(real_class_name).new + end + + java_signature 'void execute(TridentTuple, TridentCollector)' + def execute(_trident_tuple, _trident_collector) + @real.execute(_trident_tuple, _trident_collector) + end + + java_signature 'void cleanup()' + def cleanup() + @real.cleanup() + end + + java_signature 'void prepare(Map, TridentOperationContext)' + def prepare(_map, _trident_operation_context) + @real.prepare(_map, _trident_operation_context) + end + + +end diff --git a/lib/red_storm/simple_drpc_topology.rb b/lib/red_storm/simple_drpc_topology.rb new file mode 100644 index 0000000..ab2a318 --- /dev/null +++ b/lib/red_storm/simple_drpc_topology.rb @@ -0,0 +1,87 @@ +require 'java' +require 'red_storm/configuration' +require 'red_storm/configurator' + +module RedStorm + + class InputBoltDefinition < SimpleTopology::BoltDefinition + attr_accessor :grouping + + def initialize(*args) + super + @grouping = :none + end + + def grouping(grouping) + @grouping = @grouping + end + + def define_grouping(declarer) + + case @grouping + when :fields + declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s)))) + when :global + declarer.globalGrouping() + when :shuffle + declarer.shuffleGrouping() + when :local_or_shuffle + declarer.localOrShuffleGrouping() + when :none + declarer.noneGrouping() + when :all + declarer.allGrouping() + when :direct + declarer.directGrouping() + else + raise("unknown grouper=#{grouper.inspect}") + end + end + end + + class SimpleDRPCTopology < SimpleTopology + + def self.spout + raise TopologyDefinitionError, "DRPC spout is already defined" + end + + def start(base_class_path, env) + builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name) + + self.class.bolts.each do |bolt| + declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java) + declarer.addConfigurations(bolt.config) + bolt.define_grouping(declarer) + end + + # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode + defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"} + + configurator = Configurator.new(defaults) + configurator.instance_exec(env, &self.class.configure_block) + + drpc = nil + if env == :local + drpc = LocalDRPC.new + submitter = @cluster = LocalCluster.new + submitter.submitTopology(self.class.topology_name, configurator.config, builder.createLocalTopology(drpc)) + else + submitter = StormSubmitter + submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology) + end + instance_exec(env, drpc, &self.class.submit_block) + end + + def self.input_bolt(bolt_class, *args, &bolt_block) + options = args.last.is_a?(Hash) ? args.pop : {} + contructor_args = !args.empty? ? args.pop : [] + bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) + + bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism]) + raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? + bolt.instance_exec(&bolt_block) + self.components << bolt + end + end + +end diff --git a/lib/red_storm/simple_topology.rb b/lib/red_storm/simple_topology.rb index b537c1a..e5f72f0 100644 --- a/lib/red_storm/simple_topology.rb +++ b/lib/red_storm/simple_topology.rb @@ -36,7 +36,7 @@ module RedStorm end class SpoutDefinition < ComponentDefinition - + # WARNING non-dry see BoltDefinition#new_instance def new_instance(base_class_path) if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout" @@ -49,7 +49,7 @@ module RedStorm # is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name) end end - + class BoltDefinition < ComponentDefinition attr_accessor :sources, :command @@ -119,7 +119,7 @@ module RedStorm def self.bolt(bolt_class, *args, &bolt_block) options = args.last.is_a?(Hash) ? args.pop : {} contructor_args = !args.empty? ? args.pop : [] - bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) + bolt_options = {:id => options[:id] ? options[:id] : self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism]) raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? @@ -164,7 +164,7 @@ module RedStorm configurator = Configurator.new(defaults) configurator.instance_exec(env, &self.class.configure_block) - + submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology) instance_exec(env, &self.class.submit_block) diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 38c3930..d39e494 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -6,18 +6,21 @@ module Backtype end java_import 'backtype.storm.LocalCluster' +java_import 'backtype.storm.LocalDRPC' java_import 'backtype.storm.StormSubmitter' java_import 'backtype.storm.topology.TopologyBuilder' +java_import 'backtype.storm.drpc.LinearDRPCTopologyBuilder' java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Tuple' java_import 'backtype.storm.tuple.Values' java_import 'redstorm.storm.jruby.JRubyBolt' java_import 'redstorm.storm.jruby.JRubySpout' +java_import 'redstorm.storm.jruby.JRubyBatchSpout' java_package 'redstorm' -# TopologyLauncher is the application entry point when launching a topology. Basically it will +# TopologyLauncher is the application entry point when launching a topology. Basically it will # call require on the specified Ruby topology class file path and call its start method class TopologyLauncher @@ -36,14 +39,14 @@ class TopologyLauncher $:.unshift File.expand_path(launch_path + '/lib') $:.unshift File.expand_path(launch_path + '/target/lib') - require "#{class_path}" + require "#{class_path}" topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : '' puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment") RedStorm::Configuration.topology_class.new.start(class_path, env) end - private + private def self.camel_case(s) s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase } diff --git a/lib/tasks/red_storm.rake b/lib/tasks/red_storm.rake index 2eb765a..7be840f 100644 --- a/lib/tasks/red_storm.rake +++ b/lib/tasks/red_storm.rake @@ -11,7 +11,7 @@ require 'jruby/jrubyc' require 'red_storm' require 'red_storm/application' -DEP_STORM_VERSION = "0.8.1" +DEP_STORM_VERSION = "0.8.2" DEP_JRUBY_VERSION = "1.6.8" INSTALL_IVY_VERSION = "2.2.0" @@ -27,7 +27,7 @@ DEFAULT_DEPENDENCIES = { task :launch, :env, :ruby_mode, :class_file do |t, args| # use ruby mode parameter or default to current interpreter version version_token = RedStorm.jruby_mode_token(args[:ruby_mode]) - + command = case args[:env] when "local" RedStorm::Application.local_storm_command(args[:class_file], args[:ruby_mode]) @@ -170,7 +170,7 @@ task :deps => "ivy:install" do artifact, transitive = dependency.split(/\s*,\s*/) ivy_retrieve(*artifact.split(':').concat([transitive.split(/\s*=\s*/).last, "#{TARGET_DEPENDENCY_DIR}/topology", "default"])) end -end +end task :jar, [:include_dir] => [:clean_jar] do |t, args| puts("\n--> Generating JAR file #{TARGET_CLUSTER_JAR}") @@ -230,8 +230,8 @@ def build_java_dir(source_folder) 'listfiles' => true ) do # compilerarg :value => "-Xlint:unchecked" - end -end + end +end def build_jruby(source_path) puts("\n--> Compiling JRuby") diff --git a/ruby_proxy.erb b/ruby_proxy.erb new file mode 100644 index 0000000..d7c94eb --- /dev/null +++ b/ruby_proxy.erb @@ -0,0 +1,30 @@ +require 'java' + +<% java_deps.each do |dep| %> +java_import '<%= dep %>' +<% end %> + +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +class <%= ruby_class_name %> + java_implements <%= interface_name %> + + java_signature '<%= interface_name %> (String base_class_path, String real_class_name)' + def initialize(base_class_path, real_class_name) + @real = Object.module_eval(real_class_name).new + rescue NameError + require base_class_path + @real = Object.module_eval(real_class_name).new + end +<% methods.each do |method_name, params| %> + java_signature '<%= params[:return_type] %> <%= method_name %>(<%= params[:args].values.join(', ') %>)' + def <%= method_name %>(<%= params[:args].keys.join(', ') %>) + @real.<%= method_name %>(<%= params[:args].keys.join(', ') %>) + end +<% end %> + +end diff --git a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java new file mode 100644 index 0000000..9d8d61f --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java @@ -0,0 +1,89 @@ +package redstorm.storm.jruby; + +import storm.trident.operation.TridentCollector; +import backtype.storm.task.TopologyContext; +import storm.trident.spout.IBatchSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Fields; +import java.util.Map; + +/** + * the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the spout to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby spout proxy class is instanciated in the open method which is called after + * deserialization at the worker and in both the declareOutputFields and isDistributed methods which + * are called once before serialization at topology creation. + */ +public class JRubyBatchSpout implements IBatchSpout { + IBatchSpout _proxySpout; + String _realSpoutClassName; + String _baseClassPath; + String[] _fields; + + /** + * create a new JRubyBatchSpout + * + * @param baseClassPath the topology/project base JRuby class file path + * @param realSpoutClassName the fully qualified JRuby spout implementation class name + */ + public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) { + _baseClassPath = baseClassPath; + _realSpoutClassName = realSpoutClassName; + _fields = fields; + } + + @Override + public void open(final Map conf, final TopologyContext context) { + // create instance of the jruby proxy class here, after deserialization in the workers. + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + _proxySpout.open(conf, context); + } + + @Override + public void emitBatch(final long batchId, final TridentCollector collector) { + _proxySpout.emitBatch(batchId, collector); + } + + + @Override + public void close() { + _proxySpout.close(); + } + + @Override + public void ack(final long batchId) { + _proxySpout.ack(batchId); + } + + @Override + public Fields getOutputFields() { + // getOutputFields is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getOutputFields(); + } + + @Override + public Map getComponentConfiguration() { + // getComponentConfiguration is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getComponentConfiguration(); + } + + private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + try { + redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/redstorm/storm/jruby/JRubyProxyFunction.java b/src/main/redstorm/storm/jruby/JRubyProxyFunction.java new file mode 100644 index 0000000..c6030ab --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyProxyFunction.java @@ -0,0 +1,59 @@ +package redstorm.storm.jruby; + +import storm.trident.tuple.TridentTuple; +import storm.trident.operation.TridentCollector; +import java.util.Map; +import storm.trident.operation.TridentOperationContext; +import storm.trident.operation.Function; + +public class JRubyProxyFunction implements Function { + Function _proxy; + String _realClassName; + String _baseClassPath; + String[] _fields; + + public JRubyProxyFunction(final String baseClassPath, final String realClassName, final String[] fields) { + _baseClassPath = baseClassPath; + _realClassName = realClassName; + _fields = fields; + } + + + @Override + public void execute(final TridentTuple _tridentTuple, final TridentCollector _tridentCollector) { + + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.execute(_tridentTuple, _tridentCollector); + + } + + @Override + public void cleanup() { + + _proxy.cleanup(); + + } + + @Override + public void prepare(final Map _map, final TridentOperationContext _tridentOperationContext) { + + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.prepare(_map, _tridentOperationContext); + + } + + + private static Function newProxy(final String baseClassPath, final String realClassName) { + try { + redstorm.proxy.ProxyFunction proxy = new redstorm.proxy.ProxyFunction(baseClassPath, realClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +}