使用 OpenTelemetry 和 Jaeger 使用 MinIO 进行分布式跟踪

使用 OpenTelemetry 和 Jaeger 使用 MinIO 进行分布式跟踪

几年前,当你有一个单体应用程序时,调试和诊断相当容易,因为可能只有一个服务有几个用户。如今,系统被分解成更小的微服务,部署在跨不同云环境的多个集群中的 Kubernetes 之上的容器中。在这些类型的分布式环境中,需要观察所有情况,包括整体情况,如果需要,还需要在更细粒度的级别进行观察。

可观察性大致可以分为三个子类别:日志记录、指标和跟踪。在这篇博文中,我们将向您展示在新的或现有的 MinIO 应用程序中设置跟踪是多么简单。我们将构建一个小型 MinIO 应用程序来执行一些基本请求;这将是我们的基础应用程序,我们将向其添加跟踪以更好地了解系统组件和功能如何交互。

跟踪是一个术语,用于描述记录和观察应用程序发出的请求的活动,以及这些请求如何在系统中传播。当系统以分布式方式设置时,我们称之为分布式跟踪,因为它涉及观察应用程序及其通过多个系统的交互。例如,作为开发人员,您的代码可能包含多个函数,但您更感兴趣的是 MinIO 函数的执行时间以及这些函数在应用程序中使用时的相互依赖性。追踪将通过以下方式提供必要的见解:

  • 识别性能和延迟瓶颈

  • 重大事件后查找根本原因分析

  • 确定多服务架构中的依赖关系

  • 监控应用程序中的事务

最小IO

我们将从一个小型 MinIO Python 应用程序开始,该应用程序将显示几个简单的操作。稍后我们将添加用于跟踪的代码,以测量我们的代码执行所花费的时间。

安装 MinIO

有多种方法可以在各种环境中安装 MinIO在这篇博文中,我们将使用 Docker 启动 MinIO,但在生产中请确保安装在分布式设置中。

  • 在本地机器上创建一个目录,MinIO 将在其中保存数据

$ mkdir -p /minio/data

  • 使用 Docker 启动 MinIO 容器

$ docker run -d \
  -p 9000:9000 \
  -p 9001:9001 \
  --name minio \
  -v /minio/data:/data \
  -e "MINIO_ROOT_USER=minio" \
  -e "MINIO_ROOT_PASSWORD=minioadmin" \
  quay.io/minio/minio server /data --console-address ":9001"

注意:保留上面使用的凭据的副本,稍后您将需要它们来访问 MinIO。

  • 使用浏览器通过http://localhost:9001/使用用于启动上述容器的凭据登录,验证您是否可以访问 MinIO。


Screen Shot 2022-09-14 at 12.53.46 PM.png


MinIO SDK

有多种SDK支持您将您的应用程序与 MinIO API 集成。在此示例中,我们将使用Python SDK

  • 安装 MinIO Python SDK

$ pip install minio

  • 将包含 MinIO 函数的整个 Python 脚本复制并粘贴到本地文本编辑器中,并将其另存为minio_app.py您可以参考它,因为我们在下面描述了它的作用。

from minio import Minio

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "localhost:9000",
  "minio_username": "minio",
  "minio_password": "minioadmin",
}

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=False,
              access_key=config["minio_username"],
              secret_key=config["minio_password"]
              )

# Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

# Create a test object
file_path = "test_object.txt"
f = open(file_path, "w")
f.write("created test object")
f.close()

# Put an object inside the bucket
minio_client.fput_object(config["dest_bucket"], file_path, file_path)

# Get the object from the bucket
minio_client.fget_object(config["dest_bucket"], file_path, config["dest_bucket"] + "/" + file_path)

# Get list of objects
for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("Some objects here")

让我们来看看上面的脚本。我们正在调用我们在上一步中启动的 MinIO 容器的一些基本操作。

在最顶部,我们正在导入我们之前安装的 MinIO Python SDK 并使用默认值初始化它

  • MinIO 端点

  • MinIO用户名

  • 最小密码

  • MinIO 中的目标存储桶名称

from minio import Minio

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "localhost:9000",
  "minio_username": "minio",
  "minio_password": "minioadmin",
}

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=False,
              access_key=config["minio_username"],
              secret_key=config["minio_password"]
              )

  • 检查特定目标桶是否存在;如果不创建它。

## Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

  • 创建一个测试对象来执行基本操作。在这里,我们正在创建一个包含一些文本的文件

# Create a test object
file_path = "test_object.txt"
f = open(file_path, "w")
f.write("created test object")
f.close()

  • 将测试对象放入我们之前创建的桶中

minio_client.fput_object(config["dest_bucket"], file_path, file_path)

  • 获取我们在上一步中添加的测试对象。在您运行脚本的机器上,我们将文件放在/这样它就不会与原始文件冲突

minio_client.fget_object(config["dest_bucket"], file_path, config["dest_bucket"] + "/" + file_path)

  • 获取存储桶中的对象列表以确认我们的文件已经存在

for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("Some objects here")

  • 输出将如下所示。我们添加的文件显示为 Python 对象。现在我们知道对象就在那里。

<minio.datatypes.Object object at 0x109b1d3d0>
Some objects here

  • 使用以下命令运行脚本。您应该会在存储桶中看到新对象。

$ python minio_app.py


Screen Shot 2022-09-14 at 4.08.36 PM.png


  • 现在我们在您运行脚本的机器上有了该对象,让我们从我们的 MinIO 存储桶中删除它并验证那里没有其他对象。

minio_client.remove_object(config["dest_bucket"], file_path)

for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("No objects here")

  • 因为那是唯一的对象,现在我们可以删除我们之前创建的桶

if minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.remove_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been removed" % (config["dest_bucket"]))

请记住,我们在本教程中使用了一个非常简单的应用程序,该应用程序使用 MinIO SDK。从这里,您可以轻松地了解如何在您自己的应用程序中包含跟踪。虽然在构建您的应用程序时添加跟踪是理想的,但添加它并利用它提供的洞察力永远不会太晚。

开放遥测

OpenTelemetry 是一个框架,允许您从您的应用程序中获取跟踪、指标和日志,并以某种方式将它们标准化,然后它们可以被许多导出器使用,例如 Jaeger。

安装 OpenTelemetry

与 MinIO 一样,OpenTelemetry 支持许多SDK有些比其他的功能更丰富,但是已经构建了所有功能的 SDK 之一是Python SDK我们需要安装两个 Python 包

$ pip install opentelemetry-api
$ pip install opentelemetry-sdk

初始化 OpenTelemetry

安装所需的包后,将它们导入minio_app.py我们在上一节中启动的脚本。

  • 导入以下包

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

  • 在资源属性中设置服务名称,以便在搜索时很容易找到痕迹。我们为服务命名,my-minio但您可以随意命名。此代码完成并初始化跟踪

resource = Resource(attributes={
  SERVICE_NAME: "my-minio"
})

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

我们已经创建了所有的构建块;现在让我们创建一个我们可以观察到的跨度。但什么是跨度?简单来说,跨度不过是函数发出的单个请求的开始和结束。可能有父母和孩子的跨度;这些一起形成了痕迹。

创建跨度

我们提出的第一个请求是检查存储桶是否存在。让我们为它创建一个跨度

with tracer.start_as_current_span("check if bucket exist"):

此时脚本应该看起来像这样

from minio import Minio
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "localhost:9000",
  "minio_username": "minio",
  "minio_password": "minioadmin",
}

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=False,
              access_key=config["minio_username"],
              secret_key=config["minio_password"]
              )

# Initialize OpenTelemetry provider
resource = Resource(attributes={
  SERVICE_NAME: "my-minio"
})

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("check if bucket exists"):
# Create destination bucket if it does not exist
  if not minio_client.bucket_exists(config["dest_bucket"]):
    minio_client.make_bucket(config["dest_bucket"])
    print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))



...TRUNCATED...


跨度可以发送到多个导出器,但开始时我们只将跟踪发送到 CLI。如果您运行上面的脚本,您应该会在末尾看到一个 JSON 输出,如下所示。这是你的踪迹。

$ python3 minio_app.py
Destination Bucket 'processed' has been created
<minio.datatypes.Object object at 0x103f36eb0>
Some objects here
Destination Bucket 'processed' has been removed
{
    "name": "check if bucket exists",
    "context": {
        "trace_id": "0xef41e07cf082045a2fc4eea70fd1a6de",
        "span_id": "0x867c14fe1fd97590",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": null,
    "start_time": "2022-09-14T20:49:15.569511Z",
    "end_time": "2022-09-14T20:49:15.599886Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {},
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "my-minio"
        },
        "schema_url": ""
    }
}

通过添加附加属性,可以通过多种方式自定义每个跨度。让我们添加一个function.name具有值的属性CHECK_BUCKET

with tracer.start_as_current_span("check if bucket exists"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "CHECK_BUCKET")

如果您再次重新运行脚本,您会注意到在输出中会有一个新属性


...TRUNCATED…

 

   },
    "attributes": {
        "function.name": "CHECK_BUCKET"
    },
    "events": [],


...TRUNCATED...


您还可以添加事件以进一步丰富跟踪

  current_span.add_event("Checking if bucket exists.")
  if not minio_client.bucket_exists(config["dest_bucket"]):
    current_span.add_event("Bucket does not exist, going to create it.")

    minio_client.make_bucket(config["dest_bucket"])

再次运行脚本,您会注意到 JSON 输出中有两个新事件

...TRUNCATED... 


   "events": [
        {
            "name": "Checking if bucket exists.",
            "timestamp": "2022-09-14T21:09:48.505709Z",
            "attributes": {}
        },
        {
            "name": "Bucket does not exist, going to create it.",
            "timestamp": "2022-09-14T21:09:48.514541Z",
            "attributes": {}
        }
    ],


...TRUNCATED...


添加更多跨度

到目前为止,我们只添加了一个跨度。为了使其更有用,让我们添加更多跨度以及其他属性和事件。通常添加更多跨度不会降低应用程序的性能,因为这些是异步请求。

考虑到这一点,这里是用一些额外的跨度更新的脚本

from minio import Minio
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "localhost:9000",
  "minio_username": "minio",
  "minio_password": "minioadmin",
}

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=False,
              access_key=config["minio_username"],
              secret_key=config["minio_password"]
              )

resource = Resource(attributes={
  SERVICE_NAME: "my-minio"
})


provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)


tracer = trace.get_tracer(__name__)


# Create destination bucket if it does not exist
with tracer.start_as_current_span("check if bucket exists"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "CHECK_BUCKET")

  current_span.add_event("Checking if bucket exists.")
  if not minio_client.bucket_exists(config["dest_bucket"]):
    current_span.add_event("Bucket does not exist, going to create it.")

    with tracer.start_as_current_span("create bucket"):
      minio_client.make_bucket(config["dest_bucket"])
      current_span.add_event("Bucket has been created.")
      print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

with tracer.start_as_current_span("create object to add"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "CREATE_OBJECT")

  # Create a test object
  file_path = "test_object.txt"
  f = open(file_path, "w")
  f.write("created test object")
  f.close()
  current_span.add_event("Test object has been created.")

# Put an object inside the bucket
with tracer.start_as_current_span("add created object to bucket"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "CREATE_OBJECT")

  minio_client.fput_object(config["dest_bucket"], file_path, file_path)
  current_span.add_event("Test object has been placed in bucket.")

# Get the object from the bucket
with tracer.start_as_current_span("fetch object from bucket"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "FETCH_OBJECT")

  minio_client.fget_object(config["dest_bucket"], file_path, config["dest_bucket"] + "/" + file_path)
  current_span.add_event("Test object has been fetched from bucket.")

# Get list of objects
for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("Some objects here")


# Remove the object from bucket
with tracer.start_as_current_span("remove object from bucket"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "REMOVE_OBJECT")

  minio_client.remove_object(config["dest_bucket"], file_path)
  current_span.add_event("Test object has been removed from bucket.")

# Get list of objects
for obj in minio_client.list_objects(config["dest_bucket"]):
  print(obj)
  print("No objects here")

# Remove destination bucket if it does exist
with tracer.start_as_current_span("check if bucket exists"):
  current_span = trace.get_current_span()
  current_span.set_attribute("function.name", "REMOVE_BUCKET")

  current_span.add_event("Checking if bucket exists.")
  if minio_client.bucket_exists(config["dest_bucket"]):
    current_span.add_event("Bucket exists, going to remove it.")

    with tracer.start_as_current_span("delete bucket"):
      minio_client.remove_bucket(config["dest_bucket"])
      current_span.add_event("Bucket has been removed.")
      print("Destination Bucket '%s' has been removed" % (config["dest_bucket"]))

这些只是几个例子;您可以根据需要进行详细说明,以使您的跟踪对您的团队有帮助。你可以测量

  • 数据库调用的性能

  • AI/ML 作业的处理时间和性能

  • 连接到外部服务时的延迟

Jaeger

但是有一个问题:如果您现在尝试运行脚本,它会运行,但是您会遇到一堵文本墙,这些文本可能比只有单跨度。为了理解这些痕迹,我们需要一个工具来收集和处理它们。

您可以使用许多工具,但 Jaeger 是最受欢迎的工具之一。像 MinIO 一样启动和运行非常容易,并且像 MinIO 一样,功能非常丰富,可以帮助您进行根本原因分析和服务依赖性分析等操作。

安装 Jaeger

我们将 Jaeger 部署为 Docker 容器并公开必要的端口

  • 安装 Jaeger 容器

$ docker run -d --name jaeger -p 16686:16686 -p 6831:6831/udp jaegertracing/all-in-one

  • 以上将公开两个端口localhost

6831: thrift server端口,即接受trace的入站端口

16686: Jaeger UI 让你可视化痕迹


Screen Shot 2022-09-14 at 5.31.59 PM.png


配置 Jaeger

目前我们的跟踪正在发送到 CLI。现在我们将稍微修改 Python 脚本以将它们发送到我们刚刚创建的 Jaeger 容器。

  • 安装 OpenTelemetry 的 Jaeger 导出器

$ pip install opentelemetry-exporter-jaeger

  • 通过替换以下行在 Python 中导入包

from opentelemetry.sdk.trace.export import ConsoleSpanExporter

from opentelemetry.exporter.jaeger.thrift import JaegerExporter

  • 添加 Jaeger exporter 主机信息

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

  • 替换以下行

processor = BatchSpanProcessor(ConsoleSpanExporter())

processor = BatchSpanProcessor(jaeger_exporter)

  • 最终结果看起来像这样

...TRUNCATED…


from opentelemetry.exporter.jaeger.thrift import JaegerExporter

...TRUNCATED...

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(jaeger_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)


...TRUNCATED...


使用 Jaeger

重新运行脚本。转至 Jaeger UI,您将在左侧看到该my-minio服务,而不是看到发出的 JSON blob(记住我们的文字墙)。选择它然后单击Find Traces


Screen Shot 2022-09-14 at 5.50.11 PM.png


到目前为止,我们只提出了一个请求,我们创建的几个 span 应该有几个跟踪。


Screen Shot 2022-09-14 at 5.54.33 PM.png


单击显示的其中一条迹线2 Spans让我们选择“检查桶是否存在”的那个。您可以通过比 JSON blob 更易于使用的方式查看所有详细信息,永远消除文本墙并重新提高我们的效率。


Screen Shot 2022-09-14 at 5.55.56 PM.png


运行脚本五到六次后,我们可以开始看到模式出现了。您在上面看到的时间是执行不同跨度所花费的时间。我们在这里看到两个跨度,因为如果您还记得我们添加了两个父跨度和子跨度。这不仅在视觉上比怪物 JSON blob 更吸引人,而且也更有帮助。


Screen Shot 2022-09-14 at 6.13.50 PM.png


然后,您可以将这些数据从 Jaeger 发送到 Grafana,这样您就可以获得历史图表,甚至可以根据特定阈值设置警报。例如,如果 AI/ML 作业执行其功能所需的时间超过 10 毫秒,则根据设置的阈值发出警报。您的应用程序在哪个环境中运行并不重要;你可以确保你可以用一块玻璃来存放手表。

一旦我们有了痕迹,您就想建立一个历史数据库,您可以回顾它以了解趋势和相关性。这就是指标可以派上用场的地方。OpenTelemetry 支持值得一试的指标和日志记录框架的完整缓存。

分布式跟踪加速故障排除

追踪只是通向可观察性之路的其中一个环节。通常,当事件发生时,我们用来确定和解决问题的不仅仅是一条痕迹、一份日志或一项指标。通常需要了解这些因素的组合才能找到根本原因。

可观察性为自动化打开了大门。我们是自动化的忠实拥护者,但为了在云规模上高效运行,您需要拥有坚实的基础以及对日志和应用程序性能的可见性。


上一篇 下一篇