Ignite Cache Store - 释放资源的方法

问题描述 投票:0回答:1

我使用HBase作为后端持久存储实现了Ignite缓存存储。 Cache Store的代码如下:

/**
 * Ignite {@code CacheStore} backed by HBase: each cache entry is persisted as a single
 * HBase row whose key is derived from the cache name plus the entry key.
 *
 * NOTE(review): every method opens a fresh HBase {@code Connection}. HBase connections are
 * heavyweight and designed to be long-lived and shared; consider holding one connection (or a
 * pool, e.g. c3p0) for the lifetime of the store instead of opening one per operation.
 */
public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> {

    /** Injected Ignite instance; retained for configuration compatibility. */
    @IgniteInstanceResource
    Ignite gridReference;

    /** Name of the cache this store serves; used to build per-cache HBase row keys. */
    @CacheNameResource
    private String cacheName;

    /**
     * Loads a single value from HBase.
     *
     * @param key cache key to load
     * @return the stored bytes, or {@code null} when the row does not exist — returning
     *         {@code null} (not an empty array) tells Ignite there is nothing to load
     * @throws ROCException wrapping the underlying {@link IOException}
     */
    @Override
    public byte[] load(Long key) {

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());

            Result rowFetched = bitDataPersistentTable.get(new Get(Bytes.toBytes(hbaseKey)));

            if (rowFetched == null || rowFetched.isEmpty()) {
                // Can't return an empty array: Ignite would treat it as a real cached value.
                return null;
            }

            return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName
                            + " ] ");
        }

    }

    /**
     * Loads a batch of values from HBase using a single multi-get.
     *
     * FIX(review): the original implementation wrote values straight into the cache via
     * {@code gridReference.cache(cacheName).put(...)} and returned {@code null}. The
     * {@code CacheStore#loadAll} contract is to RETURN the loaded entries and let Ignite
     * populate the cache; calling back into the cache from inside the store re-enters the
     * cache (risking deadlock) and also passed {@code null} for missing rows, which
     * {@code IgniteCache.put} rejects. Missing rows are now skipped, and all keys are
     * fetched in one {@code Table.get(List)} round trip instead of one RPC per key.
     *
     * @param keys cache keys to load
     * @return map of the keys that were found to their stored bytes (absent keys omitted)
     * @throws ROCException wrapping the underlying {@link IOException}
     */
    @Override
    public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) {

        long startTime = System.currentTimeMillis();

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            // Preserve iteration order so results can be matched back to their keys by index.
            List<Long> orderedKeys = new ArrayList<>();
            List<Get> rowsToBeFetched = new ArrayList<>();

            for (Long key : keys) {
                orderedKeys.add(key);
                rowsToBeFetched.add(new Get(Bytes.toBytes(
                        TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()))));
            }

            // Single batched round trip; results arrive in request order.
            Result[] rowsFetched = bitDataPersistentTable.get(rowsToBeFetched);

            Map<Long, byte[]> loadedEntries = new HashMap<>();

            for (int i = 0; i < rowsFetched.length; i++) {
                Result rowFetched = rowsFetched[i];
                if (rowFetched != null && !rowFetched.isEmpty()) {
                    loadedEntries.put(orderedKeys.get(i),
                            rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES));
                }
            }

            // TODO(review): replace System.out with a proper logger (e.g. gridReference.log()).
            System.out.println("LoadAll for [ " + loadedEntries.size() + " ] keys of the cache [ " + cacheName
                    + " ] took [ " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds ] ");

            return loadedEntries;

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while reading multiple keys for the cache [ " + cacheName + " ] ");
        }

    }

    /**
     * Writes a single entry to HBase (write-through).
     *
     * @param entry cache entry to persist
     * @throws ROCException wrapping the underlying {@link IOException}
     */
    @Override
    public void write(Entry<? extends Long, ? extends byte[]> entry) {

        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, entry.getKey().toString());

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            Put rowToBeWritten = new Put(Bytes.toBytes(hbaseKey));

            rowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, entry.getValue());

            bitDataPersistentTable.put(rowToBeWritten);

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ " + cacheName
                            + " ] ");
        }

    }

    /**
     * Writes a batch of entries to HBase with a single batched put.
     *
     * FIX(review): row keys were built with {@code hbaseKey.getBytes()}, which uses the
     * platform default charset and could disagree with the {@code Bytes.toBytes(...)}
     * (UTF-8) encoding used by {@link #load}/{@link #write}; now uses {@code Bytes.toBytes}
     * consistently.
     *
     * @param entries cache entries to persist
     * @throws ROCException wrapping the underlying {@link IOException}
     */
    @Override
    public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) {

        long startTime = System.currentTimeMillis();

        List<Put> rowsToBeWritten = new ArrayList<>(entries.size());

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) {

                String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName,
                        entryToBeInserted.getKey().toString());

                Put currentRowToBeWritten = new Put(Bytes.toBytes(hbaseKey));

                currentRowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES,
                        entryToBeInserted.getValue());

                rowsToBeWritten.add(currentRowToBeWritten);

            }

            bitDataPersistentTable.put(rowsToBeWritten);

            // TODO(review): replace System.out with a proper logger (e.g. gridReference.log()).
            System.out.println("Time taken to load [ " + entries.size() + " entries ] for the cache [ " + cacheName
                    + " ] is " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds");

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing multiple keys for the cache [ " + cacheName + " ] ");
        }

    }

    /**
     * Deletes a single row from HBase.
     *
     * @param key cache key whose backing row is removed
     * @throws ROCException wrapping the underlying {@link IOException}
     */
    @Override
    public void delete(Object key) {

        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            bitDataPersistentTable.delete(new Delete(Bytes.toBytes(hbaseKey)));

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName
                            + " ] ");
        }

    }

    /**
     * Deletes a batch of rows from HBase with a single batched delete.
     *
     * FIX(review): row keys were built with {@code hbaseKey.getBytes()} (platform default
     * charset); now uses {@code Bytes.toBytes} for consistency with the other methods.
     *
     * @param keys cache keys whose backing rows are removed
     * @throws ROCException wrapping the underlying {@link IOException}
     */
    @Override
    public void deleteAll(Collection<?> keys) {

        List<Delete> rowsToBeDeleted = new ArrayList<>(keys.size());

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            for (Object keyToBeDeleted : keys) {

                String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName,
                        keyToBeDeleted.toString());

                rowsToBeDeleted.add(new Delete(Bytes.toBytes(hbaseKey)));

            }

            bitDataPersistentTable.delete(rowsToBeDeleted);

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting entries for the cache [ " + cacheName + " ] ");
        }

    }

    /** No-op: initial cache warm-up via loadCache is not supported by this store. */
    @Override
    public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) {
        // No implementation provided
    }

    /** No-op: this store is non-transactional (cache atomicity mode is ATOMIC). */
    @Override
    public void sessionEnd(boolean commit) {
        // No implementation provided
    }

}

缓存模式为PARTITIONED。

缓存原子性模式是ATOMIC。

从上面的存储实现可以看出,我在每个方法中都会新建一个到 HBase 的连接。

我想知道有没有办法在更宏观的层面上统一管理数据源相关资源(本例中为 HBase 连接)的打开和关闭,而不是在每次方法调用时都重复执行这些操作。

caching ignite gridgain
1个回答
2
投票

您需要在存储实现中使用连接池,可以参考 c3p0。

© www.soinside.com 2019 - 2024. All rights reserved.