import time
from typing import Any, Generator
import opentelemetry.exporter.otlp.proto.http.trace_exporter as oltp_exporter
import opentelemetry.sdk.resources as resources
import opentelemetry.sdk.trace as sdk_trace
import opentelemetry.sdk.trace.export as trace_export
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
trace.set_tracer_provider(
TracerProvider(resource=Resource.create({resources.SERVICE_NAME: "UserModel"}))
)
tracer = trace.get_tracer(__name__)
trace_provider = trace.get_tracer_provider()
class Model:
def __init__(self, **kwargs) -> None:
honeycomb_api_key = kwargs["secrets"]["HONEYCOMB_API_KEY"]
honeycomb_exporter = oltp_exporter.OTLPSpanExporter(
endpoint="https://api.honeycomb.io/v1/traces",
headers={
"x-honeycomb-team" : honeycomb_api_key,
"x-honeycomb-dataset": "marius_testing_user",
},
)
honeycomb_processor = sdk_trace.export.BatchSpanProcessor(honeycomb_exporter)
trace_provider.add_span_processor(honeycomb_processor)
@tracer.start_as_current_span("load_model")
def load(self):
...
def preprocess(self, model_input):
with tracer.start_as_current_span("preprocess"):
...
return model_input
@tracer.start_as_current_span("predict")
def predict(self, model_input: Any) -> Generator[str, None, None]:
with tracer.start_as_current_span("start-predict") as span:
def inner():
time.sleep(0.01)
for i in range(5):
span.add_event("yield")
yield str(i)
return inner()