In one project we used ActiveMQ(JMS API) to communicate between business services and custom engine that worked asynchronously behind the scene to process financial data.
The flow to add/remove entity from ui was done as it usually organized in context of three-tier architecture: call to ui controller, route appropriate method, use service and then DAO to add/delete entity and then notify active mq corresponding consumers to process add/delete actions.
But the issue was in the following: entity might be created in database, but not fully processed by engine while user tries to delete it from web interface.
It's not the issue on the database side, this is multithreading synchronization question.
I decided to use spin lock approach to sync two add/delete threads that did their work when the corresponding messages came to ActiveMQ queue.
This is how JMS handler looks like. When message comes to queue it's then appropriate handled by tickerEngineRequestProcessor, that is simply container for ThreadPoolExecutor performing tasks execution(add or delete one).
Below is definition for tickerEngineRequestProcessor:
Let's now describe TickerTask, it just implemented Runnable interface to be processed within custom spin lock thread pool which I will describe further.
The main idea in TickerEngineAddRemoveSyncThreadPoolExecutor is to perform spin iterations on thread that performs removing ticker while thread which adds ticker is not finished yet.
This logic is placed in beforeExecute action. In afterExecute we just remove corresponding tickerId key from tickerIdsToAddInProgressState set.
One more word about tickerIdsToAddInProgressState. This is ConcurrentHashSet that I choosed for storing state as per given tickerId, because we need to maintain sync add/remove operations based on the given tickerId and don't affect other tickers that might be processed at the same time. Thread.yield() is signal for threads scheduler to switch its work to other threads but the yielding thread can be scheduled back in one interrupt period later.
The flow to add/remove entity from ui was done as it usually organized in context of three-tier architecture: call to ui controller, route appropriate method, use service and then DAO to add/delete entity and then notify active mq corresponding consumers to process add/delete actions.
But the issue was in the following: entity might be created in database, but not fully processed by engine while user tries to delete it from web interface.
It's not the issue on the database side, this is multithreading synchronization question.
I decided to use spin lock approach to sync two add/delete threads that did their work when the corresponding messages came to ActiveMQ queue.
This is how JMS handler looks like. When message comes to queue it's then appropriate handled by tickerEngineRequestProcessor, that is simply container for ThreadPoolExecutor performing tasks execution(add or delete one).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void handleMessage(javax.jms.Message m) { | |
Message<?> message = null; | |
try { | |
message = client.translateFromJMS(m); | |
} catch (JMSException e) { | |
logger.error("Error translating JMS message", e); | |
} | |
if (message != null) { | |
logger.debug(String.format("Received %s", message)); | |
final Object content = message.getContent(); | |
if (content instanceof AddTicker) { | |
tickerEngineRequestProcessor.addTicker((AddTicker) content, m); | |
} else if (content instanceof RemoveTicker) { | |
tickerEngineRequestProcessor.removeTicker((RemoveTicker) content, m); | |
} else { | |
logger.warn(String.format("Received unsupported message: %s", message)); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TickerEngineRequestProcessor implements DisposableBean { | |
private static final Logger logger = Logger.getLogger(TickerEngineRequestProcessor.class); | |
private final TickerEngineInternal tickerEngine; | |
private final ThreadPoolExecutor tickersThreadPool; | |
@SuppressWarnings("unchecked") | |
public TickerEngineRequestProcessor(TickerEngineInternal TickerEngine, InternalTickerService internalTickerService, | |
InternalTickerSupportService internalTickerSupportService) { | |
this.tickerEngine = tickerEngine; | |
final DelayQueue<TickerEngineControllerTask> queue = new DelayQueue<>(); | |
tickersThreadPool = new TickerEngineAddRemoveSyncThreadPoolExecutor(minThreads, maxThreads, | |
60L, TimeUnit.SECONDS, | |
(BlockingQueue) queue, new TickerEngineRequestThreadFactory()); | |
} | |
private void onRequest() { | |
logger.debug(String.format("Statistic of request processor: total tasks - %d, completed tasks - %d", | |
tickersThreadPool.getTaskCount(), TickersThreadPool.getCompletedTaskCount())); | |
} | |
public void addTicker(AddTicker addTicker, javax.jms.Message message) { | |
final long TickerId = addTicker.getTickerId(); | |
tickersThreadPool.execute(new TickerTask(TickerEngine, TickerId, true, addTicker.getTimestamp(), message)); | |
onRequest(); | |
} | |
public void removeTicker(RemoveTicker removeTicker, javax.jms.Message message) { | |
final long TickerId = removeTicker.getTickerId(); | |
tickersThreadPool.execute(new TickerTask(TickerEngine, TickerId, false, removeTicker.getTimestamp(), message)); | |
onRequest(); | |
} | |
@Override | |
public void destroy() throws Exception { | |
tickersThreadPool.shutdownNow(); | |
} | |
static class TickerEngineRequestThreadFactory implements ThreadFactory { | |
private final AtomicInteger threadNumber = new AtomicInteger(1); | |
private final String namePrefix; | |
TickerEngineRequestThreadFactory() { | |
namePrefix = "TickerEngineRequestProcessorThread-"; | |
} | |
@Override | |
public Thread newThread(Runnable r) { | |
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement()); | |
if (t.isDaemon()) { | |
t.setDaemon(false); | |
} | |
if (t.getPriority() != Thread.NORM_PRIORITY) { | |
t.setPriority(Thread.NORM_PRIORITY); | |
} | |
return t; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TickerTask implements Runnable { | |
private final TickerEngine tickerEngine; | |
private final long tickerId; | |
private final boolean addTicker; | |
private final long timestamp; | |
public TickerTask(TickerEngine tickerEngine, long tickerId, boolean addTicker, long timestamp, javax.jms.Message message) { | |
super(message); | |
this.tickerEngine = tickerEngine; | |
this.tickerId = tickerId; | |
this.addTicker = addTicker; | |
this.timestamp = timestamp; | |
} | |
public boolean isAddTicker() { | |
return addTicker; | |
} | |
public long getTickerId() { | |
return TickerId; | |
} | |
@Override | |
public void run() { | |
if (addTicker) { | |
tickerEngine.addTicker(tickerId); | |
} else { | |
tickerEngine.removeTicker(tickerId); | |
} | |
} | |
@Override | |
public Logger getLogger() { | |
return Logger.getLogger(TickerTask.class); | |
} | |
@Override | |
public long getTimestamp() { | |
return timestamp; | |
} | |
} |
One more word about tickerIdsToAddInProgressState. This is ConcurrentHashSet that I choosed for storing state as per given tickerId, because we need to maintain sync add/remove operations based on the given tickerId and don't affect other tickers that might be processed at the same time. Thread.yield() is signal for threads scheduler to switch its work to other threads but the yielding thread can be scheduled back in one interrupt period later.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TickerEngineAddRemoveSyncThreadPoolExecutor extends ThreadPoolExecutor { | |
private final ConcurrentHashSet<String> tickerIdsToAddInProgressState = new ConcurrentHashSet<>(); | |
/** | |
* @param corePoolSize | |
* The size of the pool | |
* @param maximumPoolSize | |
* The maximum size of the pool | |
* @param keepAliveTime | |
* The amount of time you wish to keep a single task alive | |
* @param unit | |
* The unit of time that the keep alive time represents | |
* @param workQueue | |
* The queue that holds your tasks | |
* @see {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)} | |
*/ | |
public TickerEngineAddRemoveSyncThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, | |
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { | |
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); | |
} | |
/** | |
* @param thread | |
* The thread being executed | |
* @param runnable | |
* The runnable task | |
* @see {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)} | |
*/ | |
@Override | |
protected void beforeExecute(Thread thread, Runnable runnable) { | |
super.beforeExecute(thread, runnable); | |
final TickerTask tickerTask = tryToGetTickerTask(runnable); | |
if (tickerTask != null) { | |
final String tickerId = Long.toString(tickerTask.getTickerId()); | |
if (tickerTask.isAddTicker()) { | |
tickerIdsToAddInProgressState.add(tickerId); | |
} else { | |
// no context swithcing, just simple spin logic to | |
// iterate while adding operation is not finished. | |
while (tickerIdsToAddInProgressState.contains(tickerId)) { | |
Thread.yield(); | |
} | |
} | |
} | |
} | |
/** | |
* @param runnable | |
* the runnable that has completed | |
* @param t | |
* the exception that caused termination, or null if | |
*/ | |
@Override | |
protected void afterExecute(Runnable runnable, Throwable t) { | |
super.afterExecute(runnable, t); | |
final TickerTask tickerTask = tryToGetTickerTask(runnable); | |
if (tickerTask != null) { | |
if (tickerTask.isAddTicker()) { | |
final String tickerId = Long.toString(tickerTask.getTickerId()); | |
tickerIdsToAddInProgressState.remove(TickerId); | |
} | |
} | |
} | |
/** | |
* @param runnable | |
* the runnable which needs to be compared to TickerTask class. | |
*/ | |
private TickerTask tryToGetTickerTask(Runnable runnable) { | |
return runnable instanceof TickerTask ? (TickerTask) runnable : null; | |
} | |
} |