| #! /usr/bin/env python |
| # |
| # Protocol Buffers - Google's data interchange format |
| # Copyright 2008 Google Inc. All rights reserved. |
| # https://developers.google.com/protocol-buffers/ |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| """Test for google.protobuf.internal.well_known_types.""" |
| |
| __author__ = 'jieluo@google.com (Jie Luo)' |
| |
| from datetime import datetime |
| |
| try: |
| import unittest2 as unittest #PY26 |
| except ImportError: |
| import unittest |
| |
| from google.protobuf import any_pb2 |
| from google.protobuf import duration_pb2 |
| from google.protobuf import field_mask_pb2 |
| from google.protobuf import struct_pb2 |
| from google.protobuf import timestamp_pb2 |
| from google.protobuf import unittest_pb2 |
| from google.protobuf.internal import any_test_pb2 |
| from google.protobuf.internal import test_util |
| from google.protobuf.internal import well_known_types |
| from google.protobuf import descriptor |
| from google.protobuf import text_format |
| |
| |
| class TimeUtilTestBase(unittest.TestCase): |
| |
| def CheckTimestampConversion(self, message, text): |
| self.assertEqual(text, message.ToJsonString()) |
| parsed_message = timestamp_pb2.Timestamp() |
| parsed_message.FromJsonString(text) |
| self.assertEqual(message, parsed_message) |
| |
| def CheckDurationConversion(self, message, text): |
| self.assertEqual(text, message.ToJsonString()) |
| parsed_message = duration_pb2.Duration() |
| parsed_message.FromJsonString(text) |
| self.assertEqual(message, parsed_message) |
| |
| |
| class TimeUtilTest(TimeUtilTestBase): |
| |
| def testTimestampSerializeAndParse(self): |
| message = timestamp_pb2.Timestamp() |
| # Generated output should contain 3, 6, or 9 fractional digits. |
| message.seconds = 0 |
| message.nanos = 0 |
| self.CheckTimestampConversion(message, '1970-01-01T00:00:00Z') |
| message.nanos = 10000000 |
| self.CheckTimestampConversion(message, '1970-01-01T00:00:00.010Z') |
| message.nanos = 10000 |
| self.CheckTimestampConversion(message, '1970-01-01T00:00:00.000010Z') |
| message.nanos = 10 |
| self.CheckTimestampConversion(message, '1970-01-01T00:00:00.000000010Z') |
| # Test min timestamps. |
| message.seconds = -62135596800 |
| message.nanos = 0 |
| self.CheckTimestampConversion(message, '0001-01-01T00:00:00Z') |
| # Test max timestamps. |
| message.seconds = 253402300799 |
| message.nanos = 999999999 |
| self.CheckTimestampConversion(message, '9999-12-31T23:59:59.999999999Z') |
| # Test negative timestamps. |
| message.seconds = -1 |
| self.CheckTimestampConversion(message, '1969-12-31T23:59:59.999999999Z') |
| |
| # Parsing accepts an fractional digits as long as they fit into nano |
| # precision. |
| message.FromJsonString('1970-01-01T00:00:00.1Z') |
| self.assertEqual(0, message.seconds) |
| self.assertEqual(100000000, message.nanos) |
| # Parsing accpets offsets. |
| message.FromJsonString('1970-01-01T00:00:00-08:00') |
| self.assertEqual(8 * 3600, message.seconds) |
| self.assertEqual(0, message.nanos) |
| |
| def testDurationSerializeAndParse(self): |
| message = duration_pb2.Duration() |
| # Generated output should contain 3, 6, or 9 fractional digits. |
| message.seconds = 0 |
| message.nanos = 0 |
| self.CheckDurationConversion(message, '0s') |
| message.nanos = 10000000 |
| self.CheckDurationConversion(message, '0.010s') |
| message.nanos = 10000 |
| self.CheckDurationConversion(message, '0.000010s') |
| message.nanos = 10 |
| self.CheckDurationConversion(message, '0.000000010s') |
| |
| # Test min and max |
| message.seconds = 315576000000 |
| message.nanos = 999999999 |
| self.CheckDurationConversion(message, '315576000000.999999999s') |
| message.seconds = -315576000000 |
| message.nanos = -999999999 |
| self.CheckDurationConversion(message, '-315576000000.999999999s') |
| |
| # Parsing accepts an fractional digits as long as they fit into nano |
| # precision. |
| message.FromJsonString('0.1s') |
| self.assertEqual(100000000, message.nanos) |
| message.FromJsonString('0.0000001s') |
| self.assertEqual(100, message.nanos) |
| |
| def testTimestampIntegerConversion(self): |
| message = timestamp_pb2.Timestamp() |
| message.FromNanoseconds(1) |
| self.assertEqual('1970-01-01T00:00:00.000000001Z', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToNanoseconds()) |
| |
| message.FromNanoseconds(-1) |
| self.assertEqual('1969-12-31T23:59:59.999999999Z', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToNanoseconds()) |
| |
| message.FromMicroseconds(1) |
| self.assertEqual('1970-01-01T00:00:00.000001Z', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToMicroseconds()) |
| |
| message.FromMicroseconds(-1) |
| self.assertEqual('1969-12-31T23:59:59.999999Z', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToMicroseconds()) |
| |
| message.FromMilliseconds(1) |
| self.assertEqual('1970-01-01T00:00:00.001Z', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToMilliseconds()) |
| |
| message.FromMilliseconds(-1) |
| self.assertEqual('1969-12-31T23:59:59.999Z', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToMilliseconds()) |
| |
| message.FromSeconds(1) |
| self.assertEqual('1970-01-01T00:00:01Z', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToSeconds()) |
| |
| message.FromSeconds(-1) |
| self.assertEqual('1969-12-31T23:59:59Z', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToSeconds()) |
| |
| message.FromNanoseconds(1999) |
| self.assertEqual(1, message.ToMicroseconds()) |
| # For negative values, Timestamp will be rounded down. |
| # For example, "1969-12-31T23:59:59.5Z" (i.e., -0.5s) rounded to seconds |
| # will be "1969-12-31T23:59:59Z" (i.e., -1s) rather than |
| # "1970-01-01T00:00:00Z" (i.e., 0s). |
| message.FromNanoseconds(-1999) |
| self.assertEqual(-2, message.ToMicroseconds()) |
| |
| def testDurationIntegerConversion(self): |
| message = duration_pb2.Duration() |
| message.FromNanoseconds(1) |
| self.assertEqual('0.000000001s', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToNanoseconds()) |
| |
| message.FromNanoseconds(-1) |
| self.assertEqual('-0.000000001s', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToNanoseconds()) |
| |
| message.FromMicroseconds(1) |
| self.assertEqual('0.000001s', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToMicroseconds()) |
| |
| message.FromMicroseconds(-1) |
| self.assertEqual('-0.000001s', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToMicroseconds()) |
| |
| message.FromMilliseconds(1) |
| self.assertEqual('0.001s', |
| message.ToJsonString()) |
| self.assertEqual(1, message.ToMilliseconds()) |
| |
| message.FromMilliseconds(-1) |
| self.assertEqual('-0.001s', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToMilliseconds()) |
| |
| message.FromSeconds(1) |
| self.assertEqual('1s', message.ToJsonString()) |
| self.assertEqual(1, message.ToSeconds()) |
| |
| message.FromSeconds(-1) |
| self.assertEqual('-1s', |
| message.ToJsonString()) |
| self.assertEqual(-1, message.ToSeconds()) |
| |
| # Test truncation behavior. |
| message.FromNanoseconds(1999) |
| self.assertEqual(1, message.ToMicroseconds()) |
| |
| # For negative values, Duration will be rounded towards 0. |
| message.FromNanoseconds(-1999) |
| self.assertEqual(-1, message.ToMicroseconds()) |
| |
| def testDatetimeConverison(self): |
| message = timestamp_pb2.Timestamp() |
| dt = datetime(1970, 1, 1) |
| message.FromDatetime(dt) |
| self.assertEqual(dt, message.ToDatetime()) |
| |
| message.FromMilliseconds(1999) |
| self.assertEqual(datetime(1970, 1, 1, 0, 0, 1, 999000), |
| message.ToDatetime()) |
| |
| def testTimedeltaConversion(self): |
| message = duration_pb2.Duration() |
| message.FromNanoseconds(1999999999) |
| td = message.ToTimedelta() |
| self.assertEqual(1, td.seconds) |
| self.assertEqual(999999, td.microseconds) |
| |
| message.FromNanoseconds(-1999999999) |
| td = message.ToTimedelta() |
| self.assertEqual(-1, td.days) |
| self.assertEqual(86398, td.seconds) |
| self.assertEqual(1, td.microseconds) |
| |
| message.FromMicroseconds(-1) |
| td = message.ToTimedelta() |
| self.assertEqual(-1, td.days) |
| self.assertEqual(86399, td.seconds) |
| self.assertEqual(999999, td.microseconds) |
| converted_message = duration_pb2.Duration() |
| converted_message.FromTimedelta(td) |
| self.assertEqual(message, converted_message) |
| |
| def testInvalidTimestamp(self): |
| message = timestamp_pb2.Timestamp() |
| self.assertRaisesRegexp( |
| ValueError, |
| 'time data \'10000-01-01T00:00:00\' does not match' |
| ' format \'%Y-%m-%dT%H:%M:%S\'', |
| message.FromJsonString, '10000-01-01T00:00:00.00Z') |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'nanos 0123456789012 more than 9 fractional digits.', |
| message.FromJsonString, |
| '1970-01-01T00:00:00.0123456789012Z') |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| (r'Invalid timezone offset value: \+08.'), |
| message.FromJsonString, |
| '1972-01-01T01:00:00.01+08',) |
| self.assertRaisesRegexp( |
| ValueError, |
| 'year is out of range', |
| message.FromJsonString, |
| '0000-01-01T00:00:00Z') |
| message.seconds = 253402300800 |
| self.assertRaisesRegexp( |
| OverflowError, |
| 'date value out of range', |
| message.ToJsonString) |
| |
| def testInvalidDuration(self): |
| message = duration_pb2.Duration() |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'Duration must end with letter "s": 1.', |
| message.FromJsonString, '1') |
| self.assertRaisesRegexp( |
| well_known_types.ParseError, |
| 'Couldn\'t parse duration: 1...2s.', |
| message.FromJsonString, '1...2s') |
| |
| |
| class FieldMaskTest(unittest.TestCase): |
| |
| def testStringFormat(self): |
| mask = field_mask_pb2.FieldMask() |
| self.assertEqual('', mask.ToJsonString()) |
| mask.paths.append('foo') |
| self.assertEqual('foo', mask.ToJsonString()) |
| mask.paths.append('bar') |
| self.assertEqual('foo,bar', mask.ToJsonString()) |
| |
| mask.FromJsonString('') |
| self.assertEqual('', mask.ToJsonString()) |
| mask.FromJsonString('foo') |
| self.assertEqual(['foo'], mask.paths) |
| mask.FromJsonString('foo,bar') |
| self.assertEqual(['foo', 'bar'], mask.paths) |
| |
| def testDescriptorToFieldMask(self): |
| mask = field_mask_pb2.FieldMask() |
| msg_descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR |
| mask.AllFieldsFromDescriptor(msg_descriptor) |
| self.assertEqual(75, len(mask.paths)) |
| self.assertTrue(mask.IsValidForDescriptor(msg_descriptor)) |
| for field in msg_descriptor.fields: |
| self.assertTrue(field.name in mask.paths) |
| mask.paths.append('optional_nested_message.bb') |
| self.assertTrue(mask.IsValidForDescriptor(msg_descriptor)) |
| mask.paths.append('repeated_nested_message.bb') |
| self.assertFalse(mask.IsValidForDescriptor(msg_descriptor)) |
| |
| def testCanonicalFrom(self): |
| mask = field_mask_pb2.FieldMask() |
| out_mask = field_mask_pb2.FieldMask() |
| # Paths will be sorted. |
| mask.FromJsonString('baz.quz,bar,foo') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('bar,baz.quz,foo', out_mask.ToJsonString()) |
| # Duplicated paths will be removed. |
| mask.FromJsonString('foo,bar,foo') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('bar,foo', out_mask.ToJsonString()) |
| # Sub-paths of other paths will be removed. |
| mask.FromJsonString('foo.b1,bar.b1,foo.b2,bar') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('bar,foo.b1,foo.b2', out_mask.ToJsonString()) |
| |
| # Test more deeply nested cases. |
| mask.FromJsonString( |
| 'foo.bar.baz1,foo.bar.baz2.quz,foo.bar.baz2') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('foo.bar.baz1,foo.bar.baz2', |
| out_mask.ToJsonString()) |
| mask.FromJsonString( |
| 'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('foo.bar.baz1,foo.bar.baz2', |
| out_mask.ToJsonString()) |
| mask.FromJsonString( |
| 'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz,foo.bar') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('foo.bar', out_mask.ToJsonString()) |
| mask.FromJsonString( |
| 'foo.bar.baz1,foo.bar.baz2,foo.bar.baz2.quz,foo') |
| out_mask.CanonicalFormFromMask(mask) |
| self.assertEqual('foo', out_mask.ToJsonString()) |
| |
| def testUnion(self): |
| mask1 = field_mask_pb2.FieldMask() |
| mask2 = field_mask_pb2.FieldMask() |
| out_mask = field_mask_pb2.FieldMask() |
| mask1.FromJsonString('foo,baz') |
| mask2.FromJsonString('bar,quz') |
| out_mask.Union(mask1, mask2) |
| self.assertEqual('bar,baz,foo,quz', out_mask.ToJsonString()) |
| # Overlap with duplicated paths. |
| mask1.FromJsonString('foo,baz.bb') |
| mask2.FromJsonString('baz.bb,quz') |
| out_mask.Union(mask1, mask2) |
| self.assertEqual('baz.bb,foo,quz', out_mask.ToJsonString()) |
| # Overlap with paths covering some other paths. |
| mask1.FromJsonString('foo.bar.baz,quz') |
| mask2.FromJsonString('foo.bar,bar') |
| out_mask.Union(mask1, mask2) |
| self.assertEqual('bar,foo.bar,quz', out_mask.ToJsonString()) |
| |
| def testIntersect(self): |
| mask1 = field_mask_pb2.FieldMask() |
| mask2 = field_mask_pb2.FieldMask() |
| out_mask = field_mask_pb2.FieldMask() |
| # Test cases without overlapping. |
| mask1.FromJsonString('foo,baz') |
| mask2.FromJsonString('bar,quz') |
| out_mask.Intersect(mask1, mask2) |
| self.assertEqual('', out_mask.ToJsonString()) |
| # Overlap with duplicated paths. |
| mask1.FromJsonString('foo,baz.bb') |
| mask2.FromJsonString('baz.bb,quz') |
| out_mask.Intersect(mask1, mask2) |
| self.assertEqual('baz.bb', out_mask.ToJsonString()) |
| # Overlap with paths covering some other paths. |
| mask1.FromJsonString('foo.bar.baz,quz') |
| mask2.FromJsonString('foo.bar,bar') |
| out_mask.Intersect(mask1, mask2) |
| self.assertEqual('foo.bar.baz', out_mask.ToJsonString()) |
| mask1.FromJsonString('foo.bar,bar') |
| mask2.FromJsonString('foo.bar.baz,quz') |
| out_mask.Intersect(mask1, mask2) |
| self.assertEqual('foo.bar.baz', out_mask.ToJsonString()) |
| |
| def testMergeMessage(self): |
| # Test merge one field. |
| src = unittest_pb2.TestAllTypes() |
| test_util.SetAllFields(src) |
| for field in src.DESCRIPTOR.fields: |
| if field.containing_oneof: |
| continue |
| field_name = field.name |
| dst = unittest_pb2.TestAllTypes() |
| # Only set one path to mask. |
| mask = field_mask_pb2.FieldMask() |
| mask.paths.append(field_name) |
| mask.MergeMessage(src, dst) |
| # The expected result message. |
| msg = unittest_pb2.TestAllTypes() |
| if field.label == descriptor.FieldDescriptor.LABEL_REPEATED: |
| repeated_src = getattr(src, field_name) |
| repeated_msg = getattr(msg, field_name) |
| if field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE: |
| for item in repeated_src: |
| repeated_msg.add().CopyFrom(item) |
| else: |
| repeated_msg.extend(repeated_src) |
| elif field.cpp_type == descriptor.FieldDescriptor.CPPTYPE_MESSAGE: |
| getattr(msg, field_name).CopyFrom(getattr(src, field_name)) |
| else: |
| setattr(msg, field_name, getattr(src, field_name)) |
| # Only field specified in mask is merged. |
| self.assertEqual(msg, dst) |
| |
| # Test merge nested fields. |
| nested_src = unittest_pb2.NestedTestAllTypes() |
| nested_dst = unittest_pb2.NestedTestAllTypes() |
| nested_src.child.payload.optional_int32 = 1234 |
| nested_src.child.child.payload.optional_int32 = 5678 |
| mask = field_mask_pb2.FieldMask() |
| mask.FromJsonString('child.payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(0, nested_dst.child.child.payload.optional_int32) |
| |
| mask.FromJsonString('child.child.payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(5678, nested_dst.child.child.payload.optional_int32) |
| |
| nested_dst.Clear() |
| mask.FromJsonString('child.child.payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(0, nested_dst.child.payload.optional_int32) |
| self.assertEqual(5678, nested_dst.child.child.payload.optional_int32) |
| |
| nested_dst.Clear() |
| mask.FromJsonString('child') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(5678, nested_dst.child.child.payload.optional_int32) |
| |
| # Test MergeOptions. |
| nested_dst.Clear() |
| nested_dst.child.payload.optional_int64 = 4321 |
| # Message fields will be merged by default. |
| mask.FromJsonString('child.payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(4321, nested_dst.child.payload.optional_int64) |
| # Change the behavior to replace message fields. |
| mask.FromJsonString('child.payload') |
| mask.MergeMessage(nested_src, nested_dst, True, False) |
| self.assertEqual(1234, nested_dst.child.payload.optional_int32) |
| self.assertEqual(0, nested_dst.child.payload.optional_int64) |
| |
| # By default, fields missing in source are not cleared in destination. |
| nested_dst.payload.optional_int32 = 1234 |
| self.assertTrue(nested_dst.HasField('payload')) |
| mask.FromJsonString('payload') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertTrue(nested_dst.HasField('payload')) |
| # But they are cleared when replacing message fields. |
| nested_dst.Clear() |
| nested_dst.payload.optional_int32 = 1234 |
| mask.FromJsonString('payload') |
| mask.MergeMessage(nested_src, nested_dst, True, False) |
| self.assertFalse(nested_dst.HasField('payload')) |
| |
| nested_src.payload.repeated_int32.append(1234) |
| nested_dst.payload.repeated_int32.append(5678) |
| # Repeated fields will be appended by default. |
| mask.FromJsonString('payload.repeated_int32') |
| mask.MergeMessage(nested_src, nested_dst) |
| self.assertEqual(2, len(nested_dst.payload.repeated_int32)) |
| self.assertEqual(5678, nested_dst.payload.repeated_int32[0]) |
| self.assertEqual(1234, nested_dst.payload.repeated_int32[1]) |
| # Change the behavior to replace repeated fields. |
| mask.FromJsonString('payload.repeated_int32') |
| mask.MergeMessage(nested_src, nested_dst, False, True) |
| self.assertEqual(1, len(nested_dst.payload.repeated_int32)) |
| self.assertEqual(1234, nested_dst.payload.repeated_int32[0]) |
| |
| |
| class StructTest(unittest.TestCase): |
| |
| def testStruct(self): |
| struct = struct_pb2.Struct() |
| struct_class = struct.__class__ |
| |
| struct['key1'] = 5 |
| struct['key2'] = 'abc' |
| struct['key3'] = True |
| struct.get_or_create_struct('key4')['subkey'] = 11.0 |
| struct_list = struct.get_or_create_list('key5') |
| struct_list.extend([6, 'seven', True, False, None]) |
| struct_list.add_struct()['subkey2'] = 9 |
| |
| self.assertTrue(isinstance(struct, well_known_types.Struct)) |
| self.assertEquals(5, struct['key1']) |
| self.assertEquals('abc', struct['key2']) |
| self.assertIs(True, struct['key3']) |
| self.assertEquals(11, struct['key4']['subkey']) |
| inner_struct = struct_class() |
| inner_struct['subkey2'] = 9 |
| self.assertEquals([6, 'seven', True, False, None, inner_struct], |
| list(struct['key5'].items())) |
| |
| serialized = struct.SerializeToString() |
| |
| struct2 = struct_pb2.Struct() |
| struct2.ParseFromString(serialized) |
| |
| self.assertEquals(struct, struct2) |
| |
| self.assertTrue(isinstance(struct2, well_known_types.Struct)) |
| self.assertEquals(5, struct2['key1']) |
| self.assertEquals('abc', struct2['key2']) |
| self.assertIs(True, struct2['key3']) |
| self.assertEquals(11, struct2['key4']['subkey']) |
| self.assertEquals([6, 'seven', True, False, None, inner_struct], |
| list(struct2['key5'].items())) |
| |
| struct_list = struct2['key5'] |
| self.assertEquals(6, struct_list[0]) |
| self.assertEquals('seven', struct_list[1]) |
| self.assertEquals(True, struct_list[2]) |
| self.assertEquals(False, struct_list[3]) |
| self.assertEquals(None, struct_list[4]) |
| self.assertEquals(inner_struct, struct_list[5]) |
| |
| struct_list[1] = 7 |
| self.assertEquals(7, struct_list[1]) |
| |
| struct_list.add_list().extend([1, 'two', True, False, None]) |
| self.assertEquals([1, 'two', True, False, None], |
| list(struct_list[6].items())) |
| |
| text_serialized = str(struct) |
| struct3 = struct_pb2.Struct() |
| text_format.Merge(text_serialized, struct3) |
| self.assertEquals(struct, struct3) |
| |
| struct.get_or_create_struct('key3')['replace'] = 12 |
| self.assertEquals(12, struct['key3']['replace']) |
| |
| |
| class AnyTest(unittest.TestCase): |
| |
| def testAnyMessage(self): |
| # Creates and sets message. |
| msg = any_test_pb2.TestAny() |
| msg_descriptor = msg.DESCRIPTOR |
| all_types = unittest_pb2.TestAllTypes() |
| all_descriptor = all_types.DESCRIPTOR |
| all_types.repeated_string.append(u'\u00fc\ua71f') |
| # Packs to Any. |
| msg.value.Pack(all_types) |
| self.assertEqual(msg.value.type_url, |
| 'type.googleapis.com/%s' % all_descriptor.full_name) |
| self.assertEqual(msg.value.value, |
| all_types.SerializeToString()) |
| # Tests Is() method. |
| self.assertTrue(msg.value.Is(all_descriptor)) |
| self.assertFalse(msg.value.Is(msg_descriptor)) |
| # Unpacks Any. |
| unpacked_message = unittest_pb2.TestAllTypes() |
| self.assertTrue(msg.value.Unpack(unpacked_message)) |
| self.assertEqual(all_types, unpacked_message) |
| # Unpacks to different type. |
| self.assertFalse(msg.value.Unpack(msg)) |
| # Only Any messages have Pack method. |
| try: |
| msg.Pack(all_types) |
| except AttributeError: |
| pass |
| else: |
| raise AttributeError('%s should not have Pack method.' % |
| msg_descriptor.full_name) |
| |
| def testMessageName(self): |
| # Creates and sets message. |
| submessage = any_test_pb2.TestAny() |
| submessage.int_value = 12345 |
| msg = any_pb2.Any() |
| msg.Pack(submessage) |
| self.assertEqual(msg.TypeName(), 'google.protobuf.internal.TestAny') |
| |
| def testPackWithCustomTypeUrl(self): |
| submessage = any_test_pb2.TestAny() |
| submessage.int_value = 12345 |
| msg = any_pb2.Any() |
| # Pack with a custom type URL prefix. |
| msg.Pack(submessage, 'type.myservice.com') |
| self.assertEqual(msg.type_url, |
| 'type.myservice.com/%s' % submessage.DESCRIPTOR.full_name) |
| # Pack with a custom type URL prefix ending with '/'. |
| msg.Pack(submessage, 'type.myservice.com/') |
| self.assertEqual(msg.type_url, |
| 'type.myservice.com/%s' % submessage.DESCRIPTOR.full_name) |
| # Pack with an empty type URL prefix. |
| msg.Pack(submessage, '') |
| self.assertEqual(msg.type_url, |
| '/%s' % submessage.DESCRIPTOR.full_name) |
| # Test unpacking the type. |
| unpacked_message = any_test_pb2.TestAny() |
| self.assertTrue(msg.Unpack(unpacked_message)) |
| self.assertEqual(submessage, unpacked_message) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |