Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single Snapshot on Root with RollBack #64

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions schema/schema.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
CREATE DATABASE IF NOT EXISTS `hop_pushparaj`;
USE `hop_pushparaj`;

delimiter $$

CREATE TABLE `hdfs_block_infos` (
Expand All @@ -10,6 +13,7 @@ CREATE TABLE `hdfs_block_infos` (
`time_stamp` bigint(20) DEFAULT NULL,
`primary_node_index` int(11) DEFAULT NULL,
`block_recovery_id` bigint(20) DEFAULT NULL,
`status` int(11) DEFAULT 1,
PRIMARY KEY (`inode_id`,`block_id`)
) ENGINE=ndbcluster DEFAULT CHARSET=latin1
/*!50100 PARTITION BY KEY (inode_id) */$$
Expand Down Expand Up @@ -57,6 +61,7 @@ CREATE TABLE `hdfs_inode_attributes` (
`dsquota` bigint(20) DEFAULT NULL,
`nscount` bigint(20) DEFAULT NULL,
`diskspace` bigint(20) DEFAULT NULL,
`status` int(11) DEFAULT 1,
PRIMARY KEY (`inodeId`)
) ENGINE=ndbcluster DEFAULT CHARSET=latin1$$

Expand All @@ -82,6 +87,8 @@ CREATE TABLE `hdfs_inodes` (
`subtree_lock_owner` bigint(20) DEFAULT NULL,
`meta_enabled` bit(8) DEFAULT b'110000',
`size` bigint(20) NOT NULL DEFAULT '0',
`isdeleted` int(11) DEFAULT 0,
`status` int(11) DEFAULT 1,
PRIMARY KEY (`parent_id`,`name`),
KEY `pidex` (`parent_id`),
KEY `inode_idx` (`id`)
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/hops/metadata/ndb/NdbStorageFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import io.hops.metadata.hdfs.dal.StorageIdMapDataAccess;
import io.hops.metadata.hdfs.dal.UnderReplicatedBlockDataAccess;
import io.hops.metadata.hdfs.dal.VariableDataAccess;
import io.hops.metadata.rollBack.dal.RollBackAccess;
import io.hops.metadata.ndb.dalimpl.rollBack.RollBackImpl;
import io.hops.metadata.ndb.dalimpl.election.HdfsLeaderClusterj;
import io.hops.metadata.ndb.dalimpl.election.YarnLeaderClusterj;
import io.hops.metadata.ndb.dalimpl.hdfs.AccessTimeLogClusterj;
Expand Down Expand Up @@ -293,6 +295,7 @@ private void initDataAccessMap() {
.put(NextHeartbeatDataAccess.class, new NextHeartbeatClusterJ());
dataAccessMap.put(RMLoadDataAccess.class, new RMLoadClusterJ());
dataAccessMap.put(FullRMNodeDataAccess.class, new FullRMNodeClusterJ());
dataAccessMap.put(RollBackAccess.class,new RollBackImpl());
dataAccessMap.put(MetadataLogDataAccess.class, new MetadataLogClusterj());
dataAccessMap.put(AccessTimeLogDataAccess.class,
new AccessTimeLogClusterj());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ public interface BlockInfoDTO {
long getBlockRecoveryId();

void setBlockRecoveryId(long recoveryId);
}
private ClusterjConnector connector = ClusterjConnector.getInstance();
@Column(name=STATUS)

int getStatus();

void setStatus(int status);
}
private ClusterjConnector connector = ClusterjConnector.getInstance();
private final static int NOT_FOUND_ROW = -1000;

@Override
Expand Down Expand Up @@ -323,7 +328,7 @@ private BlockInfo createBlockInfo(BlockInfoClusterj.BlockInfoDTO bDTO) {
new BlockInfo(bDTO.getBlockId(), bDTO.getBlockIndex(),
bDTO.getINodeId(), bDTO.getNumBytes(), bDTO.getGenerationStamp(),
bDTO.getBlockUCState(), bDTO.getTimestamp(),
bDTO.getPrimaryNodeIndex(), bDTO.getBlockRecoveryId());
bDTO.getPrimaryNodeIndex(), bDTO.getBlockRecoveryId(),bDTO.getStatus());
return hopBlockInfo;
}

Expand All @@ -338,5 +343,6 @@ private void createPersistable(BlockInfo block,
persistable.setBlockUCState(block.getBlockUCState());
persistable.setPrimaryNodeIndex(block.getPrimaryNodeIndex());
persistable.setBlockRecoveryId(block.getBlockRecoveryId());
persistable.setStatus(block.getStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ public interface INodeAttributesDTO {
long getDiskspace();

void setDiskspace(long diskspace);
}

@Column(name = STATUS)
int getStatus();

void setStatus(int status);
}
private ClusterjConnector connector = ClusterjConnector.getInstance();

@Override
Expand Down Expand Up @@ -141,6 +145,7 @@ private INodeAttributesDTO createPersistable(INodeAttributes attribute,
dto.setNSCount(attribute.getNsCount());
dto.setDSQuota(attribute.getDsQuota());
dto.setDiskspace(attribute.getDiskspace());
dto.setStatus(attribute.getStatus());
return dto;
}

Expand All @@ -150,7 +155,7 @@ private INodeAttributes makeINodeAttributes(INodeAttributesDTO dto) {
}
INodeAttributes iNodeAttributes =
new INodeAttributes(dto.getId(), dto.getNSQuota(), dto.getNSCount(),
dto.getDSQuota(), dto.getDiskspace());
dto.getDSQuota(), dto.getDiskspace(),dto.getStatus());
return iNodeAttributes;
}
}
87 changes: 81 additions & 6 deletions src/main/java/io/hops/metadata/ndb/dalimpl/hdfs/INodeClusterj.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.hops.metadata.hdfs.entity.INode;
import io.hops.metadata.hdfs.entity.INodeIdentifier;
import io.hops.metadata.hdfs.entity.ProjectedINode;
import io.hops.metadata.hdfs.snapshots.SnapShotConstants;
import io.hops.metadata.ndb.ClusterjConnector;
import io.hops.metadata.ndb.NdbBoolean;
import io.hops.metadata.ndb.mysqlserver.HopsSQLExceptionHelper;
Expand Down Expand Up @@ -157,6 +158,16 @@ public interface InodeDTO {
long getSize();

void setSize(long size);

@Column(name = ISDELETED)
int getIsDeleted();

void setIsDeleted(int isdeleted);

@Column(name = STATUS)
int getStatus();

void setStatus(int Status);
}

private ClusterjConnector connector = ClusterjConnector.getInstance();
Expand Down Expand Up @@ -236,7 +247,7 @@ public List<INode> indexScanFindInodesByParentId(int parentId)
qb.createQueryDefinition(InodeDTO.class);
HopsPredicate pred1 =
dobj.get("parentId").equal(dobj.param("parentIDParam"));
dobj.where(pred1);
dobj.where(pred1);
HopsQuery<InodeDTO> query = session.createQuery(dobj);
query.setParameter("parentIDParam", parentId);

Expand All @@ -245,14 +256,74 @@ public List<INode> indexScanFindInodesByParentId(int parentId)
session.release(results);
return inodeList;
}


@Override
public List<INode> indexScanFindInodesByParentIdIncludeDeletes(int parentId) throws StorageException {
  try {
    HopsSession session = connector.obtainSession();
    HopsQueryBuilder qb = session.getQueryBuilder();
    HopsQueryDomainType<InodeDTO> dobj = qb.createQueryDefinition(InodeDTO.class);

    // Live inodes are stored under parentId; logically-deleted inodes are
    // stored under -parentId and carry the isDeleted flag. Match either.
    HopsPredicate pred1 = dobj.get("parentId").equal(dobj.param("parentIDParam1"));
    HopsPredicate pred2 = dobj.get("parentId").equal(dobj.param("parentIDParam2"));
    // Property name must match the DTO getter getIsDeleted() -> "isDeleted"
    // (ClusterJ derives property names from the bean accessors).
    HopsPredicate pred3 = dobj.get("isDeleted").equal(dobj.param("isDeletedParam"));
    dobj.where(pred1.or(pred2.and(pred3)));

    HopsQuery<InodeDTO> query = session.createQuery(dobj);
    query.setParameter("parentIDParam1", parentId);
    query.setParameter("parentIDParam2", -parentId);
    // BUG FIX: the original bound a nonexistent "parentIDParam3" and left the
    // predicate's parameter unbound, which fails at query execution time.
    // Bind the deleted-flag parameter, using the shared constant for the value
    // (same as the SQL variant of this lookup).
    query.setParameter("isDeletedParam", SnapShotConstants.isDeleted);

    List<InodeDTO> results = query.getResultList();
    explain(query);
    return createInodeList(results);
  } catch (Exception e) {
    throw new StorageException(e);
  }
}
@Override
public List<ProjectedINode> findInodesByParentIdForSubTreeOpsWithReadLockIncludeDeletes(
    int parentId) throws StorageException {
  // Reads the children of parentId (including logically-deleted children,
  // which are stored under -parentId with ISDELETED set) under a shared lock.
  //
  // BUG FIX (two defects in the original):
  //  1. The SELECT list omitted SIZE even though the result loop reads
  //     result.getLong(SIZE), which would throw SQLException (unknown column).
  //  2. The format arguments for the WHERE clause were grouped as
  //     (col, col, col, val, val, val) while the format string alternates
  //     "%s=%d", so String.format threw IllegalFormatConversionException
  //     (a String handed to %d). Arguments must alternate column/value.
  final String query = String.format(
      "SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s FROM %s "
          + "WHERE %s=%d or (%s=%d and %s=%d) LOCK IN SHARE MODE",
      ID, NAME, PARENT_ID, PERMISSION, HEADER, SYMLINK, QUOTA_ENABLED,
      UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER, SIZE, ISDELETED,
      STATUS, TABLE_NAME,
      PARENT_ID, parentId, PARENT_ID, -parentId,
      ISDELETED, SnapShotConstants.isDeleted);
  ArrayList<ProjectedINode> resultList;
  try {
    Connection conn = mysqlConnector.obtainSession();
    PreparedStatement s = conn.prepareStatement(query);
    ResultSet result = s.executeQuery();
    resultList = new ArrayList<ProjectedINode>();

    while (result.next()) {
      resultList.add(
          new ProjectedINode(result.getInt(ID), result.getInt(PARENT_ID),
              result.getString(NAME), result.getBytes(PERMISSION),
              result.getLong(HEADER),
              // A non-null SYMLINK column marks the inode as a symlink.
              result.getString(SYMLINK) == null ? false : true,
              result.getBoolean(QUOTA_ENABLED),
              result.getBoolean(UNDER_CONSTRUCTION),
              result.getBoolean(SUBTREE_LOCKED),
              result.getLong(SUBTREE_LOCK_OWNER),
              result.getLong(SIZE),
              result.getInt(ISDELETED),
              result.getInt(STATUS)));
    }
  } catch (SQLException ex) {
    throw HopsSQLExceptionHelper.wrap(ex);
  } finally {
    // Always return the MySQL session, even on failure.
    mysqlConnector.closeSession();
  }
  return resultList;
}
@Override
public List<ProjectedINode> findInodesForSubtreeOperationsWithWriteLock(
int parentId) throws StorageException {
final String query = String.format(
"SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s FROM %s WHERE %s=%d FOR UPDATE ",
"SELECT %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s,%s FROM %s WHERE %s=%d FOR UPDATE ",
ID, NAME, PARENT_ID, PERMISSION, HEADER, SYMLINK, QUOTA_ENABLED,
UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER, SIZE, TABLE_NAME,
UNDER_CONSTRUCTION, SUBTREE_LOCKED, SUBTREE_LOCK_OWNER, SIZE,ISDELETED,STATUS, TABLE_NAME,
PARENT_ID, parentId);
ArrayList<ProjectedINode> resultList;
try {
Expand All @@ -271,7 +342,9 @@ public List<ProjectedINode> findInodesForSubtreeOperationsWithWriteLock(
result.getBoolean(UNDER_CONSTRUCTION),
result.getBoolean(SUBTREE_LOCKED),
result.getLong(SUBTREE_LOCK_OWNER),
result.getLong(SIZE)));
result.getLong(SIZE),
result.getInt(ISDELETED),
result.getInt(STATUS)));
}
} catch (SQLException ex) {
throw HopsSQLExceptionHelper.wrap(ex);
Expand Down Expand Up @@ -433,7 +506,7 @@ private INode createInode(InodeDTO persistable) {
NdbBoolean.convert(persistable.getSubtreeLocked()),
persistable.getSubtreeLockOwner(),
NdbBoolean.convert(persistable.getMetaEnabled()),
persistable.getSize());
persistable.getSize(), persistable.getIsDeleted(),persistable.getStatus());
return node;
}

Expand All @@ -456,6 +529,8 @@ private void createPersistable(INode inode, InodeDTO persistable) {
persistable.setSubtreeLockOwner(inode.getSubtreeLockOwner());
persistable.setMetaEnabled(NdbBoolean.convert(inode.isMetaEnabled()));
persistable.setSize(inode.getSize());
persistable.setIsDeleted(inode.getIsDeleted());
persistable.setStatus(inode.getStatus());
}

private void explain(HopsQuery<InodeDTO> query) {
Expand Down
Loading