Interface Subscriber<V>
-
- Type Parameters:
V
- the type of the value returned by the subscriber
- All Superinterfaces:
AutoCloseable
public interface Subscriber<V> extends AutoCloseable
A Subscriber subscribes either directly to a NamedTopic or to a subscriber group of a NamedTopic. Each value published to a NamedTopic is delivered to all of its subscriber groups and direct (anonymous) Subscribers.
The factory methods NamedTopic.createSubscriber(Subscriber.Option[]) and Session.createSubscriber(String, Option[]) accept one or more Subscriber.Options to configure the Subscriber. The Subscriber.Name.inGroup(String) option specifies the subscriber group for the Subscriber to join. If this option is not specified, the Subscriber is a direct (anonymous) subscriber to the topic. All Subscriber options and defaults are summarized in a table in Subscriber.Option.
Channels
Topics use the concept of channels to improve scalability. This is similar to how Coherence uses partitions for caches, but the name channel was chosen to avoid confusion. The default channel count is the next prime above the square root of the partition count configured for the underlying cache service, which for the default partition count of 257 is 17 channels.
Publishers publish messages to a channel based on their ordering configuration. Channel ownership is assigned to subscribers, which then receive messages from the channels they own. An anonymous subscriber has ownership of all channels. A subscriber that is part of a subscriber group has ownership of a subset of the available channels.
Channel count is configurable, but ideally should be set neither too high nor too low. For example, setting the channel count to 1 would mean that all publishers contend to publish to a single channel, and that only one subscriber in a subscriber group would be able to receive messages. Setting the channel count too high (say, above the number of publishers) may mean that some channels never receive any messages and are wasted. Finding the appropriate value is admittedly non-trivial; however, when trying to maximize publisher throughput, this is a setting that can be tuned.
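The default channel count rule described above can be checked with a few lines of plain Java. This is only a sketch of the arithmetic, not Coherence code: the class and method names are hypothetical, and it reads "next prime above" as the smallest prime strictly greater than the square root.

```java
final class ChannelCountSketch {
    /** Returns true if n is prime (trial division, adequate for small values). */
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int i = 2; (long) i * i <= n; i++) {
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }

    /** Smallest prime strictly greater than sqrt(partitionCount). */
    static int defaultChannelCount(int partitionCount) {
        int candidate = (int) Math.floor(Math.sqrt(partitionCount)) + 1;
        while (!isPrime(candidate)) {
            candidate++;
        }
        return candidate;
    }

    public static void main(String[] args) {
        // sqrt(257) is roughly 16.03, and the next prime above that is 17
        System.out.println(defaultChannelCount(257)); // prints 17
    }
}
```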
Subscriber Groups
Subscribers can be part of a subscriber group. Within each subscriber group, each value is only received by one of the group members, enabling distributed, parallel processing of the values delivered to the subscriber group. Thus, each subscriber group in effect behaves like a queue over the topic data.
Subscribers in a group can be considered durable: if they are closed, or fail, then message processing will continue from the next element after the last committed position when the subscriber reconnects.
To maintain ordering of messages, only a single subscriber in a group polls for messages from a channel. Each subscriber in a group is allocated ownership of a subset of the channels in a topic. This means that the maximum number of subscribers in a group that could be doing any work is the same as the topic's channel count. If there are more subscribers in a group than there are channels, then the additional subscribers will not be allocated ownership of any channels and hence will receive no messages. It may seem that increasing the channel count would therefore allow more subscribers to work in parallel, but this is not necessarily the case.
Channels are allocated to subscribers when they are created. As more subscribers in a group are created, channel ownership is rebalanced, and existing subscribers may lose ownership of channels that are allocated to new subscribers. As subscribers are closed (or die or are timed out), channel ownership is again rebalanced over the remaining subscribers.
Subscribers in a group have a configurable timeout. If a subscriber does not call receive within the timeout it is considered dead and the channels it owns will be reallocated to other subscribers in the group. This is to stop channel starvation where no subscriber in a group is polling from a channel.
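To make the consequence of channel ownership concrete, the sketch below distributes channels over the members of a group. The round-robin strategy, the class name and the subscriber names are assumptions made for illustration; the actual allocation strategy used by Coherence is not described here and may differ. It does show why subscribers beyond the channel count receive nothing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ChannelAllocationSketch {
    /**
     * Assigns channels 0..channelCount-1 to subscribers in round-robin order.
     * Subscriber names are assumed to be unique.
     */
    static Map<String, List<Integer>> allocate(List<String> subscribers, int channelCount) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String subscriber : subscribers) {
            owned.put(subscriber, new ArrayList<>());
        }
        if (subscribers.isEmpty()) {
            return owned;
        }
        for (int channel = 0; channel < channelCount; channel++) {
            String owner = subscribers.get(channel % subscribers.size());
            owned.get(owner).add(channel);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Two subscribers share four channels.
        System.out.println(allocate(List.of("a", "b"), 4));      // prints {a=[0, 2], b=[1, 3]}
        // Three subscribers but only two channels: "c" owns nothing.
        System.out.println(allocate(List.of("a", "b", "c"), 2)); // prints {a=[0], b=[1], c=[]}
    }
}
```

Re-running allocate each time a subscriber joins or leaves models the rebalancing step: ownership is recomputed over the current membership.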
Positions
Elements in a NamedTopic are published to a channel and have a unique Position within that channel. A Position is an opaque representation of the underlying position, as theoretically the implementation of the Position could change for different types of topic. Positions are used in various places in the API; for example, positions can be committed, and they can be used to move the subscriber backwards or forwards within a channel. A Position is serializable, so it can be stored and recovered to later reset a subscriber to a desired position. Positions are Comparable, so the positions of two elements can be used to determine how those elements relate to each other within a channel.
Committing
Subscribers in a group are durable, so if they disconnect and reconnect, message processing will restart from the correct position. A Subscriber can commit an element or commit a position in a channel. After a successful commit, on reconnection the first message received from a channel will be the next message after the committed position.
When a position is committed, this will also commit any earlier positions in the channel. For example, if five elements are received and commit is called only on the last element, this effectively also commits the previous four elements.
When topics are configured not to retain elements, received elements will be removed as they are committed by subscribers.
If a subscriber in a subscriber group is timed out (or dies) and its channel ownership is reallocated to other subscribers in the group, those subscribers will start to receive messages from the last committed position of each channel taken over from the failed subscriber.
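The cumulative nature of a commit can be modelled in a few lines. In this sketch a position is represented by a plain long purely for illustration (a real Position is opaque), and the class and method names are hypothetical. It models one channel: committing a position also commits every earlier position, and a restart resumes after the last commit.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

final class CumulativeCommitSketch {
    // Positions received on this channel that have not yet been committed.
    private final NavigableSet<Long> uncommitted = new TreeSet<>();
    private long lastCommitted = -1L;

    void received(long position) {
        uncommitted.add(position);
    }

    /** Committing a position also commits every earlier position. */
    void commit(long position) {
        if (position > lastCommitted) {
            lastCommitted = position;
            uncommitted.headSet(position, true).clear();
        }
    }

    /** The position a reconnecting (or replacement) subscriber would resume from. */
    long nextAfterReconnect() {
        return lastCommitted + 1;
    }

    int uncommittedCount() {
        return uncommitted.size();
    }

    public static void main(String[] args) {
        CumulativeCommitSketch channel = new CumulativeCommitSketch();
        for (long p = 0; p < 5; p++) {
            channel.received(p);
        }
        channel.commit(4); // commit only the last of five elements
        System.out.println(channel.uncommittedCount());   // prints 0
        System.out.println(channel.nextAfterReconnect()); // prints 5
    }
}
```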
Commits may be performed synchronously (using commit(int, Position)) or asynchronously (using commitAsync(int, Position)). There is no facility for automatic commit of messages; all calls to commit must be made manually by application code.
Seeking
A new subscriber will start to receive messages from the head position of a channel (or the last committed position for a durable subscriber) and continue to receive messages in order until the tail is reached. It is possible to reposition a subscriber (backwards or forwards) using one of the seek methods, such as seek(int, Instant).
Seeking applies a new position to a specific channel. If an attempt is made to position the channel before the first available message (the channel's head) then the subscriber will be positioned at the head. Correspondingly, if an attempt is made to position a subscriber beyond the last position in a channel (the channel's tail) then the subscriber will be positioned at the tail of the channel and will receive the next message published to that channel.
It is also possible to seek to a timestamp. Published messages are given a timestamp, which is the Coherence cluster time on the storage-enabled cluster member that accepted the published message. Using the seek(int, Instant) method, a subscriber can be repositioned so that the next message received from a channel is the first message with a timestamp greater than the timestamp used in the seek call. For example, to position channel 0 so that the next message received is the first message after 20:30 (UTC) on July 5th 2021:
    Instant timestamp = LocalDateTime.of(2021, Month.JULY, 5, 20, 30)
            .toInstant(ZoneOffset.UTC);
    subscriber.seek(0, timestamp);
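The "first message with a timestamp greater than the seek timestamp" rule can be illustrated without a cluster. The sketch below assumes the published timestamps of a channel are available as a list sorted in ascending order; the class and method names are hypothetical and the binary search is an illustrative choice, not a description of how Coherence locates the position.

```java
import java.time.Instant;
import java.util.List;

final class TimestampSeekSketch {
    /**
     * Returns the index of the first timestamp strictly after the target,
     * or published.size() if there is none (that is, the tail).
     * The list is assumed to be sorted in ascending order.
     */
    static int firstAfter(List<Instant> published, Instant target) {
        int lo = 0;
        int hi = published.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (published.get(mid).isAfter(target)) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        List<Instant> published = List.of(
                Instant.ofEpochSecond(10),
                Instant.ofEpochSecond(20),
                Instant.ofEpochSecond(30));
        // A message stamped exactly at the seek time is skipped.
        System.out.println(firstAfter(published, Instant.ofEpochSecond(20))); // prints 2
        // A seek time later than every message lands on the tail.
        System.out.println(firstAfter(published, Instant.ofEpochSecond(40))); // prints 3
    }
}
```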
Seeking Forwards
It is important to note that seeking forwards skips over messages; those skipped messages will never be received once another commit is executed. Moving forwards does not alter the commit position, so when a subscriber has committed a position, then moves forwards and later fails, it will restart at the committed position.
Seeking Backwards Over Previous Commits
When topics are configured not to retain elements, elements are removed as their positions are committed, so they can never be received again. In this case it is not possible to seek backwards further than the last commit, as those elements no longer exist in the topic.
When topics are configured to retain elements, it is possible to seek backwards further than the last commit. This effectively rolls back those commits, and the previously committed messages after the new seek position will be re-received. If a subscriber fails after moving back in this fashion it will restart at the rolled-back position.
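The seek rules above (clamping to head and tail, forward seeks leaving the commit point alone, backward seeks rolling it back) can be summarised in a small model. This is a simplified sketch under stated assumptions: positions are plain long values, the topic retains elements, and the class is hypothetical rather than part of the Coherence API.

```java
final class SeekSketch {
    private final long head;
    private final long tail;
    private long position;
    private long committed;

    SeekSketch(long head, long tail, long committed) {
        this.head = head;
        this.tail = tail;
        this.position = committed;
        this.committed = committed;
    }

    /** Seeks to the target, clamped to [head, tail], and returns the position actually used. */
    long seek(long target) {
        long actual = Math.max(head, Math.min(target, tail));
        position = actual;
        if (actual < committed) {
            // Seeking back over earlier commits rolls the commit point back.
            committed = actual;
        }
        // Seeking forwards leaves the commit point where it was.
        return actual;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        SeekSketch channel = new SeekSketch(0, 9, 5);
        System.out.println(channel.seek(100));   // prints 9 (clamped to the tail)
        System.out.println(channel.committed()); // prints 5 (forward seek keeps the commit)
        System.out.println(channel.seek(2));     // prints 2
        System.out.println(channel.committed()); // prints 2 (commit rolled back)
    }
}
```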
Receiving Elements
Receiving elements from a topic is an asynchronous operation; calls to receive() (or the batch version receive(int)) return a CompletableFuture. If multiple calls are made to receive, the returned futures will complete in the correct order to maintain message ordering in a channel. To maintain ordering, the futures are completed by a single daemon thread. This means that code using any of the synchronous CompletableFuture handling methods, such as CompletableFuture.thenApply(java.util.function.Function) or CompletableFuture.thenAccept(java.util.function.Consumer), will run on the same daemon thread, so application code in the handler methods must complete before the next receive future will be completed. This is intentional, to maintain strict ordering of processing of received elements. If the asynchronous CompletableFuture handler methods are used, such as CompletableFuture.thenApplyAsync(java.util.function.Function) or CompletableFuture.thenAcceptAsync(java.util.function.Consumer), then the application code handling the received element will execute on another thread, and at that point there are no ordering guarantees.
It is important that application code handles the returned futures correctly, both to maintain ordering (if that is important to the application) and to handle errors properly rather than lose exceptions, which is easy to do in poorly written asynchronous future-handling code.
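The ordering behaviour of synchronous handlers can be reproduced with the JDK alone. The sketch below stands in for a subscriber: a single daemon thread (named "completer" here, an assumption) completes a series of futures in order, and handlers attached with thenAccept before completion run on that same thread, one after another. An exceptionally stage is attached so a failing handler is reported rather than silently lost. None of this is Coherence code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ReceiveOrderingSketch {
    static List<Integer> processInOrder(int count) {
        // Single daemon thread that completes every future, in order.
        ExecutorService completer = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "completer");
            t.setDaemon(true);
            return t;
        });
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        List<CompletableFuture<Void>> handled = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            pending.add(future);
            // Synchronous handler: runs on the thread that completes the future.
            handled.add(future
                    .thenAccept(value -> processed.add(value))
                    .exceptionally(error -> {
                        System.err.println("handler failed: " + error);
                        return null;
                    }));
        }
        for (int i = 0; i < count; i++) {
            final int value = i;
            completer.execute(() -> pending.get(value).complete(value));
        }
        // Wait until every handler has run.
        CompletableFuture.allOf(handled.toArray(new CompletableFuture[0])).join();
        completer.shutdown();
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(processInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Replacing thenAccept with thenAcceptAsync would hand each handler to another executor, and the processing order would then no longer be guaranteed.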
Clean-Up
Subscribers should ideally be closed when application code finishes with them. This will clean up server-side resources associated with a subscriber.
It is also important (possibly more important) to clean up subscriber groups that are no longer required. Failure to delete a subscriber group will cause messages to be retained on the server that would otherwise have been removed, consuming more server-side resources such as heap and disk.
- Since:
- Coherence 14.1.1
- Author:
- jf/jk/mf 2015.06.03
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Interface | Description
static interface | Subscriber.Channel | A representation of a topic channel within a subscriber.
static interface | Subscriber.ChannelOwnershipListener | A listener that receives notification of channel ownership changes.
static class | Subscriber.ChannelOwnershipListeners<V> | A subscriber Subscriber.Option that allows one or more listeners to be added to the subscriber, that will be notified of changes to the subscriber's channel ownership.
static class | Subscriber.CommitResult | The result of a commit request.
static class | Subscriber.CommitResultStatus | The different result statuses for a commit request.
static class | Subscriber.CompleteOnEmpty<V> | The CompleteOnEmpty option indicates that the CompletableFuture returned from the receive() operation should complete with a null Subscriber.Element upon identifying that the topic is or has become empty.
static class | Subscriber.Convert<V,U> | The Convert option specifies a ValueExtractor that will convert topic values that a subscriber is interested in receiving prior to sending them to the subscriber.
static interface | Subscriber.Element<V> | Element represents a container for returned values.
static class | Subscriber.Filtered<V> | The Filtered option specifies a filter that will determine which topic values a subscriber is interested in receiving.
static interface | Subscriber.Id | A marker interface for Subscriber identifiers.
static class | Subscriber.Name<V> | The Name option is used to specify a subscriber group name.
static interface | Subscriber.Option<V,U> | A marker interface to indicate that a class is a valid Subscriber.Option for a Subscriber.
-
Method Summary
Modifier and Type | Method | Description
void | close() | Close the Subscriber.
default Subscriber.CommitResult | commit(int nChannel, Position position) | Commit the specified channel and position.
default Map<Integer,Subscriber.CommitResult> | commit(Map<Integer,Position> mapPositions) | Commit the specified channels and positions.
CompletableFuture<Subscriber.CommitResult> | commitAsync(int nChannel, Position position) | Asynchronously commit the specified channel and position.
CompletableFuture<Map<Integer,Subscriber.CommitResult>> | commitAsync(Map<Integer,Position> mapPositions) | Asynchronously commit the specified channels and positions.
static <V> Subscriber.CompleteOnEmpty<V> | completeOnEmpty() | Obtain the Option indicating futures should complete if the topic is empty.
int | getChannelCount() | Returns the number of channels in the underlying NamedTopic.
int[] | getChannels() | Returns the current set of channels that this Subscriber owns.
FlowControl | getFlowControl() | Return the FlowControl object governing this subscriber.
default Optional<Position> | getHead(int nChannel) | Returns the Position that is currently the head for the specified channel, or Optional.empty() if the channel is not owned by this Subscriber.
Map<Integer,Position> | getHeads() | Returns a Map of the Positions that are currently the head for each channel owned by this Subscriber.
Map<Integer,Position> | getLastCommitted() | Returns a Map of channels to the latest Position committed for that channel; the map will only contain channels owned by this Subscriber.
default Optional<Position> | getLastCommitted(int nChannel) | Returns an Optional containing the latest position committed for a channel, or Optional.empty() if the channel is not owned by this Subscriber.
<T> NamedTopic<T> | getNamedTopic() | Returns the underlying NamedTopic that this Subscriber is subscribed to, which could be of a different generic type to this Subscriber if the subscriber is using a transformer.
int | getRemainingMessages() | Returns the number of remaining messages to be read from the topic for this subscriber.
int | getRemainingMessages(int nChannel) | Returns the number of remaining messages to be read from the topic channel for this subscriber.
default Optional<Position> | getTail(int nChannel) | Returns the Position that is currently the tail for the specified channel, or Optional.empty() if the channel is not owned by this Subscriber.
Map<Integer,Position> | getTails() | Returns a Map of the Positions that are currently the tail for each channel owned by this Subscriber; that is, the last message in the channel.
void | heartbeat() | Send a heartbeat to the server to keep this subscriber alive.
static <V> Subscriber.Name<V> | inGroup(String sName) | Obtain a Subscriber.Option that specifies a group name for a Subscriber.
boolean | isActive() | Determine whether this Subscriber is active.
default boolean | isOwner(int nChannel) | Returns true if this subscriber is the owner of the specified channel.
void | onClose(Runnable action) | Add an action to be executed when this Subscriber is closed.
CompletableFuture<Subscriber.Element<V>> | receive() | Receive a value from the topic.
CompletableFuture<List<Subscriber.Element<V>>> | receive(int cBatch) | Receive a batch of elements from the topic.
Position | seek(int nChannel, Position position) | Seek to the specified position in a channel.
Position | seek(int nChannel, Instant timestamp) | Seek to a position in a channel based on the published timestamp of the elements in the topic.
Map<Integer,Position> | seek(Map<Integer,Position> mapPosition) | Seek to the specified position in a set of channels.
default Position | seekAndCommit(int nChannel, Position position) | Seek to the specified position in a channel and set the commit point to the new Position.
default Position | seekAndCommit(int nChannel, Instant timestamp) | Seek to a position in a channel based on the published timestamp of the elements in the topic and set the commit point to the new position.
default Map<Integer,Position> | seekAndCommit(Map<Integer,Position> mapPosition) | Seek to the specified position in a set of channels and set the commit position for the channels.
Map<Integer,Position> | seekToHead(int... anChannel) | Reposition one or more channels to their respective head positions.
Map<Integer,Position> | seekToTail(int... anChannel) | Reposition one or more channels to their respective tail positions.
default Map<Integer,Position> | seekToTailAndCommit(int... anChannel) | Reposition one or more channels to their respective tail positions and set the commit point to the new Position.
static <V,U> Subscriber.Convert<V,U> | withConverter(ValueExtractor<? super V,U> extractor) | Return a Convert option with the specified extractor.
static <V> Subscriber.Filtered<V> | withFilter(Filter<? super V> filter) | Return a Filtered option with the specified filter.
static <V> Subscriber.ChannelOwnershipListeners<V> | withListener(Subscriber.ChannelOwnershipListener... aListener) | Create a Subscriber.ChannelOwnershipListeners option with one or more listeners.
-
-
-
Method Detail
-
receive
CompletableFuture<Subscriber.Element<V>> receive()
Receive a value from the topic. If there is no value available then the future will complete according to the Subscriber.CompleteOnEmpty option used to create the Subscriber.
Note: If the returned future is cancelled it is possible that a value may still be considered by the topic to have been received by this group, while the group would consider this a lost value. Subscriber implementations will make a best effort to prevent such loss, but it cannot be guaranteed and thus cancellation is not advisable.
The futures returned from calls to receive are completed sequentially. If the methods used to handle completion in application code block, this will block completion of subsequent futures. This is to maintain ordering of consumption of completed futures. If the application code handles future completion using the asynchronous methods of CompletableFuture (i.e. handling is handed off to another thread) this could cause out-of-order consumption as received values are consumed on different threads.
- Returns:
- a CompletableFuture which can be used to access the result of this completed operation
- Throws:
IllegalStateException - if the Subscriber is closed
-
receive
CompletableFuture<List<Subscriber.Element<V>>> receive(int cBatch)
Receive a batch of elements from the topic.
The cBatch parameter specifies the maximum number of elements to receive in the batch. The subscriber may return fewer elements than the cBatch parameter; this does not signify that the topic is empty.
If there is no value available then the future will complete according to the Subscriber.CompleteOnEmpty option used to create the Subscriber.
If the poll of the topic returns nothing (i.e. the topic was empty and Subscriber.CompleteOnEmpty is true) then the Consumer will not be called.
The futures returned from calls to receive are completed sequentially. If the methods used to handle completion in application code block, this will block completion of subsequent futures. This is to maintain ordering of consumption of completed futures. If the application code handles future completion using the asynchronous methods of CompletableFuture (i.e. handling is handed off to another thread) this could cause out-of-order consumption as received values are consumed on different threads.
- Parameters:
cBatch - the maximum number of elements to receive in the batch
- Returns:
- a future which can be used to access the result of this completed operation
- Throws:
IllegalStateException - if the Subscriber is closed
-
getChannels
int[] getChannels()
Returns the current set of channels that this Subscriber owns.
Subscribers that are part of a subscriber group own a subset of the available channels. A subscriber in a group should normally be assigned ownership of at least one channel. Where there are more subscribers in a group than the number of channels configured for a topic, some subscribers will own zero channels. Anonymous subscribers that are not part of a group are always owners of all the available channels.
- Returns:
- the current set of channels that this Subscriber is the owner of, or an empty array if this subscriber has not been assigned ownership of any channels
-
isOwner
default boolean isOwner(int nChannel)
Returns true if this subscriber is the owner of the specified channel.
This method only really applies to subscribers that are part of a group that may own zero or more channels. This method will always return true for an anonymous subscriber that is not part of a group.
As channel ownership may change as subscribers in a group are created and closed, the result of this method is somewhat transient. To more accurately track channel ownership changes, create a subscriber using the Subscriber.ChannelOwnershipListeners option.
- Parameters:
nChannel - the channel number
- Returns:
true if this subscriber is currently the owner of the specified channel, otherwise false
-
getChannelCount
int getChannelCount()
Returns the number of channels in the underlying NamedTopic.
This could be different to the number of channels owned by this Subscriber.
- Returns:
- the number of channels in the underlying NamedTopic
-
getFlowControl
FlowControl getFlowControl()
Return the FlowControl object governing this subscriber.
- Returns:
- the FlowControl object
-
close
void close()
Close the Subscriber.
Closing a subscriber ensures that no new receive() requests will be accepted and all pending receive requests will be completed or safely cancelled.
For a direct topic Subscriber, close() enables the release of storage resources for unconsumed values.
For a group member, close() indicates that this member has left its corresponding group. One must actively manage a NamedTopic's logical subscriber groups since their life span is independent of active Subscriber group membership. NamedTopic.destroySubscriberGroup(String) releases storage and stops accumulating topic values for a subscriber group.
Calling close() on an already closed subscriber is a no-op.
- Specified by:
close in interface AutoCloseable
-
heartbeat
void heartbeat()
Send a heartbeat to the server to keep this subscriber alive.
Heartbeat messages are sent on calls to any of the receive() methods. If a subscriber does not call receive within the configured timeout, it will be considered dead and lose its channel ownership. This is to stop badly behaving subscribers from blocking messages from being processed, for example if a bug in application code causes a subscriber to deadlock or take an excessive amount of time to process messages.
If application code knows that it will take longer to execute than the subscriber timeout, for example because message processing communicates with a third-party system that is in some kind of back-off loop, then the subscriber can send a heartbeat to keep itself alive.
A subscriber that times out is still active and is not closed. On the next call to a receive() method the subscriber will reconnect and be reallocated channel ownership. Although a timed-out subscriber is not closed, it will have lost ownership of its channels and will not be able to commit any messages that it was processing when it timed out. This is because another subscriber in the group may already have been allocated ownership of the same channels and have processed and committed the same messages. Allowing commits of unowned channels is configurable; by default a subscriber can only commit positions for channels it owns.
Heartbeating only applies to subscribers that are members of a subscriber group. An anonymous subscriber owns all channels and no other subscriber is sharing the workload, so an anonymous subscriber will not be timed out. For an anonymous subscriber the heartbeat operation is a no-op.
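The timeout rule can be pictured as a simple liveness check: every receive or heartbeat refreshes a "last seen" time, and a subscriber whose last-seen time is older than the timeout is treated as dead. The sketch below is a hypothetical model with an explicit clock value passed in so it is deterministic; it is not how Coherence tracks liveness internally.

```java
final class SubscriberLivenessSketch {
    private final long timeoutMillis;
    private long lastSeenMillis;

    SubscriberLivenessSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastSeenMillis = nowMillis;
    }

    /** A call to receive counts as a sign of life. */
    void onReceive(long nowMillis) {
        lastSeenMillis = nowMillis;
    }

    /** An explicit heartbeat refreshes liveness without receiving anything. */
    void heartbeat(long nowMillis) {
        lastSeenMillis = nowMillis;
    }

    /** True once more than the timeout has passed since the last sign of life. */
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastSeenMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        SubscriberLivenessSketch subscriber = new SubscriberLivenessSketch(1_000, 0);
        subscriber.onReceive(500);
        System.out.println(subscriber.isTimedOut(1_200)); // prints false
        subscriber.heartbeat(1_400);                      // long-running work, keep alive
        System.out.println(subscriber.isTimedOut(2_300)); // prints false
        System.out.println(subscriber.isTimedOut(2_500)); // prints true
    }
}
```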
-
isActive
boolean isActive()
Determine whether this Subscriber is active.
- Returns:
true if this Subscriber is active
-
onClose
void onClose(Runnable action)
Add an action to be executed when this Subscriber is closed.
- Parameters:
action - the action to execute
-
commit
default Subscriber.CommitResult commit(int nChannel, Position position)
Commit the specified channel and position.
- Parameters:
nChannel - the channel to commit
position - the position within the channel to commit
- Returns:
- the result of the commit request
- Throws:
IllegalStateException - if the Subscriber is closed
-
commitAsync
CompletableFuture<Subscriber.CommitResult> commitAsync(int nChannel, Position position)
Asynchronously commit the specified channel and position.
- Parameters:
nChannel - the channel to commit
position - the position within the channel to commit
- Returns:
- a CompletableFuture that completes with the result of the commit request
- Throws:
IllegalStateException - if the Subscriber is closed
-
commit
default Map<Integer,Subscriber.CommitResult> commit(Map<Integer,Position> mapPositions)
Commit the specified channels and positions.
- Parameters:
mapPositions - a map of channels mapped to the positions to commit
- Returns:
- a map of results of the commit request for each channel
- Throws:
IllegalStateException - if the Subscriber is closed
-
commitAsync
CompletableFuture<Map<Integer,Subscriber.CommitResult>> commitAsync(Map<Integer,Position> mapPositions)
Asynchronously commit the specified channels and positions.
- Parameters:
mapPositions - a map of channels mapped to the positions to commit
- Returns:
- a CompletableFuture that completes with a map of results of the commit request for each channel
- Throws:
IllegalStateException - if the Subscriber is closed
-
getLastCommitted
default Optional<Position> getLastCommitted(int nChannel)
Returns an Optional containing the latest position committed for a channel, or Optional.empty() if the channel is not owned by this Subscriber.
- Parameters:
nChannel - the channel to get the last committed position for
- Returns:
- the latest position committed for a channel, or Optional.empty() if the channel is not owned by this Subscriber
- Throws:
IllegalStateException - if the Subscriber is closed
-
getLastCommitted
Map<Integer,Position> getLastCommitted()
Returns a Map of channels to the latest Position committed for that channel; the map will only contain channels owned by this Subscriber.
- Returns:
- a Map of channels to the latest Position committed for that channel; the map will only contain channels owned by this Subscriber
- Throws:
IllegalStateException - if the Subscriber is closed
-
seek
Position seek(int nChannel, Position position)
Seek to the specified position in a channel.
This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in the channel.
An attempt to move the position after the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position before the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.
Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected then it will restart at the previous commit. To also move the committed position, a subsequent call to commit(int, Position) using the seeked-to Position for the channel returned by this method will be required.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method).
Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
nChannel - the channel to reposition
position - the Position to seek to
- Returns:
- the Position actually seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail
- Throws:
IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
IllegalStateException - if this subscriber is not the owner of the channel being repositioned
-
seekAndCommit
default Position seekAndCommit(int nChannel, Position position)
Seek to the specified position in a channel and set the commit point to the new Position.
This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in the channel.
An attempt to move the position after the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position before the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method).
Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
nChannel - the channel to reposition
position - the Position to seek to
- Returns:
- the Position actually seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail
- Throws:
IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
IllegalStateException - if this subscriber is not the owner of the channel being repositioned
-
seek
Map<Integer,Position> seek(Map<Integer,Position> mapPosition)
Seek to the specified position in a set of channels.
This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in each channel.
An attempt to move the position after the current tail position for a channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position before the current head position for a channel will reposition the channel at the head, effectively moving the subscriber back to the first available element for that channel.
Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected then it will restart at the previous commit. To also move the committed position, a subsequent call to commit(int, Position) using the seeked-to Position for the channel returned by this method will be required.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned for that channel from this method).
Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
mapPosition - a Map of positions keyed by channel to seek to
- Returns:
- a Map keyed by channel of the Position seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail
- Throws:
IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
IllegalStateException - if this subscriber is not the owner of the channel being repositioned
-
seekAndCommit
default Map<Integer,Position> seekAndCommit(Map<Integer,Position> mapPosition)
Seek to the specified position in a set of channels and set the commit position for those channels.
This method will position the subscriber such that the element returned by the next call to any
receive
method will be the element after the specified position in each channel.
An attempt to move the position after the current tail position for a channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position before the current head position for a channel will reposition the channel at the head, effectively moving the subscriber back to the first available element for that channel.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned for that channel from this method).
Any in-flight
receive()
requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
mapPosition
- a Map
of positions
keyed by channel to seek to
- Returns:
- a
Map
keyed by channel of the Position
seeked to, which may differ from the requested position
if the requested position
is before the channel's head or after the channel's tail.
- Throws:
IllegalArgumentException
- if the Position
is not the correct type for the topic implementation or is an invalid position
IllegalStateException
- if this subscriber is not the owner of the channel being repositioned
-
seek
Position seek(int nChannel, Instant timestamp)
Seek to a position in a channel based on the published timestamp of the elements in the topic.
This method will position the subscriber such that the element returned by the next call to any
receive
method that polls the specific channel will be the first element with a published timestamp after the specified timestamp. The published timestamp is the cluster timestamp
on the member receiving the published element at the time the publish request was accepted.
An attempt to move the position using a timestamp later than the timestamp of the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position using a timestamp earlier than the timestamp of the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.
Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected it will restart at the previous commit. To also move the committed position, a subsequent call to
commit(int, Position)
using the Position
returned by this method is required.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method), effectively rolling back commits made to elements in the channel after the seeked position.
Any in-flight
receive()
requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
nChannel
- the channel to reposition
timestamp
- the timestamp to seek to
- Returns:
- the
Position
actually seeked to
- Throws:
IllegalStateException
- if this subscriber is not the owner of the channel being repositioned
NullPointerException
- if the timestamp
is null
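The timestamp rule can be sketched with a self-contained lookup over a list of published timestamps. This is an illustrative model only: TimestampSeekModel and nextIndexAfter are hypothetical names, not Coherence API, and the sketch assumes that timestamps within a channel are non-decreasing and that "after" means strictly after.

```java
import java.time.Instant;
import java.util.List;
import java.util.Objects;

/** Hypothetical model of locating the next element to receive after a timestamp seek. */
final class TimestampSeekModel {

    /**
     * Returns the index of the first element whose published timestamp is strictly
     * after the given timestamp, or the list size if there is no such element
     * (the channel then appears empty). Assumes the list is sorted ascending.
     */
    static int nextIndexAfter(List<Instant> publishedTimestamps, Instant timestamp) {
        Objects.requireNonNull(timestamp, "timestamp");
        int lo = 0;
        int hi = publishedTimestamps.size();
        // Binary search for the first timestamp that is after the requested one.
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (publishedTimestamps.get(mid).isAfter(timestamp)) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        List<Instant> published = List.of(
                Instant.ofEpochSecond(10),
                Instant.ofEpochSecond(20),
                Instant.ofEpochSecond(30));
        System.out.println(nextIndexAfter(published, Instant.ofEpochSecond(20)));
    }
}
```

A timestamp earlier than every element yields index 0 (the head), and a timestamp later than every element yields the list size (the tail), which mirrors the two clamping cases described for this method.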
-
seekAndCommit
default Position seekAndCommit(int nChannel, Instant timestamp)
Seek to a position in a channel based on the published timestamp of the elements in the topic and set the commit point to the new position.
This method will position the subscriber such that the element returned by the next call to any
receive
method that polls the specific channel will be the first element with a published timestamp after the specified timestamp. The published timestamp is the cluster timestamp
on the member receiving the published element at the time the publish request was accepted.
An attempt to move the position using a timestamp later than the timestamp of the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position using a timestamp earlier than the timestamp of the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method), effectively rolling back commits made to elements in the channel after the seeked position.
Any in-flight
receive()
requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
nChannel
- the channel to reposition
timestamp
- the timestamp to seek to
- Returns:
- the
Position
actually seeked to
- Throws:
IllegalStateException
- if this subscriber is not the owner of the channel being repositioned
NullPointerException
- if the timestamp
is null
-
seekToHead
Map<Integer,Position> seekToHead(int... anChannel)
Reposition one or more channels to their respective head positions.
If any of the specified channels have been committed, their commits will also be reset, effectively removing all committed positions for those channels.
The next message received will be the first available message in the topic for the channel. Note that this may not be the first message originally received by the subscriber if a topic is not set to retain received elements, as in that case messages are removed after being committed, so cannot be re-received.
Any in-flight
receive()
requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
anChannel
- one or more channels to reposition to the head
- Returns:
- a
Map
keyed by channel of the head Position
seeked to for each channel
- Throws:
IllegalStateException
- if this subscriber is not the owner of one or more of the specified channels
-
seekToTail
Map<Integer,Position> seekToTail(int... anChannel)
Reposition one or more channels to their respective tail positions. Each channel will be repositioned to read the next message published to that channel.
Any in-flight
receive()
requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
anChannel
- one or more channels to reposition to the tail
- Returns:
- a
Map
keyed by channel of the tail Position
seeked to for each channel
- Throws:
IllegalStateException
- if this subscriber is not the owner of one or more of the specified channels
-
seekToTailAndCommit
default Map<Integer,Position> seekToTailAndCommit(int... anChannel)
Reposition one or more channels to their respective tail positions and set the commit point to the new
Position.
Each channel will be repositioned to read the next message published to that channel.
Any in-flight
receive()
requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
anChannel
- one or more channels to reposition to the tail
- Returns:
- a
Map
keyed by channel of the tail Position
seeked to for each channel
- Throws:
IllegalStateException
- if this subscriber is not the owner of one or more of the specified channels
-
getHead
default Optional<Position> getHead(int nChannel)
Returns the Position
that is currently the head for the specified channel, or Optional.empty()
if the channel is not owned by this Subscriber.
- Parameters:
nChannel
- the channel to obtain the head Position
for
- Returns:
- the
Position
that is currently the head for the specified channel, or Optional.empty()
if the channel is not owned by this Subscriber.
- Throws:
IllegalStateException
- if the Subscriber
is closed
-
getHeads
Map<Integer,Position> getHeads()
Returns a Map
of the Positions
that are currently the head for each channel owned by this Subscriber.
This result is somewhat transient in situations where the Subscriber has in-flight receive requests, so the heads returned may change just after the method returns.
- Returns:
- the
Positions
that are currently the heads for each channel owned by this Subscriber
- Throws:
IllegalStateException
- if the Subscriber
is closed
-
getTail
default Optional<Position> getTail(int nChannel)
Returns the Position
that is currently the tail for the specified channel, or Optional.empty()
if the channel is not owned by this Subscriber.
This result is somewhat transient in situations where publishers are actively publishing messages to the topic, so the tail position returned may change just after this method returns.
- Parameters:
nChannel
- the channel to obtain the tail Position
for
- Returns:
- the
Position
that is currently the tail for the specified channel, or Optional.empty()
if the channel is not owned by this Subscriber.
- Throws:
IllegalStateException
- if the Subscriber
is closed
-
getTails
Map<Integer,Position> getTails()
Returns a Map
of the Positions
that are currently the tail for each channel owned by this Subscriber,
that is, the position of the last message in each channel.
This result is somewhat transient in situations where publishers are actively publishing messages to the topic, so the tail positions returned may change just after this method returns.
- Returns:
- the
Positions
that are currently the tails for each channel owned by this Subscriber
- Throws:
IllegalStateException
- if the Subscriber
is closed
-
getNamedTopic
<T> NamedTopic<T> getNamedTopic()
Returns the underlying NamedTopic
that this Subscriber
is subscribed to, which could be of a different generic type to this Subscriber
if the subscriber is using a transformer.
- Type Parameters:
T
- the type of the underlying topic
- Returns:
- the underlying
NamedTopic
that this Subscriber
is subscribed to
-
getRemainingMessages
int getRemainingMessages()
Returns the number of remaining messages to be read from the topic for this subscriber.
- Returns:
- the number of remaining messages
-
getRemainingMessages
int getRemainingMessages(int nChannel)
Returns the number of remaining messages to be read from the topic channel for this subscriber.
- Parameters:
nChannel
- the channel to count remaining messages in
- Returns:
- the number of remaining messages, or zero if this subscriber does not own the channel
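The relationship between the two getRemainingMessages overloads can be sketched with a tiny self-contained model. RemainingMessagesModel is a hypothetical class, not part of the Coherence API, and treating the no-argument total as the sum over owned channels is an assumption of this sketch rather than something the documentation above states.

```java
import java.util.Map;

/** Hypothetical model of per-channel and total remaining message counts. */
final class RemainingMessagesModel {
    // Remaining message count for each channel this subscriber owns.
    private final Map<Integer, Integer> remainingByOwnedChannel;

    RemainingMessagesModel(Map<Integer, Integer> remainingByOwnedChannel) {
        this.remainingByOwnedChannel = Map.copyOf(remainingByOwnedChannel);
    }

    /** Remaining messages in one channel, or zero if the channel is not owned. */
    int getRemainingMessages(int channel) {
        return remainingByOwnedChannel.getOrDefault(channel, 0);
    }

    /** Assumed here to be the sum over all owned channels. */
    int getRemainingMessages() {
        return remainingByOwnedChannel.values().stream()
                .mapToInt(Integer::intValue)
                .sum();
    }

    public static void main(String[] args) {
        RemainingMessagesModel model = new RemainingMessagesModel(Map.of(0, 3, 2, 4));
        System.out.println(model.getRemainingMessages(1));
        System.out.println(model.getRemainingMessages());
    }
}
```

The point to note is that a zero result for a channel is ambiguous: it can mean either that the channel is owned and fully read or that the subscriber does not own it, so callers that care about the difference would need to check channel ownership separately.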
-
inGroup
static <V> Subscriber.Name<V> inGroup(String sName)
Obtain a Subscriber.Option
that specifies a group name for a Subscriber.
- Type Parameters:
V
- the type of the elements being received from the topic
- Parameters:
sName
- the group name to use for the Subscriber
- Returns:
- a
Subscriber.Option
that specifies a group name for a Subscriber
-
completeOnEmpty
static <V> Subscriber.CompleteOnEmpty<V> completeOnEmpty()
Obtain the Option indicating futures should complete if the topic is empty.
- Returns:
- the Option indicating futures should complete if the topic is empty
-
withListener
static <V> Subscriber.ChannelOwnershipListeners<V> withListener(Subscriber.ChannelOwnershipListener... aListener)
Create a Subscriber.ChannelOwnershipListeners
option with one or more listeners.
- Type Parameters:
V
- the type of the elements being received from the topic
- Parameters:
aListener
- the listeners
to add to the subscriber
- Returns:
- a
Subscriber.ChannelOwnershipListeners
option with one or more listeners
-
withConverter
static <V,U> Subscriber.Convert<V,U> withConverter(ValueExtractor<? super V,U> extractor)
Return a Convert option with the specified extractor.
- Type Parameters:
V
- the type of the elements being received from the topic
- Parameters:
extractor
- the converter extractor
- Returns:
- the Convert option
-
withFilter
static <V> Subscriber.Filtered<V> withFilter(Filter<? super V> filter)
Return a Filtered option with the specified filter.
- Type Parameters:
V
- the type of the elements being received from the topic
- Parameters:
filter
- the filter
- Returns:
- the Filtered option
-
-