Interface Subscriber<V>
-
- Type Parameters:
V - the type of the value returned by the subscriber
- All Superinterfaces:
AutoCloseable
public interface Subscriber<V> extends AutoCloseable
A Subscriber subscribes either directly to a NamedTopic or to a subscriber group of a NamedTopic. Each value published to a NamedTopic is delivered to all of its subscriber groups and direct (anonymous) Subscribers.
The factory methods NamedTopic.createSubscriber(Subscriber.Option[]) or Session.createSubscriber(String, Option[]) allow one to specify one or more Subscriber.Options to configure the Subscriber. The Subscriber.Name.inGroup(String) option specifies the subscriber group for the Subscriber to join. If this option is not specified, the Subscriber is a direct (anonymous) subscriber to the topic. All Subscriber options and defaults are summarized in a table in Subscriber.Option.
Channels
Topics use the concept of channels to improve scalability. This is similar to how Coherence uses partitions for caches, but to avoid confusion the name channel was chosen. The default channel count is the next prime above the square root of the partition count configured for the underlying cache service, which for the default partition count of 257 is 17 channels.
Publishers publish messages to a channel based on their ordering configuration. Channel ownership is then assigned to subscribers, which receive messages from the channels they own. An anonymous subscriber has ownership of all channels. A subscriber that is part of a subscriber group has ownership of a sub-set of the available channels.
Channel count is configurable, but ideally should be set neither too high nor too low. For example, setting the channel count to 1 would mean that all publishers contend to publish to a single channel, and that only one subscriber in a subscriber group will be able to receive messages. Setting the channel count too high (above, say, the number of publishers) may mean that some channels never receive any messages and are wasted. Finding the appropriate value is admittedly non-trivial; however, when trying to maximize throughput from a publisher's perspective, this is a configuration that can be tuned.
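The default channel count rule described above can be checked with a few lines of arithmetic. The following is a minimal sketch, not the Coherence implementation: the class and method names are hypothetical, and it assumes "next prime above the square root" means the smallest prime strictly greater than the square root.

```java
/** Hypothetical helper illustrating the default channel count rule. */
final class ChannelCountSketch {

    /** Simple trial-division primality test, sufficient for small counts. */
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int i = 2; (long) i * i <= n; i++) {
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }

    /** Assumption: smallest prime strictly greater than sqrt(partitionCount). */
    static int defaultChannelCount(int partitionCount) {
        int candidate = (int) Math.floor(Math.sqrt(partitionCount)) + 1;
        while (!isPrime(candidate)) {
            candidate++;
        }
        return candidate;
    }

    public static void main(String[] args) {
        // sqrt(257) is about 16.03, and the next prime above that is 17
        System.out.println(defaultChannelCount(257));
    }
}
```

For the default partition count of 257 this yields 17, matching the figure quoted above.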
Subscriber Groups
Subscribers can be part of a subscriber group. Within each subscriber group, each value is only received by one of the group members, enabling distributed, parallel processing of the values delivered to the subscriber group. Thus, each subscriber group in effect behaves like a queue over the topic data.
Subscribers in a group can be considered durable: if they are closed, or fail, then message processing will continue from the next element after the last committed position when the subscriber reconnects.
To maintain ordering of messages, only a single subscriber in a group polls for messages from a channel. Each subscriber in a group is allocated ownership of a sub-set of the channels in a topic. This means that the maximum number of subscribers in a group that could be doing any work would be the same as the topic's channel count. If there are more subscribers in a group than there are channels, then the additional subscribers will not be allocated ownership of any channels and hence will receive no messages. It may seem that increasing the channel count to a higher number would therefore allow more subscribers to work in parallel, but this is not necessarily the case.
Channels are allocated to subscribers when they are created. As more subscribers in a group are created, channel ownership is rebalanced, and existing subscribers may lose ownership of channels that are allocated to new subscribers. As subscribers are closed (or die or are timed out), channel ownership is again rebalanced over the remaining subscribers.
Subscribers in a group have a configurable timeout. If a subscriber does not call receive within the timeout, it is considered dead and the channels it owns will be reallocated to other subscribers in the group. This is to stop channel starvation, where no subscriber in a group is polling from a channel.
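To make the ownership rules above concrete, the following sketch distributes channels over the members of a group. The allocation strategy actually used by Coherence is not described here, so the round-robin scheme, the class name and the method name are all assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of channel ownership within one subscriber group. */
final class ChannelAllocationSketch {

    /**
     * Returns, for each subscriber index, the channels it would own.
     * Assumption: channels are dealt out round-robin.
     */
    static List<List<Integer>> allocate(int channelCount, int subscriberCount) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int s = 0; s < subscriberCount; s++) {
            owned.add(new ArrayList<>());
        }
        if (subscriberCount == 0) {
            return owned;
        }
        for (int c = 0; c < channelCount; c++) {
            owned.get(c % subscriberCount).add(c);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 17 channels over 3 subscribers: every subscriber owns some channels
        System.out.println(allocate(17, 3));
        // 2 channels over 3 subscribers: the third subscriber owns nothing
        System.out.println(allocate(2, 3));
    }
}
```

Re-running the allocation with one more subscriber shows the rebalancing effect: with 17 channels, the first subscriber owns six channels in a group of three but only five in a group of four.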
Positions
Elements in a NamedTopic are published to a channel and have a unique Position within that channel. A Position is an opaque representation of the underlying position, as theoretically the implementation of the Position could change for different types of topic. Positions are used in various places in the API; for example, positions can be committed, and they can be used to move the subscriber backwards or forwards within a channel. A Position is serializable, so it can be stored and recovered to later reset a subscriber to a desired position. Positions are Comparable, so the positions of two elements can be used to determine how those elements relate to each other within a channel.
Committing
Subscribers in a group are durable, so if they disconnect and reconnect, message processing will restart from the correct position. A Subscriber can commit an element or commit a position in a channel. After a successful commit, on reconnection the first message received from a channel will be the next message after the committed position.
When a position is committed, this will also commit any earlier positions in the channel. For example, if five elements are received and commit is called only on the last element, this effectively also commits the previous four elements.
When topics are configured not to retain elements, received elements will be removed as they are committed by subscribers.
If a subscriber in a subscriber group is timed out (or dies) and its channel ownership is reallocated to other subscribers in the group, those subscribers will start to receive messages from the last committed position for each of the channels taken over from the failed subscriber.
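The cumulative nature of a commit can be pictured as a per-channel high-water mark. The sketch below is a simplified model, not the Coherence API: positions are represented as plain long values (a real Position is opaque), the class and method names are hypothetical, and it assumes that committing an earlier position than the current mark has no effect.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of committed positions, keyed by channel. */
final class CommitSketch {
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Committing a position implicitly commits every earlier position. */
    void commit(int channel, long position) {
        // Assumption: the mark only ever moves forwards on commit
        committed.merge(channel, position, Long::max);
    }

    boolean isCommitted(int channel, long position) {
        Long last = committed.get(channel);
        return last != null && position <= last;
    }

    /** Where a reconnecting (or replacement) subscriber would resume. */
    long resumePosition(int channel, long head) {
        Long last = committed.get(channel);
        return last == null ? head : last + 1;
    }

    public static void main(String[] args) {
        CommitSketch sketch = new CommitSketch();
        sketch.commit(0, 5);                                // only the fifth element is committed explicitly
        System.out.println(sketch.isCommitted(0, 1));       // earlier elements count as committed too
        System.out.println(sketch.resumePosition(0, 1));    // resume after the committed position
    }
}
```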
Commits may be performed synchronously (using commit(int, Position)) or asynchronously (using commitAsync(int, Position)). There is no facility for automatic commit of messages; all calls to commit must be made manually by application code.
Seeking
A new subscriber will start to receive messages from the head position of a channel (or the last committed position for a durable subscriber) and continue to receive messages in order until the tail is reached. It is possible to reposition a subscriber (backwards or forwards) using one of the seek methods, for example seek(int, Position) or seek(int, Instant). Seeking applies a new position to a specific channel. If an attempt is made to position the channel before the first available message (the channel's head) then the subscriber will be positioned at the head. Correspondingly, if an attempt is made to position a subscriber beyond the last position in a channel (the channel's tail) then the subscriber will be positioned at the tail of the channel and will receive the next message published to that channel.
It is also possible to seek to a timestamp. Published messages are given a timestamp, which is the Coherence cluster time on the storage enabled cluster member that accepted the published message. Using the seek(int, Instant) method a subscriber can be repositioned so that the next message received from a channel is the first message with a timestamp greater than the timestamp used in the seek call. For example, to position channel 0 so that the next message received is the first message after 20:30 on July 5th 2021:
    Instant timestamp = LocalDateTime.of(2021, Month.JULY, 5, 20, 30)
            .toInstant(ZoneOffset.UTC);
    subscriber.seek(0, timestamp);
Seeking Forwards
It is important to note that seeking forwards skips over messages; those skipped messages will never be received once another commit is executed. Moving forwards does not alter the commit position, so when a subscriber has committed a position, then moves forwards and later fails, it will restart back at the commit.
Seeking Backwards Over Previous Commits
When topics are configured not to retain elements, removal of elements occurs as their positions are committed, so they can never be received again. This means that in this case it is not possible to seek backwards further than the last commit, as those elements no longer exist in the topic.
When topics are configured to retain elements, it is possible to seek backwards further than the last commit. This effectively rolls back those commits, and the previously committed messages after the new seek position will be re-received. If a subscriber fails after moving back in this fashion it will restart at the rolled-back position.
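The timestamp-based seek described in this section selects the first message whose published timestamp is strictly greater than the supplied timestamp, falling back to the tail when no such message exists. The following sketch models that selection over an in-memory list of timestamps. It is an illustration only: the class, the helper methods and the sample data are hypothetical, and the list index stands in for a real Position.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.Month;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of seeking within one channel by published timestamp. */
final class TimestampSeekSketch {

    /** Builds an Instant on July 5th 2021 (UTC), mirroring the example in the text. */
    static Instant at(int hour, int minute) {
        return LocalDateTime.of(2021, Month.JULY, 5, hour, minute).toInstant(ZoneOffset.UTC);
    }

    /** Sample published timestamps for a channel, in publish order. */
    static List<Instant> sample() {
        return Arrays.asList(at(20, 0), at(20, 30), at(21, 0));
    }

    /**
     * Returns the index of the first message published strictly after the
     * timestamp; a result equal to the list size represents the tail.
     */
    static int seekIndex(List<Instant> published, Instant timestamp) {
        for (int i = 0; i < published.size(); i++) {
            if (published.get(i).isAfter(timestamp)) {
                return i;
            }
        }
        return published.size();
    }

    public static void main(String[] args) {
        System.out.println(seekIndex(sample(), at(20, 30))); // message published at 21:00
        System.out.println(seekIndex(sample(), at(19, 0)));  // before the head: first message
        System.out.println(seekIndex(sample(), at(22, 0)));  // after the tail: nothing to read yet
    }
}
```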
Receiving Elements
Receiving elements from a topic is an asynchronous operation; calls to receive() (or the batch version receive(int)) return a CompletableFuture. If multiple calls are made to receive, the returned futures will complete in the correct order to maintain message ordering in a channel. To maintain ordering, the futures are completed by a single daemon thread. This means that code using any of the synchronous CompletableFuture handling patterns, such as CompletableFuture.thenApply(java.util.function.Function) or CompletableFuture.thenAccept(java.util.function.Consumer), will run on the same daemon thread, so application code in the handler methods must complete before the next receive future will be completed. Again, this is intentional, to maintain strict ordering of processing of received elements. If the asynchronous CompletableFuture handler methods are used, such as CompletableFuture.thenApplyAsync(java.util.function.Function) or CompletableFuture.thenAcceptAsync(java.util.function.Consumer), then the application code handling the received element will execute on another thread, and at this point there are no ordering guarantees.
It is important that application code handles the returned futures correctly, both to maintain ordering (if that is important to the application) and to have correct error handling and not lose exceptions, which is easy to do in poorly written asynchronous future handler code.
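The ordering and error-handling points above can be demonstrated with plain JDK classes, without a topic. In the sketch below, a single-threaded executor stands in for the daemon thread that completes receive futures, and hand-made futures stand in for the results of receive(); both are assumptions made for illustration, as are the class and method names. Handlers are attached with the synchronous whenComplete method before completion, so they run in completion order on the completing thread, and the failure is observed rather than silently dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demonstration of ordered, synchronous future handling. */
final class OrderedCompletionSketch {

    static List<String> run(int count, int failingIndex) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            // Synchronous handler: runs on whichever thread completes the future
            future.whenComplete((value, error) ->
                    log.add(error == null ? "value " + value : "error " + error.getMessage()));
            pending.add(future);
        }
        // Stand-in for the single thread that completes receive futures in order
        ExecutorService completer = Executors.newSingleThreadExecutor();
        try {
            completer.submit(() -> {
                for (int i = 0; i < pending.size(); i++) {
                    if (i == failingIndex) {
                        pending.get(i).completeExceptionally(new IllegalStateException("boom"));
                    } else {
                        pending.get(i).complete(i);
                    }
                }
            }).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            completer.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(4, 2)); // [value 0, value 1, error boom, value 3]
    }
}
```

Swapping whenComplete for whenCompleteAsync would hand each handler to another thread, at which point the order of the log entries would no longer be guaranteed.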
Clean-Up
Subscribers should ideally be closed when application code finishes with them. This will clean up server-side resources associated with a subscriber.
It is also important (possibly more important) to clean up subscriber groups that are no longer required. Failure to delete a subscriber group will cause messages to be retained on the server that would otherwise have been removed, so consuming more server-side resources such as heap and disc.
- Since:
- Coherence 14.1.1
- Author:
- jf/jk/mf 2015.06.03
-
-
Nested Class Summary
Nested Classes
Modifier and Type, Interface - Description
static interface Subscriber.Channel - A representation of a topic channel within subscriber.
static interface Subscriber.ChannelOwnershipListener - A listener that receives notification of channel ownership changes.
static class Subscriber.ChannelOwnershipListeners<V> - A subscriber Subscriber.Option that allows one or more listeners to be added to the subscriber, that will be notified of changes to the subscriber's channel ownership.
static class Subscriber.CommitResult - The result of a commit request.
static class Subscriber.CommitResultStatus - The different result statuses for a commit request.
static class Subscriber.CompleteOnEmpty<V> - The CompleteOnEmpty option indicates that the CompletableFuture returned from the receive() operation should complete with a null Subscriber.Element upon identifying that the topic is or has become empty.
static class Subscriber.Convert<V,U> - The Convert option specifies a ValueExtractor that will convert topic values that a subscriber is interested in receiving prior to sending them to the subscriber.
static interface Subscriber.Element<V> - Element represents a container for returned values.
static class Subscriber.Filtered<V> - The Filtered option specifies a filter that will determine which topic values a subscriber is interested in receiving.
static interface Subscriber.Id - A marker interface for Subscriber identifiers.
static class Subscriber.Name<V> - The Name option is used to specify a subscriber group name.
static interface Subscriber.Option<V,U> - A marker interface to indicate that a class is a valid Subscriber.Option for a Subscriber.
-
Method Summary
Modifier and Type, Method - Description
void close() - Close the Subscriber.
default Subscriber.CommitResult commit(int nChannel, Position position) - Commit the specified channel and position.
default Map<Integer,Subscriber.CommitResult> commit(Map<Integer,Position> mapPositions) - Commit the specified channels and positions.
CompletableFuture<Subscriber.CommitResult> commitAsync(int nChannel, Position position) - Asynchronously commit the specified channel and position.
CompletableFuture<Map<Integer,Subscriber.CommitResult>> commitAsync(Map<Integer,Position> mapPositions) - Asynchronously commit the specified channels and positions.
static <V> Subscriber.CompleteOnEmpty<V> completeOnEmpty() - Obtain the Option indicating futures should complete if the topic is empty.
int getChannelCount() - Returns the number of channels in the underlying NamedTopic.
int[] getChannels() - Returns the current set of channels that this Subscriber owns.
FlowControl getFlowControl() - Return the FlowControl object governing this subscriber.
default Optional<Position> getHead(int nChannel) - Returns the Position that is currently the head for the specified channel, or Optional.empty() if the channel is not owned by this Subscriber.
Map<Integer,Position> getHeads() - Returns a Map of the Positions that are currently the head for each channel owned by this Subscriber.
Map<Integer,Position> getLastCommitted() - Returns a Map of channels to the latest Position committed for that channel; the map will only contain channels owned by this Subscriber.
default Optional<Position> getLastCommitted(int nChannel) - Returns an Optional containing the latest position committed for a channel, or Optional.empty() if the channel is not owned by this Subscriber.
<T> NamedTopic<T> getNamedTopic() - Returns the underlying NamedTopic that this Subscriber is subscribed to, which could be of a different generic type to this Subscriber if the subscriber is using a transformer.
int getRemainingMessages() - Returns the number of remaining messages to be read from the topic for this subscriber.
int getRemainingMessages(int nChannel) - Returns the number of remaining messages to be read from the topic channel for this subscriber.
default Optional<Position> getTail(int nChannel) - Returns the Position that is currently the tail for the specified channel, or Optional.empty() if the channel is not owned by this Subscriber.
Map<Integer,Position> getTails() - Returns a Map of the Positions that are currently the tail for each channel owned by this Subscriber; that is the last message in the channel.
void heartbeat() - Send a heartbeat to the server to keep this subscriber alive.
static <V> Subscriber.Name<V> inGroup(String sName) - Obtain a Subscriber.Option that specifies a group name for a Subscriber.
boolean isActive() - Determine whether this Subscriber is active.
default boolean isOwner(int nChannel) - Returns true if this subscriber is the owner of the specified channel.
void onClose(Runnable action) - Add an action to be executed when this Subscriber is closed.
CompletableFuture<Subscriber.Element<V>> receive() - Receive a value from the topic.
CompletableFuture<List<Subscriber.Element<V>>> receive(int cBatch) - Receive a batch of elements from the topic.
Position seek(int nChannel, Position position) - Seek to the specified position in a channel.
Position seek(int nChannel, Instant timestamp) - Seek to a position in a channel based on the published timestamp of the elements in the topic.
Map<Integer,Position> seek(Map<Integer,Position> mapPosition) - Seek to the specified position in a set of channels.
default Position seekAndCommit(int nChannel, Position position) - Seek to the specified position in a channel and set the commit point to the new Position.
default Position seekAndCommit(int nChannel, Instant timestamp) - Seek to a position in a channel based on the published timestamp of the elements in the topic and set the commit point to the new position.
default Map<Integer,Position> seekAndCommit(Map<Integer,Position> mapPosition) - Seek to the specified position in a set of channels and set the commit position for the channels.
Map<Integer,Position> seekToHead(int... anChannel) - Reposition one or more channels to their respective head positions.
Map<Integer,Position> seekToTail(int... anChannel) - Reposition one or more channels to their respective tail positions.
default Map<Integer,Position> seekToTailAndCommit(int... anChannel) - Reposition one or more channels to their respective tail positions and set the commit point to the new Position.
static <V,U> Subscriber.Convert<V,U> withConverter(ValueExtractor<? super V,U> extractor) - Return a Convert option with the specified extractor.
static <V> Subscriber.Filtered<V> withFilter(Filter<? super V> filter) - Return a Filtered option with the specified filter.
static <V> Subscriber.ChannelOwnershipListeners<V> withListener(Subscriber.ChannelOwnershipListener... aListener) - Create a Subscriber.ChannelOwnershipListeners option with one or more listeners.
-
Method Detail
-
receive
CompletableFuture<Subscriber.Element<V>> receive()
Receive a value from the topic. If there is no value available then the future will complete according to the Subscriber.CompleteOnEmpty option used to create the Subscriber.
Note: If the returned future is cancelled it is possible that a value may still be considered by the topic to have been received by this group, while the group would consider this a lost value. Subscriber implementations will make a best effort to prevent such loss, but it cannot be guaranteed and thus cancellation is not advisable.
The futures returned from calls to receive are completed sequentially. If the methods used to handle completion in application code block, this will block completion of subsequent futures. This is to maintain ordering of consumption of completed futures. If the application code handles future completion using the asynchronous methods of CompletableFuture (i.e. handling is handed off to another thread) this could cause out of order consumption, as received values are consumed on different threads.
- Returns:
- a CompletableFuture which can be used to access the result of this completed operation
- Throws:
IllegalStateException - if the Subscriber is closed
-
receive
CompletableFuture<List<Subscriber.Element<V>>> receive(int cBatch)
Receive a batch of elements from the topic.
The cBatch parameter specifies the maximum number of elements to receive in the batch. The subscriber may return fewer elements than the cBatch parameter; this does not signify that the topic is empty.
If there is no value available then the future will complete according to the Subscriber.CompleteOnEmpty option used to create the Subscriber.
If the poll of the topic returns nothing (i.e. the topic was empty and Subscriber.CompleteOnEmpty is true) then the Consumer will not be called.
The futures returned from calls to receive are completed sequentially. If the methods used to handle completion in application code block, this will block completion of subsequent futures. This is to maintain ordering of consumption of completed futures. If the application code handles future completion using the asynchronous methods of CompletableFuture (i.e. handling is handed off to another thread) this could cause out of order consumption, as received values are consumed on different threads.
- Parameters:
cBatch - the maximum number of elements to receive in the batch
- Returns:
- a future which can be used to access the result of this completed operation
- Throws:
IllegalStateException - if the Subscriber is closed
-
getChannels
int[] getChannels()
Returns the current set of channels that this Subscriber owns.
Subscribers that are part of a subscriber group own a sub-set of the available channels. A subscriber in a group should normally be assigned ownership of at least one channel. In the case where there are more subscribers in a group than the number of channels configured for a topic, some subscribers will own zero channels. Anonymous subscribers that are not part of a group are always owners of all the available channels.
- Returns:
- the current set of channels that this Subscriber is the owner of, or an empty array if this subscriber has not been assigned ownership of any channels
-
isOwner
default boolean isOwner(int nChannel)
Returns true if this subscriber is the owner of the specified channel.
This method only really applies to subscribers that are part of a group that may own zero or more channels. This method will always return true for an anonymous subscriber that is not part of a group.
As channel ownership may change as subscribers in a group are created and closed, the result of this method is somewhat transient. To more accurately track channel ownership changes, create a subscriber using the Subscriber.ChannelOwnershipListeners option.
- Parameters:
nChannel - the channel number
- Returns:
true if this subscriber is currently the owner of the specified channel, otherwise false
-
getChannelCount
int getChannelCount()
Returns the number of channels in the underlying NamedTopic.
This could be different to the number of channels owned by this Subscriber.
- Returns:
- the number of channels in the underlying NamedTopic
-
getFlowControl
FlowControl getFlowControl()
Return the FlowControl object governing this subscriber.
- Returns:
- the FlowControl object.
-
close
void close()
Close the Subscriber.
Closing a subscriber ensures that no new receive() requests will be accepted and all pending receive requests will be completed or safely cancelled.
For a direct topic Subscriber, close() enables the release of storage resources for unconsumed values.
For a group member, close() indicates that this member has left its corresponding group. One must actively manage a NamedTopic's logical subscriber groups since their life span is independent of active Subscriber group membership. NamedTopic.destroySubscriberGroup(String) releases storage and stops accumulating topic values for a subscriber group.
Calling close() on an already closed subscriber is a no-op.
- Specified by:
close in interface AutoCloseable
-
heartbeat
void heartbeat()
Send a heartbeat to the server to keep this subscriber alive.
Heartbeat messages are sent on calls to any of the receive() methods. If a subscriber does not call receive within the configured timeout, it will be considered dead and lose its channel ownership. This is to stop badly behaving subscribers from blocking messages from being processed, for example if a bug in application code causes a subscriber to deadlock or take an excessive amount of time to process messages.
If application code knows that it will take longer to execute than the subscriber timeout, for example because the message processing communicates with a third-party system that is in some kind of back-off loop, then the subscriber can send a heartbeat to keep itself alive.
A subscriber that times out is still active and is not closed. On the next call to a receive() method the subscriber will reconnect and be reallocated channel ownerships. Although a timed-out subscriber is not closed, it will have lost ownership of its channels and will not be able to commit any messages that it was processing when it timed out. This is because another subscriber in the group may have already been allocated ownership of the same channels, and may have already processed and committed those same messages. Allowing commits of unowned channels is configurable; by default a subscriber can only commit positions for channels it owns.
Heart-beating only applies to subscribers that are members of a subscriber group. An anonymous subscriber owns all channels and no other subscriber is sharing the workload, so an anonymous subscriber will not be timed out. For an anonymous subscriber the heartbeat operation is a no-op.
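The timeout rule above amounts to comparing the time since the subscriber was last seen with the configured timeout. The sketch below models that rule with explicit clock values so that it is deterministic. It is not how Coherence tracks liveness internally: the class name, the millisecond clock parameters and the strict "greater than" comparison are all assumptions.

```java
/** Hypothetical model of subscriber liveness tracking for a group member. */
final class HeartbeatSketch {
    private final long timeoutMillis;
    private long lastSeenMillis;

    HeartbeatSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastSeenMillis = nowMillis;
    }

    /** Called for an explicit heartbeat; a call to receive would count in the same way. */
    void heartbeat(long nowMillis) {
        lastSeenMillis = nowMillis;
    }

    /** Assumption: timed out once more than the timeout has elapsed since last seen. */
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastSeenMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatSketch subscriber = new HeartbeatSketch(1000, 0);
        System.out.println(subscriber.isTimedOut(1500)); // no receive or heartbeat for 1500 ms
        subscriber.heartbeat(900);                       // long-running work sends a heartbeat
        System.out.println(subscriber.isTimedOut(1500)); // only 600 ms since the heartbeat
    }
}
```

In application code the equivalent step would be to call heartbeat() on the subscriber periodically while a single message is still being processed.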
-
isActive
boolean isActive()
Determine whether this Subscriber is active.
- Returns:
true if this Subscriber is active
-
onClose
void onClose(Runnable action)
Add an action to be executed when this Subscriber is closed.
- Parameters:
action - the action to execute
-
commit
default Subscriber.CommitResult commit(int nChannel, Position position)
Commit the specified channel and position.
- Parameters:
nChannel - the channel to commit
position - the position within the channel to commit
- Returns:
- the result of the commit request
- Throws:
IllegalStateException - if the Subscriber is closed
-
commitAsync
CompletableFuture<Subscriber.CommitResult> commitAsync(int nChannel, Position position)
Asynchronously commit the specified channel and position.
- Parameters:
nChannel - the channel to commit
position - the position within the channel to commit
- Returns:
- the result of the commit request
- Throws:
IllegalStateException - if the Subscriber is closed
-
commit
default Map<Integer,Subscriber.CommitResult> commit(Map<Integer,Position> mapPositions)
Commit the specified channels and positions.
- Parameters:
mapPositions - a map of channels mapped to the positions to commit
- Returns:
- a map of results of the commit request for each channel
- Throws:
IllegalStateException - if the Subscriber is closed
-
commitAsync
CompletableFuture<Map<Integer,Subscriber.CommitResult>> commitAsync(Map<Integer,Position> mapPositions)
Asynchronously commit the specified channels and positions.
- Parameters:
mapPositions - a map of channels mapped to the positions to commit
- Returns:
- a map of results of the commit request for each channel
- Throws:
IllegalStateException - if the Subscriber is closed
-
getLastCommitted
default Optional<Position> getLastCommitted(int nChannel)
Returns an Optional containing the latest position committed for a channel, or Optional.empty() if the channel is not owned by this Subscriber.
- Parameters:
nChannel - the channel to get the last committed position for
- Returns:
- the latest position committed for a channel, or Optional.empty() if the channel is not owned by this Subscriber
- Throws:
IllegalStateException - if the Subscriber is closed
-
getLastCommitted
Map<Integer,Position> getLastCommitted()
Returns a Map of channels to the latest Position committed for that channel; the map will only contain channels owned by this Subscriber.
- Returns:
- a Map of channels to the latest Position committed for that channel; the map will only contain channels owned by this Subscriber
- Throws:
IllegalStateException - if the Subscriber is closed
-
seek
Position seek(int nChannel, Position position)
Seek to the specified position in a channel.
This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in the channel.
An attempt to move the position after the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position before the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.
Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected then it will restart at the previous commit. To also move the committed position, a subsequent call to commit(int, Position), using the Position for the channel returned by this method, will be required.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the position seeked to (i.e. back to the position returned from this method).
Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
nChannel - the channel to reposition
position - the Position to seek to
- Returns:
- the Position actually seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail.
- Throws:
IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
IllegalStateException - if this subscriber is not the owner of the channel being repositioned
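The clamping and commit behaviour described for seek can be summarized in a small state model. This is a sketch under stated assumptions rather than the real implementation: positions are plain long values instead of opaque Position objects, the head and tail are fixed for the lifetime of the model, and the class and method names are hypothetical.

```java
/** Hypothetical model of seek semantics for a single owned channel. */
final class SeekSketch {
    private final long head;
    private final long tail;
    private long committed;
    private long current;

    SeekSketch(long head, long tail, long committed) {
        this.head = head;
        this.tail = tail;
        this.committed = committed;
        this.current = committed;
    }

    /** Returns the position actually seeked to, clamped to the range from head to tail. */
    long seek(long requested) {
        long actual = Math.max(head, Math.min(tail, requested));
        current = actual;
        if (actual < committed) {
            // Seeking back over a previous commit rolls the commit back
            committed = actual;
        }
        // Seeking forwards leaves the committed position untouched
        return actual;
    }

    long committed() {
        return committed;
    }

    long current() {
        return current;
    }

    public static void main(String[] args) {
        SeekSketch channel = new SeekSketch(10, 50, 30);
        System.out.println(channel.seek(100));   // beyond the tail, so clamped to 50
        System.out.println(channel.committed()); // still 30 after a forward seek
        System.out.println(channel.seek(5));     // before the head, so clamped to 10
        System.out.println(channel.committed()); // rolled back to 10
    }
}
```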
-
seekAndCommit
default Position seekAndCommit(int nChannel, Position position)
Seek to the specified position in a channel and set the commit point to the new Position.
This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in the channel.
An attempt to move the position after the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position before the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the position seeked to (i.e. back to the position returned from this method).
Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
nChannel - the channel to reposition
position - the Position to seek to
- Returns:
- the Position actually seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail.
- Throws:
IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
IllegalStateException - if this subscriber is not the owner of the channel being repositioned
-
seek
Map<Integer,Position> seek(Map<Integer,Position> mapPosition)
Seek to the specified position in a set of channels.
This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in each channel.
An attempt to move the position after the current tail position for a channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position before the current head position for a channel will reposition the channel at the head, effectively moving the subscriber back to the first available element for that channel.
Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected then it will restart at the previous commit. To also move the committed position, a subsequent call to commit(int, Position), using the Position for the channel returned by this method, will be required.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the position seeked to (i.e. back to the position returned for that channel from this method).
Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
mapPosition - a Map of positions keyed by channel to seek to
- Returns:
- a Map keyed by channel of the Position seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail.
- Throws:
IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
IllegalStateException - if this subscriber is not the owner of the channel being repositioned
-
seekAndCommit
default Map<Integer,Position> seekAndCommit(Map<Integer,Position> mapPosition)
Seek to the specified position in a set of channels and set the commit position for the channels.
This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in each channel.
An attempt to move the position after the current tail position for a channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position before the current head position for a channel will reposition the channel at the head, effectively moving the subscriber back to the first available element for that channel.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the position seeked to (i.e. back to the position returned for that channel from this method).
Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.
- Parameters:
mapPosition - a Map of positions keyed by channel to seek to
- Returns:
- a Map keyed by channel of the Position seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail.
- Throws:
IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
IllegalStateException - if this subscriber is not the owner of the channel being repositioned
-
seek
Position seek(int nChannel, Instant timestamp)
Seek to a position in a channel based on the published timestamp of the elements in the topic. This method will position the subscriber such that the element returned by the next call to any
receive method that polls the specific channel will be the element with a published timestamp after the specified timestamp. The published timestamp is the cluster timestamp on the member receiving the published element at the time the publish request was accepted. An attempt to move the position using a timestamp later than the timestamp of the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position using a timestamp earlier than the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.
Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected then it will restart at the previous commit. To also move the committed position a subsequent call to
commit(int, Position) using the seeked-to Position for the channel returned by this method will be required. Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method), effectively rolling back commits made to elements in the channel after the seeked position.
Any in-flight
receive()requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.- Parameters:
nChannel- the channel to repositiontimestamp- the timestamp to seek to- Returns:
- the
Positionactually seeked to - Throws:
IllegalStateException- if this subscriber is not the owner of the channel being repositionedNullPointerException- if thetimestampisnull
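The timestamp rule above can be sketched as a search over published timestamps. This is a minimal stand-alone model, not the Coherence implementation: the TimestampSeekSketch class is hypothetical, and it assumes that the elements of a channel are ordered by published timestamp and that "after" means strictly later than the given timestamp.

```java
import java.time.Instant;
import java.util.List;
import java.util.Objects;

/** Simplified model of a timestamp-based seek; not Coherence code. */
final class TimestampSeekSketch {
    /**
     * Returns the index of the first element published strictly after the
     * timestamp, or published.size() when there is none, in which case the
     * channel would appear empty. Assumes the list is ordered by timestamp.
     */
    static int firstIndexAfter(List<Instant> published, Instant timestamp) {
        Objects.requireNonNull(timestamp, "timestamp");
        int low = 0;
        int high = published.size();
        // Binary search for the first timestamp that is after the argument.
        while (low < high) {
            int mid = (low + high) >>> 1;
            if (published.get(mid).isAfter(timestamp)) {
                high = mid;
            } else {
                low = mid + 1;
            }
        }
        return low;
    }
}
```

A timestamp earlier than every element yields index 0 (the head), and a timestamp at or later than the last element yields the list size (the tail), mirroring the two clamping cases described for this method.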
-
seekAndCommit
default Position seekAndCommit(int nChannel, Instant timestamp)
Seek to a position in a channel based on the published timestamp of the elements in the topic and set the commit point to the new position. This method will position the subscriber such that the element returned by the next call to any
receive method that polls the specific channel will be the element with a published timestamp after the specified timestamp. The published timestamp is the cluster timestamp on the member receiving the published element at the time the publish request was accepted. An attempt to move the position using a timestamp later than the timestamp of the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.
An attempt to move the position using a timestamp earlier than the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.
Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method), effectively rolling back commits made to elements in the channel after the seeked position.
Any in-flight
receive()requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.- Parameters:
nChannel- the channel to repositiontimestamp- the timestamp to seek to- Returns:
- the
Positionactually seeked to - Throws:
IllegalStateException- if this subscriber is not the owner of the channel being repositionedNullPointerException- if thetimestampisnull
-
seekToHead
Map<Integer,Position> seekToHead(int... anChannel)
Reposition one or more channels to their respective head positions.If any of the specified channels have been committed their commits will also be reset, effectively removing all committed positions for the channel.
The next message received will be the first available message in the topic for the channel. Note that this may not be the first message originally received by the subscriber if a topic is not set to retain received elements, as in that case messages are removed after being committed, so cannot be re-received.
Any in-flight
receive()requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.- Parameters:
anChannel- one or more channels to reposition to the head- Returns:
- a
Mapkeyed by channel of the headPositionseeked to for each channel - Throws:
IllegalStateException- if this subscriber is not the owner of one or more of the specified channels
-
seekToTail
Map<Integer,Position> seekToTail(int... anChannel)
Reposition one or more channels to their respective tail positions. The channel will be repositioned to read the next message published to that channel.Any in-flight
receive()requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.- Parameters:
anChannel- one or more channels to reposition to the tail- Returns:
- a
Mapkeyed by channel of the tailPositionseeked to for each channel - Throws:
IllegalStateException- if this subscriber is not the owner of one or more of the specified channels
-
seekToTailAndCommit
default Map<Integer,Position> seekToTailAndCommit(int... anChannel)
Reposition one or more channels to their respective tail positions and set the commit point to the newPosition. The channel will be repositioned to read the next message published to that channel.Any in-flight
receive()requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.- Parameters:
anChannel- one or more channels to reposition to the tail- Returns:
- a
Mapkeyed by channel of the tailPositionseeked to for each channel - Throws:
IllegalStateException- if this subscriber is not the owner of one or more of the specified channels
-
getHead
default Optional<Position> getHead(int nChannel)
Returns the Position that is currently the head for the specified channel, or Optional.empty() if the channel is not owned by this Subscriber.- Parameters:
nChannel- the channel to obtain the head Position for- Returns:
- the
Position that is currently the head for the specified channel or Optional.empty() if the channel is not owned by this Subscriber. - Throws:
IllegalStateException- if theSubscriberis closed
-
getHeads
Map<Integer,Position> getHeads()
Returns aMapof thePositionsthat are currently the head for each channel owned by thisSubscriber.This result is somewhat transient in situations where the Subscriber has in-flight receive requests, so the heads returned may change just after the method returns.
- Returns:
- the
Positionsthat are currently the heads for each channel owned by thisSubscriber - Throws:
IllegalStateException- if theSubscriberis closed
-
getTail
default Optional<Position> getTail(int nChannel)
Returns thePositionthat is currently the tail for the specified channel orOptional.empty()if the channel is not owned by thisSubscriber.This result is somewhat transient in situations where publishers are actively publishing messages to the topic, so the tail position returned may change just after this method returns.
- Parameters:
nChannel- the channel to obtain the tailPositionfor- Returns:
- the
Positionthat is currently the tail for the specified channel orOptional.empty()if the channel is not owned by thisSubscriber. - Throws:
IllegalStateException- if theSubscriberis closed
-
getTails
Map<Integer,Position> getTails()
Returns aMapof thePositionsthat are currently the tail for each channel owned by thisSubscriber; that is the last message in the channel.This result is somewhat transient in situations where publishers are actively publishing messages to the topic, so the tail positions returned may change just after this method returns.
- Returns:
- the
Positionsthat are currently the tails for each channel - Throws:
IllegalStateException- if theSubscriberis closed
-
getNamedTopic
<T> NamedTopic<T> getNamedTopic()
Returns the underlyingNamedTopicthat thisSubscriberis subscribed to, which could be of a different generic type to thisSubscriberif the subscriber is using a transformer.- Type Parameters:
T- the type of the underlying topic- Returns:
- the underlying
NamedTopicthat thisSubscriberis subscribed to
-
getRemainingMessages
int getRemainingMessages()
Returns the number of remaining messages to be read from the topic for this subscriber.- Returns:
- the number of remaining messages
-
getRemainingMessages
int getRemainingMessages(int nChannel)
Returns the number of remaining messages to be read from the topic channel for this subscriber.- Parameters:
nChannel- the channel to count remaining messages in- Returns:
- the number of remaining messages, or zero if this subscriber does not own the channel
-
inGroup
static <V> Subscriber.Name<V> inGroup(String sName)
Obtain aSubscriber.Optionthat specifies a group name for aSubscriber.- Type Parameters:
V- the type of the elements being received from the topic- Parameters:
sName- the group name to use for the Subscriber.- Returns:
- a
Subscriber.Optionthat specifies a group name for aSubscriber
-
completeOnEmpty
static <V> Subscriber.CompleteOnEmpty<V> completeOnEmpty()
Obtain the Option indicating futures should complete if the topic is empty.- Returns:
- the Option indicating futures should complete if the topic is empty
-
withListener
static <V> Subscriber.ChannelOwnershipListeners<V> withListener(Subscriber.ChannelOwnershipListener... aListener)
Create aSubscriber.ChannelOwnershipListenersoption with one or morelisteners.- Type Parameters:
V- the type of the elements being received from the topic- Parameters:
aListener- thelistenersto add to the subscriber- Returns:
- a
Subscriber.ChannelOwnershipListenersoption with one or morelisteners
-
withConverter
static <V,U> Subscriber.Convert<V,U> withConverter(ValueExtractor<? super V,U> extractor)
Return a Convert option with the specified extractor.- Type Parameters:
V- the type of the elements being received from the topic- Parameters:
extractor- the converter extractor- Returns:
- the Convert option
-
withFilter
static <V> Subscriber.Filtered<V> withFilter(Filter<? super V> filter)
Return a Filtered option with the specified filter.- Type Parameters:
V- the type of the elements being received from the topic- Parameters:
filter- the filter- Returns:
- the Filtered option
-
-