Skip to content

Commit

Permalink
Add Spark function current_timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
acvictor committed Mar 26, 2024
1 parent 8f0adb1 commit 2d23b7b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 0 deletions.
25 changes: 25 additions & 0 deletions velox/functions/sparksql/DateTimeFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,31 @@ struct UnixDateFunction {
}
};

template <typename T>
struct CurrentTimestampFunction {
VELOX_DEFINE_FUNCTION_TYPES(T);

FOLLY_ALWAYS_INLINE void initialize(
const std::vector<TypePtr>& /*inputTypes*/,
const core::QueryConfig& config) {
timeZone_ = getTimeZoneFromConfig(config);
}

FOLLY_ALWAYS_INLINE void call(out_type<Timestamp>& result) {
auto now = std::chrono::system_clock::now();
auto epoch = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch())
.count();
result = Timestamp::fromMicros(epoch);
if (timeZone_ != nullptr) {
result.toTimezone(*timeZone_);
}
}

private:
const date::time_zone* timeZone_ = nullptr;
};

template <typename T>
struct UnixTimestampFunction {
// unix_timestamp();
Expand Down
2 changes: 2 additions & 0 deletions velox/functions/sparksql/Register.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ void registerFunctions(const std::string& prefix) {
registerFunction<FromUtcTimestampFunction, Timestamp, Timestamp, Varchar>(
{prefix + "from_utc_timestamp"});

registerFunction<CurrentTimestampFunction, Timestamp>({prefix + "current_timestamp"});

registerFunction<UnixDateFunction, int32_t, Date>({prefix + "unix_date"});

registerFunction<UnixTimestampFunction, int64_t>({prefix + "unix_timestamp"});
Expand Down
38 changes: 38 additions & 0 deletions velox/functions/sparksql/tests/DateTimeFunctionsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/external/date/tz.h"
#include "velox/functions/sparksql/tests/SparkFunctionBaseTest.h"
#include "velox/type/Timestamp.h"
#include "velox/type/tz/TimeZoneMap.h"
Expand All @@ -41,6 +42,17 @@ class DateTimeFunctionsTest : public SparkFunctionBaseTest {
static constexpr int8_t kMinTinyint = std::numeric_limits<int8_t>::min();
static constexpr int8_t kMaxTinyint = std::numeric_limits<int8_t>::max();

int64_t getCurrentTimestamp(const std::optional<std::string>& timeZone) {
auto now = std::chrono::system_clock::now();
auto tp = timeZone.has_value()
? date::make_zoned(
timeZone.value(), now).get_local_time().time_since_epoch()
: now.time_since_epoch();
auto since_epoch = std::chrono::duration_cast<std::chrono::microseconds>(tp);
LOG(INFO) << since_epoch.count();
return since_epoch.count();
}

protected:
void setQueryTimeZone(const std::string& timeZone) {
queryCtx_->testingOverrideConfigUnsafe({
Expand Down Expand Up @@ -228,6 +240,32 @@ TEST_F(DateTimeFunctionsTest, unixDate) {
EXPECT_EQ(unixDate("-5877641-06-23"), kMin);
}

TEST_F(DateTimeFunctionsTest, currentTimestamp) {
auto emptyRowVector = makeRowVector(ROW({}), 1);

auto timestampBefore = getCurrentTimestamp(std::nullopt);
auto result = evaluateOnce<Timestamp>("current_timestamp()", emptyRowVector);
auto timestampAfter = getCurrentTimestamp(std::nullopt);

EXPECT_TRUE(result.has_value());
auto resultInInt = result.value().toMicros();
EXPECT_LE(timestampBefore, resultInInt);
EXPECT_LE(resultInInt, timestampAfter);
EXPECT_LE(timestampAfter - timestampBefore, 300);

auto tz = "America/Los_Angeles";
setQueryTimeZone(tz);
timestampBefore = getCurrentTimestamp(tz);
result = evaluateOnce<Timestamp>("current_timestamp()", emptyRowVector);
timestampAfter = getCurrentTimestamp(tz);

EXPECT_TRUE(result.has_value());
resultInInt = result.value().toMicros();
EXPECT_LE(timestampBefore, resultInInt);
EXPECT_LE(resultInInt, timestampAfter);
EXPECT_LE(timestampAfter - timestampBefore, 300);
}

TEST_F(DateTimeFunctionsTest, unixTimestamp) {
const auto unixTimestamp = [&](std::optional<StringView> dateStr) {
return evaluateOnce<int64_t>("unix_timestamp(c0)", dateStr);
Expand Down

0 comments on commit 2d23b7b

Please sign in to comment.