case 1) count==1 Got the lock
case 2) count> 1 No lock
3. Do something in your code assuming the row is locked
4. Release the lock by deleting the lock columns
Usage considerations
1. Set an expiration time (expireLockAfter) that is long enough for your processing to complete
2. Use this when the probability for contension is very low
3. Optimize by reading all columns (withIncludeAllColumn(true)) and merge the mutation
into the release. This will save 2 calls to cassandra.
4. If the client fails after Step 1. A subsequent attempt to lock will automatically
release these stale locks. You can turn this auto cleanup off by calling
failOnStaleLock(false), handling a StaleLockException and doing manual cleanup by
calling releaseExpiredLocks()
5. An optional TTL can be set on the lock columns which will ensure abandoned/stale locks
will be cleaned up by compactions at some point.
6. You can customize the 'prefix' used for the lock columns. This will help with storing
the lock columns with data in the same row.
7. You can customize the unique part of the lock column to include meaningful data such
as the UUID row key from another column family. This can have the same effect as
assigning a foreign key to the lock column and is useful for uniqueness constraint.
8. This recipe is not a transaction.
Take a lock,
ColumnPrefixDistributedRowLock lock = new ColumnPrefixDistributedRowLock(keyspace, columnFamily, "KeyBeingLocked");
try {
lock.acquire();
}
finally {
lock.release();
}
Read, Modify, Write. The read, modify, write piggybacks on top of the lock calls.
ColumnPrefixDistributedRowLock lock = new ColumnPrefixDistributedRowLock(keyspace, columnFamily, "KeyBeingLocked");
MutationBatch m = keyspace.prepareMutationBatch();
try {
ColumnMap columns = lock.acquireLockAndReadRow();
m.withRow("KeyBeingLocked")
.putColumn("SomeColumnBeingUpdated", );
lock.releaseWithMutation(m);
}
catch (Exception e) {
lock.release();
}
- Author:
- elandau
Methods inherited from class java.lang.Object |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
LOCK_TIMEOUT
public static final int LOCK_TIMEOUT
- See Also:
- Constant Field Values
DEFAULT_OPERATION_TIMEOUT_UNITS
public static final TimeUnit DEFAULT_OPERATION_TIMEOUT_UNITS
DEFAULT_LOCK_PREFIX
public static final String DEFAULT_LOCK_PREFIX
- See Also:
- Constant Field Values
ColumnPrefixDistributedRowLock
public ColumnPrefixDistributedRowLock(Keyspace keyspace,
ColumnFamily<K,String> columnFamily,
K key)
withConsistencyLevel
public ColumnPrefixDistributedRowLock<K> withConsistencyLevel(ConsistencyLevel consistencyLevel)
- Modify the consistency level being used. Consistency should always be a
variant of quorum. The default is CL_QUORUM, which is OK for single
region. For multi region the consistency level should be CL_LOCAL_QUORUM.
CL_EACH_QUORUM can be used but will Incur substantial latency.
- Parameters:
consistencyLevel
-
- Returns:
withColumnPrefix
public ColumnPrefixDistributedRowLock<K> withColumnPrefix(String prefix)
- Specify the prefix that uniquely distinguishes the lock columns from data
column
- Parameters:
prefix
-
- Returns:
withDataColumns
public ColumnPrefixDistributedRowLock<K> withDataColumns(boolean flag)
- If true the first read will also fetch all the columns in the row as
opposed to just the lock columns.
- Parameters:
flag
-
- Returns:
withLockId
public ColumnPrefixDistributedRowLock<K> withLockId(String lockId)
- Override the autogenerated lock column.
- Parameters:
lockId
-
- Returns:
failOnStaleLock
public ColumnPrefixDistributedRowLock<K> failOnStaleLock(boolean failOnStaleLock)
- When set to true the operation will fail if a stale lock is detected
- Parameters:
failOnStaleLock
-
- Returns:
expireLockAfter
public ColumnPrefixDistributedRowLock<K> expireLockAfter(long timeout,
TimeUnit unit)
- Time for failed locks. Under normal circumstances the lock column will be
deleted. If not then this lock column will remain and the row will remain
locked. The lock will expire after this timeout.
- Parameters:
timeout
- unit
-
- Returns:
withTtl
public ColumnPrefixDistributedRowLock<K> withTtl(Integer ttl)
- This is the TTL on the lock column being written, as opposed to expireLockAfter which
is written as the lock column value. Whereas the expireLockAfter can be used to
identify a stale or abandoned lock the TTL will result in the stale or abandoned lock
being eventually deleted by cassandra. Set the TTL to a number that is much greater
tan the expireLockAfter time.
- Parameters:
ttl
-
- Returns:
withBackoff
public ColumnPrefixDistributedRowLock<K> withBackoff(RetryPolicy policy)
acquire
public void acquire()
throws Exception
- Try to take the lock. The caller must call .release() to properly clean up
the lock columns from cassandra
- Specified by:
acquire
in interface DistributedRowLock
- Throws:
Exception
acquireLockAndReadRow
public ColumnMap<String> acquireLockAndReadRow()
throws Exception
- Take the lock and return the row data columns. Use this, instead of acquire, when you
want to implement a read-modify-write scenario and want to reduce the number of calls
to Cassandra.
- Returns:
-
- Throws:
Exception
verifyLock
public void verifyLock(long curTimeInMicros)
throws Exception,
BusyLockException,
StaleLockException
- Verify that the lock was acquired. This shouldn't be called unless it's part of a recipe
built on top of ColumnPrefixDistributedRowLock.
- Parameters:
curTimeInMicros
-
- Throws:
BusyLockException
Exception
StaleLockException
release
public void release()
throws Exception
- Release the lock by releasing this and any other stale lock columns
- Specified by:
release
in interface DistributedRowLock
- Throws:
Exception
releaseWithMutation
public void releaseWithMutation(MutationBatch m)
throws Exception
- Release using the provided mutation. Use this when you want to commit actual data
when releasing the lock
- Parameters:
m
-
- Throws:
Exception
readLockColumns
public Map<String,Long> readLockColumns()
throws Exception
- Return a mapping of existing lock columns and their expiration times
- Returns:
-
- Throws:
Exception
releaseAllLocks
public Map<String,Long> releaseAllLocks()
throws Exception
- Release all locks. Use this carefully as it could release a lock for a
running operation.
- Returns:
-
- Throws:
Exception
releaseExpiredLocks
public Map<String,Long> releaseExpiredLocks()
throws Exception
- Release all expired locks for this key.
- Returns:
-
- Throws:
Exception
releaseLocks
public Map<String,Long> releaseLocks(boolean force)
throws Exception
- Delete locks columns. Set force=true to remove locks that haven't
expired yet.
This operation first issues a read to cassandra and then deletes columns
in the response.
- Parameters:
force
- - Force delete of non expired locks as well
- Returns:
-
- Throws:
Exception
fillLockMutation
public String fillLockMutation(MutationBatch m,
Long time,
Integer ttl)
- Fill a mutation with the lock column. This may be used when the mutation
is executed externally but should be used with extreme caution to ensure
the lock is properly released
- Parameters:
m
- time
- ttl
-
fillReleaseMutation
public void fillReleaseMutation(MutationBatch m,
boolean excludeCurrentLock)
- Fill a mutation that will release the locks. This may be used from a
separate recipe to release multiple locks.
- Parameters:
m
-
getDataColumns
public ColumnMap<String> getDataColumns()
getKey
public K getKey()
getKeyspace
public Keyspace getKeyspace()
getConsistencyLevel
public ConsistencyLevel getConsistencyLevel()
getLockColumn
public String getLockColumn()
getLockId
public String getLockId()
getPrefix
public String getPrefix()
getRetryCount
public int getRetryCount()
Copyright © 2012. All Rights Reserved.