显示导航

Grails 和 Micronaut Kafka 的消息队列

学习如何使用 Grails 和 Micronaut Kafka 消息队列

作者:Sergio del Amo

Grails 版本:4.1.0.M5

1 培训

Grails 培训 - 由创建并积极维护 Grails 框架的人员开发和提供的!

2 开始使用

在此指南中,我们将向您展示如何使用 Grails 应用设置和使用 Micronaut Kafka

2.1 你将需要

要完成此指南,您需要以下内容

  • 时间

  • 一个像样的文本编辑器或 IDE

  • 安装并使用 `JAVA_HOME` 正确配置 JDK 1.8 或更高版本

2.2 如何完成指南

要开始,请执行以下操作

Grails 指南存储库包含三个文件夹

  • docker

  • complete

  • complete-analytics

在此指南中,您将创建两个 Grails 应用。completecomplete-analytics 都是已完成的示例。这是完成指南中介绍的步骤并应用这些更改的结果。

要在 Docker 中运行 Kafka,docker 文件夹包含一个 docker compose 文件。您需要安装 DockerDocker Compose

要完成指南,请按照接下来章节中的说明操作。

如果您进入grails-guides/grails-micronaut-kakfa/completegrails-guides/grails-micronaut-kakfa/complete-analytics,您可以直接转到完成的示例

3 应用概述

在本指南中,我们设置一个消息队列,以便在两个不同的应用程序中正常工作。在本指南中,我们有一个列出书籍和书籍详细信息的应用程序。我们希望跟踪每本书被浏览的次数。我们添加一个独立的分析应用程序,它会持续跟踪每本书被浏览的次数。

4 运行 Kafka

使用 Kafka 的快速方法是通过 Docker。创建这个 docker-compose.yml 文件

docker/docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - 2181:2181 (1)
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092 (2)
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://127.0.0.1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
1 Zookeeper 默认使用端口 2181,但您可以在有需要的情况下更改此值。
2 Kafka 默认使用端口 9092,但您可以在有需要的情况下更改此值。

启动 Zookeeper 和 Kafka(使用 CTRL-C 同时停止两者)

$ docker-compose up

另外,您可以 安装并运行本地 Kafka 实例

5 图书应用程序

使用 rest-api 配置文件创建一个 Grails 应用程序。

grails create-app example.grails.complete --profile=rest-api

首先,添加一个 Book

grails-app/domain/example/grails/Book.groovy
package example.grails

class Book {
    String isbn
    String name

    static constraints = {
        isbn unique: true, blank: false, nullable: false
        name blank: false, nullable: false
    }
}

利用 GORM 数据服务Book 创建默认 CRUD 操作

grails-app/services/example/grails/BookGormService.groovy
package example.grails

import grails.gorm.services.Service

@Service(Book)
interface BookGormService {
    Book saveBook(Book book)

    List<Book> findAll()

    Book findByIsbn(String isbn)
}

然后,我们需要使用 Bootstrap.groovy 实际创建图书数据

grails-app/init/example/grails/Bootstrap.groovy
package example.grails

import groovy.transform.CompileStatic

@CompileStatic
class BootStrap {

    BookGormService bookGormService

    def init = { servletContext ->
        [
                new Book(isbn: '1491950358', name: 'Building Microservices'),
                new Book(isbn: '1680502395', name: 'Release It!'),
                new Book(isbn: '0321601912', name: 'Continuous Delivery')
        ].each {book ->
            bookGormService.saveBook(book)
        }
    }
    def destroy = {
    }
}

添加 Micronaut Kafka 依赖

build.gradle
implementation "io.micronaut:micronaut-inject-groovy"
implementation("io.micronaut.kafka:micronaut-kafka:3.1.0")

这个应用程序连接到运行在 localhost:9092 上的 Kafka 代理。添加以下配置

grails-app/conf/application.yml
kafka:
    bootstrap:
        servers: localhost:9092

创建一个发送消息到 Kafka 的接口。Micronaut 框架将在编译时实现此接口

src/main/groovy/example/grails/AnalyticsClient.groovy
package example.grails

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic

@KafkaClient
interface AnalyticsClient {

    @Topic('analytics') (1)
    Map updateAnalytics(Map book) (2)
}
1 设置主题名称
2 发送图书信息。Micronaut 框架会自动在发送前将其转换为 JSON 格式。

创建一个控制器,它会获取图书并使用 AnalyticsClient 通知 Kafka

grails-app/controllers/example/grails/BooksController.groovy
package example.grails

import groovy.transform.CompileStatic
import org.springframework.beans.factory.annotation.Autowired

@CompileStatic
class BooksController {

    BookGormService bookGormService

    @Autowired
    AnalyticsClient analyticsClient

    static allowedMethods = [
            index: 'GET',
            show: 'GET'
    ]

    def index() {
        [books: bookGormService.findAll()]
    }

    def show(String isbn) {
        Book book = bookGormService.findByIsbn(isbn)
        if (!book) {
            response.status = 404
            return
        }
        analyticsClient.updateAnalytics([isbn: book.isbn])
        render(template: 'book', model: [book: book])
    }
}

将以下映射添加到 UrlMappings

grails-app/controllers/example/grails/UrlMappings.groovy
        "/books/$isbn" {
            controller = 'books'
            action = 'show'
        }

为控制器的操作创建两个 JSON 视图

grails-app/views/books/_book.gson
import example.grails.Book

model {
    Book book
}
json {
    isbn book.isbn
    name book.name
}
grails-app/views/books/index.gson
import example.grails.Book
model {
    List<Book> books = []
}
json tmpl.book(books)

6 构建分析应用程序

为这个附加应用程序创建一个新的 Grails 应用程序。例如,使用 Grails Application Forge 或命令行

$ grails create-app example.grails.complete-analytics --profile=rest-api

对于本指南的多应用程序部分,我们需要能够同时运行两个应用程序。要避免运行端口冲突,请更新您应用程序的 application.yml,使其包括以下内容

grails-app/conf/application.yml
server:
    port: 8081

创建一个域类 BookAnalytics,它将持续跟踪一本书被浏览的次数

grails-app/domain/example/grails/BookAnalytics.groovy
package example.grails

class BookAnalytics {
    String isbn
    Long count

    static constraints = {
        isbn unique: true, blank: false, nullable: false
        count blank: false, nullable: false
    }
}

为此域类创建一个 GORM 数据服务

grails-app/services/example/grails/BookAnalyticsGormService.groovy
package example.grails

import grails.gorm.services.Query
import grails.gorm.services.Service
import javax.inject.Singleton

@Singleton
@Service(BookAnalytics)
interface BookAnalyticsGormService {

    List<BookAnalytics> findAll()

    BookAnalytics findByIsbn(String isbn)

    BookAnalytics saveBookAnalytics(BookAnalytics bookAnalytics)

    @Query("update ${BookAnalytics bookAnalytics} set ${bookAnalytics.count} = $newCount where bookAnalytics.isbn = $isbn") (1)
    void updateCount(String isbn, Long newCount)

}
1 使用 JPA-QL 实现更新操作

创建一个使用先前服务的控制器

grails-app/controllers/example/grails/AnalyticsController.groovy
package example.grails

import groovy.transform.CompileStatic

@CompileStatic
class AnalyticsController {
    BookAnalyticsGormService bookAnalyticsGormService

    def index() {
        [analytics: bookAnalyticsGormService.findAll()]
    }
}

创建两个 JSON 视图

grails-app/views/analytics/_bookAnalytics.gson
import example.grails.BookAnalytics
model {
    BookAnalytics bookAnalytics
}
json {
    isbn bookAnalytics.id
    count bookAnalytics.count
}
grails-app/views/analytics/index.gson
import example.grails.BookAnalytics
model {
    List<BookAnalytics> analytics = []
}
json tmpl.bookAnalytics(analytics)

创建一个新类作为 books 微服务的消费者来使用 Kafka 发送的消息。Micronaut 框架将在编译时实施调用消费者的逻辑。创建 AnalyticsListener

src/main/groovy/example/grails/AnalyticsListener.groovy
package example.grails

import groovy.transform.CompileStatic
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
import org.springframework.beans.factory.annotation.Autowired

@CompileStatic
@Requires(notEnv = Environment.TEST) (1)
@KafkaListener (2)
class AnalyticsListener {

    private final BookAnalyticsGormService bookAnalyticsGormService (3)

    AnalyticsListener(BookAnalyticsGormService bookAnalyticsGormService) { (3)
        this.bookAnalyticsGormService = bookAnalyticsGormService
    }

    @Topic('analytics') (4)
    void updateAnalytics(Map payload) {

        if (payload.containsKey('isbn')) {
            BookAnalytics bookAnalytics = bookAnalyticsGormService.findByIsbn(payload.isbn as String)
            if (bookAnalytics) {
                bookAnalyticsGormService.updateCount(payload.isbn as String, bookAnalytics.count + 1)
            } else {
                bookAnalytics = new BookAnalytics(isbn: payload.isbn as String, count: 1L)
                bookAnalyticsGormService.saveBookAnalytics(bookAnalytics)
            }
        }
    }
}
1 不要为测试环境加载此 bean - 这让我们在 Kafka 未运行的情况下运行测试
2 使用 @KafkaListener 注解该类,以表明此 bean 将从 Kafka 接收消息
3 BookAnalyticsGormService

的构造函数注入
4 使用 @Topic 注解该方法,并指定要使用的主题名称

7 运行应用程序

启动 Kafka

$ cd docker
docker$ docker-compose up

启动 books 微服务

$ cd complete
complete$ ./gradlew bootRun

启动 analytics 微服务

$ cd complete-analytics
complete-analytics$ ./gradlew bootRun

执行一个 curl 请求来获取一本书籍

$ curl https://127.0.0.1:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}

现在,使用 curl 查看分析结果

$ curl https://127.0.0.1:8081/analytics
[{"bookIsbn":"1491950358","count":1}]

更新 books 微服务的 curl 命令以检索其他书籍并重复调用,然后重新运行 curl 命令到 analytics 微服务中以查看计数是否会增加。

8 后续步骤

如需进一步了解,请阅读 Micronaut Kafka 插件文档。

9 您需要帮助使用 Grails 吗?

Object Computing, Inc. (OCI) 赞助创建了本指南。可提供各种咨询和支持服务。

OCI 是 Grails 的家

了解团队