Skip to content

Commit

Permalink
[ISSUE-425] add shuffle store apis (#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
qingwen220 authored Dec 12, 2024
1 parent 32b2344 commit b162ca6
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,16 @@ public class ExecutionConfigKeys implements Serializable {
.defaultValue(5)
.description("cluster client exit wait time in seconds");

public static final ConfigKey SERVICE_DISCOVERY_TYPE = ConfigKeys
.key("geaflow.service.discovery.type")
.defaultValue("redis")
.description("service discovery type, e.g.[zookeeper, redis]");

public static final ConfigKey JOB_MODE = ConfigKeys
.key("geaflow.job.mode")
.defaultValue("compute")
.description("job mode, e.g.[compute, olap service]");

// ------------------------------------------------------------------------
// supervisor
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -365,21 +375,6 @@ public class ExecutionConfigKeys implements Serializable {

/** Shuffle common config. */

public static final ConfigKey SHUFFLE_IO_MAX_RETRIES = ConfigKeys
.key("geaflow.shuffle.io.max.retries")
.defaultValue(5)
.description("shuffle io max retry times");

public static final ConfigKey SHUFFLE_IO_RETRY_WAIT = ConfigKeys
.key("geaflow.shuffle.io.retry.wait.ms")
.defaultValue(500)
.description("time to wait in each shuffle io retry");

public static final ConfigKey SHUFFLE_MAX_BYTES_IN_FLIGHT = ConfigKeys
.key("geaflow.shuffle.max.bytes.inflight")
.defaultValue(128 * 1024 * 1024L)
.description("max number of bytes in flight");

public static final ConfigKey SHUFFLE_PREFETCH = ConfigKeys
.key("geaflow.shuffle.prefetch.enable")
.defaultValue(true)
Expand All @@ -392,20 +387,8 @@ public class ExecutionConfigKeys implements Serializable {

public static final ConfigKey SHUFFLE_COMPRESSION_ENABLE = ConfigKeys
.key("geaflow.shuffle.compression.enable")
.defaultValue(true)
.description("whether to enable shuffle compression");

public static final ConfigKey SHUFFLE_FORCE_MEMORY_ENABLE = ConfigKeys
.key("geaflow.shuffle.force.memory.enable")
.defaultValue(false)
.description("Under the default conditions, data is written to memory first, and if there is insufficient memory, "
+ "it is then written to disk; upon enabling this configuration, writing to memory is enforced.");

public static final ConfigKey SHUFFLE_FORCE_DISK_ENABLE = ConfigKeys
.key("geaflow.shuffle.force.disk.enable")
.defaultValue(false)
.description("Under the default conditions, data is written to memory first, and if there is insufficient memory, "
+ "it is then written to disk; upon enabling this configuration, writing to disk is enforced.");
.description("whether to enable shuffle compression");

public static final ConfigKey SHUFFLE_COMPRESSION_CODEC = ConfigKeys
.key("geaflow.shuffle.compression.codec")
Expand Down Expand Up @@ -532,14 +515,9 @@ public class ExecutionConfigKeys implements Serializable {

public static final ConfigKey SHUFFLE_STORAGE_TYPE = ConfigKeys
.key("geaflow.shuffle.storage.type")
.defaultValue(StorageLevel.disk)
.defaultValue(StorageLevel.MEMORY_AND_DISK)
.description("type of shuffle storage");

public static final ConfigKey SHUFFLE_OFFHEAP_MEMORY_FRACTION = ConfigKeys
.key("geaflow.shuffle.offheap.fraction")
.defaultValue(0.2)
.description("fraction of shuffle offheap memory");

public static final ConfigKey SHUFFLE_HEAP_MEMORY_FRACTION = ConfigKeys
.key("geaflow.shuffle.heap.memory.fraction")
.defaultValue(0.2)
Expand Down Expand Up @@ -615,19 +593,4 @@ public class ExecutionConfigKeys implements Serializable {
.defaultValue(false)
.description("if enable detail job metric");

public static final ConfigKey SERVICE_DISCOVERY_TYPE = ConfigKeys
.key("geaflow.service.discovery.type")
.defaultValue("redis")
.description("service discovery type");

public static final ConfigKey JOB_MODE = ConfigKeys
.key("geaflow.job.mode")
.defaultValue("compute")
.description("job mode, e.g.[compute, olap service, state service]");

public static final ConfigKey META_SERVER_RETRY_TIMES = ConfigKeys
.key("geaflow.meta.server.retry.times")
.defaultValue(3)
.description("meta server connect retry times");

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,24 @@
package com.antgroup.geaflow.common.shuffle;

/**
 * Shuffle data storage level.
 */
public enum StorageLevel {

    /**
     * Shuffle data is stored in memory only.
     */
    MEMORY,

    /**
     * Shuffle data is stored on local disk only.
     */
    DISK,

    /**
     * Shuffle data is written to memory first, and if there is insufficient memory, it is then
     * written to disk.
     */
    MEMORY_AND_DISK

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class PrefetchMessageBuffer<T> implements IInputMessageBuffer<T> {
private final int edgeId;

public PrefetchMessageBuffer(String logTag, SliceId sliceId) {
this.slice = new SpillablePipelineSlice(logTag, sliceId, 1);
this.slice = new SpillablePipelineSlice(logTag, sliceId);
this.edgeId = sliceId.getEdgeId();
SliceManager.getInstance().register(sliceId, this.slice);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SHUFFLE_FETCH_TIMEOUT_MS;
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SHUFFLE_FLUSH_BUFFER_SIZE_BYTES;
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SHUFFLE_FLUSH_BUFFER_TIMEOUT_MS;
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SHUFFLE_FORCE_DISK_ENABLE;
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SHUFFLE_FORCE_MEMORY_ENABLE;
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SHUFFLE_MEMORY_POOL_ENABLE;
import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.SHUFFLE_STORAGE_TYPE;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.shuffle.StorageLevel;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -93,9 +93,7 @@ public class ShuffleConfig {
private final int emitBufferSize;
private final int flushBufferSizeBytes;
private final int flushBufferTimeoutMs;
private final boolean forceMemory;
private final boolean forceDisk;

private final StorageLevel storageLevel;

private static ShuffleConfig INSTANCE;

Expand Down Expand Up @@ -147,8 +145,7 @@ private ShuffleConfig(Configuration config) {
this.emitBufferSize = config.getInteger(SHUFFLE_EMIT_BUFFER_SIZE);
this.flushBufferSizeBytes = config.getInteger(SHUFFLE_FLUSH_BUFFER_SIZE_BYTES);
this.flushBufferTimeoutMs = config.getInteger(SHUFFLE_FLUSH_BUFFER_TIMEOUT_MS);
this.forceMemory = config.getBoolean(SHUFFLE_FORCE_MEMORY_ENABLE);
this.forceDisk = config.getBoolean(SHUFFLE_FORCE_DISK_ENABLE);
this.storageLevel = StorageLevel.valueOf(config.getString(SHUFFLE_STORAGE_TYPE));

LOGGER.info("init shuffle config: {}", config);
}
Expand Down Expand Up @@ -259,12 +256,7 @@ public int getFlushBufferTimeoutMs() {
return this.flushBufferTimeoutMs;
}

public boolean isForceMemory() {
return this.forceMemory;
}

public boolean isForceDisk() {
return this.forceDisk;
public StorageLevel getStorageLevel() {
return this.storageLevel;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,4 @@ public void release() {
INSTANCE = null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@
import com.antgroup.geaflow.common.encoder.Encoders;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.iterator.CloseableIterator;
import com.antgroup.geaflow.common.shuffle.StorageLevel;
import com.antgroup.geaflow.shuffle.config.ShuffleConfig;
import com.antgroup.geaflow.shuffle.message.SliceId;
import com.antgroup.geaflow.shuffle.pipeline.buffer.OutBuffer;
import com.antgroup.geaflow.shuffle.pipeline.buffer.PipeBuffer;
import com.antgroup.geaflow.shuffle.pipeline.buffer.ShuffleMemoryTracker;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import com.antgroup.geaflow.shuffle.storage.ShuffleStore;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,29 +38,32 @@ public class SpillablePipelineSlice extends AbstractSlice {

private static final Logger LOGGER = LoggerFactory.getLogger(SpillablePipelineSlice.class);

private static final int BUFFER_SIZE = 64 * 1024;

private final String fileName;
private final StorageLevel storageLevel;
private final ShuffleStore store;

private OutputStream outputStream;
private CloseableIterator<PipeBuffer> streamBufferIterator;
private PipeBuffer value;
private volatile boolean ready2read = false;

// Whether force write data to memory;
private final boolean forceMemory;
// Whether force write data to disk;
private final boolean forceDisk;
// Bytes count in memory.
private long memoryBytes = 0;
// Bytes count on disk.
private long diskBytes = 0;

public SpillablePipelineSlice(String taskLogTag, SliceId sliceId) {
this(taskLogTag, sliceId, 0);
}

public SpillablePipelineSlice(String taskLogTag, SliceId sliceId, int refCount) {
super(taskLogTag, sliceId, refCount);
this.fileName = String.format("shuffle-%d-%d-%d",

this.store = ShuffleStore.getShuffleStore(ShuffleConfig.getInstance().getStorageLevel());
String fileName = String.format("shuffle-%d-%d-%d",
sliceId.getPipelineId(), sliceId.getEdgeId(), sliceId.getSliceIndex());
this.forceMemory = ShuffleConfig.getInstance().isForceMemory();
this.forceDisk = ShuffleConfig.getInstance().isForceDisk();
this.fileName = store.getFilePath(fileName);
this.storageLevel = ShuffleConfig.getInstance().getStorageLevel();
}

public String getFileName() {
Expand All @@ -79,18 +81,19 @@ public boolean add(PipeBuffer buffer) {
}
totalBufferCount++;

if (this.forceMemory) {
if (this.storageLevel == StorageLevel.MEMORY) {
this.writeMemory(buffer);
return true;
}
if (this.forceDisk) {
this.writeDisk(buffer);

if (this.storageLevel == StorageLevel.DISK) {
this.writeStore(buffer);
return true;
}

this.writeMemory(buffer);
if (!ShuffleMemoryTracker.getInstance().checkMemoryEnough()) {
this.spillDisk();
this.spillWrite();
}
return true;
}
Expand All @@ -100,21 +103,21 @@ private void writeMemory(PipeBuffer buffer) {
this.memoryBytes += buffer.getBufferSize();
}

private void writeDisk(PipeBuffer buffer) {
private void writeStore(PipeBuffer buffer) {
try {
if (this.outputStream == null) {
this.outputStream = this.createTmpFile();
this.outputStream = store.getOutputStream(fileName);
}
this.write2Stream(buffer);
} catch (IOException e) {
throw new GeaflowRuntimeException(e);
}
}

private void spillDisk() {
private void spillWrite() {
try {
if (this.outputStream == null) {
this.outputStream = this.createTmpFile();
this.outputStream = store.getOutputStream(fileName);
}
while (!buffers.isEmpty()) {
PipeBuffer buffer = buffers.poll();
Expand All @@ -140,31 +143,11 @@ private void write2Stream(PipeBuffer buffer) throws IOException {
}
}

private OutputStream createTmpFile() {
try {
Path path = Paths.get(this.fileName);
Files.deleteIfExists(path);
Files.createFile(path);
return new BufferedOutputStream(Files.newOutputStream(path, StandardOpenOption.WRITE), BUFFER_SIZE);
} catch (IOException e) {
throw new GeaflowRuntimeException(e);
}
}

private InputStream readFromTmpFile() {
try {
Path path = Paths.get(this.fileName);
return new BufferedInputStream(Files.newInputStream(path, StandardOpenOption.READ), BUFFER_SIZE);
} catch (IOException e) {
throw new GeaflowRuntimeException(e);
}
}

@Override
public void flush() {
if (this.outputStream != null) {
try {
spillDisk();
spillWrite();
this.outputStream.flush();
this.outputStream.close();
this.outputStream = null;
Expand Down Expand Up @@ -250,7 +233,7 @@ class FileStreamIterator implements CloseableIterator<PipeBuffer> {
private PipeBuffer next;

FileStreamIterator() {
this.inputStream = readFromTmpFile();
this.inputStream = store.getInputStream(fileName);
}

@Override
Expand Down
Loading

0 comments on commit b162ca6

Please sign in to comment.