Refactor, update config file format, use STOMP header for push_id, support multiple STOMP hosts.
This commit is contained in:
parent
502d04c7ec
commit
82ca004c39
19
config.yml
19
config.yml
|
@ -1,4 +1,15 @@
|
|||
stomp_host: "localhost"
|
||||
stomp_port: 61613
|
||||
stomp_topic: "/topic/test_stomp"
|
||||
http_port: 8080
|
||||
stomp:
|
||||
inbox: "/topic/activepush"
|
||||
hosts:
|
||||
- host: "localhost"
|
||||
port: 61613
|
||||
ssl: false
|
||||
login: "guest"
|
||||
passcode: "guest"
|
||||
|
||||
logging:
|
||||
level: "WARN"
|
||||
file: "log/activepush.log"
|
||||
|
||||
http:
|
||||
port: 9191
|
||||
|
|
114
index.coffee
114
index.coffee
|
@ -1,5 +1,6 @@
|
|||
|
||||
fs = require "fs"
|
||||
path = require "path"
|
||||
http = require "http"
|
||||
express = require "express"
|
||||
socket_io = require "socket.io"
|
||||
|
@ -7,61 +8,78 @@ socket_io = require "socket.io"
|
|||
{ EventEmitter } = require "events"
|
||||
require "js-yaml"
|
||||
|
||||
config = require "./config.yml"
|
||||
exports.loadConfiguration = (p) ->
|
||||
require path.resolve(".", p)
|
||||
|
||||
# Create STOMP client
|
||||
stomp = new Stomp
|
||||
host: config.stomp_host
|
||||
port: config.stomp_port
|
||||
debug: false
|
||||
login: "guest"
|
||||
passcode: "guest"
|
||||
createSocketIOServer = (config, subscriptions) ->
|
||||
app = express()
|
||||
|
||||
# Create Socket.io and Express webserver
|
||||
app = express()
|
||||
server = http.createServer(app)
|
||||
io = socket_io.listen(server)
|
||||
app.get "/", (req, res) ->
|
||||
res.sendfile "#{__dirname}/demo.html"
|
||||
|
||||
app.get "/", (req, res) ->
|
||||
res.sendfile "#{__dirname}/demo.html"
|
||||
server = http.createServer(app)
|
||||
io = socket_io.listen(server)
|
||||
server.listen(config.http.port)
|
||||
console.log "SOCKET.IO listening on:", config.http.port
|
||||
|
||||
server.listen config.http_port
|
||||
console.info "Listening on port #{config.http_port}"
|
||||
io.sockets.on "connection", (socket) ->
|
||||
console.log "SOCKET.IO connection"
|
||||
socket.on "subscribe", (push_id) ->
|
||||
console.log "SOCKET.IO subscribe:", push_id
|
||||
listener = (data) ->
|
||||
socket.emit "message", data
|
||||
subscriptions.addListener push_id, listener
|
||||
socket.on "disconnect", ->
|
||||
console.log "SOCKET.IO disconnect, removing listener:", push_id
|
||||
subscriptions.removeListener push_id, listener
|
||||
|
||||
# EventEmitter used to keep track of subscriptions
|
||||
subscriptions = new EventEmitter()
|
||||
[app, io]
|
||||
|
||||
# STOMP
|
||||
stomp.connect()
|
||||
stomp.on "connected", ->
|
||||
console.log "STOMP connection: #{config.stomp_host}:#{config.stomp_port}"
|
||||
stomp.subscribe { destination: config.stomp_topic, ack: "client" }, (body, headers) ->
|
||||
console.log body, headers
|
||||
try
|
||||
data = JSON.parse body[0]
|
||||
console.log data
|
||||
push_id = data.push_id #body[0]
|
||||
message = data.message #body[0]
|
||||
createStompConnection = (config, host, subscriptions) ->
|
||||
console.log "STOMP connecting to: #{host.host}:#{host.port}#{config.stomp.inbox}"
|
||||
|
||||
stomp = new Stomp host
|
||||
stomp.connect()
|
||||
stomp.on "connected", ->
|
||||
console.log "STOMP connected: #{host.host}:#{host.port}#{config.stomp.inbox}"
|
||||
stomp.subscribe { destination: config.stomp.inbox, ack: "client" }, (body, headers) ->
|
||||
push_id = headers.push_id
|
||||
message = body[0]
|
||||
console.log "STOMP message:", push_id, message
|
||||
subscriptions.emit push_id, message
|
||||
catch e
|
||||
console.warn "error", e
|
||||
# stomp.on "message", (message) ->
|
||||
# stomp.ack message.headers['message-id']
|
||||
|
||||
stomp.on "error", (error) ->
|
||||
console.log "STOMP error:", error.body
|
||||
# stomp.disconnect()
|
||||
|
||||
stomp.on "error", (errorFrame) ->
|
||||
# TODO: reconnect? exit?
|
||||
console.error errorFrame.body
|
||||
stomp.disconnect()
|
||||
stomp.on "disconnected", (_) ->
|
||||
console.log "STOMP disconnected: #{host.host}:#{host.port}"
|
||||
|
||||
# Socket.io
|
||||
io.sockets.on "connection", (socket) ->
|
||||
console.log "Socket.io connection."
|
||||
socket.on "subscribe", (push_id) ->
|
||||
console.log "subscribe to #{push_id}"
|
||||
listener = (data) ->
|
||||
socket.emit "message", data
|
||||
subscriptions.addListener push_id, listener
|
||||
socket.on "disconnect", ->
|
||||
console.log "disconnect, removing listener for #{push_id}"
|
||||
subscriptions.removeListener push_id, listener
|
||||
stomp
|
||||
|
||||
exports.stompPublish = (host, destination, push_id, message) ->
|
||||
stomp = new Stomp host
|
||||
stomp.connect()
|
||||
stomp.on "connected", ->
|
||||
stomp.send
|
||||
destination: destination
|
||||
push_id: push_id
|
||||
body: message
|
||||
persistent: false
|
||||
, false
|
||||
stomp.disconnect()
|
||||
|
||||
exports.main = (config) ->
|
||||
# EventEmitter used to keep track of subscriptions
|
||||
subscriptions = new EventEmitter()
|
||||
|
||||
# Create one Socket.io server
|
||||
createSocketIOServer(config, subscriptions)
|
||||
|
||||
# # Create one or more STOMP connections
|
||||
for host in config.stomp.hosts
|
||||
createStompConnection(config, host, subscriptions)
|
||||
|
||||
if require.main is module
|
||||
config = exports.loadConfiguration(process.argv[2] or "./config.yml")
|
||||
exports.main config
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
"description": "",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"test": "echo \"Error: no test specified\" && exit 1"
|
||||
"test": "./node_modules/.bin/mocha --compilers coffee:coffee-script tests"
|
||||
},
|
||||
"repository": "",
|
||||
"author": "",
|
||||
|
@ -13,6 +13,8 @@
|
|||
"socket.io": "~0.9.16",
|
||||
"stomp": "~0.1.1",
|
||||
"js-yaml": "~2.1.0",
|
||||
"express": "~3.3.4"
|
||||
"express": "~3.3.4",
|
||||
"mocha": "~1.12.0",
|
||||
"coffee-script": "~1.6.3"
|
||||
}
|
||||
}
|
||||
|
|
35
test-push
35
test-push
|
@ -1,39 +1,12 @@
|
|||
#!/usr/bin/env coffee
|
||||
|
||||
{ Stomp } = require "stomp"
|
||||
|
||||
require "js-yaml"
|
||||
config = require "./config.yml"
|
||||
activepush = require "./index"
|
||||
|
||||
push_id = process.argv[2] or "demo"
|
||||
message = process.argv[3] or "hello world"
|
||||
|
||||
receipt = false
|
||||
persistent = false
|
||||
config = activepush.loadConfiguration "config.yml"
|
||||
|
||||
stomp = new Stomp
|
||||
host: config.stomp_host
|
||||
port: config.stomp_port
|
||||
debug: false
|
||||
login: "guest"
|
||||
passcode: "guest"
|
||||
console.log { config, push_id, message }
|
||||
|
||||
stomp.connect()
|
||||
stomp.on "connected", ->
|
||||
body = JSON.stringify
|
||||
push_id: push_id
|
||||
message: message
|
||||
|
||||
console.log "Sending to STOMP broker #{config.stomp_host}:#{config.stomp_port}"
|
||||
console.log body
|
||||
|
||||
stomp.send
|
||||
destination: config.stomp_topic
|
||||
body: body
|
||||
persistent: persistent
|
||||
, receipt
|
||||
stomp.disconnect()
|
||||
|
||||
stomp.on "error", (error) ->
|
||||
console.error error.body
|
||||
stomp.disconnect()
|
||||
activepush.stompPublish config.stomp.hosts[0], config.stomp.inbox, push_id, message
|
||||
|
|
Loading…
Reference in New Issue