com.netflix.astyanax.recipes.locks
Class ColumnPrefixDistributedRowLock<K>

java.lang.Object
  extended by com.netflix.astyanax.recipes.locks.ColumnPrefixDistributedRowLock<K>
Type Parameters:
K - the row key type of the column family holding the lock
All Implemented Interfaces:
DistributedRowLock

public class ColumnPrefixDistributedRowLock<K>
extends Object
implements DistributedRowLock

Takes a distributed row lock for a single row. The row lock is accomplished using a sequence of read/write events to Cassandra, without the need for something like ZooKeeper.

Algorithm
1. Write a column with a name of the form prefix_id, where prefix is the lock column prefix and id is the unique part of the lock column. The value is an expiration time.
2. Read back all columns with that prefix.
   case 1) count == 1: got the lock
   case 2) count > 1: no lock
3. Do something in your code assuming the row is locked.
4. Release the lock by deleting the lock columns.

Usage considerations
1. Set an expiration time (expireLockAfter) that is long enough for your processing to complete.
2. Use this when the probability of contention is very low.
3. Optimize by reading all columns (withDataColumns(true)) and merging the mutation into the release. This will save 2 calls to Cassandra.
4. If the client fails after step 1, a subsequent attempt to lock will automatically release these stale locks. You can turn this auto cleanup off by calling failOnStaleLock(true), handling a StaleLockException and doing manual cleanup by calling releaseExpiredLocks().
5. An optional TTL can be set on the lock columns, which ensures that abandoned/stale locks are cleaned up by compactions at some point.
6. You can customize the prefix used for the lock columns. This helps when storing the lock columns with data in the same row.
7. You can customize the unique part of the lock column to include meaningful data such as the UUID row key from another column family. This can have the same effect as assigning a foreign key to the lock column and is useful for uniqueness constraints.
8. This recipe is not a transaction.

Take a lock:

    ColumnPrefixDistributedRowLock<String> lock =
        new ColumnPrefixDistributedRowLock<String>(keyspace, columnFamily, "KeyBeingLocked");
    try {
        lock.acquire();
        // Do something while the row is locked
    }
    finally {
        // Always delete the lock columns, even if the work above failed
        lock.release();
    }

Read, Modify, Write. The read, modify, write piggybacks on top of the lock calls:

    ColumnPrefixDistributedRowLock<String> lock =
        new ColumnPrefixDistributedRowLock<String>(keyspace, columnFamily, "KeyBeingLocked");
    MutationBatch m = keyspace.prepareMutationBatch();
    try {
        // Take the lock and read the row data in the same call
        ColumnMap<String> columns = lock.acquireLockAndReadRow();

        m.withRow(columnFamily, "KeyBeingLocked")
            .putColumn("SomeColumnBeingUpdated", /* new value */);

        // Commit the data and delete the lock columns in one mutation
        lock.releaseWithMutation(m);
    }
    catch (Exception e) {
        lock.release();
    }
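The write-then-read-back algorithm described above can be sketched without Cassandra by modelling a single row as an in-memory map. This is only an illustration under stated assumptions: the class InMemoryRowLockSketch, its methods, and the "_LOCK_" prefix value are hypothetical names chosen for this example and are not part of Astyanax, and a map inside one JVM has none of the consistency concerns of a real cluster.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical in-memory model of the lock algorithm; not the Astyanax implementation. */
final class InMemoryRowLockSketch {
    // Assumed prefix value, for illustration only.
    static final String PREFIX = "_LOCK_";

    // One "row": column name -> value. Lock columns hold an expiration time.
    // Values are longs here only to keep the sketch small.
    private final Map<String, Long> row = new ConcurrentSkipListMap<>();

    /**
     * Steps 1 and 2: write a lock column, read back all lock columns,
     * and succeed only if ours is the only one.
     * Returns the lock column name, or null if the lock was not obtained.
     */
    String tryAcquire(long expireAtMicros) {
        String lockColumn = PREFIX + UUID.randomUUID();
        row.put(lockColumn, expireAtMicros);             // step 1: write the lock column
        long count = row.keySet().stream()               // step 2: read back lock columns
                .filter(name -> name.startsWith(PREFIX))
                .count();
        if (count == 1) {
            return lockColumn;                           // case 1: got the lock
        }
        row.remove(lockColumn);                          // case 2: no lock, back out our column
        return null;
    }

    /** Step 4: release the lock by deleting the lock column. */
    void release(String lockColumn) {
        row.remove(lockColumn);
    }
}
```

A second tryAcquire on the same sketch instance returns null until the first holder calls release, which mirrors the count == 1 versus count > 1 cases.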

Author:
elandau

Field Summary
static String DEFAULT_LOCK_PREFIX
           
static TimeUnit DEFAULT_OPERATION_TIMEOUT_UNITS
           
static int LOCK_TIMEOUT
           
 
Constructor Summary
ColumnPrefixDistributedRowLock(Keyspace keyspace, ColumnFamily<K,String> columnFamily, K key)
           
 
Method Summary
 void acquire()
          Try to take the lock.
 ColumnMap<String> acquireLockAndReadRow()
          Take the lock and return the row data columns.
 ColumnPrefixDistributedRowLock<K> expireLockAfter(long timeout, TimeUnit unit)
          Time after which a lock that was not released is considered expired.
 ColumnPrefixDistributedRowLock<K> failOnStaleLock(boolean failOnStaleLock)
          When set to true the operation will fail if a stale lock is detected
 String fillLockMutation(MutationBatch m, Long time, Integer ttl)
          Fill a mutation with the lock column.
 void fillReleaseMutation(MutationBatch m, boolean excludeCurrentLock)
          Fill a mutation that will release the locks.
 ConsistencyLevel getConsistencyLevel()
           
 ColumnMap<String> getDataColumns()
           
 K getKey()
           
 Keyspace getKeyspace()
           
 String getLockColumn()
           
 String getLockId()
           
 String getPrefix()
           
 int getRetryCount()
           
 Map<String,Long> readLockColumns()
          Return a mapping of existing lock columns and their expiration times
 void release()
          Release the lock by releasing this and any other stale lock columns
 Map<String,Long> releaseAllLocks()
          Release all locks.
 Map<String,Long> releaseExpiredLocks()
          Release all expired locks for this key.
 Map<String,Long> releaseLocks(boolean force)
          Delete lock columns.
 void releaseWithMutation(MutationBatch m)
          Release using the provided mutation.
 void verifyLock(long curTimeInMicros)
          Verify that the lock was acquired.
 ColumnPrefixDistributedRowLock<K> withBackoff(RetryPolicy policy)
           
 ColumnPrefixDistributedRowLock<K> withColumnPrefix(String prefix)
          Specify the prefix that uniquely distinguishes the lock columns from data columns.
 ColumnPrefixDistributedRowLock<K> withConsistencyLevel(ConsistencyLevel consistencyLevel)
          Modify the consistency level being used.
 ColumnPrefixDistributedRowLock<K> withDataColumns(boolean flag)
          If true the first read will also fetch all the columns in the row as opposed to just the lock columns.
 ColumnPrefixDistributedRowLock<K> withLockId(String lockId)
          Override the autogenerated lock column.
 ColumnPrefixDistributedRowLock<K> withTtl(Integer ttl)
          This is the TTL on the lock column being written, as opposed to expireLockAfter which is written as the lock column value.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

LOCK_TIMEOUT

public static final int LOCK_TIMEOUT
See Also:
Constant Field Values

DEFAULT_OPERATION_TIMEOUT_UNITS

public static final TimeUnit DEFAULT_OPERATION_TIMEOUT_UNITS

DEFAULT_LOCK_PREFIX

public static final String DEFAULT_LOCK_PREFIX
See Also:
Constant Field Values
Constructor Detail

ColumnPrefixDistributedRowLock

public ColumnPrefixDistributedRowLock(Keyspace keyspace,
                                      ColumnFamily<K,String> columnFamily,
                                      K key)
Method Detail

withConsistencyLevel

public ColumnPrefixDistributedRowLock<K> withConsistencyLevel(ConsistencyLevel consistencyLevel)
Modify the consistency level being used. Consistency should always be a variant of quorum. The default is CL_QUORUM, which is OK for a single region. For multi-region deployments the consistency level should be CL_LOCAL_QUORUM. CL_EACH_QUORUM can be used but will incur substantial latency.

Parameters:
consistencyLevel -
Returns:

withColumnPrefix

public ColumnPrefixDistributedRowLock<K> withColumnPrefix(String prefix)
Specify the prefix that uniquely distinguishes the lock columns from data columns.
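Because lock columns and data columns can live in the same row, a reader has to separate them by name. The following is a minimal sketch of that separation, assuming a row is represented as a plain map of column name to value; PrefixSplitSketch and the "_LOCK_" prefix used with it are hypothetical and only illustrate the idea.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that splits a row into lock columns and data columns by prefix. */
final class PrefixSplitSketch {

    /** Returns the columns whose names start with the prefix (the lock columns). */
    static Map<String, String> lockColumns(Map<String, String> row, String prefix) {
        return select(row, prefix, true);
    }

    /** Returns the columns whose names do not start with the prefix (the data columns). */
    static Map<String, String> dataColumns(Map<String, String> row, String prefix) {
        return select(row, prefix, false);
    }

    // Keeps entries whose "name starts with prefix" test equals wantPrefixed.
    private static Map<String, String> select(Map<String, String> row, String prefix,
                                              boolean wantPrefixed) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> column : row.entrySet()) {
            if (column.getKey().startsWith(prefix) == wantPrefixed) {
                result.put(column.getKey(), column.getValue());
            }
        }
        return result;
    }
}
```

The prefix should be chosen so that no data column name can start with it; otherwise a data column would be mistaken for a lock.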

Parameters:
prefix -
Returns:

withDataColumns

public ColumnPrefixDistributedRowLock<K> withDataColumns(boolean flag)
If true the first read will also fetch all the columns in the row as opposed to just the lock columns.

Parameters:
flag -
Returns:

withLockId

public ColumnPrefixDistributedRowLock<K> withLockId(String lockId)
Override the autogenerated lock column.

Parameters:
lockId -
Returns:

failOnStaleLock

public ColumnPrefixDistributedRowLock<K> failOnStaleLock(boolean failOnStaleLock)
When set to true the operation will fail if a stale lock is detected

Parameters:
failOnStaleLock -
Returns:

expireLockAfter

public ColumnPrefixDistributedRowLock<K> expireLockAfter(long timeout,
                                                         TimeUnit unit)
Expiration time for failed locks. Under normal circumstances the lock column is deleted on release. If it is not deleted (for example, because the client failed), the lock column remains and the row stays locked until the lock expires after this timeout.

Parameters:
timeout -
unit -
Returns:

withTtl

public ColumnPrefixDistributedRowLock<K> withTtl(Integer ttl)
This is the TTL on the lock column being written, as opposed to expireLockAfter, which is written as the lock column value. Whereas expireLockAfter can be used to identify a stale or abandoned lock, the TTL will result in the stale or abandoned lock eventually being deleted by Cassandra. Set the TTL to a number that is much greater than the expireLockAfter time.
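The relationship between the two settings can be sketched as plain arithmetic. This example assumes the expiration value is a timestamp in microseconds (suggested by the curTimeInMicros parameter of verifyLock) and that the TTL is expressed in seconds, as Cassandra column TTLs are. LockExpirySketch and its methods are hypothetical, and the last check only tests "greater than", not "much greater than".

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical arithmetic behind expireLockAfter and the column TTL; not Astyanax code. */
final class LockExpirySketch {

    /** Value stored in the lock column: the time at which the lock is considered expired. */
    static long expirationMicros(long nowMicros, long timeout, TimeUnit unit) {
        return nowMicros + unit.toMicros(timeout);
    }

    /** A lock is stale once the current time has passed its stored expiration time. */
    static boolean isStale(long expirationMicros, long nowMicros) {
        return nowMicros > expirationMicros;
    }

    /** Checks that the TTL (in seconds) is longer than the expireLockAfter timeout. */
    static boolean ttlOutlivesExpiry(int ttlSeconds, long timeout, TimeUnit unit) {
        return TimeUnit.SECONDS.toMicros(ttlSeconds) > unit.toMicros(timeout);
    }
}
```

With a 60 second timeout, a TTL of 600 seconds passes the check, while a TTL of 30 seconds would let Cassandra drop the column before the lock could ever be seen as stale.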

Parameters:
ttl -
Returns:

withBackoff

public ColumnPrefixDistributedRowLock<K> withBackoff(RetryPolicy policy)

acquire

public void acquire()
             throws Exception
Try to take the lock. The caller must call release() to properly clean up the lock columns from Cassandra.

Specified by:
acquire in interface DistributedRowLock
Throws:
Exception

acquireLockAndReadRow

public ColumnMap<String> acquireLockAndReadRow()
                                        throws Exception
Take the lock and return the row data columns. Use this, instead of acquire, when you want to implement a read-modify-write scenario and want to reduce the number of calls to Cassandra.

Returns:
Throws:
Exception

verifyLock

public void verifyLock(long curTimeInMicros)
                throws Exception,
                       BusyLockException,
                       StaleLockException
Verify that the lock was acquired. This shouldn't be called unless it's part of a recipe built on top of ColumnPrefixDistributedRowLock.
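The decision this check has to make can be sketched as follows. The sketch is an assumption built from the declared exceptions and the failOnStaleLock description, not a copy of the library's logic: another unexpired lock column means the row is busy, another expired lock column is either reported as stale or ignored depending on failOnStaleLock, and otherwise the lock is held. VerifyLockSketch and its Outcome enum are hypothetical and stand in for BusyLockException and StaleLockException.

```java
import java.util.Map;

/** Hypothetical decision logic for verifying a lock; outcomes stand in for exceptions. */
final class VerifyLockSketch {
    enum Outcome { ACQUIRED, BUSY, STALE }

    /**
     * @param lockColumns     lock column name -> expiration time in microseconds
     * @param ownColumn       name of the lock column written by this client
     * @param nowMicros       current time in microseconds
     * @param failOnStaleLock whether an expired foreign lock should fail the operation
     */
    static Outcome verify(Map<String, Long> lockColumns, String ownColumn,
                          long nowMicros, boolean failOnStaleLock) {
        for (Map.Entry<String, Long> lock : lockColumns.entrySet()) {
            if (lock.getKey().equals(ownColumn)) {
                continue;                      // our own column never blocks us
            }
            boolean expired = nowMicros > lock.getValue();
            if (!expired) {
                return Outcome.BUSY;           // someone else holds a live lock
            }
            if (failOnStaleLock) {
                return Outcome.STALE;          // caller asked to be told about stale locks
            }
            // Otherwise the stale lock is ignored and left for cleanup on release.
        }
        return Outcome.ACQUIRED;
    }
}
```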

Parameters:
curTimeInMicros -
Throws:
BusyLockException
Exception
StaleLockException

release

public void release()
             throws Exception
Release the lock by releasing this and any other stale lock columns

Specified by:
release in interface DistributedRowLock
Throws:
Exception

releaseWithMutation

public void releaseWithMutation(MutationBatch m)
                         throws Exception
Release using the provided mutation. Use this when you want to commit actual data when releasing the lock

Parameters:
m -
Throws:
Exception

readLockColumns

public Map<String,Long> readLockColumns()
                                 throws Exception
Return a mapping of existing lock columns and their expiration times

Returns:
Throws:
Exception

releaseAllLocks

public Map<String,Long> releaseAllLocks()
                                 throws Exception
Release all locks. Use this carefully as it could release a lock for a running operation.

Returns:
Throws:
Exception

releaseExpiredLocks

public Map<String,Long> releaseExpiredLocks()
                                     throws Exception
Release all expired locks for this key.

Returns:
Throws:
Exception

releaseLocks

public Map<String,Long> releaseLocks(boolean force)
                              throws Exception
Delete lock columns. Set force=true to remove locks that haven't expired yet. This operation first issues a read to Cassandra and then deletes the columns in the response.
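The effect of the force flag can be sketched as a filter over the lock columns returned by the read. This is a minimal illustration, assuming lock columns are given as a map of column name to expiration time in microseconds; ReleaseLocksSketch and selectForDeletion are hypothetical names, and the real method also performs the Cassandra read and delete.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical selection of which lock columns a release would delete. */
final class ReleaseLocksSketch {

    /**
     * Returns the lock columns to delete: all of them when force is true,
     * otherwise only those whose expiration time has passed.
     */
    static Map<String, Long> selectForDeletion(Map<String, Long> lockColumns,
                                               long nowMicros, boolean force) {
        Map<String, Long> toDelete = new LinkedHashMap<>();
        for (Map.Entry<String, Long> lock : lockColumns.entrySet()) {
            boolean expired = nowMicros > lock.getValue();
            if (force || expired) {
                toDelete.put(lock.getKey(), lock.getValue());
            }
        }
        return toDelete;
    }
}
```

In these terms, force=false corresponds to releaseExpiredLocks() and force=true to releaseAllLocks(), which is why the latter can remove a lock held by a running operation.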

Parameters:
force - force deletion of non-expired locks as well
Returns:
Throws:
Exception

fillLockMutation

public String fillLockMutation(MutationBatch m,
                               Long time,
                               Integer ttl)
Fill a mutation with the lock column. This may be used when the mutation is executed externally but should be used with extreme caution to ensure the lock is properly released

Parameters:
m -
time -
ttl -

fillReleaseMutation

public void fillReleaseMutation(MutationBatch m,
                                boolean excludeCurrentLock)
Fill a mutation that will release the locks. This may be used from a separate recipe to release multiple locks.

Parameters:
m -

getDataColumns

public ColumnMap<String> getDataColumns()

getKey

public K getKey()

getKeyspace

public Keyspace getKeyspace()

getConsistencyLevel

public ConsistencyLevel getConsistencyLevel()

getLockColumn

public String getLockColumn()

getLockId

public String getLockId()

getPrefix

public String getPrefix()

getRetryCount

public int getRetryCount()


Copyright © 2012. All Rights Reserved.