Thursday, February 25, 2010

EclipseLink: table sequence generator and concurrency - Part Three

Thanks to eager readers, I finally got to finish this :D

So far, the last two posts showed that sequencing in EclipseLink has a problem with concurrent access, and that it is possible to implement a custom sequence generator. We decided to write our own implementation of a table-generated sequence.

So, as we said, we have our own sequence generator class which is, interestingly, called CustomSequenceGenerator.

As you can see below, the getGeneratedValue method is used to get the concrete value. It delegates to an instance of the CustomSequenceManager class. Of course, it is used in our

public class CustomSequenceGenerator extends Sequence implements SessionCustomizer {
    private Logger logger = Logging.getLogger(CustomSequenceGenerator.class);

    // Hands out the actual values; see CustomSequenceManager
    private CustomSequenceManager customSequenceManager;

    public CustomSequenceGenerator() {
        super();
        logger.debug("CustomSequenceGenerator()");
        customSequenceManager = new CustomSequenceManager();
        logger.debug("new CustomSequenceManager()");
    }

    public CustomSequenceGenerator(String name) {
        super(name);
        logger.debug("CustomSequenceGenerator(String name)");
        customSequenceManager = new CustomSequenceManager();
        logger.debug("new CustomSequenceManager()");
    }

    // Called by EclipseLink whenever a single new id is needed
    @Override
    public Object getGeneratedValue(Accessor accessor,
            AbstractSession writeSession, String seqName) {
        logger.debug("getGeneratedValue()");
        Long nextValue = customSequenceManager.getNextValue(seqName);
        logger.debug("nextValue = " + nextValue);
        return nextValue;
    }

    // Not used, because shouldUsePreallocation() returns false
    @Override
    public Vector getGeneratedVector(Accessor accessor,
            AbstractSession writeSession, String seqName, int size) {
        return null;
    }

    @Override
    protected void onConnect() {
    }

    @Override
    protected void onDisconnect() {
    }

    @Override
    public boolean shouldAcquireValueAfterInsert() {
        return false;
    }

    // Only generate a value when the entity does not have one yet
    @Override
    public boolean shouldOverrideExistingValue(String seqName,
            Object existingValue) {
        return existingValue == null;
    }

    @Override
    public boolean shouldUseTransaction() {
        return false;
    }

    // Preallocation is handled by our own frame logic, not by EclipseLink
    @Override
    public boolean shouldUsePreallocation() {
        return false;
    }

    // SessionCustomizer callback: registers this sequence with the session
    public void customize(Session session) throws Exception {
        logger.debug("customize(Session session) ");
        CustomSequenceGenerator sequence = new CustomSequenceGenerator("BASE_ENTITY_SEQ");
        logger.debug("new CustomSequenceGenerator('BASE_ENTITY_SEQ')");
        session.getLogin().addSequence(sequence);
        logger.debug("session.getLogin().addSequence(sequence)");
    }

}


Now, let's look at the CustomSequenceManager class. What we do is simply simulate sequencing as we know it from Oracle.

public class CustomSequenceManager {
    private static Logger logger = Logger.getLogger(CustomSequenceManager.class);
    // Last value handed out, per sequence name
    protected HashMap<String, Long> sequenceMap = new HashMap<String, Long>();
    private Long allocationSize = MyParameters.getSequenceFrameSize();
    // Saves values for frame limits
    protected HashMap<String, Long> sequenceFrame = new HashMap<String, Long>();

    public synchronized Long getNextValue(String seqName) {
        Long seqCount = sequenceMap.get(seqName);
        Long seqLimit = sequenceFrame.get(seqName);

        if (seqCount == null || seqCount == seqLimit - 1) {
            // No frame yet, or the current frame is used up: fetch a new one.
            // The service is only created here, so no lookup is done per value.
            SequenceService sequenceService = new SequenceService();
            seqCount = sequenceService.getCustomSequenceNextValue(seqName);
            if (seqCount == null) {
                // The service returns null when the database call failed
                seqCount = 0L;
            }
            seqLimit = seqCount + allocationSize;

            sequenceFrame.put(seqName, seqLimit);
            sequenceMap.put(seqName, seqCount);
        } else if (seqCount <= seqLimit - 1) {
            // Still inside the current frame: just increment
            seqCount++;
            sequenceMap.put(seqName, seqCount);
        }

        return seqCount;
    }
}

I guess it's easy to follow. We allocate a frame of values and hand out values from it. When we exhaust the frame, we get the next frame, and so on. We use hash maps to store the current value and the frame limit for each sequence, and we get new frame values from SequenceService.
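To make the frame arithmetic concrete, here is a minimal, self-contained sketch of the same idea with an in-memory counter standing in for the database. The names FrameAllocator and FrameDemo, the frame size of 3 and the starting value of 100 are made up for illustration; in the real code the frame start comes from SequenceService and the frame size from MyParameters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hands out ids from frames. frameSource is a hypothetical stand-in for the database call. */
class FrameAllocator {
    private final Map<String, Long> current = new HashMap<>(); // last value handed out
    private final Map<String, Long> limit = new HashMap<>();   // exclusive end of the frame
    private final long frameSize;
    private final Function<String, Long> frameSource;

    FrameAllocator(long frameSize, Function<String, Long> frameSource) {
        this.frameSize = frameSize;
        this.frameSource = frameSource;
    }

    synchronized long next(String seqName) {
        Long value = current.get(seqName);
        Long end = limit.get(seqName);
        if (value == null || value + 1 >= end) {
            // No frame yet, or the frame is used up: ask for the start of a new frame.
            value = frameSource.apply(seqName);
            limit.put(seqName, value + frameSize);
        } else {
            // Still inside the frame: no trip to the "database" needed.
            value = value + 1;
        }
        current.put(seqName, value);
        return value;
    }
}

class FrameDemo {
    public static void main(String[] args) {
        long[] dbValue = {100}; // pretend row in the sequence table
        FrameAllocator allocator = new FrameAllocator(3, name -> {
            long start = dbValue[0];
            dbValue[0] += 3; // the "database" advances by one frame per call
            return start;
        });
        for (int i = 0; i < 7; i++) {
            System.out.print(allocator.next("BASE_ENTITY_SEQ") + " ");
        }
        // prints: 100 101 102 103 104 105 106
    }
}
```

Seven ids cost only three calls to the frame source here. With a realistic frame size the ratio is much better, which is the whole point of allocating in frames.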


public class SequenceService {
    SequencePersist sequencePersist;

    public SequenceService() {
        try {
            Context ctx = new InitialContext();
            sequencePersist = (SequencePersist) ctx.lookup("SequencePersistBean");
        } catch (javax.naming.NamingException ne) {
            throw new EJBException(ne);
        }
    }

    public UserSequence create(UserSequence s) {
        return sequencePersist.create(s);
    }

    public UserSequence modify(UserSequence s) {
        return sequencePersist.modify(s);
    }

    public UserSequence find(String seqName) {
        Object seqObj = sequencePersist.find(seqName);
        if (seqObj == null) {
            return null;
        }
        return (UserSequence) seqObj;
    }

    public List findQuery(String queryString) {
        return sequencePersist.findQuery(queryString);
    }

    public Long getCustomSequenceNextValue(String seqName) {
        return sequencePersist.getCustomSequenceNextVal(seqName);
    }

}

Nothing special here, because all the work is done behind the SequencePersist interface. Here we used an EJB implementation, but you can also use Spring to get the bean. What really matters is the SequencePersistBean implementation, which handles database access to the sequence table.

@Stateless(name="SequencePersistBean")
public class SequencePersistBean implements SequencePersistLocal, SequencePersistRemote {
    @PersistenceContext(name = "my_sequence/EntityManager",
            unitName = "my_sequence")
    EntityManager em;
    Logger logger = Logger.getLogger(SequencePersistBean.class);

    public SequencePersistBean() {
    }

    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public UserSequence create(UserSequence s) {
        em.persist(s);
        return (UserSequence) em.find(s.getClass(), s.getSeqName());
    }

    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public UserSequence modify(UserSequence s) {
        return (UserSequence) em.merge(s);
    }

    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public UserSequence find(String seqName) {
        Object seqObj = em.find(UserSequence.class, seqName);
        if (seqObj == null) {
            return null;
        }
        return (UserSequence) seqObj;
    }

    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public List findQuery(String queryString) {
        return (List) em.createQuery(queryString).getResultList();
    }

    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public Long getCustomSequenceNextVal(String seqName) {
        Long sequenceCount = null;
        try {
            // Describe the stored procedure: one input, one output argument
            StoredProcedureCall spcall = new StoredProcedureCall();
            spcall.setProcedureName("CC1NextSequenceVal");
            spcall.addNamedArgument("SequenceName");
            spcall.addNamedOutputArgument(
                    "NextValue", // procedure parameter name
                    "NextValue", // out argument field name
                    BigDecimal.class // Java type corresponding to type returned by procedure
            );

            ValueReadQuery query = new ValueReadQuery();
            query.setCall(spcall);
            query.addArgument("SequenceName"); // input

            List args = new ArrayList();
            args.add(seqName);

            // Run the query on the native EclipseLink session behind the EntityManager
            Session session = JpaHelper.getEntityManager(em).getActiveSession();
            logger.debug("session" + session);
            BigDecimal intSeq = (BigDecimal) session.executeQuery(query, args);
            sequenceCount = intSeq.longValue();
        } catch (RuntimeException e) {
            // On failure we log and return null; the caller decides what to do
            logger.fatal("getCustomSequenceNextVal", e);
        }
        logger.debug("sequenceCount" + sequenceCount);
        return sequenceCount;
    }
}


As you can see, each method is annotated with @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) to guarantee that each call runs in a separate transaction.

I won't go into too much detail about the sequence table, because it is just an imitation of the table EclipseLink generates. We use the bean's methods to modify values in the table.


The getCustomSequenceNextVal method is interesting, because we use a stored procedure to ensure complete transaction isolation at the database level. This Java code should work on all databases that have stored procedures and are supported by EclipseLink. The final touch is writing the stored procedure, which is a database-specific thing. We used Oracle, so the procedure is written as an autonomous transaction; on other databases you should write something equivalent.

So, the essence of the matter is that you take a frame of N values from the DB table and keep it in a Java hash map, where you read and increment values. When you exhaust the frame, you go to the DB to get new values. Reading through an autonomous stored procedure ensures that every concurrent reader gets a different frame. Basically, the procedure looks in the table for the row with the specific SEQ_NAME. It reads the value and the frame size from that row and returns the value. Then it increments the value by the frame size, stores it, and that is it. Everything that the EclipseLink table-generated sequence should do, but doesn't for some reason.
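The stored procedure itself is database-specific, so here is only a rough Java simulation of the behaviour described above: return the current value for a sequence name and advance the stored value by one frame, as a single atomic step. SequenceTableSim, ConcurrentFrameDemo, the frame size of 50 and the starting value of 0 are assumptions made for this sketch. In the real setup the atomicity comes from the database and the autonomous transaction, not from a Java map.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the sequence table plus the procedure (hypothetical). */
class SequenceTableSim {
    private final Map<String, Long> rows = new ConcurrentHashMap<>();
    private final long frameSize;

    SequenceTableSim(long frameSize) {
        this.frameSize = frameSize;
    }

    /** Returns the current value and advances the stored value by one frame, atomically. */
    long nextFrameStart(String seqName) {
        // merge() is atomic per key on ConcurrentHashMap; a missing row starts at 0.
        long advanced = rows.merge(seqName, frameSize, Long::sum);
        return advanced - frameSize;
    }
}

class ConcurrentFrameDemo {
    /** Lets several threads fetch frames at once and collects every frame start seen. */
    static Set<Long> collectFrameStarts(int threads, int callsPerThread) {
        SequenceTableSim table = new SequenceTableSim(50);
        Set<Long> starts = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    starts.add(table.nextFrameStart("BASE_ENTITY_SEQ"));
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return starts;
    }

    public static void main(String[] args) {
        Set<Long> starts = collectFrameStarts(8, 100);
        // 8 threads x 100 calls: every call must have received its own frame
        System.out.println(starts.size() + " distinct frame starts");
    }
}
```

If two callers ever received the same frame start, the set would end up smaller than the number of calls. That is the property the autonomous procedure has to guarantee on the database side.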

2 comments:

  1. I would not have thought of using a stateless session bean with TX NEW but that is a very nice idea! Also the stored procedure is a good idea.

    Thanks for posting this.

    Brett

  2. We are here to help anyone who asks nicely :)

    Best,
    Milos
