Grails 和 Micronaut Kafka 的消息队列
学习如何使用 Grails 和 Micronaut Kafka 消息队列
作者:Sergio del Amo
Grails 版本:4.1.0.M5
1 培训
Grails 培训 - 由创建并积极维护 Grails 框架的人员开发和提供的!
2 开始使用
在此指南中,我们将向您展示如何使用 Grails 应用设置和使用 Micronaut Kafka。
2.1 你将需要
要完成此指南,您需要以下内容
-
时间
-
一个像样的文本编辑器或 IDE
-
安装并使用 `JAVA_HOME` 正确配置 JDK 1.8 或更高版本
2.2 如何完成指南
要开始,请执行以下操作
-
下载并解压源代码
或
-
克隆 Git 存储库
git clone https://github.com/grails-guides/grails-micronaut-kakfa.git
Grails 指南存储库包含三个文件夹
-
docker
-
complete
-
complete-analytics
在此指南中,您将创建两个 Grails 应用。complete
和 complete-analytics
都是已完成的示例。这是完成指南中介绍的步骤并应用这些更改的结果。
要在 Docker 中运行 Kafka,docker
文件夹包含一个 docker compose 文件。您需要安装 Docker 和 Docker Compose。
要完成指南,请按照接下来章节中的说明操作。
如果您进入grails-guides/grails-micronaut-kakfa/complete 和grails-guides/grails-micronaut-kakfa/complete-analytics ,您可以直接转到完成的示例。 |
3 应用概述
在本指南中,我们设置一个消息队列,以便在两个不同的应用程序中正常工作。在本指南中,我们有一个列出书籍和书籍详细信息的应用程序。我们希望跟踪每本书被浏览的次数。我们添加一个独立的分析应用程序,它会持续跟踪每本书被浏览的次数。
4 运行 Kafka
使用 Kafka 的快速方法是通过 Docker。创建这个 docker-compose.yml
文件
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 2181:2181 (1)
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 9092:9092 (2)
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
1 | Zookeeper 默认使用端口 2181,但您可以在有需要的情况下更改此值。 |
2 | Kafka 默认使用端口 9092,但您可以在有需要的情况下更改此值。 |
启动 Zookeeper 和 Kafka(使用 CTRL-C 同时停止两者)
$ docker-compose up
另外,您可以 安装并运行本地 Kafka 实例。
5 图书应用程序
使用 rest-api
配置文件创建一个 Grails 应用程序。
grails create-app example.grails.complete --profile=rest-api
首先,添加一个 Book
域
package example.grails
class Book {
String isbn
String name
static constraints = {
isbn unique: true, blank: false, nullable: false
name blank: false, nullable: false
}
}
利用 GORM 数据服务 为 Book
创建默认 CRUD 操作
package example.grails
import grails.gorm.services.Service
@Service(Book)
interface BookGormService {
Book saveBook(Book book)
List<Book> findAll()
Book findByIsbn(String isbn)
}
然后,我们需要使用 Bootstrap.groovy
实际创建图书数据
package example.grails
import groovy.transform.CompileStatic
@CompileStatic
class BootStrap {
BookGormService bookGormService
def init = { servletContext ->
[
new Book(isbn: '1491950358', name: 'Building Microservices'),
new Book(isbn: '1680502395', name: 'Release It!'),
new Book(isbn: '0321601912', name: 'Continuous Delivery')
].each {book ->
bookGormService.saveBook(book)
}
}
def destroy = {
}
}
添加 Micronaut Kafka 依赖
implementation "io.micronaut:micronaut-inject-groovy"
implementation("io.micronaut.kafka:micronaut-kafka:3.1.0")
这个应用程序连接到运行在 localhost:9092 上的 Kafka 代理。添加以下配置
kafka:
bootstrap:
servers: localhost:9092
创建一个发送消息到 Kafka 的接口。Micronaut 框架将在编译时实现此接口
package example.grails
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.Topic
@KafkaClient
interface AnalyticsClient {
@Topic('analytics') (1)
Map updateAnalytics(Map book) (2)
}
1 | 设置主题名称 |
2 | 发送图书信息。Micronaut 框架会自动在发送前将其转换为 JSON 格式。 |
创建一个控制器,它会获取图书并使用 AnalyticsClient
通知 Kafka
package example.grails
import groovy.transform.CompileStatic
import org.springframework.beans.factory.annotation.Autowired
@CompileStatic
class BooksController {
BookGormService bookGormService
@Autowired
AnalyticsClient analyticsClient
static allowedMethods = [
index: 'GET',
show: 'GET'
]
def index() {
[books: bookGormService.findAll()]
}
def show(String isbn) {
Book book = bookGormService.findByIsbn(isbn)
if (!book) {
response.status = 404
return
}
analyticsClient.updateAnalytics([isbn: book.isbn])
render(template: 'book', model: [book: book])
}
}
将以下映射添加到 UrlMappings
"/books/$isbn" {
controller = 'books'
action = 'show'
}
为控制器的操作创建两个 JSON 视图
import example.grails.Book
model {
Book book
}
json {
isbn book.isbn
name book.name
}
import example.grails.Book
model {
List<Book> books = []
}
json tmpl.book(books)
6 构建分析应用程序
为这个附加应用程序创建一个新的 Grails 应用程序。例如,使用 Grails Application Forge 或命令行
$ grails create-app example.grails.complete-analytics --profile=rest-api
对于本指南的多应用程序部分,我们需要能够同时运行两个应用程序。要避免运行端口冲突,请更新您应用程序的 application.yml
,使其包括以下内容
server:
port: 8081
创建一个域类 BookAnalytics
,它将持续跟踪一本书被浏览的次数
package example.grails
class BookAnalytics {
String isbn
Long count
static constraints = {
isbn unique: true, blank: false, nullable: false
count blank: false, nullable: false
}
}
为此域类创建一个 GORM 数据服务
package example.grails
import grails.gorm.services.Query
import grails.gorm.services.Service
import javax.inject.Singleton
@Singleton
@Service(BookAnalytics)
interface BookAnalyticsGormService {
List<BookAnalytics> findAll()
BookAnalytics findByIsbn(String isbn)
BookAnalytics saveBookAnalytics(BookAnalytics bookAnalytics)
@Query("update ${BookAnalytics bookAnalytics} set ${bookAnalytics.count} = $newCount where bookAnalytics.isbn = $isbn") (1)
void updateCount(String isbn, Long newCount)
}
1 | 使用 JPA-QL 实现更新操作 |
创建一个使用先前服务的控制器
package example.grails
import groovy.transform.CompileStatic
@CompileStatic
class AnalyticsController {
BookAnalyticsGormService bookAnalyticsGormService
def index() {
[analytics: bookAnalyticsGormService.findAll()]
}
}
创建两个 JSON 视图
import example.grails.BookAnalytics
model {
BookAnalytics bookAnalytics
}
json {
isbn bookAnalytics.id
count bookAnalytics.count
}
import example.grails.BookAnalytics
model {
List<BookAnalytics> analytics = []
}
json tmpl.bookAnalytics(analytics)
创建一个新类作为 books 微服务的消费者来使用 Kafka 发送的消息。Micronaut 框架将在编译时实施调用消费者的逻辑。创建 AnalyticsListener
类
package example.grails
import groovy.transform.CompileStatic
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.context.env.Environment
import org.springframework.beans.factory.annotation.Autowired
@CompileStatic
@Requires(notEnv = Environment.TEST) (1)
@KafkaListener (2)
class AnalyticsListener {
private final BookAnalyticsGormService bookAnalyticsGormService (3)
AnalyticsListener(BookAnalyticsGormService bookAnalyticsGormService) { (3)
this.bookAnalyticsGormService = bookAnalyticsGormService
}
@Topic('analytics') (4)
void updateAnalytics(Map payload) {
if (payload.containsKey('isbn')) {
BookAnalytics bookAnalytics = bookAnalyticsGormService.findByIsbn(payload.isbn as String)
if (bookAnalytics) {
bookAnalyticsGormService.updateCount(payload.isbn as String, bookAnalytics.count + 1)
} else {
bookAnalytics = new BookAnalytics(isbn: payload.isbn as String, count: 1L)
bookAnalyticsGormService.saveBookAnalytics(bookAnalytics)
}
}
}
}
1 | 不要为测试环境加载此 bean - 这让我们在 Kafka 未运行的情况下运行测试 |
2 | 使用 @KafkaListener 注解该类,以表明此 bean 将从 Kafka 接收消息 |
3 | BookAnalyticsGormService 的构造函数注入
|
4 | 使用 @Topic 注解该方法,并指定要使用的主题名称 |
7 运行应用程序
启动 Kafka
$ cd docker
docker$ docker-compose up
启动 books
微服务
$ cd complete
complete$ ./gradlew bootRun
启动 analytics
微服务
$ cd complete-analytics
complete-analytics$ ./gradlew bootRun
执行一个 curl 请求来获取一本书籍
$ curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}
现在,使用 curl 查看分析结果
$ curl http://localhost:8081/analytics
[{"bookIsbn":"1491950358","count":1}]
更新 books 微服务的 curl 命令以检索其他书籍并重复调用,然后重新运行 curl 命令到 analytics 微服务中以查看计数是否会增加。
8 后续步骤
如需进一步了解,请阅读 Micronaut Kafka 插件文档。