首页

关于flink-core源码包中NetUtils网络工具类网路通信涉及服务端口校验、访问地址验证等操作

标签:NetUtils,网络工具类,apache,util,flink     发布时间:2018-03-28   

一、前言

关于flink-core源码包中的org.apache.flink.util.NetUtils网络工具类,对涉及网络组件用的域名URL的抽取、端口验证getAvailablePort等。

二、源码说明

package org.apache.flink.util;@b@@b@import org.apache.flink.annotation.Internal;@b@import org.apache.flink.configuration.IllegalConfigurationException;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@import sun.net.util.IPAddressUtil;@b@import java.io.IOException;@b@import java.net.Inet4Address;@b@import java.net.Inet6Address;@b@import java.net.InetAddress;@b@import java.net.InetSocketAddress;@b@import java.net.MalformedURLException;@b@import java.net.ServerSocket;@b@import java.net.URL;@b@import java.net.UnknownHostException;@b@import java.util.Arrays;@b@import java.util.Collections;@b@import java.util.Iterator;@b@@b@@Internal@b@public class NetUtils {@b@@b@	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);@b@@b@	/** The wildcard address to listen on all interfaces (either 0.0.0.0 or ::) */@b@	private static final String WILDCARD_ADDRESS = new InetSocketAddress(0).getAddress().getHostAddress();@b@	@b@	/**@b@	 * Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts@b@	 * (separated by a period '.'), it will take the first part. Otherwise it takes the entire fqdn.@b@	 * @b@	 * @param fqdn The fully qualified domain name.@b@	 * @return The hostname.@b@	 */@b@	public static String getHostnameFromFQDN(String fqdn) {@b@		if (fqdn == null) {@b@			throw new IllegalArgumentException("fqdn is null");@b@		}@b@		int dotPos = fqdn.indexOf('.');@b@		if(dotPos == -1) {@b@			return fqdn;@b@		} else {@b@			return fqdn.substring(0, dotPos);@b@		}@b@	}@b@@b@	/**@b@	 * Method to validate if the given String represents a hostname:port.@b@	 *@b@	 * Works also for ipv6.@b@	 *@b@	 * See: http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress@b@	 *@b@	 * @return URL object for accessing host and Port@b@	 */@b@	public static URL getCorrectHostnamePort(String hostPort) {@b@		try {@b@			URL u = new URL("http://"+hostPort);@b@			if(u.getHost() == null) {@b@				throw new IllegalArgumentException("The given host:port ('"+hostPort+"') doesn't contain a valid host");@b@			}@b@			if(u.getPort() == -1) {@b@				throw new IllegalArgumentException("The given host:port ('"+hostPort+"') doesn't contain a valid port");@b@			}@b@			return u;@b@		} catch (MalformedURLException e) {@b@			throw new IllegalArgumentException("The given host:port ('"+hostPort+"') is invalid", e);@b@		}@b@	}@b@@b@	// ------------------------------------------------------------------------@b@	//  Lookup of to free ports@b@	// ------------------------------------------------------------------------@b@	@b@	/**@b@	 * Find a non-occupied port.@b@	 *@b@	 * @return A non-occupied port.@b@	 */@b@	public static int getAvailablePort() {@b@		for (int i = 0; i < 50; i++) {@b@			try (ServerSocket serverSocket = new ServerSocket(0)) {@b@				int port = serverSocket.getLocalPort();@b@				if (port != 0) {@b@					return port;@b@				}@b@			}@b@			catch (IOException ignored) {}@b@		}@b@@b@		throw new RuntimeException("Could not find a free permitted port on the machine.");@b@	}@b@	@b@@b@	// ------------------------------------------------------------------------@b@	//  Encoding of IP addresses for URLs@b@	// ------------------------------------------------------------------------@b@@b@	/**@b@	 * Returns an address in a normalized format for Akka.@b@	 * When an IPv6 address is specified, it normalizes the IPv6 address to avoid@b@	 * complications with the exact URL match policy of Akka.@b@	 * @param host The hostname, IPv4 or IPv6 address@b@	 * @return host which will be normalized if it is an IPv6 address@b@	 */@b@	public static String unresolvedHostToNormalizedString(String host) {@b@		// Return loopback interface address if host is null@b@		// This represents the behavior of {@code InetAddress.getByName } and RFC 3330@b@		if (host == null) {@b@			host = InetAddress.getLoopbackAddress().getHostAddress();@b@		} else {@b@			host = host.trim().toLowerCase();@b@		}@b@@b@		// normalize and valid address@b@		if (IPAddressUtil.isIPv6LiteralAddress(host)) {@b@			byte[] ipV6Address = IPAddressUtil.textToNumericFormatV6(host);@b@			host = getIPv6UrlRepresentation(ipV6Address);@b@		} else if (!IPAddressUtil.isIPv4LiteralAddress(host)) {@b@			try {@b@				// We don't allow these in hostnames@b@				Preconditions.checkArgument(!host.startsWith("."));@b@				Preconditions.checkArgument(!host.endsWith("."));@b@				Preconditions.checkArgument(!host.contains(":"));@b@			} catch (Exception e) {@b@				throw new IllegalConfigurationException("The configured hostname is not valid", e);@b@			}@b@		}@b@@b@		return host;@b@	}@b@@b@	/**@b@	 * Returns a valid address for Akka. It returns a String of format 'host:port'.@b@	 * When an IPv6 address is specified, it normalizes the IPv6 address to avoid@b@	 * complications with the exact URL match policy of Akka.@b@	 * @param host The hostname, IPv4 or IPv6 address@b@	 * @param port The port@b@	 * @return host:port where host will be normalized if it is an IPv6 address@b@	 */@b@	public static String unresolvedHostAndPortToNormalizedString(String host, int port) {@b@		Preconditions.checkArgument(port >= 0 && port < 65536,@b@			"Port is not within the valid range,");@b@		return unresolvedHostToNormalizedString(host) + ":" + port;@b@	}@b@@b@	/**@b@	 * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses@b@	 * have the proper formatting to be included in URLs.@b@	 * @b@	 * @param address The IP address to encode.@b@	 * @return The proper URL string encoded IP address.@b@	 */@b@	public static String ipAddressToUrlString(InetAddress address) {@b@		if (address == null) {@b@			throw new NullPointerException("address is null");@b@		}@b@		else if (address instanceof Inet4Address) {@b@			return address.getHostAddress();@b@		}@b@		else if (address instanceof Inet6Address) {@b@			return getIPv6UrlRepresentation((Inet6Address) address);@b@		}@b@		else {@b@			throw new IllegalArgumentException("Unrecognized type of InetAddress: " + address);@b@		}@b@	}@b@@b@	/**@b@	 * Encodes an IP address and port to be included in URL. in particular, this method makes@b@	 * sure that IPv6 addresses have the proper formatting to be included in URLs.@b@	 *@b@	 * @param address The address to be included in the URL.@b@	 * @param port The port for the URL address.@b@	 * @return The proper URL string encoded IP address and port.@b@	 */@b@	public static String ipAddressAndPortToUrlString(InetAddress address, int port) {@b@		return ipAddressToUrlString(address) + ':' + port;@b@	}@b@@b@	/**@b@	 * Encodes an IP address and port to be included in URL. in particular, this method makes@b@	 * sure that IPv6 addresses have the proper formatting to be included in URLs.@b@	 * @b@	 * @param address The socket address with the IP address and port.@b@	 * @return The proper URL string encoded IP address and port.@b@	 */@b@	public static String socketAddressToUrlString(InetSocketAddress address) {@b@		if (address.isUnresolved()) {@b@			throw new IllegalArgumentException("Address cannot be resolved: " + address.getHostString());@b@		}@b@		return ipAddressAndPortToUrlString(address.getAddress(), address.getPort());@b@	}@b@@b@	/**@b@	 * Normalizes and encodes a hostname and port to be included in URL. @b@	 * In particular, this method makes sure that IPv6 address literals have the proper@b@	 * formatting to be included in URLs.@b@	 *@b@	 * @param host The address to be included in the URL.@b@	 * @param port The port for the URL address.@b@	 * @return The proper URL string encoded IP address and port.@b@	 * @throws java.net.UnknownHostException Thrown, if the hostname cannot be translated into a URL.@b@	 */@b@	public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {@b@		return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);@b@	}@b@@b@	/**@b@	 * Creates a compressed URL style representation of an Inet6Address.@b@	 *@b@	 * <p>This method copies and adopts code from Google's Guava library.@b@	 * We re-implement this here in order to reduce dependency on Guava.@b@	 * The Guava library has frequently caused dependency conflicts in the past.@b@	 */@b@	private static String getIPv6UrlRepresentation(Inet6Address address) {@b@		return getIPv6UrlRepresentation(address.getAddress());@b@	}@b@@b@	/**@b@	 * Creates a compressed URL style representation of an Inet6Address.@b@	 *@b@	 * <p>This method copies and adopts code from Google's Guava library.@b@	 * We re-implement this here in order to reduce dependency on Guava.@b@	 * The Guava library has frequently caused dependency conflicts in the past.@b@	 */@b@	private static String getIPv6UrlRepresentation(byte[] addressBytes) {@b@		// first, convert bytes to 16 bit chunks@b@		int[] hextets = new int[8];@b@		for (int i = 0; i < hextets.length; i++) {@b@			hextets[i] = (addressBytes[2 * i] & 0xFF) << 8 | (addressBytes[2 * i + 1] & 0xFF);@b@		}@b@@b@		// now, find the sequence of zeros that should be compressed@b@		int bestRunStart = -1;@b@		int bestRunLength = -1;@b@		int runStart = -1;@b@		for (int i = 0; i < hextets.length + 1; i++) {@b@			if (i < hextets.length && hextets[i] == 0) {@b@				if (runStart < 0) {@b@					runStart = i;@b@				}@b@			} else if (runStart >= 0) {@b@				int runLength = i - runStart;@b@				if (runLength > bestRunLength) {@b@					bestRunStart = runStart;@b@					bestRunLength = runLength;@b@				}@b@				runStart = -1;@b@			}@b@		}@b@		if (bestRunLength >= 2) {@b@			Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1);@b@		}@b@@b@		// convert into text form@b@		StringBuilder buf = new StringBuilder(40);@b@		buf.append('[');@b@		@b@		boolean lastWasNumber = false;@b@		for (int i = 0; i < hextets.length; i++) {@b@			boolean thisIsNumber = hextets[i] >= 0;@b@			if (thisIsNumber) {@b@				if (lastWasNumber) {@b@					buf.append(':');@b@				}@b@				buf.append(Integer.toHexString(hextets[i]));@b@			} else {@b@				if (i == 0 || lastWasNumber) {@b@					buf.append("::");@b@				}@b@			}@b@			lastWasNumber = thisIsNumber;@b@		}@b@		buf.append(']');@b@		return buf.toString();@b@	}@b@	@b@	// ------------------------------------------------------------------------@b@	//  Port range parsing@b@	// ------------------------------------------------------------------------@b@	@b@	/**@b@	 * Returns an iterator over available ports defined by the range definition.@b@	 *@b@	 * @param rangeDefinition String describing a single port, a range of ports or multiple ranges.@b@	 * @return Set of ports from the range definition@b@	 * @throws NumberFormatException If an invalid string is passed.@b@	 */@b@	public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {@b@		final String[] ranges = rangeDefinition.trim().split(",");@b@		@b@		UnionIterator<Integer> iterators = new UnionIterator<>();@b@		@b@		for (String rawRange: ranges) {@b@			Iterator<Integer> rangeIterator;@b@			String range = rawRange.trim();@b@			int dashIdx = range.indexOf('-');@b@			if (dashIdx == -1) {@b@				// only one port in range:@b@				final int port = Integer.valueOf(range);@b@				if (port < 0 || port > 65535) {@b@					throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +@b@						"and 65535, but was " + port + ".");@b@				}@b@				rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();@b@			} else {@b@				// evaluate range@b@				final int start = Integer.valueOf(range.substring(0, dashIdx));@b@				if (start < 0 || start > 65535) {@b@					throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +@b@						"and 65535, but was " + start + ".");@b@				}@b@				final int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));@b@				if (end < 0 || end > 65535) {@b@					throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +@b@						"and 65535, but was " + end + ".");@b@				}@b@				rangeIterator = new Iterator<Integer>() {@b@					int i = start;@b@					@Override@b@					public boolean hasNext() {@b@						return i <= end;@b@					}@b@@b@					@Override@b@					public Integer next() {@b@						return i++;@b@					}@b@@b@					@Override@b@					public void remove() {@b@						throw new UnsupportedOperationException("Remove not supported");@b@					}@b@				};@b@			}@b@			iterators.add(rangeIterator);@b@		}@b@		@b@		return iterators;@b@	}@b@@b@	/**@b@	 * Tries to allocate a socket from the given sets of ports.@b@	 *@b@	 * @param portsIterator A set of ports to choose from.@b@	 * @param factory A factory for creating the SocketServer@b@	 * @return null if no port was available or an allocated socket.@b@	 */@b@	public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator, SocketFactory factory) {@b@		while (portsIterator.hasNext()) {@b@			int port = portsIterator.next();@b@			LOG.debug("Trying to open socket on port {}", port);@b@			try {@b@				return factory.createSocket(port);@b@			} catch (IOException | IllegalArgumentException e) {@b@				if (LOG.isDebugEnabled()) {@b@					LOG.debug("Unable to allocate socket on port", e);@b@				} else {@b@					LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());@b@				}@b@			}@b@		}@b@		return null;@b@	}@b@@b@	/**@b@	 * Returns the wildcard address to listen on all interfaces.@b@	 * @return Either 0.0.0.0 or :: depending on the IP setup.@b@	 */@b@	public static String getWildcardIPAddress() {@b@		return WILDCARD_ADDRESS;@b@	}@b@@b@	public interface SocketFactory {@b@		ServerSocket createSocket(int port) throws IOException;@b@	}@b@}