diff --git a/communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java b/communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java index 8f7a471e2d..81b55aba7d 100644 --- a/communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java +++ b/communication/src/main/java/datadog/communication/serialization/GrowableBuffer.java @@ -17,11 +17,18 @@ public GrowableBuffer(int initialCapacity) { this.buffer = ByteBuffer.allocate(initialCapacity); } + /** Flips the buffer and returns a new slice which shares the buffered content. */ public ByteBuffer slice() { buffer.flip(); return buffer.slice(); } + /** Flips the buffer and returns the buffered content. */ + public ByteBuffer flip() { + buffer.flip(); + return buffer; + } + public int messageCount() { return messageCount; } diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java index d2836d5398..74c81614f6 100644 --- a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpCommonProto.java @@ -3,10 +3,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; import datadog.communication.serialization.GenerationalUtf8Cache; +import datadog.communication.serialization.GrowableBuffer; import datadog.communication.serialization.SimpleUtf8Cache; import datadog.communication.serialization.StreamingBuffer; import datadog.trace.api.Config; import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope; +import java.nio.ByteBuffer; import java.util.List; /** @@ -43,6 +45,14 @@ public static int sizeVarInt(long value) { return 1 + (63 - Long.numberOfLeadingZeros(value)) / 7; } + public static void writeVarInt(ByteBuffer buf, int value) { + for (int i = 1, len = sizeVarInt(value); i < len; i++) { + buf.put((byte) ((value & 0x7f) | 0x80)); + value >>>= 7; + } + buf.put((byte) value); + } + public static void writeVarInt(StreamingBuffer buf, int value) { for (int i = 1, len = sizeVarInt(value); i < len; i++) { buf.put((byte) ((value & 0x7f) | 0x80)); @@ -96,10 +106,28 @@ public static void writeString(StreamingBuffer buf, String value) { writeString(buf, value.getBytes(UTF_8)); } + public static void writeTag(ByteBuffer buf, int fieldNum, int wireType) { + writeVarInt(buf, fieldNum << 3 | wireType); + } + public static void writeTag(StreamingBuffer buf, int fieldNum, int wireType) { writeVarInt(buf, fieldNum << 3 | wireType); } + public static byte[] recordMessage(GrowableBuffer buf, int fieldNum) { + try { + ByteBuffer data = buf.flip(); + int dataSize = data.remaining(); + ByteBuffer message = ByteBuffer.allocate(1 + sizeVarInt(dataSize) + dataSize); + writeTag(message, fieldNum, LEN_WIRE_TYPE); + writeVarInt(message, dataSize); + message.put(data); + return message.array(); + } finally { + buf.reset(); + } + } + public static void writeInstrumentationScope( StreamingBuffer buf, OtelInstrumentationScope scope) { byte[] scopeNameUtf8 = scope.getName().getUtf8Bytes(); diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProto.java b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProto.java new file mode 100644 index 0000000000..8c99208394 --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/main/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProto.java @@ -0,0 +1,56 @@ +package datadog.trace.bootstrap.otel.common.export; + +import static datadog.trace.bootstrap.otel.common.export.OtlpAttributeVisitor.STRING; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.LEN_WIRE_TYPE; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.recordMessage; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeAttribute; +import static datadog.trace.bootstrap.otel.common.export.OtlpCommonProto.writeTag; + +import datadog.communication.serialization.GrowableBuffer; +import datadog.communication.serialization.StreamingBuffer; +import datadog.trace.api.Config; + +/** Provides a canned message for OpenTelemetry's "resource.proto" wire protocol. */ +public final class OtlpResourceProto { + private static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get()); + + /** Writes the resource message in protobuf format to the given buffer. */ + public static void writeResourceMessage(StreamingBuffer buf) { + buf.put(RESOURCE_MESSAGE); + } + + static byte[] buildResourceMessage(Config config) { + GrowableBuffer buf = new GrowableBuffer(512); + + String serviceName = config.getServiceName(); + String env = config.getEnv(); + String version = config.getVersion(); + + writeResourceAttribute(buf, "service.name", serviceName); + if (!env.isEmpty()) { + writeResourceAttribute(buf, "deployment.environment.name", env); + } + if (!version.isEmpty()) { + writeResourceAttribute(buf, "service.version", version); + } + + config + .getGlobalTags() + .forEach( + (key, value) -> { + // ignore datadog tags that we map above + if (!"service".equalsIgnoreCase(key) + && !"env".equalsIgnoreCase(key) + && !"version".equalsIgnoreCase(key)) { + writeResourceAttribute(buf, key, value); + } + }); + + return recordMessage(buf, 1); + } + + private static void writeResourceAttribute(StreamingBuffer buf, String key, String value) { + writeTag(buf, 1, LEN_WIRE_TYPE); + writeAttribute(buf, STRING, key, value); + } +} diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProtoTest.java b/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProtoTest.java new file mode 100644 index 0000000000..49546d3f1a --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/test/java/datadog/trace/bootstrap/otel/common/export/OtlpResourceProtoTest.java @@ -0,0 +1,194 @@ +package datadog.trace.bootstrap.otel.common.export; + +import static datadog.trace.api.config.GeneralConfig.ENV; +import static datadog.trace.api.config.GeneralConfig.SERVICE_NAME; +import static datadog.trace.api.config.GeneralConfig.TAGS; +import static datadog.trace.api.config.GeneralConfig.VERSION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.WireFormat; +import datadog.trace.api.Config; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests for {@link OtlpResourceProto#buildResourceMessage}. + * + *

Each test creates a {@link Config} from a {@link Properties} instance, calls {@link + * OtlpResourceProto#buildResourceMessage}, then extracts the byte array and verifies its content + * against the OpenTelemetry protobuf encoding defined in {@code + * opentelemetry/proto/resource/v1/resource.proto}. + * + *

Relevant proto field numbers: + * + *

+ *   Resource { repeated KeyValue attributes = 1; }
+ *   KeyValue { string key = 1; AnyValue value = 2; }
+ *   AnyValue { string string_value = 1; }
+ * 
+ */ +class OtlpResourceProtoTest { + + // ── test data ───────────────────────────────────────────────────────────── + + private static Properties props(String... keyValues) { + Properties props = new Properties(); + for (int i = 0; i < keyValues.length; i += 2) { + props.setProperty(keyValues[i], keyValues[i + 1]); + } + return props; + } + + private static Map attrs(String... keyValues) { + Map map = new LinkedHashMap<>(); + for (int i = 0; i < keyValues.length; i += 2) { + map.put(keyValues[i], keyValues[i + 1]); + } + return map; + } + + static Stream resourceMessageCases() { + return Stream.of( + // service not set: should use the auto-detected name + Arguments.of( + "service not set, no env, no version, no tags", + props(), + attrs("service.name", Config.get().getServiceName())), + // custom service name + Arguments.of( + "custom service name, no env, no version, no tags", + props(SERVICE_NAME, "my-service"), + attrs("service.name", "my-service")), + // env set to empty string: no deployment.environment.name written; + Arguments.of( + "env set to empty string", + props(SERVICE_NAME, "my-service", ENV, ""), + attrs("service.name", "my-service")), + // env set to non-empty value: deployment.environment.name written; + Arguments.of( + "env set to non-empty value", + props(SERVICE_NAME, "my-service", ENV, "prod"), + attrs("service.name", "my-service", "deployment.environment.name", "prod")), + // version set to empty string: no service.version written; + Arguments.of( + "version set to empty string", + props(SERVICE_NAME, "my-service", VERSION, ""), + attrs("service.name", "my-service")), + // version set to non-empty value: service.version written; + Arguments.of( + "version set to non-empty value", + props(SERVICE_NAME, "my-service", VERSION, "1.0.0"), + attrs("service.name", "my-service", "service.version", "1.0.0")), + // tags as comma-separated key:value pairs (no env or version) + Arguments.of( + "tags as comma-separated key:value pairs", + props(SERVICE_NAME, "my-service", TAGS, "region:us-east,team:platform"), + attrs( + "service.name", "my-service", + "region", "us-east", + "team", "platform")), + // all config values set together + Arguments.of( + "service, env, version, and tags all set", + props( + SERVICE_NAME, + "my-service", + ENV, + "staging", + VERSION, + "2.0.0", + TAGS, + "region:eu-west"), + attrs( + "service.name", "my-service", + "deployment.environment.name", "staging", + "service.version", "2.0.0", + "region", "eu-west"))); + } + + // ── test ───────────────────────────────────────────────────────────────── + + @ParameterizedTest(name = "{0}") + @MethodSource("resourceMessageCases") + void testBuildResourceMessage( + String caseName, Properties properties, Map expectedAttributes) + throws IOException { + Config config = Config.get(properties); + byte[] bytes = OtlpResourceProto.buildResourceMessage(config); + + Map actualAttributes = parseResourceAttributes(bytes); + assertEquals(expectedAttributes, actualAttributes, "For case: " + caseName); + } + + // ── parsing helpers ─────────────────────────────────────────────────────── + + /** + * Parses the resource message bytes into an attribute map while validating the protobuf wire + * format (field numbers and wire types) of every field read. + * + *

{@code buildResourceMessage} returns a length-prefixed message with an outer tag (field 1, + * LEN wire type) followed by the Resource body size and body. Read the outer tag, then iterate + * over all {@code Resource.attributes} (field 1, LEN wire type). Each attribute is a {@code + * KeyValue} whose {@code value} is an {@code AnyValue} containing a {@code string_value}. + */ + private static Map parseResourceAttributes(byte[] bytes) throws IOException { + // Read the outer tag (field 1, LEN wire type) that wraps the Resource body + CodedInputStream outer = CodedInputStream.newInstance(bytes); + int outerTag = outer.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(outerTag), "outer field is Resource (field 1)"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(outerTag)); + CodedInputStream resource = outer.readBytes().newCodedInput(); + + Map attributes = new LinkedHashMap<>(); + while (!resource.isAtEnd()) { + // Each attribute is Resource.attributes (field 1, LEN wire type) + int tag = resource.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(tag), "Resource.attributes is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tag)); + + // Read the full KeyValue body + CodedInputStream kv = resource.readBytes().newCodedInput(); + + String key = readKeyField(kv); + CodedInputStream av = readAnyValueField(kv); + + // Read AnyValue.string_value (field 1, LEN) + int avTag = av.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(avTag), "AnyValue.string_value is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(avTag)); + String value = av.readString(); + assertTrue(av.isAtEnd(), "no extra fields in AnyValue"); + assertTrue(kv.isAtEnd(), "no extra fields in KeyValue"); + + attributes.put(key, value); + } + return attributes; + } + + /** Reads the {@code KeyValue.key} field (field 1, LEN) and returns the string value. */ + private static String readKeyField(CodedInputStream kv) throws IOException { + int tag = kv.readTag(); + assertEquals(1, WireFormat.getTagFieldNumber(tag), "KeyValue.key is field 1"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tag)); + return kv.readString(); + } + + /** + * Reads the {@code KeyValue.value} field (field 2, LEN) and returns a stream over the {@code + * AnyValue} body. + */ + private static CodedInputStream readAnyValueField(CodedInputStream kv) throws IOException { + int tag = kv.readTag(); + assertEquals(2, WireFormat.getTagFieldNumber(tag), "KeyValue.value is field 2"); + assertEquals(WireFormat.WIRETYPE_LENGTH_DELIMITED, WireFormat.getTagWireType(tag)); + return kv.readBytes().newCodedInput(); + } +} diff --git a/dd-java-agent/agent-otel/otel-bootstrap/src/test/resources/opentelemetry/proto/resource/v1/resource.proto b/dd-java-agent/agent-otel/otel-bootstrap/src/test/resources/opentelemetry/proto/resource/v1/resource.proto new file mode 100644 index 0000000000..42c5913cfa --- /dev/null +++ b/dd-java-agent/agent-otel/otel-bootstrap/src/test/resources/opentelemetry/proto/resource/v1/resource.proto @@ -0,0 +1,45 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.resource.v1; + +import "opentelemetry/proto/common/v1/common.proto"; + +option csharp_namespace = "OpenTelemetry.Proto.Resource.V1"; +option java_multiple_files = true; +option java_package = "io.opentelemetry.proto.resource.v1"; +option java_outer_classname = "ResourceProto"; +option go_package = "go.opentelemetry.io/proto/otlp/resource/v1"; + +// Resource information. +message Resource { + // Set of attributes that describe the resource. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + // The behavior of software that receives duplicated keys can be unpredictable. + repeated opentelemetry.proto.common.v1.KeyValue attributes = 1; + + // The number of dropped attributes. If the value is 0, then + // no attributes were dropped. + uint32 dropped_attributes_count = 2; + + // Set of entities that participate in this Resource. + // + // Note: keys in the references MUST exist in attributes of this message. + // + // Status: [Development] + repeated opentelemetry.proto.common.v1.EntityRef entity_refs = 3; +}