Sunday, December 7, 2014

Custom ThreadPoolExecutor based on spinlock approach.

      In one project we used ActiveMQ(JMS API) to communicate between business services and custom engine that worked asynchronously behind the scene to process financial data.
The flow to add/remove entity from ui was done as it usually organized in context of three-tier architecture: call to ui controller, route appropriate method, use service and then DAO to add/delete entity and then notify active mq corresponding consumers to process add/delete actions.

     But the issue was in the following: entity might be created in database, but not fully processed by engine while user tries to delete it from web interface.
It's not the issue on the  database side, this is multithreading synchronization question.

    I decided to use spin lock approach to sync two add/delete threads that did their work when the corresponding messages came to ActiveMQ queue.

    This is how JMS handler looks like. When message comes to queue it's then appropriate handled by  tickerEngineRequestProcessor, that is simply container for ThreadPoolExecutor performing tasks execution(add or delete one).

public void handleMessage(javax.jms.Message m) {
Message<?> message = null;
try {
message = client.translateFromJMS(m);
} catch (JMSException e) {
logger.error("Error translating JMS message", e);
}
if (message != null) {
logger.debug(String.format("Received %s", message));
final Object content = message.getContent();
if (content instanceof AddTicker) {
tickerEngineRequestProcessor.addTicker((AddTicker) content, m);
} else if (content instanceof RemoveTicker) {
tickerEngineRequestProcessor.removeTicker((RemoveTicker) content, m);
} else {
logger.warn(String.format("Received unsupported message: %s", message));
}
}
}
view raw gistfile1.java hosted with ❤ by GitHub
     Below is definition for tickerEngineRequestProcessor:
public class TickerEngineRequestProcessor implements DisposableBean {
private static final Logger logger = Logger.getLogger(TickerEngineRequestProcessor.class);
private final TickerEngineInternal tickerEngine;
private final ThreadPoolExecutor tickersThreadPool;
@SuppressWarnings("unchecked")
public TickerEngineRequestProcessor(TickerEngineInternal TickerEngine, InternalTickerService internalTickerService,
InternalTickerSupportService internalTickerSupportService) {
this.tickerEngine = tickerEngine;
final DelayQueue<TickerEngineControllerTask> queue = new DelayQueue<>();
tickersThreadPool = new TickerEngineAddRemoveSyncThreadPoolExecutor(minThreads, maxThreads,
60L, TimeUnit.SECONDS,
(BlockingQueue) queue, new TickerEngineRequestThreadFactory());
}
private void onRequest() {
logger.debug(String.format("Statistic of request processor: total tasks - %d, completed tasks - %d",
tickersThreadPool.getTaskCount(), TickersThreadPool.getCompletedTaskCount()));
}
public void addTicker(AddTicker addTicker, javax.jms.Message message) {
final long TickerId = addTicker.getTickerId();
tickersThreadPool.execute(new TickerTask(TickerEngine, TickerId, true, addTicker.getTimestamp(), message));
onRequest();
}
public void removeTicker(RemoveTicker removeTicker, javax.jms.Message message) {
final long TickerId = removeTicker.getTickerId();
tickersThreadPool.execute(new TickerTask(TickerEngine, TickerId, false, removeTicker.getTimestamp(), message));
onRequest();
}
@Override
public void destroy() throws Exception {
tickersThreadPool.shutdownNow();
}
static class TickerEngineRequestThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
TickerEngineRequestThreadFactory() {
namePrefix = "TickerEngineRequestProcessorThread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
view raw gistfile1.java hosted with ❤ by GitHub
Let's now describe TickerTask, it just implemented Runnable interface to be processed within custom spin lock thread pool which I will describe further.

public class TickerTask implements Runnable {
private final TickerEngine tickerEngine;
private final long tickerId;
private final boolean addTicker;
private final long timestamp;
public TickerTask(TickerEngine tickerEngine, long tickerId, boolean addTicker, long timestamp, javax.jms.Message message) {
super(message);
this.tickerEngine = tickerEngine;
this.tickerId = tickerId;
this.addTicker = addTicker;
this.timestamp = timestamp;
}
public boolean isAddTicker() {
return addTicker;
}
public long getTickerId() {
return TickerId;
}
@Override
public void run() {
if (addTicker) {
tickerEngine.addTicker(tickerId);
} else {
tickerEngine.removeTicker(tickerId);
}
}
@Override
public Logger getLogger() {
return Logger.getLogger(TickerTask.class);
}
@Override
public long getTimestamp() {
return timestamp;
}
}
view raw gistfile1.java hosted with ❤ by GitHub
   The main idea in TickerEngineAddRemoveSyncThreadPoolExecutor is to perform spin iterations on thread that performs removing ticker while thread which adds ticker is not finished yet. This logic is placed in beforeExecute action. In afterExecute we just remove corresponding tickerId key from tickerIdsToAddInProgressState set.
   One more word about tickerIdsToAddInProgressState. This is ConcurrentHashSet that I choosed for storing state as per given tickerId, because we need to maintain sync add/remove operations based on the given tickerId and don't affect other tickers that might be processed at the same time. Thread.yield() is signal for threads scheduler to switch its work to other threads but the yielding thread can be scheduled back in one interrupt period later.

public class TickerEngineAddRemoveSyncThreadPoolExecutor extends ThreadPoolExecutor {
private final ConcurrentHashSet<String> tickerIdsToAddInProgressState = new ConcurrentHashSet<>();
/**
* @param corePoolSize
* The size of the pool
* @param maximumPoolSize
* The maximum size of the pool
* @param keepAliveTime
* The amount of time you wish to keep a single task alive
* @param unit
* The unit of time that the keep alive time represents
* @param workQueue
* The queue that holds your tasks
* @see {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)}
*/
public TickerEngineAddRemoveSyncThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
/**
* @param thread
* The thread being executed
* @param runnable
* The runnable task
* @see {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}
*/
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {
super.beforeExecute(thread, runnable);
final TickerTask tickerTask = tryToGetTickerTask(runnable);
if (tickerTask != null) {
final String tickerId = Long.toString(tickerTask.getTickerId());
if (tickerTask.isAddTicker()) {
tickerIdsToAddInProgressState.add(tickerId);
} else {
// no context swithcing, just simple spin logic to
// iterate while adding operation is not finished.
while (tickerIdsToAddInProgressState.contains(tickerId)) {
Thread.yield();
}
}
}
}
/**
* @param runnable
* the runnable that has completed
* @param t
* the exception that caused termination, or null if
*/
@Override
protected void afterExecute(Runnable runnable, Throwable t) {
super.afterExecute(runnable, t);
final TickerTask tickerTask = tryToGetTickerTask(runnable);
if (tickerTask != null) {
if (tickerTask.isAddTicker()) {
final String tickerId = Long.toString(tickerTask.getTickerId());
tickerIdsToAddInProgressState.remove(TickerId);
}
}
}
/**
* @param runnable
* the runnable which needs to be compared to TickerTask class.
*/
private TickerTask tryToGetTickerTask(Runnable runnable) {
return runnable instanceof TickerTask ? (TickerTask) runnable : null;
}
}
view raw gistfile1.java hosted with ❤ by GitHub





Tuesday, December 2, 2014

[mobile] How to overcome 'adb devices empty list' issue on Linux Mint 17

Recently I needed to install small  phonegap application with custom oauth2 authentication to my old htc desire.
It's not so big deal to run such applications on emulator devices via
phonegap run android,
But when I switched to the real device this caused some difficulties.
I work in Linux mint 17 and as it turned out adb devices is empty on this case.
After a little bit hacking and digging through internet I've found working solution.

1) type lsusb in shell and know vendor id: 
Device 007: ID 0bb4:0cb0 HTC (High Tech Computer Corp.)

So I see my old htc)
2) create 51-android.rules in /etc/udev/rules.d folder:
SUBSYSTEM=="usb", ATTR{idVendor}=="0bb4", MODE="0666", GROUP="plugdev"
3) Edited the adb_usb.ini file, added the 0x0bb4 
4) Enabled debugging on htc device(settings->developers)
And now list of devices is not empty!
List of devices attached HT15XTR04859 device
So we can run app on htc after that:

phonegap run android --device=HT15XTR04859






Monday, December 1, 2014

How To Fix NTFS Mount Issue In Linux Mint And Windows 8

After installing new Linux mint cinanmon 17 in dual bot with Windows 8 I'have faced with boring issue when mint could not mount disks belonging to Windows 8. The answer is this is because of Fast boot feature and it's might be simply resolved with turning off it from within Win 8. How?
Just follow this link for example :
http://itsfoss.com/solve-ntfs-mount-problem-ubuntu-windows-8-dual-boot/