forked from LLNL/libROM
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ParallelBuffer.C
259 lines (237 loc) · 6.66 KB
/
ParallelBuffer.C
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
/******************************************************************************
*
* Copyright (c) 2013-2019, Lawrence Livermore National Security, LLC
* and other libROM project developers. See the top-level COPYRIGHT
* file for details.
*
* SPDX-License-Identifier: (Apache-2.0 OR MIT)
*
*****************************************************************************/
// Description: A simple I/O stream class that intercepts output from an
// ostream and redirects the output as necessary for parallel
// I/O.
#include "ParallelBuffer.h"
#include "Utilities.h"
#include "mpi.h"
#include <string>
#include <cstring>
#include <cstdio>
namespace CAROM {
const int ParallelBuffer::DEFAULT_BUFFER_SIZE = 128;
/*
*************************************************************************
*
* Construct a parallel buffer object. The object will require further
* initialization to set up I/O streams and the prefix string.
*
*************************************************************************
*/
ParallelBuffer::ParallelBuffer()
{
int mpi_init;
MPI_Initialized(&mpi_init);
int rank;
if (mpi_init) {
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
}
else {
rank = 0;
}
d_prefix = "P=" + Utilities::processorToString(rank) + ":";
d_ostream = &std::cerr;
d_buffer = 0;
d_buffer_size = 0;
d_buffer_ptr = 0;
}
/*
*************************************************************************
*
* The destructor deallocates internal data buffer. It does not modify
* the output streams.
*
*************************************************************************
*/
ParallelBuffer::~ParallelBuffer()
{
if (d_buffer) {
delete[] d_buffer;
}
}
/*
*************************************************************************
*
* Write a text string of the specified length to the output stream.
* Note that the string data is accumulated into the internal output
* buffer until an end-of-line is detected.
*
*************************************************************************
*/
void
ParallelBuffer::outputString(
const std::string& text,
int length)
{
if (length > 0) {
/*
* If we need to allocate the internal buffer, then do so.
*/
if (!d_buffer) {
d_buffer = new char[DEFAULT_BUFFER_SIZE];
d_buffer_size = DEFAULT_BUFFER_SIZE;
d_buffer_ptr = 0;
}
/*
* If the buffer pointer is zero, then prepend the prefix.
*/
if (d_buffer_ptr == 0) {
copyToBuffer(d_prefix, static_cast<int>(d_prefix.length()));
}
/*
* Search for an end-of-line in the string.
*/
// const int eol_ptr = static_cast<int>(text.find('\n'));
int eol_ptr = 0;
for ( ; (eol_ptr < length) && (text[eol_ptr] != '\n'); eol_ptr++) {}
/*
* If no end-of-line is found, copy the entire text string but do not
* output. Otherwise copy the text string through the end-of-line,
* output, and recurse with the remainder of the text string if there are
* more characters in it.
*/
if (eol_ptr == length) {
copyToBuffer(text, length);
} else {
const int ncopy = eol_ptr + 1;
copyToBuffer(text, ncopy);
outputBuffer();
if (ncopy < length) {
outputString(text.substr(ncopy), length - ncopy);
}
}
}
}
/*
*************************************************************************
*
* Synchronize the parallel buffer and write string data. This routine
* is called from streambuf.
*
*************************************************************************
*/
int
ParallelBuffer::sync()
{
const int n = static_cast<int>(pptr() - pbase());
if (n > 0) {
outputString(pbase(), n);
}
return 0;
}
/*
*************************************************************************
*
* Write the specified number of characters into the output stream.
* This routine is called from streambuf. If this routine is not
* provided, then overflow() is called instead for each character.
*
* Note that this routine is not required; it only
* offers some efficiency over overflow().
*
*************************************************************************
*/
#if !defined(__INTEL_COMPILER) && (defined(__GNUG__))
std::streamsize
ParallelBuffer::xsputn(
const std::string& text,
std::streamsize n)
{
sync();
if (n > 0) outputString(text, static_cast<int>(n));
return n;
}
#endif
/*
*************************************************************************
*
* Write a single character into the parallel buffer. This routine is
* called from streambuf.
*
*************************************************************************
*/
int
ParallelBuffer::overflow(
int ch)
{
const int n = static_cast<int>(pptr() - pbase());
if (n && sync()) {
return EOF;
}
if (ch != EOF) {
char character[2];
character[0] = (char)ch;
character[1] = 0;
outputString(character, 1);
}
pbump(-n);
return 0;
}
/*
*************************************************************************
*
* Copy data from the text string into the internal output buffer.
* If the internal buffer is not large enough to hold all of the string
* data, then allocate a new internal buffer.
*
*************************************************************************
*/
void
ParallelBuffer::copyToBuffer(
const std::string& text,
int length)
{
/*
* First check whether we need to increase the size of the buffer
*/
if (d_buffer_ptr + length > d_buffer_size) {
int new_size;
if (d_buffer_ptr + length > 2 * d_buffer_size) {
new_size = d_buffer_ptr + length;
}
else {
new_size = 2 * d_buffer_size;
}
char* new_buffer = new char[new_size];
if (d_buffer_ptr > 0) {
(void)strncpy(new_buffer, d_buffer, d_buffer_ptr);
}
delete[] d_buffer;
d_buffer = new_buffer;
d_buffer_size = new_size;
}
CAROM_ASSERT(d_buffer_ptr + length <= d_buffer_size);
/*
* Copy data from the input into the internal buffer and increment pointer
*/
strncpy(d_buffer + d_buffer_ptr, text.c_str(), length);
d_buffer_ptr += length;
}
/*
*************************************************************************
*
* Output buffered stream data to the active output streams and reset
* the buffer pointer to its empty state.
*
*************************************************************************
*/
void
ParallelBuffer::outputBuffer()
{
if (d_buffer_ptr > 0) {
if (d_ostream) {
d_ostream->write(d_buffer, d_buffer_ptr);
d_ostream->flush();
}
d_buffer_ptr = 0;
}
}
}