diff --git a/build.sbt b/build.sbt index b85eeb6..49842e6 100644 --- a/build.sbt +++ b/build.sbt @@ -1,17 +1,56 @@ name := "cornucopia" organization := "com.github.kliewkliew" -version := "1.1.2" +//version := "1.1.2" +version := "0.28-SNAPSHOT" scalaVersion := "2.11.8" +enablePlugins(JavaAppPackaging, DockerPlugin) + resolvers += "Sonatype Releases" at "https://oss.sonatype.org/service/repositories/releases/" resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/" +val akkaVersion = "2.4.17" + +val testDependencies = Seq( + "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", + "org.scalatest" %% "scalatest" % "3.0.0" % "test", + "org.mockito" % "mockito-all" % "1.10.19" % Test +) + libraryDependencies ++= Seq( "biz.paluch.redis" % "lettuce" % "5.0.0.Beta1", "org.scala-lang.modules" % "scala-java8-compat_2.11" % "0.8.0", "com.typesafe.akka" %% "akka-stream-kafka" % "0.11-RC1", - "com.github.kliewkliew" %% "salad" % "0.11.01", + "com.typesafe.akka" %% "akka-http-core" % "2.4.11", + "com.typesafe.akka" %% "akka-http-experimental" % "2.4.11", + "com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.11", +// "com.github.kliewkliew" %% "salad" % "0.11.01", + "com.adenda" %% "salad" % "0.11.03", "org.slf4j" % "slf4j-log4j12" % "1.7.22" +) ++ testDependencies + +// ------------------------------------------------ // +// ------------- Docker configuration ------------- // +// ------------------------------------------------ // +import NativePackagerHelper._ + +mappings in Universal ++= directory( baseDirectory.value / "src" / "main" / "resources" ) + +javaOptions in Universal ++= Seq( + "-Dconfig.file=etc/container.conf", + "-Dlog4j.configuration=file:/usr/local/etc/log4j.properties" ) + +packageName in Docker := packageName.value + +version in Docker := version.value + +dockerBaseImage := "openjdk" + +dockerRepository := Some("gcr.io/adenda-server-mongodb") + +defaultLinuxInstallLocation in Docker := "/usr/local" + +daemonUser in Docker := "root" \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..b674c64 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.typesafe.sbt" %% "sbt-native-packager" % "1.0.4") \ No newline at end of file diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 2f4999a..06f1ccb 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -43,6 +43,17 @@ cornucopia { // Time (seconds) to wait for batches to accumulate before executing a job. batch.period = 5 + + // either `kafka` or `actor` + source = "actor" + + // microservice type: `kafka` or `http` + microservice.type = "http" + + http { + host = "localhost" + port = "9001" + } } kafka { diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/kafka/Config.scala b/src/main/scala/com/github/kliewkliew/cornucopia/Config.scala similarity index 80% rename from src/main/scala/com/github/kliewkliew/cornucopia/kafka/Config.scala rename to src/main/scala/com/github/kliewkliew/cornucopia/Config.scala index 13dd208..f3d7b75 100644 --- a/src/main/scala/com/github/kliewkliew/cornucopia/kafka/Config.scala +++ b/src/main/scala/com/github/kliewkliew/cornucopia/Config.scala @@ -1,9 +1,11 @@ -package com.github.kliewkliew.cornucopia.kafka +package com.github.kliewkliew.cornucopia import akka.actor.ActorSystem import akka.kafka.scaladsl.{Producer, Consumer => ConsumerDSL} import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions} +import akka.stream.scaladsl.Source import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision} +import com.github.kliewkliew.cornucopia.actors.CornucopiaSource import com.typesafe.config.ConfigFactory import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.slf4j.LoggerFactory @@ -17,6 +19,7 @@ object Config { val gracePeriod = config.getInt("grace.period") * 1000 val refreshTimeout = config.getInt("refresh.timeout") * 1000 val batchPeriod = config.getInt("batch.period").seconds + val source = config.getString("source") } object Consumer { @@ -43,8 +46,19 @@ object Config { .withBootstrapServers(kafkaServers) implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem) + + val cornucopiaActorProps = CornucopiaSource.props + val cornucopiaActorSource = Source.actorPublisher[CornucopiaSource.Task](cornucopiaActorProps) + + val cornucopiaKafkaSource = ConsumerDSL.plainSource(sourceSettings, subscription) + val cornucopiaSource = ConsumerDSL.plainSource(sourceSettings, subscription) + val cornucopiaSink = Producer.plainSink(sinkSettings) } + object ReshardTableConfig { + final implicit val ExpectedTotalNumberSlots: Int = 16384 + } + } diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/CornucopiaException.scala b/src/main/scala/com/github/kliewkliew/cornucopia/CornucopiaException.scala new file mode 100644 index 0000000..8ee9679 --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/CornucopiaException.scala @@ -0,0 +1,13 @@ +package com.github.kliewkliew.cornucopia + +object CornucopiaException { + sealed trait CornucopiaException { + self: Throwable => + val message: String + val reason: Throwable + } + + case class CornucopiaRedisConnectionException(message: String, reason: Throwable = None.orNull) + extends Exception(message, reason) with CornucopiaException +} + diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/Library.scala b/src/main/scala/com/github/kliewkliew/cornucopia/Library.scala new file mode 100644 index 0000000..77a3bfe --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/Library.scala @@ -0,0 +1,9 @@ +package com.github.kliewkliew.cornucopia + +import actors.CornucopiaSource +import akka.actor._ + +object Library { + val ref: ActorRef = new graph.CornucopiaActorSource().ref + val source = CornucopiaSource +} diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/Microservice.scala b/src/main/scala/com/github/kliewkliew/cornucopia/Microservice.scala index 7b5fa1f..1896a6a 100644 --- a/src/main/scala/com/github/kliewkliew/cornucopia/Microservice.scala +++ b/src/main/scala/com/github/kliewkliew/cornucopia/Microservice.scala @@ -1,7 +1,22 @@ package com.github.kliewkliew.cornucopia +import com.typesafe.config.ConfigFactory +import org.slf4j.LoggerFactory +import com.github.kliewkliew.cornucopia.http.Server + object Microservice { def main(args: Array[String]): Unit = { - new kafka.Consumer().run + val config = ConfigFactory.load().getConfig("cornucopia") + val microserviceType = config.getString("microservice.type") + val logger = LoggerFactory.getLogger(this.getClass) + + microserviceType match { + case "kafka" => + logger.info("Using kafka as the microservice source") + new graph.CornucopiaKafkaSource().run + case "http" => + logger.info("Using http server as the microservice source") + Server.start + } } } \ No newline at end of file diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/actors/CornucopiaSource.scala b/src/main/scala/com/github/kliewkliew/cornucopia/actors/CornucopiaSource.scala new file mode 100644 index 0000000..544f3e9 --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/actors/CornucopiaSource.scala @@ -0,0 +1,61 @@ +package com.github.kliewkliew.cornucopia.actors + +import akka.actor._ +import akka.stream.actor.ActorPublisher + +import scala.annotation.tailrec + +/** + * Copied liberally from akka documentaion on + * [Stream integrations](http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html). + */ +object CornucopiaSource { + def props: Props = Props[CornucopiaSource] + + final case class Task(operation: String, redisNodeIp: String, ref: Option[ActorRef] = None) + case object TaskAccepted + case object TaskDenied +} + +class CornucopiaSource extends ActorPublisher[CornucopiaSource.Task] { + import CornucopiaSource._ + import akka.stream.actor.ActorPublisherMessage._ + + val MaxBufferSize = 100 + var buf = Vector.empty[Task] + + override def receive = { + case _: Task if buf.size == MaxBufferSize => + sender() ! TaskDenied + case task: Task => + if (buf.isEmpty && totalDemand > 0) { + val task2 = task.copy(ref = Some(sender)) + onNext(task2) + } + else { + val task2 = task.copy(ref = Some(sender)) + buf :+= task2 + deliverBuf() + } + case Request(_) => + deliverBuf() + case Cancel => + context.stop(self) + } + + @tailrec final def deliverBuf(): Unit = { + if (totalDemand > 0) { + if (totalDemand <= Int.MaxValue) { + val (use, keep) = buf.splitAt(totalDemand.toInt) + buf = keep + use foreach(task => onNext(task)) + } else { + val (use, keep) = buf.splitAt(Int.MaxValue) + buf = keep + use foreach(task => onNext(task)) + deliverBuf() + } + } + } + +} diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/graph/Graph.scala b/src/main/scala/com/github/kliewkliew/cornucopia/graph/Graph.scala new file mode 100644 index 0000000..05bd024 --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/graph/Graph.scala @@ -0,0 +1,792 @@ +package com.github.kliewkliew.cornucopia.graph + +import java.util +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor._ +import akka.stream.{ClosedShape, FlowShape, ThrottleMode} +import com.github.kliewkliew.cornucopia.redis.Connection.{CodecType, Salad, getConnection, newSaladAPI} +import org.slf4j.LoggerFactory +import org.apache.kafka.clients.consumer.ConsumerRecord +import com.github.kliewkliew.cornucopia.redis._ +import com.github.kliewkliew.salad.SaladClusterAPI +import com.github.kliewkliew.cornucopia.Config.ReshardTableConfig._ +import com.github.kliewkliew.cornucopia.redis.ReshardTable._ +import com.lambdaworks.redis.{RedisException, RedisURI} +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode +import com.lambdaworks.redis.models.role.RedisInstance.Role + +import collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.JavaConversions._ +import scala.language.implicitConversions +import scala.concurrent.{ExecutionContext, Future} +import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, MergePreferred, Partition, RunnableGraph, Sink} +import com.github.kliewkliew.cornucopia.Config +// TO-DO: put config someplace else + +trait CornucopiaGraph { + import scala.concurrent.ExecutionContext.Implicits.global + import com.github.kliewkliew.cornucopia.CornucopiaException._ + + protected val logger = LoggerFactory.getLogger(this.getClass) + + protected def getNewSaladApi: Salad = newSaladAPI + + def partitionEvents(key: String) = key.trim.toLowerCase match { + case ADD_MASTER.key => ADD_MASTER.ordinal + case ADD_SLAVE.key => ADD_SLAVE.ordinal + case REMOVE_NODE.key => REMOVE_NODE.ordinal + case RESHARD.key => RESHARD.ordinal + case _ => UNSUPPORTED.ordinal + } + + def partitionNodeRemoval(key: String) = key.trim.toLowerCase match { + case REMOVE_MASTER.key => REMOVE_MASTER.ordinal + case REMOVE_SLAVE.key => REMOVE_SLAVE.ordinal + case UNSUPPORTED.key => UNSUPPORTED.ordinal + } + + /** + * Stream definitions for the graph. + */ + // Extract a tuple of the key and value from a Kafka record. + case class KeyValue(key: String, value: String, senderRef: Option[ActorRef] = None, newMasterURI: Option[RedisURI] = None) + + // Allows to create Redis URI from the following forms: + // host OR host:port + // e.g., redis://127.0.0.1 OR redis://127.0.0.1:7006 + protected def createRedisUri(uri: String): RedisURI = { + val parts = uri.split(":") + if (parts.size == 3) { + val host = parts(1).foldLeft("")((acc, ch) => if (ch != '/') acc + ch else acc) + RedisURI.create(host, parts(2).toInt) + } + else RedisURI.create(uri) + } + + // Add a master node to the cluster. + def streamAddMaster(implicit executionContext: ExecutionContext) = Flow[KeyValue] + .map(_.value) + .map(createRedisUri) + .map(getNewSaladApi.canonicalizeURI) + .groupedWithin(100, Config.Cornucopia.batchPeriod) + .mapAsync(1)(addNodesToCluster(_)) + .mapAsync(1)(waitForTopologyRefresh[Seq[RedisURI]]) + .map(_ => KeyValue(RESHARD.key, "")) + + // Add a slave node to the cluster, replicating the master that has the fewest slaves. + def streamAddSlave(implicit executionContext: ExecutionContext) = Flow[KeyValue] + .map(_.value) + .map(createRedisUri) + .map(getNewSaladApi.canonicalizeURI) + .groupedWithin(100, Config.Cornucopia.batchPeriod) + .mapAsync(1)(addNodesToCluster(_)) + .mapAsync(1)(waitForTopologyRefresh[Seq[RedisURI]]) + .mapAsync(1)(findMasters) + .mapAsync(1)(waitForTopologyRefresh[Unit]) + .mapAsync(1)(_ => logTopology) + .map(_ => KeyValue("", "")) + + // Emit a key-value pair indicating the node type and URI. + protected def streamRemoveNode(implicit executionContext: ExecutionContext) = Flow[KeyValue] + .map(_.value) + .map(createRedisUri) + .map(getNewSaladApi.canonicalizeURI) + .mapAsync(1)(emitNodeType) + + // Remove a slave node from the cluster. + protected def streamRemoveSlave(implicit executionContext: ExecutionContext) = Flow[KeyValue] + .map(_.value) + .groupedWithin(100, Config.Cornucopia.batchPeriod) + .mapAsync(1)(forgetNodes) + .mapAsync(1)(waitForTopologyRefresh[Unit]) + .mapAsync(1)(_ => logTopology) + .map(_ => KeyValue("", "")) + + // Redistribute the hash slots among all nodes in the cluster. + // Execute slot redistribution at most once per configured interval. + // Combine multiple requests into one request. + protected def streamReshard(implicit executionContext: ExecutionContext) = Flow[KeyValue] + .map(record => Seq(record.value)) + .conflate((seq1, seq2) => seq1 ++ seq2) + .throttle(1, Config.Cornucopia.minReshardWait, 1, ThrottleMode.Shaping) + .mapAsync(1)(reshardCluster) + .mapAsync(1)(waitForTopologyRefresh[Unit]) + .mapAsync(1)(_ => logTopology) + .map(_ => KeyValue("", "")) + + // Throw for keys indicating unsupported operations. + protected def unsupportedOperation = Flow[KeyValue] + .map(record => throw new IllegalArgumentException(s"Unsupported operation ${record.key} for ${record.value}")) + + /** + * Wait for the new cluster topology view to propagate to all nodes in the cluster. May not be strictly necessary + * since this microservice immediately attempts to notify all nodes of topology updates. + * + * @param passthrough The value that will be passed through to the next map stage. + * @param executionContext The thread dispatcher context. + * @tparam T + * @return The unmodified input value. + */ + protected def waitForTopologyRefresh[T](passthrough: T)(implicit executionContext: ExecutionContext): Future[T] = Future { + scala.concurrent.blocking(Thread.sleep(Config.Cornucopia.refreshTimeout)) + passthrough + } + + /** + * Wait for the new cluster topology view to propagate to all nodes in the cluster. Same version as above, but this + * time takes two passthroughs and returns tuple of them as future. + * + * @param passthrough1 The first value that will be passed through to the next map stage. + * @param passthrough2 The second value that will be passed through to the next map stage. + * @param executionContext The thread dispatcher context. + * @tparam T + * @tparam U + * @return The unmodified input value. + */ + protected def waitForTopologyRefresh2[T, U](passthrough1: T, passthrough2: U)(implicit executionContext: ExecutionContext): Future[(T, U)] = Future { + scala.concurrent.blocking(Thread.sleep(Config.Cornucopia.refreshTimeout)) + (passthrough1, passthrough2) + } + + /** + * Log the current view of the cluster topology. + * + * @param executionContext The thread dispatcher context. + * @return + */ + protected def logTopology(implicit executionContext: ExecutionContext): Future[Unit] = { + implicit val saladAPI = getNewSaladApi + saladAPI.clusterNodes.map { allNodes => + val masterNodes = allNodes.filter(Role.MASTER == _.getRole) + val slaveNodes = allNodes.filter(Role.SLAVE == _.getRole) + logger.info(s"Master nodes: $masterNodes") + logger.info(s"Slave nodes: $slaveNodes") + } + } + + /** + * The entire cluster will meet the new nodes at the given URIs. If the connection to a node fails, then retry + * until it succeeds. + * + * @param redisURIList The list of URI of the new nodes. + * @param executionContext The thread dispatcher context. + * @return The list of URI if the nodes were met. TODO: emit only the nodes that were successfully added. + */ + protected def addNodesToCluster(redisURIList: Seq[RedisURI], retries: Int = 0)(implicit executionContext: ExecutionContext): Future[Seq[RedisURI]] = { + addNodesToClusterPrime(redisURIList).recoverWith { + case e: CornucopiaRedisConnectionException => + logger.error(s"${e.message}: retrying for number ${retries + 1}", e) + addNodesToCluster(redisURIList, retries + 1) + } + } + + protected def addNodesToClusterPrime(redisURIList: Seq[RedisURI])(implicit executionContext: ExecutionContext): Future[Seq[RedisURI]] = { + implicit val saladAPI = getNewSaladApi + + def getRedisConnection(nodeId: String): Future[Salad] = { + getConnection(nodeId).recoverWith { + case e: RedisException => throw CornucopiaRedisConnectionException(s"Add nodes to cluster failed to get connection to node", e) + } + } + + saladAPI.clusterNodes.flatMap { allNodes => + val getConnectionsToLiveNodes = allNodes.filter(_.isConnected).map(node => getRedisConnection(node.getNodeId)) + Future.sequence(getConnectionsToLiveNodes).flatMap { connections => + // Meet every new node from every old node. + val metResults = for { + conn <- connections + uri <- redisURIList + } yield { + conn.clusterMeet(uri) + } + Future.sequence(metResults).map(_ => redisURIList) + } + } + } + + /** + * Set the n new slave nodes to replicate the poorest (fewest slaves) n masters. + * + * @param redisURIList The list of ip addresses of the slaves that will be added to the cluster. Hostnames are not acceptable. + * @param executionContext The thread dispatcher context. + * @return Indicate that the n new slaves are replicating the poorest n masters. + */ + protected def findMasters(redisURIList: Seq[RedisURI])(implicit executionContext: ExecutionContext): Future[Unit] = { + implicit val saladAPI = getNewSaladApi + saladAPI.clusterNodes.flatMap { allNodes => + // Node ids for nodes that are currently master nodes but will become slave nodes. + val newSlaveIds = allNodes.filter(node => redisURIList.contains(node.getUri)).map(_.getNodeId) + // The master nodes (the nodes that will become slaves are still master nodes at this point and must be filtered out). + val masterNodes = saladAPI.masterNodes(allNodes) + .filterNot(node => newSlaveIds.contains(node.getNodeId)) + // HashMap of master node ids to the number of slaves for that master. + val masterSlaveCount = new util.HashMap[String, AtomicInteger](masterNodes.length + 1, 1) + // Populate the hash map. + masterNodes.map(_.getNodeId).foreach(nodeId => masterSlaveCount.put(nodeId, new AtomicInteger(0))) + allNodes.map { node => + Option.apply(node.getSlaveOf) + .map(master => masterSlaveCount.get(master).incrementAndGet()) + } + + // Find the poorest n masters for n slaves. + val poorestMasters = new MaxNHeapMasterSlaveCount(redisURIList.length) + masterSlaveCount.asScala.foreach(poorestMasters.offer) + assert(redisURIList.length >= poorestMasters.underlying.length) + + // Create a list so that we can circle back to the first element if the new slaves outnumber the existing masters. + val poorMasterList = poorestMasters.underlying.toList + val poorMasterIndex = new AtomicInteger(0) + // Choose a master for every slave. + val listFuturesResults = redisURIList.map { slaveURI => + getConnection(slaveURI).map(_.clusterReplicate( + poorMasterList(poorMasterIndex.getAndIncrement() % poorMasterList.length)._1)) + } + Future.sequence(listFuturesResults).map(x => x) + } + } + + /** + * Emit a key-value representing the node-type and the node-id. + * @param redisURI + * @param executionContext + * @return the node type and id. + */ + def emitNodeType(redisURI:RedisURI)(implicit executionContext: ExecutionContext): Future[KeyValue] = { + implicit val saladAPI = getNewSaladApi + saladAPI.clusterNodes.map { allNodes => + val removalNodeOpt = allNodes.find(node => node.getUri.equals(redisURI)) + if (removalNodeOpt.isEmpty) throw new Exception(s"Node not in cluster: $redisURI") + val kv = removalNodeOpt.map { node => + node.getRole match { + case Role.MASTER => KeyValue(RESHARD.key, node.getNodeId) + case Role.SLAVE => KeyValue(REMOVE_SLAVE.key, node.getNodeId) + case _ => KeyValue(UNSUPPORTED.key, node.getNodeId) + } + } + kv.get + } + } + + /** + * Safely remove a master by redistributing its hash slots before blacklisting it from the cluster. + * The data is given time to migrate as configured in `cornucopia.grace.period`. + * + * @param withoutNodes The list of ids of the master nodes that will be removed from the cluster. + * @param executionContext The thread dispatcher context. + * @return Indicate that the hash slots were redistributed and the master removed from the cluster. + */ + protected def removeMasters(withoutNodes: Seq[String])(implicit executionContext: ExecutionContext): Future[Unit] = { + val reshardDone = reshardCluster(withoutNodes).map { _ => + scala.concurrent.blocking(Thread.sleep(Config.Cornucopia.gracePeriod)) // Allow data to migrate. + } + reshardDone.flatMap(_ => forgetNodes(withoutNodes)) + } + + /** + * Notify all nodes in the cluster to forget this node. + * + * @param withoutNodes The list of ids of nodes to be forgotten by the cluster. + * @param executionContext The thread dispatcher context. + * @return A future indicating that the node was forgotten by all nodes in the cluster. + */ + def forgetNodes(withoutNodes: Seq[String])(implicit executionContext: ExecutionContext): Future[Unit] = + if (!withoutNodes.exists(_.nonEmpty)) + Future(Unit) + else { + implicit val saladAPI = getNewSaladApi + saladAPI.clusterNodes.flatMap { allNodes => + logger.info(s"Forgetting nodes: $withoutNodes") + // Reset the nodes to be removed. + val validWithoutNodes = withoutNodes.filter(_.nonEmpty) + validWithoutNodes.map(getConnection).map(_.map(_.clusterReset(true))) + + // The nodes that will remain in the cluster should forget the nodes that will be removed. + val withNodes = allNodes + .filterNot(node => validWithoutNodes.contains(node.getNodeId)) // Node cannot forget itself. + + // For the cross product of `withNodes` and `withoutNodes`; to remove the nodes in `withoutNodes`. + val forgetResults = for { + operatorNode <- withNodes + operandNodeId <- validWithoutNodes + } yield { + if (operatorNode.getSlaveOf == operandNodeId) + Future(Unit) // Node cannot forget its master. + else + getConnection(operatorNode.getNodeId).flatMap(_.clusterForget(operandNodeId)) + } + Future.sequence(forgetResults).map(x => x) + } + } + + + /** + * Reshard the cluster using a view of thecluster consisting of a subset of master nodes. + * + * @param withoutNodes The list of ids of nodes that will not be assigned hash slots. Note that this is not used + * (it is empty) when we add a new master node and reshard. + * @return Boolean indicating that all hash slots were reassigned successfully. + */ + protected def reshardCluster(withoutNodes: Seq[String]) + : Future[Unit] = { + // Execute futures using a thread pool so we don't run out of memory due to futures. + implicit val executionContext = Config.Consumer.actorSystem.dispatchers.lookup("akka.actor.resharding-dispatcher") + implicit val saladAPI = getNewSaladApi + saladAPI.masterNodes.flatMap { masterNodes => + + val liveMasters = masterNodes.filter(_.isConnected) + lazy val idToURI = new util.HashMap[String,RedisURI](liveMasters.length + 1, 1) + // Re-use cluster connections so we don't exceed file-handle limit or waste resources. + lazy val clusterConnections = new util.HashMap[String,Future[SaladClusterAPI[CodecType,CodecType]]](liveMasters.length + 1, 1) + liveMasters.map { master => + idToURI.put(master.getNodeId, master.getUri) + clusterConnections.put(master.getNodeId, getConnection(master.getNodeId)) + } + + // Remove dead nodes. This may generate WARN logs if some nodes already forgot the dead node. + val deadMastersIds = masterNodes.filterNot(_.isConnected).map(_.getNodeId) + logger.info(s"Dead nodes: $deadMastersIds") + forgetNodes(deadMastersIds) + + // Migrate the data. + val assignableMasters = liveMasters.filterNot(masterNode => withoutNodes.contains(masterNode.getNodeId)) + logger.info(s"Resharding cluster with ${assignableMasters.map(_.getNodeId)} without ${withoutNodes ++ deadMastersIds}") + val migrateResults = liveMasters.flatMap { node => + val (sourceNodeId, slotList) = (node.getNodeId, node.getSlots.toList.map(_.toInt)) + logger.debug(s"Migrating data from $sourceNodeId among slots $slotList") + slotList.map { slot => + val destinationNodeId = slotNode(slot, assignableMasters) + migrateSlot( + slot, + sourceNodeId, destinationNodeId, idToURI.get(destinationNodeId), + assignableMasters.toList, clusterConnections) + } + } + val finalMigrateResult = Future.sequence(migrateResults) + finalMigrateResult.onFailure { case e => logger.error(s"Failed to migrate hash slot data", e) } + finalMigrateResult.onSuccess { case _ => logger.info(s"Migrated hash slot data") } + // We attempted to migrate the data but do not prevent slot reassignment if migration fails. + // We may lose prior data but we ensure that all slots are assigned. + finalMigrateResult.onComplete { case _ => + // This is broken anyways + //List.range(0, 16384).map(notifySlotAssignment(_, assignableMasters)) + Unit + } + finalMigrateResult.flatMap(_ => forgetNodes(withoutNodes)) + } + } + + // TODO: Put this method out of its misery + // TODO: pass slotNode as a lambda to migrateSlot and notifySlotAssignment. + // TODO: more efficient slot assignment to prevent data migration. + /** + * Choose a master node for a slot. + * + * @param slot The slot to be assigned. + * @param masters The list of masters that can be assigned slots. + * @return The node id of the chosen master. + */ + protected def slotNode(slot: Int, masters: mutable.Buffer[RedisClusterNode]): String = + masters(slot % masters.length).getNodeId + + /** + * Migrate all keys in a slot from the source node to the destination node and update the slot assignment on the + * affected nodes. + * + * @param slot The slot to migrate. + * @param sourceNodeId The current location of the slot data. + * @param destinationNodeId The target location of the slot data. + * @param masters The list of nodes in the cluster that will be assigned hash slots. + * @param clusterConnections The list of connections to nodes in the cluster. + * @param executionContext The thread dispatcher context. + * @return Future indicating success. + */ + protected def migrateSlot(slot: Int, sourceNodeId: String, destinationNodeId: String, destinationURI: RedisURI, + masters: List[RedisClusterNode], + clusterConnections: util.HashMap[String,Future[SaladClusterAPI[CodecType,CodecType]]], + attempts: Int = 1) + (implicit saladAPI: Salad, executionContext: ExecutionContext): Future[Unit] = { + + logger.debug(s"Migrate slot for slot $slot from source node $sourceNodeId to target node $destinationNodeId") + + // Follows redis-trib.rb + def migrateSlotKeys(sourceConn: SaladClusterAPI[CodecType, CodecType], + destinationConn: SaladClusterAPI[CodecType, CodecType]): Future[Unit] = { + + import com.github.kliewkliew.salad.serde.ByteArraySerdes._ + + // get all the keys in the given slot + val keyList = for { + keyCount <- sourceConn.clusterCountKeysInSlot(slot) + keyList <- sourceConn.clusterGetKeysInSlot[CodecType](slot, keyCount.toInt) + } yield keyList + + // migrate over all the keys in the slot from source to destination node + val migrate = for { + keys <- keyList + result <- sourceConn.migrate[CodecType](destinationURI, keys.toList) + } yield result + + migrate.onSuccess { case _ => + logger.info(s"Successfully migrated slot $slot from $sourceNodeId to $destinationNodeId at ${destinationURI.getHost} on attempt $attempts") + } + + def handleFailedMigration(error: Throwable): Future[Unit] = { + val errorString = error.toString + + def findError(e: String, identifier: String): Boolean = { + identifier.r.findFirstIn(e) match { + case Some(_) => true + case _ => false + } + } + + if (findError(errorString, "BUSYKEY")) { + logger.warn(s"Problem migrating slot $slot from $sourceNodeId to $destinationNodeId at ${destinationURI.getHost} (BUSYKEY): Target key exists. Replacing it for FIX.") + def migrateReplace: Future[Unit] = for { + keys <- keyList + result <- sourceConn.migrate[CodecType](destinationURI, keys.toList, replace = true) + } yield result + migrateReplace + } else if (findError(errorString, "CLUSTERDOWN")) { + logger.error(s"Failed to migrate slot $slot from $sourceNodeId to $destinationNodeId at ${destinationURI.getHost} (CLUSTERDOWN): Retrying for attempt $attempts") + migrateSlot(slot, sourceNodeId, destinationNodeId, destinationURI, masters, clusterConnections, attempts + 1) + } else if (findError(errorString, "MOVED")) { + logger.error(s"Failed to migrate slot $slot from $sourceNodeId to $destinationNodeId at ${destinationURI.getHost} (MOVED): Ignoring on attempt $attempts") + Future(Unit) + } else { + logger.error(s"Failed to migrate slot $slot from $sourceNodeId to $destinationNodeId at ${destinationURI.getHost}", error) + Future(Unit) + } + } + + migrate.recover { case e => handleFailedMigration(e) } + } + + def setSlotAssignment(sourceConn: SaladClusterAPI[CodecType, CodecType], + destinationConn: SaladClusterAPI[CodecType, CodecType]): Future[Unit] = { + for { + _ <- destinationConn.clusterSetSlotImporting(slot, sourceNodeId) + _ <- sourceConn.clusterSetSlotMigrating(slot, destinationNodeId) + } yield { + Future(Unit) + } + } + + destinationNodeId match { + case `sourceNodeId` => + // Don't migrate if the source and destination are the same. + Future(Unit) + case _ => + for { + sourceConnection <- clusterConnections.get(sourceNodeId) + destinationConnection <- clusterConnections.get(destinationNodeId) + _ <- setSlotAssignment(sourceConnection, destinationConnection) + _ <- migrateSlotKeys(sourceConnection, destinationConnection) + } yield { + logger.debug(s"Migrate slot successful for slot $slot from source node $sourceNodeId to target node $destinationNodeId, notifying masters of new slot assignment") + notifySlotAssignment(slot, destinationNodeId, masters) + } + } + } + + /** + * Notify all master nodes of a slot assignment so that they will immediately be able to redirect clients. + * + * @param masters The list of nodes in the cluster that will be assigned hash slots. + * @param assignedNodeId The node that should be assigned the slot + * @param executionContext The thread dispatcher context. + * @return Future indicating success. + */ + protected def notifySlotAssignment(slot: Int, assignedNodeId: String, masters: List[RedisClusterNode]) + (implicit saladAPI: Salad, executionContext: ExecutionContext) + : Future[Unit] = { + val getMasterConnections = masters.map(master => getConnection(master.getNodeId)) + Future.sequence(getMasterConnections).flatMap { masterConnections => + val notifyResults = masterConnections.map(_.clusterSetSlotNode(slot, assignedNodeId)) + Future.sequence(notifyResults).map(x => x) + } + } + + /** + * Store the n poorest masters. + * Implemented on scala.mutable.PriorityQueue. + * + * @param n + */ + sealed case class MaxNHeapMasterSlaveCount(n: Int) { + private type MSTuple = (String, AtomicInteger) + private object MSOrdering extends Ordering[MSTuple] { + def compare(a: MSTuple, b: MSTuple) = a._2.intValue compare b._2.intValue + } + implicit private val ordering = MSOrdering + val underlying = new mutable.PriorityQueue[MSTuple] + + /** + * O(1) if the entry is not a candidate for the being one of the poorest n masters. + * O(log(n)) if the entry is a candidate. + * + * @param entry The candidate master-slavecount tuple. + */ + def offer(entry: MSTuple) = + if (n > underlying.length) { + underlying.enqueue(entry) + } + else if (entry._2.intValue < underlying.head._2.intValue) { + underlying.dequeue() + underlying.enqueue(entry) + } + } +} + +class CornucopiaKafkaSource extends CornucopiaGraph { + import Config.Consumer.materializer + + private type KafkaRecord = ConsumerRecord[String, String] + + private def extractKeyValue = Flow[KafkaRecord] + .map[KeyValue](record => KeyValue(record.key, record.value)) + + def run = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => + import scala.concurrent.ExecutionContext.Implicits.global + import GraphDSL.Implicits._ + + val in = Config.Consumer.cornucopiaKafkaSource + val out = Sink.ignore + + val mergeFeedback = builder.add(MergePreferred[KeyValue](2)) + + val partition = builder.add(Partition[KeyValue]( + 5, kv => partitionEvents(kv.key))) + + val kv = builder.add(extractKeyValue) + + val partitionRm = builder.add(Partition[KeyValue]( + 3, kv => partitionNodeRemoval(kv.key) + )) + + in ~> kv + kv ~> mergeFeedback.preferred + mergeFeedback.out ~> partition + partition.out(ADD_MASTER.ordinal) ~> streamAddMaster ~> mergeFeedback.in(0) + partition.out(ADD_SLAVE.ordinal) ~> streamAddSlave ~> out + partition.out(REMOVE_NODE.ordinal) ~> streamRemoveNode ~> partitionRm + partitionRm.out(REMOVE_MASTER.ordinal) ~> mergeFeedback.in(1) + partitionRm.out(REMOVE_SLAVE.ordinal) ~> streamRemoveSlave ~> out + partitionRm.out(UNSUPPORTED.ordinal) ~> unsupportedOperation ~> out + partition.out(RESHARD.ordinal) ~> streamReshard ~> out + partition.out(UNSUPPORTED.ordinal) ~> unsupportedOperation ~> out + + ClosedShape + }).run() + +} + +class CornucopiaActorSource extends CornucopiaGraph { + import Config.Consumer.materializer + import com.github.kliewkliew.cornucopia.actors.CornucopiaSource.Task + import scala.concurrent.ExecutionContext.Implicits.global + + protected type ActorRecord = Task + + // Add a master node to the cluster. + override def streamAddMaster(implicit executionContext: ExecutionContext) = Flow[KeyValue] + .map(kv => (kv.value, kv.senderRef)) + .map(t => (createRedisUri(t._1), t._2) ) + .map(t => (getNewSaladApi.canonicalizeURI(t._1), t._2)) + .groupedWithin(1, Config.Cornucopia.batchPeriod) + .mapAsync(1)(t => { + val t1 = t.unzip + val redisURIs = t1._1 + val actorRefs = t1._2 + addNodesToCluster(redisURIs) flatMap { uris => + waitForTopologyRefresh2[Seq[RedisURI], Seq[Option[ActorRef]]](uris, actorRefs) + } + }) + .map{ case (redisURIs, actorRef) => + val ref = actorRef.head + val uri = redisURIs.head + KeyValue(RESHARD.key, "", ref, Some(uri)) + } + + // Add a slave node to the cluster, replicating the master that has the fewest slaves. + protected def streamAddSlavePrime(implicit executionContext: ExecutionContext) = Flow[KeyValue] + .map(kv => (kv.value, kv.senderRef)) + .map(t => (createRedisUri(t._1), t._2) ) + .map(t => (getNewSaladApi.canonicalizeURI(t._1), t._2)) + .groupedWithin(1, Config.Cornucopia.batchPeriod) + .mapAsync(1)(t => { + val t1 = t.unzip + val redisURIs = t1._1 + val actorRefs = t1._2 + addNodesToCluster(redisURIs) flatMap { uris => + waitForTopologyRefresh2[Seq[RedisURI], Seq[Option[ActorRef]]](uris, actorRefs) + } + }) + .mapAsync(1)(t => { + val redisURIs = t._1 + val actorRefs = t._2 + findMasters(redisURIs) map { _ => + actorRefs + } + }) + .mapAsync(1)(waitForTopologyRefresh[Seq[Option[ActorRef]]]) + .mapAsync(1)(signalSlavesAdded) + .map(_ => KeyValue("", "")) + + private def signalSlavesAdded(senders: Seq[Option[ActorRef]]): Future[Unit] = { + def signal(ref: ActorRef): Future[Unit] = { + Future { + ref ! Right("slave") + } + } + val flattened = senders.flatten + if (flattened.isEmpty) Future(Unit) + else Future.reduce(senders.flatten.map(signal))((_, _) => Unit) + } + + override protected def streamReshard(implicit executionContext: ExecutionContext) = Flow[KeyValue] + .map(kv => (kv.senderRef, kv.newMasterURI)) + .throttle(1, Config.Cornucopia.minReshardWait, 1, ThrottleMode.Shaping) + .mapAsync(1)(t => { + val senderRef = t._1 + val newMasterURI = t._2 + reshardClusterPrime(senderRef, newMasterURI) + }) + .mapAsync(1)(waitForTopologyRefresh[Unit]) + .mapAsync(1)(_ => logTopology) + .map(_ => KeyValue("", "")) + + protected def reshardClusterPrime(sender: Option[ActorRef], newMasterURI: Option[RedisURI], retries: Int = 0): Future[Unit] = { + + def reshard(ref: ActorRef, uri: RedisURI): Future[Unit] = { + reshardClusterWithNewMaster(uri) map { _: Unit => + logger.info(s"Successfully resharded cluster ($retries retries), informing Kubernetes controller") + ref ! Right("master") + } recover { + case e: ReshardTableException => + logger.error(s"There was a problem computing the reshard table, retrying for retry number ${retries + 1}:", e) + reshardClusterPrime(sender, newMasterURI, retries + 1) + case ex: Throwable => + logger.error("Failed to reshard cluster, informing Kubernetes controller", ex) + ref ! Left(s"${ex.toString}") + } + } + + val result = for { + ref <- sender + uri <- newMasterURI + } yield reshard(ref, uri) + + result match { + case Some(f) => f + case None => + // this should never happen though + logger.error("There was a problem resharding the cluster: sender actor or new redis master URI missing") + Future(Unit) + } + } + + private def printReshardTable(reshardTable: Map[String, List[Int]]) = { + logger.info(s"Reshard Table:") + reshardTable foreach { case (nodeId, slots) => + logger.info(s"Migrating slots from node '$nodeId': ${slots.mkString(", ")}") + } + } + + protected def reshardClusterWithNewMaster(newMasterURI: RedisURI) + : Future[Unit] = { + // Execute futures using a thread pool so we don't run out of memory due to futures. + implicit val executionContext = Config.Consumer.actorSystem.dispatchers.lookup("akka.actor.resharding-dispatcher") + + implicit val saladAPI = getNewSaladApi + + saladAPI.masterNodes.flatMap { mn => + val masterNodes = mn.toList + + logger.debug(s"Reshard table with new master master nodes: ${masterNodes.map(_.getNodeId)}") + + val liveMasters = masterNodes.filter(_.isConnected) + + logger.debug(s"Reshard cluster with new master live masters: ${liveMasters.map(_.getNodeId)}") + + lazy val idToURI = new util.HashMap[String,RedisURI](liveMasters.length + 1, 1) + + // Re-use cluster connections so we don't exceed file-handle limit or waste resources. + lazy val clusterConnections = new util.HashMap[String,Future[SaladClusterAPI[CodecType,CodecType]]](liveMasters.length + 1, 1) + + val targetNode = masterNodes.filter(_.getUri == newMasterURI).head + + logger.debug(s"Reshard cluster with new master target node: ${targetNode.getNodeId}") + + liveMasters.map { master => + idToURI.put(master.getNodeId, master.getUri) + val connection = getConnection(master.getNodeId) + clusterConnections.put(master.getNodeId, connection) + } + + logger.debug(s"Reshard cluster with new master cluster connections for nodes: ${clusterConnections.keySet().toString}") + + val sourceNodes = masterNodes.filterNot(_ == targetNode) + + logger.debug(s"Reshard cluster with new master source nodes: ${sourceNodes.map(_.getNodeId)}") + + val reshardTable = computeReshardTable(sourceNodes) + + printReshardTable(reshardTable) + + val migrateResults = for { + (sourceNodeId, slots) <- reshardTable + slot <- slots + } yield { + migrateSlot(slot, sourceNodeId, targetNode.getNodeId, newMasterURI, liveMasters, clusterConnections) + } + + Future.fold(migrateResults)()((_, b) => b) + } + } + + protected def extractKeyValue = Flow[ActorRecord] + .map[KeyValue](record => KeyValue(record.operation, record.redisNodeIp, record.ref)) + + protected val processTask = Flow.fromGraph(GraphDSL.create() { implicit builder => + import GraphDSL.Implicits._ + + val taskSource = builder.add(Flow[Task]) + + val mergeFeedback = builder.add(MergePreferred[KeyValue](2)) + + val partition = builder.add(Partition[KeyValue]( + 5, kv => partitionEvents(kv.key))) + + val kv = builder.add(extractKeyValue) + + val partitionRm = builder.add(Partition[KeyValue]( + 3, kv => partitionNodeRemoval(kv.key) + )) + + val fanIn = builder.add(Merge[KeyValue](5)) + + taskSource.out ~> kv + kv ~> mergeFeedback.preferred + mergeFeedback.out ~> partition + partition.out(ADD_MASTER.ordinal) ~> streamAddMaster ~> mergeFeedback.in(0) + partition.out(ADD_SLAVE.ordinal) ~> streamAddSlavePrime ~> fanIn + partition.out(REMOVE_NODE.ordinal) ~> streamRemoveNode ~> partitionRm + partitionRm.out(REMOVE_MASTER.ordinal) ~> mergeFeedback.in(1) + partitionRm.out(REMOVE_SLAVE.ordinal) ~> streamRemoveSlave ~> fanIn + partitionRm.out(UNSUPPORTED.ordinal) ~> unsupportedOperation ~> fanIn + partition.out(RESHARD.ordinal) ~> streamReshard ~> fanIn + partition.out(UNSUPPORTED.ordinal) ~> unsupportedOperation ~> fanIn + + FlowShape(taskSource.in, fanIn.out) + }) + + protected val cornucopiaSource = Config.Consumer.cornucopiaActorSource + + def ref: ActorRef = processTask + .to(Sink.ignore) + .runWith(cornucopiaSource) + +} diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/http/CornucopiaTaskMaster.scala b/src/main/scala/com/github/kliewkliew/cornucopia/http/CornucopiaTaskMaster.scala new file mode 100644 index 0000000..6c34cb2 --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/http/CornucopiaTaskMaster.scala @@ -0,0 +1,32 @@ +package com.github.kliewkliew.cornucopia.http + +import akka.actor._ +import akka.util.Timeout +import com.github.kliewkliew.cornucopia.actors._ +import com.github.kliewkliew.cornucopia.graph +import com.github.kliewkliew.cornucopia.redis.Connection.{newSaladAPI, Salad} + +object CornucopiaTaskMaster { + def props(implicit timeout: Timeout) = Props(new CornucopiaTaskMaster) + + case class RestTask(operation: String, redisNodeIp: String) +} + +class CornucopiaTaskMaster(implicit timeout: Timeout) extends Actor with ActorLogging { + import CornucopiaTaskMaster._ + import CornucopiaSource._ + + implicit val newSaladAPIimpl: Salad = newSaladAPI + val ref: ActorRef = new graph.CornucopiaActorSource().ref + + def receive = { + case RestTask(operation, redisNodeIp) => + log.info(s"Received Cornucopia API task request: '$operation', '$redisNodeIp'") + sender ! Right("its all good") + ref ! Task(operation, redisNodeIp, Some(self)) + case Right(msg) => + log.info(s"Received task completion: $msg") + case Left(msg) => + log.info(s"Received task failed: $msg") + } +} diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/http/EventMarshalling.scala b/src/main/scala/com/github/kliewkliew/cornucopia/http/EventMarshalling.scala new file mode 100644 index 0000000..02a96ce --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/http/EventMarshalling.scala @@ -0,0 +1,9 @@ +package com.github.kliewkliew.cornucopia.http + +import spray.json._ + +trait EventMarshalling extends DefaultJsonProtocol { + import CornucopiaTaskMaster._ + + implicit val taskFormat = jsonFormat2(RestTask) +} diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/http/RestApi.scala b/src/main/scala/com/github/kliewkliew/cornucopia/http/RestApi.scala new file mode 100644 index 0000000..0e2f68f --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/http/RestApi.scala @@ -0,0 +1,59 @@ +package com.github.kliewkliew.cornucopia.http + +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +import akka.actor._ +import akka.pattern.ask +import akka.util.Timeout + +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server._ + +class RestApi(system: ActorSystem, timeout: Timeout) extends RestRoutes { + implicit val requestTimeout = timeout + implicit def executionContext = system.dispatcher + + def createCornucopiaTaskMaster = system.actorOf(CornucopiaTaskMaster.props) +} + +trait RestRoutes extends CornucopiaApi with EventMarshalling { + import StatusCodes._ + import CornucopiaTaskMaster._ + + def routes: Route = taskRoute + + def taskRoute = pathPrefix("task") { + pathEndOrSingleSlash { + post { + entity(as[RestTask]) { ed => + onSuccess(submitTask(ed.operation, ed.redisNodeIp)) { + case Left(msg) => + complete(BadRequest, msg) + case Right(msg) => + complete(Accepted, msg) + } + } + } + } + } +} + +trait CornucopiaApi { + import CornucopiaTaskMaster._ + + def createCornucopiaTaskMaster(): ActorRef + + implicit def executionContext: ExecutionContext + implicit def requestTimeout: Timeout + + lazy val cornucopiaTaskMaster = createCornucopiaTaskMaster() + + def submitTask(operation: String, redisNodeIp: String) = { + cornucopiaTaskMaster.ask(RestTask(operation, redisNodeIp)).mapTo[Either[String, String]] + } +} + diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/http/Server.scala b/src/main/scala/com/github/kliewkliew/cornucopia/http/Server.scala new file mode 100644 index 0000000..632ed95 --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/http/Server.scala @@ -0,0 +1,48 @@ +package com.github.kliewkliew.cornucopia.http + +import scala.concurrent.Future +import akka.actor.{Actor, ActorSystem, Props} +import akka.event.Logging +import akka.util.Timeout +import akka.http.scaladsl.Http +import akka.http.scaladsl.Http.ServerBinding +import akka.http.scaladsl.server.Directives._ +import akka.stream.ActorMaterializer +import org.slf4j.LoggerFactory +import com.typesafe.config.{ Config, ConfigFactory } + +object Server extends RequestTimeout { + val config = ConfigFactory.load() + val host = config.getString("cornucopia.http.host") + val port = config.getInt("cornucopia.http.port") + + implicit val system = ActorSystem() + implicit val ec = system.dispatcher + + val api = new RestApi(system, requestTimeout(config)).routes + + implicit val materializer = ActorMaterializer() + val bindingFuture: Future[ServerBinding] = Http().bindAndHandle(api, host, port) + + val log = Logging(system.eventStream, "cornucopia-rest-api") + + def start = { + bindingFuture.map { serverBinding => + log.info(s"RestApi bound to ${serverBinding.localAddress} ") + }.onFailure { + case ex: Exception => + log.error(ex, "Failed to bind to {}:{}!", host, port) + system.terminate() + } + } +} + +trait RequestTimeout { + import scala.concurrent.duration._ + def requestTimeout(config: Config): Timeout = { + val t = config.getString("akka.http.server.request-timeout") + val d = Duration(t) + FiniteDuration(d.length, d.unit) + } +} + diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/kafka/Consumer.scala b/src/main/scala/com/github/kliewkliew/cornucopia/kafka/Consumer.scala deleted file mode 100644 index b51103a..0000000 --- a/src/main/scala/com/github/kliewkliew/cornucopia/kafka/Consumer.scala +++ /dev/null @@ -1,478 +0,0 @@ -package com.github.kliewkliew.cornucopia.kafka - -import java.util - -import Config.Consumer.{cornucopiaSource, materializer} -import com.github.kliewkliew.cornucopia.redis.Connection._ -import akka.stream.{ClosedShape, ThrottleMode} -import akka.stream.scaladsl.{Flow, GraphDSL, MergePreferred, Partition, RunnableGraph, Sink} -import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode -import org.apache.kafka.clients.consumer.ConsumerRecord -import java.util.concurrent.atomic.AtomicInteger - -import com.github.kliewkliew.salad.SaladClusterAPI -import com.lambdaworks.redis.RedisURI -import com.lambdaworks.redis.models.role.RedisInstance.Role -import org.slf4j.LoggerFactory - -import collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.JavaConversions._ -import scala.language.implicitConversions -import scala.concurrent.{ExecutionContext, Future} - -class Consumer { - private type Record = ConsumerRecord[String, String] - private val logger = LoggerFactory.getLogger(this.getClass) - - /** - * Run the graph to process the event stream from Kafka. - * - * @return - */ - def run = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => - import GraphDSL.Implicits._ - import scala.concurrent.ExecutionContext.Implicits.global - - val in = cornucopiaSource - val out = Sink.ignore - - def partitionEvents(key: String) = key.trim.toLowerCase match { - case ADD_MASTER.key => ADD_MASTER.ordinal - case ADD_SLAVE.key => ADD_SLAVE.ordinal - case REMOVE_NODE.key => REMOVE_NODE.ordinal - case RESHARD.key => RESHARD.ordinal - case _ => UNSUPPORTED.ordinal - } - - def partitionNodeRemoval(key: String) = key.trim.toLowerCase match { - case REMOVE_MASTER.key => REMOVE_MASTER.ordinal - case REMOVE_SLAVE.key => REMOVE_SLAVE.ordinal - case UNSUPPORTED.key => UNSUPPORTED.ordinal - } - - val mergeFeedback = builder.add(MergePreferred[KeyValue](2)) - - val partition = builder.add(Partition[KeyValue]( - 5, kv => partitionEvents(kv.key))) - - val kv = builder.add(extractKeyValue) - - val partitionRm = builder.add(Partition[KeyValue]( - 3, kv => partitionNodeRemoval(kv.key) - )) - - in ~> kv - kv ~> mergeFeedback.preferred - mergeFeedback.out ~> partition - partition.out(ADD_MASTER.ordinal) ~> streamAddMaster ~> mergeFeedback.in(0) - partition.out(ADD_SLAVE.ordinal) ~> streamAddSlave ~> out - partition.out(REMOVE_NODE.ordinal) ~> streamRemoveNode ~> partitionRm - partitionRm.out(REMOVE_MASTER.ordinal) ~> mergeFeedback.in(1) - partitionRm.out(REMOVE_SLAVE.ordinal) ~> streamRemoveSlave ~> out - partitionRm.out(UNSUPPORTED.ordinal) ~> unsupportedOperation ~> out - partition.out(RESHARD.ordinal) ~> streamReshard ~> out - partition.out(UNSUPPORTED.ordinal) ~> unsupportedOperation ~> out - - ClosedShape - }).run() - - /** - * Stream definitions for the graph. - */ - // Extract a tuple of the key and value from a Kafka record. - case class KeyValue(key: String, value: String) - private def extractKeyValue = Flow[Record] - .map[KeyValue](record => KeyValue(record.key, record.value)) - - // Add a master node to the cluster. - private def streamAddMaster(implicit executionContext: ExecutionContext) = Flow[KeyValue] - .map(_.value) - .map(RedisURI.create) - .map(newSaladAPI.canonicalizeURI) - .groupedWithin(100, Config.Cornucopia.batchPeriod) - .mapAsync(1)(addNodesToCluster) - .mapAsync(1)(waitForTopologyRefresh) - .map(_ => KeyValue(RESHARD.key, "")) - - // Add a slave node to the cluster, replicating the master that has the fewest slaves. - private def streamAddSlave(implicit executionContext: ExecutionContext) = Flow[KeyValue] - .map(_.value) - .map(RedisURI.create) - .map(newSaladAPI.canonicalizeURI) - .groupedWithin(100, Config.Cornucopia.batchPeriod) - .mapAsync(1)(addNodesToCluster) - .mapAsync(1)(waitForTopologyRefresh) - .mapAsync(1)(findMasters) - .mapAsync(1)(waitForTopologyRefresh) - .mapAsync(1)(_ => logTopology) - - // Emit a key-value pair indicating the node type and URI. - private def streamRemoveNode(implicit executionContext: ExecutionContext) = Flow[KeyValue] - .map(_.value) - .map(RedisURI.create) - .map(newSaladAPI.canonicalizeURI) - .mapAsync(1)(emitNodeType) - - // Remove a slave node from the cluster. - private def streamRemoveSlave(implicit executionContext: ExecutionContext) = Flow[KeyValue] - .map(_.value) - .groupedWithin(100, Config.Cornucopia.batchPeriod) - .mapAsync(1)(forgetNodes) - .mapAsync(1)(waitForTopologyRefresh) - .mapAsync(1)(_ => logTopology) - - // Redistribute the hash slots among all nodes in the cluster. - // Execute slot redistribution at most once per configured interval. - // Combine multiple requests into one request. - private def streamReshard(implicit executionContext: ExecutionContext) = Flow[KeyValue] - .map(record => Seq(record.value)) - .conflate((seq1, seq2) => seq1 ++ seq2) - .throttle(1, Config.Cornucopia.minReshardWait, 1, ThrottleMode.Shaping) - .mapAsync(1)(reshardCluster) - .mapAsync(1)(waitForTopologyRefresh) - .mapAsync(1)(_ => logTopology) - - // Throw for keys indicating unsupported operations. - private def unsupportedOperation = Flow[KeyValue] - .map(record => throw new IllegalArgumentException(s"Unsupported operation ${record.key} for ${record.value}")) - - /** - * Wait for the new cluster topology view to propagate to all nodes in the cluster. May not be strictly necessary - * since this microservice immediately attempts to notify all nodes of topology updates. - * - * @param passthrough The value that will be passed through to the next map stage. - * @param executionContext The thread dispatcher context. - * @tparam T - * @return The unmodified input value. - */ - private def waitForTopologyRefresh[T](passthrough: T)(implicit executionContext: ExecutionContext): Future[T] = Future { - scala.concurrent.blocking(Thread.sleep(Config.Cornucopia.refreshTimeout)) - passthrough - } - - /** - * Log the current view of the cluster topology. - * - * @param executionContext The thread dispatcher context. - * @return - */ - private def logTopology(implicit executionContext: ExecutionContext): Future[Unit] = { - implicit val saladAPI = newSaladAPI - saladAPI.clusterNodes.map { allNodes => - val masterNodes = allNodes.filter(Role.MASTER == _.getRole) - val slaveNodes = allNodes.filter(Role.SLAVE == _.getRole) - logger.info(s"Master nodes: $masterNodes") - logger.info(s"Slave nodes: $slaveNodes") - } - } - - /** - * The entire cluster will meet the new nodes at the given URIs. - * - * @param redisURIList The list of URI of the new nodes. - * @param executionContext The thread dispatcher context. - * @return The list of URI if the nodes were met. TODO: emit only the nodes that were successfully added. - */ - private def addNodesToCluster(redisURIList: Seq[RedisURI])(implicit executionContext: ExecutionContext): Future[Seq[RedisURI]] = { - implicit val saladAPI = newSaladAPI - saladAPI.clusterNodes.flatMap { allNodes => - val getConnectionsToLiveNodes = allNodes.filter(_.isConnected).map(node => getConnection(node.getNodeId)) - Future.sequence(getConnectionsToLiveNodes).flatMap { connections => - // Meet every new node from every old node. - val metResults = for { - conn <- connections - uri <- redisURIList - } yield { - conn.clusterMeet(uri) - } - Future.sequence(metResults).map(_ => redisURIList) - } - } - } - - /** - * Set the n new slave nodes to replicate the poorest (fewest slaves) n masters. - * - * @param redisURIList The list of ip addresses of the slaves that will be added to the cluster. Hostnames are not acceptable. - * @param executionContext The thread dispatcher context. - * @return Indicate that the n new slaves are replicating the poorest n masters. - */ - private def findMasters(redisURIList: Seq[RedisURI])(implicit executionContext: ExecutionContext): Future[Unit] = { - implicit val saladAPI = newSaladAPI - saladAPI.clusterNodes.flatMap { allNodes => - // Node ids for nodes that are currently master nodes but will become slave nodes. - val newSlaveIds = allNodes.filter(node => redisURIList.contains(node.getUri)).map(_.getNodeId) - // The master nodes (the nodes that will become slaves are still master nodes at this point and must be filtered out). - val masterNodes = saladAPI.masterNodes(allNodes) - .filterNot(node => newSlaveIds.contains(node.getNodeId)) - // HashMap of master node ids to the number of slaves for that master. - val masterSlaveCount = new util.HashMap[String, AtomicInteger](masterNodes.length + 1, 1) - // Populate the hash map. - masterNodes.map(_.getNodeId).foreach(nodeId => masterSlaveCount.put(nodeId, new AtomicInteger(0))) - allNodes.map { node => - Option.apply(node.getSlaveOf) - .map(master => masterSlaveCount.get(master).incrementAndGet()) - } - - // Find the poorest n masters for n slaves. - val poorestMasters = new MaxNHeapMasterSlaveCount(redisURIList.length) - masterSlaveCount.asScala.foreach(poorestMasters.offer) - assert(redisURIList.length >= poorestMasters.underlying.length) - - // Create a list so that we can circle back to the first element if the new slaves outnumber the existing masters. - val poorMasterList = poorestMasters.underlying.toList - val poorMasterIndex = new AtomicInteger(0) - // Choose a master for every slave. - val listFuturesResults = redisURIList.map { slaveURI => - getConnection(slaveURI).map(_.clusterReplicate( - poorMasterList(poorMasterIndex.getAndIncrement() % poorMasterList.length)._1)) - } - Future.sequence(listFuturesResults).map(x => x) - } - } - - /** - * Emit a key-value representing the node-type and the node-id. - * @param redisURI - * @param executionContext - * @return the node type and id. - */ - def emitNodeType(redisURI:RedisURI)(implicit executionContext: ExecutionContext): Future[KeyValue] = { - implicit val saladAPI = newSaladAPI - saladAPI.clusterNodes.map { allNodes => - val removalNodeOpt = allNodes.find(node => node.getUri.equals(redisURI)) - if (removalNodeOpt.isEmpty) throw new Exception(s"Node not in cluster: $redisURI") - val kv = removalNodeOpt.map { node => - node.getRole match { - case Role.MASTER => KeyValue(RESHARD.key, node.getNodeId) - case Role.SLAVE => KeyValue(REMOVE_SLAVE.key, node.getNodeId) - case _ => KeyValue(UNSUPPORTED.key, node.getNodeId) - } - } - kv.get - } - } - - /** - * Safely remove a master by redistributing its hash slots before blacklisting it from the cluster. - * The data is given time to migrate as configured in `cornucopia.grace.period`. - * - * @param withoutNodes The list of ids of the master nodes that will be removed from the cluster. - * @param executionContext The thread dispatcher context. - * @return Indicate that the hash slots were redistributed and the master removed from the cluster. - */ - private def removeMasters(withoutNodes: Seq[String])(implicit executionContext: ExecutionContext): Future[Unit] = { - val reshardDone = reshardCluster(withoutNodes).map { _ => - scala.concurrent.blocking(Thread.sleep(Config.Cornucopia.gracePeriod)) // Allow data to migrate. - } - reshardDone.flatMap(_ => forgetNodes(withoutNodes)) - } - - /** - * Notify all nodes in the cluster to forget this node. - * - * @param withoutNodes The list of ids of nodes to be forgotten by the cluster. - * @param executionContext The thread dispatcher context. - * @return A future indicating that the node was forgotten by all nodes in the cluster. - */ - def forgetNodes(withoutNodes: Seq[String])(implicit executionContext: ExecutionContext): Future[Unit] = - if (!withoutNodes.exists(_.nonEmpty)) - Future(Unit) - else { - implicit val saladAPI = newSaladAPI - saladAPI.clusterNodes.flatMap { allNodes => - logger.info(s"Forgetting nodes: $withoutNodes") - // Reset the nodes to be removed. - val validWithoutNodes = withoutNodes.filter(_.nonEmpty) - validWithoutNodes.map(getConnection).map(_.map(_.clusterReset(true))) - - // The nodes that will remain in the cluster should forget the nodes that will be removed. - val withNodes = allNodes - .filterNot(node => validWithoutNodes.contains(node.getNodeId)) // Node cannot forget itself. - - // For the cross product of `withNodes` and `withoutNodes`; to remove the nodes in `withoutNodes`. - val forgetResults = for { - operatorNode <- withNodes - operandNodeId <- validWithoutNodes - } yield { - if (operatorNode.getSlaveOf == operandNodeId) - Future(Unit) // Node cannot forget its master. - else - getConnection(operatorNode.getNodeId).flatMap(_.clusterForget(operandNodeId)) - } - Future.sequence(forgetResults).map(x => x) - } - } - - /** - * Reshard the cluster using a view of the cluster consisting of a subset of master nodes. - * - * @param withoutNodes The list of ids of nodes that will not be assigned hash slots. - * @return Boolean indicating that all hash slots were reassigned successfully. - */ - private def reshardCluster(withoutNodes: Seq[String]) - : Future[Unit] = { - // Execute futures using a thread pool so we don't run out of memory due to futures. - implicit val executionContext = Config.Consumer.actorSystem.dispatchers.lookup("akka.actor.resharding-dispatcher") - implicit val saladAPI = newSaladAPI - saladAPI.masterNodes.flatMap { masterNodes => - - val liveMasters = masterNodes.filter(_.isConnected) - lazy val idToURI = new util.HashMap[String,RedisURI](liveMasters.length + 1, 1) - // Re-use cluster connections so we don't exceed file-handle limit or waste resources. - lazy val clusterConnections = new util.HashMap[String,Future[SaladClusterAPI[CodecType,CodecType]]](liveMasters.length + 1, 1) - liveMasters.map { master => - idToURI.put(master.getNodeId, master.getUri) - clusterConnections.put(master.getNodeId, getConnection(master.getNodeId)) - } - - // Remove dead nodes. This may generate WARN logs if some nodes already forgot the dead node. - val deadMastersIds = masterNodes.filterNot(_.isConnected).map(_.getNodeId) - logger.info(s"Dead nodes: $deadMastersIds") - forgetNodes(deadMastersIds) - - // Migrate the data. - val assignableMasters = liveMasters.filterNot(masterNode => withoutNodes.contains(masterNode.getNodeId)) - logger.info(s"Resharding cluster with ${assignableMasters.map(_.getNodeId)} without ${withoutNodes ++ deadMastersIds}") - val migrateResults = liveMasters.flatMap { node => - val (sourceNodeId, slotList) = (node.getNodeId, node.getSlots.toList.map(_.toInt)) - logger.debug(s"Migrating data from $sourceNodeId among slots $slotList") - slotList.map { slot => - val destinationNodeId = slotNode(slot, assignableMasters) - migrateSlot( - slot, - sourceNodeId, destinationNodeId, idToURI.get(destinationNodeId), - assignableMasters, clusterConnections) - } - } - val finalMigrateResult = Future.sequence(migrateResults) - finalMigrateResult.onFailure { case e => logger.error(s"Failed to migrate hash slot data", e) } - finalMigrateResult.onSuccess { case _ => logger.info(s"Migrated hash slot data") } - // We attempted to migrate the data but do not prevent slot reassignment if migration fails. - // We may lose prior data but we ensure that all slots are assigned. - finalMigrateResult.onComplete { case _ => - List.range(0, 16384).map(notifySlotAssignment(_, assignableMasters)) - } - finalMigrateResult.flatMap(_ => forgetNodes(withoutNodes)) - } - } - - // TODO: pass slotNode as a lambda to migrateSlot and notifySlotAssignment. - // TODO: more efficient slot assignment to prevent data migration. - /** - * Choose a master node for a slot. - * - * @param slot The slot to be assigned. - * @param masters The list of masters that can be assigned slots. - * @return The node id of the chosen master. - */ - private def slotNode(slot: Int, masters: mutable.Buffer[RedisClusterNode]): String = - masters(slot % masters.length).getNodeId - - /** - * Migrate all keys in a slot from the source node to the destination node and update the slot assignment on the - * affected nodes. - * - * @param slot The slot to migrate. - * @param sourceNodeId The current location of the slot data. - * @param destinationNodeId The target location of the slot data. - * @param masters The list of nodes in the cluster that will be assigned hash slots. - * @param clusterConnections The list of connections to nodes in the cluster. - * @param executionContext The thread dispatcher context. - * @return Future indicating success. - */ - private def migrateSlot(slot: Int, sourceNodeId: String, destinationNodeId: String, destinationURI: RedisURI, - masters: mutable.Buffer[RedisClusterNode], - clusterConnections: util.HashMap[String,Future[SaladClusterAPI[CodecType,CodecType]]]) - (implicit saladAPI: Salad, executionContext: ExecutionContext) - : Future[Unit] = { - destinationNodeId match { - case `sourceNodeId` => - // Don't migrate if the source and destination are the same. - Future(Unit) - case _ => - for { - sourceConnection <- clusterConnections.get(sourceNodeId) - destinationConnection <- clusterConnections.get(destinationNodeId) - } yield { - // Sequentially execute the steps outline in: - // https://redis.io/commands/cluster-setslot#redis-cluster-live-resharding-explained - import com.github.kliewkliew.salad.serde.ByteArraySerdes._ - val migrationResult = - for { - _ <- destinationConnection.clusterSetSlotStable(slot).recover { case _ => Unit } - _ <- sourceConnection.clusterSetSlotStable(slot).recover { case _ => Unit } - _ <- destinationConnection.clusterSetSlotImporting(slot, sourceNodeId) - _ <- sourceConnection.clusterSetSlotMigrating(slot, destinationNodeId) - keyCount <- sourceConnection.clusterCountKeysInSlot(slot) - keyList <- sourceConnection.clusterGetKeysInSlot[CodecType](slot, keyCount.toInt) - _ <- sourceConnection.migrate[CodecType](destinationURI, keyList.toList) - _ <- sourceConnection.clusterSetSlotNode(slot, destinationNodeId) - finalResult <- destinationConnection.clusterSetSlotNode(slot, destinationNodeId) - } yield { - finalResult - } - migrationResult.onSuccess { case _ => logger.trace(s"Migrated data of slot $slot from $sourceNodeId to: $destinationNodeId at $destinationURI") } - migrationResult.onFailure { case e => logger.debug(s"Failed to migrate data of slot $slot from $sourceNodeId to: $destinationNodeId at $destinationURI", e)} - // Undocumented but necessary final steps found in http://download.redis.io/redis-stable/src/redis-trib.rb - // `recover` to perform these steps even if the previous steps failed, but don't perform these steps until the previous steps did attempt execution. - val finalMigrationResult = migrationResult.recover { case _ => Unit } - .flatMap(_ => notifySlotAssignment(slot, masters)).recover { case _ => Unit } - .flatMap(_ => sourceConnection.clusterDelSlot(slot)).recover { case _ => Unit } - .flatMap(_ => destinationConnection.clusterAddSlot(slot)) - finalMigrationResult.onSuccess { case _ => logger.trace(s"Updated slot table for slot $slot") } - finalMigrationResult.onFailure { case e => logger.debug(s"Failed to update slot table for slot $slot", e)} - finalMigrationResult.map(x => x) - } - } - } - - /** - * Notify all master nodes of a slot assignment so that they will immediately be able to redirect clients. - * - * @param masters The list of nodes in the cluster that will be assigned hash slots. - * @param executionContext The thread dispatcher context. - * @return Future indicating success. - */ - private def notifySlotAssignment(slot: Int, masters: mutable.Buffer[RedisClusterNode]) - (implicit saladAPI: Salad, executionContext: ExecutionContext) - : Future[Unit] = { - val getMasterConnections = masters.map(master => getConnection(master.getNodeId)) - Future.sequence(getMasterConnections).flatMap { masterConnections => - val notifyResults = masterConnections.map(_.clusterSetSlotNode(slot, slotNode(slot, masters))) - Future.sequence(notifyResults).map(x => x) - } - } - - /** - * Store the n poorest masters. - * Implemented on scala.mutable.PriorityQueue. - * - * @param n - */ - sealed case class MaxNHeapMasterSlaveCount(n: Int) { - private type MSTuple = (String, AtomicInteger) - private object MSOrdering extends Ordering[MSTuple] { - def compare(a: MSTuple, b: MSTuple) = a._2.intValue compare b._2.intValue - } - implicit private val ordering = MSOrdering - val underlying = new mutable.PriorityQueue[MSTuple] - - /** - * O(1) if the entry is not a candidate for the being one of the poorest n masters. - * O(log(n)) if the entry is a candidate. - * - * @param entry The candidate master-slavecount tuple. - */ - def offer(entry: MSTuple) = - if (n > underlying.length) { - underlying.enqueue(entry) - } - else if (entry._2.intValue < underlying.head._2.intValue) { - underlying.dequeue() - underlying.enqueue(entry) - } - } - -} diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/redis/Connection.scala b/src/main/scala/com/github/kliewkliew/cornucopia/redis/Connection.scala index 0c089db..71ca46f 100644 --- a/src/main/scala/com/github/kliewkliew/cornucopia/redis/Connection.scala +++ b/src/main/scala/com/github/kliewkliew/cornucopia/redis/Connection.scala @@ -20,6 +20,8 @@ object Connection { private val redisClusterPort = redisClusterConfig.getInt("seed.server.port") private val nodes = List(RedisURI.create(redisClusterSeedServer, redisClusterPort)) + LoggerFactory.getLogger(this.getClass).debug(s"Cluster seed server: '${nodes}'") + /** * Create a new API connection - new connections are necessary to refresh the view of the cluster topology * after adding or removing a node. diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/kafka/Operation.scala b/src/main/scala/com/github/kliewkliew/cornucopia/redis/Operation.scala similarity index 94% rename from src/main/scala/com/github/kliewkliew/cornucopia/kafka/Operation.scala rename to src/main/scala/com/github/kliewkliew/cornucopia/redis/Operation.scala index e496ebf..01aa89e 100644 --- a/src/main/scala/com/github/kliewkliew/cornucopia/kafka/Operation.scala +++ b/src/main/scala/com/github/kliewkliew/cornucopia/redis/Operation.scala @@ -1,4 +1,4 @@ -package com.github.kliewkliew.cornucopia.kafka +package com.github.kliewkliew.cornucopia.redis trait Operation { def key: String diff --git a/src/main/scala/com/github/kliewkliew/cornucopia/redis/ReshardTable.scala b/src/main/scala/com/github/kliewkliew/cornucopia/redis/ReshardTable.scala new file mode 100644 index 0000000..e140633 --- /dev/null +++ b/src/main/scala/com/github/kliewkliew/cornucopia/redis/ReshardTable.scala @@ -0,0 +1,68 @@ +package com.github.kliewkliew.cornucopia.redis + +import org.slf4j.LoggerFactory +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode + +object ReshardTable { + + type NodeId = String + type Slot = Int + type ReshardTableType = scala.collection.immutable.Map[NodeId, List[Slot]] + + case class ReshardTableException(private val message: String = "", private val cause: Throwable = None.orNull) + extends Exception(message, cause) + + case class LogicalNode(node: RedisClusterNode, slots: List[Int]) + + private val logger = LoggerFactory.getLogger(this.getClass) + + def computeReshardTable(sourceNodes: List[RedisClusterNode]) + (implicit ExpectedTotalNumberSlots: Int): ReshardTableType = { + import scala.collection.JavaConverters._ + + val logicalNodes = sourceNodes.map { n => + val slots = n.getSlots.asScala.toList.map(_.toInt) + LogicalNode(n, slots) + } + + val sortedSources = logicalNodes.sorted(Ordering.by((_: LogicalNode).slots.size).reverse) + + printSortedSources(sortedSources) + + val totalSourceSlots = sortedSources.foldLeft(0)((sum, n) => sum + n.slots.size) + + logger.debug(s"Reshard table total sources: $totalSourceSlots") + + if (totalSourceSlots != ExpectedTotalNumberSlots) { + throw ReshardTableException(s"Reshard table total source slots is $totalSourceSlots, but is not equal to expected number $ExpectedTotalNumberSlots") + } + + val numSlots = totalSourceSlots / (logicalNodes.size + 1) // total number of slots to move to target + + logger.debug(s"Reshard table total number of slots to move to target: $numSlots") + + def computeNumSlots(i: Int, source: LogicalNode): Int = { + if (i == 0) Math.ceil((numSlots.toFloat / totalSourceSlots) * source.slots.size).toInt + else Math.floor((numSlots.toFloat / totalSourceSlots) * source.slots.size).toInt + } + + val reshardTable: ReshardTableType = Map.empty[NodeId, List[Slot]] + + val table = sortedSources.zipWithIndex.foldLeft(reshardTable) { case (tbl, (source, i)) => + val sortedSlots = source.slots.sorted + val n = computeNumSlots(i, source) + val slots = sortedSlots.take(n) + val nodeId = source.node.getNodeId + logger.debug(s"Reshard table adding $n slots from $nodeId to move to target") + tbl + (nodeId -> slots) + } + + table + } + + private def printSortedSources(sources: List[LogicalNode]): Unit = { + logger.debug(s"Reshard table sorted source slots:") + sources.foreach(n => logger.debug(s"${n.node.getNodeId} has ${n.slots.size} slots: ${n.slots}")) + } + +} diff --git a/src/test/scala/com/github/kliewkliew/cornucopia/LibraryTest.scala b/src/test/scala/com/github/kliewkliew/cornucopia/LibraryTest.scala new file mode 100644 index 0000000..cf39ae9 --- /dev/null +++ b/src/test/scala/com/github/kliewkliew/cornucopia/LibraryTest.scala @@ -0,0 +1,131 @@ +package com.github.kliewkliew.cornucopia + +import com.github.kliewkliew.cornucopia.redis._ +import com.lambdaworks.redis.RedisURI +//import com.github.kliewkliew.cornucopia._ +import com.github.kliewkliew.cornucopia.graph._ +import com.github.kliewkliew.cornucopia.redis.Connection.Salad +import akka.testkit.{TestActorRef, TestKit, TestProbe} +import akka.actor.{ActorSystem, ActorRef} +import akka.pattern.ask +//import akka.actor.Status.Failure +import akka.stream.ActorMaterializer +import akka.util.Timeout +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{BeforeAndAfterAll, MustMatchers, WordSpecLike} +import org.mockito.Mockito._ +import org.scalatest.mockito.MockitoSugar._ +import org.mockito.Matchers._ +import com.github.kliewkliew.cornucopia.graph._ +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Sink +import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{ Success, Failure } + +class LibraryTest extends TestKit(ActorSystem("LibraryTest")) + with WordSpecLike with BeforeAndAfterAll with MustMatchers with MockitoSugar { + + trait FakeCornucopiaActorSourceGraph { + import Config.Consumer.materializer + + // hostname IP address must be semantically correct, java.net actually checks for RFC conformance + val redisUri = "redis://192.168.0.1" + + val fakeSalad = mock[Salad] + when(fakeSalad.canonicalizeURI(anyObject())).thenReturn(RedisURI.create(redisUri)) + + class CornucopiaActorSourceLocal extends CornucopiaActorSource { + lazy val probe = TestProbe() + + override def getNewSaladApi: Salad = fakeSalad + + override def streamAddSlave(implicit executionContext: ExecutionContext) = + Flow[KeyValue].map(_ => KeyValue("test", "")) + + override def streamRemoveNode(implicit executionContext: ExecutionContext) = + Flow[KeyValue].map(_ => KeyValue("test", "")) + + override def streamRemoveSlave(implicit executionContext: ExecutionContext) = + Flow[KeyValue].map(_ => KeyValue("test", "")) + + override protected def waitForTopologyRefresh[T](passthrough: T) + (implicit executionContext: ExecutionContext): Future[T] = Future { + passthrough + } + + override protected def waitForTopologyRefresh2[T, U](passthrough1: T, passthrough2: U) + (implicit executionContext: ExecutionContext): Future[(T, U)] = Future { + (passthrough1, passthrough2) + } + + override protected def logTopology(implicit executionContext: ExecutionContext): Future[Unit] = Future(Unit) + + override protected def reshardCluster(withoutNodes: Seq[String]): Future[Unit] = Future(Unit) + + override protected def addNodesToCluster(redisURIList: Seq[RedisURI], retries: Int = 0) + (implicit executionContext: ExecutionContext): Future[Seq[RedisURI]] = { + Future(redisURIList) + } + + override protected def findMasters(redisURIList: Seq[RedisURI]) + (implicit executionContext: ExecutionContext): Future[Unit] = Future(Unit) + + override protected def reshardClusterWithNewMaster(newMasterURI: RedisURI): Future[Unit] = Future(Unit) + + } + + } + + implicit val ec = system.dispatcher + + override def afterAll(): Unit = { + system.terminate() + } + + "Add master" must { + "add new master and reshard cluster" in new FakeCornucopiaActorSourceGraph { + import Library.source._ + + val cornucopiaActorSourceLocal = new CornucopiaActorSourceLocal + + private val ref = cornucopiaActorSourceLocal.ref + + implicit val timeout = Timeout(5 seconds) + + val future = ask(ref, Task("+master", redisUri)) + + future.onComplete { + case Failure(_) => assert(false) + case Success(msg) => + assert(msg == Right("master")) + } + + Await.ready(future, timeout.duration) + } + } + + "Add slave" must { + "add new slave and find masters" in new FakeCornucopiaActorSourceGraph { + import Library.source._ + + val cornucopiaActorSourceLocal = new CornucopiaActorSourceLocal + + private val ref = cornucopiaActorSourceLocal.ref + + implicit val timeout = Timeout(5 seconds) + + val future = ask(ref, Task("+slave", redisUri)) + + future.onComplete { + case Failure(_) => assert(false) + case Success(msg) => + assert(msg == Right("slave")) + } + + Await.ready(future, timeout.duration) + } + } + +} diff --git a/src/test/scala/com/github/kliewkliew/cornucopia/ReshardTest.scala b/src/test/scala/com/github/kliewkliew/cornucopia/ReshardTest.scala new file mode 100644 index 0000000..852e3c6 --- /dev/null +++ b/src/test/scala/com/github/kliewkliew/cornucopia/ReshardTest.scala @@ -0,0 +1,118 @@ +package com.github.kliewkliew.cornucopia + +import com.github.kliewkliew.cornucopia.redis._ +import com.lambdaworks.redis.RedisURI +//import com.github.kliewkliew.cornucopia._ +import com.github.kliewkliew.cornucopia.graph._ +import com.github.kliewkliew.cornucopia.redis.Connection.Salad +import akka.testkit.{TestActorRef, TestKit, TestProbe} +import akka.actor.{ActorSystem, ActorRef} +import akka.pattern.ask +//import akka.actor.Status.Failure +import akka.stream.ActorMaterializer +import akka.util.Timeout +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{BeforeAndAfterAll, MustMatchers, WordSpecLike} +import org.mockito.Mockito._ +import org.scalatest.mockito.MockitoSugar._ +import org.mockito.Matchers._ +import com.github.kliewkliew.cornucopia.graph._ +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Sink +import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{ Success, Failure } +import redis.Connection.{newSaladAPI, Salad} +import redis.ReshardTable +import redis.ReshardTable._ +import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode + +class ReshardTest extends TestKit(ActorSystem("ReshardTest")) + with WordSpecLike with BeforeAndAfterAll with MustMatchers with MockitoSugar { + + trait ReshardTableTest { + val one: java.util.List[Integer] = new java.util.ArrayList[Integer](java.util.Arrays.asList[Integer](1,2,3,4,5,6)) + val two: java.util.List[Integer] = new java.util.ArrayList[Integer](java.util.Arrays.asList[Integer](7,8,9,10,11,12)) + val three: java.util.List[Integer] = new java.util.ArrayList[Integer](java.util.Arrays.asList[Integer](13,14,15,16,17)) + + class RedisClusterNodeTest(private val slots: java.util.List[Integer], private val nodeId: String) extends RedisClusterNode { + override def getSlots: java.util.List[Integer] = slots + override def getNodeId: String = nodeId + } + + val node1 = new RedisClusterNodeTest(one, "a") + val node2 = new RedisClusterNodeTest(two, "b") + val node3 = new RedisClusterNodeTest(three, "c") + + val clusterNodes = List(node1, node2, node3) + + val expectedReshardTable: ReshardTableType = Map( + "a" -> List(1,2), + "b" -> List(7), + "c" -> List(13) + ) + + } + + trait ReshardDebug { + val redisUri = "redis://127.0.0.1" + implicit val newSaladAPIimpl: Salad = newSaladAPI + } + + implicit val ec = system.dispatcher + + override def afterAll(): Unit = { + system.terminate() + } + + "Reshard cluster with new master" must { + "calculate reshard table correctly" in new ReshardTableTest { + + final implicit val ExpectedTotalNumberSlots: Int = 17 + val reshardTable: ReshardTableType = computeReshardTable(clusterNodes) + + assert(reshardTable == expectedReshardTable) + } + } + + "Reshard cluster with new master" must { + "throw a ReshardTableException if the expected total number of slots does not match the actual number of slots" in new ReshardTableTest { + + final implicit val ExpectedTotalNumberSlots: Int = 1234 + + try { + val reshardTable: ReshardTableType = computeReshardTable(clusterNodes) + assert(false) + } catch { + case e: ReshardTableException => assert(true) + case _ => assert(false) + } + + } + } + + "Debugging" must { + "be fun" ignore new ReshardDebug { + import Library.source._ + + val cornucopiaActorSource = new CornucopiaActorSource + + private val ref = cornucopiaActorSource.ref + + implicit val timeout = Timeout(6 seconds) + + val future = ask(ref, Task("+master", redisUri)) + + future.onComplete { + case Failure(_) => assert(false) + case Success(msg) => + assert(msg == Right("master")) + } + + Await.ready(future, timeout.duration) + } + } + +} +