Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,7 @@ object ProtoConverters {
case AggregationOperator.BottomK => GrpcMultiPartitionQueryService.AggregationOperator.BOTTOM_K
case AggregationOperator.Min => GrpcMultiPartitionQueryService.AggregationOperator.MIN
case AggregationOperator.Avg => GrpcMultiPartitionQueryService.AggregationOperator.AVG
case AggregationOperator.HAvg => GrpcMultiPartitionQueryService.AggregationOperator.HAVG
case AggregationOperator.Sum => GrpcMultiPartitionQueryService.AggregationOperator.SUM
case AggregationOperator.Stddev => GrpcMultiPartitionQueryService.AggregationOperator.STDDEV
case AggregationOperator.Stdvar => GrpcMultiPartitionQueryService.AggregationOperator.STDVAR
Expand All @@ -1448,6 +1449,7 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.AggregationOperator.BOTTOM_K => AggregationOperator.BottomK
case GrpcMultiPartitionQueryService.AggregationOperator.MIN => AggregationOperator.Min
case GrpcMultiPartitionQueryService.AggregationOperator.AVG => AggregationOperator.Avg
case GrpcMultiPartitionQueryService.AggregationOperator.HAVG => AggregationOperator.HAvg
case GrpcMultiPartitionQueryService.AggregationOperator.SUM => AggregationOperator.Sum
case GrpcMultiPartitionQueryService.AggregationOperator.STDDEV => AggregationOperator.Stddev
case GrpcMultiPartitionQueryService.AggregationOperator.STDVAR => AggregationOperator.Stdvar
Expand Down
45 changes: 44 additions & 1 deletion core/src/main/scala/filodb.core/query/TransientRow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ final class NaNRowReader(var timestamp: Long) extends RowReader {
* IMPORTANT: It is mutable for memory efficiency purposes. Consumers from
* iterators should be aware of the semantics of ability to save the next() value.
*/
final class TransientRow(var timestamp: Long, var value: Double) extends MutableRowReader {
class TransientRow(var timestamp: Long, var value: Double) extends MutableRowReader {
def this() = this(0L, 0d)

def setValues(ts: Long, valu: Double): Unit = {
Expand Down Expand Up @@ -194,6 +194,49 @@ final class AvgAggTransientRow extends MutableRowReader {
}
}

/**
* Mutable RowReader for transporting histogram average results.
* Contains three columns: Timestamp, Mean, and Count.
* Timestamp & Mean are part of the superclass, and count is added here.
*/
final class HistAvgAggTransientRow extends TransientRow {
var count: Double = 0d

override def setLong(columnNo: Int, valu: Long): Unit =
if (columnNo == 0) timestamp = valu
else throw new IllegalArgumentException()

override def setDouble(columnNo: Int, valu: Double): Unit =
if (columnNo == 1) super.setDouble(columnNo, valu)
else if (columnNo == 2) count = valu
else throw new IllegalArgumentException()

override def setString(columnNo: Int, value: ZeroCopyUTF8String): Unit = throw new IllegalArgumentException()
override def setBlob(columnNo: Int, base: Array[Byte], offset: Int, length: Int): Unit =
throw new IllegalArgumentException()
override def notNull(columnNo: Int): Boolean = columnNo < 3
override def getBoolean(columnNo: Int): Boolean = throw new IllegalArgumentException()
override def getInt(columnNo: Int): Int = throw new IllegalArgumentException()
override def getLong(columnNo: Int): Long = if (columnNo == 0) timestamp
else throw new IllegalArgumentException()
override def getDouble(columnNo: Int): Double = if (columnNo == 1) super.getDouble(columnNo)
else if (columnNo == 2) count
else throw new IllegalArgumentException()
override def getFloat(columnNo: Int): Float = throw new IllegalArgumentException()
override def getString(columnNo: Int): String = throw new IllegalArgumentException()
override def getAny(columnNo: Int): Any = throw new IllegalArgumentException()
override def getBlobBase(columnNo: Int): Any = throw new IllegalArgumentException()
override def getBlobOffset(columnNo: Int): Long = throw new IllegalArgumentException()
override def getBlobNumBytes(columnNo: Int): Int = throw new IllegalArgumentException()

override def copyFrom(r: RowReader): Unit = r match {
case k: HistAvgAggTransientRow =>
super.copyFrom(r)
count = k.count
case _ => throw new IllegalArgumentException("Unknown Row reader")
}
}

/**
* Serves for stdvar and stddev
* stdVal represents either stdvar or stddev
Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/protobuf/query_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ enum AggregationOperator {
STDVAR = 9;
QUANTILE = 10;
MAX = 11;
HAVG = 12;
}

message AggregateParameter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {

final val numShards = 8
final val numSamplesPerTs = 720 // 2 hours * 3600 / 10 sec interval
final val numSeries = 100
final val numSeries = 1000
final val numQueries = 50
final val numBuckets = 160
val spread = 3
Expand Down Expand Up @@ -126,6 +126,17 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
val queryCommands = logicalPlans.map { plan =>
LogicalPlan2Query(dataset.ref, plan, QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, spread))), 20000))
}

val histAvgQueryNew = """havg(rate(http_request_latency_delta{_ws_="demo", _ns_="App-0"}[5m]))"""
val histAvgQueryOld = """sum(rate(http_request_latency_delta::sum{_ws_="demo", _ns_="App-0"}[5m])) /
|sum(rate(http_request_latency_delta::count{_ws_="demo", _ns_="App-0"}[5m]))""".stripMargin
val hAvgLPOld = LogicalPlan2Query(
dataset.ref, Parser.queryRangeToLogicalPlan(histAvgQueryOld, qParams),
QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, spread))), 20000))
val hAvgLPNew = LogicalPlan2Query(
dataset.ref, Parser.queryRangeToLogicalPlan(histAvgQueryNew, qParams),
QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, spread))), 20000))

println(s"Querying data from $queryStartTime to $queryEndTime")

var queriesSucceeded = 0
Expand Down Expand Up @@ -157,4 +168,40 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
}
Await.result(Future.sequence(futures), 60.seconds)
}

@Benchmark
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@OperationsPerInvocation(numQueries)
def histAvgOldQueries(): Unit = {
val futures = (0 until numQueries).map { n =>
val qCmd = hAvgLPOld
val time = System.currentTimeMillis
val f = asyncAsk(coordinator, qCmd.copy(qContext = qCmd.qContext.copy(queryId = n.toString, submitTime = time)))
f.foreach {
case q: QueryResult2 => if (q.result.nonEmpty) queriesSucceeded += 1 else queryZeroResults += 1
case e: QError => queriesFailed += 1
}
f
}
Await.result(Future.sequence(futures), 60.seconds)
}

@Benchmark
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@OperationsPerInvocation(numQueries)
def histAvgNewQueries(): Unit = {
val futures = (0 until numQueries).map { n =>
val qCmd = hAvgLPNew
val time = System.currentTimeMillis
val f = asyncAsk(coordinator, qCmd.copy(qContext = qCmd.qContext.copy(queryId = n.toString, submitTime = time)))
f.foreach {
case q: QueryResult2 => if (q.result.nonEmpty) queriesSucceeded += 1 else queryZeroResults += 1
case e: QError => queriesFailed += 1
}
f
}
Await.result(Future.sequence(futures), 60.seconds)
}
}
1 change: 1 addition & 0 deletions prometheus/src/main/java/filodb/prometheus/antlr/PromQL.g4
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ AGGREGATION_OP
| M I N
| M A X
| A V G
| H A V G
| G R O U P
| S T D D E V
| S T D V A R
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated from PromQL.g4 by ANTLR 4.9.1
// Generated from filodb/prometheus/antlr/PromQL.g4 by ANTLR 4.9.1
package filodb.prometheus.antlr;
import org.antlr.v4.runtime.tree.AbstractParseTreeVisitor;

Expand Down

Large diffs are not rendered by default.

Loading