博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
微服务之间通过RabbitMQ通信
阅读量:6367 次
发布时间:2019-06-23

本文共 4324 字,大约阅读时间需要 14 分钟。

微服务之间通过RabbitMQ通信

微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接一般的通信方式是使用 HTTP协议rpc协议或者使用消息中间件如RabbitMQ``Kafka

在这篇文章 已经实现了一个微服务的应用,在文章中已经实现了各个服务直接的通信,是使用的 HTTP的形式 ,那各个服务之间如何通过 RabbitMQ进行消息通信呢,我们现在要实现一个功能,就是一个用户预订电影票的接口,需要服务 User Service(port 8000) 和 服务 **Booking Service(port 8003)**之间通信,用户预订之后,把预订信息写入到 booking的数据库中

安装 RabbitMQ

安装 RabbitMQ 之前需要先安装 的环境 ,然后下载安装 ,请选择对应的版本,安装完成之后,RabbitMQ在Windows上是作为一个服务在后台运行,关于 RabbitMQ 的接口如何使用,请参考官网的 ,有各个主流语言的实现我们使用的是Go版本,请下载对应的实现接口 go get github.com/streadway/amqp

RabbitMQ的接口做一下简单的封装

  • 定义一个接口

messaging/message.go

type IMessageClient interface {	ConnectToBroker(connectionStr string) error	PublishToQueue(data []byte, queueName string) error	SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error	Close()}type MessageClient struct {	conn *amqp.Connection}复制代码
  • 连接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error {	if connectionStr == "" {		panic("the connection str mustnt be null")	}	var err error	m.conn, err = amqp.Dial(connectionStr)	return err}复制代码
  • 发布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {	if m.conn == nil {		panic("before publish you must connect the RabbitMQ first")	}	ch, err := m.conn.Channel()	defer ch.Close()	failOnError(err, "Failed to open a channel")	q, err := ch.QueueDeclare(		queueName,		false,		false,		false,		false,		nil,	)	failOnError(err, "Failed to declare a queue")	err = ch.Publish(		"",		q.Name,		false,		false,		amqp.Publishing{			ContentType: "application/json",			Body:        body,		},	)	failOnError(err, "Failed to publish a message")	return nil}复制代码
  • 订阅消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {	ch, err := m.conn.Channel()	//defer ch.Close()	failOnError(err, "Failed to open a channel")	q, err := ch.QueueDeclare(		queueName,		false,		false,		false,		false,		nil,	)	failOnError(err, "Failed to declare a queue")	msgs, err := ch.Consume(		q.Name,		"",		true,		false,		false,		false,		nil,	)	failOnError(err, "Failed to register a consumer")	go consumeLoop(msgs, handlerFunc)	return nil}复制代码

实现通信

User Service中定义一个新的POST接口 /user/{name}/booking,实现用户的预订功能,预订之后,通过RabbitMQ发布一个消息给 Booking Service,Booking Service接收到消息之后,做相应的处理(写入数据库)

User Service

  • 初始化 MessageClient

users/controllers/user.go

var client messaging.IMessageClientfunc init() {	client = &messaging.MessageClient{}	err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")	if err != nil {		fmt.Println("connect to rabbitmq error", err)	}}复制代码
  • 添加新的路由和实现

routes.go

register("POST", "/user/{name}/booking", controllers.NewBooking, nil)复制代码

users/controllers/user.go

func NewBooking(w http.ResponseWriter, r *http.Request) {	params := mux.Vars(r)	user_name := params["name"]	defer r.Body.Close()	var bookings models.Booking	body, _ := ioutil.ReadAll(r.Body)	err := json.Unmarshal(body, &bookings)	if err != nil {		fmt.Println("the format body error ", err)	}	fmt.Println("user name:", user_name, bookings)	go notifyMsg(body)}复制代码
  • 用一个协程实现消息的发布
func notifyMsg(body []byte) {	err := client.PublishToQueue(body, "new_booking")	if err != nil {		fmt.Println("Failed to publis message", err)	}}复制代码

Booking Service

  • 初始化MessageClient
var client messaging.IMessageClientfunc initMessage() {	client = &messaging.MessageClient{}	err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")	if err != nil {		fmt.Println("Failed to connect to RabbitMQ", err)	}	err = client.SubscribeToQueue("new_booking", getBooking)	if err != nil {		fmt.Println("Failed to comsuer the msg", err)	}}复制代码

在 web服务之前启动

func main() {	initMessage()	r := routes.NewRouter()	http.ListenAndServe(":8003", r)}复制代码
  • 接收后的消息处理
func getBooking(delivery amqp.Delivery) {  var booking models.Booking	json.Unmarshal(delivery.Body, &booking)  booking.Id = bson.NewObjectId().Hex()	dao.Insert("Booking", "BookModel", booking)	fmt.Println("the booking msg", booking)}复制代码

验证,需要启动 User ServiceBooking Service

使用 Postman 发送对应的数据

post 127.0.0.1:8000/user/kevin_woo/booking{	"name":"kevin_woo",	"books":[		{			"date":"20180727",			"movies":["5b4c45d49d5e3e33c4a5b97a"]		},		{			"date":"20180810",			"movies":["5b4c45ea9d5e3e33c4a5b97b"]		}	]}复制代码

可以看到数据库已经有了一条新的预订信息

说明,我这里POST的数据就是booking数据库中的结构,实际情况需要对数据进行封装处理,在POST数据时,没有对数据进行验证, 在实际开发过程中需要对各个数据做相应的验证,这里主要是看一下 RabbitMQ的消息传递处理的过程

转载地址:http://qjrma.baihongyu.com/

你可能感兴趣的文章
Android 多线程之阻塞队列
查看>>
[译] 关于 Angular 依赖注入你需要知道的
查看>>
Haskell 在 macOS 下的环境搭建
查看>>
适配mpvue平台的的微信小程序日历组件mpvue-calendar
查看>>
【Linux学习】 Redis常用的一些指令
查看>>
Spring Cloud 中使用Feign解决参数注解无法继承的问题
查看>>
数据迁移方案 + Elasticsearch在综合搜索列表实现
查看>>
干货 | 分分钟教你用Python创建一个区块链
查看>>
Angular开发实践(八): 使用ng-content进行组件内容投射
查看>>
canvas+websocket+vue做一个完整的你画我猜小游戏
查看>>
android复习清单
查看>>
工作代码备用
查看>>
spring cloud互联网分布式微服务云平台规划分析--spring cloud定时调度平台
查看>>
说说如何配置 Webpack
查看>>
小程序中使用箭头函数的问题
查看>>
走进 JDK 之 Long
查看>>
Android打地鼠游戏的修改和优化
查看>>
Java异常
查看>>
map、reduce、filter、for...of、for...in等总结
查看>>
html2canvas-实现页面截图
查看>>