1. pulsar-go¶
1.1. 客户端¶
Pulsar协议 url 使用 pulsar
scheme来指定被连接的集群,默认端口为6650。以下是 localhost
的示例:
pulsar://localhost:6650
生产环境的Pulsar 集群URL类似这样:
pulsar://pulsar.us-west.example.com:6650
1.2. 创建客户端¶
import (
"log"
"runtime"
"github.com/apache/pulsar/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeoutSeconds: 5,
MessageListenerThreads: runtime.NumCPU(),
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
}
1.3. Producers¶
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar producer: %v", err)
}
defer producer.Close()
msg := pulsar.ProducerMessage{
Payload: []byte("Hello, Pulsar"),
}
if err := producer.Send(msg); err != nil {
log.Fatalf("Producer could not send message: %v", err)
}
1.3.1. Producer operations¶
方法 | 说明: | 返回类型 |
---|---|---|
Topic() |
Fetches the producer's topic | string |
Name() |
Fetches the producer's name | string |
Send(context.Context, ProducerMessage) error |
Publishes a message to the producer's topic. 该调用将一直阻塞,直到Pulsar代理成功确认该消息为止;如果超出了在生产者配置中使用SendTimeout设置的超时设置,则将引发错误。 | error |
SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error)) |
Publishes a message to the producer's topic asynchronously. 第三个参数是一个回调函数,它指定在确认消息或引发错误时发生的情况。 | |
Close() |
关闭生产者并释放分配给它的所有资源。如果调用Close(),则发布者将不再接受任何消息。该方法将一直阻塞,直到Pulsar保留了所有待处理的发布请求。如果抛出错误,将不会重试任何暂挂写入。 |
1.4. Consumers¶
消费者订阅一个或多个主题,并听取在该主题/这些主题上产生的传入消息。
msgChannel := make(chan pulsar.ConsumerMessage)
consumerOpts := pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-subscription-1",
Type: pulsar.Exclusive,
MessageChannel: msgChannel,
}
consumer, err := client.Subscribe(consumerOpts)
if err != nil {
log.Fatalf("Could not establish subscription: %v", err)
}
defer consumer.Close()
for cm := range msgChannel {
msg := cm.Message
fmt.Printf("Message ID: %s", msg.ID())
fmt.Printf("Message value: %s", string(msg.Payload()))
consumer.Ack(msg)
}
1.4.1. Consumer operations¶
方法 | 说明: | 返回类型 |
---|---|---|
Topic() |
Returns the consumer's topic | string |
Subscription() |
Returns the consumer's subscription name | string |
Unsubcribe() |
取消订阅分配的主题。如果取消订阅操作不成功,则会引发错误 | error |
Receive(context.Context) |
从主题接收一条消息。此方法将阻塞,直到出现消息为止 | (Message, error) |
Ack(Message) |
Acknowledges a message to the Pulsar broker | error |
AckID(MessageID) |
Acknowledges a message to the Pulsar broker by message ID | error |
AckCumulative(Message) |
Acknowledges all the messages in the stream, up to and including the specified message. AckCumulative方法将一直阻塞,直到将确认发送到代理为止。此后,将不会将邮件重新传递给使用者。累积确认只能与共享订阅类型一起使用. | error |
Nack(Message) |
Acknowledge the failure to process a single message. | error |
NackID(MessageID) |
Acknowledge the failure to process a single message. | error |
Close() |
Closes the consumer, disabling its ability to receive messages from the broker | error |
RedeliverUnackedMessages() |
Redelivers all unacknowledged messages on the topic. In failover mode, 如果使用者未在指定主题上处于活动状态,则将忽略此请求; in shared mode, 重新传递的消息分布在与该主题相关的所有使用者上。注意:这是一个不会引发错误的非阻塞操作。. |
1.5. Readers¶
Pulsar readers process messages from Pulsar topics
readers 与 consumers 不同,因为使用读取器,您需要明确指定要从流中开始的消息(另一方面, consumers会自动以最新的未确认消息开始)。
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "my-golang-topic",
StartMessageId: pulsar.LatestMessage,
})