14 changes: 9 additions & 5 deletions spark_frame/data_diff/diff_result.py
@@ -432,11 +432,15 @@ def _compute_top_per_col_state_df(self, diff_df: DataFrame) -> DataFrame:
"diff.right_value",
"diff.sample_ids",
).where(exists_in_left_or_right)
window = Window.partitionBy("column_name", "state").orderBy(
f.col("nb").desc(),
f.col("left_value"),
f.col("right_value"),
).rowsBetween(Window.unboundedPreceding, Window.currentRow)
window = (
Window.partitionBy("column_name", "state")
.orderBy(
f.col("nb").desc(),
f.col("left_value"),
f.col("right_value"),
)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df_2 = (
df_1.groupBy("column_name", "state", "left_value", "right_value")
.agg(f.count(f.lit(1)).alias("nb"), f.first("sample_ids").alias("sample_ids"))
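The rewrapped chain builds the exact same WindowSpec as before; only the layout changes so that each builder call sits on its own line. As a sanity check, here is a minimal standalone sketch of the spec in use (the toy DataFrame and the running-total column are illustrative, not part of the PR):

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Toy rows shaped like (column_name, state, left_value, right_value, nb).
df = spark.createDataFrame(
    [("c1", "changed", "a", "b", 3), ("c1", "changed", "x", "y", 1)],
    "column_name STRING, state STRING, left_value STRING, right_value STRING, nb INT",
)

window = (
    Window.partitionBy("column_name", "state")
    .orderBy(f.col("nb").desc(), f.col("left_value"), f.col("right_value"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Cumulative sum of `nb` per partition, accumulated in descending-count order.
df.withColumn("running_nb", f.sum("nb").over(window)).show()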
6 changes: 4 additions & 2 deletions spark_frame/graph_impl/ascending_forest_traversal.py
@@ -456,11 +456,13 @@ def ascending_forest_traversal(
     )
 
     if keep_labels:
-        res_df = res_df.join(input_df, node_id).select(
+        res_df = res_df.join(input_df, node_id, how="inner").select(
             res_df["*"],
             f.struct(*[input_df[quote(col)] for col in input_df.columns]).alias("node"),
         )
-        res_df = res_df.join(input_df.alias("input_df"), res_df[quote(parent_id)] == input_df[quote(node_id)]).select(
+        res_df = res_df.join(
+            input_df.alias("input_df"), res_df[quote(parent_id)] == input_df[quote(node_id)], how="inner",
+        ).select(
             res_df[quote(node_id)],
             res_df[quote(parent_id)],
             res_df["node"],
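Passing how="inner" spells out PySpark's default join type, so the join semantics are explicit at the call site without changing behavior. A minimal sketch demonstrating the equivalence (the toy nodes/labels frames are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

nodes = spark.createDataFrame([(1, 1), (2, 1)], "node_id INT, parent_id INT")
labels = spark.createDataFrame([(1, "root"), (2, "leaf")], "node_id INT, label STRING")

# "inner" is the default join type, so both forms return the same rows;
# naming it simply documents the intent.
implicit = nodes.join(labels, "node_id")
explicit = nodes.join(labels, "node_id", how="inner")
assert sorted(implicit.collect()) == sorted(explicit.collect())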
78 changes: 39 additions & 39 deletions spark_frame/transformations_impl/analyze_aggs.py
@@ -1,39 +1,39 @@
-from pyspark.sql import Column
-from pyspark.sql import functions as f
-from pyspark.sql.types import StructField
-
-
-def _to_string(col: Column) -> Column:
-    return col.cast("STRING")
-
-
-def column_number(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001, NOSONAR
-    return f.lit(col_num).alias("column_number")
-
-
-def column_name(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001, NOSONAR
-    return f.lit(struct_field.name).alias("column_name")
-
-
-def column_type(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001, NOSONAR
-    return f.lit(struct_field.dataType.typeName().upper()).alias("column_type")
-
-
-def count(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001, NOSONAR
-    return f.count(f.lit(1)).alias("count")
-
-
-def count_distinct(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001, NOSONAR
-    return f.count_distinct(col).alias("count_distinct")
-
-
-def count_null(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001, NOSONAR
-    return (f.count(f.lit(1)) - f.count(col)).alias("count_null")
-
-
-def min(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001, A001, NOSONAR
-    return _to_string(f.min(col)).alias("min")
-
-
-def max(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001, A001, NOSONAR
-    return _to_string(f.max(col)).alias("max")
+from pyspark.sql import Column
+from pyspark.sql import functions as f
+from pyspark.sql.types import StructField
+
+
+def _to_string(col: Column) -> Column:
+    return col.cast("STRING")
+
+
+def column_number(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001,NOSONAR(S1172)
+    return f.lit(col_num).alias("column_number")
+
+
+def column_name(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001,NOSONAR(S1172)
+    return f.lit(struct_field.name).alias("column_name")
+
+
+def column_type(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001,NOSONAR(S1172)
+    return f.lit(struct_field.dataType.typeName().upper()).alias("column_type")
+
+
+def count(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001,NOSONAR(S1172)
+    return f.count(f.lit(1)).alias("count")
+
+
+def count_distinct(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001,NOSONAR(S1172)
+    return f.count_distinct(col).alias("count_distinct")
+
+
+def count_null(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001,NOSONAR(S1172)
+    return (f.count(f.lit(1)) - f.count(col)).alias("count_null")
+
+
+def min(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001,A001,NOSONAR(S1172)
+    return _to_string(f.min(col)).alias("min")
+
+
+def max(col: str, struct_field: StructField, col_num: int) -> Column:  # noqa: ARG001,A001,NOSONAR(S1172)
+    return _to_string(f.max(col)).alias("max")
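Every helper shares the (col, struct_field, col_num) signature, which is why the unused parameters carry the ARG001/S1172 suppressions: a caller can map the whole set uniformly over a schema. A hedged usage sketch under that assumption (the driver code below is illustrative; the module's actual entry point is not part of this diff):

from pyspark.sql import SparkSession

from spark_frame.transformations_impl import analyze_aggs

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, None), (2, "b")], "id INT, name STRING")

# Profile the `name` column: each helper yields one aggregate expression.
field = df.schema["name"]
df.agg(
    analyze_aggs.column_name("name", field, 2),
    analyze_aggs.column_type("name", field, 2),
    analyze_aggs.count("name", field, 2),
    analyze_aggs.count_distinct("name", field, 2),
    analyze_aggs.count_null("name", field, 2),
    analyze_aggs.min("name", field, 2),
    analyze_aggs.max("name", field, 2),
).show()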