Merge pull request #20 from trane/master
Add encrypted session memcached store
This commit is contained in:
commit
ad0dd94d1c
|
@ -46,9 +46,9 @@ trait SymmetricKey {
|
|||
def cipher(mode: Int): Cipher =
|
||||
tap(Cipher.getInstance(cipherAlgo, provider.getName))(_.init(mode, key, iv))
|
||||
|
||||
def encrypt(bytes: Seq[Byte]): Seq[Byte]
|
||||
def encrypt(bytes: Seq[Byte]): Array[Byte]
|
||||
|
||||
def decrypt(bytes: Seq[Byte]): Seq[Byte]
|
||||
def decrypt(bytes: Seq[Byte]): Array[Byte]
|
||||
}
|
||||
|
||||
case class CryptKey(keyBytes: Array[Byte], ivBytes: Array[Byte], provider: Provider = new BouncyCastleProvider) extends SymmetricKey {
|
||||
|
@ -57,10 +57,10 @@ case class CryptKey(keyBytes: Array[Byte], ivBytes: Array[Byte], provider: Provi
|
|||
val key = new SecretKeySpec(keyBytes, keyAlgo)
|
||||
val iv = new IvParameterSpec(ivBytes)
|
||||
|
||||
def encrypt(bytes: Seq[Byte]): Seq[Byte] =
|
||||
def encrypt(bytes: Seq[Byte]): Array[Byte] =
|
||||
cipher(Cipher.ENCRYPT_MODE).doFinal(bytes.toArray)
|
||||
|
||||
def decrypt(bytes: Seq[Byte]): Seq[Byte] =
|
||||
def decrypt(bytes: Seq[Byte]): Array[Byte] =
|
||||
cipher(Cipher.DECRYPT_MODE).doFinal(bytes.toArray)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import argonaut.CodecJson
|
|||
import com.lookout.borderpatrol.session.secret._
|
||||
import com.lookout.borderpatrol.session.tokens._
|
||||
import com.lookout.borderpatrol.session.id._
|
||||
import com.twitter.bijection.{Base64String, Injection}
|
||||
import com.twitter.util.{Time, Duration}
|
||||
import org.jboss.netty.buffer.ChannelBuffers
|
||||
import com.lookout.borderpatrol.util.Combinators.tap
|
||||
|
@ -78,5 +79,8 @@ package object session {
|
|||
s.decodeOption[Session]
|
||||
}
|
||||
|
||||
lazy val bytes264 = Injection.connect[Array[Byte], Base64String, String]
|
||||
lazy val json2bytes = Injection.connect[String, Array[Byte]]
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -4,14 +4,18 @@ import com.lookout.borderpatrol.Session
|
|||
import com.lookout.borderpatrol.session.id._
|
||||
import com.lookout.borderpatrol.session._
|
||||
import com.twitter.bijection.{Base64String, Injection}
|
||||
import com.twitter.finagle.Memcached
|
||||
import com.twitter.io.Charsets
|
||||
import com.twitter.util.{Future, Duration, Await}
|
||||
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
|
||||
import com.twitter.finagle.Memcachedx
|
||||
import com.twitter.finagle.memcachedx
|
||||
import com.twitter.io.Buf
|
||||
import com.twitter.util.{Time, Future, Duration, Await}
|
||||
import com.lookout.borderpatrol.util.Combinators.tap
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
trait SessionStoreComponent {
|
||||
implicit val marshaller: Marshaller
|
||||
val sessionStore: SessionStoreApi
|
||||
}
|
||||
/**
|
||||
* This prototypes an API, and should be implemented using some shared store.
|
||||
*
|
||||
|
@ -23,16 +27,113 @@ sealed trait SessionStoreApi {
|
|||
def update(s: Session): Session
|
||||
}
|
||||
|
||||
trait EncryptedSessions {
|
||||
def cryptKey(id: String): Try[CryptKey]
|
||||
def cryptKey(id: SessionId): Try[CryptKey]
|
||||
sealed trait EncryptedSessions[A] {
|
||||
|
||||
def cryptKey(id: SessionId, secret: Secret): CryptKey =
|
||||
CryptKey(id, secret)
|
||||
|
||||
def encrypt(s: Session, c: CryptKey): A
|
||||
def decrypt(a: A, c: CryptKey): Option[Session]
|
||||
}
|
||||
|
||||
trait SessionStoreComponent {
|
||||
implicit val marshaller: Marshaller
|
||||
val sessionStore: SessionStoreApi
|
||||
/**
|
||||
* Memcached encrypted store, utilizing Ketama consistent hashing
|
||||
*
|
||||
* @param dest A comma separated value list of host:port, localhost:11211,localhost:11212
|
||||
* @param timeout The timeout for fetching from memcache
|
||||
* @param marshaller A session marshaller
|
||||
*/
|
||||
case class MemcachedEncryptedSessionStore(dest: String, timeout: Duration)(implicit marshaller: Marshaller) extends SessionStoreApi with EncryptedSessions[Buf] {
|
||||
val store: memcachedx.BaseClient[Buf] = Memcachedx.newKetamaClient(dest)
|
||||
|
||||
def decrypt(buf: Buf, c: CryptKey): Option[Session] = {
|
||||
val encBytes = toArray(buf)
|
||||
val decBytes = c.decrypt(encBytes)
|
||||
((json2bytes invert decBytes) toOption) flatMap (_ asSession)
|
||||
}
|
||||
|
||||
def encrypt(s: Session, c: CryptKey): Buf = {
|
||||
val bytes = json2bytes (s asJson)
|
||||
val encBytes = c.encrypt(bytes)
|
||||
toBuf(encBytes)
|
||||
}
|
||||
|
||||
def get(s: String): Option[Session] =
|
||||
for {
|
||||
(sid, sec) <- s.asSessionIdAndSecret.toOption
|
||||
buf <- Await.result(store.get(bytes264(sid.signature.toArray)), timeout)
|
||||
session <- decrypt(buf, cryptKey(sid, sec))
|
||||
} yield session
|
||||
|
||||
def update(s: Session): Session =
|
||||
s.id.asSessionIdAndSecret.toOption map {t =>
|
||||
val encKey = cryptKey(t._1, t._2)
|
||||
val value = encrypt(s, encKey)
|
||||
val key = bytes264(s.id.signature.toArray)
|
||||
set(key, s.id.expires, value)
|
||||
s
|
||||
} get
|
||||
|
||||
def cryptKey(id: String): Try[CryptKey] =
|
||||
id.asSessionIdAndSecret map (t => CryptKey(t._1, t._2))
|
||||
|
||||
def cryptKey(id: SessionId): Try[CryptKey] =
|
||||
id.asSessionIdAndSecret map (t => CryptKey(t._1, t._2))
|
||||
|
||||
def toArray(buf: Buf): Array[Byte] =
|
||||
Buf.ByteArray.Owned.extract(buf)
|
||||
|
||||
def toBuf(bytes: Array[Byte]): Buf =
|
||||
Buf.ByteArray.Owned(bytes)
|
||||
|
||||
def set(key: String, expiry: Time, value: Buf): Future[Unit] =
|
||||
store.set(key, 0, expiry, value)
|
||||
.onFailure(err => println(s"Failed ${err}"))
|
||||
.onSuccess(suc => println(s"Success ${key}"))
|
||||
|
||||
}
|
||||
|
||||
case class InMemoryEncryptedSessionStore(implicit marshaller: Marshaller) extends SessionStoreApi with EncryptedSessions[Buf] {
|
||||
private [this] lazy val json2bytes = Injection.connect[String, Array[Byte]]
|
||||
private [this] lazy val bytes264 = Injection.connect[Array[Byte], Base64String, String]
|
||||
private [this] var _store = Map[String, Buf]()
|
||||
|
||||
def decrypt(buf: Buf, c: CryptKey): Option[Session] = {
|
||||
val encBytes = toArray(buf)
|
||||
val decBytes = c.decrypt(encBytes)
|
||||
((json2bytes invert decBytes) toOption) flatMap (_ asSession)
|
||||
}
|
||||
|
||||
def encrypt(s: Session, c: CryptKey): Buf = {
|
||||
val bytes = json2bytes (s asJson)
|
||||
val encBytes = c.encrypt(bytes)
|
||||
toBuf(encBytes)
|
||||
}
|
||||
|
||||
def get(s: String): Option[Session] =
|
||||
for {
|
||||
(sid, sec) <- s.asSessionIdAndSecret.toOption
|
||||
buf <- _store.get(bytes264(sid.signature.toArray))
|
||||
session <- decrypt(buf, cryptKey(sid, sec))
|
||||
} yield session
|
||||
|
||||
def update(s: Session): Session =
|
||||
cryptKey(s.id) map { encKey =>
|
||||
val value = encrypt(s, encKey)
|
||||
val key = bytes264(s.id.signature.toArray)
|
||||
_store = _store.updated(key, value)
|
||||
s
|
||||
} get
|
||||
|
||||
def cryptKey(id: SessionId): Try[CryptKey] =
|
||||
id.asSessionIdAndSecret map (t => CryptKey(t._1, t._2))
|
||||
|
||||
def toArray(buf: Buf): Array[Byte] =
|
||||
Buf.ByteArray.Owned.extract(buf)
|
||||
|
||||
def toBuf(bytes: Array[Byte]): Buf =
|
||||
Buf.ByteArray.Owned(bytes)
|
||||
|
||||
}
|
||||
|
||||
case class InMemorySessionStore(implicit marshaller: Marshaller) extends SessionStoreApi {
|
||||
|
@ -51,48 +152,8 @@ case class InMemorySessionStore(implicit marshaller: Marshaller) extends Session
|
|||
}
|
||||
}
|
||||
|
||||
case class InMemoryEncryptedSessionStore(implicit marshaller: Marshaller) extends SessionStoreApi with EncryptedSessions {
|
||||
private [this] lazy val json2bytes = Injection.connect[String, Array[Byte]]
|
||||
private [this] lazy val bytes264 = Injection.connect[Array[Byte], Base64String, String]
|
||||
private [this] var _store = Map[Seq[Byte], String]()
|
||||
|
||||
def cryptKey(id: String): Try[CryptKey] =
|
||||
id.asSessionIdAndSecret map (t => CryptKey(t._1, t._2))
|
||||
|
||||
def cryptKey(id: SessionId): Try[CryptKey] =
|
||||
id.asSessionIdAndSecret map (t => CryptKey(t._1, t._2))
|
||||
|
||||
def get(id: String): Option[Session] =
|
||||
for {
|
||||
(sid, sec) <- id.asSessionIdAndSecret.toOption
|
||||
base64 <- _store get sid.signature
|
||||
bytes <- toBytes(base64)
|
||||
json <- json2bytes.invert(cryptKey(sid, sec).decrypt(bytes).toArray).toOption
|
||||
session <- json.asSession
|
||||
} yield session
|
||||
|
||||
def toBytes(s: Session): Array[Byte] =
|
||||
json2bytes(s.asJson)
|
||||
|
||||
def toBytes(s: String): Option[Array[Byte]] =
|
||||
bytes264.invert(s).toOption
|
||||
|
||||
def get(id: SessionId): Option[Session] =
|
||||
get(id.asString)
|
||||
|
||||
def update(s: Session): Session =
|
||||
s.id.asSessionIdAndSecret.toOption map (t => {
|
||||
val key = cryptKey(t._1, t._2)
|
||||
val encrypted = key.encrypt(toBytes(s))
|
||||
val encoded = bytes264(encrypted.toArray)
|
||||
_store = _store.updated(s.id.signature, encoded)
|
||||
s
|
||||
}) get
|
||||
|
||||
}
|
||||
|
||||
case class MemcachedSessionStore(dest: String, timeout: Duration)(implicit marshaller: Marshaller) extends SessionStoreApi {
|
||||
val store = Memcached.newRichClient(dest).withStrings
|
||||
val store = Memcachedx.newKetamaClient(dest).withStrings
|
||||
|
||||
def get(s: String): Option[Session] = {
|
||||
for {
|
||||
|
|
|
@ -5,10 +5,10 @@ import java.util.concurrent.TimeUnit
|
|||
import com.lookout.borderpatrol.Session
|
||||
import com.lookout.borderpatrol.session.id.{Marshaller, Generator => IdGenerator}
|
||||
import com.lookout.borderpatrol.session.secret.InMemorySecretStore
|
||||
import com.lookout.borderpatrol.session.store.{MemcachedSessionStore, InMemoryEncryptedSessionStore}
|
||||
import com.twitter.finagle.memcached
|
||||
import com.twitter.io.Charsets
|
||||
import com.twitter.util.{Duration, Future}
|
||||
import com.lookout.borderpatrol.session.store.{EncryptedSessions, MemcachedEncryptedSessionStore, MemcachedSessionStore, InMemoryEncryptedSessionStore}
|
||||
import com.twitter.finagle.memcachedx
|
||||
import com.twitter.io.{Buf, Charsets}
|
||||
import com.twitter.util.{Time, Duration, Future}
|
||||
import org.jboss.netty.buffer.{ChannelBuffers, ChannelBuffer}
|
||||
import org.jboss.netty.handler.codec.http.{HttpRequest, DefaultHttpRequest, HttpMethod, HttpVersion}
|
||||
import org.scalamock.scalatest.MockFactory
|
||||
|
@ -33,13 +33,12 @@ class SessionStoreSpec extends FlatSpec with Matchers with MockFactory {
|
|||
it should "store and retrieve sessions" in {
|
||||
val s = mockSession
|
||||
encryptedStore.update(s)
|
||||
encryptedStore.get(s.id).value.equals(s) shouldBe true
|
||||
encryptedStore.get(s.id.asString).value.equals(s) shouldBe true
|
||||
}
|
||||
|
||||
it should "create a CryptKey for a session" in {
|
||||
val s = mockSession
|
||||
encryptedStore.cryptKey(s.id).isSuccess shouldBe true
|
||||
encryptedStore.cryptKey(s.id.asString).isSuccess shouldBe true
|
||||
}
|
||||
|
||||
it should "fail to create a CryptKey for an invalid session" in {
|
||||
|
@ -47,7 +46,6 @@ class SessionStoreSpec extends FlatSpec with Matchers with MockFactory {
|
|||
val invalidSecretId = ~s.id.secretId
|
||||
val id = SessionId(s.id.expires, s.id.entropy, invalidSecretId.toByte, s.id.signature)
|
||||
encryptedStore.cryptKey(id).isFailure shouldBe true
|
||||
encryptedStore.cryptKey(id.asString).isFailure shouldBe true
|
||||
}
|
||||
|
||||
behavior of "MemcachedSessionStore"
|
||||
|
@ -55,13 +53,13 @@ class SessionStoreSpec extends FlatSpec with Matchers with MockFactory {
|
|||
def toCb(s: String): ChannelBuffer =
|
||||
ChannelBuffers.copiedBuffer(s.getBytes(Charsets.Utf8))
|
||||
|
||||
class MemcachedMockSessionStore(client: memcached.BaseClient[String]) extends MemcachedSessionStore(dest = "", timeout = Duration(1, TimeUnit.SECONDS)) {
|
||||
class MemcachedMockSessionStore(client: memcachedx.BaseClient[String]) extends MemcachedSessionStore(dest = "", timeout = Duration(1, TimeUnit.SECONDS)) {
|
||||
override val store = client
|
||||
}
|
||||
|
||||
it should "store and retrieve sessions" in {
|
||||
val s = mockSession
|
||||
val memcachedClient = mock[memcached.BaseClient[String]]
|
||||
val memcachedClient = mock[memcachedx.BaseClient[String]]
|
||||
|
||||
val flag = 0 // meaningless part of the protocol
|
||||
|
||||
|
@ -81,4 +79,34 @@ class SessionStoreSpec extends FlatSpec with Matchers with MockFactory {
|
|||
memcachedStore.get(s).value.equals(s) shouldBe true
|
||||
}
|
||||
|
||||
behavior of "MemcachedEncryptedSessionStore"
|
||||
|
||||
class MockMemcachedEncryptSessionStore(client: memcachedx.BaseClient[Buf]) extends MemcachedEncryptedSessionStore(dest = "", timeout = Duration(1, TimeUnit.SECONDS)) {
|
||||
override val store = client
|
||||
}
|
||||
|
||||
it should "store and retrieve sessions" in {
|
||||
val s = mockSession
|
||||
val memcachedClient = mock[memcachedx.BaseClient[Buf]]
|
||||
val memcachedStore = new MockMemcachedEncryptSessionStore(memcachedClient)
|
||||
val cryptKey = memcachedStore.cryptKey(s.id).get
|
||||
|
||||
val flag = 0 // meaningless part of the protocol
|
||||
|
||||
/* re-enable after https://github.com/paulbutcher/ScalaMock/issues/39#issuecomment-71727931
|
||||
(memcachedClient.set(_: String, _: Int, _: Time, _: Buf))
|
||||
.expects(s.id.asString, flag, Time.now, Buf.ByteArray.Owned(json2bytes(s.asJson)))
|
||||
.returns(Future.value((): Unit))
|
||||
*/
|
||||
|
||||
(memcachedClient.get(_: String))
|
||||
.expects(bytes264(s.id.signature.toArray))
|
||||
.returning(Future.value(Some(Buf.ByteArray.Owned(cryptKey.encrypt(json2bytes(s.asJson))))))
|
||||
|
||||
|
||||
// memcachedStore.update(s) // re-enable after https://github.com/paulbutcher/ScalaMock/issues/39#issuecomment-71727931
|
||||
memcachedStore.get(s.id.asString).value.equals(s) shouldBe true
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ object BorderPatrol extends Build {
|
|||
scalaVersion := "2.11.5",
|
||||
libraryDependencies ++= Seq(
|
||||
"com.twitter" %% "twitter-server" % twitter_server,
|
||||
"com.twitter" %% "finagle-memcached" % "6.24.0",
|
||||
"com.twitter" %% "finagle-memcachedx" % "6.24.0",
|
||||
"com.twitter" %% "bijection-core" % "0.7.0",
|
||||
"io.argonaut" %% "argonaut" % "6.1-M5",
|
||||
"org.scalatest" %% "scalatest" % "2.2.2" % "test",
|
||||
|
|
Loading…
Reference in New Issue