|
@ -38,6 +38,8 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er |
|
|
return nil, err |
|
|
return nil, err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Capture the current ID, since it can change in the future.
|
|
|
|
|
|
subscriptionID := ctx.JSONReq.ID |
|
|
go func() { |
|
|
go func() { |
|
|
for { |
|
|
for { |
|
|
select { |
|
|
select { |
|
@ -46,7 +48,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er |
|
|
ctx.WSConn.TryWriteRPCResponse( |
|
|
ctx.WSConn.TryWriteRPCResponse( |
|
|
rpctypes.NewRPCSuccessResponse( |
|
|
rpctypes.NewRPCSuccessResponse( |
|
|
ctx.WSConn.Codec(), |
|
|
ctx.WSConn.Codec(), |
|
|
ctx.JSONReq.ID, |
|
|
|
|
|
|
|
|
subscriptionID, |
|
|
resultEvent, |
|
|
resultEvent, |
|
|
)) |
|
|
)) |
|
|
case <-sub.Cancelled(): |
|
|
case <-sub.Cancelled(): |
|
@ -59,7 +61,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er |
|
|
} |
|
|
} |
|
|
ctx.WSConn.TryWriteRPCResponse( |
|
|
ctx.WSConn.TryWriteRPCResponse( |
|
|
rpctypes.RPCServerError( |
|
|
rpctypes.RPCServerError( |
|
|
ctx.JSONReq.ID, |
|
|
|
|
|
|
|
|
subscriptionID, |
|
|
fmt.Errorf("subscription was cancelled (reason: %s)", reason), |
|
|
fmt.Errorf("subscription was cancelled (reason: %s)", reason), |
|
|
)) |
|
|
)) |
|
|
} |
|
|
} |
|
|