Create Secrets Serializer

Create Consul Service
Create SecretsWatcher backed by Consul
This commit is contained in:
Will Kimeria 2015-01-08 18:13:17 -08:00
parent 10adf0ab8d
commit 0ba65d0746
10 changed files with 261 additions and 35 deletions

View File

@ -17,12 +17,11 @@ import scala.collection.JavaConversions._
object BorderPatrolApp extends TwitterServer {
lazy val upstreams = getUpstreamClients
val loginPipeline = new LoginFilter andThen new LoginService
val sessionFilter = new SessionFilter
val authService = sessionFilter andThen new AuthService(new TokenService, loginPipeline)
val upstreamService = new UpstreamService(authService, upstreams)
val upstreamService = new UpstreamService(authService)
val upstreamFilter = new UpstreamFilter(authService)
val routingFilter = new RoutingFilter
@ -33,6 +32,8 @@ object BorderPatrolApp extends TwitterServer {
val authPipeline = basePipeline andThen authService
val upstreamPipeline = basePipeline andThen upstreamFilter
def main() {
val server = ServerBuilder()
.codec(Http())
@ -46,28 +47,6 @@ object BorderPatrolApp extends TwitterServer {
Await.ready(adminHttpServer)
}
/**
* Build upstream clients from borderpatrol.conf. A map of the clients (where service name is the key)
* gets passed to the UpstreamService, which dispatches requests based on the service name
* @return
*/
def getUpstreamClients: Map[String, Service[HttpRequest, HttpResponse]] = {
val conf = ConfigFactory.parseReader(new FileReader("borderpatrol.conf"))
val services = conf.getConfigList("services").toList
case class ServiceConfiguration(name: String, friendlyName: String, hosts: String, rewriteRule: String) {}
val clients = services map(s =>
(s.getString("name"),
ClientBuilder()
.codec(Http())
.hosts(s.getString("hosts"))
.hostConnectionLimit(10)
.loadBalancer(HeapBalancerFactory.toWeighted)
.retries(2)
.build()))
clients.toMap
}
/**
* Run Mock Services
*/

View File

@ -0,0 +1,17 @@
package com.lookout.borderpatrol
import com.twitter.finagle.Service
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http._
/**
* Call Consul Service endPoint
*/
class ConsulService extends Service[HttpRequest, HttpResponse] {
def apply(request: HttpRequest) = {
println("------------------------------ ConsulService----------------------------->")
upstreams.get("consul") map (svc =>
svc(request) filter (_.getStatus == HttpResponseStatus.OK)
) getOrElse (Future.value(Responses.NotFound()))
}
}

View File

@ -9,8 +9,7 @@ import org.jboss.netty.handler.codec.http._
* Generic upstream service
* @param authService
*/
class UpstreamService(authService: Service[RoutedRequest, HttpResponse],
upstreams: Map[String,Service[HttpRequest, HttpResponse]]) extends Service[RoutedRequest, FinagleResponse] {
class UpstreamService(authService: Service[RoutedRequest, HttpResponse]) extends Service[RoutedRequest, FinagleResponse] {
def apply(request: RoutedRequest) = {
println("------------------------------ UpstreamService " + request.getUri + "----------------------------->")
val service = request.service

View File

@ -1,9 +1,14 @@
package com.lookout
import java.io.FileReader
import java.net.{InetAddress, InetSocketAddress}
import com.lookout.borderpatrol.session._
import com.twitter.finagle.http.{Request => FinagleRequest, Response => FinagleResponse}
import com.twitter.finagle.Service
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.{Request => FinagleRequest, Response => FinagleResponse, Http}
import com.twitter.finagle.loadbalancer.HeapBalancerFactory
import com.typesafe.config.ConfigFactory
import org.jboss.netty.handler.codec.http._
import scala.collection.JavaConversions._
@ -75,19 +80,42 @@ package object borderpatrol {
object Responses {
object NotFound {
def apply(httpVersion: HttpVersion = HttpVersion.HTTP_1_1): Response =
Response(new DefaultHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND))
def apply(httpVersion: HttpVersion = HttpVersion.HTTP_1_1): HttpResponse =
new DefaultHttpResponse(httpVersion, HttpResponseStatus.NOT_FOUND)
}
object OK {
def apply(httpVersion: HttpVersion = HttpVersion.HTTP_1_1): Response =
Response(new DefaultHttpResponse(httpVersion, HttpResponseStatus.OK))
def apply(httpVersion: HttpVersion = HttpVersion.HTTP_1_1): HttpResponse =
new DefaultHttpResponse(httpVersion, HttpResponseStatus.OK)
}
}
/**
* Build upstream clients from borderpatrol.conf. A map of the clients (where service name is the key)
* gets passed to the UpstreamService, which dispatches requests based on the service name
* @return
*/
def getUpstreamClients: Map[String, Service[HttpRequest, HttpResponse]] = {
val conf = ConfigFactory.parseReader(new FileReader("borderpatrol.conf"))
val services = conf.getConfigList("services").toList
case class ServiceConfiguration(name: String, friendlyName: String, hosts: String, rewriteRule: String) {}
val clients = services map(s =>
(s.getString("name"),
ClientBuilder()
.codec(Http())
.hosts(s.getString("hosts"))
.hostConnectionLimit(10)
.loadBalancer(HeapBalancerFactory.toWeighted)
.retries(2)
.build()))
clients.toMap
}
//Unsuccessful Response
case class NeedsAuthResponse(httpResponse: HttpResponse) extends FinagleResponse //with BorderPatrolResponse
//Successful response
case class Response(httpResponse: HttpResponse) extends FinagleResponse //with BorderPatrolResponse
implicit val upstreams = getUpstreamClients
}

View File

@ -23,4 +23,4 @@ case class Secret(expiry: Time,
id: Byte = Generator(1).head,
entropy: List[Byte] = Generator(16).toList) extends ASecret
case class Secrets(current: Secret, previous: Secret)
case class Secrets(current: Secret, previous: Secret)

View File

@ -1,8 +1,12 @@
package com.lookout.borderpatrol.session
import com.twitter.util.Time
import com.twitter.util.{Throw, Time}
import scala.concurrent.duration.Duration
import scala.concurrent.{Future => ScalaFuture, Promise => ScalaPromise, Await}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import com.lookout.borderpatrol.session._
import concurrent.ExecutionContext.Implicits.global
/**
* This prototypes out an API for the SecretStore, keeping secrets in memory
@ -19,6 +23,51 @@ sealed trait SecretStoreApi {
def find(f: (Secret) => Boolean): Try[Secret]
}
case class ConsulSecretStore(watcher: SecretsWatcherApi) extends SecretStoreApi {
//During initialization, we want this to be a hard failure that prevents server from starting
//private[this] var _secrets: Secrets = Await.result[Secrets]( watcher.getSecrets,Duration(2000, MILLISECONDS))
//If we unable to get secrets. Hard fail
private[this] var _secrets: Secrets = watcher.initialSecrets match {
case Success(secrets) => secrets
case Failure(f) => {
throw new IllegalArgumentException(f)
}
}
/**
* Get Next secrets
* @return
*/
private def nextSecrets: Secrets = {
watcher.getNext match {
case Success(newSecrets) => newSecrets
case Failure(f) => {
//Do something indicating we got an error
println(s"Unable to get new secrets, logging.........................>$f")
_secrets
}
}
}
def current = {
val c = _secrets.current
if (c.expiry > Time.now && c.expiry <= SecretExpiry.currentExpiry) c
else {
println("Secrets have expired..................................................................................>")
_secrets = nextSecrets
}
_secrets.current
}
def previous = _secrets.previous
def find(f: (Secret) => Boolean) =
if (f(current)) Success(current)
else if (f(previous)) Success(previous)
else Failure(new Exception("No matching secrets found"))
}
case class InMemorySecretStore(secrets: Secrets) extends SecretStoreApi {
import com.lookout.borderpatrol.session.SecretExpiry._
private[this] var _secrets: Secrets = secrets

View File

@ -0,0 +1,110 @@
package com.lookout.borderpatrol.session
import argonaut.Argonaut._
import argonaut._
import com.lookout.borderpatrol.ConsulService
import com.twitter.io.Charsets
import org.jboss.netty.handler.codec.http._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future => ScalaFuture, Promise => ScalaPromise}
import scala.util.Try
sealed trait SecretsWatcherApi {
def initialSecrets: Try[Secrets]
def getNext: Try[Secrets]
}
object ConsulSecretsWatcher extends SecretsWatcherApi {
val consulService = new ConsulService
var currentModifyIndex: Int = 0
var _nextSecrets: ScalaFuture[Secrets] = ScalaPromise[Secrets]().future
def initialSecrets: Try[Secrets] = {
Try {
val secrets = Await.result[Secrets](getSecrets, Duration(3000, MILLISECONDS))
_nextSecrets = getSecrets
secrets
}
}
def getNext: Try[Secrets] = {
val next = _nextSecrets
_nextSecrets = getSecrets
Try(Await.result[Secrets](next, Duration(2500, MILLISECONDS)))
}
private def getSecrets: ScalaFuture[Secrets] = {
var promise = ScalaPromise[Secrets]()
//Use a ridiculously long timeout (48 hrs)
val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, s"/v1/kv/borderpatrol/secrets?wait=48h&index=$currentModifyIndex")
consulService(request).onSuccess { resp =>
handleResponse(resp) match {
case Left(response) => promise.failure(new IllegalStateException("An error occurred getting secrets from the Data Store:" + resp.getStatus))
case Right(content) => {
for {
secrets <- getSecretInformation(content)
secret <- newSecrets(secrets)
} yield promise.success(secret)
}
}
println("Retrieved from Consul: " + resp.getContent.toString(Charsets.Utf8))
}
promise.future
}
private def newSecrets(secrets: SecretData): Option[Secrets] = {
if (currentModifyIndex == 0 || secrets.ModifyIndex > currentModifyIndex)
decode(secrets.Value).asSecrets
else
None
}
private def decode(secret: String): String = {
new String(new sun.misc.BASE64Decoder().decodeBuffer(secret))
}
/**
* Handle http response. If 200, return body content, otherwise, return response
* @param response
* @return
*/
private def handleResponse(response: HttpResponse): Either[HttpResponse, String] = {
response.getStatus match {
case HttpResponseStatus.OK => Right(response.getContent.toString(Charsets.Utf8))
case _ => Left(response)
}
}
/**
* Parse Response from Consul and extract new secret and
* current ModifyIndex
* @param s
* @return
*/
def getSecretInformation(s: String): Option[SecretData] = {
implicit def SecretDataCodecJson = casecodec6(SecretData.apply, SecretData.unapply)("CreateIndex", "ModifyIndex", "LockIndex", "Key", "Flags", "Value")
for {
res <- Parse.decodeOption[List[SecretData]](s)
sData <- res.headOption
} yield sData
}
/**
* The SecretData class represents a row of data coming back from the Secret Store back-end (Consul)
* The JSON Data structure looks like below (value is Base64 Encoded)
* [{"CreateIndex":66,"ModifyIndex":68,"LockIndex":0,"Key":"borderpatrol/currentSecret","Flags":0,"Value":"YmJiYmJi......"}]
* THE Value is Base64 encoded, when decoded, it looks like
* {"current":{"expiry":{"nanos":100000000000},"id":-29,"entropy":[67,93,65,26,89,123,-88,-59,-82,103,-81,-43,-113,-98,-21,-19]},
* "previous":{"expiry":{"nanos":100000000000},"id":-96,"entropy":[-66,5,60,86,12,62,-85,67,72,57,-19,-5,-47,-26,-101,63]}}
*/
case class SecretData(val CreateIndex: Int,
val ModifyIndex: Int,
val LockIndex: Int,
val Key: String,
val Flags: Int,
val Value: String)
}

View File

@ -130,7 +130,7 @@ package object session {
val cookieName = "border_session"
val entropySize = Constants.SessionId.entropySize
implicit val secretStore = InMemorySecretStore(Secrets(Secret(currentExpiry), Secret(Time.fromSeconds(100))))
implicit val secretStore = getSecretStore
implicit val marshaller = SessionIdMarshaller(secretStore)
implicit val generator: SessionIdGenerator = new SessionIdGenerator
val sessionStore = new InMemorySessionStore
@ -148,4 +148,8 @@ package object session {
sessionStore.update(session)
}
//TODO: This should be configurable(should be Memory for unit tests, and consul in run mode
//def getSecretStore: SecretStoreApi = ConsulSecretStore(ConsulSecretsWatcher)
def getSecretStore: SecretStoreApi = InMemorySecretStore(Secrets(Secret(SecretExpiry.currentExpiry), Secret(Time.fromSeconds(100))))
}

View File

@ -0,0 +1,34 @@
package com.lookout.borderpatrol.session
import com.twitter.finagle.Service
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http._
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.Await
import scala.concurrent.duration._
/**
* Created by wkimeria on 1/15/15.
*/
class SecretsWatcherApiSpec extends FlatSpec with Matchers{
behavior of "SecretWatcherApi"
def MockConsulService(response: HttpResponse) = new Service[HttpRequest, HttpResponse] {
def apply(request: HttpRequest) = {
val content =
"""{'current':{'expiry':{'nanos':100000000000},
#'id':-29,'entropy':[67,93,65,26,89,123,-88,-59,-82,103,-81,-43,-113,-98,-21,-19]},
#'previous':{'expiry':{'nanos':100000000000},'id':-96,
#'entropy':[-66,5,60,86,12,62,-85,67,72,57,-19,-5,-47,-26,-101,63]}}""".stripMargin('#')
Future.value(response)
}
}
it should "get secrets" in {
//TODO
}
}

View File

@ -18,5 +18,11 @@
"rewriteRule":"/baz=>/"
"hosts":"localhost:8084"
}
{
"name": "consul",
"friendlyName": "Consul Service"
"rewriteRule":"/consul=>/"
"hosts":"localhost:8500"
}
]
}