LiveListenerBus

LiveListenerBus asynchronously passes listener events to registered Spark listeners.

LiveListenerBus is a single-JVM SparkListenerBus that uses listenerThread to poll events. Emitters use the post method to post SparkListenerEvent events.

Note
The event queue is a java.util.concurrent.LinkedBlockingQueue with a capacity of 10000 SparkListenerEvent events.
Note
An instance of LiveListenerBus is created in SparkContext.

Listener Bus

Caution
FIXME Remove the anchor

Creating LiveListenerBus Instance

Caution
FIXME

Starting LiveListenerBus (start method)

start(sc: SparkContext): Unit

start starts processing events.

Internally, it saves the input SparkContext for later use and starts listenerThread. This happens only if LiveListenerBus has not been started before (i.e. the started flag is disabled).

If LiveListenerBus has already been started, an IllegalStateException is thrown:

[name] already started!
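
The start-once guard described above can be sketched with an atomic flag. This is a minimal model, not Spark's code: StartOnceBus and isStarted are hypothetical names, the SparkContext argument is left out, and the thread body is a placeholder.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the bus; only the start-once guard is modelled.
class StartOnceBus(name: String) {
  private val started = new AtomicBoolean(false)

  // Daemon thread standing in for listenerThread; its body is a placeholder.
  private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = ()
  }

  def start(): Unit = {
    // compareAndSet flips the flag atomically, so only the first caller wins.
    if (started.compareAndSet(false, true)) {
      listenerThread.start()
    } else {
      throw new IllegalStateException(s"$name already started!")
    }
  }

  def isStarted: Boolean = started.get()
}
```

Using compareAndSet rather than a separate read and write means two threads racing on start cannot both get past the check.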

Posting SparkListenerEvent Events (post method)

post(event: SparkListenerEvent): Unit

post puts the input event onto the internal eventQueue queue and, if that succeeds, releases the internal eventLock semaphore. If the event could not be added (which can happen because the queue is capped at 10000 events), the onDropEvent method is called.

Events can be posted only while the stopped flag is disabled.

Caution
FIXME Who’s enabling the stopped flag and when/why?

If LiveListenerBus has been stopped, the following ERROR appears in the logs:

ERROR [name] has already stopped! Dropping event [event]
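
The posting path can be sketched with a bounded queue and a semaphore. The sketch assumes the stopped check comes first and that the semaphore is released only when the event was queued. PostingBus, markStopped, queued, permits and droppedCount are hypothetical helpers, and String stands in for SparkListenerEvent.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical model of post; String stands in for SparkListenerEvent.
class PostingBus(capacity: Int) {
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  private val eventLock = new Semaphore(0)
  private val stopped = new AtomicBoolean(false)
  private val dropped = new AtomicInteger(0)

  def post(event: String): Unit = {
    if (stopped.get()) {
      System.err.println(s"ERROR bus has already stopped! Dropping event $event")
    } else if (eventQueue.offer(event)) {
      // offer never blocks; it returned true, so one permit is released per queued event.
      eventLock.release()
    } else {
      // offer returned false: the queue is at capacity.
      onDropEvent(event)
    }
  }

  protected def onDropEvent(event: String): Unit = {
    dropped.incrementAndGet()
  }

  // Helpers for inspection only (assumptions of this sketch).
  def markStopped(): Unit = stopped.set(true)
  def queued: Int = eventQueue.size()
  def permits: Int = eventLock.availablePermits()
  def droppedCount: Int = dropped.get()
}
```

With a capacity of 2, a third post is rejected by offer and routed to onDropEvent instead of blocking the caller.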

Event Dropped Callback (onDropEvent method)

onDropEvent(event: SparkListenerEvent): Unit

onDropEvent is called when no further events can be added to the internal eventQueue queue (while posting a SparkListenerEvent event).

It simply prints out the following ERROR message to the logs and ensures that it happens only once.

ERROR Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
Note
It uses the internal logDroppedEvent atomic variable to track the state.
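
A log-once guard of this kind can be sketched as follows. DropLogger and its messages buffer are hypothetical; the buffer only captures what would otherwise go to the logger, and the message text is shortened.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of the drop callback; String stands in for SparkListenerEvent.
class DropLogger {
  private val logDroppedEvent = new AtomicBoolean(false)

  // Captured log lines, used here instead of a real logger.
  val messages: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def onDropEvent(event: String): Unit = {
    // Only the call that flips the flag from false to true records the message.
    if (logDroppedEvent.compareAndSet(false, true)) {
      messages += "Dropping SparkListenerEvent because no remaining room in event queue."
    }
  }
}
```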

Stopping LiveListenerBus

stop(): Unit

stop releases the internal eventLock semaphore and waits until listenerThread dies. The thread exits only after it has processed all posted events, i.e. once polling eventQueue returns nothing.

It checks that the started flag is enabled (i.e. true) and throws an IllegalStateException otherwise.

Attempted to stop [name] that has not yet started!

stop also enables the stopped flag, so any later post drops its event.
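
The stop sequence can be sketched like this. StoppableBus, isStopped and workerAlive are hypothetical names, the worker thread is a placeholder that simply waits for one permit, and treating a repeated stop as a no-op is an assumption of the sketch.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of the start/stop lifecycle.
class StoppableBus(name: String) {
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  private val eventLock = new Semaphore(0)

  // Placeholder worker: blocks until a permit arrives, then exits.
  private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = eventLock.acquire()
  }

  def start(): Unit = {
    if (started.compareAndSet(false, true)) listenerThread.start()
    else throw new IllegalStateException(s"$name already started!")
  }

  def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }
    // Only the first stop call releases the permit and waits for the thread.
    if (stopped.compareAndSet(false, true)) {
      eventLock.release()
      listenerThread.join()
    }
  }

  def isStopped: Boolean = stopped.get()
  def workerAlive: Boolean = listenerThread.isAlive
}
```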

listenerThread for Event Polling

LiveListenerBus uses a single daemon thread named SparkListenerBus. The thread polls events from the event queue only after the bus has been started, and it processes one event at a time.

Caution
FIXME There is some logic around no events in the queue.
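
One way such a polling loop could look is sketched below. It assumes that every permit on the semaphore corresponds to either one queued event or a stop request, so an empty poll after acquiring a permit means the thread should exit; this is inferred from the stop description and is not confirmed by the text. PollingBus and delivered are hypothetical, and adding to delivered stands in for posting the event to all listeners.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, Semaphore}

// Hypothetical model of the polling thread; String stands in for SparkListenerEvent.
class PollingBus {
  private val eventQueue = new LinkedBlockingQueue[String](10000)
  private val eventLock = new Semaphore(0)

  // Events seen by the thread, in processing order.
  val delivered = new ConcurrentLinkedQueue[String]()

  private val listenerThread = new Thread("SparkListenerBus") {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        eventLock.acquire()            // wait for a post or a stop
        val event = eventQueue.poll()  // non-blocking; null when the queue is empty
        if (event == null) {
          running = false              // a permit without an event: stop was requested
        } else {
          delivered.add(event)         // one event at a time
        }
      }
    }
  }

  def start(): Unit = listenerThread.start()

  def post(event: String): Unit = {
    if (eventQueue.offer(event)) eventLock.release()
  }

  def stop(): Unit = {
    eventLock.release()
    listenerThread.join()
  }
}
```

Because stop adds its permit after all earlier posts, the thread drains every queued event before it sees the empty poll.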

Settings

spark.extraListeners

spark.extraListeners (default: empty) is a comma-separated list of listener class names that should be registered with LiveListenerBus when SparkContext is initialized.
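
How a comma-separated list of class names might be turned into listener instances is sketched below. ExtraListeners is a hypothetical helper, and the sketch assumes each class has a public zero-argument constructor; it does not show how Spark itself instantiates or registers the listeners.

```scala
// Hypothetical helper; not part of Spark.
object ExtraListeners {
  // Splits the setting into trimmed, non-empty class names.
  def classNames(setting: String): Seq[String] =
    setting.split(',').map(_.trim).filter(_.nonEmpty).toSeq

  // Assumption: every listed class has a public zero-argument constructor.
  def instantiate(setting: String): Seq[AnyRef] =
    classNames(setting).map { name =>
      Class.forName(name).getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    }
}
```

An empty setting yields no class names, which matches the empty default.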

SparkListenerBus

SparkListenerBus is a ListenerBus that manages SparkListenerInterface listeners that process SparkListenerEvent events.

It comes with a custom doPostEvent method.

doPostEvent(listener: SparkListenerInterface, event: SparkListenerEvent): Unit

doPostEvent method simply relays SparkListenerEvent events to SparkListenerInterface methods as follows:

  • SparkListenerStageSubmitted ⇒ onStageSubmitted

  • SparkListenerStageCompleted ⇒ onStageCompleted

  • SparkListenerJobStart ⇒ onJobStart

  • SparkListenerJobEnd ⇒ onJobEnd

  • SparkListenerTaskStart ⇒ onTaskStart

  • SparkListenerTaskGettingResult ⇒ onTaskGettingResult

  • SparkListenerTaskEnd ⇒ onTaskEnd

  • SparkListenerEnvironmentUpdate ⇒ onEnvironmentUpdate

  • SparkListenerBlockManagerAdded ⇒ onBlockManagerAdded

  • SparkListenerBlockManagerRemoved ⇒ onBlockManagerRemoved

  • SparkListenerUnpersistRDD ⇒ onUnpersistRDD

  • SparkListenerApplicationStart ⇒ onApplicationStart

  • SparkListenerApplicationEnd ⇒ onApplicationEnd

  • SparkListenerExecutorMetricsUpdate ⇒ onExecutorMetricsUpdate

  • SparkListenerExecutorAdded ⇒ onExecutorAdded

  • SparkListenerExecutorRemoved ⇒ onExecutorRemoved

  • SparkListenerBlockUpdated ⇒ onBlockUpdated

  • SparkListenerLogStart ⇒ event ignored

  • other event types ⇒ onOtherEvent
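
The relay in the list above is a type-based dispatch, which can be sketched with pattern matching. The event and listener types below (Event, JobStart, JobEnd, LogStart, Custom, Listener) are hypothetical, trimmed-down stand-ins for the Spark types and cover only a few of the cases.

```scala
// Hypothetical, trimmed-down event types for illustration only.
sealed trait Event
case class JobStart(jobId: Int) extends Event
case class JobEnd(jobId: Int) extends Event
case object LogStart extends Event
case class Custom(payload: String) extends Event

// Hypothetical listener interface with no-op defaults.
trait Listener {
  def onJobStart(e: JobStart): Unit = ()
  def onJobEnd(e: JobEnd): Unit = ()
  def onOtherEvent(e: Event): Unit = ()
}

object Dispatch {
  // Routes each event to the matching callback.
  def doPostEvent(listener: Listener, event: Event): Unit = event match {
    case e: JobStart => listener.onJobStart(e)
    case e: JobEnd   => listener.onJobEnd(e)
    case LogStart    => ()                          // event ignored
    case other       => listener.onOtherEvent(other)
  }
}
```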

Note
There are two SparkListenerBus implementations: LiveListenerBus and ReplayListenerBus.

SparkListenerInterface

SparkListenerEvent

Caution
FIXME What are SparkListenerEvents? Where and why are they posted? What do they cause?

ListenerBus

ListenerBus[L <: AnyRef, E]

ListenerBus is an event bus that posts events (of type E) to all registered listeners (of type L).

It manages listeners of type L, i.e. it can add to and remove listeners from an internal listeners collection.

addListener(listener: L): Unit
removeListener(listener: L): Unit

It can post events of type E to all registered listeners (using postToAll method). It simply iterates over the internal listeners collection and executes the abstract doPostEvent method.

doPostEvent(listener: L, event: E): Unit
Note
doPostEvent is provided by more specialized ListenerBus event buses.

If a listener throws an exception while an event is being posted to it, the following ERROR message appears in the logs together with the exception.

ERROR Listener [listener] threw an exception
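
The contract described in this section can be sketched as a small generic trait. SimpleListenerBus is a hypothetical name, the CopyOnWriteArrayList is a choice made for this sketch, and the error is printed to stderr instead of going through a logger.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

// Hypothetical minimal event bus; L is the listener type, E the event type.
trait SimpleListenerBus[L <: AnyRef, E] {
  // Copy-on-write list so listeners can be added or removed while posting.
  private val listeners = new CopyOnWriteArrayList[L]()

  def addListener(listener: L): Unit = { listeners.add(listener) }
  def removeListener(listener: L): Unit = { listeners.remove(listener) }

  def postToAll(event: E): Unit = {
    val iter = listeners.iterator()
    while (iter.hasNext) {
      val listener = iter.next()
      try {
        doPostEvent(listener, event)
      } catch {
        // A failing listener is reported and does not stop delivery to the others.
        case NonFatal(e) =>
          System.err.println(s"ERROR Listener $listener threw an exception: $e")
      }
    }
  }

  // Supplied by more specialized buses.
  protected def doPostEvent(listener: L, event: E): Unit
}
```

A concrete bus only has to say how one listener handles one event; registration, iteration and error handling stay in the base trait.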
Note
There are three ListenerBus implementations: SparkListenerBus, ContinuousQueryListenerBus, and StreamingListenerBus.
Tip

Enable ERROR logging level for org.apache.spark.util.ListenerBus logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.util.ListenerBus=ERROR

Refer to Logging.