Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/main/java/org/apache/datasketches/kll/KllDoublesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ static double[] createItemsArray(final double item, final long weight) {
* The following code is only valid in the special case of exactly reaching capacity while updating.
* It cannot be used while merging, while reducing k, or anything else.
* @param dblSk the current KllDoublesSketch
* @param random the random number generator
*/
static void compressWhileUpdatingSketch(final KllDoublesSketch dblSk) {
static void compressWhileUpdatingSketch(final KllDoublesSketch dblSk, final Random random) {
final int level =
findLevelToCompact(dblSk.getK(), dblSk.getM(), dblSk.getNumLevels(), dblSk.levelsArr);
if (level == (dblSk.getNumLevels() - 1)) {
Expand Down Expand Up @@ -83,9 +84,9 @@ static void compressWhileUpdatingSketch(final KllDoublesSketch dblSk) {
Arrays.sort(myDoubleItemsArr, adjBeg, adjBeg + adjPop);
}
if (popAbove == 0) {
KllDoublesHelper.randomlyHalveUpDoubles(myDoubleItemsArr, adjBeg, adjPop, KllSketch.random);
KllDoublesHelper.randomlyHalveUpDoubles(myDoubleItemsArr, adjBeg, adjPop, random);
} else {
KllDoublesHelper.randomlyHalveDownDoubles(myDoubleItemsArr, adjBeg, adjPop, KllSketch.random);
KllDoublesHelper.randomlyHalveDownDoubles(myDoubleItemsArr, adjBeg, adjPop, random);
KllDoublesHelper.mergeSortedDoubleArrays(
myDoubleItemsArr, adjBeg, halfAdjPop,
myDoubleItemsArr, rawEnd, popAbove,
Expand Down Expand Up @@ -119,7 +120,7 @@ static void compressWhileUpdatingSketch(final KllDoublesSketch dblSk) {
}

//assumes readOnly = false and UPDATABLE, called from KllDoublesSketch::merge
static void mergeDoubleImpl(final KllDoublesSketch mySketch, final KllDoublesSketch otherDblSk) {
static void mergeDoubleImpl(final KllDoublesSketch mySketch, final KllDoublesSketch otherDblSk, final Random random) {
if (otherDblSk.isEmpty()) { return; }

//capture my key mutable fields before doing any merging
Expand All @@ -136,12 +137,12 @@ static void mergeDoubleImpl(final KllDoublesSketch mySketch, final KllDoublesSke

//MERGE: update this sketch with level0 items from the other sketch
if (otherDblSk.isCompactSingleItem()) {
KllDoublesSketch.updateDouble(mySketch, otherDblSk.getDoubleSingleItem());
KllDoublesSketch.updateDouble(mySketch, otherDblSk.getDoubleSingleItem(), random);
otherDoubleItemsArr = new double[0];
} else {
otherDoubleItemsArr = otherDblSk.getDoubleItemsArray();
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
KllDoublesSketch.updateDouble(mySketch, otherDoubleItemsArr[i]);
KllDoublesSketch.updateDouble(mySketch, otherDoubleItemsArr[i], random);
}
}

Expand Down Expand Up @@ -173,7 +174,7 @@ static void mergeDoubleImpl(final KllDoublesSketch mySketch, final KllDoublesSke

// notice that workbuf is being used as both the input and output
final int[] result = generalDoublesCompress(mySketch.getK(), mySketch.getM(), provisionalNumLevels,
workbuf, worklevels, workbuf, outlevels, mySketch.isLevelZeroSorted(), KllSketch.random);
workbuf, worklevels, workbuf, outlevels, mySketch.isLevelZeroSorted(), random);
final int targetItemCount = result[1]; //was finalCapacity. Max size given k, m, numLevels
final int curItemCount = result[2]; //was finalPop

Expand Down
19 changes: 10 additions & 9 deletions src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.lang.foreign.MemorySegment;
import java.util.Arrays;
import java.util.Objects;
import java.util.Random;

import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.MemorySegmentRequest;
Expand Down Expand Up @@ -272,12 +273,12 @@ public QuantilesDoublesSketchIteratorAPI iterator() {
}

@Override
public final void merge(final KllSketch other) {
public final void merge(final KllSketch other, final Random random) {
if (readOnly || (sketchStructure != UPDATABLE)) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (this == other) { throw new SketchesArgumentException(SELF_MERGE_MSG); }
final KllDoublesSketch othDblSk = (KllDoublesSketch)other;
if (othDblSk.isEmpty()) { return; }
KllDoublesHelper.mergeDoubleImpl(this, othDblSk);
KllDoublesHelper.mergeDoubleImpl(this, othDblSk, random);
doublesSV = null;
}

Expand Down Expand Up @@ -323,17 +324,17 @@ public String toString(final boolean withLevels, final boolean withLevelsAndItem
public void update(final double item) {
if (Double.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
updateDouble(this, item);
updateDouble(this, item, random);
doublesSV = null;
}

//Also called from KllDoublesHelper::mergeDoubleImpl
static void updateDouble(final KllDoublesSketch dblSk, final double item) {
static void updateDouble(final KllDoublesSketch dblSk, final double item, final Random random) {
dblSk.updateMinMax(item);
int freeSpace = dblSk.levelsArr[0];
assert (freeSpace >= 0);
if (freeSpace == 0) {
KllDoublesHelper.compressWhileUpdatingSketch(dblSk);
KllDoublesHelper.compressWhileUpdatingSketch(dblSk, random);
freeSpace = dblSk.levelsArr[0];
assert (freeSpace > 0);
}
Expand Down Expand Up @@ -369,9 +370,9 @@ public void update(final double item, final long weight) {
if (Double.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1L) { updateDouble(this, item); }
if (weight == 1L) { updateDouble(this, item, random); }
else if (weight < levelsArr[0]) {
for (int i = 0; i < (int)weight; i++) { updateDouble(this, item); }
for (int i = 0; i < (int)weight; i++) { updateDouble(this, item, random); }
} else {
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(getK(), DEFAULT_M, item, weight);
merge(tmpSk);
Expand Down Expand Up @@ -403,7 +404,7 @@ public void update(final double[] items, final int offset, final int length) {
for (int i = offset; i < end; i++) {
final double v = items[i];
if (!Double.isNaN(v)) {
updateDouble(this, v); //normal path
updateDouble(this, v, random); //normal path
doublesSV = null;
}
}
Expand All @@ -419,7 +420,7 @@ private void updateDouble(final double[] srcItems, final int srcOffset, final in
int count = 0;
while (count < length) {
if (levelsArr[0] == 0) {
KllDoublesHelper.compressWhileUpdatingSketch(this);
KllDoublesHelper.compressWhileUpdatingSketch(this, random);
}
final int spaceNeeded = length - count;
final int freeSpace = levelsArr[0];
Expand Down
15 changes: 8 additions & 7 deletions src/main/java/org/apache/datasketches/kll/KllFloatsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ static float[] createItemsArray(final float item, final long weight) {
* The following code is only valid in the special case of exactly reaching capacity while updating.
* It cannot be used while merging, while reducing k, or anything else.
* @param fltSk the current KllFloatsSketch
* @param random the random number generator
*/
static void compressWhileUpdatingSketch(final KllFloatsSketch fltSk) {
static void compressWhileUpdatingSketch(final KllFloatsSketch fltSk, Random random) {
final int level =
findLevelToCompact(fltSk.getK(), fltSk.getM(), fltSk.getNumLevels(), fltSk.levelsArr);
if (level == (fltSk.getNumLevels() - 1)) {
Expand Down Expand Up @@ -83,9 +84,9 @@ static void compressWhileUpdatingSketch(final KllFloatsSketch fltSk) {
Arrays.sort(myFloatItemsArr, adjBeg, adjBeg + adjPop);
}
if (popAbove == 0) {
KllFloatsHelper.randomlyHalveUpFloats(myFloatItemsArr, adjBeg, adjPop, KllSketch.random);
KllFloatsHelper.randomlyHalveUpFloats(myFloatItemsArr, adjBeg, adjPop, random);
} else {
KllFloatsHelper.randomlyHalveDownFloats(myFloatItemsArr, adjBeg, adjPop, KllSketch.random);
KllFloatsHelper.randomlyHalveDownFloats(myFloatItemsArr, adjBeg, adjPop, random);
KllFloatsHelper.mergeSortedFloatArrays(
myFloatItemsArr, adjBeg, halfAdjPop,
myFloatItemsArr, rawEnd, popAbove,
Expand Down Expand Up @@ -119,7 +120,7 @@ static void compressWhileUpdatingSketch(final KllFloatsSketch fltSk) {
}

//assumes readOnly = false and UPDATABLE, called from KllFloatsSketch::merge
static void mergeFloatImpl(final KllFloatsSketch mySketch, final KllFloatsSketch otherFltSk) {
static void mergeFloatImpl(final KllFloatsSketch mySketch, final KllFloatsSketch otherFltSk, final Random random) {
if (otherFltSk.isEmpty()) { return; }

//capture my key mutable fields before doing any merging
Expand All @@ -136,12 +137,12 @@ static void mergeFloatImpl(final KllFloatsSketch mySketch, final KllFloatsSketch

//MERGE: update this sketch with level0 items from the other sketch
if (otherFltSk.isCompactSingleItem()) {
KllFloatsSketch.updateFloat(mySketch, otherFltSk.getFloatSingleItem());
KllFloatsSketch.updateFloat(mySketch, otherFltSk.getFloatSingleItem(), random);
otherFloatItemsArr = new float[0];
} else {
otherFloatItemsArr = otherFltSk.getFloatItemsArray();
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
KllFloatsSketch.updateFloat(mySketch, otherFloatItemsArr[i]);
KllFloatsSketch.updateFloat(mySketch, otherFloatItemsArr[i], random);
}
}

Expand Down Expand Up @@ -173,7 +174,7 @@ static void mergeFloatImpl(final KllFloatsSketch mySketch, final KllFloatsSketch

// notice that workbuf is being used as both the input and output
final int[] result = generalFloatsCompress(mySketch.getK(), mySketch.getM(), provisionalNumLevels,
workbuf, worklevels, workbuf, outlevels, mySketch.isLevelZeroSorted(), KllSketch.random);
workbuf, worklevels, workbuf, outlevels, mySketch.isLevelZeroSorted(), random);
final int targetItemCount = result[1]; //was finalCapacity. Max size given k, m, numLevels
final int curItemCount = result[2]; //was finalPop

Expand Down
19 changes: 10 additions & 9 deletions src/main/java/org/apache/datasketches/kll/KllFloatsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.lang.foreign.MemorySegment;
import java.util.Arrays;
import java.util.Objects;
import java.util.Random;

import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.MemorySegmentRequest;
Expand Down Expand Up @@ -272,12 +273,12 @@ public QuantilesFloatsSketchIterator iterator() {
}

@Override
public final void merge(final KllSketch other) {
public final void merge(final KllSketch other, final Random random) {
if (readOnly || (sketchStructure != UPDATABLE)) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (this == other) { throw new SketchesArgumentException(SELF_MERGE_MSG); }
final KllFloatsSketch othFltSk = (KllFloatsSketch)other;
if (othFltSk.isEmpty()) { return; }
KllFloatsHelper.mergeFloatImpl(this, othFltSk);
KllFloatsHelper.mergeFloatImpl(this, othFltSk, random);
floatsSV = null;
}

Expand Down Expand Up @@ -323,17 +324,17 @@ public String toString(final boolean withLevels, final boolean withLevelsAndItem
public void update(final float item) {
if (Float.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
updateFloat(this, item);
updateFloat(this, item, random);
floatsSV = null;
}

//Also called from KllFloatsHelper::mergeFloatImpl
static void updateFloat(final KllFloatsSketch fltSk, final float item) {
static void updateFloat(final KllFloatsSketch fltSk, final float item, final Random random) {
fltSk.updateMinMax(item);
int freeSpace = fltSk.levelsArr[0];
assert (freeSpace >= 0);
if (freeSpace == 0) {
KllFloatsHelper.compressWhileUpdatingSketch(fltSk);
KllFloatsHelper.compressWhileUpdatingSketch(fltSk, random);
freeSpace = fltSk.levelsArr[0];
assert (freeSpace > 0);
}
Expand Down Expand Up @@ -369,9 +370,9 @@ public void update(final float item, final long weight) {
if (Float.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1L) { updateFloat(this, item); }
if (weight == 1L) { updateFloat(this, item, random); }
else if (weight < levelsArr[0]) {
for (int i = 0; i < (int)weight; i++) { updateFloat(this, item); }
for (int i = 0; i < (int)weight; i++) { updateFloat(this, item, random); }
} else {
final KllHeapFloatsSketch tmpSk = new KllHeapFloatsSketch(getK(), DEFAULT_M, item, weight);
merge(tmpSk);
Expand Down Expand Up @@ -403,7 +404,7 @@ public void update(final float[] items, final int offset, final int length) {
for (int i = offset; i < end; i++) {
final float v = items[i];
if (!Float.isNaN(v)) {
updateFloat(this, v); //normal path
updateFloat(this, v, random); //normal path
floatsSV = null;
}
}
Expand All @@ -419,7 +420,7 @@ private void updateFloat(final float[] srcItems, final int srcOffset, final int
int count = 0;
while (count < length) {
if (levelsArr[0] == 0) {
KllFloatsHelper.compressWhileUpdatingSketch(this);
KllFloatsHelper.compressWhileUpdatingSketch(this, random);
}
final int spaceNeeded = length - count;
final int freeSpace = levelsArr[0];
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/org/apache/datasketches/kll/KllItemsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ static <T> T[] createItemsArray(final Class<T> clazz, final T item, final long w
* The following code is only valid in the special case of exactly reaching capacity while updating.
* It cannot be used while merging, while reducing k, or anything else.
* @param itmSk the current KllItemsSketch
* @param random the random number generator
*/
private static <T> void compressWhileUpdatingSketch(final KllItemsSketch<T> itmSk) {
private static <T> void compressWhileUpdatingSketch(final KllItemsSketch<T> itmSk, final Random random) {
final int level =
findLevelToCompact(itmSk.getK(), itmSk.getM(), itmSk.getNumLevels(), itmSk.levelsArr);
if (level == (itmSk.getNumLevels() - 1)) {
Expand Down Expand Up @@ -88,9 +89,9 @@ private static <T> void compressWhileUpdatingSketch(final KllItemsSketch<T> itmS
Arrays.sort((T[])myItemsArr, adjBeg, adjBeg + adjPop, itmSk.comparator);
}
if (popAbove == 0) {
KllItemsHelper.randomlyHalveUpItems(myItemsArr, adjBeg, adjPop, KllSketch.random);
KllItemsHelper.randomlyHalveUpItems(myItemsArr, adjBeg, adjPop, random);
} else {
KllItemsHelper.randomlyHalveDownItems(myItemsArr, adjBeg, adjPop, KllSketch.random);
KllItemsHelper.randomlyHalveDownItems(myItemsArr, adjBeg, adjPop, random);
KllItemsHelper.mergeSortedItemsArrays(
myItemsArr, adjBeg, halfAdjPop,
myItemsArr, rawEnd, popAbove,
Expand Down Expand Up @@ -125,7 +126,7 @@ private static <T> void compressWhileUpdatingSketch(final KllItemsSketch<T> itmS

//assumes readOnly = false, and UPDATABLE, called from KllItemsSketch::merge
static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,
final KllItemsSketch<T> otherItmSk, final Comparator<? super T> comp) {
final KllItemsSketch<T> otherItmSk, final Comparator<? super T> comp, final Random random) {
if (otherItmSk.isEmpty()) { return; }

//capture my key mutable fields before doing any merging
Expand All @@ -142,12 +143,12 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,

//MERGE: update this sketch with level0 items from the other sketch
if (otherItmSk.isCompactSingleItem()) {
updateItem(mySketch, otherItmSk.getSingleItem());
updateItem(mySketch, otherItmSk.getSingleItem(), random);
otherItemsArr = new Object[0];
} else {
otherItemsArr = otherItmSk.getTotalItemsArray();
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
updateItem(mySketch, otherItemsArr[i]);
updateItem(mySketch, otherItemsArr[i], random);
}
}

Expand Down Expand Up @@ -179,7 +180,7 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,

// notice that workbuf is being used as both the input and output
final int[] result = generalItemsCompress(mySketch.getK(), mySketch.getM(), provisionalNumLevels,
workbuf, worklevels, workbuf, outlevels, mySketch.isLevelZeroSorted(), KllSketch.random, comp);
workbuf, worklevels, workbuf, outlevels, mySketch.isLevelZeroSorted(), random, comp);
final int targetItemCount = result[1]; //was finalCapacity. Max size given k, m, numLevels
final int curItemCount = result[2]; //was finalPop

Expand Down Expand Up @@ -316,12 +317,12 @@ private static void randomlyHalveUpItems(final Object[] buf, final int start, fi
}

//Called from KllItemsSketch::update and, within this class, from mergeItemImpl and the weighted updateItem
static <T> void updateItem(final KllItemsSketch<T> itmSk, final Object item) {
static <T> void updateItem(final KllItemsSketch<T> itmSk, final Object item, final Random random) {
itmSk.updateMinMax((T)item);
int freeSpace = itmSk.levelsArr[0];
assert freeSpace >= 0;
if (freeSpace == 0) {
compressWhileUpdatingSketch(itmSk);
compressWhileUpdatingSketch(itmSk, random);
freeSpace = itmSk.levelsArr[0];
assert (freeSpace > 0);
}
Expand All @@ -333,9 +334,9 @@ static <T> void updateItem(final KllItemsSketch<T> itmSk, final Object item) {
}

//Called from KllItemsSketch::update with weight
static <T> void updateItem(final KllItemsSketch<T> itmSk, final T item, final long weight) {
static <T> void updateItem(final KllItemsSketch<T> itmSk, final T item, final long weight, final Random random) {
if (weight < itmSk.levelsArr[0]) {
for (int i = 0; i < (int)weight; i++) { updateItem(itmSk, item); }
for (int i = 0; i < (int)weight; i++) { updateItem(itmSk, item, random); }
} else {
itmSk.updateMinMax(item);
final KllHeapItemsSketch<T> tmpSk =
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;
import java.util.Random;

import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.SketchesArgumentException;
Expand Down Expand Up @@ -262,12 +263,12 @@ public QuantilesGenericSketchIteratorAPI<T> iterator() {
}

@Override
public final void merge(final KllSketch other) {
public final void merge(final KllSketch other, final Random random) {
if (readOnly || (sketchStructure != UPDATABLE)) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (this == other) { throw new SketchesArgumentException(SELF_MERGE_MSG); }
final KllItemsSketch<T> othItmSk = (KllItemsSketch<T>)other;
if (othItmSk.isEmpty()) { return; }
KllItemsHelper.mergeItemImpl(this, othItmSk, comparator);
KllItemsHelper.mergeItemImpl(this, othItmSk, comparator, random);
itemsSV = null;
}

Expand Down Expand Up @@ -309,7 +310,7 @@ public String toString(final boolean withLevels, final boolean withLevelsAndItem
public void update(final T item) {
if (item == null) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
KllItemsHelper.updateItem(this, item);
KllItemsHelper.updateItem(this, item, random);
itemsSV = null;
}

Expand All @@ -322,8 +323,8 @@ public void update(final T item, final long weight) {
if (item == null) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1L) { KllItemsHelper.updateItem(this, item); }
else { KllItemsHelper.updateItem(this, item, weight); }
if (weight == 1L) { KllItemsHelper.updateItem(this, item, random); }
else { KllItemsHelper.updateItem(this, item, weight, random); }
itemsSV = null;
}

Expand Down
Loading