Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
private long availableExecutorTime = 0;

private volatile boolean applicationEnded = false;
private java.lang.ref.WeakReference<Throwable> lastSqlFailure =
new java.lang.ref.WeakReference<>(null);

public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sparkVersion) {
tracer = AgentTracer.get();
Expand Down Expand Up @@ -1052,6 +1054,33 @@ private synchronized void onStreamingQueryProgressEvent(
}
}

/**
 * Emits an error span for a SQL statement that failed before producing listener-bus
 * events (e.g. an analysis failure). The span is parented under the application span
 * and force-kept via the data-jobs sampling mechanism.
 *
 * <p>Synchronized: mutates {@code lastSqlFailure} and may lazily create the application span.
 */
public synchronized void onSqlFailure(String sqlText, Throwable throwable) {
  // Skip on Databricks and once the application has already finished.
  if (isRunningOnDatabricks || applicationEnded) {
    return;
  }
  // De-duplicate: the same Throwable instance can surface through several
  // instrumented entry points; only the first sighting produces a span.
  if (lastSqlFailure.get() == throwable) {
    return;
  }
  lastSqlFailure = new java.lang.ref.WeakReference<>(throwable);

  initApplicationSpanIfNotInitialized();

  AgentSpan errorSpan =
      buildSparkSpan("spark.sql", null)
          .withTag(DDTags.RESOURCE_NAME, sqlText)
          .withTag("description", sqlText)
          .asChildOf(applicationSpan.context())
          .start();

  errorSpan.setError(true);
  errorSpan.setTag(DDTags.ERROR_TYPE, throwable.getClass().getName());
  errorSpan.setTag(DDTags.ERROR_MSG, throwable.getMessage());

  // Render the full stack trace into the error.stack tag.
  java.io.StringWriter stackWriter = new java.io.StringWriter();
  throwable.printStackTrace(new java.io.PrintWriter(stackWriter));
  errorSpan.setTag(DDTags.ERROR_STACK, stackWriter.toString());

  setDataJobsSamplingPriority(errorSpan);
  errorSpan.finish();
}

/**
 * Forces the span to be kept (USER_KEEP priority) attributed to the DATA_JOBS
 * sampling mechanism, so data-jobs spans are never dropped by the sampler.
 */
private void setDataJobsSamplingPriority(AgentSpan span) {
span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameEndsWith;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
Expand Down Expand Up @@ -39,7 +40,9 @@ public String[] knownMatchingTypes() {
"org.apache.spark.util.Utils",
"org.apache.spark.util.SparkClassUtils",
"org.apache.spark.scheduler.LiveListenerBus",
"org.apache.spark.sql.execution.SparkPlanInfo$"
"org.apache.spark.sql.execution.SparkPlanInfo$",
"org.apache.spark.sql.SparkSession",
"org.apache.spark.sql.DataFrameReader"
};
}

Expand Down Expand Up @@ -74,6 +77,22 @@ public void methodAdvice(MethodTransformer transformer) {
.and(takesArgument(0, named("org.apache.spark.scheduler.SparkListenerInterface")))
.and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");

// SparkSession.sql(String, ...) and SparkSession.table(String) — catch AnalysisException
// failures that fire before SparkListenerSQLExecutionStart and are invisible to the listener bus
transformer.applyAdvice(
isMethod()
.and(named("sql").or(named("table")))
.and(takesArgument(0, String.class))
.and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))),
AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
transformer.applyAdvice(
isMethod()
.and(named("table"))
.and(takesArguments(1))
.and(takesArgument(0, String.class))
.and(isDeclaredBy(named("org.apache.spark.sql.DataFrameReader"))),
AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice");
}

public static class PrepareSubmitEnvAdvice {
Expand Down Expand Up @@ -122,6 +141,20 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
}
}

/**
 * Advice applied to SQL entry points (SparkSession.sql/table, DataFrameReader.table) that
 * reports failures thrown before any SparkListener event is emitted — e.g. analysis failures
 * that are invisible to the listener bus — so an error span can still be produced.
 */
public static class SparkSqlFailureAdvice {
  // suppress = Throwable.class keeps advice failures from propagating into the
  // instrumented application, consistent with the other advices in this class.
  @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
  public static void exit(
      @Advice.Argument(0) String sqlText, @Advice.Thrown Throwable throwable) {
    // Nothing to report on success; without a registered listener there is nowhere to send it.
    if (throwable == null || AbstractDatadogSparkListener.listener == null) {
      return;
    }
    AbstractDatadogSparkListener.listener.onSqlFailure(sqlText, throwable);
  }
}

public static class LiveListenerBusAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
// If OL is disabled in tracer config but user set it up manually don't interfere
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,38 @@ abstract class AbstractSpark32SqlTest extends InstrumentationSpecification {
return plan
}

def "failed spark.sql call creates an error span"() {
  def sqlText = "SELECT * FROM non_existing_table"
  def sparkSession = SparkSession.builder()
    .config("spark.master", "local[2]")
    .getOrCreate()

  // Capture the failure instead of silently swallowing it: the error span is only
  // created on failure, so the test must fail loudly if the query unexpectedly succeeds.
  Exception failure = null
  try {
    sparkSession.sql(sqlText).show()
  } catch (Exception e) {
    failure = e
  }
  sparkSession.stop()

  expect:
  failure != null
  assertTraces(1) {
    trace(2) {
      span {
        operationName "spark.application"
        spanType "spark"
        errored false
      }
      span {
        operationName "spark.sql"
        spanType "spark"
        resourceName sqlText
        childOf(span(0))
        errored true
      }
    }
  }
}

private static Object normalizeColumnRefs(Object plan) {
if (plan instanceof String) {
return plan.replaceAll(/#\d+L?/, '#N').replaceAll(/plan_id=\d+/, 'plan_id=N')
Expand Down
Loading