
Posted Feb 2022 in Observability

Instrumentation Of Python Apps With OpenTracing


Written by Gokhan Simsek


Software Engineer


Microservice architecture is a hot topic these days, driven by the growing need for agility in meeting customers’ expectations. Its main benefits include independently deployable services, organization around business needs, and loosely coupled systems. Alongside these benefits, microservice architecture brings its own unique, hard-to-solve challenges. One of them is debugging and observing the whole flow of a transaction across a complex network of microservices. In this blog, we are going to talk about how to solve this problem by using Opentracing-Python in a basic Python example.

Basic Components Of Python Example

1-) OpenTracing

OpenTracing is an API specification for distributed tracing. It has been implemented for many platforms such as Python, Java, JavaScript and Go. Opentracing-Python is the instrumentation library that lets developers inject trace information into requests and extract it from them, using a Tracer together with Span and SpanContext objects. Check the official website of OpenTracing for more details.

2-) FastAPI

FastAPI is a modern, high-performance web framework for building APIs with Python 3.6+, based on standard Python type hints and asynchronous by design. It builds on Starlette, an ASGI framework, for its web parts and on Pydantic for its data parts. FastAPI also supports OpenAPI by default. The FastAPI website contains many examples in both asynchronous and synchronous versions.

3-) Wrapt Module

wrapt is a Python module for decorators, wrappers and monkey patching. We will focus on decorators and wrappers in the following example. To decorate a specific function, first define a wrapper function. This wrapper function needs to take 4 positional arguments: wrapped, instance, args and kwargs. For more information, check the wrapt website.

4-) Requests Library

Requests is a simple third-party HTTP library for Python. It is not part of the standard library and is installed from PyPI. The Requests module minimizes the code needed to call HTTP methods. For more information, take a look at the Requests page on PyPI.

OpenTracing Parts into Example

This part covers only basic OpenTracing features: Tracer, Span and SpanContext. For more information about OpenTracing capabilities such as injecting/extracting for trace propagation, or tags, logs and baggage items, please visit the OpenTracing website.

1-) Tracer

Tracer is the interface for starting a new Span and for injecting or extracting a SpanContext. As you can see from the following code, you can create your own Tracer by extending the opentracing.Tracer class. The “get_instance” static function returns the singleton FastapiTracer, creating it on first use.

import uuid
import opentracing
from tracer.opentracing.span import FastapiSpan
from tracer.opentracing.span_context import FastapiSpanContext
from opentracing.scope_managers.contextvars import ContextVarsScopeManager
 
class FastapiTracer(opentracing.Tracer):
   __instance = None
  
 
   @staticmethod
   def get_instance():
       return FastapiTracer() if FastapiTracer.__instance is None else FastapiTracer.__instance
 
 
   def __init__(self):
       scope_manager = ContextVarsScopeManager()
       super(FastapiTracer, self).__init__(scope_manager)
       FastapiTracer.__instance = self

Then, override the methods of opentracing.Tracer that you need, such as start_active_span and start_span. Two other methods are also useful: inject and extract. The inject method propagates a created SpanContext into another process or service, and the extract method recovers the propagated SpanContext on the receiving side.

   def start_active_span(self,
                         operation_name,
                         span_id=None,
                         tags=None,
                         start_time=None,
                         ignore_active_span=False,
                         finish_on_close=True):
       span_id = span_id or str(uuid.uuid4())
       _span = self.start_span(operation_name=operation_name,
                               span_id=span_id,
                               tags=tags,
                               start_time=start_time,
                               ignore_active_span=ignore_active_span)
       return self.scope_manager.activate(_span, finish_on_close)
 
   def start_span(self,
                  operation_name=None,
                  span_id=None,
                  tags=None,
                  start_time=None,
                  ignore_active_span=False):
       # Create a new span
       _span = self.create_span(operation_name=operation_name,
                                span_id=span_id,
                                tags=tags,
                                start_time=start_time,
                                ignore_active_span=ignore_active_span)
       return _span
 
   def create_span(self,
                   operation_name=None,
                   span_id=None,
                   tags=None,
                   start_time=None,
                   ignore_active_span=False):
 
       _span_id = span_id or str(uuid.uuid4())
 
       _context = FastapiSpanContext(span_id=_span_id)
       _span = FastapiSpan(self,
                           operation_name=operation_name,
                           context=_context,
                           tags=tags,
                           start_time=start_time)
 
       return _span
 
   def get_active_span(self):
       scope = self.scope_manager.active
       if scope is not None:
           return scope.span
 
       return None
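
The tracer above does not override inject and extract, so here is a minimal, standard-library-only sketch of the idea behind them. It is not the Opentracing-Python implementation: in the real API these are the Tracer methods inject(span_context, format, carrier) and extract(format, carrier). The SimpleSpanContext class and the x-span-id header name are assumptions made for illustration only.

```python
from dataclasses import dataclass
from typing import Dict, Optional

# Hypothetical header name; it is not defined by OpenTracing or by this article.
SPAN_ID_HEADER = "x-span-id"


@dataclass(frozen=True)
class SimpleSpanContext:
    """Stand-in for FastapiSpanContext: carries only a span id."""
    span_id: str


def inject(context: SimpleSpanContext, carrier: Dict[str, str]) -> None:
    """Write the span context into an outgoing carrier (e.g. HTTP headers)."""
    carrier[SPAN_ID_HEADER] = context.span_id


def extract(carrier: Dict[str, str]) -> Optional[SimpleSpanContext]:
    """Rebuild the span context from an incoming carrier, if one was sent."""
    # HTTP header names are case-insensitive, so normalise before the lookup.
    normalised = {key.lower(): value for key, value in carrier.items()}
    span_id = normalised.get(SPAN_ID_HEADER)
    return SimpleSpanContext(span_id) if span_id else None


# Caller side: attach the context to the outgoing request headers.
outgoing_headers = {"Accept": "application/json"}
inject(SimpleSpanContext(span_id="abc-123"), outgoing_headers)

# Callee side: recover the context from the headers that were received.
received = extract({"X-Span-Id": "abc-123"})
```

The caller writes the identifier into the carrier and the callee reads it back, which is what lets two services attach their spans to the same trace.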

Another important concept used by FastapiTracer is ScopeManager. ScopeManager is responsible for activating and deactivating Spans. Opentracing-Python provides different ScopeManager implementations for synchronous and asynchronous applications, for example ThreadLocalScopeManager and the ContextVarsScopeManager used here.
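
To make activation and deactivation more concrete, the following is a rough sketch of how a scope manager based on contextvars could behave. It only imitates the idea and is not the ContextVarsScopeManager source; SimpleScope, SimpleSpan and active_span are hypothetical names.

```python
import contextvars

# Holds the active scope for the current thread or asyncio task.
_active_scope = contextvars.ContextVar("active_scope", default=None)


class SimpleSpan:
    """Stand-in span that only remembers whether it was finished."""

    def __init__(self, name):
        self.name = name
        self.finished = False

    def finish(self):
        self.finished = True


class SimpleScope:
    """Stand-in for an OpenTracing Scope: ties a span to its activation."""

    def __init__(self, span, finish_on_close):
        self.span = span
        self._finish_on_close = finish_on_close
        # Activate: keep the token so the previous scope can be restored later.
        self._token = _active_scope.set(self)

    def close(self):
        if self._finish_on_close:
            self.span.finish()
        # Deactivate: the previously active scope becomes active again.
        _active_scope.reset(self._token)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()


def active_span():
    scope = _active_scope.get()
    return scope.span if scope is not None else None


parent = SimpleSpan("handle-request")
child = SimpleSpan("call-backend")

with SimpleScope(parent, finish_on_close=True):
    with SimpleScope(child, finish_on_close=True):
        innermost = active_span().name
    restored = active_span().name
after_all = active_span()
```

Closing the inner scope makes the outer span active again, which is the behavior that get_active_span in the tracer relies on.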

2-) Span

Span is the primary building block of a distributed trace. It stores the information needed for a specific task, such as the operation name, the start and finish time of the operation, tags and the SpanContext. To create your own Span object, extend the opentracing.Span class. context and tracer are the two main properties of opentracing.Span.

import time
from threading import Lock
 
import opentracing
 
 
class FastapiSpan(opentracing.Span):
 
   def __init__(self,
                tracer,
                operation_name=None,
                context=None,
                tags=None,
                start_time=None):
       super(FastapiSpan, self).__init__(tracer, context)
       self._context = context
       self._lock = Lock()
       self.operation_name = operation_name if operation_name is not None else ""
       self.start_time = start_time or int(time.time() * 1000)
       self.finish_time = 0
       self.tags = tags if tags is not None else {}
 
   @property
   def context(self):
       return self._context
 
   @property
   def span_id(self):
       return self._context.span_id
 
   def set_operation_name(self, operation_name):
       with self._lock:
           self.operation_name = operation_name
       return super(FastapiSpan, self).set_operation_name(operation_name)
 
   def set_tag(self, key, value):
       with self._lock:
           if self.tags is None:
               self.tags = {}
           self.tags[key] = value
       return super(FastapiSpan, self).set_tag(key, value)
 
   def get_tag(self, key):
       if self.tags is not None:
           return self.tags.get(key)
       return None
 
   def finish(self, f_time=None):
       with self._lock:
           self.finish_time = int(time.time() * 1000) if f_time is None else f_time
 
   def set_error_to_tag(self, err):
       error_type = type(err)
       self.set_tag('error', True)
       self.set_tag('error.kind', error_type.__name__)
       self.set_tag('error.message', str(err))
 
   def get_duration(self):
       if self.finish_time == 0:
           return int(time.time() * 1000) - self.start_time
       else:
           return self.finish_time - self.start_time
 
   def erroneous(self):
       return self.tags is not None and 'error' in self.tags

3-) Span Context

SpanContext carries data across process or service boundaries. The code below shows only span_id, but SpanContext has an important concept called baggage items. These are key:value pairs that can be very useful for making specific data available throughout the trace.

import opentracing
 
class FastapiSpanContext(opentracing.SpanContext):
 
   def __init__(self,
                span_id=None):
       self._span_id = span_id
 
   @property
   def span_id(self):
       return self._span_id
 
   @span_id.setter
   def span_id(self, value):
       self._span_id = value
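
The context above does not carry baggage, so here is a small sketch of what a baggage-aware context might look like. In Opentracing-Python, SpanContext exposes a baggage property and Span offers set_baggage_item and get_baggage_item. The BaggageSpanContext class and its with_baggage_item helper below are hypothetical, standard-library-only stand-ins.

```python
from types import MappingProxyType
from typing import Mapping, Optional


class BaggageSpanContext:
    """Stand-in for a SpanContext that carries a span id plus baggage items."""

    def __init__(self, span_id: str, baggage: Optional[Mapping[str, str]] = None):
        self._span_id = span_id
        # Keep a read-only copy so a context cannot be changed after creation.
        self._baggage = MappingProxyType(dict(baggage or {}))

    @property
    def span_id(self) -> str:
        return self._span_id

    @property
    def baggage(self) -> Mapping[str, str]:
        return self._baggage

    def with_baggage_item(self, key: str, value: str) -> "BaggageSpanContext":
        """Return a new context that also carries key=value (hypothetical helper)."""
        items = dict(self._baggage)
        items[key] = value
        return BaggageSpanContext(self._span_id, items)


root = BaggageSpanContext(span_id="abc-123")
tagged = root.with_baggage_item("customer.tier", "gold")
```

Because every downstream service receives the baggage, it is usually wise to keep the items few and small.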

Wrappers

The wrap_function_wrapper function of the wrapt module takes 3 positional parameters:

  • 1st parameter is the module that contains the function to be wrapped.
  • 2nd parameter is the name of the function to wrap, given as a dotted path inside that module.
  • 3rd parameter is the wrapper function, which takes the 4 positional parameters wrapped, instance, args and kwargs.
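
The three parameters can be illustrated with a simplified, standard-library-only imitation of the patching step. This is not how wrapt is implemented (wrapt handles descriptors, class methods and introspection far more carefully); wrap_by_name, fake_requests and tracing_wrapper are hypothetical names, and the sketch assumes that a dotted name always points at an instance method.

```python
import functools
import types


def wrap_by_name(module, name, wrapper):
    """Simplified stand-in for wrapt.wrap_function_wrapper.

    Replaces module.<name> so that every call goes through
    wrapper(wrapped, instance, args, kwargs).
    """
    *path, attribute = name.split(".")
    parent = module
    for part in path:
        parent = getattr(parent, part)
    original = getattr(parent, attribute)

    @functools.wraps(original)
    def patched(*args, **kwargs):
        if path:
            # Assumption: a dotted name refers to an instance method, so the
            # first positional argument is the instance itself.
            instance, call_args = args[0], args[1:]
            wrapped = functools.partial(original, instance)
        else:
            instance, call_args, wrapped = None, args, original
        return wrapper(wrapped, instance, call_args, kwargs)

    setattr(parent, attribute, patched)


class Session:
    def send(self, request, timeout=None):
        return "sent " + request


# Hypothetical stand-in for the real requests module.
fake_requests = types.SimpleNamespace(Session=Session)

calls = []


def tracing_wrapper(wrapped, instance, args, kwargs):
    calls.append((type(instance).__name__, args, kwargs))
    return wrapped(*args, **kwargs)


wrap_by_name(fake_requests, "Session.send", tracing_wrapper)
result = Session().send("GET /health", timeout=3)
```

The wrapper sees the call before the original function does and decides when to invoke it, which is the hook the integrations in the next sections build on.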

1-) Wrapping Requests Library

When analyzing the Requests library, you can see that all HTTP methods are directed to the request function of the Session class. This request function calls the send method to get a response. So, we should wrap the send function in the Session class of the requests module. The patch function in the following code does exactly this.

import wrapt
from tracer.integrations.requests import RequestsIntegration
 
request_integration = RequestsIntegration()
 
 
def _wrapper(wrapped, instance, args, kwargs):
   return request_integration.run_and_trace(
       wrapped,
       instance,
       args,
       kwargs,
   )
 
def patch():
   wrapt.wrap_function_wrapper(
       'requests',
       'Session.send',
       _wrapper
   )

The _wrapper function wraps the Session.send function. In it, we just call the run_and_trace function of the RequestsIntegration class, which is a child class of the BaseIntegration class.

As you can see below, run_and_trace receives the 4 positional arguments that wrapt passes to the wrapper. This function first gets the FastapiTracer and then checks whether there is an active span. If there is none, it simply calls the wrapped function without tracing, because the FastAPI integration described in the next section is expected to have created a span already. The before_call function sets the appropriate variables on the created span before the actual function is called. If you need to process the response and set specific variables or tags on the span according to it, implementing the after_call function is all you need. At the end, you should finish the span for the request and close the scope.

import abc
import traceback
 
from tracer.opentracing.tracer import FastapiTracer
 
ABC = abc.ABCMeta('ABC', (object,), {})
 
 
class BaseIntegration(ABC):
   def run_and_trace(self, wrapped, instance, args, kwargs):
       tracer = FastapiTracer.get_instance()
       if not tracer.get_active_span():
           return wrapped(*args, **kwargs)
 
       response = None
       exception = None
 
       scope = tracer.start_active_span(operation_name=self.get_operation_name(wrapped, instance, args, kwargs),
                                        finish_on_close=False)
       # Inject before span tags
       try:
           self.before_call(scope, wrapped, instance, args, kwargs, response, exception)
       except Exception as e:
           scope.span.set_tag('instrumentation_error', "Error")
 
       try:
           response = self.actual_call(wrapped, args, kwargs)
       except Exception as e:
           exception = e
 
       try:
           self.after_call(scope, wrapped, instance, args, kwargs, response, exception)
       except Exception as e:
           scope.span.set_tag('instrumentation_error', "Error")
 
       try:
           scope.span.finish()
       except Exception as e:
           if exception is None:
               exception = e
           else:
               print(str(e))
 
       scope.close()
 
       if exception is not None:
           scope.span.set_error_to_tag(exception)
           raise exception
 
       return response
 
   def actual_call(self, wrapped, args, kwargs):
       return wrapped(*args, **kwargs)
 
   @abc.abstractmethod
   def before_call(self, scope, wrapped, instance, args, kwargs, response, exception):
       raise Exception("should be implemented")
 
   @abc.abstractmethod
   def after_call(self, scope, wrapped, instance, args, kwargs, response, exception):
       raise Exception("should be implemented")
 
 
   @abc.abstractmethod
   def get_operation_name(self, wrapped, instance, args, kwargs):
       raise Exception("should be implemented")
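
The RequestsIntegration subclass itself is not shown in this post, so the following is only a guess at what its three hooks could look like. HttpClientHooks, RecordingSpan and RecordingScope are hypothetical stand-ins that run without the tracer package; the tag names follow the standard OpenTracing HTTP tag conventions, and the request and response objects are faked with SimpleNamespace.

```python
from types import SimpleNamespace


class RecordingSpan:
    """Stand-in span that just stores its tags."""

    def __init__(self):
        self.tags = {}

    def set_tag(self, key, value):
        self.tags[key] = value


class RecordingScope:
    """Stand-in scope exposing a span, like the scope passed to the hooks."""

    def __init__(self):
        self.span = RecordingSpan()


class HttpClientHooks:
    """Hypothetical hooks in the spirit of a BaseIntegration subclass."""

    def get_operation_name(self, wrapped, instance, args, kwargs):
        # Session.send receives the prepared request as first argument.
        request = args[0]
        return "{} {}".format(request.method, request.url)

    def before_call(self, scope, wrapped, instance, args, kwargs, response, exception):
        request = args[0]
        scope.span.set_tag("http.method", request.method)
        scope.span.set_tag("http.url", request.url)

    def after_call(self, scope, wrapped, instance, args, kwargs, response, exception):
        # The response is None when the actual call raised an exception.
        if response is not None:
            scope.span.set_tag("http.status_code", response.status_code)


request = SimpleNamespace(method="GET", url="http://localhost:8000/items")
response = SimpleNamespace(status_code=200)
scope = RecordingScope()
hooks = HttpClientHooks()

name = hooks.get_operation_name(None, None, (request,), {})
hooks.before_call(scope, None, None, (request,), {}, None, None)
hooks.after_call(scope, None, None, (request,), {}, response, None)
```

In the project itself these methods would live on a class that extends BaseIntegration, so that run_and_trace calls them around the real Session.send.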

2-) Wrapping FastAPI

When a FastAPI app is initialized, Python calls the FastAPI.__init__ function. To wrap and integrate OpenTracing with FastAPI, you should wrap this function and add your ASGI middleware to intercept incoming requests. The FastAPI.__init__ function is located in the fastapi.applications module. That is why wrap_function_wrapper is called as in the code below.

import wrapt
 
def _wrapper(wrapped, instance, args, kwargs):
   from fastapi.middleware import Middleware
   from tracer.wrappers.fastapi.middleware import FastapiMiddleware
   from tracer.wrappers.fastapi.fastapi_wrapper import FastapiWrapper
   # middleware may be missing, None or an immutable sequence, so copy it into a list
   middlewares = list(kwargs.pop("middleware", None) or [])
   middlewares.insert(0, Middleware(FastapiMiddleware, wrapper=FastapiWrapper.get_instance()))
   kwargs.update({"middleware": middlewares})
   wrapped(*args, **kwargs)
 
 
def patch():
   wrapt.wrap_function_wrapper(
       "fastapi.applications",
       "FastAPI.__init__",
       _wrapper
   )

The middleware lets us hook into the framework's request flow. After capturing a request, you can read all of its information and create your own Span and SpanContext data from it. The idea is the same as in the Requests wrapper. If something has to be done before the request is processed by the actual call, such as creating an active span for the FastAPI application and setting span tags or the start timestamp, you can implement that logic in the before_request function.

The app parameter of FastapiMiddleware is the wrapped ASGI application, here the FastAPI application. Like every ASGI application, it is called with 3 parameters. These are scope, receive and send.

  • scope is the object that keeps all the information about the connection.
  • receive is a function for getting request messages from the client.
  • send is a function for sending response messages to the client.
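
The three parameters can be seen in action with a tiny ASGI application driven by hand. This is a sketch using only asyncio; hello_app, StatusRecorder and drive are hypothetical names, and a real server such as Uvicorn would normally play the role of drive.

```python
import asyncio


async def hello_app(scope, receive, send):
    """Minimal ASGI application: replies 200 with a short body."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"hello"})


class StatusRecorder:
    """Tiny ASGI middleware that notes the response status it sees."""

    def __init__(self, app):
        self.app = app
        self.seen_status = None

    async def __call__(self, scope, receive, send):
        async def wrapped_send(message):
            # Inspect each outgoing message, then pass it on unchanged.
            if message["type"] == "http.response.start":
                self.seen_status = message["status"]
            await send(message)

        await self.app(scope, receive, wrapped_send)


async def drive(app):
    """Play the server's role: supply scope, receive and send by hand."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "GET", "path": "/"}
    await app(scope, receive, send)
    return sent


middleware = StatusRecorder(hello_app)
messages = asyncio.run(drive(middleware))
```

Wrapping send is the same trick FastapiMiddleware uses to notice when the response has been fully written.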

If you need information from the request, for example to read the request body and set it on the FastAPI span, you can implement a function and call it inside the wrapped_receive function after the message is returned by await receive().

RESPONSE_REDIRECT_STATUS_CODE = 307
 
class FastapiMiddleware(object):
   def __init__(self, app, wrapper):
       self.app = app
       self._wrapper = wrapper
 
   async def __call__(self, scope, receive, send):
       if scope["type"] != "http":
           return await self.app(scope, receive, send)
 
       try:
           self._wrapper.before_request(scope)
       except Exception as e:
           print("Error during the before part of fastapi: {}".format(e))
 
       def handle_response(message):
           try:
               if "status" in message and message.get("status") == RESPONSE_REDIRECT_STATUS_CODE:
                   scope["res_redirected"] = True
               if message and message.get("type") == "http.response.start" and message.get("status") != RESPONSE_REDIRECT_STATUS_CODE:
                   scope["res_redirected"] = False
               elif message and message.get("type") == "http.response.body" and not scope["res_redirected"]:
                   try:
                       # more_body is missing or False on the last body chunk
                       if not message.get("more_body"):
                           self._wrapper.after_request()
                   except Exception as e:
                       print("Error during the after part of fastapi: {}".format(e))
           except Exception as e:
               print("Error during getting res body in fast api: {}".format(e))
  
 
       async def wrapped_send(message):
           handle_response(message)
           await send(message)
 
 
       async def wrapped_receive():
           try:
               req = await receive()
           except Exception as e:
               print("Error during receive request fast api asgi function: {}".format(e))
               raise e
           return req
 
       try:
           await self.app(scope, wrapped_receive, wrapped_send)
       except Exception as e:
           print("Error in the app fastapi: {}".format(e))
           raise e
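
The request-body capture mentioned before the middleware code could be sketched as follows. It is a standard-library-only illustration, not part of the example project; capture_body and on_complete are hypothetical names, and the two-chunk request is made up for the demo.

```python
import asyncio


def capture_body(receive, on_complete):
    """Wrap an ASGI receive callable and report the full request body once."""
    chunks = []

    async def wrapped_receive():
        message = await receive()
        if message.get("type") == "http.request":
            chunks.append(message.get("body", b""))
            # more_body is False (or absent) on the final chunk.
            if not message.get("more_body", False):
                on_complete(b"".join(chunks))
        return message

    return wrapped_receive


async def demo():
    # Hypothetical two-chunk request, as a server might deliver it.
    incoming = iter([
        {"type": "http.request", "body": b'{"name": ', "more_body": True},
        {"type": "http.request", "body": b'"widget"}', "more_body": False},
    ])

    async def receive():
        return next(incoming)

    captured = []
    wrapped = capture_body(receive, captured.append)
    await wrapped()
    await wrapped()
    return captured


captured_bodies = asyncio.run(demo())
```

In the middleware, on_complete could be a function that sets the body as a tag on the active FastAPI span.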

Conclusion

Opentracing-Python is a powerful library, especially when combined with the wrapt module. This example is just a simple demonstration of how to instrument a Python application with Opentracing-Python. You can take a look at the complete example in the accompanying project. I created this project in order to open an issue I encountered when closing ContextVarsScopeManager in a FastAPI application that redirects incoming requests to a Flask WSGI application for a specific endpoint.

Bonus Section - Automated Instrumentation

Instrumentation can be done in many ways depending on your requirements. Manual instrumentation requires code changes, and if you have a large enough system, you probably do not want to touch all of your code and risk breaking it. Manual instrumentation is therefore best employed when you plan for it from the beginning and implement it as you write your code.

Thundra APM provides automated instrumentation to save you from the hassles and potential problems mentioned above. Thundra uses the OpenTracing API to implement instrumentation, and Thundra’s agents are compliant with the OpenTracing API.

Automated instrumentation allows you to measure the execution time for the application code. By instrumenting your application, you immediately have end-to-end visibility into your application without changing your code.

By default, Thundra APM automatically instruments AWS resources, HTTP endpoints, Redis caches, MySQL and PostgreSQL tables, and many more. You can find more detailed information on how to instrument your applications automatically in Thundra APM’s documentation.