Interface Subscriber<V>

Type Parameters:
V - the type of the value returned by the subscriber
All Superinterfaces:
AutoCloseable

public interface Subscriber<V> extends AutoCloseable
A Subscriber subscribes either directly to a NamedTopic or to a subscriber group of a NamedTopic. Each value published to a NamedTopic is delivered to all of its subscriber groups and direct (anonymous) Subscribers.

The factory methods NamedTopic.createSubscriber(Subscriber.Option[]) or Session.createSubscriber(String, Option[]) allows one to specify one or more Subscriber.Options to configure the Subscriber. The Subscriber.Name.inGroup(String) option specifies the subscriber group for the Subscriber to join. If this option is not specified, the Subscriber is a direct (anonymous) subscriber to the topic. All Subscriber options and defaults are summarized in a table in Subscriber.Option.

Channels

Topics use the concept of channels to improve scalability. This is similar to how Coherence uses partition for caches but to avoid confusion the name channel was chosen. The default is the next prime above the square root of the partition count configured for the underlying cache service, which for the default partition count of 257 is 17 channels.

Publishers publish messages to a channel based on their ordering configuration. Subscribers then subscribe from channels by assigning channel ownership to subscribers. An anonymous subscriber has ownership of all channels. A subscriber that is part of a subscriber group has ownership of a sub-set of the available channels.

Channel count is configurable, but ideally should not be set too high nor too low. For example setting the channel count to 1, would mean that all publishers contend to publish to a single channel, and that only one subscriber in a subscriber group will be able to receive messages. Setting the channel count too high (above say the number of publishers) may mean that some channels never receive any messages and are wasted. Finding the appropriate value is admittedly non-trivial, however when faced with maxing out throughput from a publisher's perspective this is a configuration that can be tweaked.

Subscriber Groups

Subscribers can be part of a subscriber group. Within each subscriber group, each value is only received by one of the group members, enabling distributed, parallel processing of the values delivered to the subscriber group. Thus, each subscriber group in effect behaves like a queue over the topic data.

Subscribers in a group can be considered durable, if they are closed, or fail, then message processing will continue from the next element after the last committed position when subscriber reconnect.

To maintain ordering or messages, only a single subscriber in a group polls for messages from a channel. Each subscriber in a group is allocated ownership of a sub-set of the channels in a topic. This means that the maximum number of subscribers in a group that could be doing any work would be the same as the topic's channel count. If there are more subscribers in a group that there are channels, then the additional subscribers will not be allocated ownership of any channels and hence will receive no messages. It may seem that increasing the channel count to a higher number would therefore allow more subscribers to work in parallel, but this is not necessarily the case.

Channels are allocated to subscribers when they are created. As more subscribers in a group are created channel ownership is rebalanced, existing subscribers may lose ownership of channels that are allocated to new subscribers. As subscribers are closed (or die or are timed out), again channel ownership is rebalanced over the remaining subscribers.

Subscribers in a group have a configurable timeout. If a subscriber does not call receive within the timeout it is considered dead and the channels it owns will be reallocated to other subscribers in the group. This is to stop channel starvation where no subscriber in a group is polling from a channel.

Positions

Elements in a NamedTopic are published to a channel have a unique Position within that channel. A Position is an opaque representation of the underlying position as theoretically the implementation of the Position could change for different types of topic. Positions are used in various places in the API, for example, positions can be committed, and they can be used to move the subscriber to back or forwards within a channel. A Position is serializable, so they can be stored and recovered to later reset a subscriber to a desired position. Positions are Comparable so positions for elements can be used to determine whether how two elements related to each other within a channel.

Committing

Subscribers in a group are durable, so if they disconnect and reconnect, messages processing will restart from the correct position. A Subscriber can commit an element or commit a position in a channel. After a successful commit, then on reconnection the first message received from a channel will be the the next message after the committed position.

When a position is committed, this will also commit any earlier positions in the channel. For example, if five elements are received and commit is called only on the last element, this effectively also commits the previous four elements.

When topics are configured not to retain elements, received elements will be removed as they are committed by subscribers.

If a subscriber in a subscriber group is timed-out (or dies) and its channel ownership is reallocated to other subscribers in the group, those subscribers will start to receive messages from the last committed position for of the channels from the failed subscriber.

Commits may be performed synchronously (using commit(int, Position)) or asynchronously (using commitAsync(int, Position)). There is no facility for automatic commit of messages, all calls to commit must be done manually by application code.

Seeking

A new subscriber will start to receive message from the head position of a channel (or the last committed position for a durable subscriber) and continue to receive messages in order until the tail is reached. It is possible to reposition a subscriber (backwards or forwards) using one of the seek(int, Instant) methods. Seeking applies a new position to a specific channel. If an attempt is made to position the channel before the first available message (the channel's head) then the subscriber will be positioned at the head. Correspondingly, if an attempt is made to position a subscriber way beyond the last position in a channel (the channel's tail) then the subscriber will be positioned at the tail of the channel and will receive the next message published to that channel.

It is also possible to seek to a timestamp. Published messages are given a timestamp, which is the Coherence cluster time on the storage enabled cluster member that accepted the published message. Using the seek(int, Instant) method a subscriber can be repositioned so that the next message received from a channel is the first message with a timestamp greater than the timestamp used in the seek call. For example to position channel 0 so that the next message received is the first message after 20:30 on July 5th 2021:

     Instant timestamp = LocalDateTime.of(2021, Month.JULY, 5, 20, 30)
             .toInstant(ZoneOffset.UTC);
     subscriber.seek(0, timestamp);
 

Seeking Forwards

It is important to note that seeking forwards is skipping over messages, those skipped message will never be received once another commit is executed. Moving forwards does not alter the commit position, so when a subscriber has committed a position, then moves forwards and later fails, it will restart back at the commit.

Seeking Backwards Over Previous Commits

When topics are configured not to retain elements removal of elements occurs as their positions are committed, so they can never be received again. This means that in this case, it would not be possible to seek backwards further than the last commit, as those elements no longer exist in the topic.

When topics are configured to retain elements then it is possible to seek backwards further than the last commit. This effectively rolls-back those commits and the previously committed messages after the new seek position will be re-received. If a subscriber fails after moving back in this fashion it will restart at the rolled-back position.

Receiving Elements

Receiving elements from a topic is an asynchronous operation, calls to receive() (or the batch version receive(int)) return a CompletableFuture. If multiple calls are made to receive, the returned futures will complete in the correct order to maintain message ordering in a channel. To maintain ordering, the futures are completed by a single daemon thread. This means that code using any of the synchronous CompletableFuture handling patterns, such as CompletableFuture.thenApply(java.util.function.Function) or CompletableFuture.thenAccept(java.util.function.Consumer), etc. will run on the same daemon thread, so application code in the handler methods must complete before the next receive future will be completed. Again, this is intentional, to maintain strict ordering of processing of received elements. If the CompletableFuture asynchronous handler methods are used such as, CompletableFuture.thenApplyAsync(java.util.function.Function) or CompletableFuture.thenAcceptAsync(java.util.function.Consumer), etc. then the application code handling the received element will execute on another thread, and at this point there are no ordering guarantees.

It is important that application code uses the correct handling of the returned futures to both maintain ordering (if that is important to the application) and also to have correct error handling, and not lose exceptions, which is easy to do in poorly written asynchronous future handler code.

Clean-Up

Subscribers should ideally be closed when application code finishes with them. This will clean up server-side resources associated with a subscriber.

It is also important (possibly more important) to clean up subscriber groups that are no longer required. Failure to delete a subscriber group will cause messages to be retained on the server that would otherwise have been removed, so consuming more server-side resources such as heap and disc.

Since:
Coherence 14.1.1
Author:
jf/jk/mf 2015.06.03
  • Method Details

    • receive

      Receive a value from the topic. If there is no value available then the future will complete according to the Subscriber.CompleteOnEmpty option used to create the Subscriber.

      Note: If the returned future is cancelled it is possible that a value may still be considered by the topic to have been received by this group, while the group would consider this a lost value. Subscriber implementations will make its best effort to prevent such loss, but it cannot be guaranteed and thus cancellation is not advisable.

      The futures returned from calls to receive are completed sequentially. If the methods used to handle completion in application code block this will block completions of subsequent futures. This is to maintain ordering of consumption of completed futures. If the application code handles future completion using the asynchronous methods of CompletableFuture (i.e. handling is handed off to another thread) this could cause out of order consumption as received values are consumed on different threads.

      Returns:
      a CompletableFuture which can be used to access the result of this completed operation
      Throws:
      IllegalStateException - if the Subscriber is closed
    • receive

      CompletableFuture<List<Subscriber.Element<V>>> receive(int cBatch)
      Receive a batch of elements from the topic.

      The cMessage parameter specifies the maximum number of elements to receive in the batch. The subscriber may return fewer elements than the cMessage parameter; this does not signify that the topic is empty.

      If there is no value available then the future will complete according to the Subscriber.CompleteOnEmpty option used to create the Subscriber.

      If the poll of the topic returns nothing (i.e. the topic was empty and Subscriber.CompleteOnEmpty) is true then the Consumer will not be called.

      The futures returned from calls to receive are completed sequentially. If the methods used to handle completion in application code block this will block completions of subsequent futures. This is to maintain ordering of consumption of completed futures. If the application code handles future completion using the asynchronous methods of CompletableFuture (i.e. handling is handed off to another thread) this could cause out of order consumption as received values are consumed on different threads.

      Parameters:
      cBatch - the maximum number of elements to receive in the batch
      Returns:
      a future which can be used to access the result of this completed operation
      Throws:
      IllegalStateException - if the Subscriber is closed
    • getChannels

      int[] getChannels()
      Returns the current set of channels that this Subscriber owns.

      Subscribers that are part of a subscriber group own a sub-set of the available channels. A subscriber in a group should normally be assigned ownership of at least one channel. In the case where there are more subscribers in a group that the number of channels configured for a topic, then some subscribers will obviously own zero channels. Anonymous subscribers that are not part of a group are always owners all the available channels.

      Returns:
      the current set of channels that this Subscriber is the owner of, or an empty array if this subscriber has not been assigned ownership any channels
    • isOwner

      default boolean isOwner(int nChannel)
      Returns true if this subscriber is the owner of the specified channel.

      This method only really applies to subscribers that are part of a group that may own zero or more channels. This method will always return true for an anonymous subscriber that is not part of a group.

      As channel ownership may change as subscribers in a group are created and closed the result of this method is somewhat transient. To more accurately track channel ownership changes create a subscriber using the Subscriber.ChannelOwnershipListeners option

      Parameters:
      nChannel - the channel number
      Returns:
      true if this subscriber is currently the owner of the specified channel, otherwise returns false if the channel is not owned by this subscriber
    • getChannelCount

      int getChannelCount()
      Returns the number of channels in the underlying NamedTopic.

      This could be different to the number of channels owned by this Subscriber.

      Returns:
      the number of channels in the underlying NamedTopic
    • getFlowControl

      FlowControl getFlowControl()
      Return the FlowControl object governing this subscriber.
      Returns:
      the FlowControl object.
    • close

      void close()
      Close the Subscriber.

      Closing a subscriber ensures that no new receive() requests will be accepted and all pending receive requests will be completed or safely cancelled.

      For a direct topic Subscriber.close() enables the release of storage resources for unconsumed values.

      For a group member, close() indicates that this member has left its corresponding group. One must actively manage a NamedTopic's logical subscriber groups since their life span is independent of active Subscriber group membership. NamedTopic.destroySubscriberGroup(String) releases storage and stops accumulating topic values for a subscriber group.

      Calling close() on an already closed subscriber is a no-op.

      Specified by:
      close in interface AutoCloseable
    • heartbeat

      void heartbeat()
      Send a heartbeat to the server to keep this subscriber alive.

      Heartbeat messages are sent on calls to any of the receive() methods. If a subscriber does not call receive within the configured timeout time, it will be considered dead and lose its channel ownership. This is to stop badly behaving subscribers from blocking messages from being processed, for example if a bug in application code causes a subscriber to deadlock or take an excessive amount of time to process messages.

      If application code knows that it will take longer to execute than the subscriber timeout, for example the message processing communicates with a third-party system that is in some kind of back-off loop, then the subscriber can send a heartbeat to keep itself alive.

      A subscriber that times-out is still active and is not closed. On the next call to a receive() method the subscriber will reconnect and be reallocated channel ownerships. Although a timed-out subscriber is not closed it would have lost ownership of channels and not be able to commit any messages that it was processing when it timed out. This is because another subscriber in the group may have already been allocated ownership of the same channels, already processed, and committed that same message. Allowing commits of unowned channels is configurable, by default a subscriber can only commit positions for channels it owns.

      Heart-beating only applies to subscribers that are members of a subscriber group. An anonymous subscriber owns all channels and no other subscriber is sharing the workload, so anonymous subscriber will not be timed-out. For an anonymous subscriber the heartbeat operation is a no-op.

    • isActive

      boolean isActive()
      Determine whether this Subscriber is active.
      Returns:
      true if this Subscriber is active
    • onClose

      void onClose(Runnable action)
      Add an action to be executed when this Subscriber is closed.
      Parameters:
      action - the action to execute
    • commit

      default Subscriber.CommitResult commit(int nChannel, Position position)
      Commit the specified channel and position.
      Parameters:
      nChannel - the channel to commit
      position - the position within the channel to commit
      Returns:
      the result of the commit request
      Throws:
      IllegalStateException - if the Subscriber is closed
    • commitAsync

      CompletableFuture<Subscriber.CommitResult> commitAsync(int nChannel, Position position)
      Asynchronously commit the specified channel and position.
      Parameters:
      nChannel - the channel to commit
      position - the position within the channel to commit
      Returns:
      the result of the commit request
      Throws:
      IllegalStateException - if the Subscriber is closed
    • commit

      default Map<Integer,Subscriber.CommitResult> commit(Map<Integer,Position> mapPositions)
      Commit the specified channels and positions.
      Parameters:
      mapPositions - a map of channels napped to the position to commit
      Returns:
      a map of results of the commit request for each channel
      Throws:
      IllegalStateException - if the Subscriber is closed
    • commitAsync

      Asynchronously commit the specified channels and positions.
      Parameters:
      mapPositions - a map of channels mapped to the positions to commit
      Returns:
      a map of results of the commit request for each channel
      Throws:
      IllegalStateException - if the Subscriber is closed
    • getLastCommitted

      default Optional<Position> getLastCommitted(int nChannel)
      Returns an Optional containing the latest position committed for a channel, or Optional.empty() if the channel is not owned by this Subscriber
      Parameters:
      nChannel - the channel to get the last committed position for
      Returns:
      the latest position committed for a channel, or Optional.empty() if the channel is not owned by this Subscriber
      Throws:
      IllegalStateException - if the Subscriber is closed
    • getLastCommitted

      Map<Integer,Position> getLastCommitted()
      Returns a Map of channels to the latest Position committed for that channel; the map will only contain channels owned by this Subscriber.
      Returns:
      a Map of channels to the latest Position committed for that channel; the map will only contain channels owned by this Subscriber
      Throws:
      IllegalStateException - if the Subscriber is closed
    • seek

      Position seek(int nChannel, Position position)
      Seek to the specified position in a channel.

      This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in the channel.

      An attempt to move the position after the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.

      An attempt to move the position before the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.

      Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected then it will restart at the previous commit. To also move the committed position a subsequent call to commit(int, Position) using the seeked to Position for the channel returned by this method will be required.

      Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method).

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      nChannel - the channel to reposition
      position - the Position to seek to
      Returns:
      the Position actually seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail.
      Throws:
      IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
      IllegalStateException - if this subscriber is not the owner of the channel being repositioned
    • seekAndCommit

      default Position seekAndCommit(int nChannel, Position position)
      Seek to the specified position in a channel and set the commit point to the new Position.

      This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in the channel.

      An attempt to move the position after the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.

      An attempt to move the position before the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.

      Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method).

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      nChannel - the channel to reposition
      position - the Position to seek to
      Returns:
      the Position actually seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail.
      Throws:
      IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
      IllegalStateException - if this subscriber is not the owner of the channel being repositioned
    • seek

      Map<Integer,Position> seek(Map<Integer,Position> mapPosition)
      Seek to the specified position in a set of channels.

      This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in each channel.

      An attempt to move the position after the current tail position for a channel will reposition the channel at the tail, effectively making the channel appear empty.

      An attempt to move the position before the current head position for a channel will reposition the channel at the head, effectively moving the subscriber back to the first available element for that channel.

      Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected then it will restart at the previous commit. To also move the committed position a subsequent call to commit(int, Position) using the seeked to Position for the channel returned by this method will be required.

      Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned for that channel from this method).

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      mapPosition - a Map of positions keyed by channel to seek to
      Returns:
      a Map keyed by channel of the Position seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail.
      Throws:
      IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
      IllegalStateException - if this subscriber is not the owner of the channel being repositioned
    • seekAndCommit

      default Map<Integer,Position> seekAndCommit(Map<Integer,Position> mapPosition)
      Seek to the specified position in a set of channels and sets the commit position for the channels.

      This method will position the subscriber such that the element returned by the next call to any receive method will be the element after the specified position in each channel.

      An attempt to move the position after the current tail position for a channel will reposition the channel at the tail, effectively making the channel appear empty.

      An attempt to move the position before the current head position for a channel will reposition the channel at the head, effectively moving the subscriber back to the first available element for that channel.

      Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned for that channel from this method).

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      mapPosition - a Map of positions keyed by channel to seek to
      Returns:
      a Map keyed by channel of the Position seeked to, which may be different to the position parameter if the position parameter is before the channel's head or after the channel's tail.
      Throws:
      IllegalArgumentException - if the Position is not the correct type for the topic implementation or is an invalid position
      IllegalStateException - if this subscriber is not the owner of the channel being repositioned
    • seek

      Position seek(int nChannel, Instant timestamp)
      Seek to a position in a channel based the published timestamp of the elements in the topic.

      This method will position the subscriber such that the element returned by the next call to any receive method that polls the specific channel will be the element with a published timestamp after the specified timestamp. The published timestamp is the cluster timestamp on the member receiving the published element at the time the publish request was accepted.

      An attempt to move the position using a timestamp later that the timestamp of the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.

      An attempt to move the position using a timestamp earlier than the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.

      Repositioning a channel ahead of its current position will skip unread elements in that channel. It will not move the last committed position for the channel, so if the subscriber is disconnected then it will restart at the previous commit. To also move the committed position a subsequent call to commit(int, Position) using the seeked to Position for the channel returned by this method will be required.

      Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method) effectively rolling back commits made to elements in the channel after the seeked.

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      nChannel - the channel to reposition
      timestamp - the timestamp to seek to
      Returns:
      the Position actually seeked to
      Throws:
      IllegalStateException - if this subscriber is not the owner of the channel being repositioned
      NullPointerException - if the timestamp is null
    • seekAndCommit

      default Position seekAndCommit(int nChannel, Instant timestamp)
      Seek to a position in a channel based the published timestamp of the elements in the topic and set the commit point to the new position.

      This method will position the subscriber such that the element returned by the next call to any receive method that polls the specific channel will be the element with a published timestamp after the specified timestamp. The published timestamp is the cluster timestamp on the member receiving the published element at the time the publish request was accepted.

      An attempt to move the position using a timestamp later that the timestamp of the current tail position for the channel will reposition the channel at the tail, effectively making the channel appear empty.

      An attempt to move the position using a timestamp earlier than the current head position for the channel will reposition the channel at the head, effectively moving the subscriber back to the first available element in the channel.

      Repositioning a channel back before the current committed position will move the committed position for the channel back to the seeked position (i.e. back to the position returned from this method) effectively rolling back commits made to elements in the channel after the seeked.

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      nChannel - the channel to reposition
      timestamp - the timestamp to seek to
      Returns:
      the Position actually seeked to
      Throws:
      IllegalStateException - if this subscriber is not the owner of the channel being repositioned
      NullPointerException - if the timestamp is null
    • seekToHead

      Map<Integer,Position> seekToHead(int... anChannel)
      Reposition one or more channels to their respective head positions.

      If any of the specified channels have been committed their commits will also be reset, effectively removing all committed positions for the channel.

      The next message received will be the first available message in the topic for the channel. Note that this may not be the first message originally received by the subscriber if a topic is not set to retain received elements, as in that case messages are removed after being committed, so cannot be re-received.

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      anChannel - one or more channels to reposition to the head
      Returns:
      a Map keyed by channel of the head Position seeked to for each channel
      Throws:
      IllegalStateException - if this subscriber is not the owner one or more of the specified channels
    • seekToTail

      Map<Integer,Position> seekToTail(int... anChannel)
      Reposition one or more channels to their respective tail positions. The channel will be repositioned to read the next message published to that channel.

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      anChannel - one or more channels to reposition to the tail
      Returns:
      a Map keyed by channel of the tail Position seeked to for each channel
      Throws:
      IllegalStateException - if this subscriber is not the owner of one or more of the specified channels
    • seekToTailAndCommit

      default Map<Integer,Position> seekToTailAndCommit(int... anChannel)
      Reposition one or more channels to their respective tail positions and set the commit point to the new Position. The channel will be repositioned to read the next message published to that channel.

      Any in-flight receive() requests will complete before the seek operation takes place, whilst any requests still locally queued will be completed after seeking.

      Parameters:
      anChannel - one or more channels to reposition to the tail
      Returns:
      a Map keyed by channel of the tail Position seeked to for each channel
      Throws:
      IllegalStateException - if this subscriber is not the owner of one or more of the specified channels
    • getHead

      default Optional<Position> getHead(int nChannel)
      Returns the Position that is currently the tail for the specified channel, or Optional.empty() if the channel is not owned by thisSubscriber.
      Parameters:
      nChannel - the channel to obtain the tail Position for
      Returns:
      the Position that is currently the tail for the specified channel or Optional.empty() if the channel is not owned by thisSubscriber.
      Throws:
      IllegalStateException - if the Subscriber is closed
    • getHeads

      Map<Integer,Position> getHeads()
      Returns a Map of the Positions that are currently the head for each channel owned by this Subscriber.

      This result is somewhat transient in situations where the Subscriber has in-flight receive requests, so the heads returned may change just after the method returns.

      Returns:
      the Positions that are currently the heads for each channel owned by this Subscriber
      Throws:
      IllegalStateException - if the Subscriber is closed
    • getTail

      default Optional<Position> getTail(int nChannel)
      Returns the Position that is currently the tail for the specified channel or Optional.empty() if the channel is not owned by thisSubscriber.

      This result is somewhat transient in situations where publishers are actively publishing messages to the topic, so the tail position returned may change just after this method returns.

      Parameters:
      nChannel - the channel to obtain the tail Position for
      Returns:
      the Position that is currently the tail for the specified channel or Optional.empty() if the channel is not owned by thisSubscriber.
      Throws:
      IllegalStateException - if the Subscriber is closed
    • getTails

      Map<Integer,Position> getTails()
      Returns a Map of the Positions that are currently the tail for each channel owned by this Subscriber; that is the last message in the channel.

      This result is somewhat transient in situations where publishers are actively publishing messages to the topic, so the tail positions returned may change just after this method returns.

      Returns:
      the Positions that are currently the tails for each channel
      Throws:
      IllegalStateException - if the Subscriber is closed
    • getNamedTopic

      <T> NamedTopic<T> getNamedTopic()
      Returns the underlying NamedTopic that this Subscriber is subscribed to, which could be of a different generic type to this Subscriber if the subscriber is using a transformer.
      Type Parameters:
      T - the type of the underlying topic
      Returns:
      the underlying NamedTopic that this Subscriber is subscribed to
    • getRemainingMessages

      int getRemainingMessages()
      Returns the number of remaining messages to be read from the topic for this subscriber.
      Returns:
      the number of remaining messages
    • getRemainingMessages

      int getRemainingMessages(int nChannel)
      Returns the number of remaining messages to be read from the topic channel for this subscriber.
      Parameters:
      nChannel - the channel to count remaining messages in
      Returns:
      the number of remaining messages, or zero if this subscriber does not own the channel
    • inGroup

      static <V> Subscriber.Name<V> inGroup(String sName)
      Obtain a Subscriber.Option that specifies a group name for a Subscriber.
      Type Parameters:
      V - the type of the elements being received from the topic
      Parameters:
      sName - the group name to use for the {Link Subscriber}.
      Returns:
      a Subscriber.Option that specifies a group name for a Subscriber
    • completeOnEmpty

      static <V> Subscriber.CompleteOnEmpty<V> completeOnEmpty()
      Obtain the Option indicating futures should complete if the topic is empty.
      Returns:
      the Option indicating futures should complete if the topic is empty
    • withListener

      Create a Subscriber.ChannelOwnershipListeners option with one or more listeners.
      Type Parameters:
      V - the type of the elements being received from the topic
      Parameters:
      aListener - the listeners to add to the subscriber
      Returns:
      a Subscriber.ChannelOwnershipListeners option with one or more listeners
    • withConverter

      static <V, U> Subscriber.Convert<V,U> withConverter(ValueExtractor<? super V,U> extractor)
      Return a Convert option with the specified extractor.
      Type Parameters:
      V - the type of the elements being received from the topic
      Parameters:
      extractor - the converter extractor
      Returns:
      the Filtered option
    • withFilter

      static <V> Subscriber.Filtered<V> withFilter(Filter<? super V> filter)
      Return a Filtered option with the specified filter.
      Type Parameters:
      V - the type of the elements being received from the topic
      Parameters:
      filter - the filter
      Returns:
      the Filtered option