Non-unique keys as outputs of mapreduce when using a combiner #167

Open
saargolde opened this issue Apr 18, 2015 · 4 comments

@saargolde

I'm trying to write a mapreduce job that counts the occurrences of the distinct values in one column of my data.

My code is:

myOut <- mapreduce(
  input = inFile,
  input.format = make.input.format(
    format="csv", sep="|", comment.char="", quote="", stringsAsFactors=FALSE
  ),
  map = function(k, v) {
    # column 8 holds the condition name; tabulate its values within this input split
    conditionName <- factor(v[, 8])
    conditionCounts <- table(conditionName)
    return(keyval(key = names(conditionCounts), val = as.numeric(conditionCounts)))
  },
  reduce = function(k, vv) {
    #return(keyval(k, sum(vv)))
    return(keyval(k[1], sum(vv)))
  },
  #combine = function(k, vv) {
  #  return(keyval(k[1], sum(vv)))
  #},
  #combine = FALSE,
  combine = TRUE,
  backend.parameters = list(hadoop = list(
    D = "mapred.job.name=Count Conditions",
    D = "mapred.child.java.opts=-Xmx6666m",
    D = "mapreduce.map.java.opts=-Xmx6666m",
    D = "mapreduce.map.memory.mb=5555",
    D = "mapreduce.job.queuename=datascience"))
)

localOut <- from.dfs(myOut)
localOutDF <- as.data.frame(localOut, stringsAsFactors=FALSE)

print(dim(localOutDF))
print(length(keys(localOut)))
print(length(unique(keys(localOut))))

As a result, I'm getting non-unique keys coming out of the reduce phase...
Sample output from the above print commands:

[1] 1675    2
[1] 1675
[1] 284

(I know the right answer is 284).

Different ways I tried to make this work:

  • with combine=FALSE, I get the "right" answer
  • when I use the commented-out combiner above, I still get the wrong answer

An interesting thing about the wrong answer: the counts still sum to what they should, they just don't get "combined" all the way. In other words, the rows that come back are not full duplicates of each other; they are partially aggregated counts that add up to the correct totals.
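
A minimal client-side check of that claim (assuming the keys come back as a character vector and the values as plain numeric counts; the per-key totals can then be compared against the combine=FALSE output):

k <- keys(localOut)
v <- values(localOut)
agg <- tapply(v, k, sum)                    # collapse the duplicated keys on the client side
print(length(agg))                          # 284 distinct keys, matching the combine=FALSE run
print(head(sort(agg, decreasing = TRUE)))   # per-key totals to compare with the combine=FALSE output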

Not sure if I'm doing anything wrong...

Versions:

> IP <- installed.packages()
> IP[IP[, 1]=="rmr2", ]
                                                                                    Package
                                                                                     "rmr2"
                                                                                    LibPath
                                                               "/usr/local/lib64/R/library"
                                                                                    Version
                                                                                    "3.3.1"
                                                                                   Priority
                                                                                         NA
                                                                                    Depends
                                                                    "R (>= 2.6.0), methods"
                                                                                    Imports
"Rcpp, RJSONIO (>= 0.8-2), digest, functional, reshape2,\nstringr, plyr, caTools (>= 1.16)"
                                                                                  LinkingTo
                                                                                         NA
                                                                                   Suggests
                                            "quickcheck (>= 3.0.0), ravro, rhdfs, testthat"
                                                                                   Enhances
                                                                                         NA
                                                                                    License
                                                                  "Apache License (== 2.0)"
                                                                            License_is_FOSS
                                                                                         NA
                                                                      License_restricts_use
                                                                                         NA
                                                                                    OS_type
                                                                                         NA
                                                                                     MD5sum
                                                                                         NA
                                                                           NeedsCompilation
                                                                                         NA
                                                                                      Built
                                                                                    "3.1.2"

> R.Version()
$platform
[1] "x86_64-unknown-linux-gnu"

$arch
[1] "x86_64"

$os
[1] "linux-gnu"

$system
[1] "x86_64, linux-gnu"

$status
[1] ""

$major
[1] "3"

$minor
[1] "1.2"

$year
[1] "2014"

$month
[1] "10"

$day
[1] "31"

$`svn rev`
[1] "66913"

$language
[1] "R"

$version.string
[1] "R version 3.1.2 (2014-10-31)"

$nickname
[1] "Pumpkin Helmet"

And hadoop version:

[~]$ hadoop version
Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/home/tmp
Hadoop 2.4.0.2.1.5.0-695
Subversion [email protected]:hortonworks/hadoop.git -r [[xxx]]
Compiled by jenkins on 2014-08-28T03:10Z
Compiled with protoc 2.5.0
From source with checksum [[xxx]]
This command was run using /usr/lib/hadoop/hadoop-common-2.4.0.2.1.5.0-695.jar
@piccolbo piccolbo added the bug label Apr 20, 2015
@piccolbo piccolbo self-assigned this Apr 20, 2015
@piccolbo
Collaborator

This looks like a bug. I am not sure why you are trying the two different reducers: the first argument of the reducer always has length one unless vectorized.reduce is set to TRUE, so k[1] does nothing when k has length one.

Back to this problem. The only scenario I can think of is Hadoop and R ordering keys differently ... wait a second, we have fixed that already. Maybe there is some version of that problem that involves a combiner and is not as fixed as I would like it to be. I would like you to perform the following experiment (unless you can share the data; in the meantime I will try to reproduce with synthetic data, no luck so far). The idea is to normalize the length of the keys to the maximum of the possible lengths:

ks = keys(localOut)
maxlen = max(sapply(ks, nchar))

Then in the map function:

return(
  keyval(
    sapply(
      names(z),
      function(x) paste0(x, paste0(rep("-", maxlen - nchar(x)), collapse = ""))) ...
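
Spelled out against the map function from the original report, a minimal runnable sketch of that padding experiment might look like the following (the map.padded name is just for illustration, and maxlen is assumed to be computed up front, e.g. from a previous run's keys, and visible to the map closure):

map.padded <- function(k, v) {
  # count the condition names as before
  conditionCounts <- table(factor(v[, 8]))
  # pad every key with "-" up to maxlen so that all keys have the same length
  padded <- sapply(
    names(conditionCounts),
    function(x) paste0(x, paste0(rep("-", maxlen - nchar(x)), collapse = "")))
  keyval(key = padded, val = as.numeric(conditionCounts))
}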

Does that make sense? Thanks

@saargolde
Author

Makes sense. Unfortunately I won't be able to share the data (not even the keys alone). I'll try your code tonight, both as reducer and as combiner. One thing I do know is that some keys are very short (three characters) and some are long (but not that long: about 50 characters, if I remember correctly).
Thanks!


@saargolde
Author

The keys are now all the same length (56 characters), but the counts are still off...
Unfortunately, this didn't fix it. In fact, it created even more duplication:

> length(keys(localOut))
[1] 3198
> length(unique(keys(localOut)))
[1] 284

Without a combiner everything works fine...
I've been staring at Hadoop counters a lot lately, so this one caught my eye: without a combiner (combine=FALSE) the counter reads:
Reduce input groups=284
When using the reducer as a combiner (combine=TRUE), I see:
Reduce input groups=1035377
(which is not the actual number of keys I get out, either).
Not sure if this helps or just adds noise, but I thought it was worth mentioning.

@piccolbo
Collaborator

Since you can't share your data, this is what I propose: let's try to reproduce the problem with variants of this program.

library(rmr2)
library(quickcheck)
set.seed(0)
z = rcharacter(size = ~10^6)
anyDuplicated(
  keys(
    from.dfs(
      mapreduce(
        to.dfs(keyval(z, 1)),
        reduce = function(k, vv) keyval(k, sum(vv)),
        combine = TRUE))))

Please let me know if this appears to capture the essence of the problem you reported.
Please install quickcheck with devtools::install_github(repo = "quickcheck", username="RevolutionAnalytics", subdir = "pkg")

rcharacter has been changed in incompatible ways and is a lot faster, enough to support this kind of testing. I could not immediately reproduce your problem. You may want to play with the exact distribution of strings in the input, how many duplicates there are, etc., to match your proprietary data set at least in some relevant statistics. Take a look at help(rcharacter) and please let me know if there are any problems.
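
If rcharacter's defaults turn out not to match the real key distribution, one way to build a skewed, variable-length synthetic input in base R is sketched below (the sizes and the skew are made-up illustration values, and rmr2 is assumed to be loaded as in the snippet above):

set.seed(0)
n.keys <- 284                                        # distinct keys, as in the report
key.lens <- sample(3:50, n.keys, replace = TRUE)     # key lengths from 3 to 50 characters
dict <- sapply(key.lens, function(l)
  paste(sample(letters, l, replace = TRUE), collapse = ""))
# one million records with heavily skewed key frequencies
z <- sample(dict, 10^6, replace = TRUE, prob = rgamma(n.keys, shape = 0.5))
anyDuplicated(
  keys(
    from.dfs(
      mapreduce(
        to.dfs(keyval(z, 1)),
        reduce = function(k, vv) keyval(k, sum(vv)),
        combine = TRUE))))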

I have had some trouble with my dev environment but I am now running this and I don't see any problems so far.
