Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
31654a4
WIP: Refactored graph, and it compiles
sjking Apr 23, 2017
0c2a572
WIP: expose source actor in library object
sjking Apr 24, 2017
2a3cc4c
WIP: trying stuff
sjking Apr 26, 2017
efc8775
version 0.4-SNAPSHOT
sjking Apr 26, 2017
9a2698e
WIP: fixing the graph for actor source
sjking Apr 27, 2017
2b5865e
Add test for adding a new master using actor publisher
sjking Apr 27, 2017
2f694dc
Provide explicit type annotations for stream processing topology refresh
sjking Apr 27, 2017
76364cc
Version 0.6-SNAPSHOT
sjking Apr 27, 2017
c791048
Update test to use proper redis uri
sjking Apr 27, 2017
6d7c0ff
WIP: implementing the library interface
sjking Apr 27, 2017
afd0c9d
Pass down the sender ActorRef through the graph so we can respond aft…
sjking May 6, 2017
2c8c665
Version 0.7-SNAPSHOT
sjking May 7, 2017
62d4de4
Stub logTopology in test
sjking May 8, 2017
85a347a
Pass back an Either from reshard cluster
sjking May 9, 2017
32ab91f
We need to add the new master node to the cluster
sjking May 11, 2017
f1c8718
Send message to sender actor after adding a slave
sjking May 11, 2017
2d59d4d
Version 0.9-SNAPSHOT
sjking May 11, 2017
749865f
Add more logging
sjking May 12, 2017
9e9b9d7
Send the node type back to client
sjking May 12, 2017
f52d9be
Refactor the migrateSlot function
sjking May 14, 2017
3dc2e86
Rework resharding function
sjking May 15, 2017
05fbb94
Just use mutable Buffer
sjking May 15, 2017
1051452
Print stack trace
sjking May 15, 2017
428f571
try this
sjking May 15, 2017
bcc6dda
version bump
sjking May 15, 2017
05cdf44
Try some funky conversion from Java to Scala
sjking May 15, 2017
b14c799
Fix map problem
sjking May 15, 2017
e455083
Move reshard table function into its own helper object
sjking May 15, 2017
cc3c89f
bump
sjking May 15, 2017
1219a0e
Add an http server to accept Cornucopia Tasks
sjking May 15, 2017
36d1f77
Implement the API interface to the graph, and resharding works locall…
sjking May 15, 2017
eb09430
Version bump 0.20-SNAPSHOT
sjking May 16, 2017
d9b4a8a
Handle different types of known migration errors
sjking May 16, 2017
f169091
Docker stuff
sjking Mar 10, 2017
49f8085
Add Docker deployment configuration to build file
sjking May 16, 2017
be7e737
bump
sjking May 16, 2017
d78343f
Use a helper function to retrieve new Salad API connections, which is…
sjking May 17, 2017
4ee8f2a
bump
sjking May 17, 2017
86e5a66
Add debug logging to Reshard Table compute function
sjking May 17, 2017
e057df8
Add log4j.properties file as command-line parameter for Docker contai…
sjking May 17, 2017
bd9c435
Add log messages during slot migration and reshard with new master fu…
sjking May 17, 2017
d1ab635
Throw a ReshardTableException if the reshard table does not compute t…
sjking May 18, 2017
13b6f05
Bump
sjking May 18, 2017
42d5ffb
Keep retrying to add nodes to cluster recursively if we fail to make …
sjking May 18, 2017
dfe96bb
Bump version
sjking May 18, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,17 +1,56 @@
name := "cornucopia"
organization := "com.github.kliewkliew"

version := "1.1.2"
//version := "1.1.2"
version := "0.28-SNAPSHOT"

scalaVersion := "2.11.8"

enablePlugins(JavaAppPackaging, DockerPlugin)

resolvers += "Sonatype Releases" at "https://oss.sonatype.org/service/repositories/releases/"
resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/"

val akkaVersion = "2.4.17"

val testDependencies = Seq(
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"org.scalatest" %% "scalatest" % "3.0.0" % "test",
"org.mockito" % "mockito-all" % "1.10.19" % Test
)

libraryDependencies ++= Seq(
"biz.paluch.redis" % "lettuce" % "5.0.0.Beta1",
"org.scala-lang.modules" % "scala-java8-compat_2.11" % "0.8.0",
"com.typesafe.akka" %% "akka-stream-kafka" % "0.11-RC1",
"com.github.kliewkliew" %% "salad" % "0.11.01",
"com.typesafe.akka" %% "akka-http-core" % "2.4.11",
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.11",
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.11",
// "com.github.kliewkliew" %% "salad" % "0.11.01",
"com.adenda" %% "salad" % "0.11.03",
"org.slf4j" % "slf4j-log4j12" % "1.7.22"
) ++ testDependencies

// ------------------------------------------------ //
// ------------- Docker configuration ------------- //
// ------------------------------------------------ //
import NativePackagerHelper._

mappings in Universal ++= directory( baseDirectory.value / "src" / "main" / "resources" )

javaOptions in Universal ++= Seq(
"-Dconfig.file=etc/container.conf",
"-Dlog4j.configuration=file:/usr/local/etc/log4j.properties"
)

packageName in Docker := packageName.value

version in Docker := version.value

dockerBaseImage := "openjdk"

dockerRepository := Some("gcr.io/adenda-server-mongodb")

defaultLinuxInstallLocation in Docker := "/usr/local"

daemonUser in Docker := "root"
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.typesafe.sbt" %% "sbt-native-packager" % "1.0.4")
11 changes: 11 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ cornucopia {

// Time (seconds) to wait for batches to accumulate before executing a job.
batch.period = 5

// either `kafka` or `actor`
source = "actor"

// microservice type: `kafka` or `http`
microservice.type = "http"

http {
host = "localhost"
port = "9001"
}
}

kafka {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.github.kliewkliew.cornucopia.kafka
package com.github.kliewkliew.cornucopia

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Producer, Consumer => ConsumerDSL}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import com.github.kliewkliew.cornucopia.actors.CornucopiaSource
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.slf4j.LoggerFactory
Expand All @@ -17,6 +19,7 @@ object Config {
val gracePeriod = config.getInt("grace.period") * 1000
val refreshTimeout = config.getInt("refresh.timeout") * 1000
val batchPeriod = config.getInt("batch.period").seconds
val source = config.getString("source")
}

object Consumer {
Expand All @@ -43,8 +46,19 @@ object Config {
.withBootstrapServers(kafkaServers)

implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

val cornucopiaActorProps = CornucopiaSource.props
val cornucopiaActorSource = Source.actorPublisher[CornucopiaSource.Task](cornucopiaActorProps)

val cornucopiaKafkaSource = ConsumerDSL.plainSource(sourceSettings, subscription)

val cornucopiaSource = ConsumerDSL.plainSource(sourceSettings, subscription)

val cornucopiaSink = Producer.plainSink(sinkSettings)
}

object ReshardTableConfig {
final implicit val ExpectedTotalNumberSlots: Int = 16384
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.kliewkliew.cornucopia

object CornucopiaException {
sealed trait CornucopiaException {
self: Throwable =>
val message: String
val reason: Throwable
}

case class CornucopiaRedisConnectionException(message: String, reason: Throwable = None.orNull)
extends Exception(message, reason) with CornucopiaException
}

9 changes: 9 additions & 0 deletions src/main/scala/com/github/kliewkliew/cornucopia/Library.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.github.kliewkliew.cornucopia

import actors.CornucopiaSource
import akka.actor._

object Library {
val ref: ActorRef = new graph.CornucopiaActorSource().ref
val source = CornucopiaSource
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
package com.github.kliewkliew.cornucopia

import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import com.github.kliewkliew.cornucopia.http.Server

object Microservice {
def main(args: Array[String]): Unit = {
new kafka.Consumer().run
val config = ConfigFactory.load().getConfig("cornucopia")
val microserviceType = config.getString("microservice.type")
val logger = LoggerFactory.getLogger(this.getClass)

microserviceType match {
case "kafka" =>
logger.info("Using kafka as the microservice source")
new graph.CornucopiaKafkaSource().run
case "http" =>
logger.info("Using http server as the microservice source")
Server.start
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.github.kliewkliew.cornucopia.actors

import akka.actor._
import akka.stream.actor.ActorPublisher

import scala.annotation.tailrec

/**
* Copied liberally from akka documentaion on
* [Stream integrations](http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-integrations.html).
*/
object CornucopiaSource {
def props: Props = Props[CornucopiaSource]

final case class Task(operation: String, redisNodeIp: String, ref: Option[ActorRef] = None)
case object TaskAccepted
case object TaskDenied
}

class CornucopiaSource extends ActorPublisher[CornucopiaSource.Task] {
import CornucopiaSource._
import akka.stream.actor.ActorPublisherMessage._

val MaxBufferSize = 100
var buf = Vector.empty[Task]

override def receive = {
case _: Task if buf.size == MaxBufferSize =>
sender() ! TaskDenied
case task: Task =>
if (buf.isEmpty && totalDemand > 0) {
val task2 = task.copy(ref = Some(sender))
onNext(task2)
}
else {
val task2 = task.copy(ref = Some(sender))
buf :+= task2
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}

@tailrec final def deliverBuf(): Unit = {
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach(task => onNext(task))
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach(task => onNext(task))
deliverBuf()
}
}
}

}
Loading