Handle running as a JMS-based Stapfen::Worker
This commit is contained in:
parent
272870ed4f
commit
9d5b4cbeef
1
Gemfile
1
Gemfile
|
@ -4,6 +4,7 @@ source 'https://rubygems.org'
|
|||
gemspec
|
||||
|
||||
gem 'jruby-jms', :platform => :jruby
|
||||
gem 'stomp', '>= 1.2.14'
|
||||
|
||||
group :development do
|
||||
gem 'rake'
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
module Stapfen
|
||||
class Destination
|
||||
attr_accessor :name, :type
|
||||
|
||||
def queue?
|
||||
@type == :queue
|
||||
end
|
||||
|
||||
def topic?
|
||||
@type == :topic
|
||||
end
|
||||
|
||||
def as_stomp
|
||||
if queue?
|
||||
return "/queue/#{@name}"
|
||||
end
|
||||
|
||||
if topic?
|
||||
return "/topic/#{@name}"
|
||||
end
|
||||
end
|
||||
|
||||
# Create a {Stapfen::Destination} from the given string
|
||||
#
|
||||
# @param [String] name
|
||||
# @return [Stapfen::Destination]
|
||||
def self.from_string(name)
|
||||
destination = self.new
|
||||
pieces = name.split('/')
|
||||
destination.type = pieces[1].to_sym
|
||||
destination.name = pieces[2 .. -1].join('/')
|
||||
return destination
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,15 +1,22 @@
|
|||
require 'stomp'
|
||||
require 'stapfen/logger'
|
||||
require 'stapfen/destination'
|
||||
require 'stapfen/message'
|
||||
|
||||
module Stapfen
|
||||
class Worker
|
||||
include Stapfen::Logger
|
||||
|
||||
# Class variables!
|
||||
@@signals_handled = false
|
||||
@@workers = []
|
||||
|
||||
# Class instance variables!
|
||||
@use_stomp = true
|
||||
|
||||
class << self
|
||||
attr_accessor :configuration, :consumers, :logger, :destructor
|
||||
|
||||
end
|
||||
|
||||
# Instantiate a new +Worker+ instance and run it
|
||||
|
@ -33,6 +40,51 @@ module Stapfen
|
|||
@configuration = block
|
||||
end
|
||||
|
||||
# Force the worker to use STOMP as the messaging protocol (default)
|
||||
#
|
||||
# @return [Boolean]
|
||||
def self.use_stomp!
|
||||
begin
|
||||
require 'stomp'
|
||||
rescue LoadError
|
||||
puts "You need the `stomp` gem to be installed to use stomp!"
|
||||
raise
|
||||
end
|
||||
|
||||
@use_stomp = true
|
||||
return true
|
||||
end
|
||||
|
||||
def self.stomp?
|
||||
@use_stomp
|
||||
end
|
||||
|
||||
# Force the worker to use JMS as the messaging protocol.
|
||||
#
|
||||
# *Note:* Only works under JRuby
|
||||
#
|
||||
# @return [Boolean]
|
||||
def self.use_jms!
|
||||
unless RUBY_PLATFORM == 'java'
|
||||
raise Stapfen::ConfigurationError, "You cannot use JMS unless you're running under JRuby!"
|
||||
end
|
||||
|
||||
begin
|
||||
require 'java'
|
||||
require 'jms'
|
||||
rescue LoadError
|
||||
puts "You need the `jms` gem to be installed to use JMS!"
|
||||
raise
|
||||
end
|
||||
|
||||
@use_stomp = false
|
||||
return true
|
||||
end
|
||||
|
||||
def self.jms?
|
||||
!(@use_stomp)
|
||||
end
|
||||
|
||||
# Optional method, should be passed a block which will yield a {{Logger}}
|
||||
# instance for the Stapfen worker to use
|
||||
def self.log
|
||||
|
@ -105,6 +157,53 @@ module Stapfen
|
|||
attr_accessor :client
|
||||
|
||||
def run
|
||||
if self.class.stomp?
|
||||
run_stomp
|
||||
elsif self.class.jms?
|
||||
run_jms
|
||||
end
|
||||
end
|
||||
|
||||
def run_jms
|
||||
JMS::Connection.start(self.class.configuration.call) do |connection|
|
||||
@client = connection
|
||||
debug("Running with #{@client} inside of Thread:#{Thread.current.inspect}")
|
||||
|
||||
self.class.consumers.each do |name, headers, block|
|
||||
destination = Stapfen::Destination.from_string(name)
|
||||
type = 'queue'
|
||||
options = {}
|
||||
|
||||
if destination.queue?
|
||||
options[:queue_name] = destination.name
|
||||
end
|
||||
|
||||
if destination.topic?
|
||||
type = 'topic'
|
||||
options[:topic_name] = destination.name
|
||||
end
|
||||
|
||||
method_name = "handle_#{type}_#{name}".to_sym
|
||||
self.class.send(:define_method, method_name, &block)
|
||||
|
||||
connection.on_message(options) do |m|
|
||||
message = Stapfen::Message.from_jms(m)
|
||||
self.send(method_name, message)
|
||||
end
|
||||
end
|
||||
|
||||
begin
|
||||
loop do
|
||||
sleep 1
|
||||
end
|
||||
debug("Exiting the JMS runloop for #{self}")
|
||||
rescue Interrupt
|
||||
exit_cleanly
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def run_stomp
|
||||
@client = Stomp::Client.new(self.class.configuration.call)
|
||||
debug("Running with #{@client} inside of Thread:#{Thread.current.inspect}")
|
||||
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
require 'spec_helper'
|
||||
require 'stapfen/destination'
|
||||
|
||||
describe Stapfen::Destination do
|
||||
it { should respond_to :name }
|
||||
it { should respond_to :type }
|
||||
|
||||
describe '#as_stomp' do
|
||||
let(:name) { 'rspec/dlq' }
|
||||
|
||||
subject(:destination) do
|
||||
d = described_class.new
|
||||
d.type = :queue
|
||||
d.name = name
|
||||
d.as_stomp
|
||||
end
|
||||
|
||||
it { should be_instance_of String }
|
||||
it { should eql "/queue/#{name}" }
|
||||
end
|
||||
|
||||
context 'class methods' do
|
||||
describe '#from_string' do
|
||||
subject(:destination) { described_class.from_string(name) }
|
||||
|
||||
context 'a simple queue' do
|
||||
let(:name) { '/queue/rspec' }
|
||||
its(:name) { should eql 'rspec' }
|
||||
its(:type) { should eql :queue }
|
||||
end
|
||||
|
||||
context 'a queue with slashes' do
|
||||
let(:name) { '/queue/rspec/dlq' }
|
||||
its(:name) { should eql 'rspec/dlq' }
|
||||
its(:type) { should eql :queue }
|
||||
end
|
||||
|
||||
context 'a complex topic' do
|
||||
let(:name) { '/topic/rspec/dlq' }
|
||||
its(:name) { should eql 'rspec/dlq' }
|
||||
its(:type) { should eql :topic }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -13,6 +13,31 @@ describe Stapfen::Worker do
|
|||
it { should respond_to :log }
|
||||
it { should respond_to :shutdown }
|
||||
|
||||
describe '#use_stomp!' do
|
||||
subject(:result) { worker.use_stomp! }
|
||||
|
||||
it 'should update the instance variable' do
|
||||
expect(result).to be_true
|
||||
expect(worker).to be_stomp
|
||||
expect(worker).not_to be_jms
|
||||
end
|
||||
end
|
||||
|
||||
describe '#use_jms!', :java => true do
|
||||
subject(:result) { worker.use_jms! }
|
||||
|
||||
after :each do
|
||||
# Reset to the default since we've modified the class
|
||||
worker.use_stomp!
|
||||
end
|
||||
|
||||
it 'should update the instance variable' do
|
||||
expect(result).to be_true
|
||||
expect(worker).to be_jms
|
||||
expect(worker).not_to be_stomp
|
||||
end
|
||||
end
|
||||
|
||||
describe '#configure' do
|
||||
it 'should error when not passed a block' do
|
||||
expect {
|
||||
|
|
|
@ -16,7 +16,4 @@ Gem::Specification.new do |gem|
|
|||
gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
|
||||
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
|
||||
gem.require_paths = ["lib"]
|
||||
|
||||
|
||||
gem.add_dependency('stomp', '>= 1.2.14') # 1.2.14 fixes Stomp::Client#unreceive behavior
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue