在此查看完整示例。
Dubbo-Python 支持流式调用,包括 ClientStream、ServerStream 和 BidirectionalStream 三种模式。
在流式调用中,操作可以分为写入流和读取流两部分。对于 ClientStream,是多次写入、单次读取;对于 ServerStream,是单次写入、多次读取;而 BidirectionalStream 支持多次写入和多次读取。
流式调用的写入操作分为单次写入(ServerStream)和多次写入(ClientStream 和 BidirectionalStream)。
单次写入流的调用方式与 unary 模式类似。例如:
stub.server_stream(greeter_pb2.GreeterRequest(name="hello world from dubbo-python"))
对于多次写入流,用户可以通过迭代器或 writeStream 方式写入数据(两者只能选其一)。
迭代器写入:写入方式类似于 unary 模式,唯一的区别是传入的是迭代器。例如:
# Use an iterator to send multiple requests
def request_generator():
    for i in ["hello", "world", "from", "dubbo-python"]:
        yield greeter_pb2.GreeterRequest(name=str(i))
# Call the remote method and return a read_stream
stream = stub.client_stream(request_generator())
使用 writeStream 写入:此方法不传入参数,使用空参调用,然后通过 write 方法逐条写入数据,写入完成后调用 done_writing 方法结束流。例如:
stream = stub.bi_stream()
# Use the write method to send messages
stream.write(greeter_pb2.GreeterRequest(name="jock"))
stream.write(greeter_pb2.GreeterRequest(name="jane"))
stream.write(greeter_pb2.GreeterRequest(name="alice"))
stream.write(greeter_pb2.GreeterRequest(name="dave"))
# Call done_writing to notify the server that the client has finished writing
stream.done_writing()
流式调用的读取操作分为单次读取(ClientStream)和多次读取(ServerStream 和 BidirectionalStream)。在流式调用中,无论是哪种模式,返回的都是一个 ReadStream。我们可以使用 read 方法或迭代器读取数据,针对 read 方法,需要注意以下几点:
read 方法支持 timeout 参数,用于设置阻塞等待时间(单位:秒)。read 方法的返回结果可能为三种:所需信息(正常情况)、None(等待超时)、EOF(读取流结束)。调用 read 方法一次即可读取数据,例如:
result = stream.read()
print(f"Received response: {result.message}")
可以通过多次调用 read 方法读取数据,但需要处理 None 和 EOF 等非期望值。因为 ReadStream 实现了 __iter__ 和 __next__ 等迭代方法,我们可以通过迭代调用进行多次读取,此方法无需处理非期望值,但不支持设置阻塞超时参数。
迭代调用(推荐):
def client_stream(self, request_iterator):
    response = ""
    for request in request_iterator:
        print(f"Received request: {request.name}")
        response += f"{request.name} "
    return greeter_pb2.GreeterReply(message=response)
多次调用 read 方法:
# Use read method to receive messages
# If no message arrives within the specified time, returns None
# If the server has finished sending messages, returns EOF
while True:
    i = stream.read(timeout=0.5)
    if i is dubbo.classes.EOF:
        break
    elif i is None:
        print("No message received")
        continue
    print(f"Received response: {i.message}")