Microservice architecture is a hot topic these days, driven by the growing need for agility to meet customers' expectations. Its main benefits include independently deployable services, organization around business needs, and loosely coupled systems. Alongside these benefits, microservice architecture brings its own hard-to-solve challenges. One of them is debugging and observing the whole flow of a transaction across a complex network of microservices. In this blog, we are going to talk about how to approach this problem with Opentracing-Python, working through a basic Python example.
Basic Components of the Python Example
1-) OpenTracing
Opentracing is an API specification for distributed tracing. It has been implemented for many platforms such as Python, Java, JavaScript and Go. Opentracing-Python is the instrumentation library that helps developers inject trace information into requests and extract it from them, using a Tracer together with Span and SpanContext objects. Check the official website of Opentracing for more details.
2-) FastAPI
FastAPI is a modern, high-performance web framework for building APIs with Python 3.6+, based on standard Python type hints and built on an asynchronous implementation. It relies on Starlette, an ASGI framework, for its web parts and on Pydantic for its data parts. FastAPI also supports OpenAPI by default. The FastAPI website contains many examples of both asynchronous and synchronous usage.
3-) Wrapt Module
wrapt is a Python module for decorators, wrappers and monkey patching. We will focus on decorators and wrappers in the following example. In order to decorate a specific function, first define a wrapper function. This wrapper function needs to take 4 positional arguments: wrapped, instance, args and kwargs. For more information, check the Wrapt website.
4-) Requests Library
Requests is a simple third-party HTTP library for Python. It is not part of the standard library and is installed from PyPI. The Requests module minimizes the code needed to call HTTP methods. For more information, take a look at the Requests page on PyPI.
OpenTracing Parts of the Example
This part only covers simple OpenTracing features such as Tracer, Span and SpanContext. To get more information about OpenTracing capabilities like injecting/extracting for trace propagation, or tags, logs and baggage items, please visit the Opentracing website.
1-) Tracer
Tracer is an interface for starting a new Span, injecting a SpanContext or extracting a SpanContext. As you can see from the following code, you can create your own Tracer by extending the opentracing.Tracer class. The "get_instance" static function returns the singleton FastapiTracer, creating it on first use.
import uuid

import opentracing
from opentracing.scope_managers.contextvars import ContextVarsScopeManager

from tracer.opentracing.span import FastapiSpan
from tracer.opentracing.span_context import FastapiSpanContext


class FastapiTracer(opentracing.Tracer):
    # Holds the single shared tracer instance.
    __instance = None

    @staticmethod
    def get_instance():
        # __init__ stores the new instance, so later calls reuse it.
        return FastapiTracer() if FastapiTracer.__instance is None else FastapiTracer.__instance

    def __init__(self):
        scope_manager = ContextVarsScopeManager()
        super(FastapiTracer, self).__init__(scope_manager)
        FastapiTracer.__instance = self
Then, override the functions of opentracing.Tracer with your own implementations, such as start_active_span and start_span. There are also two other useful methods, inject and extract. The inject method propagates a created SpanContext to another process or service, and the extract method retrieves the propagated SpanContext on the receiving side.
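To make the inject/extract round trip concrete, here is a minimal sketch that uses only the standard library. It is not the OpenTracing API: the `SketchSpanContext` class, the `inject` and `extract` helpers and the `x-span-id` header name are all assumptions chosen for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Optional

# Hypothetical header name; a real tracer defines its own carrier format.
SPAN_ID_HEADER = "x-span-id"


@dataclass(frozen=True)
class SketchSpanContext:
    """Stand-in for a SpanContext that only carries a span id."""
    span_id: str


def inject(context: SketchSpanContext, carrier: Dict[str, str]) -> None:
    """Write the context into an outgoing carrier (here: HTTP headers)."""
    carrier[SPAN_ID_HEADER] = context.span_id


def extract(carrier: Dict[str, str]) -> Optional[SketchSpanContext]:
    """Rebuild the context from an incoming carrier, if one was propagated."""
    # HTTP header names are case-insensitive, so normalize before looking up.
    normalized = {key.lower(): value for key, value in carrier.items()}
    span_id = normalized.get(SPAN_ID_HEADER)
    return SketchSpanContext(span_id) if span_id is not None else None


# Caller side: attach the context to the outgoing request headers.
outgoing_headers = {"Accept": "application/json"}
inject(SketchSpanContext(span_id="abc-123"), outgoing_headers)

# Callee side: recover the same context from the received headers.
received = extract(outgoing_headers)
```

The caller and the callee only share the carrier, which is why both sides must agree on the header name.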
    # Methods of FastapiTracer, continued from the class definition above.

    def start_active_span(self,
                          operation_name,
                          span_id=None,
                          tags=None,
                          start_time=None,
                          ignore_active_span=False,
                          finish_on_close=True):
        span_id = span_id or str(uuid.uuid4())
        _span = self.start_span(operation_name=operation_name,
                                span_id=span_id,
                                tags=tags,
                                start_time=start_time,
                                ignore_active_span=ignore_active_span)
        # Activating the span makes it the current one for this context.
        return self.scope_manager.activate(_span, finish_on_close)

    def start_span(self,
                   operation_name=None,
                   span_id=None,
                   tags=None,
                   start_time=None,
                   ignore_active_span=False):
        # Create a new span
        _span = self.create_span(operation_name=operation_name,
                                 span_id=span_id,
                                 tags=tags,
                                 start_time=start_time,
                                 ignore_active_span=ignore_active_span)
        return _span

    def create_span(self,
                    operation_name=None,
                    span_id=None,
                    tags=None,
                    start_time=None,
                    ignore_active_span=False):
        _span_id = span_id or str(uuid.uuid4())
        _context = FastapiSpanContext(span_id=_span_id)
        _span = FastapiSpan(self,
                            operation_name=operation_name,
                            context=_context,
                            tags=tags,
                            start_time=start_time)
        return _span

    def get_active_span(self):
        scope = self.scope_manager.active
        if scope is not None:
            return scope.span
        return None
Another important concept in the code above is ScopeManager. A ScopeManager is responsible for activating and deactivating Spans. Opentracing-Python has different ScopeManager implementations for synchronous and asynchronous applications.
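The idea behind a context-variable-based scope manager can be sketched with the standard library alone. The following is not the `ContextVarsScopeManager` implementation. `SketchScope` and the `handle` coroutine are hypothetical names, and the "span" is just a string. It only illustrates how activation and deactivation stay isolated per asyncio task.

```python
import asyncio
import contextvars

# One context variable holds the currently active span for each task.
_active_span = contextvars.ContextVar("active_span", default=None)


class SketchScope:
    """Stand-in for a Scope: restores the previous span when closed."""

    def __init__(self, span):
        self.span = span
        self._token = _active_span.set(span)  # activate

    def close(self):
        _active_span.reset(self._token)  # deactivate, restoring the old value

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()


async def handle(request_name, seen):
    with SketchScope(span=request_name):
        await asyncio.sleep(0)  # yield so the two tasks interleave
        seen[request_name] = _active_span.get()


async def main():
    seen = {}
    # gather() runs each coroutine in its own task with its own context copy.
    await asyncio.gather(handle("request-a", seen), handle("request-b", seen))
    return seen


seen = asyncio.run(main())
active_after = _active_span.get()
```

Each task sees only the span it activated, even though both tasks run interleaved on the same thread. A thread-local variable alone would not give that isolation in an asynchronous application.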
2-) Span
Span is the primary building block of a distributed trace. It stores the necessary information for a specific task, such as the operation name, the start and finish time of the operation, tags and the SpanContext. In order to create your own Span object, you should extend the opentracing.Span class. context and tracer are the two main properties of the opentracing.Span object.
import time
from threading import Lock

import opentracing


class FastapiSpan(opentracing.Span):
    def __init__(self,
                 tracer,
                 operation_name=None,
                 context=None,
                 tags=None,
                 start_time=None):
        super(FastapiSpan, self).__init__(tracer, context)
        self._context = context
        self._lock = Lock()
        self.operation_name = operation_name if operation_name is not None else ""
        # Timestamps are stored as milliseconds since the epoch.
        self.start_time = start_time or int(time.time() * 1000)
        self.finish_time = 0
        self.tags = tags if tags is not None else {}

    @property
    def context(self):
        return self._context

    @property
    def span_id(self):
        return self._context.span_id

    def set_operation_name(self, operation_name):
        with self._lock:
            self.operation_name = operation_name
        return super(FastapiSpan, self).set_operation_name(operation_name)

    def set_tag(self, key, value):
        with self._lock:
            if self.tags is None:
                self.tags = {}
            self.tags[key] = value
        return super(FastapiSpan, self).set_tag(key, value)

    def get_tag(self, key):
        if self.tags is not None:
            return self.tags.get(key)
        return None

    def finish(self, f_time=None):
        with self._lock:
            self.finish_time = int(time.time() * 1000) if f_time is None else f_time

    def set_error_to_tag(self, err):
        error_type = type(err)
        self.set_tag('error', True)
        self.set_tag('error.kind', error_type.__name__)
        self.set_tag('error.message', str(err))

    def get_duration(self):
        # An unfinished span reports the time elapsed so far.
        if self.finish_time == 0:
            return int(time.time() * 1000) - self.start_time
        else:
            return self.finish_time - self.start_time

    def erroneous(self):
        return self.tags is not None and 'error' in self.tags
3-) Span Context
Carrying data across process or service boundaries is done by SpanContext. The code below shows only span_id, but SpanContext has an important concept called baggage items. These are key:value pairs and can be very useful to make specific data available throughout the trace.
import opentracing


class FastapiSpanContext(opentracing.SpanContext):
    def __init__(self,
                 span_id=None):
        self._span_id = span_id

    @property
    def span_id(self):
        return self._span_id

    @span_id.setter
    def span_id(self, value):
        self._span_id = value
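Since the SpanContext in this example carries only a span id, here is a rough sketch of how baggage items could sit alongside it. This is a standalone illustration, not part of the example project: `BaggageSpanContext` and `with_baggage_item` are hypothetical names, and the class deliberately does not extend opentracing.SpanContext so that it runs without any dependency.

```python
class BaggageSpanContext:
    """Stand-in SpanContext carrying a span id plus baggage items."""

    def __init__(self, span_id, baggage=None):
        self.span_id = span_id
        # Copy so that callers cannot mutate the context through their dict.
        self._baggage = dict(baggage or {})

    @property
    def baggage(self):
        # Return a copy to keep the stored baggage effectively read-only.
        return dict(self._baggage)

    def with_baggage_item(self, key, value):
        """Return a new context with one extra baggage item."""
        new_baggage = dict(self._baggage)
        new_baggage[key] = value
        return BaggageSpanContext(self.span_id, new_baggage)


root = BaggageSpanContext(span_id="span-1")
tagged = root.with_baggage_item("customer.tier", "gold")
```

Returning a new context instead of mutating the existing one keeps a context that has already been propagated from changing underneath the services that received it.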
Wrappers
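The wrap_function_wrapper function of the wrapt module takes 3 positional parameters:
- 1st parameter is the module that contains the function to be wrapped.
- 2nd parameter is the name of the function to wrap, given as a dotted path inside that module (for example 'Session.send').
- 3rd parameter is the wrapper function, which takes the following parameters: wrapped, instance, args and kwargs.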
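The four-argument wrapper signature can be illustrated without installing wrapt. The sketch below is a hand-rolled stand-in, not wrapt itself: `wrap_method`, `FakeSession` and `tracing_wrapper` are hypothetical names, and the helper only handles ordinary instance methods.

```python
import functools


def wrap_method(cls, name, wrapper):
    """Route cls.<name> through wrapper(wrapped, instance, args, kwargs)."""
    original = getattr(cls, name)

    @functools.wraps(original)
    def patched(self, *args, **kwargs):
        # Bind the original to the instance so the wrapper can call
        # wrapped(*args, **kwargs) without passing `self` again.
        bound = original.__get__(self, cls)
        return wrapper(bound, self, args, kwargs)

    setattr(cls, name, patched)


class FakeSession:
    """Stand-in for a class whose method we want to observe."""

    def send(self, request):
        return "sent " + request


calls = []


def tracing_wrapper(wrapped, instance, args, kwargs):
    # Record the call, then delegate to the original method unchanged.
    calls.append((type(instance).__name__, args))
    return wrapped(*args, **kwargs)


wrap_method(FakeSession, "send", tracing_wrapper)
result = FakeSession().send("GET /health")
```

The wrapper receives the already bound method as `wrapped` and the object as `instance`, so it never has to pass `self` along explicitly.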
1-) Wrapping Requests Library
When analyzing the Requests library, you can see that all HTTP methods are routed to the request function of the Session class. This request function calls the send method to get a response. So, we should wrap the send function in the Session class of the requests module. The patch function in the following code does exactly this.
import wrapt

from tracer.integrations.requests import RequestsIntegration

request_integration = RequestsIntegration()


def _wrapper(wrapped, instance, args, kwargs):
    return request_integration.run_and_trace(
        wrapped,
        instance,
        args,
        kwargs,
    )


def patch():
    wrapt.wrap_function_wrapper(
        'requests',
        'Session.send',
        _wrapper
    )
The _wrapper function is the wrapper applied to the Session.send function. In this function, we just call the run_and_trace function of the RequestsIntegration class, which is a child class of the BaseIntegration class.
As you can see below, run_and_trace receives the same 4 positional arguments that wrapt passes to _wrapper. This function first gets the FastapiTracer and then checks whether there is already an active span. If there is none, it calls the wrapped function without tracing, because the FastAPI integration described in the next section is expected to have created a span already. The before_call function sets the appropriate variables on the created span before calling the actual function. If you need to process the response and set specific variables or tags on the span based on it, implementing the after_call function is all you need. At the end, you should finish the span for the request and close the scope.
import abc
import traceback

from tracer.opentracing.tracer import FastapiTracer

ABC = abc.ABCMeta('ABC', (object,), {})


class BaseIntegration(ABC):

    def run_and_trace(self, wrapped, instance, args, kwargs):
        tracer = FastapiTracer.get_instance()
        # Without an active span there is nothing to attach to: skip tracing.
        if not tracer.get_active_span():
            return wrapped(*args, **kwargs)

        response = None
        exception = None

        scope = tracer.start_active_span(operation_name=self.get_operation_name(wrapped, instance, args, kwargs),
                                         finish_on_close=False)

        # Inject before span tags
        try:
            self.before_call(scope, wrapped, instance, args, kwargs, response, exception)
        except Exception as e:
            scope.span.set_tag('instrumentation_error', "Error")

        try:
            response = self.actual_call(wrapped, args, kwargs)
        except Exception as e:
            exception = e

        try:
            self.after_call(scope, wrapped, instance, args, kwargs, response, exception)
        except Exception as e:
            scope.span.set_tag('instrumentation_error', "Error")

        try:
            scope.span.finish()
        except Exception as e:
            if exception is None:
                exception = e
            else:
                print(str(e))

        scope.close()

        # Re-raise the error of the wrapped call after recording it on the span.
        if exception is not None:
            scope.span.set_error_to_tag(exception)
            raise exception

        return response

    def actual_call(self, wrapped, args, kwargs):
        return wrapped(*args, **kwargs)

    @abc.abstractmethod
    def before_call(self, scope, wrapped, instance, args, kwargs, response, exception):
        raise Exception("should be implemented")

    @abc.abstractmethod
    def after_call(self, scope, wrapped, instance, args, kwargs, response, exception):
        raise Exception("should be implemented")

    @abc.abstractmethod
    def get_operation_name(self, wrapped, instance, args, kwargs):
        raise Exception("should be implemented")
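The article does not show the body of RequestsIntegration, so the following is only a guess at what its hooks might look like, written against stand-in objects so that it runs on its own. `SketchSpan`, `SketchRequestsHooks` and the chosen tag names are assumptions. The sketch also assumes the request object is the first positional argument of Session.send and exposes `method` and `url`, and that the response exposes `status_code`.

```python
from types import SimpleNamespace


class SketchSpan:
    """Minimal stand-in for a span that only stores tags."""

    def __init__(self):
        self.tags = {}

    def set_tag(self, key, value):
        self.tags[key] = value


class SketchRequestsHooks:
    """Hypothetical hooks with the same signatures as BaseIntegration's."""

    def get_operation_name(self, wrapped, instance, args, kwargs):
        request = args[0]
        return "{} {}".format(request.method, request.url)

    def before_call(self, scope, wrapped, instance, args, kwargs, response, exception):
        # Assumption: the request is the first positional argument.
        request = args[0]
        scope.span.set_tag("http.method", request.method)
        scope.span.set_tag("http.url", request.url)

    def after_call(self, scope, wrapped, instance, args, kwargs, response, exception):
        # The response is None when the wrapped call raised an exception.
        if response is not None:
            scope.span.set_tag("http.status_code", response.status_code)


# Stand-ins for the scope, the prepared request and the response.
scope = SimpleNamespace(span=SketchSpan())
request = SimpleNamespace(method="GET", url="http://example.com/items")
response = SimpleNamespace(status_code=200)

hooks = SketchRequestsHooks()
operation_name = hooks.get_operation_name(None, None, (request,), {})
hooks.before_call(scope, None, None, (request,), {}, None, None)
hooks.after_call(scope, None, None, (request,), {}, response, None)
```

Keeping the hooks free of tracer logic means a new integration only has to describe which tags to set before and after the call.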
2-) Wrapping FastAPI
When a FastAPI app is initialized, the FastAPI.__init__ function is called by the Python interpreter. In order to wrap and integrate Opentracing with FastAPI, you should wrap this function and add your ASGI middleware to intercept incoming requests. The FastAPI.__init__ function is located in the fastapi.applications module. That is why wrap_function_wrapper is called as in the code below.
import wrapt


def _wrapper(wrapped, instance, args, kwargs):
    from fastapi.middleware import Middleware
    from tracer.wrappers.fastapi.middleware import FastapiMiddleware
    from tracer.wrappers.fastapi.fastapi_wrapper import FastapiWrapper

    # Copy the list and tolerate middleware=None or a non-list sequence.
    middlewares = list(kwargs.pop("middleware", None) or [])
    # Insert first so that the tracing middleware sees every request.
    middlewares.insert(0, Middleware(FastapiMiddleware, wrapper=FastapiWrapper.get_instance()))
    kwargs.update({"middleware": middlewares})
    return wrapped(*args, **kwargs)


def patch():
    wrapt.wrap_function_wrapper(
        "fastapi.applications",
        "FastAPI.__init__",
        _wrapper
    )
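The effect of patching an initializer to prepend a middleware can be tried out without FastAPI or wrapt. In this sketch, `SketchApp` and `patch_init` are hypothetical stand-ins and the middleware entries are plain strings. It shows that the patch has to be applied before the application object is created.

```python
import functools


class SketchApp:
    """Stand-in for an application class that accepts a middleware list."""

    def __init__(self, title="app", middleware=None):
        self.title = title
        self.middleware = list(middleware or [])


def patch_init(cls, first_middleware):
    """Replace cls.__init__ so that first_middleware is always prepended."""
    original_init = cls.__init__

    @functools.wraps(original_init)
    def patched_init(self, *args, **kwargs):
        # Copy the caller's list so it is never mutated in place.
        middlewares = list(kwargs.pop("middleware", None) or [])
        middlewares.insert(0, first_middleware)
        kwargs["middleware"] = middlewares
        return original_init(self, *args, **kwargs)

    cls.__init__ = patched_init


created_before_patch = SketchApp(title="early")

patch_init(SketchApp, "tracing-middleware")

user_middleware = ["auth-middleware"]
app = SketchApp(title="demo", middleware=user_middleware)
```

Here `created_before_patch` has no tracing middleware, while `app` gets it in first position. That is why the patch should run at startup, before any application object exists.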
The Middleware lets us hook into the framework's request flow. After capturing a request, you can read all of its information and create your own Span and SpanContext data from it. The idea is the same as in the request wrapper. If you have to do something before a request is processed by the actual call, such as creating an active span for the FastAPI application and setting span tags or the start timestamp, you can implement all of that logic in the before_request function.
The app parameter of FastapiMiddleware is the ASGI application being wrapped, here the FastAPI application. This application takes 3 parameters: scope, receive and send.
- scope is the object that keeps all the information about the connection.
- receive is a function for getting requests from a client.
- send is a function for sending the response object to a client.
If you need to get information from a request object, such as gathering the request body and setting it on the FastAPI span, you can implement a function and call it inside the wrapped_receive function after getting the request from await receive().
RESPONSE_REDIRECT_STATUS_CODE = 307


class FastapiMiddleware(object):
    def __init__(self, app, wrapper):
        self.app = app
        self._wrapper = wrapper

    async def __call__(self, scope, receive, send):
        # Only HTTP requests are traced; pass everything else through.
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        try:
            self._wrapper.before_request(scope)
        except Exception as e:
            print("Error during the before part of fastapi: {}".format(e))

        def handle_response(message):
            try:
                if "status" in message and message.get("status") == RESPONSE_REDIRECT_STATUS_CODE:
                    scope["res_redirected"] = True
                if (message and message.get("type") == "http.response.start"
                        and message.get("status") != RESPONSE_REDIRECT_STATUS_CODE):
                    scope["res_redirected"] = False
                elif message and message.get("type") == "http.response.body" and not scope["res_redirected"]:
                    try:
                        # Finish only once the last body chunk has been sent.
                        if not message.get("more_body"):
                            self._wrapper.after_request()
                    except Exception as e:
                        print("Error during the after part of fastapi: {}".format(e))
            except Exception as e:
                print("Error during getting res body in fast api: {}".format(e))

        async def wrapped_send(message):
            handle_response(message)
            await send(message)

        async def wrapped_receive():
            try:
                req = await receive()
            except Exception as e:
                print("Error during receive request fast api asgi function: {}".format(e))
                raise e
            return req

        try:
            await self.app(scope, wrapped_receive, wrapped_send)
        except Exception as e:
            print("Error in the app fastapi: {}".format(e))
            raise e
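To see how the scope, receive and send callables flow through an ASGI middleware, the following sketch drives a tiny middleware by hand with asyncio, with no web server or FastAPI involved. `RecordingMiddleware`, `echo_app` and `run_once` are hypothetical names for illustration. The message shapes follow the ASGI HTTP message types already used above (`http.request`, `http.response.start`, `http.response.body`).

```python
import asyncio


class RecordingMiddleware:
    """Minimal ASGI middleware that records the messages passing through."""

    def __init__(self, app, events):
        self.app = app
        self.events = events

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        async def wrapped_receive():
            message = await receive()
            self.events.append(("receive", message["type"]))
            return message

        async def wrapped_send(message):
            self.events.append(("send", message["type"]))
            await send(message)

        await self.app(scope, wrapped_receive, wrapped_send)


async def echo_app(scope, receive, send):
    """Tiny ASGI app: reads the request body and echoes it back."""
    request = await receive()
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": request.get("body", b"")})


async def run_once():
    events, sent = [], []

    async def receive():
        return {"type": "http.request", "body": b"ping", "more_body": False}

    async def send(message):
        sent.append(message)

    app = RecordingMiddleware(echo_app, events)
    await app({"type": "http", "method": "POST", "path": "/"}, receive, send)
    return events, sent


events, sent = asyncio.run(run_once())
```

The middleware never touches the application code. It only substitutes its own receive and send callables, which is the same mechanism FastapiMiddleware relies on.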
Conclusion
Opentracing-Python is a powerful library, especially when combined with the wrapt module. This example is just a simple demonstration of how to instrument a Python application with Opentracing-Python. You can take a look at the full example project. I created this project to open an issue I encountered about closing ContextVarsScopeManager with a FastAPI application that redirects incoming requests to a Flask WSGI application for a specific endpoint.
Bonus Section - Automated Instrumentation
Instrumentation can be done in many ways depending on your requirements. Manual instrumentation requires code changes, and if you have a large enough system, you probably do not want to touch all of your code and risk breaking it. Therefore, manual instrumentation works best if you plan for it from the beginning and implement it as you write your code.
Thundra APM provides automated instrumentation to save you from the hassles and potential problems mentioned above. Thundra uses the OpenTracing API to implement instrumentation, and Thundra's agents are compliant with the OpenTracing API.
Automated instrumentation allows you to measure the execution time of your application code. By instrumenting your application, you immediately have end-to-end visibility into it without changing your code.
By default, Thundra APM automatically instruments AWS resources, HTTP endpoints, Redis caches, MySQL and PostgreSQL tables, and many more. You can find more detailed information on how to instrument your applications automatically in Thundra APM's documentation.