1. pulsar-go

1.1. 客户端

Pulsar协议 url 使用 pulsar scheme来指定被连接的集群,默认端口为6650。以下是 localhost 的示例:

pulsar://localhost:6650

生产环境的Pulsar 集群URL类似这样:

pulsar://pulsar.us-west.example.com:6650

1.2. 创建客户端

import (
    "log"
    "runtime"

    "github.com/apache/pulsar/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
        OperationTimeoutSeconds: 5,
        MessageListenerThreads: runtime.NumCPU(),
    })

    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }
}

1.3. Producers

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: "my-topic",
})

if err != nil {
    log.Fatalf("Could not instantiate Pulsar producer: %v", err)
}

defer producer.Close()

msg := pulsar.ProducerMessage{
    Payload: []byte("Hello, Pulsar"),
}

if err := producer.Send(msg); err != nil {
    log.Fatalf("Producer could not send message: %v", err)
}

1.3.1. Producer operations

方法 说明: 返回类型
Topic() Fetches the producer's topic string
Name() Fetches the producer's name string
Send(context.Context, ProducerMessage) error Publishes a message to the producer's topic. 该调用将一直阻塞,直到Pulsar代理成功确认该消息为止;如果超出了在生产者配置中使用SendTimeout设置的超时设置,则将引发错误。 error
SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error)) Publishes a message to the producer's topic asynchronously. 第三个参数是一个回调函数,它指定在确认消息或引发错误时发生的情况。
Close() 关闭生产者并释放分配给它的所有资源。如果调用Close(),则发布者将不再接受任何消息。该方法将一直阻塞,直到Pulsar保留了所有待处理的发布请求。如果抛出错误,将不会重试任何暂挂写入。

1.4. Consumers

消费者订阅一个或多个主题,并听取在该主题/这些主题上产生的传入消息。

msgChannel := make(chan pulsar.ConsumerMessage)

consumerOpts := pulsar.ConsumerOptions{
    Topic:            "my-topic",
    SubscriptionName: "my-subscription-1",
    Type:             pulsar.Exclusive,
    MessageChannel:   msgChannel,
}

consumer, err := client.Subscribe(consumerOpts)

if err != nil {
    log.Fatalf("Could not establish subscription: %v", err)
}

defer consumer.Close()

for cm := range msgChannel {
    msg := cm.Message

    fmt.Printf("Message ID: %s", msg.ID())
    fmt.Printf("Message value: %s", string(msg.Payload()))

    consumer.Ack(msg)
}

1.4.1. Consumer operations

方法 说明: 返回类型
Topic() Returns the consumer's topic string
Subscription() Returns the consumer's subscription name string
Unsubcribe() 取消订阅分配的主题。如果取消订阅操作不成功,则会引发错误 error
Receive(context.Context) 从主题接收一条消息。此方法将阻塞,直到出现消息为止 (Message, error)
Ack(Message) Acknowledges a message to the Pulsar broker error
AckID(MessageID) Acknowledges a message to the Pulsar broker by message ID error
AckCumulative(Message) Acknowledges all the messages in the stream, up to and including the specified message. AckCumulative方法将一直阻塞,直到将确认发送到代理为止。此后,将不会将邮件重新传递给使用者。累积确认只能与共享订阅类型一起使用. error
Nack(Message) Acknowledge the failure to process a single message. error
NackID(MessageID) Acknowledge the failure to process a single message. error
Close() Closes the consumer, disabling its ability to receive messages from the broker error
RedeliverUnackedMessages() Redelivers all unacknowledged messages on the topic. In failover mode, 如果使用者未在指定主题上处于活动状态,则将忽略此请求; in shared mode, 重新传递的消息分布在与该主题相关的所有使用者上。注意:这是一个不会引发错误的非阻塞操作。.

1.5. Readers

Pulsar readers process messages from Pulsar topics

readers 与 consumers 不同,因为使用读取器,您需要明确指定要从流中开始的消息(另一方面, consumers会自动以最新的未确认消息开始)。

reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic: "my-golang-topic",
    StartMessageId: pulsar.LatestMessage,
})